mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 05:31:51 +09:00
Compare commits
71 commits
feature/ac
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3018b4fd23 |
||
|
|
27e7a290ba | ||
|
|
a4b14ab20f | ||
|
|
8e0523e734 | ||
|
|
7d378fa91f | ||
|
|
905db47d8a | ||
|
|
182ea21178 | ||
|
61e4ed6119 |
|||
|
|
8ef13de441 | ||
|
|
9898f215f3 | ||
|
|
a3b54028b7 |
||
|
|
e2ee91034d | ||
|
|
1a97b9d403 |
||
|
|
cf5746685a | ||
|
c8815f3f28 |
|||
|
|
a1758a60d4 | ||
|
|
4758d7a689 |
||
| 87d5b0209c | |||
| 5edab9244c | |||
|
|
949b156f19 |
||
|
|
c20bcdebf3 | ||
|
|
6e5c37423c |
||
|
|
0d81fdd49f | ||
|
|
00c81f365a |
||
|
|
58d5deb435 | ||
|
|
05a095df7d | ||
|
|
4deb032708 | ||
|
|
3c5db3c1fd | ||
|
|
53db0fb14e | ||
|
|
3a1422a2f2 | ||
|
|
062552d3d8 | ||
|
|
afcfd7de87 |
||
|
|
1c6fc53a81 | ||
|
|
6dceba0c24 | ||
|
|
0bee707406 |
||
| 69622e4648 | |||
|
e063dadb72 |
|||
| 897173ba46 | |||
| c511b3bfd7 | |||
|
9071ed11b7 |
|||
|
5d1624a96a |
|||
|
ba277ccec1 |
|||
|
|
3af5787064 | ||
|
|
0c7994a52f | ||
|
|
9a14872964 | ||
|
|
b221c4a9e6 | ||
|
|
990eb1b643 | ||
|
|
c593a92b11 | ||
|
|
cf3bfee039 | ||
|
|
32efcbe1a0 | ||
|
|
3850b0de2f | ||
|
|
00e3958300 | ||
|
|
40867acb26 | ||
|
|
c311aaad71 | ||
|
|
05bbdc65c1 | ||
|
1b3f58b432 |
|||
|
99fc280517 |
|||
|
12d0ed73ff |
|||
|
db514172dc |
|||
|
|
ba6064c378 | ||
|
|
57625307a7 | ||
|
|
ef61667cfe | ||
|
30e2730cb1 |
|||
|
5b2dec4db8 |
|||
|
|
7ac749fa36 | ||
|
|
0be13ec5f2 | ||
|
|
367a7156bf | ||
|
|
aa8bf95a5c | ||
|
59b3d7d9d2 |
|||
|
|
31ca96f037 | ||
|
|
eda0c5a679 |
23 changed files with 2438 additions and 349 deletions
2
.env
Normal file
2
.env
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
# Google OAuth 설정
|
||||
GOOGLE_ID=whs.imnya.ng@gmail.com
|
||||
52
.github/workflows/ci.yml
vendored
Normal file
52
.github/workflows/ci.yml
vendored
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
name: CI/CD Pipeline
|
||||
|
||||
on:
|
||||
push:
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: [3.13]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v6
|
||||
with:
|
||||
enable-cache: true
|
||||
cache-dependency-glob: "uv.lock"
|
||||
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
run: uv python install ${{ matrix.python-version }}
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync
|
||||
|
||||
- name: Set up environment variables
|
||||
run: |
|
||||
echo "GOOGLE_ID=bot.imnya.ng@gmail.com" > .env
|
||||
|
||||
- name: Start application and run proxy test
|
||||
run: |
|
||||
# Start the application in background
|
||||
uv run main.py &
|
||||
APP_PID=$!
|
||||
|
||||
# Wait for application to start
|
||||
sleep 5
|
||||
|
||||
# Test proxy functionality
|
||||
sudo cp ~/.mitmproxy/mitmproxy-ca-cert.pem /usr/local/share/ca-certificates/mitmproxy-ca-cert.crt
|
||||
sudo update-ca-certificates
|
||||
|
||||
mkdir data
|
||||
echo https://github.com > ./data/target.dump
|
||||
curl -k -x https://localhost:11080 "https://github.com/login/oauth/authorize?client_id=Ov23lixietSCQOHxPvcr&redirect_uri=http://localhost:8787&scope=read:user+user:email&response_type=code&code_challenge=abc123&code_challenge_method=S256"
|
||||
|
||||
# Clean up
|
||||
kill $APP_PID
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -9,6 +9,8 @@ wheels/
|
|||
# Virtual environments
|
||||
.venv
|
||||
|
||||
.env
|
||||
|
||||
data/
|
||||
|
||||
|
||||
|
|
|
|||
76
README.md
76
README.md
|
|
@ -1,5 +1,7 @@
|
|||
# 환경 설정
|
||||
|
||||
## Python Virtual Environment
|
||||
|
||||
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
|
||||
|
||||
uv 설치 후 다음과 같은 명령어를 입력합니다.
|
||||
|
|
@ -8,8 +10,33 @@ uv 설치 후 다음과 같은 명령어를 입력합니다.
|
|||
uv sync
|
||||
```
|
||||
|
||||
## Environment
|
||||
|
||||
venv와 패키지가 설치가 됩니다.
|
||||
|
||||
.env.example을 복사하여 .env를 붙여넣습니다.
|
||||
|
||||
`GOOGLE_ID=`에 봇에서 쓸 구글 계정의 전체 GMail을 기입합니다.
|
||||
|
||||
입력하지 않는다면 Google OAuth시 자동적으로 넘어가지 않을 수도 있습니다.
|
||||
|
||||
---
|
||||
|
||||
> [oauth-backend](https://github.com/j93es/oauth-backend) 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
|
||||
>
|
||||
> 그렇지 않으면 실행되지 않습니다.
|
||||
>
|
||||
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
|
||||
>
|
||||
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
|
||||
>
|
||||
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
|
||||
>
|
||||
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
|
||||
> https://docs.mitmproxy.org/stable/concepts/certificates/
|
||||
|
||||
---
|
||||
|
||||
# 실행 방법
|
||||
|
||||
```
|
||||
|
|
@ -17,46 +44,51 @@ uv run main.py
|
|||
```
|
||||
|
||||
이러면 http(s)://localhost:11080로 서버가 열리게 됩니다.
|
||||
http://localhost:11081로 백엔드 서버가 열리게 됩니다.
|
||||
|
||||
# 기여 방법
|
||||
|
||||
`./addon/init.py`
|
||||
|
||||
```py
|
||||
from example_check import Example
|
||||
|
||||
class LoggerAddon:
|
||||
def __init__(self):
|
||||
self.checker = Example()
|
||||
|
||||
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
|
||||
self.checker.test(flow)
|
||||
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
|
||||
self.checker.test(flow)
|
||||
...
|
||||
async def request(self, flow: http.HTTPFlow):
|
||||
if false_true_varifing_task.is_verifing_false_true():
|
||||
return
|
||||
|
||||
tasks = [
|
||||
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
|
||||
try_catch(PKCEDowngradeChecker().test(flow)),
|
||||
try_catch(Example().test(flow))
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
...
|
||||
```
|
||||
|
||||
`./addon/example.py`
|
||||
|
||||
```py
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
from lib.report_vuln import report_vuln
|
||||
|
||||
class Example:
|
||||
async def test(self, flow):
|
||||
req = flow.request
|
||||
method = req.method
|
||||
url = req.pretty_url
|
||||
|
||||
# data/report.csv에 저장
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': "CRITICAL",
|
||||
'title': "PKCE Downgrade Vulnerability",
|
||||
'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
|
||||
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
|
||||
}]
|
||||
save_report(report_data)
|
||||
report_vuln(
|
||||
title="PKCE Plain Method",
|
||||
desc="PKCE method is set to 'plain'. Possible downgrade.",
|
||||
status="CRITICAL",
|
||||
uri=url,
|
||||
)
|
||||
```
|
||||
|
||||
이러한 예제를 참고하여 작성하여주세요.
|
||||
|
||||
# 백엔드 API DOCS
|
||||
|
||||
`uv run main.py`으로 백엔드를 실행한 후에, 다음의 url에 접속합니다.
|
||||
|
||||
```
|
||||
http://localhost:11081/redoc
|
||||
```
|
||||
|
|
|
|||
|
|
@ -1,53 +0,0 @@
|
|||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
class ScopeDetection:
|
||||
def get_scope_from_query(self, query: str) -> str | None:
|
||||
if not query:
|
||||
return None
|
||||
import urllib.parse
|
||||
parsed = urllib.parse.parse_qs(query)
|
||||
scope_values = parsed.get("scope", [])
|
||||
if scope_values:
|
||||
return scope_values[0]
|
||||
return None
|
||||
|
||||
async def check_scope(self, flow):
|
||||
req = flow.request
|
||||
res = flow.response
|
||||
|
||||
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
|
||||
if hasattr(req.query, "urlencode"):
|
||||
query = req.query.urlencode()
|
||||
else:
|
||||
query = str(req.query) if req.query else ""
|
||||
|
||||
location = res.headers.get("location", "")
|
||||
|
||||
query_scope = self.get_scope_from_query(query)
|
||||
location_scope = self.get_scope_from_query(location)
|
||||
|
||||
result = []
|
||||
if query_scope in ["all", "*"]:
|
||||
result.append(f"Scope value issue detected in request: {query_scope}")
|
||||
if location_scope in ["all", "*"]:
|
||||
result.append(f"Scope value issue detected in response location: {location_scope}")
|
||||
|
||||
return result if result else 0
|
||||
|
||||
async def test(self, flow):
|
||||
req = flow.request
|
||||
method = req.method
|
||||
url = req.pretty_url
|
||||
|
||||
result = await self.check_scope(flow)
|
||||
|
||||
if result != 0:
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': "WARNING",
|
||||
'title': "OAuth scope value issue",
|
||||
'description': f"{method} {url}: {', '.join(result)}",
|
||||
'uri': url
|
||||
}]
|
||||
save_report(report_data)
|
||||
|
|
@ -1,23 +1,10 @@
|
|||
import re
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict, Optional, Any
|
||||
import asyncio
|
||||
|
||||
from typing import Optional, Any
|
||||
from mitmproxy.http import HTTPFlow
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
# 결과 리포트 저장용 데이터 클래스
|
||||
@dataclass
|
||||
class TokenLeakResult:
|
||||
title: str
|
||||
description: str
|
||||
uri: str
|
||||
status: str = "MEDIUM" # 기본 상태
|
||||
|
||||
def to_report(self, target_value) -> Dict[str, str]:
|
||||
"""리포트 저장 포맷(dict)으로 변환"""
|
||||
return {"target": target_value, **asdict(self)}
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from lib.report_vuln import report_vuln
|
||||
|
||||
|
||||
# 요청/응답에서 액세스 토큰 노출 여부를 검사하는 스캐너
|
||||
|
|
@ -26,31 +13,24 @@ class AccessTokenScanner:
|
|||
async def scan(self, flow: HTTPFlow) -> None:
|
||||
"""단일 HTTPFlow(request + response)에 대해 요청과 응답을 모두 검사."""
|
||||
print(f"[TOKENDEBUG] Request URL: {flow.request.url}")
|
||||
findings: List[TokenLeakResult] = []
|
||||
|
||||
findings.extend(await self._scan_request(flow.request))
|
||||
findings.extend(await self._scan_response(flow.response, flow.request.url))
|
||||
|
||||
if findings:
|
||||
target_value = target.load()
|
||||
save_report([f.to_report(target_value) for f in findings])
|
||||
|
||||
async_gather = []
|
||||
async_gather.append(self._scan_request(flow.request))
|
||||
async_gather.append(self._scan_response(flow.response, flow.request.url))
|
||||
await asyncio.gather(*async_gather)
|
||||
|
||||
# 내부 구현
|
||||
async def _scan_request(self, request: Any) -> List[TokenLeakResult]:
|
||||
results: List[TokenLeakResult] = []
|
||||
async def _scan_request(self, request: Any):
|
||||
|
||||
print("[TOKENDEBUG] ==scan request==")
|
||||
# URL 검사
|
||||
token_result = self._extract_token(request.url)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Request URL",
|
||||
description=f"요청 URL에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request.url,
|
||||
status="MEDIUM" if has_bearer else "LOW"
|
||||
)
|
||||
if self._is_implicit_flow(request.url):
|
||||
print("[TOKENDEBUG] OAuth Implicit Flow detected.")
|
||||
report_vuln(
|
||||
title="Token Leak in Request URL",
|
||||
desc="취약한 Grant Type입니다 (Implicit Grant Type)",
|
||||
status="MEDIUM",
|
||||
uri=request.url
|
||||
)
|
||||
|
||||
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||
|
|
@ -58,54 +38,40 @@ class AccessTokenScanner:
|
|||
body_text = request.get_text(strict=False)
|
||||
token_result = self._extract_token(body_text)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Request Body",
|
||||
description=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request.url,
|
||||
status="MEDIUM" if has_bearer else "LOW"
|
||||
)
|
||||
report_vuln(
|
||||
title="Token Leak in Request Body",
|
||||
desc=f"요청 본문에 토큰이 포함되어 있습니다 (앞 20자): {token_result[:20]}…",
|
||||
status="LOW",
|
||||
uri=request.url
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
async def _scan_response(self, response: Optional[Any], request_url: str) -> List[TokenLeakResult]:
|
||||
async def _scan_response(self, response: Optional[Any], request_url: str):
|
||||
if response is None:
|
||||
return []
|
||||
return
|
||||
|
||||
results: List[TokenLeakResult] = []
|
||||
print("[TOKENDEBUG] ==scan response==")
|
||||
# Location 헤더 검사 (리다이렉트)
|
||||
if location_header := response.headers.get("Location"):
|
||||
token_result = self._extract_token(location_header)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
if has_bearer:
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Redirect URL (Location header)",
|
||||
description=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||
uri=location_header,
|
||||
)
|
||||
)
|
||||
report_vuln(
|
||||
title="Token Leak in Redirect URL (Location header)",
|
||||
desc=f"Location 헤더에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}…",
|
||||
status="MEDIUM",
|
||||
uri=location_header,
|
||||
)
|
||||
|
||||
# Body 검사 (텍스트 컨텐츠인 경우)
|
||||
if response.content:
|
||||
body_text = response.get_text(strict=False)
|
||||
token_result = self._extract_token(body_text)
|
||||
if token_result:
|
||||
token, has_bearer = token_result
|
||||
if has_bearer:
|
||||
results.append(
|
||||
TokenLeakResult(
|
||||
title="Token Leak in Response Body",
|
||||
description=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token[:20]}…",
|
||||
uri=request_url,
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
report_vuln(
|
||||
title="Token Leak in Response Body",
|
||||
desc=f"응답 본문에 토큰이 노출되었습니다 (앞 20자): {token_result[:20]}…",
|
||||
status="LOW",
|
||||
uri=request_url,
|
||||
)
|
||||
|
||||
# 토큰 탐지 키워드드
|
||||
_TOKEN_KEYS = [
|
||||
|
|
@ -115,8 +81,6 @@ class AccessTokenScanner:
|
|||
"refreshtoken",
|
||||
"auth_token",
|
||||
"session_token",
|
||||
"secret_token",
|
||||
"ssoauth",
|
||||
]
|
||||
|
||||
# "bearer" 표시가 동시에 존재할 때만 토큰으로 판단하여 false positive를 줄임
|
||||
|
|
@ -150,6 +114,38 @@ class AccessTokenScanner:
|
|||
if (m := pattern.search(text)) and m.group(1):
|
||||
print(f"[TOKENDEBUG] token: {m.group(1)}")
|
||||
print(f"[TOKENDEBUG] has_bearer: {has_bearer}")
|
||||
return m.group(1), has_bearer
|
||||
if has_bearer:
|
||||
return m.group(1)
|
||||
print("[TOKENDEBUG] No matched.")
|
||||
return None
|
||||
|
||||
def _is_implicit_flow(self, request_url: str) -> bool:
|
||||
"""
|
||||
URL의 파라미터에서 OAuth Implicit Flow 패턴을 체크합니다.
|
||||
|
||||
Args:
|
||||
request_url: 체크할 요청 URL
|
||||
|
||||
Returns:
|
||||
bool: client_id, redirect_uri, response_type이 모두 존재하고
|
||||
response_type 값이 'token'인 경우 True, 그렇지 않으면 False
|
||||
"""
|
||||
try:
|
||||
parsed_url = urlparse(request_url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
# 필요한 파라미터들이 모두 존재하는지 확인
|
||||
required_params = ['redirect_uri', 'response_type']
|
||||
|
||||
for param in required_params:
|
||||
if param not in query_params:
|
||||
return False
|
||||
|
||||
# response_type 값이 'token'인지 확인
|
||||
response_type_values = query_params.get('response_type', [])
|
||||
|
||||
# response_type 파라미터가 존재하고 값 중에 'token'이 있는지 확인
|
||||
return 'token' in response_type_values or 'id_token' in response_type_values
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
29
addon/client_secret.py
Normal file
29
addon/client_secret.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
from lib.report_vuln import report_vuln
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
class ClientSecret:
|
||||
def get_target_from_query(self, query: str, target: str) -> str | None:
|
||||
if not query:
|
||||
return None
|
||||
parsed = parse_qs(query)
|
||||
scope_values = parsed.get(target, [])
|
||||
if scope_values:
|
||||
return scope_values[0]
|
||||
return None
|
||||
|
||||
async def test(self, flow):
|
||||
req = flow.request
|
||||
|
||||
parsed = urlparse(req.pretty_url)
|
||||
query = parsed.query
|
||||
|
||||
query_client_id = self.get_target_from_query(query, "client_id")
|
||||
query_client_secret = self.get_target_from_query(query, "client_secret")
|
||||
|
||||
if query_client_id and query_client_secret:
|
||||
report_vuln(
|
||||
title="OAuth Client Secret Exposure",
|
||||
desc=f"Client ID and Secret found in request: {query_client_id}, {query_client_secret}",
|
||||
status="CRITICAL",
|
||||
uri=req.pretty_url
|
||||
)
|
||||
|
|
@ -1,26 +1,17 @@
|
|||
# csrf_check.py
|
||||
from mitmproxy import http, ctx
|
||||
from mitmproxy import http
|
||||
from urllib.parse import urlparse, parse_qs, unquote
|
||||
import httpx
|
||||
from typing import Optional, Union, List
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
from lib.report_vuln import report_vuln
|
||||
from lib.utils.is_oauth_uri import is_oauth_uri
|
||||
|
||||
class CsrfChecker:
|
||||
nonce_params = {
|
||||
"state", "nonce", "as", "frame_id", "csrf_token", "csrf"
|
||||
"state", "nonce", "csrf_token", "csrf"
|
||||
}
|
||||
|
||||
def is_oauth_uri(self, uri: str) -> bool:
|
||||
qs = parse_qs(urlparse(uri).query)
|
||||
qs_keys = [*qs]
|
||||
|
||||
if "client_id" in qs_keys and any(p in qs_keys for p in (
|
||||
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
|
||||
# mitmproxy Headers는 case-insensitive
|
||||
raw = headers.get(name)
|
||||
|
|
@ -40,7 +31,7 @@ class CsrfChecker:
|
|||
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
|
||||
code = flow.response.status_code
|
||||
loc = self.get_header(flow.response.headers, "location") or ""
|
||||
return 300 <= code < 400 and self.is_oauth_uri(loc)
|
||||
return 300 <= code < 400 and is_oauth_uri(loc)
|
||||
|
||||
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
|
||||
qs = parse_qs(urlparse(flow.request.url).query)
|
||||
|
|
@ -71,10 +62,10 @@ class CsrfChecker:
|
|||
headers=headers,
|
||||
content=flow.request.get_content(),
|
||||
)
|
||||
|
||||
|
||||
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
|
||||
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
|
||||
if not (self.is_oauth_uri(flow.request.url)
|
||||
if not (is_oauth_uri(flow.request.url)
|
||||
and self.check_nonce_in_request(flow)
|
||||
and self.is_oauth_redirect(flow)):
|
||||
return 0
|
||||
|
|
@ -85,18 +76,29 @@ class CsrfChecker:
|
|||
resp_nonce = self.get_query_param(loc, param) if param else None
|
||||
|
||||
if resp_nonce is None:
|
||||
return ["Missing nonce in redirect"]
|
||||
report_vuln(
|
||||
title="CSRF Risk",
|
||||
desc="Missing nonce in redirect response",
|
||||
status="CRITICAL",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return 1
|
||||
if orig_nonce != resp_nonce:
|
||||
return ["Nonce mismatch request↔response"]
|
||||
report_vuln(
|
||||
title="CSRF Risk",
|
||||
desc="Nonce mismatch request↔response",
|
||||
status="HIGH",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
|
||||
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
|
||||
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
|
||||
if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
|
||||
return 0
|
||||
|
||||
|
||||
loc0 = self.get_header(flow.response.headers, "location") or ""
|
||||
param = self.find_nonce_param(loc0) or "state"
|
||||
qs0 = parse_qs(urlparse(loc0).query)
|
||||
|
|
@ -111,10 +113,16 @@ class CsrfChecker:
|
|||
if new_nonce is None:
|
||||
return 0
|
||||
if new_nonce == orig_nonce:
|
||||
return ["Nonce reused without cookies"]
|
||||
report_vuln(
|
||||
title="CSRF Risk",
|
||||
desc="Nonce reused without cookies",
|
||||
status="CRITICAL",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return 1
|
||||
|
||||
# (2) 두 번의 리다이렉트 비교
|
||||
async with httpx.AsyncClient(follow_redirects=False) as cli:
|
||||
async with httpx.AsyncClient(follow_redirects=True) as cli:
|
||||
# 원본 쿼리
|
||||
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
|
||||
# nonce 교체 쿼리
|
||||
|
|
@ -127,42 +135,35 @@ class CsrfChecker:
|
|||
and urlparse(req1.headers.get("location", "")).path
|
||||
== urlparse(req2.headers.get("location", "")).path
|
||||
):
|
||||
return ["Identical redirects on nonce swap → potential CSRF"]
|
||||
report_vuln(
|
||||
title="CSRF Risk",
|
||||
desc="Identical redirects on nonce swap → potential CSRF",
|
||||
status="NOT-VERIFIED-HIGH",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
async def response(self, flow: http.HTTPFlow) -> None:
|
||||
try:
|
||||
msgs: List[str] = []
|
||||
|
||||
# 1) 요청에 nonce 없으면
|
||||
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
|
||||
msgs.append("Missing state/nonce in request")
|
||||
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
|
||||
report_vuln(
|
||||
title="CSRF Risk",
|
||||
desc="Missing nonce in OAuth request",
|
||||
status="CRITICAL",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return
|
||||
|
||||
# 2) 리다이렉트에서 nonce 검사
|
||||
r1 = self.check_redirect_nonce(flow)
|
||||
if r1:
|
||||
msgs.extend(r1 if isinstance(r1, list) else [])
|
||||
|
||||
# 3) nonce 재사용 검사
|
||||
r2 = await self.check_nonce_reuse(flow)
|
||||
if r2:
|
||||
msgs.extend(r2 if isinstance(r2, list) else [])
|
||||
|
||||
if msgs:
|
||||
desc = " | ".join(msgs)
|
||||
status = "MEDIUM"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "CSRF Risk",
|
||||
'description': desc,
|
||||
'uri': flow.request.url,
|
||||
}]
|
||||
save_report(report_data)
|
||||
print(f"[INFO] CSRF Check: {desc}")
|
||||
else:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[ERROR] CSRF Check failed: {e}")
|
||||
return
|
||||
|
|
|
|||
67
addon/google_login_hint.py
Normal file
67
addon/google_login_hint.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
import os
|
||||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# .env 파일 로드
|
||||
load_dotenv(override=True)
|
||||
|
||||
class GoogleLoginHint:
|
||||
def __init__(self):
|
||||
self.google_id = os.getenv('GOOGLE_ID', '')
|
||||
if not self.google_id:
|
||||
print("⚠️ Warning: GOOGLE_ID not found in .env file")
|
||||
|
||||
async def request(self, flow):
|
||||
"""Google OAuth 요청을 가로채서 login_hint를 추가하거나 수정"""
|
||||
req = flow.request
|
||||
url = req.pretty_url
|
||||
|
||||
# Google OAuth 인증 URL인지 확인
|
||||
if self._is_google_oauth_url(url):
|
||||
print(f"🔍 Google OAuth URL detected: {url}")
|
||||
|
||||
# URL 파싱
|
||||
parsed_url = urlparse(url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
# login_hint 추가 또는 수정
|
||||
if self.google_id:
|
||||
query_params['login_hint'] = [self.google_id]
|
||||
print(f"✅ Added/Updated login_hint: {self.google_id}")
|
||||
|
||||
# 새로운 쿼리 스트링 생성
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
|
||||
# 새로운 URL 생성
|
||||
new_url = urlunparse((
|
||||
parsed_url.scheme,
|
||||
parsed_url.netloc,
|
||||
parsed_url.path,
|
||||
parsed_url.params,
|
||||
new_query,
|
||||
parsed_url.fragment
|
||||
))
|
||||
|
||||
# 요청 URL 수정 - URL과 호스트 모두 업데이트
|
||||
flow.request.url = new_url
|
||||
print(f"🔄 Modified URL: {new_url}")
|
||||
|
||||
def _is_google_oauth_url(self, url):
|
||||
"""Google OAuth URL인지 확인"""
|
||||
google_oauth_domains = [
|
||||
'accounts.google.com',
|
||||
'oauth2.googleapis.com'
|
||||
]
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
domain = parsed_url.netloc.lower()
|
||||
|
||||
# Google OAuth 도메인 확인
|
||||
for google_domain in google_oauth_domains:
|
||||
if google_domain in domain:
|
||||
# OAuth 관련 경로 확인
|
||||
path = parsed_url.path.lower()
|
||||
if any(oauth_path in path for oauth_path in ['/oauth2', '/auth', '/login']):
|
||||
return True
|
||||
|
||||
return False
|
||||
52
addon/google_response_type_token.py
Normal file
52
addon/google_response_type_token.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
from lib.report_vuln import report_vuln
|
||||
import httpx
|
||||
from lib.utils.is_oauth_uri import is_oauth_uri
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
class GoogleResponseTypeToken:
|
||||
def get_taregt_from_query(self, query: str, target: str) -> str | None:
|
||||
if not query:
|
||||
return None
|
||||
parsed = parse_qs(query)
|
||||
scope_values = parsed.get(target, [])
|
||||
if scope_values:
|
||||
return scope_values[0]
|
||||
return None
|
||||
|
||||
async def test(self, flow):
|
||||
req = flow.request
|
||||
|
||||
if not is_oauth_uri(req.pretty_url):
|
||||
return
|
||||
|
||||
if req.pretty_host != "accounts.google.com":
|
||||
return
|
||||
|
||||
if "response_type=token" in req.pretty_url:
|
||||
return
|
||||
|
||||
url = f"{req.pretty_url}".replace("response_type=code", "response_type=token")
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=True) as cli:
|
||||
response = await cli.request(
|
||||
method=req.method,
|
||||
url=url,
|
||||
headers=req.headers,
|
||||
content=req.get_content(),
|
||||
)
|
||||
|
||||
|
||||
if response.status_code >= 400:
|
||||
return
|
||||
|
||||
if "<b>400.</b>" in response.text:
|
||||
return
|
||||
|
||||
if "response_type=token" in str(response.url):
|
||||
report_vuln(
|
||||
"Google Response Type Token",
|
||||
f"Response type token allowed in {req.pretty_url}",
|
||||
"HIGH",
|
||||
str(response.url)
|
||||
)
|
||||
|
||||
149
addon/init.py
149
addon/init.py
|
|
@ -1,78 +1,93 @@
|
|||
from mitmproxy import http
|
||||
import asyncio
|
||||
from pkce_check import PKCEDowngradeChecker
|
||||
from ScopeDetection import ScopeDetection
|
||||
from addon.scope_detection import ScopeDetection
|
||||
from csrf_check import CsrfChecker
|
||||
from nonce_check import NonceChecker
|
||||
from client_secret import ClientSecret
|
||||
from addon.open_redirect_check import OpenRedirectChecker
|
||||
from access_token import AccessTokenScanner
|
||||
from addon.google_login_hint import GoogleLoginHint
|
||||
from addon.google_response_type_token import GoogleResponseTypeToken
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from lib.utils.try_catch import try_catch
|
||||
from lib.false_true_varifing_task import FalseTrueVarifingTask
|
||||
|
||||
class PKCEAddon:
|
||||
def __init__(self):
|
||||
self.checker = PKCEDowngradeChecker()
|
||||
# Initialize the singleton task manager
|
||||
false_true_varifing_task = FalseTrueVarifingTask()
|
||||
|
||||
async def request(self, flow: http.HTTPFlow):
|
||||
print(
|
||||
f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
|
||||
)
|
||||
try:
|
||||
await self.checker.test(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Addon failed: {e}")
|
||||
pass
|
||||
|
||||
|
||||
class CsrfAddon:
|
||||
def __init__(self):
|
||||
self.checker = CsrfChecker()
|
||||
load_dotenv(override=True)
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.response(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] CSRF Addon failed: {e}")
|
||||
pass
|
||||
|
||||
|
||||
class ScopeAddon:
|
||||
def __init__(self):
|
||||
self.checker = ScopeDetection()
|
||||
self._flow_map = {} # 요청 정보를 저장
|
||||
_open_redirect_checker = OpenRedirectChecker()
|
||||
|
||||
async def request(self, flow: http.HTTPFlow):
|
||||
self._flow_map[flow.id] = {
|
||||
"method": flow.request.method,
|
||||
"url": flow.request.pretty_url,
|
||||
"query": flow.request.query,
|
||||
}
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.test(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] ScopeDetection failed: {e}")
|
||||
|
||||
class NonceAddon:
|
||||
def __init__(self):
|
||||
self.checker = NonceChecker()
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.response(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] NonceAddon failed: {e}")
|
||||
pass
|
||||
|
||||
|
||||
class AccessTokenAddon:
|
||||
def __init__(self):
|
||||
self.checker = AccessTokenScanner()
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.scan(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] AccessToken Addon failed: {e}")
|
||||
pass
|
||||
class AddonBase:
|
||||
"""
|
||||
Base class for addons.
|
||||
Each addon should implement its own request or response method.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
if os.getenv('GOOGLE_ID'):
|
||||
self.google_login_hint = GoogleLoginHint()
|
||||
else:
|
||||
self.google_login_hint = None
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon()]
|
||||
def should_ignore(self, flow: http.HTTPFlow) -> bool:
|
||||
"""Check if the request should be ignored."""
|
||||
ignore_domains = [
|
||||
".googleapis.com",
|
||||
"android.clients.google.com", # Added missing comma here
|
||||
".adtrafficquality.google",
|
||||
".googlesyndication.com",
|
||||
"cdn.jsdelivr.net",
|
||||
"update.googleapis.com",
|
||||
".google-analytics.com",
|
||||
".gstatic.com"
|
||||
]
|
||||
# Ignore .googleapis.com domains
|
||||
for domain in ignore_domains:
|
||||
if domain in flow.request.pretty_host:
|
||||
return True
|
||||
|
||||
# Ignore static files (JS, CSS, fonts, images, etc.)
|
||||
# Split on '?' to remove query parameters before checking extension
|
||||
path = flow.request.path.split('?')[0].lower()
|
||||
static_extensions = [
|
||||
'.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
|
||||
'.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
|
||||
'.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
|
||||
'.txt', '.csv'
|
||||
]
|
||||
|
||||
if any(path.endswith(ext) for ext in static_extensions):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def request(self, flow: http.HTTPFlow):
|
||||
if self.google_login_hint:
|
||||
await try_catch(self.google_login_hint.request(flow))
|
||||
|
||||
if false_true_varifing_task.is_verifing_false_true():
|
||||
return
|
||||
|
||||
tasks = [
|
||||
try_catch(PKCEDowngradeChecker().test(flow)),
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow):
|
||||
return
|
||||
|
||||
tasks = [
|
||||
try_catch(CsrfChecker().response(flow)),
|
||||
try_catch(ScopeDetection().test(flow)),
|
||||
try_catch(ClientSecret().test(flow)),
|
||||
try_catch(AccessTokenScanner().scan(flow)),
|
||||
try_catch(GoogleResponseTypeToken().test(flow)),
|
||||
try_catch(_open_redirect_checker.test(flow)),
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
addons = [AddonBase()]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
import jwt
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from typing import Union
|
||||
import httpx
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
from lib.report_vuln import report_vuln
|
||||
|
||||
class NonceChecker:
|
||||
def is_oidc_flow(self, flow) -> bool:
|
||||
|
|
@ -68,21 +66,17 @@ class NonceChecker:
|
|||
except Exception as e:
|
||||
return {}
|
||||
|
||||
|
||||
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
||||
decoded = self.decode_id_token(id_token)
|
||||
nonce = decoded.get("nonce")
|
||||
req = flow.request
|
||||
url = req.pretty_url
|
||||
if not nonce:
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': "CRITICAL",
|
||||
'title': "nonce is missing in id_token",
|
||||
'description': "Nonce is present in the request but missing in the id_token.",
|
||||
'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
|
||||
}]
|
||||
save_report(report_data)
|
||||
report_vuln(
|
||||
title="Nonce Check Failed",
|
||||
desc="id_token에 nonce가 없습니다.",
|
||||
status="HIGH",
|
||||
uri=flow.request.url
|
||||
)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
|
|
|||
1722
addon/open_redirect_check.py
Normal file
1722
addon/open_redirect_check.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -1,10 +1,7 @@
|
|||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||
import asyncio
|
||||
import httpx
|
||||
from typing import Dict, List
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
from lib.report_vuln import report_vuln
|
||||
|
||||
|
||||
class PKCEDowngradeChecker:
|
||||
|
|
@ -58,27 +55,19 @@ class PKCEDowngradeChecker:
|
|||
|
||||
async def report_missing_parameters(self, url: str, is_openid: bool):
|
||||
status = "MEDIUM" if is_openid else "LOW"
|
||||
self.save(
|
||||
[
|
||||
self.make_report(
|
||||
status,
|
||||
"PKCE Parameters Missing",
|
||||
"PKCE parameters are missing or incomplete.",
|
||||
url,
|
||||
)
|
||||
]
|
||||
report_vuln(
|
||||
title="PKCE Parameters Missing",
|
||||
desc="PKCE parameters are missing or incomplete.",
|
||||
status=status,
|
||||
uri=url,
|
||||
)
|
||||
|
||||
async def report_plain_method(self, url: str):
|
||||
self.save(
|
||||
[
|
||||
self.make_report(
|
||||
"CRITICAL",
|
||||
"PKCE Plain Method",
|
||||
"PKCE method is set to 'plain'. Possible downgrade.",
|
||||
url,
|
||||
)
|
||||
]
|
||||
report_vuln(
|
||||
title="PKCE Plain Method",
|
||||
desc="PKCE method is set to 'plain'. Possible downgrade.",
|
||||
status="CRITICAL",
|
||||
uri=url,
|
||||
)
|
||||
|
||||
def create_downgraded_url(self, parsed, query):
|
||||
|
|
@ -150,15 +139,11 @@ class PKCEDowngradeChecker:
|
|||
else:
|
||||
return # Likely safe
|
||||
|
||||
self.save(
|
||||
[
|
||||
self.make_report(
|
||||
status,
|
||||
title,
|
||||
description,
|
||||
f"Original: {original_url}\nDowngraded: {downgraded_url}",
|
||||
)
|
||||
]
|
||||
report_vuln(
|
||||
title=title,
|
||||
desc=description,
|
||||
status=status,
|
||||
uri=f"Original: {original_url}\nDowngraded: {downgraded_url}",
|
||||
)
|
||||
|
||||
def same_redirect_destination(self, orig_loc, down_loc):
|
||||
|
|
@ -166,16 +151,3 @@ class PKCEDowngradeChecker:
|
|||
down = urlparse(down_loc)
|
||||
return orig.netloc == down.netloc and orig.path == down.path
|
||||
|
||||
def make_report(
|
||||
self, status: str, title: str, description: str, uri: str
|
||||
) -> Dict[str, str]:
|
||||
return {
|
||||
"target": target.load(),
|
||||
"status": status,
|
||||
"title": title,
|
||||
"description": description,
|
||||
"uri": uri,
|
||||
}
|
||||
|
||||
def save(self, report_data: List[Dict[str, str]]):
|
||||
save_report(report_data)
|
||||
|
|
|
|||
32
addon/scope_detection.py
Normal file
32
addon/scope_detection.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
from lib.report_vuln import report_vuln
|
||||
from lib.utils.is_oauth_uri import is_oauth_uri
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
class ScopeDetection:
|
||||
def get_scope_from_query(self, query: str) -> str | None:
|
||||
if not query:
|
||||
return None
|
||||
parsed = parse_qs(query)
|
||||
scope_values = parsed.get("scope", [])
|
||||
if scope_values:
|
||||
return scope_values[0]
|
||||
return None
|
||||
|
||||
async def test(self, flow):
|
||||
if not is_oauth_uri(flow.request.pretty_url):
|
||||
return
|
||||
|
||||
req = flow.request
|
||||
|
||||
parsed = urlparse(req.pretty_url)
|
||||
query = parsed.query
|
||||
|
||||
query_scope = self.get_scope_from_query(query)
|
||||
|
||||
if query_scope in ["all", "*"]:
|
||||
report_vuln(
|
||||
title="OAuth Scope Value Issue",
|
||||
desc=f"Scope value issue detected in request: {query_scope}",
|
||||
status="WARNING",
|
||||
uri=req.pretty_url
|
||||
)
|
||||
65
lib/false_true_varifing_task.py
Normal file
65
lib/false_true_varifing_task.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
from typing import Any
|
||||
from copy import deepcopy
|
||||
|
||||
class FalseTrueVarifingTask:
|
||||
"""
|
||||
A singleton class representing a task that can be either false or true.
|
||||
This class is used to handle tasks that require verification of their truth value.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
if self._initialized:
|
||||
return
|
||||
self._is_verifing = False
|
||||
self.task_queue = []
|
||||
self._initialized = True
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Reset the task queue and verification status.
|
||||
"""
|
||||
self._is_verifing = False
|
||||
self.task_queue.clear()
|
||||
|
||||
# 각 addon의 검증 로직에서 해당 함수를 호출하여, 추후 오탐 검증을 위한 작업을 추가할 수 있습니다.
|
||||
# TODO: 모델 지정해두기
|
||||
def add_task(self, task_name: str, initial_uri: str, data: Any):
|
||||
"""
|
||||
Add a task to the task queue.
|
||||
:param task: The task to be added.
|
||||
"""
|
||||
self.task_queue.append(
|
||||
{
|
||||
"task_name": task_name,
|
||||
"initial_uri": initial_uri,
|
||||
"data": data
|
||||
}
|
||||
)
|
||||
|
||||
def start_verification(self):
|
||||
"""
|
||||
Start the verification process for the tasks in the queue.
|
||||
"""
|
||||
self._is_verifing = True
|
||||
|
||||
def get_task_queue(self):
|
||||
"""
|
||||
Get a copy of the current task queue.
|
||||
:return: A copy of the task queue.
|
||||
"""
|
||||
return deepcopy(self.task_queue)
|
||||
|
||||
def is_verifing_false_true(self):
|
||||
"""
|
||||
Get the current verification status.
|
||||
:return: True if verification is in progress, False otherwise.
|
||||
"""
|
||||
return self._is_verifing
|
||||
|
|
@ -1,14 +1,16 @@
|
|||
# save as data/report.csv
|
||||
import os
|
||||
import csv
|
||||
from typing import List, Dict, Any
|
||||
from mitmproxy import http
|
||||
import lib.cur_target_url as cur_target_url
|
||||
|
||||
# target, status, title, description, uri
|
||||
|
||||
# file path는 'data/report.csv'로 고정
|
||||
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
def report_vuln(title: str, desc: str, status: str, uri: str) -> None:
|
||||
file_path: str = 'data/report.csv'
|
||||
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
|
||||
"""
|
||||
report_data 안의 각 레포트를 한 줄씩 CSV에 추가로 저장합니다.
|
||||
|
|
@ -23,10 +25,10 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report
|
|||
if not file_exists:
|
||||
writer.writeheader()
|
||||
|
||||
for row in report_data:
|
||||
# None 방지 & 줄바꿈 이스케이프
|
||||
escaped = {
|
||||
k: str(v).replace('\n', '\\n') if v is not None else ''
|
||||
for k, v in row.items()
|
||||
}
|
||||
writer.writerow(escaped)
|
||||
writer.writerow({
|
||||
'target': cur_target_url.load(),
|
||||
'status': status,
|
||||
'title': title,
|
||||
'description': desc,
|
||||
'uri': uri,
|
||||
})
|
||||
10
lib/utils/is_oauth_uri.py
Normal file
10
lib/utils/is_oauth_uri.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
def is_oauth_uri(uri: str) -> bool:
|
||||
qs = parse_qs(urlparse(uri).query)
|
||||
qs_keys = [*qs]
|
||||
|
||||
if "client_id" in qs_keys and any(p in qs_keys for p in (
|
||||
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
|
||||
return True
|
||||
return False
|
||||
5
lib/utils/try_catch.py
Normal file
5
lib/utils/try_catch.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
async def try_catch(coro):
|
||||
try:
|
||||
return await coro
|
||||
except Exception as e:
|
||||
print(f"[ERROR] {coro} failed: {e}")
|
||||
30
main.py
30
main.py
|
|
@ -1,21 +1,23 @@
|
|||
from runner.proxy import run_proxy
|
||||
import subprocess
|
||||
import threading
|
||||
import uvicorn
|
||||
from runner.backend import app
|
||||
|
||||
def run_fastapi_server():
|
||||
"""FastAPI 서버를 실행하는 함수"""
|
||||
uvicorn.run(app, host="localhost", port=11081, log_level="info")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start web server in a separate thread
|
||||
server_process = subprocess.Popen([
|
||||
"granian",
|
||||
"--interface", "asgi",
|
||||
"--host", "0.0.0.0",
|
||||
"--port", "11081",
|
||||
"--loop", "asyncio",
|
||||
"--reload",
|
||||
"runner.backend:app",
|
||||
])
|
||||
|
||||
try:
|
||||
# Run mitmdump proxy
|
||||
# FastAPI 서버를 백그라운드 스레드에서 실행
|
||||
fastapi_thread = threading.Thread(target=run_fastapi_server, daemon=True)
|
||||
fastapi_thread.start()
|
||||
print("🚀 FastAPI server started on http://localhost:11081")
|
||||
|
||||
# Run mitmdump proxy (메인 스레드에서 실행)
|
||||
print("🛡️ Starting mitmdump proxy on port 11080...")
|
||||
run_proxy()
|
||||
except KeyboardInterrupt:
|
||||
print("🛑 Shutting down...")
|
||||
finally:
|
||||
server_process.terminate()
|
||||
print("✅ Mitmdump proxy has been stopped.")
|
||||
|
|
@ -1,17 +1,107 @@
|
|||
from fastapi import FastAPI, Query, HTTPException
|
||||
from fastapi.responses import Response
|
||||
import lib.target as target
|
||||
import lib.cur_target_url as cur_target_url
|
||||
from lib.false_true_varifing_task import FalseTrueVarifingTask
|
||||
from pydantic import BaseModel, Field
|
||||
from lib.report_vuln import report_vuln as save_vuln
|
||||
|
||||
# Initialize the singleton task manager
|
||||
false_true_varifing_task = FalseTrueVarifingTask()
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.post("/start")
|
||||
async def start(url: str = Query(None)):
|
||||
if url:
|
||||
target.save(url)
|
||||
print(f"Target URL set to: {url}")
|
||||
return {"message": f"Target URL set to: {url}"}
|
||||
return {"error": "No URL provided"}
|
||||
|
||||
@app.post(
|
||||
"/start",
|
||||
summary="취약점 검증을 위한 대상 URL 설정",
|
||||
description="""
|
||||
이 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다.
|
||||
|
||||
유효한 URL이 제공되면:
|
||||
- 해당 URL이 저장됩니다.
|
||||
- 검증 작업 큐가 초기화됩니다.
|
||||
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
|
||||
|
||||
URL이 제공되지 않으면, 오류가 반환됩니다.
|
||||
"""
|
||||
,
|
||||
tags=["1st STEP"]
|
||||
)
|
||||
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
|
||||
cur_target_url.save(url)
|
||||
false_true_varifing_task.reset()
|
||||
return {"message": f"Target URL set to: {url}"}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@app.post(
|
||||
"/start-false-true-verifing",
|
||||
summary="시스템에 오탐 검증 작업 시작을 알림",
|
||||
description="""
|
||||
이 엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
|
||||
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
|
||||
|
||||
```json
|
||||
{
|
||||
"payload": [
|
||||
{
|
||||
"task_name": "pkce_task", # 검증 작업의 이름
|
||||
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
|
||||
"data": any # 추가 데이터
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
```
|
||||
""",
|
||||
tags=["2nd STEP"]
|
||||
)
|
||||
async def start_false_true_verifing():
|
||||
false_true_varifing_task.start_verification()
|
||||
task_queue = false_true_varifing_task.get_task_queue()
|
||||
return {"payload": task_queue}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class VulnerabilityReport(BaseModel):
|
||||
title: str = Field(..., description="Short title for the vulnerability")
|
||||
url: str = Field(..., description="URL where the vulnerability was discovered")
|
||||
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
|
||||
desc: str = Field(..., description="Detailed description of the issue")
|
||||
|
||||
@app.post(
|
||||
"/report-vuln",
|
||||
summary="취약점 보고",
|
||||
description="""
|
||||
정탐인 취약점을 시스템에 보고합니다.
|
||||
보고 시 다음 정보를 포함해야 합니다:
|
||||
|
||||
- **title**: 취약점의 간단한 이름
|
||||
- **url**: 취약점이 발견된 위치 (URL)
|
||||
- **status**: 심각도
|
||||
- **desc**: 취약점에 대한 상세 설명
|
||||
""",
|
||||
tags=["3rd STEP"]
|
||||
)
|
||||
async def report_vuln(vuln: VulnerabilityReport):
|
||||
save_vuln(
|
||||
title=vuln.title,
|
||||
desc=vuln.desc,
|
||||
status=vuln.status,
|
||||
uri=vuln.url
|
||||
)
|
||||
|
||||
return {"message": "Vulnerability reported successfully"}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@app.exception_handler(404)
|
||||
|
|
@ -20,4 +110,4 @@ async def not_found_handler(request, exc):
|
|||
|
||||
@app.exception_handler(405)
|
||||
async def method_not_allowed_handler(request, exc):
|
||||
return Response(status_code=405)
|
||||
return Response(status_code=405)
|
||||
|
|
|
|||
|
|
@ -3,4 +3,4 @@ from mitmproxy.tools.main import mitmdump
|
|||
|
||||
def run_proxy():
|
||||
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"]
|
||||
mitmdump()
|
||||
mitmdump()
|
||||
Loading…
Add table
Add a link
Reference in a new issue