From 062552d3d8da29c4f8ef87003c1f1a9cdd41afa9 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 10:43:52 +0900 Subject: [PATCH 1/7] =?UTF-8?q?[Refactor]=20=EB=A6=AC=ED=8C=A9=ED=84=B0?= =?UTF-8?q?=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ addon/access_token.py | 6 +++--- addon/csrf_check.py | 6 +++--- addon/{GoogleLoginHint.py => google_login_hint.py} | 0 addon/init.py | 4 ++-- addon/nonce_check.py | 6 +++--- addon/pkce_check.py | 6 +++--- addon/redirect_uri_check.py | 6 +++--- addon/{ScopeDetection.py => scope_detection.py} | 6 +++--- lib/{target.py => cur_target_url.py} | 0 lib/{report.py => report_vuln.py} | 0 runner/backend/__init__.py | 4 ++-- 12 files changed, 24 insertions(+), 22 deletions(-) rename addon/{GoogleLoginHint.py => google_login_hint.py} (100%) rename addon/{ScopeDetection.py => scope_detection.py} (92%) rename lib/{target.py => cur_target_url.py} (100%) rename lib/{report.py => report_vuln.py} (100%) diff --git a/.gitignore b/.gitignore index 225a08d..c9116f2 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ wheels/ # Virtual environments .venv +.env + data/ diff --git a/addon/access_token.py b/addon/access_token.py index 6f1c8bb..228cd69 100644 --- a/addon/access_token.py +++ b/addon/access_token.py @@ -4,8 +4,8 @@ from typing import List, Dict, Optional, Any from mitmproxy.http import HTTPFlow -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report # 결과 리포트 저장용 데이터 클래스 @dataclass @@ -32,7 +32,7 @@ class AccessTokenScanner: findings.extend(await self._scan_response(flow.response, flow.request.url)) if findings: - target_value = target.load() + target_value = cur_target_url.load() save_report([f.to_report(target_value) for f in findings]) # 내부 구현 diff --git a/addon/csrf_check.py b/addon/csrf_check.py index 4e07407..564e337 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -4,8 +4,8 @@ from urllib.parse import urlparse, parse_qs, unquote import httpx from typing import Optional, Union, List -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report class CsrfChecker: nonce_params = { @@ -153,7 +153,7 @@ class CsrfChecker: desc = " | ".join(msgs) status = "MEDIUM" report_data = [{ - 'target': target.load(), + 'target': cur_target_url.load(), 'status': status, 'title': "CSRF Risk", 'description': desc, diff --git a/addon/GoogleLoginHint.py b/addon/google_login_hint.py similarity index 100% rename from addon/GoogleLoginHint.py rename to addon/google_login_hint.py diff --git a/addon/init.py b/addon/init.py index c54a7fc..3ce221d 100644 --- a/addon/init.py +++ b/addon/init.py @@ -1,12 +1,12 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker -from ScopeDetection import ScopeDetection +from addon.scope_detection import ScopeDetection from csrf_check import CsrfChecker from nonce_check import NonceChecker from redirect_uri_check import RedirectBypassChecker from access_token import AccessTokenScanner -from GoogleLoginHint import GoogleLoginHint +from addon.google_login_hint import GoogleLoginHint import os from dotenv import load_dotenv diff --git a/addon/nonce_check.py b/addon/nonce_check.py index c0af077..e252c86 100644 --- a/addon/nonce_check.py +++ b/addon/nonce_check.py @@ -3,8 +3,8 @@ from urllib.parse import urlparse, parse_qs from typing import Union import httpx -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report class NonceChecker: def is_oidc_flow(self, flow) -> bool: @@ -76,7 +76,7 @@ class NonceChecker: url = req.pretty_url if not nonce: report_data = [{ - 'target': target.load(), + 'target': cur_target_url.load(), 'status': "CRITICAL", 'title': "nonce is missing in id_token", 'description': "Nonce is present in the request but missing in the id_token.", diff --git a/addon/pkce_check.py b/addon/pkce_check.py index cac1693..afd4df9 100644 --- a/addon/pkce_check.py +++ b/addon/pkce_check.py @@ -3,8 +3,8 @@ import asyncio import httpx from typing import Dict, List -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report class PKCEDowngradeChecker: @@ -170,7 +170,7 @@ class PKCEDowngradeChecker: self, status: str, title: str, description: str, uri: str ) -> Dict[str, str]: return { - "target": target.load(), + "target": cur_target_url.load(), "status": status, "title": title, "description": description, diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py index 8acc4dd..43df4cb 100644 --- a/addon/redirect_uri_check.py +++ b/addon/redirect_uri_check.py @@ -4,8 +4,8 @@ import asyncio import random import time from urllib.parse import urlparse, parse_qs, urlencode, urlunparse -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report class RedirectRateLimiter: """redirect_uri_check 전용 rate limiter""" @@ -1385,7 +1385,7 @@ class RedirectBypassChecker: ) report_data = [{ - "target": target.load(), + "target": cur_target_url.load(), "status": "CRITICAL", "title": "Redirect URI Bypass Vulnerability", "description": description, diff --git a/addon/ScopeDetection.py b/addon/scope_detection.py similarity index 92% rename from addon/ScopeDetection.py rename to addon/scope_detection.py index a955aeb..2d14049 100644 --- a/addon/ScopeDetection.py +++ b/addon/scope_detection.py @@ -1,5 +1,5 @@ -import lib.target as target -from lib.report import save_report +import lib.cur_target_url as cur_target_url +from lib.report_vuln import save_report class ScopeDetection: def get_scope_from_query(self, query: str) -> str | None: @@ -44,7 +44,7 @@ class ScopeDetection: if result != 0: report_data = [{ - 'target': target.load(), + 'target': cur_target_url.load(), 'status': "WARNING", 'title': "OAuth scope value issue", 'description': f"{method} {url}: {', '.join(result)}", diff --git a/lib/target.py b/lib/cur_target_url.py similarity index 100% rename from lib/target.py rename to lib/cur_target_url.py diff --git a/lib/report.py b/lib/report_vuln.py similarity index 100% rename from lib/report.py rename to lib/report_vuln.py diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index 4b60e63..015c483 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -1,6 +1,6 @@ from fastapi import FastAPI, Query, HTTPException from fastapi.responses import Response -import lib.target as target +import lib.cur_target_url as cur_target_url app = FastAPI() @@ -8,7 +8,7 @@ app = FastAPI() @app.post("/start") async def start(url: str = Query(None)): if url: - target.save(url) + cur_target_url.save(url) print(f"Target URL set to: {url}") return {"message": f"Target URL set to: {url}"} return {"error": "No URL provided"} From 3a1422a2f2be39838f099f15ce92e18ca2646c37 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 12:20:41 +0900 Subject: [PATCH 2/7] [Update] save vuln report logic --- addon/access_token.py | 87 +++++++++++++------------------------ addon/csrf_check.py | 58 ++++++++----------------- addon/init.py | 2 +- addon/nonce_check.py | 20 +++------ addon/pkce_check.py | 57 +++++++----------------- addon/redirect_uri_check.py | 37 ++++++++-------- addon/scope_detection.py | 16 +++---- lib/report_vuln.py | 22 +++++----- lib/utils/is_oauth_uri.py | 10 +++++ 9 files changed, 120 insertions(+), 189 deletions(-) create mode 100644 lib/utils/is_oauth_uri.py diff --git a/addon/access_token.py b/addon/access_token.py index 228cd69..9c18bb6 100644 --- a/addon/access_token.py +++ b/addon/access_token.py @@ -1,23 +1,11 @@ import re from dataclasses import dataclass, asdict from typing import List, Dict, Optional, Any +import asyncio from mitmproxy.http import HTTPFlow -import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report - -# 결과 리포트 저장용 데이터 클래스 -@dataclass -class TokenLeakResult: - title: str - description: str - uri: str - status: str = "MEDIUM" # 기본 상태 - - def to_report(self, target_value) -> Dict[str, str]: - """리포트 저장 포맷(dict)으로 변환""" - return {"target": target_value, **asdict(self)} +from lib.report_vuln import report_vuln # 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 @@ -26,31 +14,25 @@ class AccessTokenScanner: async def scan(self, flow: HTTPFlow) -> None: """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" print(f"[TOKENDEBUG] Request URL: {flow.request.url}") - findings: List[TokenLeakResult] = [] - - findings.extend(await self._scan_request(flow.request)) - findings.extend(await self._scan_response(flow.response, flow.request.url)) - - if findings: - target_value = cur_target_url.load() - save_report([f.to_report(target_value) for f in findings]) + + async_gather = [] + async_gather.append(self._scan_request(flow.request)) + async_gather.append(self._scan_response(flow.response, flow.request.url)) + await asyncio.gather(*async_gather) # 내부 구현 - async def _scan_request(self, request: Any) -> List[TokenLeakResult]: - results: List[TokenLeakResult] = [] + async def _scan_request(self, request: Any): print("[TOKENDEBUG] ==scan request==") # URL 검사 token_result = self._extract_token(request.url) if token_result: token, has_bearer = token_result - results.append( - TokenLeakResult( - title="Token Leak in Request URL", - description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", - uri=request.url, - status="MEDIUM" if has_bearer else "LOW" - ) + report_vuln( + title="Token Leak in Request URL", + desc=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + status="MEDIUM" if has_bearer else "LOW", + uri=request.url ) # Body 검사 (텍스트 컨텐츠인 경우) @@ -59,22 +41,17 @@ class AccessTokenScanner: token_result = self._extract_token(body_text) if token_result: token, has_bearer = token_result - results.append( - TokenLeakResult( - title="Token Leak in Request Body", - description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", - uri=request.url, - status="MEDIUM" if has_bearer else "LOW" - ) + report_vuln( + title="Token Leak in Request Body", + desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + status="MEDIUM" if has_bearer else "LOW", + uri=request.url ) - return results - - async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]: + async def _scan_response(self, response: Optional[Any], request_url: str): if response is None: - return [] + return - results: List[TokenLeakResult] = [] print("[TOKENDEBUG] ==scan response==") # Location 헤더 검사 (리다이렉트) if location_header := response.headers.get("Location"): @@ -82,12 +59,11 @@ class AccessTokenScanner: if token_result: token, has_bearer = token_result if has_bearer: - results.append( - TokenLeakResult( - title="Token Leak in Redirect URL (Location header)", - description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", - uri=location_header, - ) + report_vuln( + title="Token Leak in Redirect URL (Location header)", + desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + status="MEDIUM", + uri=location_header, ) # Body 검사 (텍스트 컨텐츠인 경우) @@ -97,16 +73,13 @@ class AccessTokenScanner: if token_result: token, has_bearer = token_result if has_bearer: - results.append( - TokenLeakResult( - title="Token Leak in Response Body", - description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", - uri=request_url, - ) + report_vuln( + title="Token Leak in Response Body", + desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + status="MEDIUM", + uri=request_url, ) - return results - # 토큰 탐지 키워드드 _TOKEN_KEYS = [ "access_token", diff --git a/addon/csrf_check.py b/addon/csrf_check.py index 564e337..34cd700 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -4,23 +4,14 @@ from urllib.parse import urlparse, parse_qs, unquote import httpx from typing import Optional, Union, List -import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report +from lib.report_vuln import report_vuln +from lib.utils.is_oauth_uri import is_oauth_uri class CsrfChecker: nonce_params = { "state", "nonce", "as", "frame_id", "csrf_token", "csrf" } - def is_oauth_uri(self, uri: str) -> bool: - qs = parse_qs(urlparse(uri).query) - qs_keys = [*qs] - - if "client_id" in qs_keys and any(p in qs_keys for p in ( - "redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")): - return True - return False - def get_header(self, headers: http.Headers, name: str) -> Optional[str]: # mitmproxy Headers는 case-insensitive raw = headers.get(name) @@ -40,7 +31,7 @@ class CsrfChecker: def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: code = flow.response.status_code loc = self.get_header(flow.response.headers, "location") or "" - return 300 <= code < 400 and self.is_oauth_uri(loc) + return 300 <= code < 400 and is_oauth_uri(loc) def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: qs = parse_qs(urlparse(flow.request.url).query) @@ -71,10 +62,10 @@ class CsrfChecker: headers=headers, content=flow.request.get_content(), ) - + def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 - if not (self.is_oauth_uri(flow.request.url) + if not (is_oauth_uri(flow.request.url) and self.check_nonce_in_request(flow) and self.is_oauth_redirect(flow)): return 0 @@ -85,18 +76,19 @@ class CsrfChecker: resp_nonce = self.get_query_param(loc, param) if param else None if resp_nonce is None: - return ["Missing nonce in redirect"] + report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="HIGH", uri=flow.request.url) + return 1 if orig_nonce != resp_nonce: - return ["Nonce mismatch request↔response"] + report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="MEDIUM", uri=flow.request.url) + return 1 return 0 async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 - if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): + if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): return 0 - loc0 = self.get_header(flow.response.headers, "location") or "" param = self.find_nonce_param(loc0) or "state" qs0 = parse_qs(urlparse(loc0).query) @@ -111,7 +103,8 @@ class CsrfChecker: if new_nonce is None: return 0 if new_nonce == orig_nonce: - return ["Nonce reused without cookies"] + report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="HIGH", uri=flow.request.url) + return 1 # (2) 두 번의 리다이렉트 비교 async with httpx.AsyncClient(follow_redirects=False) as cli: @@ -127,42 +120,25 @@ class CsrfChecker: and urlparse(req1.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path ): - return ["Identical redirects on nonce swap → potential CSRF"] + report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="MEDIUM", uri=flow.request.url) + return 1 return 0 async def response(self, flow: http.HTTPFlow) -> None: try: - msgs: List[str] = [] # 1) 요청에 nonce 없으면 - if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): - msgs.append("Missing state/nonce in request") + if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): + report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="HIGH", uri=flow.request.url) + return # 2) 리다이렉트에서 nonce 검사 r1 = self.check_redirect_nonce(flow) - if r1: - msgs.extend(r1 if isinstance(r1, list) else []) # 3) nonce 재사용 검사 r2 = await self.check_nonce_reuse(flow) - if r2: - msgs.extend(r2 if isinstance(r2, list) else []) - if msgs: - desc = " | ".join(msgs) - status = "MEDIUM" - report_data = [{ - 'target': cur_target_url.load(), - 'status': status, - 'title': "CSRF Risk", - 'description': desc, - 'uri': flow.request.url, - }] - save_report(report_data) - print(f"[INFO] CSRF Check: {desc}") - else: - pass except Exception as e: print(f"[ERROR] CSRF Check failed: {e}") return diff --git a/addon/init.py b/addon/init.py index 3ce221d..fb69258 100644 --- a/addon/init.py +++ b/addon/init.py @@ -107,4 +107,4 @@ class GoogleLoginHintAddon(): except Exception as e: print(f"[ERROR] GoogleLoginHint Addon failed: {e}") -addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), RedirectBypassAddon(), GoogleLoginHintAddon()] +addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), GoogleLoginHintAddon(), RedirectBypassAddon()] diff --git a/addon/nonce_check.py b/addon/nonce_check.py index e252c86..1df6beb 100644 --- a/addon/nonce_check.py +++ b/addon/nonce_check.py @@ -1,10 +1,8 @@ import jwt from urllib.parse import urlparse, parse_qs from typing import Union -import httpx -import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report +from lib.report_vuln import report_vuln class NonceChecker: def is_oidc_flow(self, flow) -> bool: @@ -72,17 +70,13 @@ class NonceChecker: def check_nonce_in_id_token(self, flow, id_token: str) -> bool: decoded = self.decode_id_token(id_token) nonce = decoded.get("nonce") - req = flow.request - url = req.pretty_url if not nonce: - report_data = [{ - 'target': cur_target_url.load(), - 'status': "CRITICAL", - 'title': "nonce is missing in id_token", - 'description': "Nonce is present in the request but missing in the id_token.", - 'uri': f"Original: {url}\nDecoded ID Token: {decoded}", - }] - save_report(report_data) + report_vuln( + title="Nonce Check Failed", + desc="id_token에 nonce가 없습니다.", + status="HIGH", + uri=flow.request.url + ) return False else: return True diff --git a/addon/pkce_check.py b/addon/pkce_check.py index afd4df9..3466edc 100644 --- a/addon/pkce_check.py +++ b/addon/pkce_check.py @@ -4,7 +4,7 @@ import httpx from typing import Dict, List import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report +from lib.report_vuln import report_vuln class PKCEDowngradeChecker: @@ -58,27 +58,19 @@ class PKCEDowngradeChecker: async def report_missing_parameters(self, url: str, is_openid: bool): status = "MEDIUM" if is_openid else "LOW" - self.save( - [ - self.make_report( - status, - "PKCE Parameters Missing", - "PKCE parameters are missing or incomplete.", - url, - ) - ] + report_vuln( + title="PKCE Parameters Missing", + desc="PKCE parameters are missing or incomplete.", + status=status, + uri=url, ) async def report_plain_method(self, url: str): - self.save( - [ - self.make_report( - "CRITICAL", - "PKCE Plain Method", - "PKCE method is set to 'plain'. Possible downgrade.", - url, - ) - ] + report_vuln( + title="PKCE Plain Method", + desc="PKCE method is set to 'plain'. Possible downgrade.", + status="CRITICAL", + uri=url, ) def create_downgraded_url(self, parsed, query): @@ -150,15 +142,11 @@ class PKCEDowngradeChecker: else: return # Likely safe - self.save( - [ - self.make_report( - status, - title, - description, - f"Original: {original_url}\nDowngraded: {downgraded_url}", - ) - ] + report_vuln( + title=title, + desc=description, + status=status, + uri=f"Original: {original_url}\nDowngraded: {downgraded_url}", ) def same_redirect_destination(self, orig_loc, down_loc): @@ -166,16 +154,3 @@ class PKCEDowngradeChecker: down = urlparse(down_loc) return orig.netloc == down.netloc and orig.path == down.path - def make_report( - self, status: str, title: str, description: str, uri: str - ) -> Dict[str, str]: - return { - "target": cur_target_url.load(), - "status": status, - "title": title, - "description": description, - "uri": uri, - } - - def save(self, report_data: List[Dict[str, str]]): - save_report(report_data) diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py index 43df4cb..f6af225 100644 --- a/addon/redirect_uri_check.py +++ b/addon/redirect_uri_check.py @@ -4,8 +4,7 @@ import asyncio import random import time from urllib.parse import urlparse, parse_qs, urlencode, urlunparse -import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report +from lib.report_vuln import report_vuln class RedirectRateLimiter: """redirect_uri_check 전용 rate limiter""" @@ -1287,13 +1286,21 @@ class RedirectBypassChecker: except: return "" + def _is_code_in_location(self, location: str) -> bool: + return self._extract_code_from_location(location) != "" + """ mitmproxy용 - 실제 OAuth 플로우 가로채서 분석 """ async def test(self, flow: http.HTTPFlow): url = flow.request.pretty_url parsed = urlparse(url) query = parse_qs(parsed.query) - - if "redirect_uri" not in query: + + # location 헤더에 code가 없으면 스킵 + location = flow.response.headers.get("Location", "") + if not self._is_code_in_location(location): + return + + if not query or "redirect_uri" not in query: return original_redirect_uri = query["redirect_uri"][0] @@ -1347,15 +1354,13 @@ class RedirectBypassChecker: # 리다이렉트 응답이 아니면 스킵 if status not in [301, 302, 303, 307, 308]: return False - - # Location 헤더에서 code 추출 - auth_code = self._extract_code_from_location(location) # 베이스라인 검증 is_valid = self._is_baseline_valid(bypassed_uri, original_url) - - if auth_code and not is_valid: + + if self._is_code_in_location(location) and not is_valid: # 취약점 발견 시에만 로그 + auth_code = self._extract_code_from_location(location) print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!") await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload) return True @@ -1384,14 +1389,12 @@ class RedirectBypassChecker: f"• 발급된 인가 코드: {auth_code[:10]}...\n\n" ) - report_data = [{ - "target": cur_target_url.load(), - "status": "CRITICAL", - "title": "Redirect URI Bypass Vulnerability", - "description": description, - "uri": test_url # uri 필드 추가 - }] + report_vuln( + title="Redirect URI Bypass Vulnerability", + desc=description, + status="CRITICAL", + uri=test_url + ) - save_report(report_data) print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}") \ No newline at end of file diff --git a/addon/scope_detection.py b/addon/scope_detection.py index 2d14049..453a5ab 100644 --- a/addon/scope_detection.py +++ b/addon/scope_detection.py @@ -1,5 +1,5 @@ import lib.cur_target_url as cur_target_url -from lib.report_vuln import save_report +from lib.report_vuln import report_vuln class ScopeDetection: def get_scope_from_query(self, query: str) -> str | None: @@ -43,11 +43,9 @@ class ScopeDetection: result = await self.check_scope(flow) if result != 0: - report_data = [{ - 'target': cur_target_url.load(), - 'status': "WARNING", - 'title': "OAuth scope value issue", - 'description': f"{method} {url}: {', '.join(result)}", - 'uri': url - }] - save_report(report_data) + report_vuln( + title="OAuth Scope Value Issue", + desc=f"Detected scope value issue in {method} {url}: {', '.join(result)}", + status="WARNING", + uri=url + ) diff --git a/lib/report_vuln.py b/lib/report_vuln.py index 40925d0..e2644ef 100644 --- a/lib/report_vuln.py +++ b/lib/report_vuln.py @@ -1,14 +1,16 @@ # save as data/report.csv import os import csv -from typing import List, Dict, Any +from mitmproxy import http +import lib.cur_target_url as cur_target_url # target, status, title, description, uri # file path는 'data/report.csv'로 고정 -def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: - os.makedirs(os.path.dirname(file_path), exist_ok=True) +def report_vuln(title: str, desc: str, status: str, uri: str) -> None: + file_path: str = 'data/report.csv' + os.makedirs(os.path.dirname(file_path), exist_ok=True) """ report_data 안의 각 레포트를 한 줄씩 CSV에 추가로 저장합니다. @@ -23,10 +25,10 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report if not file_exists: writer.writeheader() - for row in report_data: - # None 방지 & 줄바꿈 이스케이프 - escaped = { - k: str(v).replace('\n', '\\n') if v is not None else '' - for k, v in row.items() - } - writer.writerow(escaped) + writer.writerow({ + 'target': cur_target_url.load(), + 'status': status, + 'title': title, + 'description': desc, + 'uri': uri, + }) diff --git a/lib/utils/is_oauth_uri.py b/lib/utils/is_oauth_uri.py new file mode 100644 index 0000000..29f99db --- /dev/null +++ b/lib/utils/is_oauth_uri.py @@ -0,0 +1,10 @@ +from urllib.parse import urlparse, parse_qs + +def is_oauth_uri(uri: str) -> bool: + qs = parse_qs(urlparse(uri).query) + qs_keys = [*qs] + + if "client_id" in qs_keys and any(p in qs_keys for p in ( + "redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")): + return True + return False \ No newline at end of file From 53db0fb14e8afe2b2848d1aa02c5974ababe417d Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 12:40:14 +0900 Subject: [PATCH 3/7] [Fix] scope detection --- addon/scope_detection.py | 51 +++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/addon/scope_detection.py b/addon/scope_detection.py index 453a5ab..a5a2e2d 100644 --- a/addon/scope_detection.py +++ b/addon/scope_detection.py @@ -1,12 +1,12 @@ -import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln +from lib.utils.is_oauth_uri import is_oauth_uri +from urllib.parse import urlparse, parse_qs class ScopeDetection: def get_scope_from_query(self, query: str) -> str | None: if not query: return None - import urllib.parse - parsed = urllib.parse.parse_qs(query) + parsed = parse_qs(query) scope_values = parsed.get("scope", []) if scope_values: return scope_values[0] @@ -16,36 +16,33 @@ class ScopeDetection: req = flow.request res = flow.response - # req.query가 MultiDictView일 수 있으므로 문자열로 변환 - if hasattr(req.query, "urlencode"): - query = req.query.urlencode() - else: - query = str(req.query) if req.query else "" + parsed = urlparse(req.pretty_url) + query = parsed.query - location = res.headers.get("location", "") + location = res.headers.get("Location", "") + location_query = urlparse(location).query query_scope = self.get_scope_from_query(query) - location_scope = self.get_scope_from_query(location) + location_scope = self.get_scope_from_query(location_query) - result = [] if query_scope in ["all", "*"]: - result.append(f"Scope value issue detected in request: {query_scope}") - if location_scope in ["all", "*"]: - result.append(f"Scope value issue detected in response location: {location_scope}") - - return result if result else 0 - - async def test(self, flow): - req = flow.request - method = req.method - url = req.pretty_url - - result = await self.check_scope(flow) - - if result != 0: report_vuln( title="OAuth Scope Value Issue", - desc=f"Detected scope value issue in {method} {url}: {', '.join(result)}", + desc=f"Scope value issue detected in request: {query_scope}", status="WARNING", - uri=url + uri=req.pretty_url ) + if location_scope in ["all", "*"]: + report_vuln( + title="OAuth Scope Value Issue", + desc=f"Scope value issue detected in response location: {location_scope}", + status="WARNING", + uri=location + ) + + async def test(self, flow): + + if not is_oauth_uri(flow.request.pretty_url): + return + + await self.check_scope(flow) From 3c5db3c1fdcc3ce8085a83633f5e145f03393b04 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 15:20:30 +0900 Subject: [PATCH 4/7] =?UTF-8?q?[Update]=20=EC=9E=90=EB=8F=99=20=EC=98=A4?= =?UTF-8?q?=ED=83=90=20=EA=B2=80=EC=A6=9D=EC=9D=84=20=EC=9C=84=ED=95=9C=20?= =?UTF-8?q?=EB=9D=BC=EC=9A=B0=ED=84=B0=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 17 +++++- addon/csrf_check.py | 12 ++--- addon/init.py | 32 +++++++++--- lib/false_true_varifing_task.py | 59 +++++++++++++++++++++ runner/backend/__init__.py | 91 ++++++++++++++++++++++++++++++--- 5 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 lib/false_true_varifing_task.py diff --git a/README.md b/README.md index 2beaafe..8e15d4c 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ venv와 패키지가 설치가 됩니다. > 그렇지 않으면 실행되지 않습니다. > > 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다. -> +> > Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다. > > MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다. @@ -58,8 +58,15 @@ class LoggerAddon: self.checker = Example() def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 - self.checker.test(flow) + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return + self.checker.test(flow) + def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return self.checker.test(flow) ``` @@ -88,3 +95,9 @@ class Example: ``` 이러한 예제를 참고하여 작성하여주세요. + +# 백엔드 API DOCS + +``` +http://localhost:11081/redoc +``` diff --git a/addon/csrf_check.py b/addon/csrf_check.py index 34cd700..c64ec98 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -76,10 +76,10 @@ class CsrfChecker: resp_nonce = self.get_query_param(loc, param) if param else None if resp_nonce is None: - report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="CRITICAL", uri=flow.request.url) return 1 if orig_nonce != resp_nonce: - report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="MEDIUM", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="HIGH", uri=flow.request.url) return 1 return 0 @@ -103,11 +103,11 @@ class CsrfChecker: if new_nonce is None: return 0 if new_nonce == orig_nonce: - report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="CRITICAL", uri=flow.request.url) return 1 # (2) 두 번의 리다이렉트 비교 - async with httpx.AsyncClient(follow_redirects=False) as cli: + async with httpx.AsyncClient(follow_redirects=True) as cli: # 원본 쿼리 req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) # nonce 교체 쿼리 @@ -120,7 +120,7 @@ class CsrfChecker: and urlparse(req1.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path ): - report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="MEDIUM", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="NOT-VERIFIED-HIGH", uri=flow.request.url) return 1 return 0 @@ -130,7 +130,7 @@ class CsrfChecker: # 1) 요청에 nonce 없으면 if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): - report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="HIGH", uri=flow.request.url) + report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="CRITICAL", uri=flow.request.url) return # 2) 리다이렉트에서 nonce 검사 diff --git a/addon/init.py b/addon/init.py index fb69258..634d344 100644 --- a/addon/init.py +++ b/addon/init.py @@ -9,6 +9,10 @@ from access_token import AccessTokenScanner from addon.google_login_hint import GoogleLoginHint import os from dotenv import load_dotenv +from lib.false_true_varifing_task import FalseTrueVarifingTask + +# Initialize the singleton task manager +false_true_varifing_task = FalseTrueVarifingTask() load_dotenv(override=True) @@ -21,6 +25,10 @@ class PKCEAddon: f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}" ) try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return + await self.checker.test(flow) except Exception as e: print(f"[ERROR] Addon failed: {e}") @@ -33,6 +41,9 @@ class CsrfAddon: async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.response(flow) except Exception as e: print(f"[ERROR] CSRF Addon failed: {e}") @@ -42,21 +53,18 @@ class CsrfAddon: class ScopeAddon: def __init__(self): self.checker = ScopeDetection() - self._flow_map = {} # 요청 정보를 저장 - - async def request(self, flow: http.HTTPFlow): - self._flow_map[flow.id] = { - "method": flow.request.method, - "url": flow.request.pretty_url, - "query": flow.request.query, - } async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.test(flow) except Exception as e: print(f"[ERROR] ScopeDetection failed: {e}") + + class NonceAddon: def __init__(self): self.checker = NonceChecker() @@ -70,12 +78,17 @@ class NonceAddon: print(f"[ERROR] NonceAddon failed: {e}") pass + + class AccessTokenAddon: def __init__(self): self.checker = AccessTokenScanner() async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.scan(flow) except Exception as e: print(f"[ERROR] AccessToken Addon failed: {e}") @@ -88,6 +101,9 @@ class RedirectBypassAddon: # request 대신 response 로 바꿔 보세요: async def response(self, flow: http.HTTPFlow): try: + # 오탐 검사하고 있을때는 검증하지 않음 + if false_true_varifing_task.is_verifing_false_true(): + return await self.checker.test(flow) except Exception as e: print(f"[ERROR] RedirectBypass Addon failed: {e}") diff --git a/lib/false_true_varifing_task.py b/lib/false_true_varifing_task.py new file mode 100644 index 0000000..62865e1 --- /dev/null +++ b/lib/false_true_varifing_task.py @@ -0,0 +1,59 @@ +from typing import Any +from copy import deepcopy + +class FalseTrueVarifingTask: + """ + A singleton class representing a task that can be either false or true. + This class is used to handle tasks that require verification of their truth value. + """ + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + self._is_verifing = False + self.task_queue = [] + self._initialized = True + + def reset(self): + """ + Reset the task queue and verification status. + """ + self._is_verifing = False + self.task_queue.clear() + + def add_task(self, task_name: str, data: Any): + """ + Add a task to the task queue. + :param task: The task to be added. + """ + self.task_queue.append({ + "task_name": task_name, + "data": data}) + + def start_verification(self): + """ + Start the verification process for the tasks in the queue. + """ + self._is_verifing = True + + def get_task_queue(self): + """ + Get a copy of the current task queue. + :return: A copy of the task queue. + """ + return deepcopy(self.task_queue) + + def is_verifing_false_true(self): + """ + Get the current verification status. + :return: True if verification is in progress, False otherwise. + """ + return self._is_verifing diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index 015c483..1410f52 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -1,17 +1,94 @@ from fastapi import FastAPI, Query, HTTPException from fastapi.responses import Response import lib.cur_target_url as cur_target_url +from lib.false_true_varifing_task import FalseTrueVarifingTask +from pydantic import BaseModel, Field +from lib.report_vuln import report_vuln as save_vuln + +# Initialize the singleton task manager +false_true_varifing_task = FalseTrueVarifingTask() app = FastAPI() -@app.post("/start") -async def start(url: str = Query(None)): - if url: - cur_target_url.save(url) - print(f"Target URL set to: {url}") - return {"message": f"Target URL set to: {url}"} - return {"error": "No URL provided"} + +@app.post( + "/start", + summary="취약점 검증을 위한 대상 URL 설정", + description=""" +이 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다. + +유효한 URL이 제공되면: +- 해당 URL이 저장됩니다. +- 검증 작업 큐가 초기화됩니다. +- 새로운 검증 작업을 시작할 준비가 완료됩니다. + +URL이 제공되지 않으면, 오류가 반환됩니다. +""" +, + tags=["1st STEP"] +) +async def start(url: str = Query(..., description="The URL to target for vulnerability verification")): + cur_target_url.save(url) + false_true_varifing_task.reset() + return {"message": f"Target URL set to: {url}"} + + + + + + +@app.post( + "/start-false-true-verifing", + summary="시스템에 False-True 검증 작업 시작을 알림", + description=""" +이 엔드포인트는 시스템에 False-True 방식의 검증 작업을 시작하도록 지시합니다. +또한 검증을 위해 준비된 작업 목록을 반환합니다. +""", + tags=["2nd STEP"] + ) +async def start_false_true_verifing(): + false_true_varifing_task.start_verification() + task_queue = false_true_varifing_task.get_task_queue() + return {"payload": task_queue} + + + + + + +class VulnerabilityReport(BaseModel): + title: str = Field(..., description="Short title for the vulnerability") + url: str = Field(..., description="URL where the vulnerability was discovered") + status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)") + desc: str = Field(..., description="Detailed description of the issue") + +@app.post( + "/report-vuln", + summary="취약점 보고", + description=""" +정탐인 취약점을 시스템에 보고합니다. +보고 시 다음 정보를 포함해야 합니다: + +- **title**: 취약점의 간단한 이름 +- **url**: 취약점이 발견된 위치 (URL) +- **status**: 심각도 +- **desc**: 취약점에 대한 상세 설명 +""", + tags=["3rd STEP"] +) +async def report_vuln(vuln: VulnerabilityReport): + save_vuln( + title=vuln.title, + desc=vuln.desc, + status=vuln.status, + uri=vuln.url + ) + + return {"message": "Vulnerability reported successfully"} + + + @app.exception_handler(404) From 4deb032708767099e167bfb88fad2fe9495fc0f7 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 15:35:12 +0900 Subject: [PATCH 5/7] [Docs] api docs --- addon/csrf_check.py | 35 ++++++++++++++++++++++++++++----- lib/false_true_varifing_task.py | 1 + runner/backend/__init__.py | 4 ++-- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/addon/csrf_check.py b/addon/csrf_check.py index c64ec98..91b7e4e 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -76,10 +76,20 @@ class CsrfChecker: resp_nonce = self.get_query_param(loc, param) if param else None if resp_nonce is None: - report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="CRITICAL", uri=flow.request.url) + report_vuln( + title="CSRF Risk", + desc="Missing nonce in redirect response", + status="CRITICAL", + uri=flow.request.url + ) return 1 if orig_nonce != resp_nonce: - report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="HIGH", uri=flow.request.url) + report_vuln( + title="CSRF Risk", + desc="Nonce mismatch request↔response", + status="HIGH", + uri=flow.request.url + ) return 1 return 0 @@ -103,7 +113,12 @@ class CsrfChecker: if new_nonce is None: return 0 if new_nonce == orig_nonce: - report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="CRITICAL", uri=flow.request.url) + report_vuln( + title="CSRF Risk", + desc="Nonce reused without cookies", + status="CRITICAL", + uri=flow.request.url + ) return 1 # (2) 두 번의 리다이렉트 비교 @@ -120,7 +135,12 @@ class CsrfChecker: and urlparse(req1.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path ): - report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="NOT-VERIFIED-HIGH", uri=flow.request.url) + report_vuln( + title="CSRF Risk", + desc="Identical redirects on nonce swap → potential CSRF", + status="NOT-VERIFIED-HIGH", + uri=flow.request.url + ) return 1 return 0 @@ -130,7 +150,12 @@ class CsrfChecker: # 1) 요청에 nonce 없으면 if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): - report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="CRITICAL", uri=flow.request.url) + report_vuln( + title="CSRF Risk", + desc="Missing nonce in OAuth request", + status="CRITICAL", + uri=flow.request.url + ) return # 2) 리다이렉트에서 nonce 검사 diff --git a/lib/false_true_varifing_task.py b/lib/false_true_varifing_task.py index 62865e1..45fa782 100644 --- a/lib/false_true_varifing_task.py +++ b/lib/false_true_varifing_task.py @@ -29,6 +29,7 @@ class FalseTrueVarifingTask: self._is_verifing = False self.task_queue.clear() + # 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다. def add_task(self, task_name: str, data: Any): """ Add a task to the task queue. diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index 1410f52..a279cf5 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -42,8 +42,8 @@ async def start(url: str = Query(..., description="The URL to target for vulnera "/start-false-true-verifing", summary="시스템에 False-True 검증 작업 시작을 알림", description=""" -이 엔드포인트는 시스템에 False-True 방식의 검증 작업을 시작하도록 지시합니다. -또한 검증을 위해 준비된 작업 목록을 반환합니다. +이 엔드포인트는 시스템에 False-True 방식의 검증 작업이 시작되었음을 알립니다. +또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다. """, tags=["2nd STEP"] ) From 05a095df7d7108e22c77de6f015395af51c45a71 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 15:35:26 +0900 Subject: [PATCH 6/7] [Docs] api docs --- runner/backend/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index a279cf5..749309d 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -40,9 +40,9 @@ async def start(url: str = Query(..., description="The URL to target for vulnera @app.post( "/start-false-true-verifing", - summary="시스템에 False-True 검증 작업 시작을 알림", + summary="시스템에 오탐 검증 작업 시작을 알림", description=""" -이 엔드포인트는 시스템에 False-True 방식의 검증 작업이 시작되었음을 알립니다. +이 엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다. 또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다. """, tags=["2nd STEP"] From 58d5deb435ddc97e5b6106e2afc629eb0e06a062 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 15:45:39 +0900 Subject: [PATCH 7/7] =?UTF-8?q?[Update]=20=EB=9D=BC=EC=9A=B0=ED=84=B0=20?= =?UTF-8?q?=EB=B0=98=ED=99=98=20=ED=98=95=ED=83=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 ++ lib/false_true_varifing_task.py | 13 +++++++++---- runner/backend/__init__.py | 13 +++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8e15d4c..f922164 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,8 @@ class Example: # 백엔드 API DOCS +`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다. + ``` http://localhost:11081/redoc ``` diff --git a/lib/false_true_varifing_task.py b/lib/false_true_varifing_task.py index 45fa782..4b9bde4 100644 --- a/lib/false_true_varifing_task.py +++ b/lib/false_true_varifing_task.py @@ -30,14 +30,19 @@ class FalseTrueVarifingTask: self.task_queue.clear() # 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다. - def add_task(self, task_name: str, data: Any): + # TODO: 모델 지정해두기 + def add_task(self, task_name: str, initial_uri: str, data: Any): """ Add a task to the task queue. :param task: The task to be added. """ - self.task_queue.append({ - "task_name": task_name, - "data": data}) + self.task_queue.append( + { + "task_name": task_name, + "initial_uri": initial_uri, + "data": data + } + ) def start_verification(self): """ diff --git a/runner/backend/__init__.py b/runner/backend/__init__.py index 749309d..b917dbe 100644 --- a/runner/backend/__init__.py +++ b/runner/backend/__init__.py @@ -44,6 +44,19 @@ async def start(url: str = Query(..., description="The URL to target for vulnera description=""" 이 엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다. 또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다. + +```json +{ + "payload": [ + { + "task_name": "pkce_task", # 검증 작업의 이름 + "initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI + "data": any # 추가 데이터 + }, + ... + ] +} +``` """, tags=["2nd STEP"] )