Merge pull request #3 from j93es/yeon

ScopeDetection 코드 수정 완료
This commit is contained in:
암냥 2025-06-08 01:26:37 +09:00 committed by GitHub
commit 909f5006bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 80 additions and 1 deletions

53
addon/ScopeDetection.py Normal file
View file

@ -0,0 +1,53 @@
import lib.target as target
from lib.report import save_report
class ScopeDetection:
def get_scope_from_query(self, query: str) -> str | None:
if not query:
return None
import urllib.parse
parsed = urllib.parse.parse_qs(query)
scope_values = parsed.get("scope", [])
if scope_values:
return scope_values[0]
return None
async def check_scope(self, flow):
req = flow.request
res = flow.response
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
if hasattr(req.query, "urlencode"):
query = req.query.urlencode()
else:
query = str(req.query) if req.query else ""
location = res.headers.get("location", "")
query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location)
result = []
if query_scope in ["all", "*"]:
result.append(f"Scope value issue detected in request: {query_scope}")
if location_scope in ["all", "*"]:
result.append(f"Scope value issue detected in response location: {location_scope}")
return result if result else 0
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
result = await self.check_scope(flow)
if result != 0:
report_data = [{
'target': target.load(),
'status': "WARNING",
'title': "OAuth scope value issue",
'description': f"{method} {url}: {', '.join(result)}",
'uri': url
}]
save_report(report_data)

View file

@ -1,6 +1,7 @@
from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from ScopeDetection import ScopeDetection
class PKCEAddon:
def __init__(self):
@ -13,4 +14,25 @@ class PKCEAddon:
except Exception as e:
print(f"[ERROR] Addon failed: {e}")
addons = [PKCEAddon()]
class ScopeAddon:
def __init__(self):
self.checker = ScopeDetection()
self._flow_map = {} # 요청 정보를 저장
async def request(self, flow: http.HTTPFlow):
self._flow_map[flow.id] = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"query": flow.request.query,
}
async def response(self, flow: http.HTTPFlow):
try:
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}")
addons = [PKCEAddon(),
ScopeAddon()
]

View file

@ -1,4 +1,5 @@
# save as data/report.csv
import os
import csv
from typing import List, Dict, Any
@ -6,6 +7,9 @@ from typing import List, Dict, Any
# file path는 'data/report.csv'로 고정
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
"""
Save the report data to a CSV file.