diff --git a/addon/init.py b/addon/init.py index 314e6f5..d0618d9 100644 --- a/addon/init.py +++ b/addon/init.py @@ -18,6 +18,8 @@ false_true_varifing_task = FalseTrueVarifingTask() load_dotenv(override=True) +_open_redirect_checker = OpenRedirectChecker() + class AddonBase: """ Base class for addons. @@ -62,8 +64,6 @@ class AddonBase: return False - - async def request(self, flow: http.HTTPFlow): if self.google_login_hint: await try_catch(self.google_login_hint.request(flow)) @@ -73,7 +73,7 @@ class AddonBase: tasks = [ try_catch(PKCEDowngradeChecker().test(flow)), - try_catch(OpenRedirectChecker().test(flow)), + try_catch(_open_redirect_checker.test(flow)), ] await asyncio.gather(*tasks) diff --git a/addon/open_redirect_check.py b/addon/open_redirect_check.py index f3d0c93..202ca35 100644 --- a/addon/open_redirect_check.py +++ b/addon/open_redirect_check.py @@ -3,9 +3,14 @@ import aiohttp import asyncio import random import time -from urllib.parse import urlparse, parse_qs, urlencode, urlunparse +from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, unquote from lib.report_vuln import report_vuln +GLOBAL_REDIRECT_TRACKING = { + 'tracked_redirect_uris': set(), + 'last_cleanup': time.time() +} + class RedirectRateLimiter: """OAuth Open Redirect 체크 전용 rate limiter""" def __init__(self): @@ -1085,10 +1090,67 @@ class OpenRedirectChecker: print(f"[ERROR] 요청 실패 ({url}): {e}") return {'status': 500, 'location': '', 'headers': {}} + def _track_redirect_uri_from_authorization(self, url, parsed, query): + """Authorization 요청에서 redirect_uri 추적""" + + # 메이저 OAuth 제공자에서만 추적 + major_oauth_providers = [ + "accounts.google.com", "oauth2.googleapis.com", + "www.facebook.com", "facebook.com", "graph.facebook.com", + "github.com", "api.github.com", + "login.microsoftonline.com", "login.live.com", + "login.microsoft.com", "account.microsoft.com", + "appleid.apple.com", "accounts.apple.com", + "login.yahoo.com", "accounts.twitter.com", "api.twitter.com", + "linkedin.com", "www.linkedin.com" + ] + + # 메이저 OAuth 제공자가 아니면 추적하지 않음 + if not any(provider in parsed.netloc for provider in major_oauth_providers): + return False + + if 'redirect_uri' in query: + redirect_uri = unquote(query['redirect_uri'][0]) + + # 글로벌 저장소에 추가 + GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris'].add(redirect_uri) + + print(f"[REDIRECT_TRACKING] 📝 redirect_uri 추적: {redirect_uri}") + + # 정기적인 정리 + self._cleanup_old_tracking_data() + + return True + + def _is_tracked_oauth_callback(self, url, parsed, query): + """추적된 redirect_uri와 현재 요청이 매칭되는지 확인""" + + current_base_url = f"https://{parsed.netloc}{parsed.path}" + + if not GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']: + return False + + for tracked_uri in GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']: + if current_base_url == tracked_uri: + return True + + return False + def _is_client_redirect_request(self, url, parsed, query): """OAuth 콜백 감지 - OAuth 로그인 완료 후 돌아온 페이지인가 확인""" - # 0. OAuth 제공자 제외 + # B2B/내부 시스템 도메인 제외 + internal_domains = [ + 'zendesk.com', + 'salesforce.com', + 'servicenow.com', + 'atlassian.com', + ] + + if any(domain in parsed.netloc.lower() for domain in internal_domains): + return False + + # OAuth 제공자 제외 oauth_provider_domains = [ # Google "accounts.google.com", "oauth2.googleapis.com", @@ -1111,15 +1173,24 @@ class OpenRedirectChecker: if any(provider in parsed.netloc for provider in oauth_provider_domains): return False + + # 0. 추적된 redirect_uri 매칭 + if self._is_tracked_oauth_callback(url, parsed, query): + # OAuth 응답 파라미터도 확인 + oauth_response_params = ['code', 'access_token', 'id_token', 'state', 'error'] + has_oauth_response = any(param in query for param in oauth_response_params) + + if has_oauth_response: + print(f"[TRACKED_OAUTH] 🎯 추적된 OAuth 콜백 감지: {parsed.netloc}") + return True # 1. URL 경로로 OAuth 콜백인지 확인 - 기존 OAuth 콜백 패턴 oauth_callback_patterns = [ '/oauth/callback', '/auth/callback', '/login/callback', '/oauth/redirect', '/auth/return', '/sso/callback', '/signup', '/register', - '/api/auth/v3/social/callback', # velog.io - '/social/callback', # 일반화 - '/auth/social/callback', # 변형 + '/api/auth/v3/social/callback', '/social/callback', '/auth/social/callback', + '/access/return_to', '/access/', '/return', ] path = parsed.path.lower() @@ -1179,6 +1250,11 @@ class OpenRedirectChecker: # 최종 판단 기준 2 - OAuth 요청 파라미터와 리다이렉트 파라미터가 둘 다 있으면 → OAuth 혼합 패턴! if has_oauth_request and has_oauth_redirect: print(f"[CLIENT_OAUTH] 📱 OAuth 혼합 패턴 감지: {parsed.netloc}") + return True + + # 리다이렉트 패턴 + 리다이렉트 파라미터면 충분 + if is_oauth_callback and has_redirect_param: + print(f"[CLIENT_REDIRECT] 📱 최종 리다이렉트 엔드포인트 감지: {parsed.netloc}") return True return False @@ -1186,7 +1262,7 @@ class OpenRedirectChecker: def _is_self_oauth_request(self, url, parsed, query): """자체 OAuth - authorize 엔드포인트 요청인지 확인 (로그인 불필요)""" - # 0. OAuth 제공자 도메인 제외 + # OAuth 제공자 도메인 제외 oauth_provider_domains = [ "accounts.google.com", "oauth2.googleapis.com", "www.facebook.com", "facebook.com", "graph.facebook.com", @@ -1202,6 +1278,34 @@ class OpenRedirectChecker: if any(provider in parsed.netloc for provider in oauth_provider_domains): return False + + # B2B 연동 제외 (redirect_uri 기반) + if 'redirect_uri' in query: + redirect_uri = unquote(query['redirect_uri'][0]) + + # B2B/내부 시스템 도메인들 + internal_domains = [ + 'zendesk.com', + 'salesforce.com', + 'servicenow.com', + 'atlassian.com', + 'slack.com', + ] + + # redirect_uri가 내부 시스템이면 제외 + if any(domain in redirect_uri.lower() for domain in internal_domains): + return False + + # 내부 시스템 연동 경로 제외 + internal_system_patterns = [ + '/websso/bootstrap', # SSO 부트스트랩 + '/api/internal/', # 내부 API + '/system/', # 시스템 연동 + ] + + path = parsed.path.lower() + if any(pattern in path for pattern in internal_system_patterns): + return False # 1. 자체 OAuth 엔드포인트 패턴 확인 oauth_endpoints = [ @@ -1248,17 +1352,38 @@ class OpenRedirectChecker: if flow.request.method != "GET": return - # OAuth 관련 요청인지 확인 - if (self._is_client_redirect_request(url, parsed, query) or - self._is_self_oauth_request(url, parsed, query)): - await self._test_oauth_redirect(url, parsed, query) - return + # 1. Authorization 요청에서 redirect_uri 추적 + if self._track_redirect_uri_from_authorization(url, parsed, query): + return # Authorization 요청은 여기서 끝 + + # 2. 각 조건을 독립적으로 체크하여 우선순위 결정 + is_tracked_callback = self._is_tracked_oauth_callback(url, parsed, query) + is_self_oauth = self._is_self_oauth_request(url, parsed, query) + is_client_redirect = self._is_client_redirect_request(url, parsed, query) - async def _test_oauth_redirect(self, url, parsed, query): + # 3. 우선순위에 따라 테스트 (높은 위험도 우선) + if is_self_oauth: # 가장 높은 위험도 + print(f"[TEST_TYPE] 🏠 자체 OAuth 시스템: {parsed.netloc}") + await self._test_oauth_redirect(url, parsed, query, test_type="SELF_OAUTH") + elif is_tracked_callback: # 중간 위험도 + print(f"[TEST_TYPE] 🎯 추적된 OAuth 콜백: {parsed.netloc}") + await self._test_oauth_redirect(url, parsed, query, test_type="TRACKED_CALLBACK") + elif is_client_redirect: # 일반 위험도 + print(f"[TEST_TYPE] 📱 클라이언트 리다이렉트: {parsed.netloc}") + await self._test_oauth_redirect(url, parsed, query, test_type="CLIENT_REDIRECT") + + return + + def _remove_tracked_uri(self, uri_to_remove): + """테스트 완료된 URI를 추적 목록에서 제거""" + if uri_to_remove in GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']: + GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris'].discard(uri_to_remove) + + async def _test_oauth_redirect(self, url, parsed, query, test_type="UNKNOWN"): """OAuth 리다이렉트 취약점 테스트""" - # 중복 테스트 방지 - endpoint_key = f"OAUTH:{parsed.netloc}{parsed.path}" + # 테스트 타입별로 중복 방지 + endpoint_key = f"{test_type}:{parsed.netloc}{parsed.path}" if endpoint_key in self.tested_endpoints: return # 이미 테스트했으므로 패스 @@ -1267,8 +1392,11 @@ class OpenRedirectChecker: self.testing_targets.add(endpoint_key) + # 현재 테스트할 URI 저장 + current_uri = f"https://{parsed.netloc}{parsed.path}" + try: - print(f"[START] 🔍 테스트 시작: {parsed.netloc}{parsed.path}") + print(f"[START] 🔍 테스트 시작 ({test_type}): {parsed.netloc}{parsed.path}") print(f"[TARGET] 타겟: {parsed.netloc}") # 1. state 파라미터 확인 후 조작 @@ -1298,7 +1426,10 @@ class OpenRedirectChecker: # 테스트할 파라미터가 없으면 종료 if not test_params: - print(f"[OAUTH] 리다이렉트 파라미터 없음 - 테스트 ❌") + print(f"[{test_type}] 리다이렉트 파라미터 없음 - 테스트 ❌") + # 테스트할 파라미터 없을 때도 URI 정리 + if test_type == "TRACKED_CALLBACK": + self._remove_tracked_uri(current_uri) return print(f"[OAUTH] 📍 발견된 파라미터들: {test_params}") @@ -1309,7 +1440,7 @@ class OpenRedirectChecker: # 3. 각 파라미터별로 모든 우회 패턴 테스트 for param_name, original_value in test_params: - print(f"\n[OAUTH] 🎯 {param_name} 파라미터 테스트 시작") + print(f"\n[{test_type}] 🎯 {param_name} 파라미터 테스트 시작") for i, payload in enumerate(self.bypass_payloads, 1): print(f"[{i:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ") @@ -1337,8 +1468,15 @@ class OpenRedirectChecker: print("-" * 50) print(f"[OAUTH] ✅ OAuth 테스트 완료: {success_count}개 취약점") + # 테스트 완료 후 URI 정리 + if test_type == "TRACKED_CALLBACK": + self._remove_tracked_uri(current_uri) + except Exception as e: - print(f"[ERROR] OAuth 테스트 실패: {e}") + print(f"[ERROR] {test_type} 테스트 실패: {e}") + # 에러 발생 시에도 URI 정리 + if test_type == "TRACKED_CALLBACK": + self._remove_tracked_uri(current_uri) finally: self.testing_targets.discard(endpoint_key) self.tested_endpoints.add(endpoint_key) @@ -1448,14 +1586,10 @@ class OpenRedirectChecker: # 레이트 리미팅 체크 can_proceed = await redirect_limiter.wait_if_needed(payload.name) if not can_proceed: - print(f"[TARGET_ABANDON] ⏹️ 패턴 테스트 중단: {payload.name}") return False try: - # 1. 테스트 시작 - print(f"[PATTERN] 🚀 {payload.name} 테스트 시작") - - # 2. 우회 URL 생성 + # 1. 우회 URL 생성 if original_redirect_value.startswith('http'): # 이미 완전한 URL인 경우 bypassed_redirect = payload.mutate(original_redirect_value) @@ -1464,23 +1598,18 @@ class OpenRedirectChecker: full_original = f"https://{original_redirect_value}" bypassed_redirect = payload.mutate(full_original) - # 3. 파라미터 보존 전략 - 원본 url의 다른 파라미터들은 그대로 두고 redirect 파라미터만 악성으로 바꿈 + # 2. 파라미터 보존 전략 - 원본 url의 다른 파라미터들은 그대로 두고 redirect 파라미터만 악성으로 바꿈 modified_query = query.copy() modified_query[redirect_param] = [bypassed_redirect] new_query_string = urlencode(modified_query, doseq=True) test_url = urlunparse(parsed_url._replace(query=new_query_string)) - # 4. 요청 전송 + # 3. 요청 전송 response = await self._send_request(test_url) - # 5. 응답 분석 + # 4. 응답 분석 result = await self._analyze_response(original_url, test_url, bypassed_redirect, response, payload) - - if result: - print(f"[PATTERN] ✅ {payload.name} → SUCCESS") - else: - print(f"[PATTERN] ✅ {payload.name} → SAFE") return result