# pkce_check.py from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import asyncio import httpx import csv import os from typing import List, Dict, Any import lib.target as target from lib.report import save_report class PKCEDowngradeChecker: required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"] async def test(self, flow): req = flow.request method = req.method url = req.pretty_url print(f"[DEBUG] PKCE check - Method: {method}, URL: {url}") if method.upper() != "GET": print(f"[DEBUG] Skipping non-GET request") return parsed = urlparse(url) query = parse_qs(parsed.query) print(f"[DEBUG] Query parameters: {list(query.keys())}") if not all(k in query for k in self.required_keys): missing_keys = [k for k in self.required_keys if k not in query] print(f"[DEBUG] Missing required keys: {missing_keys}") return print(f"[DEBUG] Found OAuth request with PKCE parameters") is_openid = "openid" in query.get("scope", [""])[0] or "id_token" in url method_val = query.get("code_challenge_method", [None])[0] challenge_val = query.get("code_challenge", [None])[0] if not method_val or not challenge_val: status = "MEDIUM" if is_openid else "LOW" report_data = [{ 'target': target.load(), 'status': status, 'title': "PKCE Parameters Missing", 'description': "PKCE parameters are missing or incomplete.", 'uri': url }] save_report(report_data) return if method_val.lower() == "plain": status = "CRITICAL" report_data = [{ 'target': target.load(), 'status': status, 'title': "PKCE Plain Method", 'description': "PKCE method is set to 'plain'. Possible downgrade.", 'uri': url }] save_report(report_data) return # Create downgraded request downgraded_query = query.copy() downgraded_query.pop("code_challenge", None) downgraded_query.pop("code_challenge_method", None) new_query = urlencode(downgraded_query, doseq=True) downgraded_url = urlunparse(parsed._replace(query=new_query)) # Ensure downgraded_url is a string, not bytes if isinstance(downgraded_url, bytes): downgraded_url = downgraded_url.decode('utf-8') # Ensure it's definitely a string downgraded_url = str(downgraded_url) print(f"[DEBUG] Testing downgraded URL: {downgraded_url}") async with httpx.AsyncClient(follow_redirects=False, verify=False) as client: try: print(f"[DEBUG] Sending original request...") orig_resp = await client.get(url, headers=dict(req.headers)) print(f"[DEBUG] Original response: {orig_resp.status_code}") print(f"[DEBUG] Sending downgraded request...") down_resp = await client.get(downgraded_url, headers=dict(req.headers)) print(f"[DEBUG] Downgraded response: {down_resp.status_code}") orig_loc = orig_resp.headers.get("location", "") down_loc = down_resp.headers.get("location", "") print(f"[DEBUG] Original location: {orig_loc[:100]}..." if len(orig_loc) > 100 else f"[DEBUG] Original location: {orig_loc}") print(f"[DEBUG] Downgraded location: {down_loc[:100]}..." if len(down_loc) > 100 else f"[DEBUG] Downgraded location: {down_loc}") both_redirect = orig_resp.status_code in [301, 302] and down_resp.status_code in [301, 302] both_have_code = "code=" in orig_loc and "code=" in down_loc print(f"[DEBUG] Both redirect: {both_redirect}") print(f"[DEBUG] Both have code: {both_have_code}") # Check if both requests succeeded (either with redirect or direct response) orig_success = orig_resp.status_code in [200, 301, 302] down_success = down_resp.status_code in [200, 301, 302] if orig_success and down_success: # If both redirect and both have code, it's a clear vulnerability if both_redirect and both_have_code: status = "CRITICAL" report_data = [{ 'target': target.load(), 'status': status, 'title': "PKCE Downgrade Vulnerability", 'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.", 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" }] save_report(report_data) # If responses are similar (both redirect to similar pages), it might be vulnerable elif both_redirect: # Check if the redirect locations are similar (excluding PKCE parameters) orig_parsed = urlparse(orig_loc) down_parsed = urlparse(down_loc) if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc: status = "MEDIUM" if is_openid else "LOW" report_data = [{ 'target': target.load(), 'status': status, 'title': "Potential PKCE Downgrade", 'description': "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.", 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" }] save_report(report_data) elif down_resp.status_code != 400: # 400 would be expected for missing required parameters status = "MEDIUM" if is_openid else "LOW" report_data = [{ 'target': target.load(), 'status': status, 'title': "PKCE Not Enforced", 'description': "Server accepts OAuth request without PKCE parameters.", 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" }] save_report(report_data) else: print(f"[DEBUG] Requests had different success status - likely PKCE is enforced") except Exception as e: print(f"[ERROR] Request failed: {e}")