nonceCheck 오류 수정

This commit is contained in:
sultanofdisco 2025-06-12 21:47:07 +09:00
commit ac32d5d604
5 changed files with 253 additions and 4 deletions

43
.github/workflows/ci.yml vendored Normal file
View file

@ -0,0 +1,43 @@
name: CI/CD Pipeline
on:
push:
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.13]
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
cache-dependency-glob: "uv.lock"
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync
- name: Start application and run proxy test
run: |
# Start the application in background
uv run main.py &
APP_PID=$!
# Wait for application to start
sleep 5
# Test proxy functionality
curl -x http://localhost:11080 http://example.com
# Clean up
kill $APP_PID

View file

@ -58,7 +58,13 @@ class NonceAddon:
async def response(self, flow: http.HTTPFlow):
try:
<<<<<<< HEAD
await self.checker.check_nonce_in_id_token(flow)
=======
pass
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
# await self.checker.check_nonce_in_id_token(flow)
>>>>>>> 99fc280517f09bb93d586c26f01239f32c04c56c
except Exception as e:
print(f"[ERROR] NonceAddon failed: {e}")
pass

View file

@ -73,7 +73,7 @@ class NonceChecker:
except Exception as e:
return {}
<<<<<<< HEAD
def check_nonce_in_id_token(self, flow) -> bool:
if not flow.response or not self.is_oidc_flow(flow):
@ -91,10 +91,8 @@ class NonceChecker:
return True
id_token = self.extract_id_token(flow)
=======
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
>>>>>>> 99fc280517f09bb93d586c26f01239f32c04c56c
decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce")

202
addon/redirect_uri_check.py Normal file
View file

@ -0,0 +1,202 @@
from mitmproxy import http
import aiohttp
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import lib.target as target
from lib.report import save_report
class BypassPayload:
""" 우회 패턴 정의 """
def __init__(self, name: str, mutate_func, description: str):
self.name = name
self.mutate = mutate_func #우회 url 만드는 함수
self.description = description
class RedirectBypassChecker:
def __init__(self):
""" 우회 페이로드 목록 """
self.bypass_payloads = [
BypassPayload(
name=r"@",
mutate_func=self._mutate_pattern1,
description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com"
),
]
self.session = None
""" 우회 URL 생성 목록 """
# 1. @
def _mutate_pattern1(self, original: str) -> str:
parsed = urlparse(original)
mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
return mutated
'''aiohttp 세션 생성 (재사용)'''
async def _get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
'''세션 정리'''
async def close_session(self):
if self.session:
await self.session.close()
self.session = None
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
async def _send_request(self, url, headers=None):
try:
session = await self._get_session() # 세션 준비
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
# 서버에 GET 요청 전송
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
return {
'status': response.status,
'location': response.headers.get("Location", ""),
'headers': dict(response.headers)
}
except Exception as e:
print(f"[ERROR] 요청 실패 ({url}): {e}")
return {'status': 500, 'location': '', 'headers': {}}
""" redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
try:
redirect_parsed = urlparse(redirect_uri)
base_parsed = urlparse(base_url)
redirect_host = redirect_parsed.hostname
base_host = base_parsed.hostname
if not redirect_host or not base_host:
return False
if "@" in redirect_uri:
if redirect_host != base_host:
print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}")
return False
at_parts = redirect_uri.split('@')
if len(at_parts) > 1:
before_at = at_parts[0]
if '//' in before_at:
potential_domain = before_at.split('//')[-1]
if '.' in potential_domain and potential_domain != base_host:
print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}")
return False
is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
return is_valid
except Exception as e:
print(f"[ERROR] 도메인 검증 실패: {e}")
return False
""" Location 헤더에서 authorization code 추출 """
def _extract_code_from_location(self, location: str) -> str:
if not location:
return ""
try:
parsed = urlparse(location)
query = parse_qs(parsed.query)
return query.get('code', [''])[0]
except:
return ""
""" 메인 테스트 함수 - mitmproxy flow 처리 """
async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
if "redirect_uri" not in query:
return
original_redirect_uri = query["redirect_uri"][0]
print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")
for payload in self.bypass_payloads:
try:
await self._test_bypass_pattern(
url, query, parsed, original_redirect_uri, payload, headers={}
)
except Exception as e:
print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")
continue
""" 개별 우회 패턴 테스트 """
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers):
print(f"[SCAN] 우회 패턴 테스트: {payload.name}")
# 우회 URL 생성
bypassed_uri = payload.mutate(original_redirect_uri)
# 새로운 쿼리 파라미터 구성
modified_query = query.copy()
modified_query["redirect_uri"] = [bypassed_uri]
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed_url._replace(query=new_query_string))
# 요청 전송
response = await self._send_request(test_url, headers)
# 응답 분석
await self._analyze_response(original_url, test_url, bypassed_uri, response, payload)
""" 응답 분석 및 취약점 판단 """
async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload):
status = response['status']
location = response['location']
# 리다이렉트 응답이 아니면 스킵
if status not in [301, 302, 303, 307, 308]:
return
# Location 헤더에서 code 추출
auth_code = self._extract_code_from_location(location)
if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
# 취약점 발견 시에만 로그
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
""" 취약점 보고서 생성 """
async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload):
# payload가 문자열인지 객체인지 확인
if hasattr(payload, 'name'):
pattern_name = payload.name
pattern_description = payload.description
else:
pattern_name = str(payload)
pattern_description = "Unknown bypass pattern"
description = (
f"Redirect URI 우회 취약점 발견!\n\n"
f"-- 상세 정보 --:\n"
f"• 우회 패턴: {pattern_name}\n"
f"• 설명: {pattern_description}\n"
f"• 원본 URL: {original_url}\n"
f"• 우회된 redirect_uri: {bypassed_uri}\n"
f"• 테스트 URL: {test_url}\n"
f"• 리다이렉트 위치: {location}\n"
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
)
report_data = [{
"target": target.load(),
"status": "CRITICAL",
"title": "Redirect URI Bypass Vulnerability",
"description": description,
"uri": test_url # uri 필드 추가
}]
save_report(report_data)
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")

View file

@ -3,4 +3,4 @@ from mitmproxy.tools.main import mitmdump
def run_proxy():
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"]
mitmdump()
mitmdump()