From 3c5db3c1fdcc3ce8085a83633f5e145f03393b04 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 15:20:30 +0900 Subject: [PATCH] =?UTF-8?q?[Update]=20=EC=9E=90=EB=8F=99=20=EC=98=A4?= =?UTF-8?q?=ED=83=90=20=EA=B2=80=EC=A6=9D=EC=9D=84=20=EC=9C=84=ED=95=9C=20?= =?UTF-8?q?=EB=9D=BC=EC=9A=B0=ED=84=B0=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 17 +++++- addon/csrf_check.py | 12 ++--- addon/init.py | 32 +++++++++--- lib/false_true_varifing_task.py | 59 +++++++++++++++++++++ runner/backend/__init__.py | 91 ++++++++++++++++++++++++++++++--- 5 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 lib/false_true_varifing_task.py diff --git a/README.md b/README.md index 2beaafe..8e15d4c 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ venv와 패키지가 설치가 됩니다. > 그렇지 않으면 실행되지 않습니다. > > 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다. -> +> > Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다. > > MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다. @@ -58,8 +58,15 @@ class LoggerAddon: self.checker = Example() def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 - self.checker.test(flow) + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return + self.checker.test(flow) + def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return self.checker.test(flow) ``` @@ -88,3 +95,9 @@ class Example: ``` 이러한 예제를 참고하여 작성하여주세요. + +# 백엔드 API DOCS + +``` +http://localhost:11081/redoc +``` diff --git a/addon/csrf_check.py b/addon/csrf_check.py index 34cd700..c64ec98 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -76,10 +76,10 @@ class CsrfChecker: resp_nonce = self.get_query_param(loc, param) if param else None if resp_nonce is None: - report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="CRITICAL", uri=flow.request.url) return 1 if orig_nonce != resp_nonce: - report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="MEDIUM", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="HIGH", uri=flow.request.url) return 1 return 0 @@ -103,11 +103,11 @@ class CsrfChecker: if new_nonce is None: return 0 if new_nonce == orig_nonce: - report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="CRITICAL", uri=flow.request.url) return 1 # (2) 두 번의 리다이렉트 비교 - async with httpx.AsyncClient(follow_redirects=False) as cli: + async with httpx.AsyncClient(follow_redirects=True) as cli: # 원본 쿼리 req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) # nonce 교체 쿼리 @@ -120,7 +120,7 @@ class CsrfChecker: and urlparse(req1.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path ): - report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="MEDIUM", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="NOT-VERIFIED-HIGH", uri=flow.request.url) return 1 return 0 @@ -130,7 +130,7 @@ class CsrfChecker: # 1) 요청에 nonce 없으면 if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): - report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="CRITICAL", uri=flow.request.url) return # 2) 리다이렉트에서 nonce 검사 diff --git a/addon/init.py b/addon/init.py index fb69258..634d344 100644 --- a/addon/init.py +++ b/addon/init.py @@ -9,6 +9,10 @@ from access_token import AccessTokenScanner from addon.google_login_hint import GoogleLoginHint import os from dotenv import load_dotenv +from lib.false_true_varifing_task import FalseTrueVarifingTask + +# Initialize the singleton task manager +false_true_varifing_task = FalseTrueVarifingTask() load_dotenv(override=True) @@ -21,6 +25,10 @@ class PKCEAddon: f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}" ) try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return + await self.checker.test(flow) except Exception as e: print(f"[ERROR] Addon failed: {e}") @@ -33,6 +41,9 @@ class CsrfAddon: async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.response(flow) except Exception as e: print(f"[ERROR] CSRF Addon failed: {e}") @@ -42,21 +53,18 @@ class CsrfAddon: class ScopeAddon: def __init__(self): self.checker = ScopeDetection() - self._flow_map = {} # 요청 정보를 저장 - - async def request(self, flow: http.HTTPFlow): - self._flow_map[flow.id] = { - "method": flow.request.method, - "url": flow.request.pretty_url, - "query": flow.request.query, - } async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.test(flow) except Exception as e: print(f"[ERROR] ScopeDetection failed: {e}") + + class NonceAddon: def __init__(self): self.checker = NonceChecker() @@ -70,12 +78,17 @@ class NonceAddon: print(f"[ERROR] NonceAddon failed: {e}") pass + + class AccessTokenAddon: def __init__(self): self.checker = AccessTokenScanner() async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.scan(flow) except Exception as e: print(f"[ERROR] AccessToken Addon failed: {e}") @@ -88,6 +101,9 @@ class RedirectBypassAddon: # request 대신 response 로 바꿔 보세요: async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.test(flow) except Exception as e: print(f"[ERROR] RedirectBypass Addon failed: {e}") diff --git a/lib/false_true_varifing_task.py b/lib/false_true_varifing_task.py new file mode 100644 index 0000000..62865e1 --- /dev/null +++ b/lib/false_true_varifing_task.py @@ -0,0 +1,59 @@ +from typing import Any +from copy import deepcopy + +class FalseTrueVarifingTask: + """ + A singleton class representing a task that can be either false or true. + This class is used to handle tasks that require verification of their truth value. + """ + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + self._is_verifing = False + self.task_queue = [] + self._initialized = True + + def reset(self): + """ + Reset the task queue and verification status. + """ + self._is_verifing = False + self.task_queue.clear() + + def add_task(self, task_name: str, data: Any): + """ + Add a task to the task queue. + :param task: The task to be added. + """ + self.task_queue.append({ + "task_name": task_name, + "data": data}) + + def start_verification(self): + """ + Start the verification process for the tasks in the queue. + """ + self._is_verifing = True + + def get_task_queue(self): + """ + Get a copy of the current task queue. + :return: A copy of the task queue. + """ + return deepcopy(self.task_queue) + + def is_verifing_false_true(self): + """ + Get the current verification status. + :return: True if verification is in progress, False otherwise. + """ + return self._is_verifing diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index 015c483..1410f52 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -1,17 +1,94 @@ from fastapi import FastAPI, Query, HTTPException from fastapi.responses import Response import lib.cur_target_url as cur_target_url +from lib.false_true_varifing_task import FalseTrueVarifingTask +from pydantic import BaseModel, Field +from lib.report_vuln import report_vuln as save_vuln + +# Initialize the singleton task manager +false_true_varifing_task = FalseTrueVarifingTask() app = FastAPI() -@app.post("/start") -async def start(url: str = Query(None)): - if url: - cur_target_url.save(url) - print(f"Target URL set to: {url}") - return {"message": f"Target URL set to: {url}"} - return {"error": "No URL provided"} + +@app.post( + "/start", + summary="취약점 검증을 위한 대상 URL 설정", + description=""" +이 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다. + +유효한 URL이 제공되면: +- 해당 URL이 저장됩니다. +- 검증 작업 큐가 초기화됩니다. +- 새로운 검증 작업을 시작할 준비가 완료됩니다. + +URL이 제공되지 않으면, 오류가 반환됩니다. +""" +, + tags=["1st STEP"] +) +async def start(url: str = Query(..., description="The URL to target for vulnerability verification")): + cur_target_url.save(url) + false_true_varifing_task.reset() + return {"message": f"Target URL set to: {url}"} + + + + + + +@app.post( + "/start-false-true-verifing", + summary="시스템에 False-True 검증 작업 시작을 알림", + description=""" +이 엔드포인트는 시스템에 False-True 방식의 검증 작업을 시작하도록 지시합니다. +또한 검증을 위해 준비된 작업 목록을 반환합니다. +""", + tags=["2nd STEP"] + ) +async def start_false_true_verifing(): + false_true_varifing_task.start_verification() + task_queue = false_true_varifing_task.get_task_queue() + return {"payload": task_queue} + + + + + + +class VulnerabilityReport(BaseModel): + title: str = Field(..., description="Short title for the vulnerability") + url: str = Field(..., description="URL where the vulnerability was discovered") + status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)") + desc: str = Field(..., description="Detailed description of the issue") + +@app.post( + "/report-vuln", + summary="취약점 보고", + description=""" +정탐인 취약점을 시스템에 보고합니다. +보고 시 다음 정보를 포함해야 합니다: + +- **title**: 취약점의 간단한 이름 +- **url**: 취약점이 발견된 위치 (URL) +- **status**: 심각도 +- **desc**: 취약점에 대한 상세 설명 +""", + tags=["3rd STEP"] +) +async def report_vuln(vuln: VulnerabilityReport): + save_vuln( + title=vuln.title, + desc=vuln.desc, + status=vuln.status, + uri=vuln.url + ) + + return {"message": "Vulnerability reported successfully"} + + + @app.exception_handler(404)