diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4ef8184 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,43 @@ +name: CI/CD Pipeline + +on: + push: + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.13] + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + cache-dependency-glob: "uv.lock" + + - name: Set up Python ${{ matrix.python-version }} + run: uv python install ${{ matrix.python-version }} + + - name: Install dependencies + run: uv sync + + - name: Start application and run proxy test + run: | + # Start the application in background + uv run main.py & + APP_PID=$! + + # Wait for application to start + sleep 5 + + # Test proxy functionality + curl -x http://localhost:11080 http://example.com + + # Clean up + kill $APP_PID diff --git a/addon/access_token.py b/addon/access_token.py new file mode 100644 index 0000000..6f1c8bb --- /dev/null +++ b/addon/access_token.py @@ -0,0 +1,155 @@ +import re +from dataclasses import dataclass, asdict +from typing import List, Dict, Optional, Any + +from mitmproxy.http import HTTPFlow + +import lib.target as target +from lib.report import save_report + +# 결과 리포트 저장용 데이터 클래스 +@dataclass +class TokenLeakResult: + title: str + description: str + uri: str + status: str = "MEDIUM" # 기본 상태 + + def to_report(self, target_value) -> Dict[str, str]: + """리포트 저장 포맷(dict)으로 변환""" + return {"target": target_value, **asdict(self)} + + +# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 +class AccessTokenScanner: + + async def scan(self, flow: HTTPFlow) -> None: + """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" + print(f"[TOKENDEBUG] Request URL: {flow.request.url}") + findings: List[TokenLeakResult] = [] + + findings.extend(await self._scan_request(flow.request)) + findings.extend(await self._scan_response(flow.response, flow.request.url)) + + if findings: + target_value = target.load() + save_report([f.to_report(target_value) for f in findings]) + + # 내부 구현 + async def _scan_request(self, request: Any) -> List[TokenLeakResult]: + results: List[TokenLeakResult] = [] + + print("[TOKENDEBUG] ==scan request==") + # URL 검사 + token_result = self._extract_token(request.url) + if token_result: + token, has_bearer = token_result + results.append( + TokenLeakResult( + title="Token Leak in Request URL", + description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + uri=request.url, + status="MEDIUM" if has_bearer else "LOW" + ) + ) + + # Body 검사 (텍스트 컨텐츠인 경우) + if request.content: + body_text = request.get_text(strict=False) + token_result = self._extract_token(body_text) + if token_result: + token, has_bearer = token_result + results.append( + TokenLeakResult( + title="Token Leak in Request Body", + description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…", + uri=request.url, + status="MEDIUM" if has_bearer else "LOW" + ) + ) + + return results + + async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]: + if response is None: + return [] + + results: List[TokenLeakResult] = [] + print("[TOKENDEBUG] ==scan response==") + # Location 헤더 검사 (리다이렉트) + if location_header := response.headers.get("Location"): + token_result = self._extract_token(location_header) + if token_result: + token, has_bearer = token_result + if has_bearer: + results.append( + TokenLeakResult( + title="Token Leak in Redirect URL (Location header)", + description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + uri=location_header, + ) + ) + + # Body 검사 (텍스트 컨텐츠인 경우) + if response.content: + body_text = response.get_text(strict=False) + token_result = self._extract_token(body_text) + if token_result: + token, has_bearer = token_result + if has_bearer: + results.append( + TokenLeakResult( + title="Token Leak in Response Body", + description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…", + uri=request_url, + ) + ) + + return results + + # 토큰 탐지 키워드드 + _TOKEN_KEYS = [ + "access_token", + "accesstoken", + "refresh_token", + "refreshtoken", + "auth_token", + "session_token", + "secret_token", + "ssoauth", + ] + + # "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임 + _TOKEN_TYPE_REGEXES = [ + re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE), + re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE), + ] + + # 동적 컴파일: key=value / "key": "value" / Bearer + _TOKEN_PATTERNS = ( + [ + re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE) + for key in _TOKEN_KEYS + ] + + [ + re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE) + for key in _TOKEN_KEYS + ] + + [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)] + ) + + def _extract_token(self, text: str) -> Optional[str]: + """텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정.""" + if not text: + return None + + # Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑) + has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES) + + for pattern in self._TOKEN_PATTERNS: + if (m := pattern.search(text)) and m.group(1): + print(f"[TOKENDEBUG] token: {m.group(1)}") + print(f"[TOKENDEBUG] has_bearer: {has_bearer}") + return m.group(1), has_bearer + print("[TOKENDEBUG] No matched.") + return None diff --git a/addon/init.py b/addon/init.py index d84c551..5b35835 100644 --- a/addon/init.py +++ b/addon/init.py @@ -5,6 +5,8 @@ from ScopeDetection import ScopeDetection from csrf_check import CsrfChecker from nonce_check import NonceChecker from cleintsecret_check import ClientSecretChecker +from redirect_uri_check import RedirectBypassChecker +from access_token import AccessTokenScanner class PKCEAddon: def __init__(self): @@ -57,7 +59,9 @@ class NonceAddon: async def response(self, flow: http.HTTPFlow): try: - await self.checker.response(flow) + pass + # TODO id_token을 파싱하는 부분이 누락되어있습니다. + # await self.checker.check_nonce_in_id_token(flow) except Exception as e: print(f"[ERROR] NonceAddon failed: {e}") pass @@ -78,6 +82,28 @@ class ClientSecretAddon: self.checker.response(flow) except Exception as e: print(f"[ERROR] ClientSecretAddon response failed: {e}") + pass + +class AccessTokenAddon: + def __init__(self): + self.checker = AccessTokenScanner() + + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.scan(flow) + except Exception as e: + print(f"[ERROR] AccessToken Addon failed: {e}") pass - -addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), ClientSecretAddon()] + +class RedirectBypassAddon: + def __init__(self): + self.checker = RedirectBypassChecker() + + # request 대신 response 로 바꿔 보세요: + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.test(flow) + except Exception as e: + print(f"[ERROR] RedirectBypass Addon failed: {e}") + +addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), ClientSecretAddon(), AccessTokenAddon(), RedirectBypassAddon()] diff --git a/addon/nonce_check.py b/addon/nonce_check.py index bb1f379..c0af077 100644 --- a/addon/nonce_check.py +++ b/addon/nonce_check.py @@ -68,7 +68,7 @@ class NonceChecker: except Exception as e: return {} - + # TODO id_token을 파싱하는 부분이 누락되어있습니다. def check_nonce_in_id_token(self, flow, id_token: str) -> bool: decoded = self.decode_id_token(id_token) nonce = decoded.get("nonce") diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py new file mode 100644 index 0000000..176e537 --- /dev/null +++ b/addon/redirect_uri_check.py @@ -0,0 +1,202 @@ +from mitmproxy import http +import aiohttp +from urllib.parse import urlparse, parse_qs, urlencode, urlunparse +import lib.target as target +from lib.report import save_report + +class BypassPayload: + """ 우회 패턴 정의 """ + def __init__(self, name: str, mutate_func, description: str): + self.name = name + self.mutate = mutate_func #우회 url 만드는 함수 + self.description = description + + +class RedirectBypassChecker: + def __init__(self): + """ 우회 페이로드 목록 """ + self.bypass_payloads = [ + BypassPayload( + name=r"@", + mutate_func=self._mutate_pattern1, + description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com" + ), + + ] + self.session = None + + """ 우회 URL 생성 목록 """ + # 1. @ + def _mutate_pattern1(self, original: str) -> str: + parsed = urlparse(original) + mutated = f"https://evil.com@{parsed.netloc}{parsed.path}" + print(f"[redirect_uri_check] original: {original} → mutated: {mutated}") + return mutated + + '''aiohttp 세션 생성 (재사용)''' + async def _get_session(self): + if self.session is None: + timeout = aiohttp.ClientTimeout(total=10) + self.session = aiohttp.ClientSession(timeout=timeout) + return self.session + + '''세션 정리''' + async def close_session(self): + if self.session: + await self.session.close() + self.session = None + + """ 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """ + async def _send_request(self, url, headers=None): + try: + session = await self._get_session() # 세션 준비 + request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용 + + # 서버에 GET 요청 전송 + async with session.get(url, allow_redirects=False, headers=request_headers) as response: + return { + 'status': response.status, + 'location': response.headers.get("Location", ""), + 'headers': dict(response.headers) + } + except Exception as e: + print(f"[ERROR] 요청 실패 ({url}): {e}") + return {'status': 500, 'location': '', 'headers': {}} + + """ redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """ + def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool: + try: + redirect_parsed = urlparse(redirect_uri) + base_parsed = urlparse(base_url) + + redirect_host = redirect_parsed.hostname + base_host = base_parsed.hostname + + if not redirect_host or not base_host: + return False + + if "@" in redirect_uri: + if redirect_host != base_host: + print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}") + return False + + at_parts = redirect_uri.split('@') + if len(at_parts) > 1: + before_at = at_parts[0] + if '//' in before_at: + potential_domain = before_at.split('//')[-1] + if '.' in potential_domain and potential_domain != base_host: + print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}") + return False + + is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}")) + return is_valid + + except Exception as e: + print(f"[ERROR] 도메인 검증 실패: {e}") + return False + + """ Location 헤더에서 authorization code 추출 """ + def _extract_code_from_location(self, location: str) -> str: + if not location: + return "" + + try: + parsed = urlparse(location) + query = parse_qs(parsed.query) + return query.get('code', [''])[0] + except: + return "" + + """ 메인 테스트 함수 - mitmproxy flow 처리 """ + async def test(self, flow: http.HTTPFlow): + url = flow.request.pretty_url + parsed = urlparse(url) + query = parse_qs(parsed.query) + + if "redirect_uri" not in query: + return + + original_redirect_uri = query["redirect_uri"][0] + print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}") + + for payload in self.bypass_payloads: + try: + await self._test_bypass_pattern( + url, query, parsed, original_redirect_uri, payload, headers={} + ) + except Exception as e: + print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}") + continue + + """ 개별 우회 패턴 테스트 """ + async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers): + + print(f"[SCAN] 우회 패턴 테스트: {payload.name}") + + # 우회 URL 생성 + bypassed_uri = payload.mutate(original_redirect_uri) + + # 새로운 쿼리 파라미터 구성 + modified_query = query.copy() + modified_query["redirect_uri"] = [bypassed_uri] + new_query_string = urlencode(modified_query, doseq=True) + test_url = urlunparse(parsed_url._replace(query=new_query_string)) + + # 요청 전송 + response = await self._send_request(test_url, headers) + + # 응답 분석 + await self._analyze_response(original_url, test_url, bypassed_uri, response, payload) + + """ 응답 분석 및 취약점 판단 """ + async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload): + status = response['status'] + location = response['location'] + + # 리다이렉트 응답이 아니면 스킵 + if status not in [301, 302, 303, 307, 308]: + return + + # Location 헤더에서 code 추출 + auth_code = self._extract_code_from_location(location) + + if auth_code and not self._is_baseline_valid(bypassed_uri, original_url): + # 취약점 발견 시에만 로그 + print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!") + await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload) + + + """ 취약점 보고서 생성 """ + async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload): + # payload가 문자열인지 객체인지 확인 + if hasattr(payload, 'name'): + pattern_name = payload.name + pattern_description = payload.description + else: + pattern_name = str(payload) + pattern_description = "Unknown bypass pattern" + + description = ( + f"Redirect URI 우회 취약점 발견!\n\n" + f"-- 상세 정보 --:\n" + f"• 우회 패턴: {pattern_name}\n" + f"• 설명: {pattern_description}\n" + f"• 원본 URL: {original_url}\n" + f"• 우회된 redirect_uri: {bypassed_uri}\n" + f"• 테스트 URL: {test_url}\n" + f"• 리다이렉트 위치: {location}\n" + f"• 발급된 인가 코드: {auth_code[:10]}...\n\n" + ) + + report_data = [{ + "target": target.load(), + "status": "CRITICAL", + "title": "Redirect URI Bypass Vulnerability", + "description": description, + "uri": test_url # uri 필드 추가 + }] + + save_report(report_data) + print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") + print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}") \ No newline at end of file diff --git a/runner/proxy.py b/runner/proxy.py index 7cc2650..d7b856a 100644 --- a/runner/proxy.py +++ b/runner/proxy.py @@ -3,4 +3,4 @@ from mitmproxy.tools.main import mitmdump def run_proxy(): sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"] - mitmdump() + mitmdump() \ No newline at end of file