Compare commits

...

35 commits

Author SHA1 Message Date
James
3018b4fd23
Merge pull request #27 from j93es/gyu 2025-07-30 21:08:56 +09:00
gyuu04
27e7a290ba 진행률 오류 수정 2025-07-22 16:02:45 +09:00
gyuu04
a4b14ab20f Update init.py 2025-07-21 20:30:27 +09:00
gyuu04
8e0523e734 open redirect 2025-07-18 13:35:43 +09:00
gyuu04
7d378fa91f Update open_redirect_check.py 2025-07-17 13:59:05 +09:00
gyuu04
905db47d8a Merge remote-tracking branch 'origin/main' into gyu 2025-07-17 12:34:10 +09:00
gyuu04
182ea21178 open redirect 탐지 2025-07-17 12:11:03 +09:00
61e4ed6119
Merge pull request #26 from j93es/feat/at-hhotfix
[REFACTOR]: Access Token 탐지 조건 완화
2025-07-16 21:06:38 +09:00
KMINGON
8ef13de441 [REFACTOR]: Access TOken 탐지 조건 완화 2025-07-16 21:03:35 +09:00
gyuu04
9898f215f3 open redirect 2025-07-13 14:28:11 +09:00
James
a3b54028b7
Merge pull request #25 from j93es/0712 2025-07-12 16:46:13 +09:00
tv0924@icloud.com
e2ee91034d [Update] client secret | google response type token | google login hint 2025-07-12 12:08:03 +09:00
김민곤
1a97b9d403
Merge pull request #23 from j93es/fix/access-token
Access-Token 동작 오류 Hotfix
2025-07-05 15:27:54 +09:00
KMINGON
cf5746685a [FIX]: implicit type 체크 함수 인자 오류로 동작 안하던 것 수정 2025-07-04 21:31:15 +09:00
c8815f3f28
Merge pull request #22 from j93es/0702-1
[Update] 검증 진행 로직 변경 및 csrf 로직 변경
2025-07-02 23:10:01 +09:00
tv0924@icloud.com
a1758a60d4 [Update] 검증 진행 로직 변경 및 csrf 로직 변경 2025-07-02 11:40:29 +09:00
James
4758d7a689
Merge pull request #21 from j93es/feat/ignore
일부 트래커와 cdn, 여러 파일 확장자를 제외했습니다.
2025-07-01 21:42:24 +09:00
87d5b0209c [Enhance] 정적 파일 확장자 목록에 '.md' 및 '.txt' 추가 2025-06-30 22:03:25 +09:00
5edab9244c 일부 트래커와 cdn, 여러 파일 확장자를 제외했습니다.
뭔가 Type이 이슈가 있는거 같은데 아무래도 내 IDE 설정이 빡세서 그런거 같긴 하네요.
2025-06-30 21:44:08 +09:00
김민곤
949b156f19
Merge pull request #20 from j93es/refactor/access-token
[REFACTOR]: 요청 별 검증 함수를 분리하여 오탐률 개선
2025-06-29 21:03:30 +09:00
KMINGON
c20bcdebf3 [REFACTOR]: 요청 별 검증 함수를 분리하여 오탐률 줄임 2025-06-29 17:14:43 +09:00
James
6e5c37423c
Merge pull request #19 from j93es/date/0626-2 2025-06-26 19:47:03 +09:00
tv0924@icloud.com
0d81fdd49f [Refactor and Enhance] addon init.py의 비동기 작업을 더욱 효율적으로 수행 2025-06-26 19:07:35 +09:00
James
00c81f365a
Merge pull request #18 from j93es/date/0626-1
자동 오탐 검증을 위한 라우터 추가
2025-06-26 18:39:41 +09:00
tv0924@icloud.com
58d5deb435 [Update] 라우터 반환 형태 2025-06-26 15:45:39 +09:00
tv0924@icloud.com
05a095df7d [Docs] api docs 2025-06-26 15:35:26 +09:00
tv0924@icloud.com
4deb032708 [Docs] api docs 2025-06-26 15:35:12 +09:00
tv0924@icloud.com
3c5db3c1fd [Update] 자동 오탐 검증을 위한 라우터 추가 2025-06-26 15:20:30 +09:00
tv0924@icloud.com
53db0fb14e [Fix] scope detection 2025-06-26 12:40:14 +09:00
tv0924@icloud.com
3a1422a2f2 [Update] save vuln report logic 2025-06-26 12:20:41 +09:00
tv0924@icloud.com
062552d3d8 [Refactor] 리팩터링 2025-06-26 10:43:52 +09:00
gyuu04
afcfd7de87
Merge pull request #17 from j93es/gyu
OAuth redirect_uri 우회 패턴 17개 추가 및 테스트 완료
2025-06-25 14:17:24 +09:00
gyuu04
1c6fc53a81 redirect_uri 우회 패턴 추가
- 57개 우회 패턴 구체화
- 적응형 레이트 리미팅 추가 (차단 방지)
2025-06-25 14:14:19 +09:00
gyuu04
6dceba0c24 OAuth redirect_uri 우회 패턴 17개 추가 및 테스트 완료
- 안전한 테스트 도메인 적용 (evil.example)
2025-06-24 16:23:05 +09:00
James
0bee707406
Merge pull request #16 from j93es/chore/env
chroe: Set the environment variable
2025-06-21 15:24:58 +09:00
20 changed files with 2267 additions and 563 deletions

2
.gitignore vendored
View file

@ -9,6 +9,8 @@ wheels/
# Virtual environments
.venv
.env
data/

View file

@ -51,40 +51,44 @@ http://localhost:11081로 백엔드 서버가 열리게 됩니다.
`./addon/init.py`
```py
from example_check import Example
class LoggerAddon:
def __init__(self):
self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
...
async def request(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
try_catch(Example().test(flow))
]
await asyncio.gather(*tasks)
...
```
`./addon/example.py`
```py
import lib.target as target
from lib.report import save_report
from lib.report_vuln import report_vuln
class Example:
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
# data/report.csv에 저장
report_data = [{
'target': target.load(),
'status': "CRITICAL",
'title': "PKCE Downgrade Vulnerability",
'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
report_vuln(
title="PKCE Plain Method",
desc="PKCE method is set to 'plain'. Possible downgrade.",
status="CRITICAL",
uri=url,
)
```
이러한 예제를 참고하여 작성하여주세요.
# 백엔드 API DOCS
`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다.
```
http://localhost:11081/redoc
```

View file

@ -1,53 +0,0 @@
import lib.target as target
from lib.report import save_report
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
import urllib.parse
parsed = urllib.parse.parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
if hasattr(req.query, "urlencode"):
query = req.query.urlencode()
else:
query = str(req.query) if req.query else ""
location = res.headers.get("location", "")
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location)
result = []
if query_scope in ["all", "*"]:
result.append(f"Scope value issue detected in request: {query_scope}")
if location_scope in ["all", "*"]:
result.append(f"Scope value issue detected in response location: {location_scope}")
return result if result else 0
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
result = await self.check_scope(flow)
if result != 0:
report_data = [{
'target': target.load(),
'status': "WARNING",
'title': "OAuth scope value issue",
'description': f"{method} {url}: {', '.join(result)}",
'uri': url
}]
save_report(report_data)

View file

@ -1,23 +1,10 @@
import re
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
import asyncio
from typing import Optional, Any
from mitmproxy.http import HTTPFlow
import lib.target as target
from lib.report import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
from urllib.parse import urlparse, parse_qs
from lib.report_vuln import report_vuln
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
@ -26,31 +13,24 @@ class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
findings.extend(await self._scan_request(flow.request))
findings.extend(await self._scan_response(flow.response, flow.request.url))
if findings:
target_value = target.load()
save_report([f.to_report(target_value) for f in findings])
async_gather = []
async_gather.append(self._scan_request(flow.request))
async_gather.append(self._scan_response(flow.response, flow.request.url))
await asyncio.gather(*async_gather)
# 내부 구현
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
results: List[TokenLeakResult] = []
async def _scan_request(self, request: Any):
print("[TOKENDEBUG] ==scan request==")
# URL 검사
token_result = self._extract_token(request.url)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request URL",
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
if self._is_implicit_flow(request.url):
print("[TOKENDEBUG] OAuth Implicit Flow detected.")
report_vuln(
title="Token Leak in Request URL",
desc="취약한 Grant Type입니다 (Implicit Grant Type)",
status="MEDIUM",
uri=request.url
)
# Body 검사 (텍스트 컨텐츠인 경우)
@ -58,54 +38,40 @@ class AccessTokenScanner:
body_text = request.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
results.append(
TokenLeakResult(
title="Token Leak in Request Body",
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
report_vuln(
title="Token Leak in Request Body",
desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token_result[:20]}",
status="LOW",
uri=request.url
)
return results
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
async def _scan_response(self, response: Optional[Any], request_url: str):
if response is None:
return []
return
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"):
token_result = self._extract_token(location_header)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Redirect URL (Location header)",
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=location_header,
)
)
report_vuln(
title="Token Leak in Redirect URL (Location header)",
desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}",
status="MEDIUM",
uri=location_header,
)
# Body 검사 (텍스트 컨텐츠인 경우)
if response.content:
body_text = response.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
token, has_bearer = token_result
if has_bearer:
results.append(
TokenLeakResult(
title="Token Leak in Response Body",
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=request_url,
)
)
return results
report_vuln(
title="Token Leak in Response Body",
desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}",
status="LOW",
uri=request_url,
)
# 토큰 탐지 키워드드
_TOKEN_KEYS = [
@ -115,8 +81,6 @@ class AccessTokenScanner:
"refreshtoken",
"auth_token",
"session_token",
"secret_token",
"ssoauth",
]
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
@ -150,6 +114,38 @@ class AccessTokenScanner:
if (m := pattern.search(text)) and m.group(1):
print(f"[TOKENDEBUG] token: {m.group(1)}")
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
return m.group(1), has_bearer
if has_bearer:
return m.group(1)
print("[TOKENDEBUG] No matched.")
return None
def _is_implicit_flow(self, request_url: str) -> bool:
"""
URL의 파라미터에서 OAuth Implicit Flow 패턴을 체크합니다.
Args:
request_url: 체크할 요청 URL
Returns:
bool: client_id, redirect_uri, response_type이 모두 존재하고
response_type 값이 'token' 경우 True, 그렇지 않으면 False
"""
try:
parsed_url = urlparse(request_url)
query_params = parse_qs(parsed_url.query)
# 필요한 파라미터들이 모두 존재하는지 확인
required_params = ['redirect_uri', 'response_type']
for param in required_params:
if param not in query_params:
return False
# response_type 값이 'token'인지 확인
response_type_values = query_params.get('response_type', [])
# response_type 파라미터가 존재하고 값 중에 'token'이 있는지 확인
return 'token' in response_type_values or 'id_token' in response_type_values
except Exception:
return False

29
addon/client_secret.py Normal file
View file

@ -0,0 +1,29 @@
from lib.report_vuln import report_vuln
from urllib.parse import urlparse, parse_qs
class ClientSecret:
def get_target_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_client_id = self.get_target_from_query(query, "client_id")
query_client_secret = self.get_target_from_query(query, "client_secret")
if query_client_id and query_client_secret:
report_vuln(
title="OAuth Client Secret Exposure",
desc=f"Client ID and Secret found in request: {query_client_id}, {query_client_secret}",
status="CRITICAL",
uri=req.pretty_url
)

View file

@ -1,26 +1,17 @@
# csrf_check.py
from mitmproxy import http, ctx
from mitmproxy import http
from urllib.parse import urlparse, parse_qs, unquote
import httpx
from typing import Optional, Union, List
import lib.target as target
from lib.report import save_report
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
class CsrfChecker:
nonce_params = {
"state", "nonce", "as", "frame_id", "csrf_token", "csrf"
"state", "nonce", "csrf_token", "csrf"
}
def is_oauth_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive
raw = headers.get(name)
@ -40,7 +31,7 @@ class CsrfChecker:
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and self.is_oauth_uri(loc)
return 300 <= code < 400 and is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query)
@ -74,7 +65,7 @@ class CsrfChecker:
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (self.is_oauth_uri(flow.request.url)
if not (is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)):
return 0
@ -85,18 +76,29 @@ class CsrfChecker:
resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None:
return ["Missing nonce in redirect"]
report_vuln(
title="CSRF Risk",
desc="Missing nonce in redirect response",
status="CRITICAL",
uri=flow.request.url
)
return 1
if orig_nonce != resp_nonce:
return ["Nonce mismatch request↔response"]
report_vuln(
title="CSRF Risk",
desc="Nonce mismatch request↔response",
status="HIGH",
uri=flow.request.url
)
return 1
return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0
loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query)
@ -111,10 +113,16 @@ class CsrfChecker:
if new_nonce is None:
return 0
if new_nonce == orig_nonce:
return ["Nonce reused without cookies"]
report_vuln(
title="CSRF Risk",
desc="Nonce reused without cookies",
status="CRITICAL",
uri=flow.request.url
)
return 1
# (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=False) as cli:
async with httpx.AsyncClient(follow_redirects=True) as cli:
# 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리
@ -127,42 +135,35 @@ class CsrfChecker:
and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path
):
return ["Identical redirects on nonce swap → potential CSRF"]
report_vuln(
title="CSRF Risk",
desc="Identical redirects on nonce swap → potential CSRF",
status="NOT-VERIFIED-HIGH",
uri=flow.request.url
)
return 1
return 0
async def response(self, flow: http.HTTPFlow) -> None:
try:
msgs: List[str] = []
# 1) 요청에 nonce 없으면
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
msgs.append("Missing state/nonce in request")
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
report_vuln(
title="CSRF Risk",
desc="Missing nonce in OAuth request",
status="CRITICAL",
uri=flow.request.url
)
return
# 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow)
if r1:
msgs.extend(r1 if isinstance(r1, list) else [])
# 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow)
if r2:
msgs.extend(r2 if isinstance(r2, list) else [])
if msgs:
desc = " | ".join(msgs)
status = "MEDIUM"
report_data = [{
'target': target.load(),
'status': status,
'title': "CSRF Risk",
'description': desc,
'uri': flow.request.url,
}]
save_report(report_data)
print(f"[INFO] CSRF Check: {desc}")
else:
pass
except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}")
return

View file

@ -44,8 +44,8 @@ class GoogleLoginHint:
# 요청 URL 수정 - URL과 호스트 모두 업데이트
flow.request.url = new_url
flow.request.pretty_url = new_url
print(f"🔄 Modified URL: {new_url}")
def _is_google_oauth_url(self, url):
"""Google OAuth URL인지 확인"""
google_oauth_domains = [

View file

@ -0,0 +1,52 @@
from lib.report_vuln import report_vuln
import httpx
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class GoogleResponseTypeToken:
def get_taregt_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
if not is_oauth_uri(req.pretty_url):
return
if req.pretty_host != "accounts.google.com":
return
if "response_type=token" in req.pretty_url:
return
url = f"{req.pretty_url}".replace("response_type=code", "response_type=token")
async with httpx.AsyncClient(follow_redirects=True) as cli:
response = await cli.request(
method=req.method,
url=url,
headers=req.headers,
content=req.get_content(),
)
if response.status_code >= 400:
return
if "<b>400.</b>" in response.text:
return
if "response_type=token" in str(response.url):
report_vuln(
"Google Response Type Token",
f"Response type token allowed in {req.pretty_url}",
"HIGH",
str(response.url)
)

View file

@ -1,110 +1,93 @@
from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from ScopeDetection import ScopeDetection
from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker
from nonce_check import NonceChecker
from redirect_uri_check import RedirectBypassChecker
from client_secret import ClientSecret
from addon.open_redirect_check import OpenRedirectChecker
from access_token import AccessTokenScanner
from GoogleLoginHint import GoogleLoginHint
from addon.google_login_hint import GoogleLoginHint
from addon.google_response_type_token import GoogleResponseTypeToken
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True)
class PKCEAddon:
def __init__(self):
self.checker = PKCEDowngradeChecker()
_open_redirect_checker = OpenRedirectChecker()
async def request(self, flow: http.HTTPFlow):
print(
f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
)
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] Addon failed: {e}")
pass
class AddonBase:
"""
Base class for addons.
Each addon should implement its own request or response method.
"""
class CsrfAddon:
def __init__(self):
self.checker = CsrfChecker()
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.response(flow)
except Exception as e:
print(f"[ERROR] CSRF Addon failed: {e}")
pass
class ScopeAddon:
def __init__(self):
self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}")
class NonceAddon:
def __init__(self):
self.checker = NonceChecker()
async def response(self, flow: http.HTTPFlow):
try:
pass
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
# await self.checker.check_nonce_in_id_token(flow)
except Exception as e:
print(f"[ERROR] NonceAddon failed: {e}")
pass
class AccessTokenAddon:
def __init__(self):
self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.scan(flow)
except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}")
pass
class RedirectBypassAddon:
def __init__(self):
self.checker = RedirectBypassChecker()
# request 대신 response 로 바꿔 보세요:
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] RedirectBypass Addon failed: {e}")
class GoogleLoginHintAddon():
def __init__(self) -> None:
if os.getenv('GOOGLE_ID'):
self.checker = GoogleLoginHint()
self.google_login_hint = GoogleLoginHint()
else:
self.checker = None
self.google_login_hint = None
def should_ignore(self, flow: http.HTTPFlow) -> bool:
"""Check if the request should be ignored."""
ignore_domains = [
".googleapis.com",
"android.clients.google.com", # Added missing comma here
".adtrafficquality.google",
".googlesyndication.com",
"cdn.jsdelivr.net",
"update.googleapis.com",
".google-analytics.com",
".gstatic.com"
]
# Ignore .googleapis.com domains
for domain in ignore_domains:
if domain in flow.request.pretty_host:
return True
# Ignore static files (JS, CSS, fonts, images, etc.)
# Split on '?' to remove query parameters before checking extension
path = flow.request.path.split('?')[0].lower()
static_extensions = [
'.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
'.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
'.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
'.txt', '.csv'
]
if any(path.endswith(ext) for ext in static_extensions):
return True
return False
async def request(self, flow: http.HTTPFlow):
if self.checker is None:
return
try:
await self.checker.request(flow)
except Exception as e:
print(f"[ERROR] GoogleLoginHint Addon failed: {e}")
if self.google_login_hint:
await try_catch(self.google_login_hint.request(flow))
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), RedirectBypassAddon(), GoogleLoginHintAddon()]
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(PKCEDowngradeChecker().test(flow)),
]
await asyncio.gather(*tasks)
async def response(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow):
return
tasks = [
try_catch(CsrfChecker().response(flow)),
try_catch(ScopeDetection().test(flow)),
try_catch(ClientSecret().test(flow)),
try_catch(AccessTokenScanner().scan(flow)),
try_catch(GoogleResponseTypeToken().test(flow)),
try_catch(_open_redirect_checker.test(flow)),
]
await asyncio.gather(*tasks)
addons = [AddonBase()]

View file

@ -1,10 +1,8 @@
import jwt
from urllib.parse import urlparse, parse_qs
from typing import Union
import httpx
import lib.target as target
from lib.report import save_report
from lib.report_vuln import report_vuln
class NonceChecker:
def is_oidc_flow(self, flow) -> bool:
@ -72,17 +70,13 @@ class NonceChecker:
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce")
req = flow.request
url = req.pretty_url
if not nonce:
report_data = [{
'target': target.load(),
'status': "CRITICAL",
'title': "nonce is missing in id_token",
'description': "Nonce is present in the request but missing in the id_token.",
'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
}]
save_report(report_data)
report_vuln(
title="Nonce Check Failed",
desc="id_token에 nonce가 없습니다.",
status="HIGH",
uri=flow.request.url
)
return False
else:
return True

1722
addon/open_redirect_check.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,10 +1,7 @@
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx
from typing import Dict, List
import lib.target as target
from lib.report import save_report
from lib.report_vuln import report_vuln
class PKCEDowngradeChecker:
@ -58,27 +55,19 @@ class PKCEDowngradeChecker:
async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW"
self.save(
[
self.make_report(
status,
"PKCE Parameters Missing",
"PKCE parameters are missing or incomplete.",
url,
)
]
report_vuln(
title="PKCE Parameters Missing",
desc="PKCE parameters are missing or incomplete.",
status=status,
uri=url,
)
async def report_plain_method(self, url: str):
self.save(
[
self.make_report(
"CRITICAL",
"PKCE Plain Method",
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
report_vuln(
title="PKCE Plain Method",
desc="PKCE method is set to 'plain'. Possible downgrade.",
status="CRITICAL",
uri=url,
)
def create_downgraded_url(self, parsed, query):
@ -150,15 +139,11 @@ class PKCEDowngradeChecker:
else:
return # Likely safe
self.save(
[
self.make_report(
status,
title,
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
report_vuln(
title=title,
desc=description,
status=status,
uri=f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
def same_redirect_destination(self, orig_loc, down_loc):
@ -166,16 +151,3 @@ class PKCEDowngradeChecker:
down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": target.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)

View file

@ -1,202 +0,0 @@
from mitmproxy import http
import aiohttp
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import lib.target as target
from lib.report import save_report
class BypassPayload:
""" 우회 패턴 정의 """
def __init__(self, name: str, mutate_func, description: str):
self.name = name
self.mutate = mutate_func #우회 url 만드는 함수
self.description = description
class RedirectBypassChecker:
def __init__(self):
""" 우회 페이로드 목록 """
self.bypass_payloads = [
BypassPayload(
name=r"@",
mutate_func=self._mutate_pattern1,
description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com"
),
]
self.session = None
""" 우회 URL 생성 목록 """
# 1. @
def _mutate_pattern1(self, original: str) -> str:
parsed = urlparse(original)
mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
return mutated
'''aiohttp 세션 생성 (재사용)'''
async def _get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
'''세션 정리'''
async def close_session(self):
if self.session:
await self.session.close()
self.session = None
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
async def _send_request(self, url, headers=None):
try:
session = await self._get_session() # 세션 준비
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
# 서버에 GET 요청 전송
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
return {
'status': response.status,
'location': response.headers.get("Location", ""),
'headers': dict(response.headers)
}
except Exception as e:
print(f"[ERROR] 요청 실패 ({url}): {e}")
return {'status': 500, 'location': '', 'headers': {}}
""" redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
try:
redirect_parsed = urlparse(redirect_uri)
base_parsed = urlparse(base_url)
redirect_host = redirect_parsed.hostname
base_host = base_parsed.hostname
if not redirect_host or not base_host:
return False
if "@" in redirect_uri:
if redirect_host != base_host:
print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}")
return False
at_parts = redirect_uri.split('@')
if len(at_parts) > 1:
before_at = at_parts[0]
if '//' in before_at:
potential_domain = before_at.split('//')[-1]
if '.' in potential_domain and potential_domain != base_host:
print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}")
return False
is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
return is_valid
except Exception as e:
print(f"[ERROR] 도메인 검증 실패: {e}")
return False
""" Location 헤더에서 authorization code 추출 """
def _extract_code_from_location(self, location: str) -> str:
if not location:
return ""
try:
parsed = urlparse(location)
query = parse_qs(parsed.query)
return query.get('code', [''])[0]
except:
return ""
""" 메인 테스트 함수 - mitmproxy flow 처리 """
async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
if "redirect_uri" not in query:
return
original_redirect_uri = query["redirect_uri"][0]
print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")
for payload in self.bypass_payloads:
try:
await self._test_bypass_pattern(
url, query, parsed, original_redirect_uri, payload, headers={}
)
except Exception as e:
print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")
continue
""" 개별 우회 패턴 테스트 """
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers):
print(f"[SCAN] 우회 패턴 테스트: {payload.name}")
# 우회 URL 생성
bypassed_uri = payload.mutate(original_redirect_uri)
# 새로운 쿼리 파라미터 구성
modified_query = query.copy()
modified_query["redirect_uri"] = [bypassed_uri]
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed_url._replace(query=new_query_string))
# 요청 전송
response = await self._send_request(test_url, headers)
# 응답 분석
await self._analyze_response(original_url, test_url, bypassed_uri, response, payload)
""" 응답 분석 및 취약점 판단 """
async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload):
status = response['status']
location = response['location']
# 리다이렉트 응답이 아니면 스킵
if status not in [301, 302, 303, 307, 308]:
return
# Location 헤더에서 code 추출
auth_code = self._extract_code_from_location(location)
if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
# 취약점 발견 시에만 로그
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
""" 취약점 보고서 생성 """
async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload):
# payload가 문자열인지 객체인지 확인
if hasattr(payload, 'name'):
pattern_name = payload.name
pattern_description = payload.description
else:
pattern_name = str(payload)
pattern_description = "Unknown bypass pattern"
description = (
f"Redirect URI 우회 취약점 발견!\n\n"
f"-- 상세 정보 --:\n"
f"• 우회 패턴: {pattern_name}\n"
f"• 설명: {pattern_description}\n"
f"• 원본 URL: {original_url}\n"
f"• 우회된 redirect_uri: {bypassed_uri}\n"
f"• 테스트 URL: {test_url}\n"
f"• 리다이렉트 위치: {location}\n"
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
)
report_data = [{
"target": target.load(),
"status": "CRITICAL",
"title": "Redirect URI Bypass Vulnerability",
"description": description,
"uri": test_url # uri 필드 추가
}]
save_report(report_data)
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")

32
addon/scope_detection.py Normal file
View file

@ -0,0 +1,32 @@
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_scope = self.get_scope_from_query(query)
if query_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in request: {query_scope}",
status="WARNING",
uri=req.pretty_url
)

View file

@ -0,0 +1,65 @@
from typing import Any
from copy import deepcopy
class FalseTrueVarifingTask:
"""
A singleton class representing a task that can be either false or true.
This class is used to handle tasks that require verification of their truth value.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._is_verifing = False
self.task_queue = []
self._initialized = True
def reset(self):
"""
Reset the task queue and verification status.
"""
self._is_verifing = False
self.task_queue.clear()
# 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다.
# TODO: 모델 지정해두기
def add_task(self, task_name: str, initial_uri: str, data: Any):
"""
Add a task to the task queue.
:param task: The task to be added.
"""
self.task_queue.append(
{
"task_name": task_name,
"initial_uri": initial_uri,
"data": data
}
)
def start_verification(self):
"""
Start the verification process for the tasks in the queue.
"""
self._is_verifing = True
def get_task_queue(self):
"""
Get a copy of the current task queue.
:return: A copy of the task queue.
"""
return deepcopy(self.task_queue)
def is_verifing_false_true(self):
"""
Get the current verification status.
:return: True if verification is in progress, False otherwise.
"""
return self._is_verifing

View file

@ -1,14 +1,16 @@
# save as data/report.csv
import os
import csv
from typing import List, Dict, Any
from mitmproxy import http
import lib.cur_target_url as cur_target_url
# target, status, title, description, uri
# file path는 'data/report.csv'로 고정
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
file_path: str = 'data/report.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
"""
report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다.
@ -23,10 +25,10 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report
if not file_exists:
writer.writeheader()
for row in report_data:
# None 방지 & 줄바꿈 이스케이프
escaped = {
k: str(v).replace('\n', '\\n') if v is not None else ''
for k, v in row.items()
}
writer.writerow(escaped)
writer.writerow({
'target': cur_target_url.load(),
'status': status,
'title': title,
'description': desc,
'uri': uri,
})

10
lib/utils/is_oauth_uri.py Normal file
View file

@ -0,0 +1,10 @@
from urllib.parse import urlparse, parse_qs
def is_oauth_uri(uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False

5
lib/utils/try_catch.py Normal file
View file

@ -0,0 +1,5 @@
async def try_catch(coro):
try:
return await coro
except Exception as e:
print(f"[ERROR] {coro} failed: {e}")

View file

@ -1,17 +1,107 @@
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response
import lib.target as target
import lib.cur_target_url as cur_target_url
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI()
@app.post("/start")
async def start(url: str = Query(None)):
if url:
target.save(url)
print(f"Target URL set to: {url}")
return {"message": f"Target URL set to: {url}"}
return {"error": "No URL provided"}
@app.post(
"/start",
summary="취약점 검증을 위한 대상 URL 설정",
description="""
엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다.
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 오탐 검증 작업 시작을 알림",
description="""
엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
```json
{
"payload": [
{
"task_name": "pkce_task", # 검증 작업의 이름
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
"data": any # 추가 데이터
},
...
]
}
```
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404)