리팩토링

This commit is contained in:
imnyang 2025-06-07 19:15:21 +09:00
commit 0d09f191c5
7 changed files with 559 additions and 141 deletions

View file

@ -1,153 +1,181 @@
# pkce_check.py
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx
import csv
import os
from typing import List, Dict, Any
from typing import Dict, List
import lib.target as target
from lib.report import save_report
class PKCEDowngradeChecker:
required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"]
required_keys = [
"client_id",
"response_type",
"code_challenge",
"code_challenge_method",
]
async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url
print(f"[DEBUG] PKCE check - Method: {method}, URL: {url}")
if method.upper() != "GET":
print(f"[DEBUG] Skipping non-GET request")
if req.method.upper() != "GET":
print("[DEBUG] Skipping non-GET request")
return
url = req.pretty_url
parsed = urlparse(url)
query = parse_qs(parsed.query)
print(f"[DEBUG] Checking URL: {url}")
print(f"[DEBUG] Query parameters: {list(query.keys())}")
if not all(k in query for k in self.required_keys):
missing_keys = [k for k in self.required_keys if k not in query]
print(f"[DEBUG] Missing required keys: {missing_keys}")
if not self.has_required_pkce_keys(query):
print(f"[DEBUG] Missing required keys: {self.missing_keys(query)}")
return
print(f"[DEBUG] Found OAuth request with PKCE parameters")
is_openid = "openid" in query.get("scope", [""])[0] or "id_token" in url
method_val = query.get("code_challenge_method", [None])[0]
challenge_val = query.get("code_challenge", [None])[0]
is_openid = self.is_openid_flow(query, url)
if not method_val or not challenge_val:
status = "MEDIUM" if is_openid else "LOW"
report_data = [{
'target': target.load(),
'status': status,
'title': "PKCE Parameters Missing",
'description': "PKCE parameters are missing or incomplete.",
'uri': url
}]
save_report(report_data)
await self.report_missing_parameters(url, is_openid)
return
if method_val.lower() == "plain":
status = "CRITICAL"
report_data = [{
'target': target.load(),
'status': status,
'title': "PKCE Plain Method",
'description': "PKCE method is set to 'plain'. Possible downgrade.",
'uri': url
}]
save_report(report_data)
await self.report_plain_method(url)
return
# Create downgraded request
downgraded_url = self.create_downgraded_url(parsed, query)
await self.compare_responses(url, downgraded_url, req.headers, is_openid)
def has_required_pkce_keys(self, query: Dict[str, List[str]]) -> bool:
return all(k in query for k in self.required_keys)
def missing_keys(self, query: Dict[str, List[str]]) -> List[str]:
return [k for k in self.required_keys if k not in query]
def is_openid_flow(self, query: Dict[str, List[str]], url: str) -> bool:
return "openid" in query.get("scope", [""])[0] or "id_token" in url
async def report_missing_parameters(self, url: str, is_openid: bool):
status = "MEDIUM" if is_openid else "LOW"
self.save(
[
self.make_report(
status,
"PKCE Parameters Missing",
"PKCE parameters are missing or incomplete.",
url,
)
]
)
async def report_plain_method(self, url: str):
self.save(
[
self.make_report(
"CRITICAL",
"PKCE Plain Method",
"PKCE method is set to 'plain'. Possible downgrade.",
url,
)
]
)
def create_downgraded_url(self, parsed, query):
downgraded_query = query.copy()
downgraded_query.pop("code_challenge", None)
downgraded_query.pop("code_challenge_method", None)
new_query = urlencode(downgraded_query, doseq=True)
downgraded_url = urlunparse(parsed._replace(query=new_query))
# Ensure downgraded_url is a string, not bytes
if isinstance(downgraded_url, bytes):
downgraded_url = downgraded_url.decode('utf-8')
# Ensure it's definitely a string
downgraded_url = str(downgraded_url)
return str(urlunparse(parsed._replace(query=new_query)))
async def compare_responses(self, original_url, downgraded_url, headers, is_openid):
print(f"[DEBUG] Original: {original_url}")
print(f"[DEBUG] Downgraded: {downgraded_url}")
print(f"[DEBUG] Testing downgraded URL: {downgraded_url}")
async with httpx.AsyncClient(follow_redirects=False, verify=False) as client:
try:
print(f"[DEBUG] Sending original request...")
orig_resp = await client.get(url, headers=dict(req.headers))
print(f"[DEBUG] Original response: {orig_resp.status_code}")
print(f"[DEBUG] Sending downgraded request...")
down_resp = await client.get(downgraded_url, headers=dict(req.headers))
print(f"[DEBUG] Downgraded response: {down_resp.status_code}")
orig_resp = await client.get(original_url, headers=dict(headers))
down_resp = await client.get(downgraded_url, headers=dict(headers))
orig_status = orig_resp.status_code
down_status = down_resp.status_code
orig_loc = orig_resp.headers.get("location", "")
down_loc = down_resp.headers.get("location", "")
print(f"[DEBUG] Original location: {orig_loc[:100]}..." if len(orig_loc) > 100 else f"[DEBUG] Original location: {orig_loc}")
print(f"[DEBUG] Downgraded location: {down_loc[:100]}..." if len(down_loc) > 100 else f"[DEBUG] Downgraded location: {down_loc}")
both_redirect = orig_resp.status_code in [301, 302] and down_resp.status_code in [301, 302]
both_have_code = "code=" in orig_loc and "code=" in down_loc
print(f"[DEBUG] Both redirect: {both_redirect}")
print(f"[DEBUG] Both have code: {both_have_code}")
# Check if both requests succeeded (either with redirect or direct response)
orig_success = orig_resp.status_code in [200, 301, 302]
down_success = down_resp.status_code in [200, 301, 302]
if orig_success and down_success:
# If both redirect and both have code, it's a clear vulnerability
if both_redirect and both_have_code:
status = "CRITICAL"
report_data = [{
'target': target.load(),
'status': status,
'title': "PKCE Downgrade Vulnerability",
'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
# If responses are similar (both redirect to similar pages), it might be vulnerable
elif both_redirect:
# Check if the redirect locations are similar (excluding PKCE parameters)
orig_parsed = urlparse(orig_loc)
down_parsed = urlparse(down_loc)
if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc:
status = "MEDIUM" if is_openid else "LOW"
report_data = [{
'target': target.load(),
'status': status,
'title': "Potential PKCE Downgrade",
'description': "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.",
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
elif down_resp.status_code != 400: # 400 would be expected for missing required parameters
status = "MEDIUM" if is_openid else "LOW"
report_data = [{
'target': target.load(),
'status': status,
'title': "PKCE Not Enforced",
'description': "Server accepts OAuth request without PKCE parameters.",
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
}]
save_report(report_data)
print(f"[DEBUG] Original status: {orig_status}, location: {orig_loc}")
print(f"[DEBUG] Downgraded status: {down_status}, location: {down_loc}")
if self.both_success(orig_status, down_status):
await self.analyze_results(
original_url, downgraded_url, orig_loc, down_loc, is_openid
)
else:
print(f"[DEBUG] Requests had different success status - likely PKCE is enforced")
print(
"[DEBUG] Requests had different success status - likely PKCE is enforced"
)
except Exception as e:
print(f"[ERROR] Request failed: {e}")
def both_success(self, orig_status, down_status):
return orig_status in [200, 301, 302] and down_status in [200, 301, 302]
async def analyze_results(
self, original_url, downgraded_url, orig_loc, down_loc, is_openid
):
both_redirect = all(
code in [301, 302] for code in [orig_loc and 302, down_loc and 302]
)
both_have_code = "code=" in orig_loc and "code=" in down_loc
if both_redirect and both_have_code:
status = "CRITICAL"
title = "PKCE Downgrade Vulnerability"
description = (
"Both URLs returned an authorization code without enforcing PKCE."
)
elif both_redirect and self.same_redirect_destination(orig_loc, down_loc):
status = "MEDIUM" if is_openid else "LOW"
title = "Potential PKCE Downgrade"
description = (
"Redirects are similar even without PKCE. Possible vulnerability."
)
elif (
"code=" in down_loc
or "id_token=" in down_loc
or "access_token=" in down_loc
):
status = "MEDIUM" if is_openid else "LOW"
title = "PKCE Not Enforced"
description = "OAuth flow succeeded without PKCE parameters."
else:
return # Likely safe
self.save(
[
self.make_report(
status,
title,
description,
f"Original: {original_url}\nDowngraded: {downgraded_url}",
)
]
)
def same_redirect_destination(self, orig_loc, down_loc):
orig = urlparse(orig_loc)
down = urlparse(down_loc)
return orig.netloc == down.netloc and orig.path == down.path
def make_report(
self, status: str, title: str, description: str, uri: str
) -> Dict[str, str]:
return {
"target": target.load(),
"status": status,
"title": title,
"description": description,
"uri": uri,
}
def save(self, report_data: List[Dict[str, str]]):
save_report(report_data)