From 367a7156bf01be92611e4250e3d48d185da6561c Mon Sep 17 00:00:00 2001 From: gyuu04 Date: Mon, 9 Jun 2025 22:34:34 +0900 Subject: [PATCH 1/3] =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=A0=84=20?= =?UTF-8?q?=EA=B8=B0=EC=A1=B4=20=EC=BD=94=EB=93=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/init.py | 14 ++- addon/redirect_uri_check.py | 194 ++++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 addon/redirect_uri_check.py diff --git a/addon/init.py b/addon/init.py index 1ef743d..14e4aab 100644 --- a/addon/init.py +++ b/addon/init.py @@ -4,6 +4,7 @@ from pkce_check import PKCEDowngradeChecker from ScopeDetection import ScopeDetection from csrf_check import CsrfChecker from nonce_check import NonceChecker +from redirect_uri_check import RedirectBypassChecker class PKCEAddon: def __init__(self): @@ -61,4 +62,15 @@ class NonceAddon: print(f"[ERROR] NonceAddon failed: {e}") pass -addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon()] +class RedirectBypassAddon: + def __init__(self): + self.checker = RedirectBypassChecker() + + # request 대신 response 로 바꿔 보세요: + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.test(flow) + except Exception as e: + print(f"[ERROR] RedirectBypass Addon failed: {e}") + +addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), RedirectBypassAddon()] diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py new file mode 100644 index 0000000..7ecb1fb --- /dev/null +++ b/addon/redirect_uri_check.py @@ -0,0 +1,194 @@ +from mitmproxy import http +import aiohttp +from urllib.parse import urlparse, parse_qs, urlencode, urlunparse +import lib.target as target +from lib.report import save_report + +class BypassPayload: + """ 우회 패턴 정의 """ + def __init__(self, name: str, mutate_func, description: str): + self.name = name + self.mutate = mutate_func #우회 url 만드는 함수 + self.description = description + + +class RedirectBypassChecker: + def __init__(self): + """ 우회 페이로드 목록 """ + self.bypass_payloads = [ + BypassPayload( + name="\@", + mutate_func=self._mutate_pattern1, + description="브라우저는 \@를 사용자정보(@user)로 해석하지만, 일부 서버는 도메인 전체로 해석 → evil.com으로 우회" + ), + """ 테스트 후 우회 패턴 추가 예정 """ + ] + self.session = None + + """ 우회 URL 생성 목록 """ + # 1. \@ + def _mutate_pattern1(self, original: str) -> str: + parsed = urlparse(original) + mutated = f"https://evil.com@{parsed.netloc}{parsed.path}" + print(f"[redirect_uri_check] original: {original} → mutated: {mutated}") + return mutated + + '''aiohttp 세션 생성 (재사용)''' + async def _get_session(self): + if self.session is None: + timeout = aiohttp.ClientTimeout(total=10) + self.session = aiohttp.ClientSession(timeout=timeout) + return self.session + + '''세션 정리''' + async def close_session(self): + if self.session: + await self.session.close() + self.session = None + + """ 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """ + async def _send_request(self, url, headers=None): + try: + session = await self._get_session() # 세션 준비 + request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용 + + # 서버에 GET 요청 전송 + async with session.get(url, allow_redirects=False, headers=request_headers) as response: + return { + 'status': response.status, + 'location': response.headers.get("Location", ""), + 'headers': dict(response.headers) + } + except Exception as e: + print(f"[ERROR] 요청 실패 ({url}): {e}") + return {'status': 500, 'location': '', 'headers': {}} + + """ redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """ + def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool: + try: + redirect_parsed = urlparse(redirect_uri) + base_parsed = urlparse(base_url) + + redirect_host = redirect_parsed.hostname + base_host = base_parsed.hostname + + if not redirect_host or not base_host: + return False + + # 동일 도메인 또는 하위 도메인인지 확인 + return (redirect_host == base_host or redirect_host.endswith(f".{base_host}")) + + except Exception as e: + print(f"[ERROR] 도메인 검증 실패: {e}") + return False + + """ Location 헤더에서 authorization code 추출 """ + def _extract_code_from_location(self, location: str) -> str: + if not location: + return "" + + try: + parsed = urlparse(location) + query = parse_qs(parsed.query) + return query.get('code', [''])[0] + except: + return "" + + """ 메인 테스트 함수 - mitmproxy flow 처리 """ + async def test(self, flow: http.HTTPFlow): + url = flow.request.pretty_url + parsed = urlparse(url) + query = parse_qs(parsed.query) + + if "redirect_uri" not in query: + return + + original_redirect_uri = query["redirect_uri"][0] + print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}") + + # 원본 요청 헤더 복사 (User-Agent 등) + original_headers = { + name: value for name, value in flow.request.headers.items() + if name.lower() not in ['host', 'content-length'] + } + + for payload in self.bypass_payloads: + try: + await self._test_bypass_pattern( + url, query, parsed, original_redirect_uri, payload, original_headers + ) + except Exception as e: + print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}") + continue + + """ 개별 우회 패턴 테스트 """ + async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers): + + # 우회 URL 생성 + bypassed_uri = payload.mutate(original_redirect_uri) + print(f"[DEBUG] 테스트 중인 우회 패턴 ({payload.name}): {bypassed_uri}") + + # 새로운 쿼리 파라미터 구성 + modified_query = query.copy() + modified_query["redirect_uri"] = [bypassed_uri] + new_query_string = urlencode(modified_query, doseq=True) + test_url = urlunparse(parsed_url._replace(query=new_query_string)) + + print(f"[DEBUG] 테스트 URL: {test_url}") + + # 요청 전송 + response = await self._send_request(test_url, headers) + + # 응답 분석 + await self._analyze_response( + original_url, test_url, bypassed_uri, response, payload + ) + + """ 응답 분석 및 취약점 판단 """ + async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload): + status = response['status'] + location = response['location'] + + print(f"[DEBUG] 응답 상태: {status}, Location: {location}") + + # 리다이렉트 응답이 아니면 스킵 + if status not in [301, 302, 303, 307, 308]: + print(f"[DEBUG] 리다이렉트가 아닌 응답: {status}") + return + + # Location 헤더에서 code 추출 + auth_code = self._extract_code_from_location(location) + + if auth_code and not self._is_baseline_valid(bypassed_uri, original_url): + # 취약점 발견! + await self._report_vulnerability( + original_url, test_url, bypassed_uri, location, auth_code, payload + ) + elif auth_code: + print(f"[DEBUG] 인가 코드 발급되었지만 유효한 도메인: {bypassed_uri}") + else: + print(f"[DEBUG] 인가 코드 발급되지 않음") + + """ 취약점 보고서 생성 """ + async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload): + description = ( + f"Redirect URI 우회 취약점 발견!\n\n" + f"-- 상세 정보 --:\n" + f"• 우회 패턴: {payload.name}\n" + f"• 설명: {payload.description}\n" + f"• 원본 URL: {original_url}\n" + f"• 우회된 redirect_uri: {bypassed_uri}\n" + f"• 테스트 URL: {test_url}\n" + f"• 리다이렉트 위치: {location}\n" + f"• 발급된 인가 코드: {auth_code[:10]}...\n\n" + ) + + report_data = [{ + "target": target.load(), + "title": "Redirect URI Bypass Vulnerability", + "description": description + }] + + save_report(report_data) + print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") + print(f"[INFO] 패턴: {payload.name}, 우회 URI: {bypassed_uri}") From 7ac749fa3660091d167fcfa92a245e039ac68595 Mon Sep 17 00:00:00 2001 From: gyuu04 Date: Tue, 10 Jun 2025 00:18:00 +0900 Subject: [PATCH 2/3] =?UTF-8?q?=EC=A7=80=EA=B8=88=20=EB=8B=B9=EC=9E=A5=20?= =?UTF-8?q?=ED=95=84=EC=9A=94=20=EC=97=86=EB=8A=94=20=EB=B6=80=EB=B6=84=20?= =?UTF-8?q?=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/redirect_uri_check.py | 8 +------- runner/proxy.py | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py index 7ecb1fb..358904e 100644 --- a/addon/redirect_uri_check.py +++ b/addon/redirect_uri_check.py @@ -106,16 +106,10 @@ class RedirectBypassChecker: original_redirect_uri = query["redirect_uri"][0] print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}") - # 원본 요청 헤더 복사 (User-Agent 등) - original_headers = { - name: value for name, value in flow.request.headers.items() - if name.lower() not in ['host', 'content-length'] - } - for payload in self.bypass_payloads: try: await self._test_bypass_pattern( - url, query, parsed, original_redirect_uri, payload, original_headers + url, query, parsed, original_redirect_uri, payload, headers={} ) except Exception as e: print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}") diff --git a/runner/proxy.py b/runner/proxy.py index 7cc2650..d7b856a 100644 --- a/runner/proxy.py +++ b/runner/proxy.py @@ -3,4 +3,4 @@ from mitmproxy.tools.main import mitmdump def run_proxy(): sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"] - mitmdump() + mitmdump() \ No newline at end of file From 57625307a795a6420c9c7e86b1dc53986119d703 Mon Sep 17 00:00:00 2001 From: gyuu04 Date: Wed, 11 Jun 2025 15:11:50 +0900 Subject: [PATCH 3/3] =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/redirect_uri_check.py | 74 ++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py index 358904e..176e537 100644 --- a/addon/redirect_uri_check.py +++ b/addon/redirect_uri_check.py @@ -17,16 +17,16 @@ class RedirectBypassChecker: """ 우회 페이로드 목록 """ self.bypass_payloads = [ BypassPayload( - name="\@", + name=r"@", mutate_func=self._mutate_pattern1, - description="브라우저는 \@를 사용자정보(@user)로 해석하지만, 일부 서버는 도메인 전체로 해석 → evil.com으로 우회" + description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com" ), - """ 테스트 후 우회 패턴 추가 예정 """ + ] self.session = None """ 우회 URL 생성 목록 """ - # 1. \@ + # 1. @ def _mutate_pattern1(self, original: str) -> str: parsed = urlparse(original) mutated = f"https://evil.com@{parsed.netloc}{parsed.path}" @@ -74,10 +74,24 @@ class RedirectBypassChecker: if not redirect_host or not base_host: return False + + if "@" in redirect_uri: + if redirect_host != base_host: + print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}") + return False - # 동일 도메인 또는 하위 도메인인지 확인 - return (redirect_host == base_host or redirect_host.endswith(f".{base_host}")) - + at_parts = redirect_uri.split('@') + if len(at_parts) > 1: + before_at = at_parts[0] + if '//' in before_at: + potential_domain = before_at.split('//')[-1] + if '.' in potential_domain and potential_domain != base_host: + print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}") + return False + + is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}")) + return is_valid + except Exception as e: print(f"[ERROR] 도메인 검증 실패: {e}") return False @@ -118,58 +132,56 @@ class RedirectBypassChecker: """ 개별 우회 패턴 테스트 """ async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers): + print(f"[SCAN] 우회 패턴 테스트: {payload.name}") + # 우회 URL 생성 bypassed_uri = payload.mutate(original_redirect_uri) - print(f"[DEBUG] 테스트 중인 우회 패턴 ({payload.name}): {bypassed_uri}") - + # 새로운 쿼리 파라미터 구성 modified_query = query.copy() modified_query["redirect_uri"] = [bypassed_uri] new_query_string = urlencode(modified_query, doseq=True) test_url = urlunparse(parsed_url._replace(query=new_query_string)) - - print(f"[DEBUG] 테스트 URL: {test_url}") # 요청 전송 response = await self._send_request(test_url, headers) - + # 응답 분석 - await self._analyze_response( - original_url, test_url, bypassed_uri, response, payload - ) + await self._analyze_response(original_url, test_url, bypassed_uri, response, payload) """ 응답 분석 및 취약점 판단 """ async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload): status = response['status'] location = response['location'] - print(f"[DEBUG] 응답 상태: {status}, Location: {location}") - # 리다이렉트 응답이 아니면 스킵 if status not in [301, 302, 303, 307, 308]: - print(f"[DEBUG] 리다이렉트가 아닌 응답: {status}") return # Location 헤더에서 code 추출 auth_code = self._extract_code_from_location(location) if auth_code and not self._is_baseline_valid(bypassed_uri, original_url): - # 취약점 발견! - await self._report_vulnerability( - original_url, test_url, bypassed_uri, location, auth_code, payload - ) - elif auth_code: - print(f"[DEBUG] 인가 코드 발급되었지만 유효한 도메인: {bypassed_uri}") - else: - print(f"[DEBUG] 인가 코드 발급되지 않음") + # 취약점 발견 시에만 로그 + print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!") + await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload) + """ 취약점 보고서 생성 """ async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload): + # payload가 문자열인지 객체인지 확인 + if hasattr(payload, 'name'): + pattern_name = payload.name + pattern_description = payload.description + else: + pattern_name = str(payload) + pattern_description = "Unknown bypass pattern" + description = ( f"Redirect URI 우회 취약점 발견!\n\n" f"-- 상세 정보 --:\n" - f"• 우회 패턴: {payload.name}\n" - f"• 설명: {payload.description}\n" + f"• 우회 패턴: {pattern_name}\n" + f"• 설명: {pattern_description}\n" f"• 원본 URL: {original_url}\n" f"• 우회된 redirect_uri: {bypassed_uri}\n" f"• 테스트 URL: {test_url}\n" @@ -179,10 +191,12 @@ class RedirectBypassChecker: report_data = [{ "target": target.load(), + "status": "CRITICAL", "title": "Redirect URI Bypass Vulnerability", - "description": description + "description": description, + "uri": test_url # uri 필드 추가 }] save_report(report_data) print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!") - print(f"[INFO] 패턴: {payload.name}, 우회 URI: {bypassed_uri}") + print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}") \ No newline at end of file