mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 07:31:51 +09:00
151 lines
No EOL
5.7 KiB
Python
151 lines
No EOL
5.7 KiB
Python
import re
|
|
import asyncio
|
|
|
|
from typing import Optional, Any
|
|
from mitmproxy.http import HTTPFlow
|
|
from urllib.parse import urlparse, parse_qs
|
|
from lib.report_vuln import report_vuln
|
|
|
|
|
|
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
|
|
class AccessTokenScanner:
|
|
|
|
async def scan(self, flow: HTTPFlow) -> None:
|
|
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
|
|
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
|
|
|
|
async_gather = []
|
|
async_gather.append(self._scan_request(flow.request))
|
|
async_gather.append(self._scan_response(flow.response, flow.request.url))
|
|
await asyncio.gather(*async_gather)
|
|
|
|
# 내부 구현
|
|
async def _scan_request(self, request: Any):
|
|
|
|
print("[TOKENDEBUG] ==scan request==")
|
|
# URL 검사
|
|
if self._is_implicit_flow(request.url):
|
|
print("[TOKENDEBUG] OAuth Implicit Flow detected.")
|
|
report_vuln(
|
|
title="Token Leak in Request URL",
|
|
desc="취약한 Grant Type입니다 (Implicit Grant Type)",
|
|
status="MEDIUM",
|
|
uri=request.url
|
|
)
|
|
|
|
# Body 검사 (텍스트 컨텐츠인 경우)
|
|
if request.content:
|
|
body_text = request.get_text(strict=False)
|
|
token_result = self._extract_token(body_text)
|
|
if token_result:
|
|
report_vuln(
|
|
title="Token Leak in Request Body",
|
|
desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token_result[:20]}…",
|
|
status="LOW",
|
|
uri=request.url
|
|
)
|
|
|
|
async def _scan_response(self, response: Optional[Any], request_url: str):
|
|
if response is None:
|
|
return
|
|
|
|
print("[TOKENDEBUG] ==scan response==")
|
|
# Location 헤더 검사 (리다이렉트)
|
|
if location_header := response.headers.get("Location"):
|
|
token_result = self._extract_token(location_header)
|
|
if token_result:
|
|
report_vuln(
|
|
title="Token Leak in Redirect URL (Location header)",
|
|
desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}…",
|
|
status="MEDIUM",
|
|
uri=location_header,
|
|
)
|
|
|
|
# Body 검사 (텍스트 컨텐츠인 경우)
|
|
if response.content:
|
|
body_text = response.get_text(strict=False)
|
|
token_result = self._extract_token(body_text)
|
|
if token_result:
|
|
report_vuln(
|
|
title="Token Leak in Response Body",
|
|
desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}…",
|
|
status="LOW",
|
|
uri=request_url,
|
|
)
|
|
|
|
# 토큰 탐지 키워드드
|
|
_TOKEN_KEYS = [
|
|
"access_token",
|
|
"accesstoken",
|
|
"refresh_token",
|
|
"refreshtoken",
|
|
"auth_token",
|
|
"session_token",
|
|
]
|
|
|
|
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
|
|
_TOKEN_TYPE_REGEXES = [
|
|
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
|
|
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
|
|
]
|
|
|
|
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
|
|
_TOKEN_PATTERNS = (
|
|
[
|
|
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
|
|
for key in _TOKEN_KEYS
|
|
]
|
|
+ [
|
|
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
|
|
for key in _TOKEN_KEYS
|
|
]
|
|
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
|
|
)
|
|
|
|
def _extract_token(self, text: str) -> Optional[str]:
|
|
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
|
|
if not text:
|
|
return None
|
|
|
|
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
|
|
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
|
|
|
|
for pattern in self._TOKEN_PATTERNS:
|
|
if (m := pattern.search(text)) and m.group(1):
|
|
print(f"[TOKENDEBUG] token: {m.group(1)}")
|
|
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
|
|
if has_bearer:
|
|
return m.group(1)
|
|
print("[TOKENDEBUG] No matched.")
|
|
return None
|
|
|
|
def _is_implicit_flow(self, request_url: str) -> bool:
|
|
"""
|
|
URL의 파라미터에서 OAuth Implicit Flow 패턴을 체크합니다.
|
|
|
|
Args:
|
|
request_url: 체크할 요청 URL
|
|
|
|
Returns:
|
|
bool: client_id, redirect_uri, response_type이 모두 존재하고
|
|
response_type 값이 'token'인 경우 True, 그렇지 않으면 False
|
|
"""
|
|
try:
|
|
parsed_url = urlparse(request_url)
|
|
query_params = parse_qs(parsed_url.query)
|
|
|
|
# 필요한 파라미터들이 모두 존재하는지 확인
|
|
required_params = ['client_id', 'redirect_uri', 'response_type']
|
|
|
|
for param in required_params:
|
|
if param not in query_params:
|
|
return False
|
|
|
|
# response_type 값이 'token'인지 확인
|
|
response_type_values = query_params.get('response_type', [])
|
|
|
|
# response_type 파라미터가 존재하고 값 중에 'token'이 있는지 확인
|
|
return 'token' in response_type_values
|
|
|
|
except Exception:
|
|
return False |