Compare commits

..

1 commit

Author SHA1 Message Date
4677eae575 날라가는데 고치긴 귀찮아서... 2025-06-08 13:42:55 +09:00
26 changed files with 209 additions and 2821 deletions

2
.env
View file

@ -1,2 +0,0 @@
# Google OAuth 설정
GOOGLE_ID=whs.imnya.ng@gmail.com

View file

@ -1,52 +0,0 @@
name: CI/CD Pipeline
on:
push:
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.13]
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: "uv.lock"
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync
- name: Set up environment variables
run: |
echo "GOOGLE_ID=bot.imnya.ng@gmail.com" > .env
- name: Start application and run proxy test
run: |
# Start the application in background
uv run main.py &
APP_PID=$!
# Wait for application to start
sleep 5
# Test proxy functionality
sudo cp ~/.mitmproxy/mitmproxy-ca-cert.pem /usr/local/share/ca-certificates/mitmproxy-ca-cert.crt
sudo update-ca-certificates
mkdir data
echo https://github.com > ./data/target.dump
curl -k -x https://localhost:11080 "https://github.com/login/oauth/authorize?client_id=Ov23lixietSCQOHxPvcr&redirect_uri=http://localhost:8787&scope=read:user+user:email&response_type=code&code_challenge=abc123&code_challenge_method=S256"
# Clean up
kill $APP_PID

69
.gitignore vendored
View file

@ -9,71 +9,4 @@ wheels/
# Virtual environments # Virtual environments
.venv .venv
.env data/
data/
# Created by https://www.toptal.com/developers/gitignore/api/macos,windows
# Edit at https://www.toptal.com/developers/gitignore?templates=macos,windows
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
### Windows ###
# Windows thumbnail cache files
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
[Dd]esktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msix
*.msm
*.msp
# Windows shortcuts
*.lnk
# End of https://www.toptal.com/developers/gitignore/api/macos,windows

View file

@ -1,7 +1,5 @@
# 환경 설정 # 환경 설정
## Python Virtual Environment
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다. 이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
uv 설치 후 다음과 같은 명령어를 입력합니다. uv 설치 후 다음과 같은 명령어를 입력합니다.
@ -10,33 +8,8 @@ uv 설치 후 다음과 같은 명령어를 입력합니다.
uv sync uv sync
``` ```
## Environment
venv와 패키지가 설치가 됩니다. venv와 패키지가 설치가 됩니다.
.env.example을 복사하여 .env를 붙여넣습니다.
`GOOGLE_ID=`에 봇에서 쓸 구글 계정의 전체 GMail을 기입합니다.
입력하지 않는다면 Google OAuth시 자동적으로 넘어가지 않을 수도 있습니다.
---
> [oauth-backend](https://github.com/j93es/oauth-backend) 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
>
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
>
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
---
# 실행 방법 # 실행 방법
``` ```
@ -44,51 +17,46 @@ uv run main.py
``` ```
이러면 http(s)://localhost:11080로 서버가 열리게 됩니다. 이러면 http(s)://localhost:11080로 서버가 열리게 됩니다.
http://localhost:11081로 백엔드 서버가 열리게 됩니다.
# 기여 방법 # 기여 방법
`./addon/init.py` `./addon/init.py`
```py ```py
... from example_check import Example
async def request(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true(): class LoggerAddon:
return def __init__(self):
self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
try_catch(Example().test(flow))
]
await asyncio.gather(*tasks)
...
``` ```
`./addon/example.py` `./addon/example.py`
```py ```py
from lib.report_vuln import report_vuln import lib.target as target
from lib.report import save_report
class Example: class Example:
async def test(self, flow): async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url url = req.pretty_url
# data/report.csv에 저장 # data/report.csv에 저장
report_vuln( report_data = [{
title="PKCE Plain Method", 'target': target.load(),
desc="PKCE method is set to 'plain'. Possible downgrade.", 'status': "CRITICAL",
status="CRITICAL", 'title': "PKCE Downgrade Vulnerability",
uri=url, 'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
) 'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
``` ```
이러한 예제를 참고하여 작성하여주세요. 이러한 예제를 참고하여 작성하여주세요.
# 백엔드 API DOCS
`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다.
```
http://localhost:11081/redoc
```

53
addon/ScopeDetection.py Normal file
View file

@ -0,0 +1,53 @@
import lib.target as target
from lib.report import save_report
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
import urllib.parse
parsed = urllib.parse.parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
if hasattr(req.query, "urlencode"):
query = req.query.urlencode()
else:
query = str(req.query) if req.query else ""
location = res.headers.get("location", "")
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location)
result = []
if query_scope in ["all", "*"]:
result.append(f"Scope value issue detected in request: {query_scope}")
if location_scope in ["all", "*"]:
result.append(f"Scope value issue detected in response location: {location_scope}")
return result if result else 0
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
result = await self.check_scope(flow)
if result != 0:
report_data = [{
'target': target.load(),
'status': "WARNING",
'title': "OAuth scope value issue",
'description': f"{method} {url}: {', '.join(result)}",
'uri': url
}]
save_report(report_data)

View file

@ -1,151 +0,0 @@
import re
import asyncio
from typing import Optional, Any
from mitmproxy.http import HTTPFlow
from urllib.parse import urlparse, parse_qs
from lib.report_vuln import report_vuln
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
async_gather = []
async_gather.append(self._scan_request(flow.request))
async_gather.append(self._scan_response(flow.response, flow.request.url))
await asyncio.gather(*async_gather)
# 내부 구현
async def _scan_request(self, request: Any):
print("[TOKENDEBUG] ==scan request==")
# URL 검사
if self._is_implicit_flow(request.url):
print("[TOKENDEBUG] OAuth Implicit Flow detected.")
report_vuln(
title="Token Leak in Request URL",
desc="취약한 Grant Type입니다 (Implicit Grant Type)",
status="MEDIUM",
uri=request.url
)
# Body 검사 (텍스트 컨텐츠인 경우)
if request.content:
body_text = request.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
report_vuln(
title="Token Leak in Request Body",
desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token_result[:20]}",
status="LOW",
uri=request.url
)
async def _scan_response(self, response: Optional[Any], request_url: str):
if response is None:
return
print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"):
token_result = self._extract_token(location_header)
if token_result:
report_vuln(
title="Token Leak in Redirect URL (Location header)",
desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}",
status="MEDIUM",
uri=location_header,
)
# Body 검사 (텍스트 컨텐츠인 경우)
if response.content:
body_text = response.get_text(strict=False)
token_result = self._extract_token(body_text)
if token_result:
report_vuln(
title="Token Leak in Response Body",
desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}",
status="LOW",
uri=request_url,
)
# 토큰 탐지 키워드드
_TOKEN_KEYS = [
"access_token",
"accesstoken",
"refresh_token",
"refreshtoken",
"auth_token",
"session_token",
]
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
_TOKEN_TYPE_REGEXES = [
re.compile(r"token[_-]?type[=:]?\s*bearer", re.IGNORECASE),
re.compile(r"authorization\s*[:=]\s*bearer", re.IGNORECASE),
]
# 동적 컴파일: key=value / "key": "value" / Bearer <token>
_TOKEN_PATTERNS = (
[
re.compile(fr"{key}[=:]\s*([A-Za-z0-9\-._~+/]{{10,}})", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [
re.compile(fr"\"{key}\"\s*:\s*\"([^\"]{{10,}})\"", re.IGNORECASE)
for key in _TOKEN_KEYS
]
+ [re.compile(r"bearer\s+([A-Za-z0-9\-._~+/]{10,})", re.IGNORECASE)]
)
def _extract_token(self, text: str) -> Optional[str]:
"""텍스트 블록에서 토큰 후보를 추출. Bearer 유형이 동반된 경우에 한정."""
if not text:
return None
# Bearer 타입이 같이 존재하는지 미리 확인 (속도 & 정확도↑)
has_bearer = any(rx.search(text) for rx in self._TOKEN_TYPE_REGEXES)
for pattern in self._TOKEN_PATTERNS:
if (m := pattern.search(text)) and m.group(1):
print(f"[TOKENDEBUG] token: {m.group(1)}")
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
if has_bearer:
return m.group(1)
print("[TOKENDEBUG] No matched.")
return None
def _is_implicit_flow(self, request_url: str) -> bool:
"""
URL의 파라미터에서 OAuth Implicit Flow 패턴을 체크합니다.
Args:
request_url: 체크할 요청 URL
Returns:
bool: client_id, redirect_uri, response_type이 모두 존재하고
response_type 값이 'token' 경우 True, 그렇지 않으면 False
"""
try:
parsed_url = urlparse(request_url)
query_params = parse_qs(parsed_url.query)
# 필요한 파라미터들이 모두 존재하는지 확인
required_params = ['redirect_uri', 'response_type']
for param in required_params:
if param not in query_params:
return False
# response_type 값이 'token'인지 확인
response_type_values = query_params.get('response_type', [])
# response_type 파라미터가 존재하고 값 중에 'token'이 있는지 확인
return 'token' in response_type_values or 'id_token' in response_type_values
except Exception:
return False

View file

@ -1,29 +0,0 @@
from lib.report_vuln import report_vuln
from urllib.parse import urlparse, parse_qs
class ClientSecret:
def get_target_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_client_id = self.get_target_from_query(query, "client_id")
query_client_secret = self.get_target_from_query(query, "client_secret")
if query_client_id and query_client_secret:
report_vuln(
title="OAuth Client Secret Exposure",
desc=f"Client ID and Secret found in request: {query_client_id}, {query_client_secret}",
status="CRITICAL",
uri=req.pretty_url
)

View file

@ -1,169 +0,0 @@
# csrf_check.py
from mitmproxy import http
from urllib.parse import urlparse, parse_qs, unquote
import httpx
from typing import Optional, Union, List
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
class CsrfChecker:
nonce_params = {
"state", "nonce", "csrf_token", "csrf"
}
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive
raw = headers.get(name)
if raw is None:
return None
# percent-encoding 디코딩 (예: '%20' → ' ')
return unquote(raw)
def get_query_param(self, uri: str, param: str) -> Optional[str]:
return parse_qs(urlparse(uri).query).get(param, [None])[0]
def set_query_param(self, qs: dict, param: str, value: str) -> dict:
new_qs = dict(qs)
new_qs[param] = [value]
return new_qs
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query)
for p in self.nonce_params:
val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값
if val:
return True
return False
def find_nonce_param(self, uri: str) -> Optional[str]:
qs_keys = parse_qs(urlparse(uri).query).keys()
for p in self.nonce_params:
if p in qs_keys:
return p
return None
async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response:
# HTTPX로 비동기 재요청: 쿠키 제외
headers = {
k: v for k, v in flow.request.headers.items()
if k.lower() != "cookie"
}
async with httpx.AsyncClient(follow_redirects=False) as cli:
return await cli.request(
method=flow.request.method,
url=flow.request.url,
headers=headers,
content=flow.request.get_content(),
)
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)):
return 0
param = self.find_nonce_param(flow.request.url)
orig_nonce = self.get_query_param(flow.request.url, param) if param else None
loc = self.get_header(flow.response.headers, "location") or ""
resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None:
report_vuln(
title="CSRF Risk",
desc="Missing nonce in redirect response",
status="CRITICAL",
uri=flow.request.url
)
return 1
if orig_nonce != resp_nonce:
report_vuln(
title="CSRF Risk",
desc="Nonce mismatch request↔response",
status="HIGH",
uri=flow.request.url
)
return 1
return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0
loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query)
orig_nonce = qs0.get(param, [None])[0]
# (1) 쿠키 없는 재요청 → 새 nonce
resp_no_cookie = await self.fetch_no_cookie(flow)
if resp_no_cookie.status_code >= 400:
return 0
loc1 = resp_no_cookie.headers.get("location", "")
new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0]
if new_nonce is None:
return 0
if new_nonce == orig_nonce:
report_vuln(
title="CSRF Risk",
desc="Nonce reused without cookies",
status="CRITICAL",
uri=flow.request.url
)
return 1
# (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=True) as cli:
# 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리
qs0[param] = [new_nonce]
req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
if (
req1.status_code == req2.status_code
and 200 <= req1.status_code < 400
and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path
):
report_vuln(
title="CSRF Risk",
desc="Identical redirects on nonce swap → potential CSRF",
status="NOT-VERIFIED-HIGH",
uri=flow.request.url
)
return 1
return 0
async def response(self, flow: http.HTTPFlow) -> None:
try:
# 1) 요청에 nonce 없으면
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
report_vuln(
title="CSRF Risk",
desc="Missing nonce in OAuth request",
status="CRITICAL",
uri=flow.request.url
)
return
# 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow)
# 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow)
except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}")
return

View file

@ -1,67 +0,0 @@
import os
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from dotenv import load_dotenv
# .env 파일 로드
load_dotenv(override=True)
class GoogleLoginHint:
def __init__(self):
self.google_id = os.getenv('GOOGLE_ID', '')
if not self.google_id:
print("⚠️ Warning: GOOGLE_ID not found in .env file")
async def request(self, flow):
"""Google OAuth 요청을 가로채서 login_hint를 추가하거나 수정"""
req = flow.request
url = req.pretty_url
# Google OAuth 인증 URL인지 확인
if self._is_google_oauth_url(url):
print(f"🔍 Google OAuth URL detected: {url}")
# URL 파싱
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
# login_hint 추가 또는 수정
if self.google_id:
query_params['login_hint'] = [self.google_id]
print(f"✅ Added/Updated login_hint: {self.google_id}")
# 새로운 쿼리 스트링 생성
new_query = urlencode(query_params, doseq=True)
# 새로운 URL 생성
new_url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 요청 URL 수정 - URL과 호스트 모두 업데이트
flow.request.url = new_url
print(f"🔄 Modified URL: {new_url}")
def _is_google_oauth_url(self, url):
"""Google OAuth URL인지 확인"""
google_oauth_domains = [
'accounts.google.com',
'oauth2.googleapis.com'
]
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
# Google OAuth 도메인 확인
for google_domain in google_oauth_domains:
if google_domain in domain:
# OAuth 관련 경로 확인
path = parsed_url.path.lower()
if any(oauth_path in path for oauth_path in ['/oauth2', '/auth', '/login']):
return True
return False

View file

@ -1,52 +0,0 @@
from lib.report_vuln import report_vuln
import httpx
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class GoogleResponseTypeToken:
def get_taregt_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
if not is_oauth_uri(req.pretty_url):
return
if req.pretty_host != "accounts.google.com":
return
if "response_type=token" in req.pretty_url:
return
url = f"{req.pretty_url}".replace("response_type=code", "response_type=token")
async with httpx.AsyncClient(follow_redirects=True) as cli:
response = await cli.request(
method=req.method,
url=url,
headers=req.headers,
content=req.get_content(),
)
if response.status_code >= 400:
return
if "<b>400.</b>" in response.text:
return
if "response_type=token" in str(response.url):
report_vuln(
"Google Response Type Token",
f"Response type token allowed in {req.pretty_url}",
"HIGH",
str(response.url)
)

View file

@ -1,93 +1,39 @@
from mitmproxy import http from mitmproxy import http
import asyncio import asyncio
from pkce_check import PKCEDowngradeChecker from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection from ScopeDetection import ScopeDetection
from csrf_check import CsrfChecker
from client_secret import ClientSecret
from addon.open_redirect_check import OpenRedirectChecker
from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
from addon.google_response_type_token import GoogleResponseTypeToken
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True) class PKCEAddon:
def __init__(self):
_open_redirect_checker = OpenRedirectChecker() self.checker = PKCEDowngradeChecker()
class AddonBase:
"""
Base class for addons.
Each addon should implement its own request or response method.
"""
def __init__(self) -> None:
if os.getenv('GOOGLE_ID'):
self.google_login_hint = GoogleLoginHint()
else:
self.google_login_hint = None
def should_ignore(self, flow: http.HTTPFlow) -> bool:
"""Check if the request should be ignored."""
ignore_domains = [
".googleapis.com",
"android.clients.google.com", # Added missing comma here
".adtrafficquality.google",
".googlesyndication.com",
"cdn.jsdelivr.net",
"update.googleapis.com",
".google-analytics.com",
".gstatic.com"
]
# Ignore .googleapis.com domains
for domain in ignore_domains:
if domain in flow.request.pretty_host:
return True
# Ignore static files (JS, CSS, fonts, images, etc.)
# Split on '?' to remove query parameters before checking extension
path = flow.request.path.split('?')[0].lower()
static_extensions = [
'.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
'.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
'.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
'.txt', '.csv'
]
if any(path.endswith(ext) for ext in static_extensions):
return True
return False
async def request(self, flow: http.HTTPFlow): async def request(self, flow: http.HTTPFlow):
if self.google_login_hint: print(
await try_catch(self.google_login_hint.request(flow)) f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
)
if false_true_varifing_task.is_verifing_false_true(): try:
return await self.checker.test(flow)
except Exception as e:
tasks = [ print(f"[ERROR] Addon failed: {e}")
try_catch(PKCEDowngradeChecker().test(flow)),
] class ScopeAddon:
await asyncio.gather(*tasks) def __init__(self):
self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow): try:
return await self.checker.test(flow)
except Exception as e:
tasks = [ print(f"[ERROR] ScopeDetection failed: {e}")
try_catch(CsrfChecker().response(flow)),
try_catch(ScopeDetection().test(flow)),
try_catch(ClientSecret().test(flow)),
try_catch(AccessTokenScanner().scan(flow)),
try_catch(GoogleResponseTypeToken().test(flow)),
try_catch(_open_redirect_checker.test(flow)),
]
await asyncio.gather(*tasks)
addons = [AddonBase()]
addons = [PKCEAddon(), ScopeAddon()]

View file

@ -1,82 +0,0 @@
import jwt
from urllib.parse import urlparse, parse_qs
from typing import Union
from lib.report_vuln import report_vuln
class NonceChecker:
def is_oidc_flow(self, flow) -> bool:
req = flow.request
res = flow.response
url = req.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
location = res.headers.get("location", "")
content_type = res.headers.get("content-type", "")
if "/authorize" in url and "response_type" in query and "openid" in query.get("scope", [""])[0]:
return True
if "application/json" in content_type:
if "id_token" in res.text:
return True
if res.status_code in [302, 303]:
if isinstance(location, list):
location = location[0]
if "id_token=" in location:
return True
if "/authorize" in url and "nonce" in query:
return True
return False
def extract_id_token(self, response) -> Union[str, None]:
"""
응답에서 id_token을 추출하는 함수.
"""
# 1. JSON 응답에 id_token 있음
try:
if "application/json" in response.headers.get("content-type", ""):
data = response.json()
return data.get("id_token")
except Exception:
pass
# 2. Location 헤더에서 id_token 파싱 (예: #id_token=...&access_token=...)
location = response.headers.get("location", "")
if location:
if "#" in location:
fragment = location.split("#")[1]
params = parse_qs(fragment)
return params.get("id_token", [None])[0]
elif "?" in location:
query = location.split("?")[1]
params = parse_qs(query)
return params.get("id_token", [None])[0]
return None
def decode_id_token(self, id_token: str) -> dict:
try:
return jwt.decode(id_token, options={"verify_signature": False})
except Exception as e:
return {}
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce")
if not nonce:
report_vuln(
title="Nonce Check Failed",
desc="id_token에 nonce가 없습니다.",
status="HIGH",
uri=flow.request.url
)
return False
else:
return True

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,10 @@
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx import httpx
from typing import Dict, List from typing import Dict, List
from lib.report_vuln import report_vuln
import lib.target as target
from lib.report import save_report
class PKCEDowngradeChecker: class PKCEDowngradeChecker:
@ -55,19 +58,27 @@ class PKCEDowngradeChecker:
async def report_missing_parameters(self, url: str, is_openid: bool): async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW" status = "MEDIUM" if is_openid else "LOW"
report_vuln( self.save(
title="PKCE Parameters Missing", [
desc="PKCE parameters are missing or incomplete.", self.make_report(
status=status, status,
uri=url, "PKCE Parameters Missing",
"PKCE parameters are missing or incomplete.",
url,
)
]
) )
async def report_plain_method(self, url: str): async def report_plain_method(self, url: str):
report_vuln( self.save(
title="PKCE Plain Method", [
desc="PKCE method is set to 'plain'. Possible downgrade.", self.make_report(
status="CRITICAL", "CRITICAL",
uri=url, "PKCE Plain Method",
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
) )
def create_downgraded_url(self, parsed, query): def create_downgraded_url(self, parsed, query):
@ -139,11 +150,15 @@ class PKCEDowngradeChecker:
else: else:
return # Likely safe return # Likely safe
report_vuln( self.save(
title=title, [
desc=description, self.make_report(
status=status, status,
uri=f"Original: {original_url}\nDowngraded: {downgraded_url}", title,
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
) )
def same_redirect_destination(self, orig_loc, down_loc): def same_redirect_destination(self, orig_loc, down_loc):
@ -151,3 +166,16 @@ class PKCEDowngradeChecker:
down = urlparse(down_loc) down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": target.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)

View file

@ -1,32 +0,0 @@
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_scope = self.get_scope_from_query(query)
if query_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in request: {query_scope}",
status="WARNING",
uri=req.pretty_url
)

View file

@ -1,65 +0,0 @@
from typing import Any
from copy import deepcopy
class FalseTrueVarifingTask:
"""
A singleton class representing a task that can be either false or true.
This class is used to handle tasks that require verification of their truth value.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._is_verifing = False
self.task_queue = []
self._initialized = True
def reset(self):
"""
Reset the task queue and verification status.
"""
self._is_verifing = False
self.task_queue.clear()
# 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다.
# TODO: 모델 지정해두기
def add_task(self, task_name: str, initial_uri: str, data: Any):
"""
Add a task to the task queue.
:param task: The task to be added.
"""
self.task_queue.append(
{
"task_name": task_name,
"initial_uri": initial_uri,
"data": data
}
)
def start_verification(self):
"""
Start the verification process for the tasks in the queue.
"""
self._is_verifing = True
def get_task_queue(self):
"""
Get a copy of the current task queue.
:return: A copy of the task queue.
"""
return deepcopy(self.task_queue)
def is_verifing_false_true(self):
"""
Get the current verification status.
:return: True if verification is in progress, False otherwise.
"""
return self._is_verifing

36
lib/report.py Normal file
View file

@ -0,0 +1,36 @@
# save as data/report.csv
import os
import csv
from typing import List, Dict, Any
# get unix timestamp
import time
def get_timestamp() -> str:
"""
Get the current Unix timestamp as a string.
:return: Current Unix timestamp.
"""
return str(int(time.time()))
# target, status, title, description, uri
# file path는 'data/report.csv'로 고정
def save_report(report_data: List[Dict[str, Any]], file_path: str = f'data/report-{get_timestamp()}.csv') -> None:
"""
Save the report data to a CSV file.
:param report_data: List of dictionaries containing report data.
:param file_path: Path to the CSV file where the report will be saved.
"""
os.makedirs(os.path.dirname(file_path), exist_ok=True)
fieldnames = ['target', 'status', 'title', 'description', 'uri']
with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in report_data:
# Replace actual newlines with literal \n strings
escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()}
writer.writerow(escaped_row)

View file

@ -1,34 +0,0 @@
# save as data/report.csv
import os
import csv
from mitmproxy import http
import lib.cur_target_url as cur_target_url
# target, status, title, description, uri
# file path는 'data/report.csv'로 고정
def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
file_path: str = 'data/report.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
"""
report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다.
파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다.
"""
fieldnames = ['target', 'status', 'title', 'description', 'uri']
file_exists = os.path.exists(file_path)
with open(file_path, mode='a', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 파일이 없던 새로 만들 때만 헤더 작성
if not file_exists:
writer.writeheader()
writer.writerow({
'target': cur_target_url.load(),
'status': status,
'title': title,
'description': desc,
'uri': uri,
})

View file

@ -1,10 +0,0 @@
from urllib.parse import urlparse, parse_qs
def is_oauth_uri(uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False

View file

@ -1,5 +0,0 @@
async def try_catch(coro):
try:
return await coro
except Exception as e:
print(f"[ERROR] {coro} failed: {e}")

30
main.py
View file

@ -1,23 +1,21 @@
from runner.proxy import run_proxy from runner.proxy import run_proxy
import subprocess
import threading import threading
import uvicorn
from runner.backend import app
def run_fastapi_server():
"""FastAPI 서버를 실행하는 함수"""
uvicorn.run(app, host="localhost", port=11081, log_level="info")
if __name__ == "__main__": if __name__ == "__main__":
# Start web server in a separate thread
server_process = subprocess.Popen([
"granian",
"--interface", "asgi",
"--host", "0.0.0.0",
"--port", "11081",
"--loop", "asyncio",
"--reload",
"runner.backend:app",
])
try: try:
# FastAPI 서버를 백그라운드 스레드에서 실행 # Run mitmdump proxy
fastapi_thread = threading.Thread(target=run_fastapi_server, daemon=True)
fastapi_thread.start()
print("🚀 FastAPI server started on http://localhost:11081")
# Run mitmdump proxy (메인 스레드에서 실행)
print("🛡️ Starting mitmdump proxy on port 11080...")
run_proxy() run_proxy()
except KeyboardInterrupt:
print("🛑 Shutting down...")
finally: finally:
print("✅ Mitmdump proxy has been stopped.") server_process.terminate()

View file

@ -10,5 +10,4 @@ dependencies = [
"httpx>=0.28.1", "httpx>=0.28.1",
"fastapi[standard]>=0.115.12", "fastapi[standard]>=0.115.12",
"granian>=2.3.2", "granian>=2.3.2",
"PyJWT>=2.10.1",
] ]

View file

@ -1,107 +1,17 @@
from fastapi import FastAPI, Query, HTTPException from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
import lib.cur_target_url as cur_target_url import lib.target as target
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI() app = FastAPI()
@app.post("/start")
@app.post( async def start(url: str = Query(None)):
"/start", if url:
summary="취약점 검증을 위한 대상 URL 설정", target.save(url)
description=""" print(f"Target URL set to: {url}")
엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다. return {"message": f"Target URL set to: {url}"}
return {"error": "No URL provided"}
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 오탐 검증 작업 시작을 알림",
description="""
엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
```json
{
"payload": [
{
"task_name": "pkce_task", # 검증 작업의 이름
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
"data": any # 추가 데이터
},
...
]
}
```
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404) @app.exception_handler(404)
@ -110,4 +20,4 @@ async def not_found_handler(request, exc):
@app.exception_handler(405) @app.exception_handler(405)
async def method_not_allowed_handler(request, exc): async def method_not_allowed_handler(request, exc):
return Response(status_code=405) return Response(status_code=405)

View file

@ -3,4 +3,4 @@ from mitmproxy.tools.main import mitmdump
def run_proxy(): def run_proxy():
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"] sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"]
mitmdump() mitmdump()

11
uv.lock generated
View file

@ -755,7 +755,6 @@ dependencies = [
{ name = "granian" }, { name = "granian" },
{ name = "httpx" }, { name = "httpx" },
{ name = "mitmproxy" }, { name = "mitmproxy" },
{ name = "pyjwt" },
] ]
[package.metadata] [package.metadata]
@ -765,7 +764,6 @@ requires-dist = [
{ name = "granian", specifier = ">=2.3.2" }, { name = "granian", specifier = ">=2.3.2" },
{ name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", specifier = ">=0.28.1" },
{ name = "mitmproxy", specifier = ">=12.1.1" }, { name = "mitmproxy", specifier = ">=12.1.1" },
{ name = "pyjwt", specifier = ">=2.10.1" },
] ]
[[package]] [[package]]
@ -918,15 +916,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293, upload-time = "2025-01-06T17:26:25.553Z" }, { url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293, upload-time = "2025-01-06T17:26:25.553Z" },
] ]
[[package]]
name = "pyjwt"
version = "2.10.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/46/bd74733ff231675599650d3e47f361794b22ef3e3770998dda30d3b63726/pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953", size = 87785, upload-time = "2024-11-28T03:43:29.933Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" },
]
[[package]] [[package]]
name = "pylsqpack" name = "pylsqpack"
version = "0.3.22" version = "0.3.22"