From 5f84d249738fa7aa82e5587a26d3ddb2fef778a1 Mon Sep 17 00:00:00 2001 From: imnyang Date: Sat, 7 Jun 2025 15:12:15 +0900 Subject: [PATCH] =?UTF-8?q?=EB=AD=90=20=EB=8C=80=EC=B6=A9=ED=96=88?= =?UTF-8?q?=EC=96=B4=EC=9A=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ addon/pkce_check.py | 88 ++++++++++++++++++++++++++++++--------------- lib/report.py | 23 ++++++++++++ lib/target.py | 32 +++++++++++++++++ main.py | 22 ++++++++++++ 5 files changed, 138 insertions(+), 29 deletions(-) create mode 100644 lib/report.py create mode 100644 lib/target.py diff --git a/.gitignore b/.gitignore index 505a3b1..7c57f3a 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ wheels/ # Virtual environments .venv + +data/ \ No newline at end of file diff --git a/addon/pkce_check.py b/addon/pkce_check.py index 907b945..580d78e 100644 --- a/addon/pkce_check.py +++ b/addon/pkce_check.py @@ -3,6 +3,12 @@ from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import asyncio import httpx +import csv +import os +from typing import List, Dict, Any + +import lib.target as target +from lib.report import save_report class PKCEDowngradeChecker: required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"] @@ -35,11 +41,27 @@ class PKCEDowngradeChecker: challenge_val = query.get("code_challenge", [None])[0] if not method_val or not challenge_val: - self.report("PKCE parameters missing or incomplete.", url, is_openid) + status = "MEDIUM" if is_openid else "LOW" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "PKCE Parameters Missing", + 'description': "PKCE parameters are missing or incomplete.", + 'uri': url + }] + save_report(report_data) return if method_val.lower() == "plain": - self.report("PKCE method is set to 'plain'. Possible downgrade.", url, is_openid) + status = "CRITICAL" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "PKCE Plain Method", + 'description': "PKCE method is set to 'plain'. Possible downgrade.", + 'uri': url + }] + save_report(report_data) return # Create downgraded request @@ -49,6 +71,13 @@ class PKCEDowngradeChecker: new_query = urlencode(downgraded_query, doseq=True) downgraded_url = urlunparse(parsed._replace(query=new_query)) + + # Ensure downgraded_url is a string, not bytes + if isinstance(downgraded_url, bytes): + downgraded_url = downgraded_url.decode('utf-8') + + # Ensure it's definitely a string + downgraded_url = str(downgraded_url) print(f"[DEBUG] Testing downgraded URL: {downgraded_url}") @@ -81,12 +110,15 @@ class PKCEDowngradeChecker: if orig_success and down_success: # If both redirect and both have code, it's a clear vulnerability if both_redirect and both_have_code: - self.report( - "PKCE downgrade vulnerability detected! Both URLs returned authorization code.", - f"Original: {url}\nDowngraded: {downgraded_url}", - is_openid, - critical=True - ) + status = "CRITICAL" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "PKCE Downgrade Vulnerability", + 'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.", + 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" + }] + save_report(report_data) # If responses are similar (both redirect to similar pages), it might be vulnerable elif both_redirect: # Check if the redirect locations are similar (excluding PKCE parameters) @@ -94,30 +126,28 @@ class PKCEDowngradeChecker: down_parsed = urlparse(down_loc) if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc: - self.report( - "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.", - f"Original: {url}\nDowngraded: {downgraded_url}", - is_openid, - critical=False - ) - # If downgraded request doesn't fail, it's concerning + status = "MEDIUM" if is_openid else "LOW" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "Potential PKCE Downgrade", + 'description': "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.", + 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" + }] + save_report(report_data) elif down_resp.status_code != 400: # 400 would be expected for missing required parameters - self.report( - "Server accepts OAuth request without PKCE parameters.", - f"Original: {url}\nDowngraded: {downgraded_url}", - is_openid, - critical=False - ) + status = "MEDIUM" if is_openid else "LOW" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "PKCE Not Enforced", + 'description': "Server accepts OAuth request without PKCE parameters.", + 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" + }] + save_report(report_data) + else: print(f"[DEBUG] Requests had different success status - likely PKCE is enforced") except Exception as e: print(f"[ERROR] Request failed: {e}") - - def report(self, msg, context_url, is_openid, critical=False): - tag = "[CRITICAL]" if critical else "[WARN]" - flow_type = "OpenID" if is_openid else "OAuth2" - # ANSI color codes for red text - RED = '\033[91m' - RESET = '\033[0m' - print(f"{RED}{tag} {flow_type} - {msg}\n → {context_url}{RESET}\n") diff --git a/lib/report.py b/lib/report.py new file mode 100644 index 0000000..ba8f1d3 --- /dev/null +++ b/lib/report.py @@ -0,0 +1,23 @@ +# save as data/report.csv +import csv +from typing import List, Dict, Any + +# target, status, title, description, uri + +# file path는 'data/report.csv'로 고정 +def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: + """ + Save the report data to a CSV file. + + :param report_data: List of dictionaries containing report data. + :param file_path: Path to the CSV file where the report will be saved. + """ + fieldnames = ['target', 'status', 'title', 'description', 'uri'] + + with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for row in report_data: + # Replace actual newlines with literal \n strings + escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()} + writer.writerow(escaped_row) diff --git a/lib/target.py b/lib/target.py new file mode 100644 index 0000000..586d89b --- /dev/null +++ b/lib/target.py @@ -0,0 +1,32 @@ +# save ./data/target.temp file as domain string +import os + +def save(target: str, file_path: str = "./data/target.dump") -> None: + """ + Save the target domain to a temporary file. + + :param target: Target domain to be saved. + """ + # Create directory if it doesn't exist + os.makedirs(os.path.dirname(file_path), exist_ok=True) + + with open(file_path, 'w', encoding='utf-8') as f: + f.write(target) + + print(f"Target saved to {file_path}") # Debug message + +def load(file_path: str = "./data/target.dump") -> str: + """ + Load the target domain from a temporary file. + + :return: Target domain string. + """ + if not os.path.exists(file_path): + print(f"[ERROR] Target file {file_path} does not exist.") + return "" + + with open(file_path, 'r', encoding='utf-8') as f: + target = f.read().strip() + + print(f"Loaded target from {file_path}: {target}") # Debug message + return target diff --git a/main.py b/main.py index 13e1460..c72ab4c 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,31 @@ import sys from mitmproxy.tools.main import mitmdump +from flask import Flask, request +import threading +import lib.target as target def main(): sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"] mitmdump() +# get target from browser use web api +app = Flask(__name__) + +@app.route('/start', methods=['GET', 'POST']) +def start(): + target_url = request.args.get('url') + if target_url: + target.save(target_url) + print(f"Target URL set to: {target_url}") + return f"Target URL set to: {target_url}" + return "No URL provided" + +def run_web_server(): + app.run(host='localhost', port=11081, debug=False) + +# Start web server in a separate thread +web_thread = threading.Thread(target=run_web_server, daemon=True) +web_thread.start() + if __name__ == "__main__": main() \ No newline at end of file