Merge pull request #8 from j93es/feature/access-token

[FEAT] : AccessToken 탐지 기능 이식 및 탐지 범위 확장(탐지 기준 완화 및 기준별 status차등 부여)
This commit is contained in:
김민곤 2025-06-09 20:15:12 +09:00 committed by GitHub
commit eda0c5a679
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 170 additions and 1 deletions

155
addon/access_token.py Normal file
View file

@ -0,0 +1,155 @@
import re
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from mitmproxy.http import HTTPFlow
import lib.target as target
from lib.report import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
findings.extend(await self._scan_request(flow.request))
findings.extend(await self._scan_response(flow.response, flow.request.url))
if findings:
target_value = target.load()
save_report([f.to_report(target_value) for f in findings])
# 내부 구현
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan request==")
# URL 검사
token_result = self._extract_token(request.url)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request URL",
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
)
# Body 검사 (텍스트 컨텐츠인 경우)
if request.content:
body_text = request.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request Body",
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
)
return results
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
if response is None:
return []
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"):
token_result = self._extract_token(location_header)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Redirect URL (Location header)",
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=location_header,
)
)
# Body 검사 (텍스트 컨텐츠인 경우)
if response.content:
body_text = response.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Response Body",
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=request_url,
)
)
return results
# 토큰 탐지 키워드드
_TOKEN_KEYS = [
"access_token",
"accesstoken",
"refresh_token",
"refreshtoken",
"auth_token",
"session_token",
"secret_token",
"ssoauth",
]
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
_TOKEN_TYPE_REGEXES = [
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
]
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
_TOKEN_PATTERNS = (
[
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
)
def _extract_token(self, text: str) -> Optional[str]:
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
if not text:
return None
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
for pattern in self._TOKEN_PATTERNS:
if (m := pattern.search(text)) and m.group(1):
print(f"[TOKENDEBUG] token: {m.group(1)}")
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
return m.group(1), has_bearer
print("[TOKENDEBUG] No matched.")
return None

View file

@ -4,6 +4,7 @@ from pkce_check import PKCEDowngradeChecker
from ScopeDetection import ScopeDetection
from csrf_check import CsrfChecker
from nonce_check import NonceChecker
from access_token import AccessTokenScanner
class PKCEAddon:
def __init__(self):
@ -61,4 +62,17 @@ class NonceAddon:
print(f"[ERROR] NonceAddon failed: {e}")
pass
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon()]
class AccessTokenAddon:
def __init__(self):
self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.scan(flow)
except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}")
pass
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon()]