mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 05:21:51 +09:00
Refactor code structure for improved readability and maintainability
This commit is contained in:
commit
60af190524
8 changed files with 1129 additions and 0 deletions
123
addon/pkce_check.py
Normal file
123
addon/pkce_check.py
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
# pkce_check.py
|
||||
|
||||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||
import asyncio
|
||||
import httpx
|
||||
|
||||
class PKCEDowngradeChecker:
|
||||
required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"]
|
||||
|
||||
async def test(self, flow):
|
||||
req = flow.request
|
||||
method = req.method
|
||||
url = req.pretty_url
|
||||
|
||||
print(f"[DEBUG] PKCE check - Method: {method}, URL: {url}")
|
||||
|
||||
if method.upper() != "GET":
|
||||
print(f"[DEBUG] Skipping non-GET request")
|
||||
return
|
||||
|
||||
parsed = urlparse(url)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
print(f"[DEBUG] Query parameters: {list(query.keys())}")
|
||||
|
||||
if not all(k in query for k in self.required_keys):
|
||||
missing_keys = [k for k in self.required_keys if k not in query]
|
||||
print(f"[DEBUG] Missing required keys: {missing_keys}")
|
||||
return
|
||||
|
||||
print(f"[DEBUG] Found OAuth request with PKCE parameters")
|
||||
|
||||
is_openid = "openid" in query.get("scope", [""])[0] or "id_token" in url
|
||||
method_val = query.get("code_challenge_method", [None])[0]
|
||||
challenge_val = query.get("code_challenge", [None])[0]
|
||||
|
||||
if not method_val or not challenge_val:
|
||||
self.report("PKCE parameters missing or incomplete.", url, is_openid)
|
||||
return
|
||||
|
||||
if method_val.lower() == "plain":
|
||||
self.report("PKCE method is set to 'plain'. Possible downgrade.", url, is_openid)
|
||||
return
|
||||
|
||||
# Create downgraded request
|
||||
downgraded_query = query.copy()
|
||||
downgraded_query.pop("code_challenge", None)
|
||||
downgraded_query.pop("code_challenge_method", None)
|
||||
|
||||
new_query = urlencode(downgraded_query, doseq=True)
|
||||
downgraded_url = urlunparse(parsed._replace(query=new_query))
|
||||
|
||||
print(f"[DEBUG] Testing downgraded URL: {downgraded_url}")
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
|
||||
try:
|
||||
print(f"[DEBUG] Sending original request...")
|
||||
orig_resp = await client.get(url, headers=dict(req.headers))
|
||||
print(f"[DEBUG] Original response: {orig_resp.status_code}")
|
||||
|
||||
print(f"[DEBUG] Sending downgraded request...")
|
||||
down_resp = await client.get(downgraded_url, headers=dict(req.headers))
|
||||
print(f"[DEBUG] Downgraded response: {down_resp.status_code}")
|
||||
|
||||
orig_loc = orig_resp.headers.get("location", "")
|
||||
down_loc = down_resp.headers.get("location", "")
|
||||
|
||||
print(f"[DEBUG] Original location: {orig_loc[:100]}..." if len(orig_loc) > 100 else f"[DEBUG] Original location: {orig_loc}")
|
||||
print(f"[DEBUG] Downgraded location: {down_loc[:100]}..." if len(down_loc) > 100 else f"[DEBUG] Downgraded location: {down_loc}")
|
||||
|
||||
both_redirect = orig_resp.status_code in [301, 302] and down_resp.status_code in [301, 302]
|
||||
both_have_code = "code=" in orig_loc and "code=" in down_loc
|
||||
|
||||
print(f"[DEBUG] Both redirect: {both_redirect}")
|
||||
print(f"[DEBUG] Both have code: {both_have_code}")
|
||||
|
||||
# Check if both requests succeeded (either with redirect or direct response)
|
||||
orig_success = orig_resp.status_code in [200, 301, 302]
|
||||
down_success = down_resp.status_code in [200, 301, 302]
|
||||
|
||||
if orig_success and down_success:
|
||||
# If both redirect and both have code, it's a clear vulnerability
|
||||
if both_redirect and both_have_code:
|
||||
self.report(
|
||||
"PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=True
|
||||
)
|
||||
# If responses are similar (both redirect to similar pages), it might be vulnerable
|
||||
elif both_redirect:
|
||||
# Check if the redirect locations are similar (excluding PKCE parameters)
|
||||
orig_parsed = urlparse(orig_loc)
|
||||
down_parsed = urlparse(down_loc)
|
||||
|
||||
if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc:
|
||||
self.report(
|
||||
"Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=False
|
||||
)
|
||||
# If downgraded request doesn't fail, it's concerning
|
||||
elif down_resp.status_code != 400: # 400 would be expected for missing required parameters
|
||||
self.report(
|
||||
"Server accepts OAuth request without PKCE parameters.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=False
|
||||
)
|
||||
else:
|
||||
print(f"[DEBUG] Requests had different success status - likely PKCE is enforced")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Request failed: {e}")
|
||||
|
||||
def report(self, msg, context_url, is_openid, critical=False):
|
||||
tag = "[CRITICAL]" if critical else "[WARN]"
|
||||
flow_type = "OpenID" if is_openid else "OAuth2"
|
||||
# ANSI color codes for red text
|
||||
RED = '\033[91m'
|
||||
RESET = '\033[0m'
|
||||
print(f"{RED}{tag} {flow_type} - {msg}\n → {context_url}{RESET}\n")
|
||||
Loading…
Add table
Add a link
Reference in a new issue