"""OAuth ``redirect_uri`` bypass scanner driven by mitmproxy flows."""

from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

import aiohttp
from mitmproxy import http

import lib.target as target
from lib.report import save_report


class BypassPayload:
    """Definition of a single redirect_uri bypass pattern.

    Attributes:
        name: Short identifier of the pattern (used in logs and reports).
        mutate: Callable taking the original redirect_uri string and
            returning the mutated (bypass) URI string.
        description: Human-readable explanation of the pattern.
    """

    def __init__(self, name: str, mutate_func, description: str):
        self.name = name
        self.mutate = mutate_func  # function that builds the bypass URL
        self.description = description


class RedirectBypassChecker:
    """Replay authorization requests with mutated ``redirect_uri`` values.

    For each registered payload the original request URL is re-sent with the
    ``redirect_uri`` query parameter replaced; if the server answers with a
    redirect whose ``Location`` carries a ``code`` parameter, a report is
    saved through ``save_report``.
    """

    # HTTP status codes treated as a redirect response.
    _REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})

    def __init__(self):
        """Register the bypass payloads and start without an HTTP session."""
        self.bypass_payloads = [
            BypassPayload(
                name=r"@",
                mutate_func=self._mutate_pattern1,
                description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com",
            ),
        ]
        # Created lazily by _get_session() and reused across requests.
        self.session = None

    # ------------------------------------------------------------------
    # Bypass URL builders
    # ------------------------------------------------------------------

    # 1. @
    def _mutate_pattern1(self, original: str) -> str:
        """Build ``https://evil.com@<netloc><path>`` from *original*.

        The original query string and fragment are not carried over, and the
        scheme is always ``https``.
        """
        parsed = urlparse(original)
        mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
        print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
        return mutated

    # ------------------------------------------------------------------
    # HTTP session handling
    # ------------------------------------------------------------------

    async def _get_session(self):
        """Return the shared aiohttp session, creating it on first use."""
        if self.session is None:
            timeout = aiohttp.ClientTimeout(total=10)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self.session

    async def close_session(self):
        """Close the shared aiohttp session, if one was created."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _send_request(self, url, headers=None):
        """Send a GET to *url* without following redirects.

        Args:
            url: Target URL.
            headers: Optional request headers; an empty dict is used when
                omitted or falsy.

        Returns:
            A dict with ``status``, ``location`` (the ``Location`` header or
            ``""``) and ``headers``. Any exception is logged and mapped to
            ``{'status': 500, 'location': '', 'headers': {}}``.
        """
        try:
            session = await self._get_session()
            request_headers = headers or {}

            async with session.get(
                url, allow_redirects=False, headers=request_headers
            ) as response:
                return {
                    'status': response.status,
                    'location': response.headers.get("Location", ""),
                    'headers': dict(response.headers),
                }
        except Exception as e:
            print(f"[ERROR] 요청 실패 ({url}): {e}")
            return {'status': 500, 'location': '', 'headers': {}}

    # ------------------------------------------------------------------
    # Validation helpers
    # ------------------------------------------------------------------

    def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
        """Return True when *redirect_uri* belongs to the host of *base_url*.

        A URI is accepted when its host equals the base host or is a
        subdomain of it. If the authority contains userinfo (``user@host``)
        the host must match the base host exactly, and a userinfo that looks
        like a foreign domain (contains a dot and differs from the base
        host) is rejected as an ``@`` bypass attempt.

        Only the authority component is inspected for ``@``; an ``@`` in the
        path or query (e.g. ``?email=a@b.com``) does not affect the result.

        Returns False when either host cannot be determined or parsing fails.
        """
        try:
            redirect_parsed = urlparse(redirect_uri)
            base_parsed = urlparse(base_url)

            redirect_host = redirect_parsed.hostname
            base_host = base_parsed.hostname

            if not redirect_host or not base_host:
                return False

            # Split on the LAST '@' of the authority, the same rule urlparse
            # uses to derive hostname, so userinfo is everything the host
            # parser ignored.
            userinfo, at_sign, _ = redirect_parsed.netloc.rpartition("@")
            if at_sign:
                if redirect_host != base_host:
                    print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}")
                    return False

                potential_domain = userinfo
                if '.' in potential_domain and potential_domain != base_host:
                    print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}")
                    return False

            return (
                redirect_host == base_host
                or redirect_host.endswith(f".{base_host}")
            )

        except Exception as e:
            print(f"[ERROR] 도메인 검증 실패: {e}")
            return False

    def _extract_code_from_location(self, location: str) -> str:
        """Return the ``code`` query parameter of *location*, or ``""``.

        An empty/missing location, a location without ``code``, or a
        malformed URL all yield the empty string.
        """
        if not location:
            return ""

        try:
            parsed = urlparse(location)
            query = parse_qs(parsed.query)
        except ValueError:
            # urlparse rejects malformed URLs (e.g. a bad IPv6 literal).
            return ""
        return query.get('code', [''])[0]

    # ------------------------------------------------------------------
    # Scan entry point and pipeline
    # ------------------------------------------------------------------

    async def test(self, flow: http.HTTPFlow):
        """Run every bypass payload against the request in *flow*.

        Requests without a ``redirect_uri`` query parameter are ignored. A
        failure in one payload is logged and does not stop the others.
        """
        url = flow.request.pretty_url
        parsed = urlparse(url)
        query = parse_qs(parsed.query)

        if "redirect_uri" not in query:
            return

        original_redirect_uri = query["redirect_uri"][0]
        print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")

        for payload in self.bypass_payloads:
            try:
                await self._test_bypass_pattern(
                    url, query, parsed, original_redirect_uri, payload, headers={}
                )
            except Exception as e:
                print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")

    async def _test_bypass_pattern(self, original_url, query, parsed_url,
                                   original_redirect_uri, payload, headers):
        """Send one mutated request for *payload* and analyze the response."""
        print(f"[SCAN] 우회 패턴 테스트: {payload.name}")

        # Build the bypass URI.
        bypassed_uri = payload.mutate(original_redirect_uri)

        # Rebuild the query string with the mutated redirect_uri; the copy
        # keeps the caller's parsed query untouched for the next payload.
        modified_query = query.copy()
        modified_query["redirect_uri"] = [bypassed_uri]
        new_query_string = urlencode(modified_query, doseq=True)
        test_url = urlunparse(parsed_url._replace(query=new_query_string))

        response = await self._send_request(test_url, headers)

        await self._analyze_response(
            original_url, test_url, bypassed_uri, response, payload
        )

    async def _analyze_response(self, original_url, test_url, bypassed_uri,
                                response, payload):
        """Report a vulnerability when a code is issued for a foreign URI.

        Non-redirect responses are skipped. A finding requires both a
        ``code`` in the ``Location`` header and a bypassed URI that fails
        ``_is_baseline_valid`` against *original_url*.
        """
        status = response['status']
        location = response['location']

        if status not in self._REDIRECT_STATUSES:
            return

        auth_code = self._extract_code_from_location(location)

        # NOTE(review): the baseline here is the host of the intercepted
        # request URL, not the host of the original redirect_uri; confirm
        # this is the intended reference domain.
        if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
            # Log only when a vulnerability is found.
            print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
            await self._report_vulnerability(
                original_url, test_url, bypassed_uri, location, auth_code, payload
            )

    async def _report_vulnerability(self, original_url, test_url, bypassed_uri,
                                    location, auth_code, payload):
        """Build the finding description and persist it via ``save_report``.

        *payload* may be a ``BypassPayload``-like object (with ``name`` and
        ``description``) or any other value, which is then stringified.
        Only the first 10 characters of the authorization code are included.
        """
        if hasattr(payload, 'name'):
            pattern_name = payload.name
            pattern_description = payload.description
        else:
            pattern_name = str(payload)
            pattern_description = "Unknown bypass pattern"

        description = (
            f"Redirect URI 우회 취약점 발견!\n\n"
            f"-- 상세 정보 --:\n"
            f"• 우회 패턴: {pattern_name}\n"
            f"• 설명: {pattern_description}\n"
            f"• 원본 URL: {original_url}\n"
            f"• 우회된 redirect_uri: {bypassed_uri}\n"
            f"• 테스트 URL: {test_url}\n"
            f"• 리다이렉트 위치: {location}\n"
            f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
        )

        report_data = [{
            "target": target.load(),
            "status": "CRITICAL",
            "title": "Redirect URI Bypass Vulnerability",
            "description": description,
            "uri": test_url,  # URL that reproduced the finding
        }]

        save_report(report_data)
        print("[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
        print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")