From e9c59ae6f69f8cf43ad8870970c0be2ca179fe04 Mon Sep 17 00:00:00 2001 From: seungyeoncherry Date: Sat, 7 Jun 2025 22:25:51 +0900 Subject: [PATCH 1/4] =?UTF-8?q?=EC=88=98=EC=A0=95/test=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/ScopeDetection.py | 53 +++++++++++++++++++++++++++++++++++++++++ addon/init.py | 24 ++++++++++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 addon/ScopeDetection.py diff --git a/addon/ScopeDetection.py b/addon/ScopeDetection.py new file mode 100644 index 0000000..a955aeb --- /dev/null +++ b/addon/ScopeDetection.py @@ -0,0 +1,53 @@ +import lib.target as target +from lib.report import save_report + +class ScopeDetection: + def get_scope_from_query(self, query: str) -> str | None: + if not query: + return None + import urllib.parse + parsed = urllib.parse.parse_qs(query) + scope_values = parsed.get("scope", []) + if scope_values: + return scope_values[0] + return None + + async def check_scope(self, flow): + req = flow.request + res = flow.response + + # req.query가 MultiDictView일 수 있으므로 문자열로 변환 + if hasattr(req.query, "urlencode"): + query = req.query.urlencode() + else: + query = str(req.query) if req.query else "" + + location = res.headers.get("location", "") + + query_scope = self.get_scope_from_query(query) + location_scope = self.get_scope_from_query(location) + + result = [] + if query_scope in ["all", "*"]: + result.append(f"Scope value issue detected in request: {query_scope}") + if location_scope in ["all", "*"]: + result.append(f"Scope value issue detected in response location: {location_scope}") + + return result if result else 0 + + async def test(self, flow): + req = flow.request + method = req.method + url = req.pretty_url + + result = await self.check_scope(flow) + + if result != 0: + report_data = [{ + 'target': target.load(), + 'status': "WARNING", + 'title': "OAuth scope value issue", + 'description': f"{method} {url}: {', '.join(result)}", + 'uri': url + }] + save_report(report_data) diff --git a/addon/init.py b/addon/init.py index 25bebeb..6b93810 100644 --- a/addon/init.py +++ b/addon/init.py @@ -1,6 +1,7 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker +from ScopeDetection import ScopeDetection class PKCEAddon: def __init__(self): @@ -13,4 +14,25 @@ class PKCEAddon: except Exception as e: print(f"[ERROR] Addon failed: {e}") -addons = [PKCEAddon()] +class ScopeAddon: + def __init__(self): + self.checker = ScopeDetection() + self._flow_map = {} # 요청 정보를 저장 + + async def request(self, flow: http.HTTPFlow): + self._flow_map[flow.id] = { + "method": flow.request.method, + "url": flow.request.pretty_url, + "query": flow.request.query, + } + + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.test(flow) + except Exception as e: + print(f"[ERROR] ScopeDetection failed: {e}") + + +addons = [PKCEAddon(), + ScopeAddon() +] From 526ab3c3f6437810ff851cf9569288fd7d195cf2 Mon Sep 17 00:00:00 2001 From: seungyeoncherry Date: Sat, 7 Jun 2025 22:34:11 +0900 Subject: [PATCH 2/4] =?UTF-8?q?=EC=88=98=EC=A0=95/test=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/report.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/report.py b/lib/report.py index ba8f1d3..48d010f 100644 --- a/lib/report.py +++ b/lib/report.py @@ -1,4 +1,5 @@ # save as data/report.csv +import os import csv from typing import List, Dict, Any @@ -6,6 +7,8 @@ from typing import List, Dict, Any # file path는 'data/report.csv'로 고정 def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: + os.makedirs(os.path.dirname(file_path), exist_ok=True) + """ Save the report data to a CSV file. From 9882a0586bbefd75ef0aead107b45de4ab7f1719 Mon Sep 17 00:00:00 2001 From: seungyeoncherry Date: Sat, 7 Jun 2025 22:36:57 +0900 Subject: [PATCH 3/4] =?UTF-8?q?=EC=88=98=EC=A0=95/test=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/ScopeDetection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/addon/ScopeDetection.py b/addon/ScopeDetection.py index a955aeb..21b76a6 100644 --- a/addon/ScopeDetection.py +++ b/addon/ScopeDetection.py @@ -45,7 +45,7 @@ class ScopeDetection: if result != 0: report_data = [{ 'target': target.load(), - 'status': "WARNING", + 'status': "WARNING!", 'title': "OAuth scope value issue", 'description': f"{method} {url}: {', '.join(result)}", 'uri': url From d883441defcf4e249af93c4067af6ee13e700cd6 Mon Sep 17 00:00:00 2001 From: seungyeoncherry Date: Sat, 7 Jun 2025 22:49:55 +0900 Subject: [PATCH 4/4] =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/ScopeDetection.py | 2 +- lib/report.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/addon/ScopeDetection.py b/addon/ScopeDetection.py index 21b76a6..a955aeb 100644 --- a/addon/ScopeDetection.py +++ b/addon/ScopeDetection.py @@ -45,7 +45,7 @@ class ScopeDetection: if result != 0: report_data = [{ 'target': target.load(), - 'status': "WARNING!", + 'status': "WARNING", 'title': "OAuth scope value issue", 'description': f"{method} {url}: {', '.join(result)}", 'uri': url diff --git a/lib/report.py b/lib/report.py index 48d010f..e6198a7 100644 --- a/lib/report.py +++ b/lib/report.py @@ -8,6 +8,7 @@ from typing import List, Dict, Any # file path는 'data/report.csv'로 고정 def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: os.makedirs(os.path.dirname(file_path), exist_ok=True) + """ Save the report data to a CSV file.