oauth-backend/addon/pkce_check.py
2025-06-26 12:20:41 +09:00

156 lines
5.8 KiB
Python

from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx
from typing import Dict, List
import lib.cur_target_url as cur_target_url
from lib.report_vuln import report_vuln
class PKCEDowngradeChecker:
required_keys = [
"client_id",
"response_type",
"code_challenge",
"code_challenge_method",
]
async def test(self, flow):
req = flow.request
if req.method.upper() != "GET":
print("[DEBUG] Skipping non-GET request")
return
url = req.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
print(f"[DEBUG] Checking URL: {url}")
print(f"[DEBUG] Query parameters: {list(query.keys())}")
if not self.has_required_pkce_keys(query):
print(f"[DEBUG] Missing required keys: {self.missing_keys(query)}")
return
method_val = query.get("code_challenge_method", [None])[0]
challenge_val = query.get("code_challenge", [None])[0]
is_openid = self.is_openid_flow(query, url)
if not method_val or not challenge_val:
await self.report_missing_parameters(url, is_openid)
return
if method_val.lower() == "plain":
await self.report_plain_method(url)
return
downgraded_url = self.create_downgraded_url(parsed, query)
await self.compare_responses(url, downgraded_url, req.headers, is_openid)
def has_required_pkce_keys(self, query: Dict[str, List[str]]) -> bool:
return all(k in query for k in self.required_keys)
def missing_keys(self, query: Dict[str, List[str]]) -> List[str]:
return [k for k in self.required_keys if k not in query]
def is_openid_flow(self, query: Dict[str, List[str]], url: str) -> bool:
return "openid" in query.get("scope", [""])[0] or "id_token" in url
async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW"
report_vuln(
title="PKCE Parameters Missing",
desc="PKCE parameters are missing or incomplete.",
status=status,
uri=url,
)
async def report_plain_method(self, url: str):
report_vuln(
title="PKCE Plain Method",
desc="PKCE method is set to 'plain'. Possible downgrade.",
status="CRITICAL",
uri=url,
)
def create_downgraded_url(self, parsed, query):
downgraded_query = query.copy()
downgraded_query.pop("code_challenge", None)
downgraded_query.pop("code_challenge_method", None)
new_query = urlencode(downgraded_query, doseq=True)
return str(urlunparse(parsed._replace(query=new_query)))
async def compare_responses(self, original_url, downgraded_url, headers, is_openid):
print(f"[DEBUG] Original: {original_url}")
print(f"[DEBUG] Downgraded: {downgraded_url}")
async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
try:
orig_resp = await client.get(original_url, headers=dict(headers))
down_resp = await client.get(downgraded_url, headers=dict(headers))
orig_status = orig_resp.status_code
down_status = down_resp.status_code
orig_loc = orig_resp.headers.get("location", "")
down_loc = down_resp.headers.get("location", "")
print(f"[DEBUG] Original status: {orig_status}, location: {orig_loc}")
print(f"[DEBUG] Downgraded status: {down_status}, location: {down_loc}")
if self.both_success(orig_status, down_status):
await self.analyze_results(
original_url, downgraded_url, orig_loc, down_loc, is_openid
)
else:
print(
"[DEBUG] Requests had different success status - likely PKCE is enforced"
)
except Exception as e:
print(f"[ERROR] Request failed: {e}")
def both_success(self, orig_status, down_status):
return orig_status in [200, 301, 302] and down_status in [200, 301, 302]
async def analyze_results(
self, original_url, downgraded_url, orig_loc, down_loc, is_openid
):
both_redirect = all(
code in [301, 302] for code in [orig_loc and 302, down_loc and 302]
)
both_have_code = "code=" in orig_loc and "code=" in down_loc
if both_redirect and both_have_code:
status = "CRITICAL"
title = "PKCE Downgrade Vulnerability"
description = (
"Both URLs returned an authorization code without enforcing PKCE."
)
elif both_redirect and self.same_redirect_destination(orig_loc, down_loc):
status = "MEDIUM" if is_openid else "LOW"
title = "Potential PKCE Downgrade"
description = (
"Redirects are similar even without PKCE. Possible vulnerability."
)
elif (
"code=" in down_loc
or "id_token=" in down_loc
or "access_token=" in down_loc
):
status = "MEDIUM" if is_openid else "LOW"
title = "PKCE Not Enforced"
description = "OAuth flow succeeded without PKCE parameters."
else:
return # Likely safe
report_vuln(
title=title,
desc=description,
status=status,
uri=f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
def same_redirect_destination(self, orig_loc, down_loc):
orig = urlparse(orig_loc)
down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path