from urllib.parse import urlparse, parse_qs, urlencode, urlunparse import httpx from typing import Dict, List from lib.report_vuln import report_vuln class PKCEDowngradeChecker: required_keys = [ "client_id", "response_type", "code_challenge", "code_challenge_method", ] async def test(self, flow): req = flow.request if req.method.upper() != "GET": print("[DEBUG] Skipping non-GET request") return url = req.pretty_url parsed = urlparse(url) query = parse_qs(parsed.query) print(f"[DEBUG] Checking URL: {url}") print(f"[DEBUG] Query parameters: {list(query.keys())}") if not self.has_required_pkce_keys(query): print(f"[DEBUG] Missing required keys: {self.missing_keys(query)}") return method_val = query.get("code_challenge_method", [None])[0] challenge_val = query.get("code_challenge", [None])[0] is_openid = self.is_openid_flow(query, url) if not method_val or not challenge_val: await self.report_missing_parameters(url, is_openid) return if method_val.lower() == "plain": await self.report_plain_method(url) return downgraded_url = self.create_downgraded_url(parsed, query) await self.compare_responses(url, downgraded_url, req.headers, is_openid) def has_required_pkce_keys(self, query: Dict[str, List[str]]) -> bool: return all(k in query for k in self.required_keys) def missing_keys(self, query: Dict[str, List[str]]) -> List[str]: return [k for k in self.required_keys if k not in query] def is_openid_flow(self, query: Dict[str, List[str]], url: str) -> bool: return "openid" in query.get("scope", [""])[0] or "id_token" in url async def report_missing_parameters(self, url: str, is_openid: bool): status = "MEDIUM" if is_openid else "LOW" report_vuln( title="PKCE Parameters Missing", desc="PKCE parameters are missing or incomplete.", status=status, uri=url, ) async def report_plain_method(self, url: str): report_vuln( title="PKCE Plain Method", desc="PKCE method is set to 'plain'. Possible downgrade.", status="CRITICAL", uri=url, ) def create_downgraded_url(self, parsed, query): downgraded_query = query.copy() downgraded_query.pop("code_challenge", None) downgraded_query.pop("code_challenge_method", None) new_query = urlencode(downgraded_query, doseq=True) return str(urlunparse(parsed._replace(query=new_query))) async def compare_responses(self, original_url, downgraded_url, headers, is_openid): print(f"[DEBUG] Original: {original_url}") print(f"[DEBUG] Downgraded: {downgraded_url}") async with httpx.AsyncClient(follow_redirects=False, verify=False) as client: try: orig_resp = await client.get(original_url, headers=dict(headers)) down_resp = await client.get(downgraded_url, headers=dict(headers)) orig_status = orig_resp.status_code down_status = down_resp.status_code orig_loc = orig_resp.headers.get("location", "") down_loc = down_resp.headers.get("location", "") print(f"[DEBUG] Original status: {orig_status}, location: {orig_loc}") print(f"[DEBUG] Downgraded status: {down_status}, location: {down_loc}") if self.both_success(orig_status, down_status): await self.analyze_results( original_url, downgraded_url, orig_loc, down_loc, is_openid ) else: print( "[DEBUG] Requests had different success status - likely PKCE is enforced" ) except Exception as e: print(f"[ERROR] Request failed: {e}") def both_success(self, orig_status, down_status): return orig_status in [200, 301, 302] and down_status in [200, 301, 302] async def analyze_results( self, original_url, downgraded_url, orig_loc, down_loc, is_openid ): both_redirect = all( code in [301, 302] for code in [orig_loc and 302, down_loc and 302] ) both_have_code = "code=" in orig_loc and "code=" in down_loc if both_redirect and both_have_code: status = "CRITICAL" title = "PKCE Downgrade Vulnerability" description = ( "Both URLs returned an authorization code without enforcing PKCE." ) elif both_redirect and self.same_redirect_destination(orig_loc, down_loc): status = "MEDIUM" if is_openid else "LOW" title = "Potential PKCE Downgrade" description = ( "Redirects are similar even without PKCE. Possible vulnerability." ) elif ( "code=" in down_loc or "id_token=" in down_loc or "access_token=" in down_loc ): status = "MEDIUM" if is_openid else "LOW" title = "PKCE Not Enforced" description = "OAuth flow succeeded without PKCE parameters." else: return # Likely safe report_vuln( title=title, desc=description, status=status, uri=f"Original: {original_url}\nDowngraded: {downgraded_url}", ) def same_redirect_destination(self, orig_loc, down_loc): orig = urlparse(orig_loc) down = urlparse(down_loc) return orig.netloc == down.netloc and orig.path == down.path