[Update] 자동 오탐 검증을 위한 라우터 추가

This commit is contained in:
tv0924@icloud.com 2025-06-26 15:20:30 +09:00
commit 3c5db3c1fd
5 changed files with 188 additions and 23 deletions

View file

@ -27,7 +27,7 @@ venv와 패키지가 설치가 됩니다.
> 그렇지 않으면 실행되지 않습니다. > 그렇지 않으면 실행되지 않습니다.
> >
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다. > 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
> >
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다. > Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
> >
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다. > MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
@ -58,8 +58,15 @@ class LoggerAddon:
self.checker = Example() self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
self.checker.test(flow) # 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
self.checker.test(flow) self.checker.test(flow)
``` ```
@ -88,3 +95,9 @@ class Example:
``` ```
이러한 예제를 참고하여 작성하여주세요. 이러한 예제를 참고하여 작성하여주세요.
# 백엔드 API DOCS
```
http://localhost:11081/redoc
```

View file

@ -76,10 +76,10 @@ class CsrfChecker:
resp_nonce = self.get_query_param(loc, param) if param else None resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None: if resp_nonce is None:
report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="HIGH", uri=flow.request.url) report_vuln(title="CSRF Risk", desc="Missing nonce in redirect response", status="CRITICAL", uri=flow.request.url)
return 1 return 1
if orig_nonce != resp_nonce: if orig_nonce != resp_nonce:
report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="MEDIUM", uri=flow.request.url) report_vuln(title="CSRF Risk", desc="Nonce mismatch request↔response", status="HIGH", uri=flow.request.url)
return 1 return 1
return 0 return 0
@ -103,11 +103,11 @@ class CsrfChecker:
if new_nonce is None: if new_nonce is None:
return 0 return 0
if new_nonce == orig_nonce: if new_nonce == orig_nonce:
report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="HIGH", uri=flow.request.url) report_vuln(title="CSRF Risk", desc="Nonce reused without cookies", status="CRITICAL", uri=flow.request.url)
return 1 return 1
# (2) 두 번의 리다이렉트 비교 # (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=False) as cli: async with httpx.AsyncClient(follow_redirects=True) as cli:
# 원본 쿼리 # 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리 # nonce 교체 쿼리
@ -120,7 +120,7 @@ class CsrfChecker:
and urlparse(req1.headers.get("location", "")).path and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path == urlparse(req2.headers.get("location", "")).path
): ):
report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="MEDIUM", uri=flow.request.url) report_vuln(title="CSRF Risk", desc="Identical redirects on nonce swap → potential CSRF", status="NOT-VERIFIED-HIGH", uri=flow.request.url)
return 1 return 1
return 0 return 0
@ -130,7 +130,7 @@ class CsrfChecker:
# 1) 요청에 nonce 없으면 # 1) 요청에 nonce 없으면
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="HIGH", uri=flow.request.url) report_vuln(title="CSRF Risk", desc="Missing nonce in OAuth request", status="CRITICAL", uri=flow.request.url)
return return
# 2) 리다이렉트에서 nonce 검사 # 2) 리다이렉트에서 nonce 검사

View file

@ -9,6 +9,10 @@ from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint from addon.google_login_hint import GoogleLoginHint
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True) load_dotenv(override=True)
@ -21,6 +25,10 @@ class PKCEAddon:
f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}" f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
) )
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] Addon failed: {e}") print(f"[ERROR] Addon failed: {e}")
@ -33,6 +41,9 @@ class CsrfAddon:
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.response(flow) await self.checker.response(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] CSRF Addon failed: {e}") print(f"[ERROR] CSRF Addon failed: {e}")
@ -42,21 +53,18 @@ class CsrfAddon:
class ScopeAddon: class ScopeAddon:
def __init__(self): def __init__(self):
self.checker = ScopeDetection() self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}") print(f"[ERROR] ScopeDetection failed: {e}")
class NonceAddon: class NonceAddon:
def __init__(self): def __init__(self):
self.checker = NonceChecker() self.checker = NonceChecker()
@ -70,12 +78,17 @@ class NonceAddon:
print(f"[ERROR] NonceAddon failed: {e}") print(f"[ERROR] NonceAddon failed: {e}")
pass pass
class AccessTokenAddon: class AccessTokenAddon:
def __init__(self): def __init__(self):
self.checker = AccessTokenScanner() self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.scan(flow) await self.checker.scan(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}") print(f"[ERROR] AccessToken Addon failed: {e}")
@ -88,6 +101,9 @@ class RedirectBypassAddon:
# request 대신 response 로 바꿔 보세요: # request 대신 response 로 바꿔 보세요:
async def response(self, flow: http.HTTPFlow): async def response(self, flow: http.HTTPFlow):
try: try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow) await self.checker.test(flow)
except Exception as e: except Exception as e:
print(f"[ERROR] RedirectBypass Addon failed: {e}") print(f"[ERROR] RedirectBypass Addon failed: {e}")

View file

@ -0,0 +1,59 @@
from typing import Any
from copy import deepcopy
class FalseTrueVarifingTask:
"""
A singleton class representing a task that can be either false or true.
This class is used to handle tasks that require verification of their truth value.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(FalseTrueVarifingTask, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._is_verifing = False
self.task_queue = []
self._initialized = True
def reset(self):
"""
Reset the task queue and verification status.
"""
self._is_verifing = False
self.task_queue.clear()
def add_task(self, task_name: str, data: Any):
"""
Add a task to the task queue.
:param task: The task to be added.
"""
self.task_queue.append({
"task_name": task_name,
"data": data})
def start_verification(self):
"""
Start the verification process for the tasks in the queue.
"""
self._is_verifing = True
def get_task_queue(self):
"""
Get a copy of the current task queue.
:return: A copy of the task queue.
"""
return deepcopy(self.task_queue)
def is_verifing_false_true(self):
"""
Get the current verification status.
:return: True if verification is in progress, False otherwise.
"""
return self._is_verifing

View file

@ -1,17 +1,94 @@
from fastapi import FastAPI, Query, HTTPException from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
import lib.cur_target_url as cur_target_url import lib.cur_target_url as cur_target_url
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI() app = FastAPI()
@app.post("/start")
async def start(url: str = Query(None)): @app.post(
if url: "/start",
cur_target_url.save(url) summary="취약점 검증을 위한 대상 URL 설정",
print(f"Target URL set to: {url}") description="""
return {"message": f"Target URL set to: {url}"} 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다.
return {"error": "No URL provided"}
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 False-True 검증 작업 시작을 알림",
description="""
엔드포인트는 시스템에 False-True 방식의 검증 작업을 시작하도록 지시합니다.
또한 검증을 위해 준비된 작업 목록을 반환합니다.
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404) @app.exception_handler(404)