mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 03:51:51 +09:00
테스트 전 기존 코드
This commit is contained in:
parent
69268f0a9a
commit
367a7156bf
2 changed files with 207 additions and 1 deletions
|
|
@ -4,6 +4,7 @@ from pkce_check import PKCEDowngradeChecker
|
|||
from ScopeDetection import ScopeDetection
|
||||
from csrf_check import CsrfChecker
|
||||
from nonce_check import NonceChecker
|
||||
from redirect_uri_check import RedirectBypassChecker
|
||||
|
||||
class PKCEAddon:
|
||||
def __init__(self):
|
||||
|
|
@ -61,4 +62,15 @@ class NonceAddon:
|
|||
print(f"[ERROR] NonceAddon failed: {e}")
|
||||
pass
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon()]
|
||||
class RedirectBypassAddon:
|
||||
def __init__(self):
|
||||
self.checker = RedirectBypassChecker()
|
||||
|
||||
# request 대신 response 로 바꿔 보세요:
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.test(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] RedirectBypass Addon failed: {e}")
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), RedirectBypassAddon()]
|
||||
|
|
|
|||
194
addon/redirect_uri_check.py
Normal file
194
addon/redirect_uri_check.py
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
from mitmproxy import http
|
||||
import aiohttp
|
||||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
class BypassPayload:
|
||||
""" 우회 패턴 정의 """
|
||||
def __init__(self, name: str, mutate_func, description: str):
|
||||
self.name = name
|
||||
self.mutate = mutate_func #우회 url 만드는 함수
|
||||
self.description = description
|
||||
|
||||
|
||||
class RedirectBypassChecker:
|
||||
def __init__(self):
|
||||
""" 우회 페이로드 목록 """
|
||||
self.bypass_payloads = [
|
||||
BypassPayload(
|
||||
name="\@",
|
||||
mutate_func=self._mutate_pattern1,
|
||||
description="브라우저는 \@를 사용자정보(@user)로 해석하지만, 일부 서버는 도메인 전체로 해석 → evil.com으로 우회"
|
||||
),
|
||||
""" 테스트 후 우회 패턴 추가 예정 """
|
||||
]
|
||||
self.session = None
|
||||
|
||||
""" 우회 URL 생성 목록 """
|
||||
# 1. \@
|
||||
def _mutate_pattern1(self, original: str) -> str:
|
||||
parsed = urlparse(original)
|
||||
mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
|
||||
print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
|
||||
return mutated
|
||||
|
||||
'''aiohttp 세션 생성 (재사용)'''
|
||||
async def _get_session(self):
|
||||
if self.session is None:
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
self.session = aiohttp.ClientSession(timeout=timeout)
|
||||
return self.session
|
||||
|
||||
'''세션 정리'''
|
||||
async def close_session(self):
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
|
||||
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
|
||||
async def _send_request(self, url, headers=None):
|
||||
try:
|
||||
session = await self._get_session() # 세션 준비
|
||||
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
|
||||
|
||||
# 서버에 GET 요청 전송
|
||||
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
|
||||
return {
|
||||
'status': response.status,
|
||||
'location': response.headers.get("Location", ""),
|
||||
'headers': dict(response.headers)
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 요청 실패 ({url}): {e}")
|
||||
return {'status': 500, 'location': '', 'headers': {}}
|
||||
|
||||
""" redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """
|
||||
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
|
||||
try:
|
||||
redirect_parsed = urlparse(redirect_uri)
|
||||
base_parsed = urlparse(base_url)
|
||||
|
||||
redirect_host = redirect_parsed.hostname
|
||||
base_host = base_parsed.hostname
|
||||
|
||||
if not redirect_host or not base_host:
|
||||
return False
|
||||
|
||||
# 동일 도메인 또는 하위 도메인인지 확인
|
||||
return (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 도메인 검증 실패: {e}")
|
||||
return False
|
||||
|
||||
""" Location 헤더에서 authorization code 추출 """
|
||||
def _extract_code_from_location(self, location: str) -> str:
|
||||
if not location:
|
||||
return ""
|
||||
|
||||
try:
|
||||
parsed = urlparse(location)
|
||||
query = parse_qs(parsed.query)
|
||||
return query.get('code', [''])[0]
|
||||
except:
|
||||
return ""
|
||||
|
||||
""" 메인 테스트 함수 - mitmproxy flow 처리 """
|
||||
async def test(self, flow: http.HTTPFlow):
|
||||
url = flow.request.pretty_url
|
||||
parsed = urlparse(url)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
if "redirect_uri" not in query:
|
||||
return
|
||||
|
||||
original_redirect_uri = query["redirect_uri"][0]
|
||||
print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")
|
||||
|
||||
# 원본 요청 헤더 복사 (User-Agent 등)
|
||||
original_headers = {
|
||||
name: value for name, value in flow.request.headers.items()
|
||||
if name.lower() not in ['host', 'content-length']
|
||||
}
|
||||
|
||||
for payload in self.bypass_payloads:
|
||||
try:
|
||||
await self._test_bypass_pattern(
|
||||
url, query, parsed, original_redirect_uri, payload, original_headers
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")
|
||||
continue
|
||||
|
||||
""" 개별 우회 패턴 테스트 """
|
||||
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers):
|
||||
|
||||
# 우회 URL 생성
|
||||
bypassed_uri = payload.mutate(original_redirect_uri)
|
||||
print(f"[DEBUG] 테스트 중인 우회 패턴 ({payload.name}): {bypassed_uri}")
|
||||
|
||||
# 새로운 쿼리 파라미터 구성
|
||||
modified_query = query.copy()
|
||||
modified_query["redirect_uri"] = [bypassed_uri]
|
||||
new_query_string = urlencode(modified_query, doseq=True)
|
||||
test_url = urlunparse(parsed_url._replace(query=new_query_string))
|
||||
|
||||
print(f"[DEBUG] 테스트 URL: {test_url}")
|
||||
|
||||
# 요청 전송
|
||||
response = await self._send_request(test_url, headers)
|
||||
|
||||
# 응답 분석
|
||||
await self._analyze_response(
|
||||
original_url, test_url, bypassed_uri, response, payload
|
||||
)
|
||||
|
||||
""" 응답 분석 및 취약점 판단 """
|
||||
async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload):
|
||||
status = response['status']
|
||||
location = response['location']
|
||||
|
||||
print(f"[DEBUG] 응답 상태: {status}, Location: {location}")
|
||||
|
||||
# 리다이렉트 응답이 아니면 스킵
|
||||
if status not in [301, 302, 303, 307, 308]:
|
||||
print(f"[DEBUG] 리다이렉트가 아닌 응답: {status}")
|
||||
return
|
||||
|
||||
# Location 헤더에서 code 추출
|
||||
auth_code = self._extract_code_from_location(location)
|
||||
|
||||
if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
|
||||
# 취약점 발견!
|
||||
await self._report_vulnerability(
|
||||
original_url, test_url, bypassed_uri, location, auth_code, payload
|
||||
)
|
||||
elif auth_code:
|
||||
print(f"[DEBUG] 인가 코드 발급되었지만 유효한 도메인: {bypassed_uri}")
|
||||
else:
|
||||
print(f"[DEBUG] 인가 코드 발급되지 않음")
|
||||
|
||||
""" 취약점 보고서 생성 """
|
||||
async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload):
|
||||
description = (
|
||||
f"Redirect URI 우회 취약점 발견!\n\n"
|
||||
f"-- 상세 정보 --:\n"
|
||||
f"• 우회 패턴: {payload.name}\n"
|
||||
f"• 설명: {payload.description}\n"
|
||||
f"• 원본 URL: {original_url}\n"
|
||||
f"• 우회된 redirect_uri: {bypassed_uri}\n"
|
||||
f"• 테스트 URL: {test_url}\n"
|
||||
f"• 리다이렉트 위치: {location}\n"
|
||||
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
|
||||
)
|
||||
|
||||
report_data = [{
|
||||
"target": target.load(),
|
||||
"title": "Redirect URI Bypass Vulnerability",
|
||||
"description": description
|
||||
}]
|
||||
|
||||
save_report(report_data)
|
||||
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
|
||||
print(f"[INFO] 패턴: {payload.name}, 우회 URI: {bypassed_uri}")
|
||||
Loading…
Add table
Add a link
Reference in a new issue