From 34c547c1b1322f0f922e67c19d2524ebe81e6573 Mon Sep 17 00:00:00 2001 From: KMINGON Date: Mon, 9 Jun 2025 20:00:58 +0900 Subject: [PATCH 1/5] =?UTF-8?q?[FEAT]=20:=20AccessToken=20=ED=83=90?= =?UTF-8?q?=EC=A7=80=20=EA=B8=B0=EB=8A=A5=20=EC=9D=B4=EC=8B=9D=20=EB=B0=8F?= =?UTF-8?q?=20=ED=83=90=EC=A7=80=20=EB=B2=94=EC=9C=84=20=ED=99=95=EC=9E=A5?= =?UTF-8?q?(=ED=83=90=EC=A7=80=20=EA=B8=B0=EC=A4=80=20=EC=99=84=ED=99=94?= =?UTF-8?q?=20=EB=B0=8F=20=EA=B8=B0=EC=A4=80=EB=B3=84=20status=EC=B0=A8?= =?UTF-8?q?=EB=93=B1=20=EB=B6=80=EC=97=AC)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/access_token.py | 155 ++++++++++++++++++++++++++++++++++++++++++ addon/init.py | 16 ++++- 2 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 addon/access_token.py diff --git a/addon/access_token.py b/addon/access_token.py new file mode 100644 index 0000000..6f1c8bb --- /dev/null +++ b/addon/access_token.py @@ -0,0 +1,155 @@ +import re +from dataclasses import dataclass, asdict +from typing import List, Dict, Optional, Any + +from mitmproxy.http import HTTPFlow + +import lib.target as target +from lib.report import save_report + +# 결과 리포트 저장용 데이터 클래스 +@dataclass +class TokenLeakResult: + title: str + description: str + uri: str + status: str = "MEDIUM" # 기본 상태 + + def to_report(self, target_value) -> Dict[str, str]: + """리포트 저장 포맷(dict)으로 변환""" + return {"target": target_value, **asdict(self)} + + +# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 +class AccessTokenScanner: + + async def scan(self, flow: HTTPFlow) -> None: + """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" + print(f"[TOKENDEBUG] Request URL: {flow.request.url}") + findings: List[TokenLeakResult] = [] + + findings.extend(await self._scan_request(flow.request)) + findings.extend(await self._scan_response(flow.response, flow.request.url)) + + if findings: + target_value = target.load() + save_report([f.to_report(target_value) for f in findings]) + + # 내부 구현 + async def _scan_request(self, request: Any) -> List[TokenLeakResult]: + results: List[TokenLeakResult] = [] + + print("[TOKENDEBUG] ==scan request==") + # URL 검사 + token_result = self._extract_token(request.url) + if token_result: + token, has_bearer = token_result + results.append( + TokenLeakResult( + title="Token Leak in Request URL", + description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + uri=request.url, + status="MEDIUM" if has_bearer else "LOW" + ) + ) + + # Body 검사 (텍스트 컨텐츠인 경우) + if request.content: + body_text = request.get_text(strict=False) + token_result = self._extract_token(body_text) + if token_result: + token, has_bearer = token_result + results.append( + TokenLeakResult( + title="Token Leak in Request Body", + description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + uri=request.url, + status="MEDIUM" if has_bearer else "LOW" + ) + ) + + return results + + async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]: + if response is None: + return [] + + results: List[TokenLeakResult] = [] + print("[TOKENDEBUG] ==scan response==") + # Location 헤더 검사 (리다이렉트) + if location_header := response.headers.get("Location"): + token_result = self._extract_token(location_header) + if token_result: + token, has_bearer = token_result + if has_bearer: + results.append( + TokenLeakResult( + title="Token Leak in Redirect URL (Location header)", + description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + uri=location_header, + ) + ) + + # Body 검사 (텍스트 컨텐츠인 경우) + if response.content: + body_text = response.get_text(strict=False) + token_result = self._extract_token(body_text) + if token_result: + token, has_bearer = token_result + if has_bearer: + results.append( + TokenLeakResult( + title="Token Leak in Response Body", + description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + uri=request_url, + ) + ) + + return results + + # 토큰 탐지 키워드드 + _TOKEN_KEYS = [ + "access_token", + "accesstoken", + "refresh_token", + "refreshtoken", + "auth_token", + "session_token", + "secret_token", + "ssoauth", + ] + + # "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임 + _TOKEN_TYPE_REGEXES = [ + re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE), + re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE), + ] + + # 동적 컴파일: key=value / "key": "value" / Bearer + _TOKEN_PATTERNS = ( + [ + re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE) + for key in _TOKEN_KEYS + ] + + [ + re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE) + for key in _TOKEN_KEYS + ] + + [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)] + ) + + def _extract_token(self, text: str) -> Optional[str]: + """텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정.""" + if not text: + return None + + # Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑) + has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES) + + for pattern in self._TOKEN_PATTERNS: + if (m := pattern.search(text)) and m.group(1): + print(f"[TOKENDEBUG] token: {m.group(1)}") + print(f"[TOKENDEBUG] has_bearer: {has_bearer}") + return m.group(1), has_bearer + print("[TOKENDEBUG] No matched.") + return None diff --git a/addon/init.py b/addon/init.py index 1ef743d..c485248 100644 --- a/addon/init.py +++ b/addon/init.py @@ -4,6 +4,7 @@ from pkce_check import PKCEDowngradeChecker from ScopeDetection import ScopeDetection from csrf_check import CsrfChecker from nonce_check import NonceChecker +from access_token import AccessTokenScanner class PKCEAddon: def __init__(self): @@ -61,4 +62,17 @@ class NonceAddon: print(f"[ERROR] NonceAddon failed: {e}") pass -addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon()] + +class AccessTokenAddon: + def __init__(self): + self.checker = AccessTokenScanner() + + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.scan(flow) + except Exception as e: + print(f"[ERROR] AccessToken Addon failed: {e}") + pass + + +addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon()] From 31ca96f03763033fad7c3d7a42b3a16029882e7e Mon Sep 17 00:00:00 2001 From: imnyang Date: Mon, 9 Jun 2025 22:14:03 +0900 Subject: [PATCH 2/5] =?UTF-8?q?[FEAT]=20:=20CI/CD=20=ED=8C=8C=EC=9D=B4?= =?UTF-8?q?=ED=94=84=EB=9D=BC=EC=9D=B8=20=EC=84=A4=EC=A0=95=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20=EB=B0=8F=20=EB=B3=B4=EC=95=88=20=EA=B2=80=EC=82=AC?= =?UTF-8?q?=20=EB=8B=A8=EA=B3=84=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 44 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..157300d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,44 @@ +name: CI/CD Pipeline + +on: + push: + branches: [main, develop] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.13] + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + cache-dependency-glob: "uv.lock" + + - name: Set up Python ${{ matrix.python-version }} + run: uv python install ${{ matrix.python-version }} + + - name: Install dependencies + run: uv sync + + - name: Start application and run proxy test + run: | + # Start the application in background + uv run main.py & + APP_PID=$! + + # Wait for application to start + sleep 5 + + # Test proxy functionality + curl -x http://localhost:11080 http://example.com + + # Clean up + kill $APP_PID From 59b3d7d9d25a220a61732ab500904feee9b85333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=95=94=EB=83=A5=20=28imnyang=29?= Date: Mon, 9 Jun 2025 22:14:31 +0900 Subject: [PATCH 3/5] Update ci.yml --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 157300d..4ef8184 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,6 @@ name: CI/CD Pipeline on: push: - branches: [main, develop] pull_request: branches: [main] From aa8bf95a5c3f07d6d8947af2930aa4bb8fa2a3ca Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Mon, 9 Jun 2025 22:29:39 +0900 Subject: [PATCH 4/5] [Fix] NonceAddon --- addon/init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/addon/init.py b/addon/init.py index c485248..f682683 100644 --- a/addon/init.py +++ b/addon/init.py @@ -57,7 +57,7 @@ class NonceAddon: async def response(self, flow: http.HTTPFlow): try: - await self.checker.response(flow) + await self.checker.check_nonce_in_id_token(flow) except Exception as e: print(f"[ERROR] NonceAddon failed: {e}") pass From 0be13ec5f2d4667b670845bb46c4b7bda13de24e Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Mon, 9 Jun 2025 22:35:58 +0900 Subject: [PATCH 5/5] [Add] TODO --- addon/init.py | 4 +++- addon/nonce_check.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/addon/init.py b/addon/init.py index f682683..3c86b8c 100644 --- a/addon/init.py +++ b/addon/init.py @@ -57,7 +57,9 @@ class NonceAddon: async def response(self, flow: http.HTTPFlow): try: - await self.checker.check_nonce_in_id_token(flow) + pass + # TODO id_token을 파싱하는 부분이 누락되어있습니다. + # await self.checker.check_nonce_in_id_token(flow) except Exception as e: print(f"[ERROR] NonceAddon failed: {e}") pass diff --git a/addon/nonce_check.py b/addon/nonce_check.py index bb1f379..c0af077 100644 --- a/addon/nonce_check.py +++ b/addon/nonce_check.py @@ -68,7 +68,7 @@ class NonceChecker: except Exception as e: return {} - + # TODO id_token을 파싱하는 부분이 누락되어있습니다. def check_nonce_in_id_token(self, flow, id_token: str) -> bool: decoded = self.decode_id_token(id_token) nonce = decoded.get("nonce")