oauth-backend/runner/backend/__init__.py
2025-06-26 15:45:39 +09:00

113 lines
3.1 KiB
Python

from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import Response
import lib.cur_target_url as cur_target_url
from lib.false_true_varifing_task import FalseTrueVarifingTask
from pydantic import BaseModel, Field
from lib.report_vuln import report_vuln as save_vuln
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
app = FastAPI()
@app.post(
"/start",
summary="취약점 검증을 위한 대상 URL 설정",
description="""
이 엔드포인트는 시스템이 취약점 검증 작업에 사용할 대상 URL을 설정합니다.
유효한 URL이 제공되면:
- 해당 URL이 저장됩니다.
- 검증 작업 큐가 초기화됩니다.
- 새로운 검증 작업을 시작할 준비가 완료됩니다.
URL이 제공되지 않으면, 오류가 반환됩니다.
"""
,
tags=["1st STEP"]
)
async def start(url: str = Query(..., description="The URL to target for vulnerability verification")):
cur_target_url.save(url)
false_true_varifing_task.reset()
return {"message": f"Target URL set to: {url}"}
@app.post(
"/start-false-true-verifing",
summary="시스템에 오탐 검증 작업 시작을 알림",
description="""
이 엔드포인트는 시스템에 오탐 검증 작업이 시작되었음을 알립니다.
또한 시스템은 미리 준비된 오탐 검증 작업 목록을 반환합니다.
```json
{
"payload": [
{
"task_name": "pkce_task", # 검증 작업의 이름
"initial_uri": "http://auth.example.com", # browser가 처음 접속할 URI
"data": any # 추가 데이터
},
...
]
}
```
""",
tags=["2nd STEP"]
)
async def start_false_true_verifing():
false_true_varifing_task.start_verification()
task_queue = false_true_varifing_task.get_task_queue()
return {"payload": task_queue}
class VulnerabilityReport(BaseModel):
title: str = Field(..., description="Short title for the vulnerability")
url: str = Field(..., description="URL where the vulnerability was discovered")
status: str = Field(..., description="Status of the vulnerability (e.g., VERIFIED-CRITICAL)")
desc: str = Field(..., description="Detailed description of the issue")
@app.post(
"/report-vuln",
summary="취약점 보고",
description="""
정탐인 취약점을 시스템에 보고합니다.
보고 시 다음 정보를 포함해야 합니다:
- **title**: 취약점의 간단한 이름
- **url**: 취약점이 발견된 위치 (URL)
- **status**: 심각도
- **desc**: 취약점에 대한 상세 설명
""",
tags=["3rd STEP"]
)
async def report_vuln(vuln: VulnerabilityReport):
save_vuln(
title=vuln.title,
desc=vuln.desc,
status=vuln.status,
uri=vuln.url
)
return {"message": "Vulnerability reported successfully"}
@app.exception_handler(404)
async def not_found_handler(request, exc):
return Response(status_code=404)
@app.exception_handler(405)
async def method_not_allowed_handler(request, exc):
return Response(status_code=405)