oauth-backend/addon/pkce_check.py
2025-06-07 15:12:15 +09:00

153 lines
6.9 KiB
Python

# pkce_check.py
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx
import csv
import os
from typing import List, Dict, Any
import lib.target as target
from lib.report import save_report
class PKCEDowngradeChecker:
    """Check whether an OAuth authorization endpoint can be used without PKCE.

    ``test`` inspects a GET authorization request that carries PKCE
    parameters, reports weak configurations (blank parameters, the ``plain``
    method) and then replays the request with the PKCE parameters stripped to
    see whether the server still answers the same way.
    """

    # Query parameters that must all be present for a request to be examined.
    required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"]

    # Status codes treated as a redirect. Authorization servers commonly answer
    # with 303/307 as well as 301/302, so all of them are accepted here.
    _REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})

    @staticmethod
    def _has_auth_code(location):
        """Return True if *location* has a non-empty ``code`` parameter.

        Both the query string and the fragment are inspected. Parsing the
        parameters (instead of a substring search for ``code=``) avoids false
        positives on names such as ``error_code``.
        """
        if not location:
            return False
        parts = urlparse(location)
        return any(parse_qs(component).get("code") for component in (parts.query, parts.fragment))

    @staticmethod
    def _report(status, title, description, uri):
        """Save a single finding through ``save_report``."""
        save_report([{
            'target': target.load(),
            'status': status,
            'title': title,
            'description': description,
            'uri': uri,
        }])

    async def test(self, flow):
        """Run the PKCE checks against the request held by *flow*.

        Args:
            flow: object exposing ``request.method``, ``request.pretty_url``
                and ``request.headers`` (the original request's headers are
                replayed with both probe requests).

        Returns:
            None. Findings are written through ``save_report``; diagnostics
            are printed to stdout.
        """
        req = flow.request
        method = req.method
        url = req.pretty_url
        print(f"[DEBUG] PKCE check - Method: {method}, URL: {url}")
        if method.upper() != "GET":
            print("[DEBUG] Skipping non-GET request")
            return

        parsed = urlparse(url)
        # keep_blank_values=True keeps parameters such as "code_challenge=" in
        # the mapping; without it they are dropped and the "missing or
        # incomplete" report below could never be reached.
        query = parse_qs(parsed.query, keep_blank_values=True)
        print(f"[DEBUG] Query parameters: {list(query.keys())}")
        missing_keys = [k for k in self.required_keys if k not in query]
        if missing_keys:
            print(f"[DEBUG] Missing required keys: {missing_keys}")
            return
        print("[DEBUG] Found OAuth request with PKCE parameters")

        is_openid = "openid" in query.get("scope", [""])[0] or "id_token" in url
        # Severity used for every finding that is not a confirmed downgrade.
        soft_status = "MEDIUM" if is_openid else "LOW"

        method_val = query["code_challenge_method"][0]
        challenge_val = query["code_challenge"][0]
        if not method_val or not challenge_val:
            self._report(
                soft_status,
                "PKCE Parameters Missing",
                "PKCE parameters are missing or incomplete.",
                url,
            )
            return
        if method_val.lower() == "plain":
            self._report(
                "CRITICAL",
                "PKCE Plain Method",
                "PKCE method is set to 'plain'. Possible downgrade.",
                url,
            )
            return

        # Build the downgraded request: same URL with both PKCE parameters removed.
        downgraded_query = {
            key: values
            for key, values in query.items()
            if key not in ("code_challenge", "code_challenge_method")
        }
        new_query = urlencode(downgraded_query, doseq=True)
        downgraded_url = urlunparse(parsed._replace(query=new_query))
        print(f"[DEBUG] Testing downgraded URL: {downgraded_url}")

        headers = dict(req.headers)
        both_uri = f"Original: {url}\nDowngraded: {downgraded_url}"

        # Redirects are not followed so the Location headers can be compared.
        # NOTE(review): certificate verification is disabled (verify=False);
        # presumably intentional for intercepted test targets - confirm.
        async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
            try:
                print("[DEBUG] Sending original request...")
                orig_resp = await client.get(url, headers=headers)
                print(f"[DEBUG] Original response: {orig_resp.status_code}")
                print("[DEBUG] Sending downgraded request...")
                down_resp = await client.get(downgraded_url, headers=headers)
                print(f"[DEBUG] Downgraded response: {down_resp.status_code}")

                orig_loc = orig_resp.headers.get("location", "")
                down_loc = down_resp.headers.get("location", "")
                print(f"[DEBUG] Original location: {orig_loc[:100]}..." if len(orig_loc) > 100 else f"[DEBUG] Original location: {orig_loc}")
                print(f"[DEBUG] Downgraded location: {down_loc[:100]}..." if len(down_loc) > 100 else f"[DEBUG] Downgraded location: {down_loc}")

                orig_redirect = orig_resp.status_code in self._REDIRECT_STATUSES
                down_redirect = down_resp.status_code in self._REDIRECT_STATUSES
                both_redirect = orig_redirect and down_redirect
                both_have_code = self._has_auth_code(orig_loc) and self._has_auth_code(down_loc)
                print(f"[DEBUG] Both redirect: {both_redirect}")
                print(f"[DEBUG] Both have code: {both_have_code}")

                # A request "succeeded" if it was answered directly (200) or with a redirect.
                orig_success = orig_redirect or orig_resp.status_code == 200
                down_success = down_redirect or down_resp.status_code == 200
                if not (orig_success and down_success):
                    print("[DEBUG] Requests had different success status - likely PKCE is enforced")
                    return

                if both_redirect and both_have_code:
                    # Both requests yielded an authorization code: clear vulnerability.
                    self._report(
                        "CRITICAL",
                        "PKCE Downgrade Vulnerability",
                        "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
                        both_uri,
                    )
                elif both_redirect:
                    # Both redirect; flag only when they lead to the same host and path.
                    orig_parsed = urlparse(orig_loc)
                    down_parsed = urlparse(down_loc)
                    if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc:
                        self._report(
                            soft_status,
                            "Potential PKCE Downgrade",
                            "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.",
                            both_uri,
                        )
                else:
                    # The downgraded request was answered with a success status
                    # (a 400 rejection cannot reach this branch), so PKCE does
                    # not appear to be required.
                    self._report(
                        soft_status,
                        "PKCE Not Enforced",
                        "Server accepts OAuth request without PKCE parameters.",
                        both_uri,
                    )
            except Exception as e:
                # Best-effort probe: a failure here must not break the caller,
                # so it is logged and swallowed.
                print(f"[ERROR] Request failed: {e}")