import re from dataclasses import dataclass, asdict from typing import List, Dict, Optional, Any from mitmproxy.http import HTTPFlow import lib.target as target from lib.report import save_report # 결과 리포트 저장용 데이터 클래스 @dataclass class TokenLeakResult: title: str description: str uri: str status: str = "MEDIUM" # 기본 상태 def to_report(self, target_value) -> Dict[str, str]: """리포트 저장 포맷(dict)으로 변환""" return {"target": target_value, **asdict(self)} # 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 class AccessTokenScanner: async def scan(self, flow: HTTPFlow) -> None: """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" print(f"[TOKENDEBUG] Request URL: {flow.request.url}") findings: List[TokenLeakResult] = [] findings.extend(await self._scan_request(flow.request)) findings.extend(await self._scan_response(flow.response, flow.request.url)) if findings: target_value = target.load() save_report([f.to_report(target_value) for f in findings]) # 내부 구현 async def _scan_request(self, request: Any) -> List[TokenLeakResult]: results: List[TokenLeakResult] = [] print("[TOKENDEBUG] ==scan request==") # URL 검사 token_result = self._extract_token(request.url) if token_result: token, has_bearer = token_result results.append( TokenLeakResult( title="Token Leak in Request URL", description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", uri=request.url, status="MEDIUM" if has_bearer else "LOW" ) ) # Body 검사 (텍스트 컨텐츠인 경우) if request.content: body_text = request.get_text(strict=False) token_result = self._extract_token(body_text) if token_result: token, has_bearer = token_result results.append( TokenLeakResult( title="Token Leak in Request Body", description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", uri=request.url, status="MEDIUM" if has_bearer else "LOW" ) ) return results async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]: if response is None: return [] results: List[TokenLeakResult] = [] print("[TOKENDEBUG] ==scan response==") # Location 헤더 검사 (리다이렉트) if location_header := response.headers.get("Location"): token_result = self._extract_token(location_header) if token_result: token, has_bearer = token_result if has_bearer: results.append( TokenLeakResult( title="Token Leak in Redirect URL (Location header)", description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", uri=location_header, ) ) # Body 검사 (텍스트 컨텐츠인 경우) if response.content: body_text = response.get_text(strict=False) token_result = self._extract_token(body_text) if token_result: token, has_bearer = token_result if has_bearer: results.append( TokenLeakResult( title="Token Leak in Response Body", description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", uri=request_url, ) ) return results # 토큰 탐지 키워드드 _TOKEN_KEYS = [ "access_token", "accesstoken", "refresh_token", "refreshtoken", "auth_token", "session_token", "secret_token", "ssoauth", ] # "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임 _TOKEN_TYPE_REGEXES = [ re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE), re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE), ] # 동적 컴파일: key=value / "key": "value" / Bearer _TOKEN_PATTERNS = ( [ re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE) for key in _TOKEN_KEYS ] + [ re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE) for key in _TOKEN_KEYS ] + [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)] ) def _extract_token(self, text: str) -> Optional[str]: """텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정.""" if not text: return None # Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑) has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES) for pattern in self._TOKEN_PATTERNS: if (m := pattern.search(text)) and m.group(1): print(f"[TOKENDEBUG] token: {m.group(1)}") print(f"[TOKENDEBUG] has_bearer: {has_bearer}") return m.group(1), has_bearer print("[TOKENDEBUG] No matched.") return None