[Update] save vuln report logic

This commit is contained in:
tv0924@icloud.com 2025-06-26 12:20:41 +09:00
commit 3a1422a2f2
9 changed files with 121 additions and 190 deletions

View file

@ -1,23 +1,11 @@
import re import re
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
import asyncio
from mitmproxy.http import HTTPFlow from mitmproxy.http import HTTPFlow
import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln
from lib.report_vuln import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 # 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
@ -26,31 +14,25 @@ class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None: async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}") print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
async_gather = []
findings.extend(await self._scan_request(flow.request)) async_gather.append(self._scan_request(flow.request))
findings.extend(await self._scan_response(flow.response, flow.request.url)) async_gather.append(self._scan_response(flow.response, flow.request.url))
await asyncio.gather(*async_gather)
if findings:
target_value = cur_target_url.load()
save_report([f.to_report(target_value) for f in findings])
# 내부 구현 # 내부 구현
async def _scan_request(self, request: Any) -> List[TokenLeakResult]: async def _scan_request(self, request: Any):
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan request==") print("[TOKENDEBUG] ==scan request==")
# URL 검사 # URL 검사
token_result = self._extract_token(request.url) token_result = self._extract_token(request.url)
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Request URL",
title="Token Leak in Request URL", desc=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}", status="MEDIUM" if has_bearer else "LOW",
uri=request.url, uri=request.url
status="MEDIUM" if has_bearer else "LOW"
)
) )
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
@ -59,22 +41,17 @@ class AccessTokenScanner:
token_result = self._extract_token(body_text) token_result = self._extract_token(body_text)
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Request Body",
title="Token Leak in Request Body", desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}", status="MEDIUM" if has_bearer else "LOW",
uri=request.url, uri=request.url
status="MEDIUM" if has_bearer else "LOW"
)
) )
return results async def _scan_response(self, response: Optional[Any], request_url: str):
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
if response is None: if response is None:
return [] return
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==") print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트) # Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"): if location_header := response.headers.get("Location"):
@ -82,12 +59,11 @@ class AccessTokenScanner:
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
if has_bearer: if has_bearer:
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Redirect URL (Location header)",
title="Token Leak in Redirect URL (Location header)", desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}", status="MEDIUM",
uri=location_header, uri=location_header,
)
) )
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
@ -97,16 +73,13 @@ class AccessTokenScanner:
if token_result: if token_result:
token, has_bearer = token_result token, has_bearer = token_result
if has_bearer: if has_bearer:
results.append( report_vuln(
TokenLeakResult( title="Token Leak in Response Body",
title="Token Leak in Response Body", desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}", status="MEDIUM",
uri=request_url, uri=request_url,
)
) )
return results
# 토큰 탐지 키워드드 # 토큰 탐지 키워드드
_TOKEN_KEYS = [ _TOKEN_KEYS = [
"access_token", "access_token",

View file

@ -4,23 +4,14 @@ from urllib.parse import urlparse, parse_qs, unquote
import httpx import httpx
from typing import Optional, Union, List from typing import Optional, Union, List
import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln
from lib.report_vuln import save_report from lib.utils.is_oauth_uri import is_oauth_uri
class CsrfChecker: class CsrfChecker:
nonce_params = { nonce_params = {
"state", "nonce", "as", "frame_id", "csrf_token", "csrf" "state", "nonce", "as", "frame_id", "csrf_token", "csrf"
} }
def is_oauth_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False
def get_header(self, headers: http.Headers, name: str) -> Optional[str]: def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive # mitmproxy Headers는 case-insensitive
raw = headers.get(name) raw = headers.get(name)
@ -40,7 +31,7 @@ class CsrfChecker:
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or "" loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and self.is_oauth_uri(loc) return 300 <= code < 400 and is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query) qs = parse_qs(urlparse(flow.request.url).query)
@ -71,10 +62,10 @@ class CsrfChecker:
headers=headers, headers=headers,
content=flow.request.get_content(), content=flow.request.get_content(),
) )
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (self.is_oauth_uri(flow.request.url) if not (is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow) and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)): and self.is_oauth_redirect(flow)):
return 0 return 0
@ -85,18 +76,19 @@ class CsrfChecker:
resp_nonce = self.get_query_param(loc, param) if param else None resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None: if resp_nonce is None:
return ["Missing nonce in redirect"] report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="HIGH", uri=flow.request.url)
return 1
if orig_nonce != resp_nonce: if orig_nonce != resp_nonce:
return ["Nonce mismatch request↔response"] report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="MEDIUM", uri=flow.request.url)
return 1
return 0 return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0 return 0
loc0 = self.get_header(flow.response.headers, "location") or "" loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state" param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query) qs0 = parse_qs(urlparse(loc0).query)
@ -111,7 +103,8 @@ class CsrfChecker:
if new_nonce is None: if new_nonce is None:
return 0 return 0
if new_nonce == orig_nonce: if new_nonce == orig_nonce:
return ["Nonce reused without cookies"] report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="HIGH", uri=flow.request.url)
return 1
# (2) 두 번의 리다이렉트 비교 # (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=False) as cli: async with httpx.AsyncClient(follow_redirects=False) as cli:
@ -127,42 +120,25 @@ class CsrfChecker:
and urlparse(req1.headers.get("location", "")).path and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path
): ):
return ["Identical redirects on nonce swap → potential CSRF"] report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="MEDIUM", uri=flow.request.url)
return 1
return 0 return 0
async def response(self, flow: http.HTTPFlow) -> None: async def response(self, flow: http.HTTPFlow) -> None:
try: try:
msgs: List[str] = []
# 1) 요청에 nonce 없으면 # 1) 요청에 nonce 없으면
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
msgs.append("Missing state/nonce in request") report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="HIGH", uri=flow.request.url)
return
# 2) 리다이렉트에서 nonce 검사 # 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow) r1 = self.check_redirect_nonce(flow)
if r1:
msgs.extend(r1 if isinstance(r1, list) else [])
# 3) nonce 재사용 검사 # 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow) r2 = await self.check_nonce_reuse(flow)
if r2:
msgs.extend(r2 if isinstance(r2, list) else [])
if msgs:
desc = " | ".join(msgs)
status = "MEDIUM"
report_data = [{
'target': cur_target_url.load(),
'status': status,
'title': "CSRF Risk",
'description': desc,
'uri': flow.request.url,
}]
save_report(report_data)
print(f"[INFO] CSRF Check: {desc}")
else:
pass
except Exception as e: except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}") print(f"[ERROR] CSRF Check failed: {e}")
return return

View file

@ -107,4 +107,4 @@ class GoogleLoginHintAddon():
except Exception as e: except Exception as e:
print(f"[ERROR] GoogleLoginHint Addon failed: {e}") print(f"[ERROR] GoogleLoginHint Addon failed: {e}")
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), RedirectBypassAddon(), GoogleLoginHintAddon()] addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), GoogleLoginHintAddon(), RedirectBypassAddon()]

View file

@ -1,10 +1,8 @@
import jwt import jwt
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
from typing import Union from typing import Union
import httpx
import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln
from lib.report_vuln import save_report
class NonceChecker: class NonceChecker:
def is_oidc_flow(self, flow) -> bool: def is_oidc_flow(self, flow) -> bool:
@ -72,17 +70,13 @@ class NonceChecker:
def check_nonce_in_id_token(self, flow, id_token: str) -> bool: def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
decoded = self.decode_id_token(id_token) decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce") nonce = decoded.get("nonce")
req = flow.request
url = req.pretty_url
if not nonce: if not nonce:
report_data = [{ report_vuln(
'target': cur_target_url.load(), title="Nonce Check Failed",
'status': "CRITICAL", desc="id_token에 nonce가 없습니다.",
'title': "nonce is missing in id_token", status="HIGH",
'description': "Nonce is present in the request but missing in the id_token.", uri=flow.request.url
'uri': f"Original: {url}\nDecoded ID Token: {decoded}", )
}]
save_report(report_data)
return False return False
else: else:
return True return True

View file

@ -4,7 +4,7 @@ import httpx
from typing import Dict, List from typing import Dict, List
import lib.cur_target_url as cur_target_url import lib.cur_target_url as cur_target_url
from lib.report_vuln import save_report from lib.report_vuln import report_vuln
class PKCEDowngradeChecker: class PKCEDowngradeChecker:
@ -58,27 +58,19 @@ class PKCEDowngradeChecker:
async def report_missing_parameters(self, url: str, is_openid: bool): async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW" status = "MEDIUM" if is_openid else "LOW"
self.save( report_vuln(
[ title="PKCE Parameters Missing",
self.make_report( desc="PKCE parameters are missing or incomplete.",
status, status=status,
"PKCE Parameters Missing", uri=url,
"PKCE parameters are missing or incomplete.",
url,
)
]
) )
async def report_plain_method(self, url: str): async def report_plain_method(self, url: str):
self.save( report_vuln(
[ title="PKCE Plain Method",
self.make_report( desc="PKCE method is set to 'plain'. Possible downgrade.",
"CRITICAL", status="CRITICAL",
"PKCE Plain Method", uri=url,
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
) )
def create_downgraded_url(self, parsed, query): def create_downgraded_url(self, parsed, query):
@ -150,15 +142,11 @@ class PKCEDowngradeChecker:
else: else:
return # Likely safe return # Likely safe
self.save( report_vuln(
[ title=title,
self.make_report( desc=description,
status, status=status,
title, uri=f"Original: {original_url}\nDowngraded: {downgraded_url}",
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
) )
def same_redirect_destination(self, orig_loc, down_loc): def same_redirect_destination(self, orig_loc, down_loc):
@ -166,16 +154,3 @@ class PKCEDowngradeChecker:
down = urlparse(down_loc) down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": cur_target_url.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)

View file

@ -4,8 +4,7 @@ import asyncio
import random import random
import time import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln
from lib.report_vuln import save_report
class RedirectRateLimiter: class RedirectRateLimiter:
"""redirect_uri_check 전용 rate limiter""" """redirect_uri_check 전용 rate limiter"""
@ -1287,13 +1286,21 @@ class RedirectBypassChecker:
except: except:
return "" return ""
def _is_code_in_location(self, location: str) -> bool:
return self._extract_code_from_location(location) != ""
""" mitmproxy용 - 실제 OAuth 플로우 가로채서 분석 """ """ mitmproxy용 - 실제 OAuth 플로우 가로채서 분석 """
async def test(self, flow: http.HTTPFlow): async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url url = flow.request.pretty_url
parsed = urlparse(url) parsed = urlparse(url)
query = parse_qs(parsed.query) query = parse_qs(parsed.query)
if "redirect_uri" not in query: # location 헤더에 code가 없으면 스킵
location = flow.response.headers.get("Location", "")
if not self._is_code_in_location(location):
return
if not query or "redirect_uri" not in query:
return return
original_redirect_uri = query["redirect_uri"][0] original_redirect_uri = query["redirect_uri"][0]
@ -1347,15 +1354,13 @@ class RedirectBypassChecker:
# 리다이렉트 응답이 아니면 스킵 # 리다이렉트 응답이 아니면 스킵
if status not in [301, 302, 303, 307, 308]: if status not in [301, 302, 303, 307, 308]:
return False return False
# Location 헤더에서 code 추출
auth_code = self._extract_code_from_location(location)
# 베이스라인 검증 # 베이스라인 검증
is_valid = self._is_baseline_valid(bypassed_uri, original_url) is_valid = self._is_baseline_valid(bypassed_uri, original_url)
if auth_code and not is_valid: if self._is_code_in_location(location) and not is_valid:
# 취약점 발견 시에만 로그 # 취약점 발견 시에만 로그
auth_code = self._extract_code_from_location(location)
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!") print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload) await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
return True return True
@ -1384,14 +1389,12 @@ class RedirectBypassChecker:
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n" f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
) )
report_data = [{ report_vuln(
"target": cur_target_url.load(), title="Redirect URI Bypass Vulnerability",
"status": "CRITICAL", desc=description,
"title": "Redirect URI Bypass Vulnerability", status="CRITICAL",
"description": description, uri=test_url
"uri": test_url # uri 필드 추가 )
}]
save_report(report_data)
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}") print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")

View file

@ -1,5 +1,5 @@
import lib.cur_target_url as cur_target_url import lib.cur_target_url as cur_target_url
from lib.report_vuln import save_report from lib.report_vuln import report_vuln
class ScopeDetection: class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None: def get_scope_from_query(self, query: str) -> str | None:
@ -43,11 +43,9 @@ class ScopeDetection:
result = await self.check_scope(flow) result = await self.check_scope(flow)
if result != 0: if result != 0:
report_data = [{ report_vuln(
'target': cur_target_url.load(), title="OAuth Scope Value Issue",
'status': "WARNING", desc=f"Detected scope value issue in {method} {url}: {', '.join(result)}",
'title': "OAuth scope value issue", status="WARNING",
'description': f"{method} {url}: {', '.join(result)}", uri=url
'uri': url )
}]
save_report(report_data)

View file

@ -1,14 +1,16 @@
# save as data/report.csv # save as data/report.csv
import os import os
import csv import csv
from typing import List, Dict, Any from mitmproxy import http
import lib.cur_target_url as cur_target_url
# target, status, title, description, uri # target, status, title, description, uri
# file path는 'data/report.csv'로 고정 # file path는 'data/report.csv'로 고정
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
os.makedirs(os.path.dirname(file_path), exist_ok=True) file_path: str = 'data/report.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
""" """
report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다. report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다.
@ -23,10 +25,10 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report
if not file_exists: if not file_exists:
writer.writeheader() writer.writeheader()
for row in report_data: writer.writerow({
# None 방지 & 줄바꿈 이스케이프 'target': cur_target_url.load(),
escaped = { 'status': status,
k: str(v).replace('\n', '\\n') if v is not None else '' 'title': title,
for k, v in row.items() 'description': desc,
} 'uri': uri,
writer.writerow(escaped) })

10
lib/utils/is_oauth_uri.py Normal file
View file

@ -0,0 +1,10 @@
from urllib.parse import urlparse, parse_qs
def is_oauth_uri(uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False