diff --git a/addon/ScopeDetection.py b/addon/ScopeDetection.py new file mode 100644 index 0000000..a955aeb --- /dev/null +++ b/addon/ScopeDetection.py @@ -0,0 +1,53 @@ +import lib.target as target +from lib.report import save_report + +class ScopeDetection: + def get_scope_from_query(self, query: str) -> str | None: + if not query: + return None + import urllib.parse + parsed = urllib.parse.parse_qs(query) + scope_values = parsed.get("scope", []) + if scope_values: + return scope_values[0] + return None + + async def check_scope(self, flow): + req = flow.request + res = flow.response + + # req.query가 MultiDictView일 수 있으므로 문자열로 변환 + if hasattr(req.query, "urlencode"): + query = req.query.urlencode() + else: + query = str(req.query) if req.query else "" + + location = res.headers.get("location", "") + + query_scope = self.get_scope_from_query(query) + location_scope = self.get_scope_from_query(location) + + result = [] + if query_scope in ["all", "*"]: + result.append(f"Scope value issue detected in request: {query_scope}") + if location_scope in ["all", "*"]: + result.append(f"Scope value issue detected in response location: {location_scope}") + + return result if result else 0 + + async def test(self, flow): + req = flow.request + method = req.method + url = req.pretty_url + + result = await self.check_scope(flow) + + if result != 0: + report_data = [{ + 'target': target.load(), + 'status': "WARNING", + 'title': "OAuth scope value issue", + 'description': f"{method} {url}: {', '.join(result)}", + 'uri': url + }] + save_report(report_data) diff --git a/addon/init.py b/addon/init.py index 081cb87..e31fe1e 100644 --- a/addon/init.py +++ b/addon/init.py @@ -1,6 +1,7 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker +from ScopeDetection import ScopeDetection class PKCEAddon: @@ -16,5 +17,23 @@ class PKCEAddon: except Exception as e: print(f"[ERROR] Addon failed: {e}") +class ScopeAddon: + def __init__(self): + self.checker = ScopeDetection() + self._flow_map = {} # 요청 정보를 저장 -addons = [PKCEAddon()] + async def request(self, flow: http.HTTPFlow): + self._flow_map[flow.id] = { + "method": flow.request.method, + "url": flow.request.pretty_url, + "query": flow.request.query, + } + + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.test(flow) + except Exception as e: + print(f"[ERROR] ScopeDetection failed: {e}") + + +addons = [PKCEAddon(), ScopeAddon()] diff --git a/lib/report.py b/lib/report.py index ba8f1d3..e6198a7 100644 --- a/lib/report.py +++ b/lib/report.py @@ -1,4 +1,5 @@ # save as data/report.csv +import os import csv from typing import List, Dict, Any @@ -6,6 +7,9 @@ from typing import List, Dict, Any # file path는 'data/report.csv'로 고정 def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: + os.makedirs(os.path.dirname(file_path), exist_ok=True) + + """ Save the report data to a CSV file.