import asyncio
from typing import Dict, List
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

import httpx

import lib.target as target
from lib.report import save_report


class PKCEDowngradeChecker:
    """Detect authorization endpoints that do not enforce PKCE.

    For a GET authorization request that carries PKCE parameters, the checker
    replays the request twice -- once unchanged and once with
    ``code_challenge`` / ``code_challenge_method`` stripped -- and compares
    the two responses.  Findings are handed to ``save_report``.
    """

    # Query parameters that must all be present before a request is treated
    # as a PKCE-protected authorization request.
    required_keys = [
        "client_id",
        "response_type",
        "code_challenge",
        "code_challenge_method",
    ]

    # Redirect statuses an authorization server may answer with.  303 is
    # included because the OAuth 2.0 Security BCP recommends it; 307/308 are
    # accepted so such servers are not silently treated as "PKCE enforced".
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)
    _SUCCESS_STATUSES = (200,) + _REDIRECT_STATUSES

    async def test(self, flow):
        """Inspect one intercepted flow and run the downgrade probe if relevant.

        Args:
            flow: Object exposing ``request.method``, ``request.pretty_url``
                and ``request.headers`` (presumably a mitmproxy HTTP flow --
                confirm against the caller).
        """
        req = flow.request
        if req.method.upper() != "GET":
            print("[DEBUG] Skipping non-GET request")
            return

        url = req.pretty_url
        parsed = urlparse(url)
        # keep_blank_values=True so that a blank ``code_challenge=`` is still
        # seen (and reported as incomplete below) and so that unrelated blank
        # parameters survive into the downgraded URL unchanged.
        query = parse_qs(parsed.query, keep_blank_values=True)

        print(f"[DEBUG] Checking URL: {url}")
        print(f"[DEBUG] Query parameters: {list(query.keys())}")

        if not self.has_required_pkce_keys(query):
            print(f"[DEBUG] Missing required keys: {self.missing_keys(query)}")
            return

        method_val = query.get("code_challenge_method", [None])[0]
        challenge_val = query.get("code_challenge", [None])[0]
        is_openid = self.is_openid_flow(query, url)

        # Keys are present but at least one of them carries no value.
        if not method_val or not challenge_val:
            await self.report_missing_parameters(url, is_openid)
            return

        if method_val.lower() == "plain":
            await self.report_plain_method(url)
            return

        downgraded_url = self.create_downgraded_url(parsed, query)
        await self.compare_responses(url, downgraded_url, req.headers, is_openid)

    def has_required_pkce_keys(self, query: Dict[str, List[str]]) -> bool:
        """Return True when every key in ``required_keys`` is in ``query``."""
        return all(k in query for k in self.required_keys)

    def missing_keys(self, query: Dict[str, List[str]]) -> List[str]:
        """Return the ``required_keys`` entries absent from ``query``."""
        return [k for k in self.required_keys if k not in query]

    def is_openid_flow(self, query: Dict[str, List[str]], url: str) -> bool:
        """Return True if the request looks like an OpenID Connect flow.

        The heuristic is textual: the first ``scope`` value contains
        ``openid``, or ``id_token`` appears anywhere in the URL.
        """
        return "openid" in query.get("scope", [""])[0] or "id_token" in url

    async def report_missing_parameters(self, url: str, is_openid: bool):
        """Report PKCE parameters that are present but empty.

        Severity is MEDIUM for OpenID flows and LOW otherwise.
        """
        status = "MEDIUM" if is_openid else "LOW"
        self.save(
            [
                self.make_report(
                    status,
                    "PKCE Parameters Missing",
                    "PKCE parameters are missing or incomplete.",
                    url,
                )
            ]
        )

    async def report_plain_method(self, url: str):
        """Report a request whose ``code_challenge_method`` is ``plain``."""
        self.save(
            [
                self.make_report(
                    "CRITICAL",
                    "PKCE Plain Method",
                    "PKCE method is set to 'plain'. Possible downgrade.",
                    url,
                )
            ]
        )

    def create_downgraded_url(self, parsed, query):
        """Return the request URL with both PKCE parameters removed.

        Args:
            parsed: Result of ``urlparse`` for the original URL.
            query: Parsed query mapping of the original URL; it is copied,
                not mutated.
        """
        downgraded_query = query.copy()
        downgraded_query.pop("code_challenge", None)
        downgraded_query.pop("code_challenge_method", None)
        new_query = urlencode(downgraded_query, doseq=True)
        return str(urlunparse(parsed._replace(query=new_query)))

    async def compare_responses(self, original_url, downgraded_url, headers, is_openid):
        """Fetch both URLs and analyse the responses when both succeed.

        Redirects are not followed so the ``Location`` header of the first
        response can be inspected.  Any exception raised while requesting or
        analysing is printed and swallowed: the check is best-effort and must
        not break the caller.
        """
        print(f"[DEBUG] Original: {original_url}")
        print(f"[DEBUG] Downgraded: {downgraded_url}")

        # verify=False: certificate validation is intentionally disabled
        # (presumably because traffic passes through an intercepting proxy).
        async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
            try:
                orig_resp = await client.get(original_url, headers=dict(headers))
                down_resp = await client.get(downgraded_url, headers=dict(headers))

                orig_status = orig_resp.status_code
                down_status = down_resp.status_code
                orig_loc = orig_resp.headers.get("location", "")
                down_loc = down_resp.headers.get("location", "")

                print(f"[DEBUG] Original status: {orig_status}, location: {orig_loc}")
                print(f"[DEBUG] Downgraded status: {down_status}, location: {down_loc}")

                if self.both_success(orig_status, down_status):
                    await self.analyze_results(
                        original_url, downgraded_url, orig_loc, down_loc, is_openid
                    )
                else:
                    print(
                        "[DEBUG] Requests had different success status - likely PKCE is enforced"
                    )
            except Exception as e:
                print(f"[ERROR] Request failed: {e}")

    def both_success(self, orig_status, down_status):
        """Return True when both statuses are 200 or a redirect status."""
        return (
            orig_status in self._SUCCESS_STATUSES
            and down_status in self._SUCCESS_STATUSES
        )

    async def analyze_results(
        self, original_url, downgraded_url, orig_loc, down_loc, is_openid
    ):
        """Classify the pair of responses and save a finding if warranted.

        Args:
            original_url: URL requested with PKCE parameters.
            downgraded_url: Same URL with PKCE parameters removed.
            orig_loc: ``Location`` header of the original response ("" if none).
            down_loc: ``Location`` header of the downgraded response ("" if none).
            is_openid: Whether the flow was classified as OpenID Connect;
                raises non-critical findings from LOW to MEDIUM.
        """
        # A non-empty Location header is what marks a response as a redirect.
        both_redirect = bool(orig_loc) and bool(down_loc)
        both_have_code = self._has_response_param(
            orig_loc, ("code",)
        ) and self._has_response_param(down_loc, ("code",))

        if both_redirect and both_have_code:
            status = "CRITICAL"
            title = "PKCE Downgrade Vulnerability"
            description = (
                "Both URLs returned an authorization code without enforcing PKCE."
            )
        elif both_redirect and self.same_redirect_destination(orig_loc, down_loc):
            status = "MEDIUM" if is_openid else "LOW"
            title = "Potential PKCE Downgrade"
            description = (
                "Redirects are similar even without PKCE. Possible vulnerability."
            )
        elif self._has_response_param(
            down_loc, ("code", "id_token", "access_token")
        ):
            status = "MEDIUM" if is_openid else "LOW"
            title = "PKCE Not Enforced"
            description = "OAuth flow succeeded without PKCE parameters."
        else:
            return  # Likely safe

        self.save(
            [
                self.make_report(
                    status,
                    title,
                    description,
                    f"Original: {original_url}\nDowngraded: {downgraded_url}",
                )
            ]
        )

    @staticmethod
    def _has_response_param(location, names):
        """Return True if ``location`` carries any parameter named in ``names``.

        Both the query string and the fragment are parsed, so parameter names
        are matched exactly; a substring test would also match e.g.
        ``error_code=`` when looking for ``code``.
        """
        if not location:
            return False
        parts = urlparse(location)
        candidates = [parts.query, parts.fragment]
        # Hash-routed callbacks put the parameters after a "?" in the fragment
        # (e.g. "#/callback?code=...").
        if "?" in parts.fragment:
            candidates.append(parts.fragment.split("?", 1)[1])
        for component in candidates:
            params = parse_qs(component, keep_blank_values=True)
            if any(name in params for name in names):
                return True
        return False

    def same_redirect_destination(self, orig_loc, down_loc):
        """Return True when both locations share the same host and path."""
        orig = urlparse(orig_loc)
        down = urlparse(down_loc)
        return orig.netloc == down.netloc and orig.path == down.path

    def make_report(
        self, status: str, title: str, description: str, uri: str
    ) -> Dict[str, str]:
        """Build one report entry; ``target`` is filled from ``target.load()``."""
        return {
            "target": target.load(),
            "status": status,
            "title": title,
            "description": description,
            "uri": uri,
        }

    def save(self, report_data: List[Dict[str, str]]):
        """Pass the report entries to ``save_report``."""
        save_report(report_data)