Compare commits

..

4 commits

Author SHA1 Message Date
tk
2bfe057889 최신 main pull 2025-06-12 21:50:46 +09:00
tk
568d3f0ce5 [수정] req,res 구분 코드 2025-06-09 23:35:52 +09:00
tk
4059cc7adb 불필요한 코드 수정 2025-06-09 23:12:28 +09:00
tk
cba2d545b6 client_secret 검증(query,body,header) 2025-06-09 21:37:34 +09:00
24 changed files with 657 additions and 2394 deletions

2
.env
View file

@ -1,2 +0,0 @@
# Google OAuth 설정
GOOGLE_ID=whs.imnya.ng@gmail.com

View file

@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Install uv - name: Install uv
uses: astral-sh/setup-uv@v6 uses: astral-sh/setup-uv@v4
with: with:
enable-cache: true enable-cache: true
cache-dependency-glob: "uv.lock" cache-dependency-glob: "uv.lock"
@ -27,10 +27,6 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: uv sync run: uv sync
- name: Set up environment variables
run: |
echo "GOOGLE_ID=bot.imnya.ng@gmail.com" > .env
- name: Start application and run proxy test - name: Start application and run proxy test
run: | run: |
# Start the application in background # Start the application in background
@ -41,12 +37,7 @@ jobs:
sleep 5 sleep 5
# Test proxy functionality # Test proxy functionality
sudo cp ~/.mitmproxy/mitmproxy-ca-cert.pem /usr/local/share/ca-certificates/mitmproxy-ca-cert.crt curl -x http://localhost:11080 http://example.com
sudo update-ca-certificates
mkdir data
echo https://github.com > ./data/target.dump
curl -k -x https://localhost:11080 "https://github.com/login/oauth/authorize?client_id=Ov23lixietSCQOHxPvcr&redirect_uri=http://localhost:8787&scope=read:user+user:email&response_type=code&code_challenge=abc123&code_challenge_method=S256"
# Clean up # Clean up
kill $APP_PID kill $APP_PID

2
.gitignore vendored
View file

@ -9,8 +9,6 @@ wheels/
# Virtual environments # Virtual environments
.venv .venv
.env
data/ data/

View file

@ -1,7 +1,5 @@
# 환경 설정 # 환경 설정
## Python Virtual Environment
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다. 이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
uv 설치 후 다음과 같은 명령어를 입력합니다. uv 설치 후 다음과 같은 명령어를 입력합니다.
@ -10,33 +8,8 @@ uv 설치 후 다음과 같은 명령어를 입력합니다.
uv sync uv sync
``` ```
## Environment
venv와 패키지가 설치가 됩니다. venv와 패키지가 설치가 됩니다.
.env.example을 복사하여 .env를 붙여넣습니다.
`GOOGLE_ID=`에 봇에서 쓸 구글 계정의 전체 GMail을 기입합니다.
입력하지 않는다면 Google OAuth시 자동적으로 넘어가지 않을 수도 있습니다.
---
> [oauth-backend](https://github.com/j93es/oauth-backend) 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
>
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
>
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
---
# 실행 방법 # 실행 방법
``` ```
@ -44,51 +17,46 @@ uv run main.py
``` ```
이러면 http(s)://localhost:11080로 서버가 열리게 됩니다. 이러면 http(s)://localhost:11080로 서버가 열리게 됩니다.
http://localhost:11081로 백엔드 서버가 열리게 됩니다.
# 기여 방법 # 기여 방법
`./addon/init.py` `./addon/init.py`
```py ```py
... from example_check import Example
async def request(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true(): class LoggerAddon:
return def __init__(self):
self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow)
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
try_catch(Example().test(flow))
]
await asyncio.gather(*tasks)
...
``` ```
`./addon/example.py` `./addon/example.py`
```py ```py
from lib.report_vuln import report_vuln import lib.target as target
from lib.report import save_report
class Example: class Example:
async def test(self, flow): async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url url = req.pretty_url
# data/report.csv에 저장 # data/report.csv에 저장
report_vuln( report_data = [{
title="PKCE Plain Method", 'target': target.load(),
desc="PKCE method is set to 'plain'. Possible downgrade.", 'status': "CRITICAL",
status="CRITICAL", 'title': "PKCE Downgrade Vulnerability",
uri=url, 'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
) 'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
``` ```
이러한 예제를 참고하여 작성하여주세요. 이러한 예제를 참고하여 작성하여주세요.
# 백엔드 API DOCS
`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다.
```
http://localhost:11081/redoc
```

53
addon/ScopeDetection.py Normal file
View file

@ -0,0 +1,53 @@
import lib.target as target
from lib.report import save_report
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
import urllib.parse
parsed = urllib.parse.parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
if hasattr(req.query, "urlencode"):
query = req.query.urlencode()
else:
query = str(req.query) if req.query else ""
location = res.headers.get("location", "")
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location)
result = []
if query_scope in ["all", "*"]:
result.append(f"Scope value issue detected in request: {query_scope}")
if location_scope in ["all", "*"]:
result.append(f"Scope value issue detected in response location: {location_scope}")
return result if result else 0
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
result = await self.check_scope(flow)
if result != 0:
report_data = [{
'target': target.load(),
'status': "WARNING",
'title': "OAuth scope value issue",
'description': f"{method} {url}: {', '.join(result)}",
'uri': url
}]
save_report(report_data)

View file

@ -1,10 +1,23 @@
import re import re
import asyncio from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from typing import Optional, Any
from mitmproxy.http import HTTPFlow from mitmproxy.http import HTTPFlow
from urllib.parse import urlparse, parse_qs
from lib.report_vuln import report_vuln import lib.target as target
from lib.report import save_report
# 결과 리포트 저장용 데이터 클래스
@dataclass
class TokenLeakResult:
title: str
description: str
uri: str
status: str = "MEDIUM" # 기본 상태
def to_report(self, target_value) -> Dict[str, str]:
"""리포트 저장 포맷(dict)으로 변환"""
return {"target": target_value, **asdict(self)}
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너 # 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
@ -13,24 +26,31 @@ class AccessTokenScanner:
async def scan(self, flow: HTTPFlow) -> None: async def scan(self, flow: HTTPFlow) -> None:
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사.""" """단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
print(f"[TOKENDEBUG] Request URL: {flow.request.url}") print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
findings: List[TokenLeakResult] = []
async_gather = [] findings.extend(await self._scan_request(flow.request))
async_gather.append(self._scan_request(flow.request)) findings.extend(await self._scan_response(flow.response, flow.request.url))
async_gather.append(self._scan_response(flow.response, flow.request.url))
await asyncio.gather(*async_gather) if findings:
target_value = target.load()
save_report([f.to_report(target_value) for f in findings])
# 내부 구현 # 내부 구현
async def _scan_request(self, request: Any): async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan request==") print("[TOKENDEBUG] ==scan request==")
# URL 검사 # URL 검사
if self._is_implicit_flow(request.url): token_result = self._extract_token(request.url)
print("[TOKENDEBUG] OAuth Implicit Flow detected.") if token_result:
report_vuln( token, has_bearer = token_result
title="Token Leak in Request URL", results.append(
desc="취약한 Grant Type입니다 (Implicit Grant Type)", TokenLeakResult(
status="MEDIUM", title="Token Leak in Request URL",
uri=request.url description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
) )
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
@ -38,40 +58,54 @@ class AccessTokenScanner:
body_text = request.get_text(strict=False) body_text = request.get_text(strict=False)
token_result = self._extract_token(body_text) token_result = self._extract_token(body_text)
if token_result: if token_result:
report_vuln( token, has_bearer = token_result
title="Token Leak in Request Body", results.append(
desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token_result[:20]}", TokenLeakResult(
status="LOW", title="Token Leak in Request Body",
uri=request.url description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}",
uri=request.url,
status="MEDIUM" if has_bearer else "LOW"
)
) )
async def _scan_response(self, response: Optional[Any], request_url: str): return results
if response is None:
return
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
if response is None:
return []
results: List[TokenLeakResult] = []
print("[TOKENDEBUG] ==scan response==") print("[TOKENDEBUG] ==scan response==")
# Location 헤더 검사 (리다이렉트) # Location 헤더 검사 (리다이렉트)
if location_header := response.headers.get("Location"): if location_header := response.headers.get("Location"):
token_result = self._extract_token(location_header) token_result = self._extract_token(location_header)
if token_result: if token_result:
report_vuln( token, has_bearer = token_result
title="Token Leak in Redirect URL (Location header)", if has_bearer:
desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}", results.append(
status="MEDIUM", TokenLeakResult(
uri=location_header, title="Token Leak in Redirect URL (Location header)",
) description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=location_header,
)
)
# Body 검사 (텍스트 컨텐츠인 경우) # Body 검사 (텍스트 컨텐츠인 경우)
if response.content: if response.content:
body_text = response.get_text(strict=False) body_text = response.get_text(strict=False)
token_result = self._extract_token(body_text) token_result = self._extract_token(body_text)
if token_result: if token_result:
report_vuln( token, has_bearer = token_result
title="Token Leak in Response Body", if has_bearer:
desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}", results.append(
status="LOW", TokenLeakResult(
uri=request_url, title="Token Leak in Response Body",
) description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}",
uri=request_url,
)
)
return results
# 토큰 탐지 키워드드 # 토큰 탐지 키워드드
_TOKEN_KEYS = [ _TOKEN_KEYS = [
@ -81,6 +115,8 @@ class AccessTokenScanner:
"refreshtoken", "refreshtoken",
"auth_token", "auth_token",
"session_token", "session_token",
"secret_token",
"ssoauth",
] ]
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임 # "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
@ -114,38 +150,6 @@ class AccessTokenScanner:
if (m := pattern.search(text)) and m.group(1): if (m := pattern.search(text)) and m.group(1):
print(f"[TOKENDEBUG] token: {m.group(1)}") print(f"[TOKENDEBUG] token: {m.group(1)}")
print(f"[TOKENDEBUG] has_bearer: {has_bearer}") print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
if has_bearer: return m.group(1), has_bearer
return m.group(1)
print("[TOKENDEBUG] No matched.") print("[TOKENDEBUG] No matched.")
return None return None
def _is_implicit_flow(self, request_url: str) -> bool:
"""
URL의 파라미터에서 OAuth Implicit Flow 패턴을 체크합니다.
Args:
request_url: 체크할 요청 URL
Returns:
bool: client_id, redirect_uri, response_type이 모두 존재하고
response_type 값이 'token' 경우 True, 그렇지 않으면 False
"""
try:
parsed_url = urlparse(request_url)
query_params = parse_qs(parsed_url.query)
# 필요한 파라미터들이 모두 존재하는지 확인
required_params = ['redirect_uri', 'response_type']
for param in required_params:
if param not in query_params:
return False
# response_type 값이 'token'인지 확인
response_type_values = query_params.get('response_type', [])
# response_type 파라미터가 존재하고 값 중에 'token'이 있는지 확인
return 'token' in response_type_values or 'id_token' in response_type_values
except Exception:
return False

View file

@ -0,0 +1,76 @@
# clientsecret_check.py
from mitmproxy import http
from urllib.parse import urlparse, parse_qs
from typing import List
import lib.target as target
from lib.report import save_report
class ClientSecretChecker:
def is_oauth_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
keys = qs.keys()
return "client_id" in keys and ("client_secret" in keys or "client_secret" in uri)
def has_client_secret_in_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
return "client_secret" in qs
def is_post_with_client_secret(self, flow: http.HTTPFlow) -> bool:
if flow.request.method != "POST":
return False
content_type = flow.request.headers.get("Content-Type", "")
if "application/x-www-form-urlencoded" in content_type:
body = parse_qs(flow.request.get_text())
return "client_secret" in body
return False
def is_exposed_in_referer(self, flow: http.HTTPFlow) -> bool:
referer = flow.request.headers.get("Referer", "")
return "client_secret" in referer
def check_client_secret_leak(self, flow: http.HTTPFlow) -> List[str]:
messages = []
if self.has_client_secret_in_uri(flow.request.url):
messages.append("client_secret found in URL query string")
if self.is_post_with_client_secret(flow):
messages.append("client_secret found in POST body")
if self.is_exposed_in_referer(flow):
messages.append("client_secret exposed in Referer header")
return messages
def _report(self, flow: http.HTTPFlow, issues: List[str], direction: str):
desc = " | ".join(issues)
report_data = [{
'target': target.load(),
'status': "HIGH",
'title': f"OAuth Client Secret Exposure ({direction})",
'description': desc,
'uri': flow.request.url,
}]
save_report(report_data)
print(f"[INFO] Client Secret Leak Detected ({direction}): {desc}")
def request(self, flow: http.HTTPFlow) -> None:
try:
if not self.is_oauth_uri(flow.request.url):
return
issues = self.check_client_secret_leak(flow)
if issues:
self._report(flow, issues, "request")
except Exception as e:
print(f"[ERROR] Client Secret Check (request) failed: {e}")
def response(self, flow: http.HTTPFlow) -> None:
try:
if not self.is_oauth_uri(flow.request.url):
return
issues = self.check_client_secret_leak(flow)
if issues:
self._report(flow, issues, "response")
except Exception as e:
print(f"[ERROR] Client Secret Check (response) failed: {e}")

View file

@ -1,29 +0,0 @@
from lib.report_vuln import report_vuln
from urllib.parse import urlparse, parse_qs
class ClientSecret:
def get_target_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_client_id = self.get_target_from_query(query, "client_id")
query_client_secret = self.get_target_from_query(query, "client_secret")
if query_client_id and query_client_secret:
report_vuln(
title="OAuth Client Secret Exposure",
desc=f"Client ID and Secret found in request: {query_client_id}, {query_client_secret}",
status="CRITICAL",
uri=req.pretty_url
)

View file

@ -1,17 +1,26 @@
# csrf_check.py # csrf_check.py
from mitmproxy import http from mitmproxy import http, ctx
from urllib.parse import urlparse, parse_qs, unquote from urllib.parse import urlparse, parse_qs, unquote
import httpx import httpx
from typing import Optional, Union, List from typing import Optional, Union, List
from lib.report_vuln import report_vuln import lib.target as target
from lib.utils.is_oauth_uri import is_oauth_uri from lib.report import save_report
class CsrfChecker: class CsrfChecker:
nonce_params = { nonce_params = {
"state", "nonce", "csrf_token", "csrf" "state", "nonce", "as", "frame_id", "csrf_token", "csrf"
} }
def is_oauth_uri(self, uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False
def get_header(self, headers: http.Headers, name: str) -> Optional[str]: def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive # mitmproxy Headers는 case-insensitive
raw = headers.get(name) raw = headers.get(name)
@ -31,7 +40,7 @@ class CsrfChecker:
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or "" loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and is_oauth_uri(loc) return 300 <= code < 400 and self.is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query) qs = parse_qs(urlparse(flow.request.url).query)
@ -65,7 +74,7 @@ class CsrfChecker:
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (is_oauth_uri(flow.request.url) if not (self.is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow) and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)): and self.is_oauth_redirect(flow)):
return 0 return 0
@ -76,29 +85,18 @@ class CsrfChecker:
resp_nonce = self.get_query_param(loc, param) if param else None resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None: if resp_nonce is None:
report_vuln( return ["Missing nonce in redirect"]
title="CSRF Risk",
desc="Missing nonce in redirect response",
status="CRITICAL",
uri=flow.request.url
)
return 1
if orig_nonce != resp_nonce: if orig_nonce != resp_nonce:
report_vuln( return ["Nonce mismatch request↔response"]
title="CSRF Risk",
desc="Nonce mismatch request↔response",
status="HIGH",
uri=flow.request.url
)
return 1
return 0 return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0 return 0
loc0 = self.get_header(flow.response.headers, "location") or "" loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state" param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query) qs0 = parse_qs(urlparse(loc0).query)
@ -113,16 +111,10 @@ class CsrfChecker:
if new_nonce is None: if new_nonce is None:
return 0 return 0
if new_nonce == orig_nonce: if new_nonce == orig_nonce:
report_vuln( return ["Nonce reused without cookies"]
title="CSRF Risk",
desc="Nonce reused without cookies",
status="CRITICAL",
uri=flow.request.url
)
return 1
# (2) 두 번의 리다이렉트 비교 # (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=True) as cli: async with httpx.AsyncClient(follow_redirects=False) as cli:
# 원본 쿼리 # 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리 # nonce 교체 쿼리
@ -135,35 +127,42 @@ class CsrfChecker:
and urlparse(req1.headers.get("location", "")).path and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path
): ):
report_vuln( return ["Identical redirects on nonce swap → potential CSRF"]
title="CSRF Risk",
desc="Identical redirects on nonce swap → potential CSRF",
status="NOT-VERIFIED-HIGH",
uri=flow.request.url
)
return 1
return 0 return 0
async def response(self, flow: http.HTTPFlow) -> None: async def response(self, flow: http.HTTPFlow) -> None:
try: try:
msgs: List[str] = []
# 1) 요청에 nonce 없으면 # 1) 요청에 nonce 없으면
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
report_vuln( msgs.append("Missing state/nonce in request")
title="CSRF Risk",
desc="Missing nonce in OAuth request",
status="CRITICAL",
uri=flow.request.url
)
return
# 2) 리다이렉트에서 nonce 검사 # 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow) r1 = self.check_redirect_nonce(flow)
if r1:
msgs.extend(r1 if isinstance(r1, list) else [])
# 3) nonce 재사용 검사 # 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow) r2 = await self.check_nonce_reuse(flow)
if r2:
msgs.extend(r2 if isinstance(r2, list) else [])
if msgs:
desc = " | ".join(msgs)
status = "MEDIUM"
report_data = [{
'target': target.load(),
'status': status,
'title': "CSRF Risk",
'description': desc,
'uri': flow.request.url,
}]
save_report(report_data)
print(f"[INFO] CSRF Check: {desc}")
else:
pass
except Exception as e: except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}") print(f"[ERROR] CSRF Check failed: {e}")
return return

View file

@ -1,67 +0,0 @@
import os
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from dotenv import load_dotenv
# .env 파일 로드
load_dotenv(override=True)
class GoogleLoginHint:
def __init__(self):
self.google_id = os.getenv('GOOGLE_ID', '')
if not self.google_id:
print("⚠️ Warning: GOOGLE_ID not found in .env file")
async def request(self, flow):
"""Google OAuth 요청을 가로채서 login_hint를 추가하거나 수정"""
req = flow.request
url = req.pretty_url
# Google OAuth 인증 URL인지 확인
if self._is_google_oauth_url(url):
print(f"🔍 Google OAuth URL detected: {url}")
# URL 파싱
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
# login_hint 추가 또는 수정
if self.google_id:
query_params['login_hint'] = [self.google_id]
print(f"✅ Added/Updated login_hint: {self.google_id}")
# 새로운 쿼리 스트링 생성
new_query = urlencode(query_params, doseq=True)
# 새로운 URL 생성
new_url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
# 요청 URL 수정 - URL과 호스트 모두 업데이트
flow.request.url = new_url
print(f"🔄 Modified URL: {new_url}")
def _is_google_oauth_url(self, url):
"""Google OAuth URL인지 확인"""
google_oauth_domains = [
'accounts.google.com',
'oauth2.googleapis.com'
]
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
# Google OAuth 도메인 확인
for google_domain in google_oauth_domains:
if google_domain in domain:
# OAuth 관련 경로 확인
path = parsed_url.path.lower()
if any(oauth_path in path for oauth_path in ['/oauth2', '/auth', '/login']):
return True
return False

View file

@ -1,52 +0,0 @@
from lib.report_vuln import report_vuln
import httpx
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class GoogleResponseTypeToken:
def get_taregt_from_query(self, query: str, target: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get(target, [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
req = flow.request
if not is_oauth_uri(req.pretty_url):
return
if req.pretty_host != "accounts.google.com":
return
if "response_type=token" in req.pretty_url:
return
url = f"{req.pretty_url}".replace("response_type=code", "response_type=token")
async with httpx.AsyncClient(follow_redirects=True) as cli:
response = await cli.request(
method=req.method,
url=url,
headers=req.headers,
content=req.get_content(),
)
if response.status_code >= 400:
return
if "<b>400.</b>" in response.text:
return
if "response_type=token" in str(response.url):
report_vuln(
"Google Response Type Token",
f"Response type token allowed in {req.pretty_url}",
"HIGH",
str(response.url)
)

View file

@ -1,93 +1,109 @@
from mitmproxy import http from mitmproxy import http
import asyncio import asyncio
from pkce_check import PKCEDowngradeChecker from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection from ScopeDetection import ScopeDetection
from csrf_check import CsrfChecker from csrf_check import CsrfChecker
from client_secret import ClientSecret from nonce_check import NonceChecker
from addon.open_redirect_check import OpenRedirectChecker from cleintsecret_check import ClientSecretChecker
from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
from addon.google_response_type_token import GoogleResponseTypeToken
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager class PKCEAddon:
false_true_varifing_task = FalseTrueVarifingTask() def __init__(self):
self.checker = PKCEDowngradeChecker()
load_dotenv(override=True)
_open_redirect_checker = OpenRedirectChecker()
class AddonBase:
"""
Base class for addons.
Each addon should implement its own request or response method.
"""
def __init__(self) -> None:
if os.getenv('GOOGLE_ID'):
self.google_login_hint = GoogleLoginHint()
else:
self.google_login_hint = None
def should_ignore(self, flow: http.HTTPFlow) -> bool:
"""Check if the request should be ignored."""
ignore_domains = [
".googleapis.com",
"android.clients.google.com", # Added missing comma here
".adtrafficquality.google",
".googlesyndication.com",
"cdn.jsdelivr.net",
"update.googleapis.com",
".google-analytics.com",
".gstatic.com"
]
# Ignore .googleapis.com domains
for domain in ignore_domains:
if domain in flow.request.pretty_host:
return True
# Ignore static files (JS, CSS, fonts, images, etc.)
# Split on '?' to remove query parameters before checking extension
path = flow.request.path.split('?')[0].lower()
static_extensions = [
'.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
'.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
'.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
'.txt', '.csv'
]
if any(path.endswith(ext) for ext in static_extensions):
return True
return False
async def request(self, flow: http.HTTPFlow): async def request(self, flow: http.HTTPFlow):
if self.google_login_hint: print(
await try_catch(self.google_login_hint.request(flow)) f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
)
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] Addon failed: {e}")
pass
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [ class CsrfAddon:
try_catch(PKCEDowngradeChecker().test(flow)), def __init__(self):
] self.checker = CsrfChecker()
await asyncio.gather(*tasks)
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow): try:
return await self.checker.response(flow)
except Exception as e:
print(f"[ERROR] CSRF Addon failed: {e}")
pass
tasks = [
try_catch(CsrfChecker().response(flow)),
try_catch(ScopeDetection().test(flow)),
try_catch(ClientSecret().test(flow)),
try_catch(AccessTokenScanner().scan(flow)),
try_catch(GoogleResponseTypeToken().test(flow)),
try_catch(_open_redirect_checker.test(flow)),
]
await asyncio.gather(*tasks)
addons = [AddonBase()] class ScopeAddon:
def __init__(self):
self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}")
class NonceAddon:
def __init__(self):
self.checker = NonceChecker()
async def response(self, flow: http.HTTPFlow):
try:
pass
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
# await self.checker.check_nonce_in_id_token(flow)
except Exception as e:
print(f"[ERROR] NonceAddon failed: {e}")
pass
class ClientSecretAddon:
def __init__(self):
self.checker = ClientSecretChecker()
async def request(self, flow: http.HTTPFlow):
try:
self.checker.request(flow)
except Exception as e:
print(f"[ERROR] ClientSecretAddon request failed: {e}")
pass
async def response(self, flow: http.HTTPFlow):
try:
self.checker.response(flow)
except Exception as e:
print(f"[ERROR] ClientSecretAddon response failed: {e}")
pass
class AccessTokenAddon:
def __init__(self):
self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.scan(flow)
except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}")
pass
class RedirectBypassAddon:
def __init__(self):
self.checker = RedirectBypassChecker()
# request 대신 response 로 바꿔 보세요:
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] RedirectBypass Addon failed: {e}")
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), ClientSecretAddon(), AccessTokenAddon(), RedirectBypassAddon()]

View file

@ -1,8 +1,10 @@
import jwt import jwt
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
from typing import Union from typing import Union
import httpx
from lib.report_vuln import report_vuln import lib.target as target
from lib.report import save_report
class NonceChecker: class NonceChecker:
def is_oidc_flow(self, flow) -> bool: def is_oidc_flow(self, flow) -> bool:
@ -70,13 +72,17 @@ class NonceChecker:
def check_nonce_in_id_token(self, flow, id_token: str) -> bool: def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
decoded = self.decode_id_token(id_token) decoded = self.decode_id_token(id_token)
nonce = decoded.get("nonce") nonce = decoded.get("nonce")
req = flow.request
url = req.pretty_url
if not nonce: if not nonce:
report_vuln( report_data = [{
title="Nonce Check Failed", 'target': target.load(),
desc="id_token에 nonce가 없습니다.", 'status': "CRITICAL",
status="HIGH", 'title': "nonce is missing in id_token",
uri=flow.request.url 'description': "Nonce is present in the request but missing in the id_token.",
) 'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
}]
save_report(report_data)
return False return False
else: else:
return True return True

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,10 @@
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx import httpx
from typing import Dict, List from typing import Dict, List
from lib.report_vuln import report_vuln
import lib.target as target
from lib.report import save_report
class PKCEDowngradeChecker: class PKCEDowngradeChecker:
@ -55,19 +58,27 @@ class PKCEDowngradeChecker:
async def report_missing_parameters(self, url: str, is_openid: bool): async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW" status = "MEDIUM" if is_openid else "LOW"
report_vuln( self.save(
title="PKCE Parameters Missing", [
desc="PKCE parameters are missing or incomplete.", self.make_report(
status=status, status,
uri=url, "PKCE Parameters Missing",
"PKCE parameters are missing or incomplete.",
url,
)
]
) )
async def report_plain_method(self, url: str): async def report_plain_method(self, url: str):
report_vuln( self.save(
title="PKCE Plain Method", [
desc="PKCE method is set to 'plain'. Possible downgrade.", self.make_report(
status="CRITICAL", "CRITICAL",
uri=url, "PKCE Plain Method",
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
) )
def create_downgraded_url(self, parsed, query): def create_downgraded_url(self, parsed, query):
@ -139,11 +150,15 @@ class PKCEDowngradeChecker:
else: else:
return # Likely safe return # Likely safe
report_vuln( self.save(
title=title, [
desc=description, self.make_report(
status=status, status,
uri=f"Original: {original_url}\nDowngraded: {downgraded_url}", title,
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
) )
def same_redirect_destination(self, orig_loc, down_loc): def same_redirect_destination(self, orig_loc, down_loc):
@ -151,3 +166,16 @@ class PKCEDowngradeChecker:
down = urlparse(down_loc) down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": target.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)

202
addon/redirect_uri_check.py Normal file
View file

@ -0,0 +1,202 @@
from mitmproxy import http
import aiohttp
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import lib.target as target
from lib.report import save_report
class BypassPayload:
""" 우회 패턴 정의 """
def __init__(self, name: str, mutate_func, description: str):
self.name = name
self.mutate = mutate_func #우회 url 만드는 함수
self.description = description
class RedirectBypassChecker:
def __init__(self):
""" 우회 페이로드 목록 """
self.bypass_payloads = [
BypassPayload(
name=r"@",
mutate_func=self._mutate_pattern1,
description=r"@ 기호를 이용한 호스트 우회 공격: evil.com@target.com"
),
]
self.session = None
""" 우회 URL 생성 목록 """
# 1. @
def _mutate_pattern1(self, original: str) -> str:
parsed = urlparse(original)
mutated = f"https://evil.com@{parsed.netloc}{parsed.path}"
print(f"[redirect_uri_check] original: {original} → mutated: {mutated}")
return mutated
'''aiohttp 세션 생성 (재사용)'''
async def _get_session(self):
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
return self.session
'''세션 정리'''
async def close_session(self):
if self.session:
await self.session.close()
self.session = None
""" 우회된 URL에 GET 요청을 보내고, 서버 응답 정보 반환 """
async def _send_request(self, url, headers=None):
try:
session = await self._get_session() # 세션 준비
request_headers = headers or {} # 요청 헤더 설정 - headers가 주어지지 않았다면 빈 딕셔너리 사용
# 서버에 GET 요청 전송
async with session.get(url, allow_redirects=False, headers=request_headers) as response:
return {
'status': response.status,
'location': response.headers.get("Location", ""),
'headers': dict(response.headers)
}
except Exception as e:
print(f"[ERROR] 요청 실패 ({url}): {e}")
return {'status': 500, 'location': '', 'headers': {}}
""" redirect_uri가 기준 도메인(baseUrl)에 속하는지 판단 """
def _is_baseline_valid(self, redirect_uri: str, base_url: str) -> bool:
try:
redirect_parsed = urlparse(redirect_uri)
base_parsed = urlparse(base_url)
redirect_host = redirect_parsed.hostname
base_host = base_parsed.hostname
if not redirect_host or not base_host:
return False
if "@" in redirect_uri:
if redirect_host != base_host:
print(f"[ALERT] 우회 공격 탐지: {redirect_host} != {base_host}")
return False
at_parts = redirect_uri.split('@')
if len(at_parts) > 1:
before_at = at_parts[0]
if '//' in before_at:
potential_domain = before_at.split('//')[-1]
if '.' in potential_domain and potential_domain != base_host:
print(f"[CRITICAL] @ 우회 공격: {potential_domain}@{base_host}")
return False
is_valid = (redirect_host == base_host or redirect_host.endswith(f".{base_host}"))
return is_valid
except Exception as e:
print(f"[ERROR] 도메인 검증 실패: {e}")
return False
""" Location 헤더에서 authorization code 추출 """
def _extract_code_from_location(self, location: str) -> str:
if not location:
return ""
try:
parsed = urlparse(location)
query = parse_qs(parsed.query)
return query.get('code', [''])[0]
except:
return ""
""" 메인 테스트 함수 - mitmproxy flow 처리 """
async def test(self, flow: http.HTTPFlow):
url = flow.request.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
if "redirect_uri" not in query:
return
original_redirect_uri = query["redirect_uri"][0]
print(f"[DEBUG] 원본 redirect_uri: {original_redirect_uri}")
for payload in self.bypass_payloads:
try:
await self._test_bypass_pattern(
url, query, parsed, original_redirect_uri, payload, headers={}
)
except Exception as e:
print(f"[ERROR] 우회 패턴 테스트 실패 ({payload.name}): {e}")
continue
""" 개별 우회 패턴 테스트 """
async def _test_bypass_pattern(self, original_url, query, parsed_url, original_redirect_uri, payload, headers):
print(f"[SCAN] 우회 패턴 테스트: {payload.name}")
# 우회 URL 생성
bypassed_uri = payload.mutate(original_redirect_uri)
# 새로운 쿼리 파라미터 구성
modified_query = query.copy()
modified_query["redirect_uri"] = [bypassed_uri]
new_query_string = urlencode(modified_query, doseq=True)
test_url = urlunparse(parsed_url._replace(query=new_query_string))
# 요청 전송
response = await self._send_request(test_url, headers)
# 응답 분석
await self._analyze_response(original_url, test_url, bypassed_uri, response, payload)
""" 응답 분석 및 취약점 판단 """
async def _analyze_response(self, original_url, test_url, bypassed_uri, response, payload):
status = response['status']
location = response['location']
# 리다이렉트 응답이 아니면 스킵
if status not in [301, 302, 303, 307, 308]:
return
# Location 헤더에서 code 추출
auth_code = self._extract_code_from_location(location)
if auth_code and not self._is_baseline_valid(bypassed_uri, original_url):
# 취약점 발견 시에만 로그
print(f"[🎯 VULNERABILITY] {payload.name} 우회 성공!")
await self._report_vulnerability(original_url, test_url, bypassed_uri, location, auth_code, payload)
""" 취약점 보고서 생성 """
async def _report_vulnerability(self, original_url, test_url, bypassed_uri, location, auth_code, payload):
# payload가 문자열인지 객체인지 확인
if hasattr(payload, 'name'):
pattern_name = payload.name
pattern_description = payload.description
else:
pattern_name = str(payload)
pattern_description = "Unknown bypass pattern"
description = (
f"Redirect URI 우회 취약점 발견!\n\n"
f"-- 상세 정보 --:\n"
f"• 우회 패턴: {pattern_name}\n"
f"• 설명: {pattern_description}\n"
f"• 원본 URL: {original_url}\n"
f"• 우회된 redirect_uri: {bypassed_uri}\n"
f"• 테스트 URL: {test_url}\n"
f"• 리다이렉트 위치: {location}\n"
f"• 발급된 인가 코드: {auth_code[:10]}...\n\n"
)
report_data = [{
"target": target.load(),
"status": "CRITICAL",
"title": "Redirect URI Bypass Vulnerability",
"description": description,
"uri": test_url # uri 필드 추가
}]
save_report(report_data)
print(f"[🎯 CRITICAL] Redirect URI 우회 취약점 발견 및 보고 완료!")
print(f"[INFO] 패턴: {pattern_name}, 우회 URI: {bypassed_uri}")

View file

@ -1,32 +0,0 @@
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
from urllib.parse import urlparse, parse_qs
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
parsed = parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
req = flow.request
parsed = urlparse(req.pretty_url)
query = parsed.query
query_scope = self.get_scope_from_query(query)
if query_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in request: {query_scope}",
status="WARNING",
uri=req.pretty_url
)

View file

@ -1,65 +0,0 @@
from typing import Any
from copy import deepcopy
class FalseTrueVarifingTask:
"""
A singleton class representing a task that can be either false or true.
This class is used to handle tasks that require verification of their truth value.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._is_verifing = False
self.task_queue = []
self._initialized = True
def reset(self):
"""
Reset the task queue and verification status.
"""
self._is_verifing = False
self.task_queue.clear()
# 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다.
# TODO: 모델 지정해두기
def add_task(self, task_name: str, initial_uri: str, data: Any):
"""
Add a task to the task queue.
:param task: The task to be added.
"""
self.task_queue.append(
{
"task_name": task_name,
"initial_uri": initial_uri,
"data": data
}
)
def start_verification(self):
"""
Start the verification process for the tasks in the queue.
"""
self._is_verifing = True
def get_task_queue(self):
"""
Get a copy of the current task queue.
:return: A copy of the task queue.
"""
return deepcopy(self.task_queue)
def is_verifing_false_true(self):
"""
Get the current verification status.
:return: True if verification is in progress, False otherwise.
"""
return self._is_verifing

View file

@ -1,17 +1,15 @@
# save as data/report.csv # save as data/report.csv
import os import os
import csv import csv
from mitmproxy import http from typing import List, Dict, Any
import lib.cur_target_url as cur_target_url
# target, status, title, description, uri # target, status, title, description, uri
# file path는 'data/report.csv'로 고정 # file path는 'data/report.csv'로 고정
def report_vuln(title: str, desc: str, status: str, uri: str) -> None: def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
file_path: str = 'data/report.csv'
os.makedirs(os.path.dirname(file_path), exist_ok=True) os.makedirs(os.path.dirname(file_path), exist_ok=True)
""" """
report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다. report_data 안의 레포트를 줄씩 CSV에 추가로 저장합니다.
파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다. 파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다.
@ -25,10 +23,10 @@ def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
if not file_exists: if not file_exists:
writer.writeheader() writer.writeheader()
writer.writerow({ for row in report_data:
'target': cur_target_url.load(), # None 방지 & 줄바꿈 이스케이프
'status': status, escaped = {
'title': title, k: str(v).replace('\n', '\\n') if v is not None else ''
'description': desc, for k, v in row.items()
'uri': uri, }
}) writer.writerow(escaped)

View file

@ -1,10 +0,0 @@
from urllib.parse import urlparse, parse_qs
def is_oauth_uri(uri: str) -> bool:
qs = parse_qs(urlparse(uri).query)
qs_keys = [*qs]
if "client_id" in qs_keys and any(p in qs_keys for p in (
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
return True
return False

View file

@ -1,5 +0,0 @@
async def try_catch(coro):
try:
return await coro
except Exception as e:
print(f"[ERROR] {coro} failed: {e}")

30
main.py
View file

@ -1,23 +1,21 @@
from runner.proxy import run_proxy from runner.proxy import run_proxy
import subprocess
import threading import threading
import uvicorn
from runner.backend import app
def run_fastapi_server():
"""FastAPI 서버를 실행하는 함수"""
uvicorn.run(app, host="localhost", port=11081, log_level="info")
if __name__ == "__main__": if __name__ == "__main__":
try: # Start web server in a separate thread
# FastAPI 서버를 백그라운드 스레드에서 실행 server_process = subprocess.Popen([
fastapi_thread = threading.Thread(target=run_fastapi_server, daemon=True) "granian",
fastapi_thread.start() "--interface", "asgi",
print("🚀 FastAPI server started on http://localhost:11081") "--host", "0.0.0.0",
"--port", "11081",
"--loop", "asyncio",
"--reload",
"runner.backend:app",
])
# Run mitmdump proxy (메인 스레드에서 실행) try:
print("🛡️ Starting mitmdump proxy on port 11080...") # Run mitmdump proxy
run_proxy() run_proxy()
except KeyboardInterrupt:
print("🛑 Shutting down...")
finally: finally:
print("✅ Mitmdump proxy has been stopped.") server_process.terminate()

View file

@ -1,107 +1,17 @@
from fastapi import FastAPI, Query, HTTPException from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
import lib.cur_target_url as cur_target_url import lib.target as target
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI() app = FastAPI()
@app.post("/start")
@app.post( async def start(url: str = Query(None)):
"/start", if url:
summary="취약점 검증을 위한 대상 URL 설정", target.save(url)
description=""" print(f"Target URL set to: {url}")
엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다. return {"message": f"Target URL set to: {url}"}
return {"error": "No URL provided"}
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 오탐 검증 작업 시작을 알림",
description="""
엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
```json
{
"payload": [
{
"task_name": "pkce_task", # 검증 작업의 이름
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
"data": any # 추가 데이터
},
...
]
}
```
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404) @app.exception_handler(404)