Merge pull request #18 from j93es/date/0626-1

자동 오탐 검증을 위한 라우터 추가
This commit is contained in:
James 2025-06-26 18:39:41 +09:00 committed by GitHub
commit 00c81f365a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 397 additions and 256 deletions

2
.gitignore vendored
View file

@ -9,6 +9,8 @@ wheels/
# Virtual environments # Virtual environments
.venv .venv
.env
data/ data/

View file

@ -58,8 +58,15 @@ class LoggerAddon:
self.checker = Example() self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow) # 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
self.checker.test(flow) self.checker.test(flow)
``` ```
@ -88,3 +95,11 @@ class Example:
``` ```
이러한 예제를 참고하여 작성하여주세요. 이러한 예제를 참고하여 작성하여주세요.
# 백엔드 API DOCS
`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다.
```
http://localhost:11081/redoc
```

View file

@ -1,53 +0,0 @@
import lib.target as target
from lib.report import save_report
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
import urllib.parse
parsed = urllib.parse.parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
if hasattr(req.query, "urlencode"):
query = req.query.urlencode()
else:
query = str(req.query) if req.query else ""
location = res.headers.get("location", "")
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location)
result = []
if query_scope in ["all", "*"]:
result.append(f"Scope value issue detected in request: {query_scope}")
if location_scope in ["all", "*"]:
result.append(f"Scope value issue detected in response location: {location_scope}")
return result if result else 0
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
result = await self.check_scope(flow)
if result != 0:
report_data = [{
'target': target.load(),
'status': "WARNING",
'title': "OAuth scope value issue",
'description': f"{method} {url}: {', '.join(result)}",
'uri': url
}]
save_report(report_data)

View file

@ -1,23 +1,11 @@
import re import re
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
import asyncio
from mitmproxy.http import HTTPFlow from mitmproxy.http import HTTPFlow
import lib.target as target from lib.report_vuln import report_vuln
from lib.report import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 # 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
@ -26,31 +14,25 @@ class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None: async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}") print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
findings.extend(await self._scan_request(flow.request)) async_gather = []
findings.extend(await self._scan_response(flow.response, flow.request.url)) async_gather.append(self._scan_request(flow.request))
async_gather.append(self._scan_response(flow.response, flow.request.url))
if findings: await asyncio.gather(*async_gather)
target_value = target.load()
save_report([f.to_report(target_value) for f in findings])
# 내부 구현 # 내부 구현
async def _scan_request(self, request: Any) -> List[TokenLeakResult]: async def _scan_request(self, request: Any):
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan request==") print("[TOKENDEBUG] ==scan request==")
# URL 검사 # URL 검사
token_result = self._extract_token(request.url) token_result = self._extract_token(request.url)
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Request URL",
title="Token Leak in Request URL", desc=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}", status="MEDIUM" if has_bearer else "LOW",
uri=request.url, uri=request.url
status="MEDIUM" if has_bearer else "LOW"
)
) )
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
@ -59,22 +41,17 @@ class AccessTokenScanner:
token_result = self._extract_token(body_text) token_result = self._extract_token(body_text)
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Request Body",
title="Token Leak in Request Body", desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}", status="MEDIUM" if has_bearer else "LOW",
uri=request.url, uri=request.url
status="MEDIUM" if has_bearer else "LOW"
)
) )
return results async def _scan_response(self, response: Optional[Any], request_url: str):
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
if response is None: if response is None:
return [] return
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==") print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트) # Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"): if location_header := response.headers.get("Location"):
@ -82,12 +59,11 @@ class AccessTokenScanner:
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
if has_bearer: if has_bearer:
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Redirect URL (Location header)",
title="Token Leak in Redirect URL (Location header)", desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}", status="MEDIUM",
uri=location_header, uri=location_header,
)
) )
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
@ -97,16 +73,13 @@ class AccessTokenScanner:
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
if has_bearer: if has_bearer:
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Response Body",
title="Token Leak in Response Body", desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}", status="MEDIUM",
uri=request_url, uri=request_url,
)
) )
return results
# 토큰 탐지 키워드드 # 토큰 탐지 키워드드
_TOKEN_KEYS = [ _TOKEN_KEYS = [
"access_token", "access_token",

View file

@ -4,23 +4,14 @@ from urllib.parse import urlparse, parse_qs, unquote
import httpx import httpx
from typing import Optional, Union, List from typing import Optional, Union, List
import lib.target as target from lib.report_vuln import report_vuln
from lib.report import save_report from lib.utils.is_oauth_uri import is_oauth_uri
class CsrfChecker: class CsrfChecker:
nonce_params = { nonce_params = {
"state", "nonce", "as", "frame_id", "csrf_token", "csrf" "state", "nonce", "as", "frame_id", "csrf_token", "csrf"
} }
def is_oauth_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False
def get_header(self, headers: http.Headers, name: str) -> Optional[str]: def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive # mitmproxy Headers는 case-insensitive
raw = headers.get(name) raw = headers.get(name)
@ -40,7 +31,7 @@ class CsrfChecker:
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or "" loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and self.is_oauth_uri(loc) return 300 <= code < 400 and is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query) qs = parse_qs(urlparse(flow.request.url).query)
@ -74,7 +65,7 @@ class CsrfChecker:
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (self.is_oauth_uri(flow.request.url) if not (is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow) and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)): and self.is_oauth_redirect(flow)):
return 0 return 0
@ -85,18 +76,29 @@ class CsrfChecker:
resp_nonce = self.get_query_param(loc, param) if param else None resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None: if resp_nonce is None:
return ["Missing nonce in redirect"] report_vuln(
title="CSRF Risk",
desc="Missing nonce in redirect response",
status="CRITICAL",
uri=flow.request.url
)
return 1
if orig_nonce != resp_nonce: if orig_nonce != resp_nonce:
return ["Nonce mismatch request↔response"] report_vuln(
title="CSRF Risk",
desc="Nonce mismatch request↔response",
status="HIGH",
uri=flow.request.url
)
return 1
return 0 return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0 return 0
loc0 = self.get_header(flow.response.headers, "location") or "" loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state" param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query) qs0 = parse_qs(urlparse(loc0).query)
@ -111,10 +113,16 @@ class CsrfChecker:
if new_nonce is None: if new_nonce is None:
return 0 return 0
if new_nonce == orig_nonce: if new_nonce == orig_nonce:
return ["Nonce reused without cookies"] report_vuln(
title="CSRF Risk",
desc="Nonce reused without cookies",
status="CRITICAL",
uri=flow.request.url
)
return 1
# (2) 두 번의 리다이렉트 비교 # (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=False) as cli: async with httpx.AsyncClient(follow_redirects=True) as cli:
# 원본 쿼리 # 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리 # nonce 교체 쿼리
@ -127,42 +135,35 @@ class CsrfChecker:
and urlparse(req1.headers.get("location", "")).path and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path
): ):
return ["Identical redirects on nonce swap → potential CSRF"] report_vuln(
title="CSRF Risk",
desc="Identical redirects on nonce swap → potential CSRF",
status="NOT-VERIFIED-HIGH",
uri=flow.request.url
)
return 1
return 0 return 0
async def response(self, flow: http.HTTPFlow) -> None: async def response(self, flow: http.HTTPFlow) -> None:
try: try:
msgs: List[str] = []
# 1) 요청에 nonce 없으면 # 1) 요청에 nonce 없으면
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
msgs.append("Missing state/nonce in request") report_vuln(
title="CSRF Risk",
desc="Missing nonce in OAuth request",
status="CRITICAL",
uri=flow.request.url
)
return
# 2) 리다이렉트에서 nonce 검사 # 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow) r1 = self.check_redirect_nonce(flow)
if r1:
msgs.extend(r1 if isinstance(r1, list) else [])
# 3) nonce 재사용 검사 # 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow) r2 = await self.check_nonce_reuse(flow)
if r2:
msgs.extend(r2 if isinstance(r2, list) else [])
if msgs:
desc = " | ".join(msgs)
status = "MEDIUM"
report_data = [{
'target': target.load(),
'status': status,
'title': "CSRF Risk",
'description': desc,
'uri': flow.request.url,
}]
save_report(report_data)
print(f"[INFO] CSRF Check: {desc}")
else:
pass
except Exception as e: except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}") print(f"[ERROR] CSRF Check failed: {e}")
return return

View file

@ -1,14 +1,18 @@
from mitmproxy import http from mitmproxy import http
import asyncio import asyncio
from pkce_check import PKCEDowngradeChecker from pkce_check import PKCEDowngradeChecker
from ScopeDetection import ScopeDetection from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker from csrf_check import CsrfChecker
from nonce_check import NonceChecker from nonce_check import NonceChecker
from redirect_uri_check import RedirectBypassChecker from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner from access_token import AccessTokenScanner
from GoogleLoginHint import GoogleLoginHint from addon.google_login_hint import GoogleLoginHint
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True) load_dotenv(override=True)
@ -21,6 +25,10 @@ class PKCEAddon:
f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}" f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
) )
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] Addon failed: {e}") print(f"[ERROR] Addon failed: {e}")
@ -33,6 +41,9 @@ class CsrfAddon:
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.response(flow) await self.checker.response(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] CSRF Addon failed: {e}") print(f"[ERROR] CSRF Addon failed: {e}")
@ -42,21 +53,18 @@ class CsrfAddon:
class ScopeAddon: class ScopeAddon:
def __init__(self): def __init__(self):
self.checker = ScopeDetection() self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}") print(f"[ERROR] ScopeDetection failed: {e}")
class NonceAddon: class NonceAddon:
def __init__(self): def __init__(self):
self.checker = NonceChecker() self.checker = NonceChecker()
@ -70,12 +78,17 @@ class NonceAddon:
print(f"[ERROR] NonceAddon failed: {e}") print(f"[ERROR] NonceAddon failed: {e}")
pass pass
class AccessTokenAddon: class AccessTokenAddon:
def __init__(self): def __init__(self):
self.checker = AccessTokenScanner() self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.scan(flow) await self.checker.scan(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}") print(f"[ERROR] AccessToken Addon failed: {e}")
@ -88,6 +101,9 @@ class RedirectBypassAddon:
# request 대신 response 로 바꿔 보세요: # request 대신 response 로 바꿔 보세요:
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] RedirectBypass Addon failed: {e}") print(f"[ERROR] RedirectBypass Addon failed: {e}")
@ -107,4 +123,4 @@ class GoogleLoginHintAddon():
except Exception as e: except Exception as e:
print(f"[ERROR] GoogleLoginHint Addon failed: {e}") print(f"[ERROR] GoogleLoginHint Addon failed: {e}")
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), RedirectBypassAddon(), GoogleLoginHintAddon()] addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), GoogleLoginHintAddon(), RedirectBypassAddon()]

View file

@ -1,10 +1,8 @@
import jwt import jwt
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
from typing import Union from typing import Union
import httpx
import lib.target as target from lib.report_vuln import report_vuln
from lib.report import save_report
class NonceChecker: class NonceChecker:
def is_oidc_flow(self, flow) -> bool: def is_oidc_flow(self, flow) -> bool:
@ -72,17 +70,13 @@ class NonceChecker:
def check_nonce_in_id_token(self, flow, id_token: str) -> bool: def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
decoded = self.decode_id_token(id_token) decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce") nonce = decoded.get("nonce")
req = flow.request
url = req.pretty_url
if not nonce: if not nonce:
report_data = [{ report_vuln(
'target': target.load(), title="Nonce Check Failed",
'status': "CRITICAL", desc="id_token에 nonce가 없습니다.",
'title': "nonce is missing in id_token", status="HIGH",
'description': "Nonce is present in the request but missing in the id_token.", uri=flow.request.url
'uri': f"Original: {url}\nDecoded ID Token: {decoded}", )
}]
save_report(report_data)
return False return False
else: else:
return True return True

View file

@ -3,8 +3,8 @@ import asyncio
import httpx import httpx
from typing import Dict, List from typing import Dict, List
import lib.target as target import lib.cur_target_url as cur_target_url
from lib.report import save_report from lib.report_vuln import report_vuln
class PKCEDowngradeChecker: class PKCEDowngradeChecker:
@ -58,27 +58,19 @@ class PKCEDowngradeChecker:
async def report_missing_parameters(self, url: str, is_openid: bool): async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW" status = "MEDIUM" if is_openid else "LOW"
self.save( report_vuln(
[ title="PKCE Parameters Missing",
self.make_report( desc="PKCE parameters are missing or incomplete.",
status, status=status,
"PKCE Parameters Missing", uri=url,
"PKCE parameters are missing or incomplete.",
url,
)
]
) )
async def report_plain_method(self, url: str): async def report_plain_method(self, url: str):
self.save( report_vuln(
[ title="PKCE Plain Method",
self.make_report( desc="PKCE method is set to 'plain'. Possible downgrade.",
"CRITICAL", status="CRITICAL",
"PKCE Plain Method", uri=url,
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
) )
def create_downgraded_url(self, parsed, query): def create_downgraded_url(self, parsed, query):
@ -150,15 +142,11 @@ class PKCEDowngradeChecker:
else: else:
return # Likely safe return # Likely safe
self.save( report_vuln(
[ title=title,
self.make_report( desc=description,
status, status=status,
title, uri=f"Original: {original_url}\nDowngraded: {downgraded_url}",
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
) )
def same_redirect_destination(self, orig_loc, down_loc): def same_redirect_destination(self, orig_loc, down_loc):
@ -166,16 +154,3 @@ class PKCEDowngradeChecker:
down = urlparse(down_loc) down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": target.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)

View file

@ -4,8 +4,7 @@ import asyncio
import random import random
import time import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import lib.target as target from lib.report_vuln import report_vuln
from lib.report import save_report
class RedirectRateLimiter: class RedirectRateLimiter:
"""redirect_uri_check 전용 rate limiter""" """redirect_uri_check 전용 rate limiter"""
@ -1287,13 +1286,21 @@ class RedirectBypassChecker:
except: except:
return "" return ""
def _is_code_in_location(self, location: str) -> bool:
return self._extract_code_from_location(location) != ""
""" mitmproxy용 - 실제 OAuth 플로우 가로채서 분석 """ """ mitmproxy용 - 실제 OAuth 플로우 가로채서 분석 """
async def test(self, flow: http.HTTPFlow): async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url url = flow.request.pretty_url
parsed = urlparse(url) parsed = urlparse(url)
query = parse_qs(parsed.query) query = parse_qs(parsed.query)
if "redirect_uri" not in query: # location 헤더에 code가 없으면 스킵
location = flow.response.headers.get("Location", "")
if not self._is_code_in_location(location):
return
if not query or "redirect_uri" not in query:
return return
original_redirect_uri = query["redirect_uri"][0] original_redirect_uri = query["redirect_uri"][0]
@ -1348,14 +1355,12 @@ class RedirectBypassChecker:
if status not in [301, 302, 303, 307, 308]: if status not in [301, 302, 303, 307, 308]:
return False return False
# Location 헤더에서 code 추출
auth_code = self._extract_code_from_location(location)
# 베이스라인 검증 # 베이스라인 검증
is_valid = self._is_baseline_valid(bypassed_uri, original_url) is_valid = self._is_baseline_valid(bypassed_uri, original_url)
if auth_code and not is_valid: if self._is_code_in_location(location) and not is_valid:
# 취약점 발견 시에만 로그 # 취약점 발견 시에만 로그
auth_code = self._extract_code_from_location(location)
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!") print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload) await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
return True return True
@ -1384,14 +1389,12 @@ class RedirectBypassChecker:
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n" f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
) )
report_data = [{ report_vuln(
"target": target.load(), title="Redirect URI Bypass Vulnerability",
"status": "CRITICAL", desc=description,
"title": "Redirect URI Bypass Vulnerability", status="CRITICAL",
"description": description, uri=test_url
"uri": test_url # uri 필드 추가 )
}]
save_report(report_data)
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}") print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")

48
addon/scope_detection.py Normal file
View file

@ -0,0 +1,48 @@
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
parsed = urlparse(req.pretty_url)
query = parsed.query
location = res.headers.get("Location", "")
location_query = urlparse(location).query
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location_query)
if query_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in request: {query_scope}",
status="WARNING",
uri=req.pretty_url
)
if location_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in response location: {location_scope}",
status="WARNING",
uri=location
)
async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
await self.check_scope(flow)

View file

@ -0,0 +1,65 @@
from typing import Any
from copy import deepcopy
class FalseTrueVarifingTask:
"""
A singleton class representing a task that can be either false or true.
This class is used to handle tasks that require verification of their truth value.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._is_verifing = False
self.task_queue = []
self._initialized = True
def reset(self):
"""
Reset the task queue and verification status.
"""
self._is_verifing = False
self.task_queue.clear()
# 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다.
# TODO: 모델 지정해두기
def add_task(self, task_name: str, initial_uri: str, data: Any):
"""
Add a task to the task queue.
:param task: The task to be added.
"""
self.task_queue.append(
{
"task_name": task_name,
"initial_uri": initial_uri,
"data": data
}
)
def start_verification(self):
"""
Start the verification process for the tasks in the queue.
"""
self._is_verifing = True
def get_task_queue(self):
"""
Get a copy of the current task queue.
:return: A copy of the task queue.
"""
return deepcopy(self.task_queue)
def is_verifing_false_true(self):
"""
Get the current verification status.
:return: True if verification is in progress, False otherwise.
"""
return self._is_verifing

View file

@ -1,14 +1,16 @@
# save as data/report.csv # save as data/report.csv
import os import os
import csv import csv
from typing import List, Dict, Any from mitmproxy import http
import lib.cur_target_url as cur_target_url
# target, status, title, description, uri # target, status, title, description, uri
# file path는 'data/report.csv'로 고정 # file path는 'data/report.csv'로 고정
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
os.makedirs(os.path.dirname(file_path), exist_ok=True) file_path: str = 'data/report.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
""" """
report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다. report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다.
@ -23,10 +25,10 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report
if not file_exists: if not file_exists:
writer.writeheader() writer.writeheader()
for row in report_data: writer.writerow({
# None 방지 & 줄바꿈 이스케이프 'target': cur_target_url.load(),
escaped = { 'status': status,
k: str(v).replace('\n', '\\n') if v is not None else '' 'title': title,
for k, v in row.items() 'description': desc,
} 'uri': uri,
writer.writerow(escaped) })

10
lib/utils/is_oauth_uri.py Normal file
View file

@ -0,0 +1,10 @@
from urllib.parse import urlparse, parse_qs
def is_oauth_uri(uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False

View file

@ -1,17 +1,107 @@
from fastapi import FastAPI, Query, HTTPException from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
import lib.target as target import lib.cur_target_url as cur_target_url
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI() app = FastAPI()
@app.post("/start")
async def start(url: str = Query(None)): @app.post(
if url: "/start",
target.save(url) summary="취약점 검증을 위한 대상 URL 설정",
print(f"Target URL set to: {url}") description="""
return {"message": f"Target URL set to: {url}"} 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다.
return {"error": "No URL provided"}
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 오탐 검증 작업 시작을 알림",
description="""
엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
```json
{
"payload": [
{
"task_name": "pkce_task", # 검증 작업의 이름
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
"data": any # 추가 데이터
},
...
]
}
```
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404) @app.exception_handler(404)