mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 03:41:52 +09:00
Merge branch 'main' into gyu
This commit is contained in:
commit
ef61667cfe
4 changed files with 215 additions and 3 deletions
155
addon/access_token.py
Normal file
155
addon/access_token.py
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
import re
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict, Optional, Any
|
||||
|
||||
from mitmproxy.http import HTTPFlow
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
# 결과 리포트 저장용 데이터 클래스
|
||||
@dataclass
|
||||
class TokenLeakResult:
|
||||
title: str
|
||||
description: str
|
||||
uri: str
|
||||
status: str = "MEDIUM" # 기본 상태
|
||||
|
||||
def to_report(self, target_value) -> Dict[str, str]:
|
||||
"""리포트 저장 포맷(dict)으로 변환"""
|
||||
return {"target": target_value, **asdict(self)}
|
||||
|
||||
|
||||
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
|
||||
class AccessTokenScanner:
|
||||
|
||||
async def scan(self, flow: HTTPFlow) -> None:
|
||||
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
|
||||
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
|
||||
findings: List[TokenLeakResult] = []
|
||||
|
||||
findings.extend(await self._scan_request(flow.request))
|
||||
findings.extend(await self._scan_response(flow.response, flow.request.url))
|
||||
|
||||
if findings:
|
||||
target_value = target.load()
|
||||
save_report([f.to_report(target_value) for f in findings])
|
||||
|
||||
# 내부 구현
|
||||
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
|
||||
results: List[TokenLeakResult] = []
|
||||
|
||||
print("[TOKENDEBUG] ==scan request==")
|
||||
# URL 검사
|
||||
token_result = self._extract_token(request.url)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Request URL",
|
||||
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request.url,
|
||||
status="MEDIUM" if has_bearer else "LOW"
|
||||
)
|
||||
)
|
||||
|
||||
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||
if request.content:
|
||||
body_text = request.get_text(strict=False)
|
||||
token_result = self._extract_token(body_text)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Request Body",
|
||||
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request.url,
|
||||
status="MEDIUM" if has_bearer else "LOW"
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
|
||||
if response is None:
|
||||
return []
|
||||
|
||||
results: List[TokenLeakResult] = []
|
||||
print("[TOKENDEBUG] ==scan response==")
|
||||
# Location 헤더 검사 (리다이렉트)
|
||||
if location_header := response.headers.get("Location"):
|
||||
token_result = self._extract_token(location_header)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
if has_bearer:
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Redirect URL (Location header)",
|
||||
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||
uri=location_header,
|
||||
)
|
||||
)
|
||||
|
||||
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||
if response.content:
|
||||
body_text = response.get_text(strict=False)
|
||||
token_result = self._extract_token(body_text)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
if has_bearer:
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Response Body",
|
||||
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request_url,
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
# 토큰 탐지 키워드드
|
||||
_TOKEN_KEYS = [
|
||||
"access_token",
|
||||
"accesstoken",
|
||||
"refresh_token",
|
||||
"refreshtoken",
|
||||
"auth_token",
|
||||
"session_token",
|
||||
"secret_token",
|
||||
"ssoauth",
|
||||
]
|
||||
|
||||
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
|
||||
_TOKEN_TYPE_REGEXES = [
|
||||
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
|
||||
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
|
||||
]
|
||||
|
||||
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
|
||||
_TOKEN_PATTERNS = (
|
||||
[
|
||||
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
|
||||
for key in _TOKEN_KEYS
|
||||
]
|
||||
+ [
|
||||
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
|
||||
for key in _TOKEN_KEYS
|
||||
]
|
||||
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
|
||||
)
|
||||
|
||||
def _extract_token(self, text: str) -> Optional[str]:
|
||||
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
|
||||
if not text:
|
||||
return None
|
||||
|
||||
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
|
||||
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
|
||||
|
||||
for pattern in self._TOKEN_PATTERNS:
|
||||
if (m := pattern.search(text)) and m.group(1):
|
||||
print(f"[TOKENDEBUG] token: {m.group(1)}")
|
||||
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
|
||||
return m.group(1), has_bearer
|
||||
print("[TOKENDEBUG] No matched.")
|
||||
return None
|
||||
|
|
@ -5,6 +5,7 @@ from ScopeDetection import ScopeDetection
|
|||
from csrf_check import CsrfChecker
|
||||
from nonce_check import NonceChecker
|
||||
from redirect_uri_check import RedirectBypassChecker
|
||||
from access_token import AccessTokenScanner
|
||||
|
||||
class PKCEAddon:
|
||||
def __init__(self):
|
||||
|
|
@ -57,11 +58,24 @@ class NonceAddon:
|
|||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.response(flow)
|
||||
pass
|
||||
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||
# await self.checker.check_nonce_in_id_token(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] NonceAddon failed: {e}")
|
||||
pass
|
||||
|
||||
class AccessTokenAddon:
|
||||
def __init__(self):
|
||||
self.checker = AccessTokenScanner()
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.scan(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] AccessToken Addon failed: {e}")
|
||||
pass
|
||||
|
||||
class RedirectBypassAddon:
|
||||
def __init__(self):
|
||||
self.checker = RedirectBypassChecker()
|
||||
|
|
@ -73,4 +87,4 @@ class RedirectBypassAddon:
|
|||
except Exception as e:
|
||||
print(f"[ERROR] RedirectBypass Addon failed: {e}")
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), RedirectBypassAddon()]
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), RedirectBypassAddon()]
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class NonceChecker:
|
|||
except Exception as e:
|
||||
return {}
|
||||
|
||||
|
||||
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
||||
decoded = self.decode_id_token(id_token)
|
||||
nonce = decoded.get("nonce")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue