diff --git a/addon/ScopeDetection.py b/addon/ScopeDetection.py new file mode 100644 index 0000000..a955aeb --- /dev/null +++ b/addon/ScopeDetection.py @@ -0,0 +1,53 @@ +import lib.target as target +from lib.report import save_report + +class ScopeDetection: + def get_scope_from_query(self, query: str) -> str | None: + if not query: + return None + import urllib.parse + parsed = urllib.parse.parse_qs(query) + scope_values = parsed.get("scope", []) + if scope_values: + return scope_values[0] + return None + + async def check_scope(self, flow): + req = flow.request + res = flow.response + + # req.query가 MultiDictView일 수 있으므로 문자열로 변환 + if hasattr(req.query, "urlencode"): + query = req.query.urlencode() + else: + query = str(req.query) if req.query else "" + + location = res.headers.get("location", "") + + query_scope = self.get_scope_from_query(query) + location_scope = self.get_scope_from_query(location) + + result = [] + if query_scope in ["all", "*"]: + result.append(f"Scope value issue detected in request: {query_scope}") + if location_scope in ["all", "*"]: + result.append(f"Scope value issue detected in response location: {location_scope}") + + return result if result else 0 + + async def test(self, flow): + req = flow.request + method = req.method + url = req.pretty_url + + result = await self.check_scope(flow) + + if result != 0: + report_data = [{ + 'target': target.load(), + 'status': "WARNING", + 'title': "OAuth scope value issue", + 'description': f"{method} {url}: {', '.join(result)}", + 'uri': url + }] + save_report(report_data) diff --git a/addon/init.py b/addon/init.py index 25bebeb..6b93810 100644 --- a/addon/init.py +++ b/addon/init.py @@ -1,6 +1,7 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker +from ScopeDetection import ScopeDetection class PKCEAddon: def __init__(self): @@ -13,4 +14,25 @@ class PKCEAddon: except Exception as e: print(f"[ERROR] Addon failed: {e}") -addons = [PKCEAddon()] +class ScopeAddon: + def __init__(self): + self.checker = ScopeDetection() + self._flow_map = {} # 요청 정보를 저장 + + async def request(self, flow: http.HTTPFlow): + self._flow_map[flow.id] = { + "method": flow.request.method, + "url": flow.request.pretty_url, + "query": flow.request.query, + } + + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.test(flow) + except Exception as e: + print(f"[ERROR] ScopeDetection failed: {e}") + + +addons = [PKCEAddon(), + ScopeAddon() +]