mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 07:41:52 +09:00
181 lines
6.4 KiB
Python
181 lines
6.4 KiB
Python
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
|
import asyncio
|
|
import httpx
|
|
from typing import Dict, List
|
|
|
|
import lib.target as target
|
|
from lib.report import save_report
|
|
|
|
|
|
class PKCEDowngradeChecker:
|
|
required_keys = [
|
|
"client_id",
|
|
"response_type",
|
|
"code_challenge",
|
|
"code_challenge_method",
|
|
]
|
|
|
|
async def test(self, flow):
|
|
req = flow.request
|
|
if req.method.upper() != "GET":
|
|
print("[DEBUG] Skipping non-GET request")
|
|
return
|
|
|
|
url = req.pretty_url
|
|
parsed = urlparse(url)
|
|
query = parse_qs(parsed.query)
|
|
|
|
print(f"[DEBUG] Checking URL: {url}")
|
|
print(f"[DEBUG] Query parameters: {list(query.keys())}")
|
|
|
|
if not self.has_required_pkce_keys(query):
|
|
print(f"[DEBUG] Missing required keys: {self.missing_keys(query)}")
|
|
return
|
|
|
|
method_val = query.get("code_challenge_method", [None])[0]
|
|
challenge_val = query.get("code_challenge", [None])[0]
|
|
is_openid = self.is_openid_flow(query, url)
|
|
|
|
if not method_val or not challenge_val:
|
|
await self.report_missing_parameters(url, is_openid)
|
|
return
|
|
|
|
if method_val.lower() == "plain":
|
|
await self.report_plain_method(url)
|
|
return
|
|
|
|
downgraded_url = self.create_downgraded_url(parsed, query)
|
|
await self.compare_responses(url, downgraded_url, req.headers, is_openid)
|
|
|
|
def has_required_pkce_keys(self, query: Dict[str, List[str]]) -> bool:
|
|
return all(k in query for k in self.required_keys)
|
|
|
|
def missing_keys(self, query: Dict[str, List[str]]) -> List[str]:
|
|
return [k for k in self.required_keys if k not in query]
|
|
|
|
def is_openid_flow(self, query: Dict[str, List[str]], url: str) -> bool:
|
|
return "openid" in query.get("scope", [""])[0] or "id_token" in url
|
|
|
|
async def report_missing_parameters(self, url: str, is_openid: bool):
|
|
status = "MEDIUM" if is_openid else "LOW"
|
|
self.save(
|
|
[
|
|
self.make_report(
|
|
status,
|
|
"PKCE Parameters Missing",
|
|
"PKCE parameters are missing or incomplete.",
|
|
url,
|
|
)
|
|
]
|
|
)
|
|
|
|
async def report_plain_method(self, url: str):
|
|
self.save(
|
|
[
|
|
self.make_report(
|
|
"CRITICAL",
|
|
"PKCE Plain Method",
|
|
"PKCE method is set to 'plain'. Possible downgrade.",
|
|
url,
|
|
)
|
|
]
|
|
)
|
|
|
|
def create_downgraded_url(self, parsed, query):
|
|
downgraded_query = query.copy()
|
|
downgraded_query.pop("code_challenge", None)
|
|
downgraded_query.pop("code_challenge_method", None)
|
|
new_query = urlencode(downgraded_query, doseq=True)
|
|
return str(urlunparse(parsed._replace(query=new_query)))
|
|
|
|
async def compare_responses(self, original_url, downgraded_url, headers, is_openid):
|
|
print(f"[DEBUG] Original: {original_url}")
|
|
print(f"[DEBUG] Downgraded: {downgraded_url}")
|
|
|
|
async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
|
|
try:
|
|
orig_resp = await client.get(original_url, headers=dict(headers))
|
|
down_resp = await client.get(downgraded_url, headers=dict(headers))
|
|
|
|
orig_status = orig_resp.status_code
|
|
down_status = down_resp.status_code
|
|
orig_loc = orig_resp.headers.get("location", "")
|
|
down_loc = down_resp.headers.get("location", "")
|
|
|
|
print(f"[DEBUG] Original status: {orig_status}, location: {orig_loc}")
|
|
print(f"[DEBUG] Downgraded status: {down_status}, location: {down_loc}")
|
|
|
|
if self.both_success(orig_status, down_status):
|
|
await self.analyze_results(
|
|
original_url, downgraded_url, orig_loc, down_loc, is_openid
|
|
)
|
|
else:
|
|
print(
|
|
"[DEBUG] Requests had different success status - likely PKCE is enforced"
|
|
)
|
|
except Exception as e:
|
|
print(f"[ERROR] Request failed: {e}")
|
|
|
|
def both_success(self, orig_status, down_status):
|
|
return orig_status in [200, 301, 302] and down_status in [200, 301, 302]
|
|
|
|
async def analyze_results(
|
|
self, original_url, downgraded_url, orig_loc, down_loc, is_openid
|
|
):
|
|
both_redirect = all(
|
|
code in [301, 302] for code in [orig_loc and 302, down_loc and 302]
|
|
)
|
|
both_have_code = "code=" in orig_loc and "code=" in down_loc
|
|
|
|
if both_redirect and both_have_code:
|
|
status = "CRITICAL"
|
|
title = "PKCE Downgrade Vulnerability"
|
|
description = (
|
|
"Both URLs returned an authorization code without enforcing PKCE."
|
|
)
|
|
elif both_redirect and self.same_redirect_destination(orig_loc, down_loc):
|
|
status = "MEDIUM" if is_openid else "LOW"
|
|
title = "Potential PKCE Downgrade"
|
|
description = (
|
|
"Redirects are similar even without PKCE. Possible vulnerability."
|
|
)
|
|
elif (
|
|
"code=" in down_loc
|
|
or "id_token=" in down_loc
|
|
or "access_token=" in down_loc
|
|
):
|
|
status = "MEDIUM" if is_openid else "LOW"
|
|
title = "PKCE Not Enforced"
|
|
description = "OAuth flow succeeded without PKCE parameters."
|
|
else:
|
|
return # Likely safe
|
|
|
|
self.save(
|
|
[
|
|
self.make_report(
|
|
status,
|
|
title,
|
|
description,
|
|
f"Original: {original_url}\nDowngraded: {downgraded_url}",
|
|
)
|
|
]
|
|
)
|
|
|
|
def same_redirect_destination(self, orig_loc, down_loc):
|
|
orig = urlparse(orig_loc)
|
|
down = urlparse(down_loc)
|
|
return orig.netloc == down.netloc and orig.path == down.path
|
|
|
|
def make_report(
|
|
self, status: str, title: str, description: str, uri: str
|
|
) -> Dict[str, str]:
|
|
return {
|
|
"target": target.load(),
|
|
"status": status,
|
|
"title": title,
|
|
"description": description,
|
|
"uri": uri,
|
|
}
|
|
|
|
def save(self, report_data: List[Dict[str, str]]):
|
|
save_report(report_data)
|