mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 05:31:51 +09:00
최신 main pull
This commit is contained in:
commit
2bfe057889
6 changed files with 431 additions and 5 deletions
43
.github/workflows/ci.yml
vendored
Normal file
43
.github/workflows/ci.yml
vendored
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
name: CI/CD Pipeline
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
pull_request:
|
||||||
|
branches: [main]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
python-version: [3.13]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Install uv
|
||||||
|
uses: astral-sh/setup-uv@v4
|
||||||
|
with:
|
||||||
|
enable-cache: true
|
||||||
|
cache-dependency-glob: "uv.lock"
|
||||||
|
|
||||||
|
- name: Set up Python ${{ matrix.python-version }}
|
||||||
|
run: uv python install ${{ matrix.python-version }}
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: uv sync
|
||||||
|
|
||||||
|
- name: Start application and run proxy test
|
||||||
|
run: |
|
||||||
|
# Start the application in background
|
||||||
|
uv run main.py &
|
||||||
|
APP_PID=$!
|
||||||
|
|
||||||
|
# Wait for application to start
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
# Test proxy functionality
|
||||||
|
curl -x http://localhost:11080 http://example.com
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
kill $APP_PID
|
||||||
155
addon/access_token.py
Normal file
155
addon/access_token.py
Normal file
|
|
@ -0,0 +1,155 @@
|
||||||
|
import re
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from typing import List, Dict, Optional, Any
|
||||||
|
|
||||||
|
from mitmproxy.http import HTTPFlow
|
||||||
|
|
||||||
|
import lib.target as target
|
||||||
|
from lib.report import save_report
|
||||||
|
|
||||||
|
# 결과 리포트 저장용 데이터 클래스
|
||||||
|
@dataclass
|
||||||
|
class TokenLeakResult:
|
||||||
|
title: str
|
||||||
|
description: str
|
||||||
|
uri: str
|
||||||
|
status: str = "MEDIUM" # 기본 상태
|
||||||
|
|
||||||
|
def to_report(self, target_value) -> Dict[str, str]:
|
||||||
|
"""리포트 저장 포맷(dict)으로 변환"""
|
||||||
|
return {"target": target_value, **asdict(self)}
|
||||||
|
|
||||||
|
|
||||||
|
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
|
||||||
|
class AccessTokenScanner:
|
||||||
|
|
||||||
|
async def scan(self, flow: HTTPFlow) -> None:
|
||||||
|
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
|
||||||
|
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
|
||||||
|
findings: List[TokenLeakResult] = []
|
||||||
|
|
||||||
|
findings.extend(await self._scan_request(flow.request))
|
||||||
|
findings.extend(await self._scan_response(flow.response, flow.request.url))
|
||||||
|
|
||||||
|
if findings:
|
||||||
|
target_value = target.load()
|
||||||
|
save_report([f.to_report(target_value) for f in findings])
|
||||||
|
|
||||||
|
# 내부 구현
|
||||||
|
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
|
||||||
|
results: List[TokenLeakResult] = []
|
||||||
|
|
||||||
|
print("[TOKENDEBUG] ==scan request==")
|
||||||
|
# URL 검사
|
||||||
|
token_result = self._extract_token(request.url)
|
||||||
|
if token_result:
|
||||||
|
token, has_bearer = token_result
|
||||||
|
results.append(
|
||||||
|
TokenLeakResult(
|
||||||
|
title="Token Leak in Request URL",
|
||||||
|
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||||
|
uri=request.url,
|
||||||
|
status="MEDIUM" if has_bearer else "LOW"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||||
|
if request.content:
|
||||||
|
body_text = request.get_text(strict=False)
|
||||||
|
token_result = self._extract_token(body_text)
|
||||||
|
if token_result:
|
||||||
|
token, has_bearer = token_result
|
||||||
|
results.append(
|
||||||
|
TokenLeakResult(
|
||||||
|
title="Token Leak in Request Body",
|
||||||
|
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||||
|
uri=request.url,
|
||||||
|
status="MEDIUM" if has_bearer else "LOW"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
|
||||||
|
if response is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
results: List[TokenLeakResult] = []
|
||||||
|
print("[TOKENDEBUG] ==scan response==")
|
||||||
|
# Location 헤더 검사 (리다이렉트)
|
||||||
|
if location_header := response.headers.get("Location"):
|
||||||
|
token_result = self._extract_token(location_header)
|
||||||
|
if token_result:
|
||||||
|
token, has_bearer = token_result
|
||||||
|
if has_bearer:
|
||||||
|
results.append(
|
||||||
|
TokenLeakResult(
|
||||||
|
title="Token Leak in Redirect URL (Location header)",
|
||||||
|
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||||
|
uri=location_header,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||||
|
if response.content:
|
||||||
|
body_text = response.get_text(strict=False)
|
||||||
|
token_result = self._extract_token(body_text)
|
||||||
|
if token_result:
|
||||||
|
token, has_bearer = token_result
|
||||||
|
if has_bearer:
|
||||||
|
results.append(
|
||||||
|
TokenLeakResult(
|
||||||
|
title="Token Leak in Response Body",
|
||||||
|
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||||
|
uri=request_url,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
# 토큰 탐지 키워드드
|
||||||
|
_TOKEN_KEYS = [
|
||||||
|
"access_token",
|
||||||
|
"accesstoken",
|
||||||
|
"refresh_token",
|
||||||
|
"refreshtoken",
|
||||||
|
"auth_token",
|
||||||
|
"session_token",
|
||||||
|
"secret_token",
|
||||||
|
"ssoauth",
|
||||||
|
]
|
||||||
|
|
||||||
|
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
|
||||||
|
_TOKEN_TYPE_REGEXES = [
|
||||||
|
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
|
||||||
|
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
|
||||||
|
]
|
||||||
|
|
||||||
|
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
|
||||||
|
_TOKEN_PATTERNS = (
|
||||||
|
[
|
||||||
|
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
|
||||||
|
for key in _TOKEN_KEYS
|
||||||
|
]
|
||||||
|
+ [
|
||||||
|
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
|
||||||
|
for key in _TOKEN_KEYS
|
||||||
|
]
|
||||||
|
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
|
||||||
|
)
|
||||||
|
|
||||||
|
def _extract_token(self, text: str) -> Optional[str]:
|
||||||
|
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
|
||||||
|
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
|
||||||
|
|
||||||
|
for pattern in self._TOKEN_PATTERNS:
|
||||||
|
if (m := pattern.search(text)) and m.group(1):
|
||||||
|
print(f"[TOKENDEBUG] token: {m.group(1)}")
|
||||||
|
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
|
||||||
|
return m.group(1), has_bearer
|
||||||
|
print("[TOKENDEBUG] No matched.")
|
||||||
|
return None
|
||||||
|
|
@ -5,6 +5,8 @@ from ScopeDetection import ScopeDetection
|
||||||
from csrf_check import CsrfChecker
|
from csrf_check import CsrfChecker
|
||||||
from nonce_check import NonceChecker
|
from nonce_check import NonceChecker
|
||||||
from cleintsecret_check import ClientSecretChecker
|
from cleintsecret_check import ClientSecretChecker
|
||||||
|
from redirect_uri_check import RedirectBypassChecker
|
||||||
|
from access_token import AccessTokenScanner
|
||||||
|
|
||||||
class PKCEAddon:
|
class PKCEAddon:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
@ -57,7 +59,9 @@ class NonceAddon:
|
||||||
|
|
||||||
async def response(self, flow: http.HTTPFlow):
|
async def response(self, flow: http.HTTPFlow):
|
||||||
try:
|
try:
|
||||||
await self.checker.response(flow)
|
pass
|
||||||
|
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||||
|
# await self.checker.check_nonce_in_id_token(flow)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] NonceAddon failed: {e}")
|
print(f"[ERROR] NonceAddon failed: {e}")
|
||||||
pass
|
pass
|
||||||
|
|
@ -80,4 +84,26 @@ class ClientSecretAddon:
|
||||||
print(f"[ERROR] ClientSecretAddon response failed: {e}")
|
print(f"[ERROR] ClientSecretAddon response failed: {e}")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), ClientSecretAddon()]
|
class AccessTokenAddon:
|
||||||
|
def __init__(self):
|
||||||
|
self.checker = AccessTokenScanner()
|
||||||
|
|
||||||
|
async def response(self, flow: http.HTTPFlow):
|
||||||
|
try:
|
||||||
|
await self.checker.scan(flow)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] AccessToken Addon failed: {e}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
class RedirectBypassAddon:
|
||||||
|
def __init__(self):
|
||||||
|
self.checker = RedirectBypassChecker()
|
||||||
|
|
||||||
|
# request 대신 response 로 바꿔 보세요:
|
||||||
|
async def response(self, flow: http.HTTPFlow):
|
||||||
|
try:
|
||||||
|
await self.checker.test(flow)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] RedirectBypass Addon failed: {e}")
|
||||||
|
|
||||||
|
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), ClientSecretAddon(), AccessTokenAddon(), RedirectBypassAddon()]
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ class NonceChecker:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||||
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
||||||
decoded = self.decode_id_token(id_token)
|
decoded = self.decode_id_token(id_token)
|
||||||
nonce = decoded.get("nonce")
|
nonce = decoded.get("nonce")
|
||||||
|
|
|
||||||
202
addon/redirect_uri_check.py
Normal file
202
addon/redirect_uri_check.py
Normal file
|
|
@ -0,0 +1,202 @@
|
||||||
|
from mitmproxy import http
|
||||||
|
import aiohttp
|
||||||
|
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||||
|
import lib.target as target
|
||||||
|
from lib.report import save_report
|
||||||
|
|
||||||
|
class BypassPayload:
|
||||||
|
""" 우회 패턴 정의 """
|
||||||
|
def __init__(self, name: str, mutate_func, description: str):
|
||||||
|
self.name = name
|
||||||
|
self.mutate = mutate_func #우회 url 만드는 함수
|
||||||
|
self.description = description
|
||||||
|
|
||||||
|
|
||||||
|
class RedirectBypassChecker:
|
||||||
|
def __init__(self):
|
||||||
|
""" 우회 페이로드 목록 """
|
||||||
|
self.bypass_payloads = [
|
||||||
|
BypassPayload(
|
||||||
|
name=r"@",
|
||||||
|
mutate_func=self._mutate_pattern1,
|
||||||
|
description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com"
|
||||||
|
),
|
||||||
|
|
||||||
|
]
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
""" 우회 URL 생성 목록 """
|
||||||
|
# 1. @
|
||||||
|
def _mutate_pattern1(self, original: str) -> str:
|
||||||
|
parsed = urlparse(original)
|
||||||
|
mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
|
||||||
|
print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
|
||||||
|
return mutated
|
||||||
|
|
||||||
|
'''aiohttp 세션 생성 (재사용)'''
|
||||||
|
async def _get_session(self):
|
||||||
|
if self.session is None:
|
||||||
|
timeout = aiohttp.ClientTimeout(total=10)
|
||||||
|
self.session = aiohttp.ClientSession(timeout=timeout)
|
||||||
|
return self.session
|
||||||
|
|
||||||
|
'''세션 정리'''
|
||||||
|
async def close_session(self):
|
||||||
|
if self.session:
|
||||||
|
await self.session.close()
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
|
||||||
|
async def _send_request(self, url, headers=None):
|
||||||
|
try:
|
||||||
|
session = await self._get_session() # 세션 준비
|
||||||
|
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
|
||||||
|
|
||||||
|
# 서버에 GET 요청 전송
|
||||||
|
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
|
||||||
|
return {
|
||||||
|
'status': response.status,
|
||||||
|
'location': response.headers.get("Location", ""),
|
||||||
|
'headers': dict(response.headers)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 요청 실패 ({url}): {e}")
|
||||||
|
return {'status': 500, 'location': '', 'headers': {}}
|
||||||
|
|
||||||
|
""" redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """
|
||||||
|
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
|
||||||
|
try:
|
||||||
|
redirect_parsed = urlparse(redirect_uri)
|
||||||
|
base_parsed = urlparse(base_url)
|
||||||
|
|
||||||
|
redirect_host = redirect_parsed.hostname
|
||||||
|
base_host = base_parsed.hostname
|
||||||
|
|
||||||
|
if not redirect_host or not base_host:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if "@" in redirect_uri:
|
||||||
|
if redirect_host != base_host:
|
||||||
|
print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
at_parts = redirect_uri.split('@')
|
||||||
|
if len(at_parts) > 1:
|
||||||
|
before_at = at_parts[0]
|
||||||
|
if '//' in before_at:
|
||||||
|
potential_domain = before_at.split('//')[-1]
|
||||||
|
if '.' in potential_domain and potential_domain != base_host:
|
||||||
|
print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
|
||||||
|
return is_valid
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 도메인 검증 실패: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
""" Location 헤더에서 authorization code 추출 """
|
||||||
|
def _extract_code_from_location(self, location: str) -> str:
|
||||||
|
if not location:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
parsed = urlparse(location)
|
||||||
|
query = parse_qs(parsed.query)
|
||||||
|
return query.get('code', [''])[0]
|
||||||
|
except:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
""" 메인 테스트 함수 - mitmproxy flow 처리 """
|
||||||
|
async def test(self, flow: http.HTTPFlow):
|
||||||
|
url = flow.request.pretty_url
|
||||||
|
parsed = urlparse(url)
|
||||||
|
query = parse_qs(parsed.query)
|
||||||
|
|
||||||
|
if "redirect_uri" not in query:
|
||||||
|
return
|
||||||
|
|
||||||
|
original_redirect_uri = query["redirect_uri"][0]
|
||||||
|
print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")
|
||||||
|
|
||||||
|
for payload in self.bypass_payloads:
|
||||||
|
try:
|
||||||
|
await self._test_bypass_pattern(
|
||||||
|
url, query, parsed, original_redirect_uri, payload, headers={}
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
""" 개별 우회 패턴 테스트 """
|
||||||
|
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers):
|
||||||
|
|
||||||
|
print(f"[SCAN] 우회 패턴 테스트: {payload.name}")
|
||||||
|
|
||||||
|
# 우회 URL 생성
|
||||||
|
bypassed_uri = payload.mutate(original_redirect_uri)
|
||||||
|
|
||||||
|
# 새로운 쿼리 파라미터 구성
|
||||||
|
modified_query = query.copy()
|
||||||
|
modified_query["redirect_uri"] = [bypassed_uri]
|
||||||
|
new_query_string = urlencode(modified_query, doseq=True)
|
||||||
|
test_url = urlunparse(parsed_url._replace(query=new_query_string))
|
||||||
|
|
||||||
|
# 요청 전송
|
||||||
|
response = await self._send_request(test_url, headers)
|
||||||
|
|
||||||
|
# 응답 분석
|
||||||
|
await self._analyze_response(original_url, test_url, bypassed_uri, response, payload)
|
||||||
|
|
||||||
|
""" 응답 분석 및 취약점 판단 """
|
||||||
|
async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload):
|
||||||
|
status = response['status']
|
||||||
|
location = response['location']
|
||||||
|
|
||||||
|
# 리다이렉트 응답이 아니면 스킵
|
||||||
|
if status not in [301, 302, 303, 307, 308]:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Location 헤더에서 code 추출
|
||||||
|
auth_code = self._extract_code_from_location(location)
|
||||||
|
|
||||||
|
if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
|
||||||
|
# 취약점 발견 시에만 로그
|
||||||
|
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
|
||||||
|
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
|
||||||
|
|
||||||
|
|
||||||
|
""" 취약점 보고서 생성 """
|
||||||
|
async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload):
|
||||||
|
# payload가 문자열인지 객체인지 확인
|
||||||
|
if hasattr(payload, 'name'):
|
||||||
|
pattern_name = payload.name
|
||||||
|
pattern_description = payload.description
|
||||||
|
else:
|
||||||
|
pattern_name = str(payload)
|
||||||
|
pattern_description = "Unknown bypass pattern"
|
||||||
|
|
||||||
|
description = (
|
||||||
|
f"Redirect URI 우회 취약점 발견!\n\n"
|
||||||
|
f"-- 상세 정보 --:\n"
|
||||||
|
f"• 우회 패턴: {pattern_name}\n"
|
||||||
|
f"• 설명: {pattern_description}\n"
|
||||||
|
f"• 원본 URL: {original_url}\n"
|
||||||
|
f"• 우회된 redirect_uri: {bypassed_uri}\n"
|
||||||
|
f"• 테스트 URL: {test_url}\n"
|
||||||
|
f"• 리다이렉트 위치: {location}\n"
|
||||||
|
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
report_data = [{
|
||||||
|
"target": target.load(),
|
||||||
|
"status": "CRITICAL",
|
||||||
|
"title": "Redirect URI Bypass Vulnerability",
|
||||||
|
"description": description,
|
||||||
|
"uri": test_url # uri 필드 추가
|
||||||
|
}]
|
||||||
|
|
||||||
|
save_report(report_data)
|
||||||
|
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
|
||||||
|
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")
|
||||||
Loading…
Add table
Add a link
Reference in a new issue