oauth-backend/addon/access_token.py

155 lines
5.7 KiB
Python

import re
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from mitmproxy.http import HTTPFlow
import lib.target as target
from lib.report import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
findings.extend(await self._scan_request(flow.request))
findings.extend(await self._scan_response(flow.response, flow.request.url))
if findings:
target_value = target.load()
save_report([f.to_report(target_value) for f in findings])
# 내부 구현
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan request==")
# URL 검사
token_result = self._extract_token(request.url)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request URL",
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
)
# Body 검사 (텍스트 컨텐츠인 경우)
if request.content:
body_text = request.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request Body",
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
)
return results
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
if response is None:
return []
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"):
token_result = self._extract_token(location_header)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Redirect URL (Location header)",
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=location_header,
)
)
# Body 검사 (텍스트 컨텐츠인 경우)
if response.content:
body_text = response.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Response Body",
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=request_url,
)
)
return results
# 토큰 탐지 키워드드
_TOKEN_KEYS = [
"access_token",
"accesstoken",
"refresh_token",
"refreshtoken",
"auth_token",
"session_token",
"secret_token",
"ssoauth",
]
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
_TOKEN_TYPE_REGEXES = [
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
]
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
_TOKEN_PATTERNS = (
[
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
)
def _extract_token(self, text: str) -> Optional[str]:
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
if not text:
return None
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
for pattern in self._TOKEN_PATTERNS:
if (m := pattern.search(text)) and m.group(1):
print(f"[TOKENDEBUG] token: {m.group(1)}")
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
return m.group(1), has_bearer
print("[TOKENDEBUG] No matched.")
return None