mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 06:41:52 +09:00
commit
4da37af05d
5 changed files with 138 additions and 29 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -8,3 +8,5 @@ wheels/
|
|||
|
||||
# Virtual environments
|
||||
.venv
|
||||
|
||||
data/
|
||||
|
|
@ -3,6 +3,12 @@
|
|||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
|
||||
import asyncio
|
||||
import httpx
|
||||
import csv
|
||||
import os
|
||||
from typing import List, Dict, Any
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
class PKCEDowngradeChecker:
|
||||
required_keys = ["client_id", "response_type", "code_challenge", "code_challenge_method"]
|
||||
|
|
@ -35,11 +41,27 @@ class PKCEDowngradeChecker:
|
|||
challenge_val = query.get("code_challenge", [None])[0]
|
||||
|
||||
if not method_val or not challenge_val:
|
||||
self.report("PKCE parameters missing or incomplete.", url, is_openid)
|
||||
status = "MEDIUM" if is_openid else "LOW"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "PKCE Parameters Missing",
|
||||
'description': "PKCE parameters are missing or incomplete.",
|
||||
'uri': url
|
||||
}]
|
||||
save_report(report_data)
|
||||
return
|
||||
|
||||
if method_val.lower() == "plain":
|
||||
self.report("PKCE method is set to 'plain'. Possible downgrade.", url, is_openid)
|
||||
status = "CRITICAL"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "PKCE Plain Method",
|
||||
'description': "PKCE method is set to 'plain'. Possible downgrade.",
|
||||
'uri': url
|
||||
}]
|
||||
save_report(report_data)
|
||||
return
|
||||
|
||||
# Create downgraded request
|
||||
|
|
@ -49,6 +71,13 @@ class PKCEDowngradeChecker:
|
|||
|
||||
new_query = urlencode(downgraded_query, doseq=True)
|
||||
downgraded_url = urlunparse(parsed._replace(query=new_query))
|
||||
|
||||
# Ensure downgraded_url is a string, not bytes
|
||||
if isinstance(downgraded_url, bytes):
|
||||
downgraded_url = downgraded_url.decode('utf-8')
|
||||
|
||||
# Ensure it's definitely a string
|
||||
downgraded_url = str(downgraded_url)
|
||||
|
||||
print(f"[DEBUG] Testing downgraded URL: {downgraded_url}")
|
||||
|
||||
|
|
@ -81,12 +110,15 @@ class PKCEDowngradeChecker:
|
|||
if orig_success and down_success:
|
||||
# If both redirect and both have code, it's a clear vulnerability
|
||||
if both_redirect and both_have_code:
|
||||
self.report(
|
||||
"PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=True
|
||||
)
|
||||
status = "CRITICAL"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "PKCE Downgrade Vulnerability",
|
||||
'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.",
|
||||
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
|
||||
}]
|
||||
save_report(report_data)
|
||||
# If responses are similar (both redirect to similar pages), it might be vulnerable
|
||||
elif both_redirect:
|
||||
# Check if the redirect locations are similar (excluding PKCE parameters)
|
||||
|
|
@ -94,30 +126,28 @@ class PKCEDowngradeChecker:
|
|||
down_parsed = urlparse(down_loc)
|
||||
|
||||
if orig_parsed.path == down_parsed.path and orig_parsed.netloc == down_parsed.netloc:
|
||||
self.report(
|
||||
"Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=False
|
||||
)
|
||||
# If downgraded request doesn't fail, it's concerning
|
||||
status = "MEDIUM" if is_openid else "LOW"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "Potential PKCE Downgrade",
|
||||
'description': "Potential PKCE downgrade vulnerability! Server accepts requests without PKCE.",
|
||||
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
|
||||
}]
|
||||
save_report(report_data)
|
||||
elif down_resp.status_code != 400: # 400 would be expected for missing required parameters
|
||||
self.report(
|
||||
"Server accepts OAuth request without PKCE parameters.",
|
||||
f"Original: {url}\nDowngraded: {downgraded_url}",
|
||||
is_openid,
|
||||
critical=False
|
||||
)
|
||||
status = "MEDIUM" if is_openid else "LOW"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "PKCE Not Enforced",
|
||||
'description': "Server accepts OAuth request without PKCE parameters.",
|
||||
'uri': f"Original: {url}\nDowngraded: {downgraded_url}"
|
||||
}]
|
||||
save_report(report_data)
|
||||
|
||||
else:
|
||||
print(f"[DEBUG] Requests had different success status - likely PKCE is enforced")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Request failed: {e}")
|
||||
|
||||
def report(self, msg, context_url, is_openid, critical=False):
|
||||
tag = "[CRITICAL]" if critical else "[WARN]"
|
||||
flow_type = "OpenID" if is_openid else "OAuth2"
|
||||
# ANSI color codes for red text
|
||||
RED = '\033[91m'
|
||||
RESET = '\033[0m'
|
||||
print(f"{RED}{tag} {flow_type} - {msg}\n → {context_url}{RESET}\n")
|
||||
|
|
|
|||
23
lib/report.py
Normal file
23
lib/report.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
# save as data/report.csv
|
||||
import csv
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# target, status, title, description, uri
|
||||
|
||||
# file path는 'data/report.csv'로 고정
|
||||
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
|
||||
"""
|
||||
Save the report data to a CSV file.
|
||||
|
||||
:param report_data: List of dictionaries containing report data.
|
||||
:param file_path: Path to the CSV file where the report will be saved.
|
||||
"""
|
||||
fieldnames = ['target', 'status', 'title', 'description', 'uri']
|
||||
|
||||
with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile:
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in report_data:
|
||||
# Replace actual newlines with literal \n strings
|
||||
escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()}
|
||||
writer.writerow(escaped_row)
|
||||
32
lib/target.py
Normal file
32
lib/target.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# save ./data/target.temp file as domain string
|
||||
import os
|
||||
|
||||
def save(target: str, file_path: str = "./data/target.dump") -> None:
|
||||
"""
|
||||
Save the target domain to a temporary file.
|
||||
|
||||
:param target: Target domain to be saved.
|
||||
"""
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write(target)
|
||||
|
||||
print(f"Target saved to {file_path}") # Debug message
|
||||
|
||||
def load(file_path: str = "./data/target.dump") -> str:
|
||||
"""
|
||||
Load the target domain from a temporary file.
|
||||
|
||||
:return: Target domain string.
|
||||
"""
|
||||
if not os.path.exists(file_path):
|
||||
print(f"[ERROR] Target file {file_path} does not exist.")
|
||||
return ""
|
||||
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
target = f.read().strip()
|
||||
|
||||
print(f"Loaded target from {file_path}: {target}") # Debug message
|
||||
return target
|
||||
22
main.py
22
main.py
|
|
@ -1,9 +1,31 @@
|
|||
import sys
|
||||
from mitmproxy.tools.main import mitmdump
|
||||
from flask import Flask, request
|
||||
import threading
|
||||
import lib.target as target
|
||||
|
||||
def main():
|
||||
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"]
|
||||
mitmdump()
|
||||
|
||||
# get target from browser use web api
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route('/start', methods=['GET', 'POST'])
|
||||
def start():
|
||||
target_url = request.args.get('url')
|
||||
if target_url:
|
||||
target.save(target_url)
|
||||
print(f"Target URL set to: {target_url}")
|
||||
return f"Target URL set to: {target_url}"
|
||||
return "No URL provided"
|
||||
|
||||
def run_web_server():
|
||||
app.run(host='localhost', port=11081, debug=False)
|
||||
|
||||
# Start web server in a separate thread
|
||||
web_thread = threading.Thread(target=run_web_server, daemon=True)
|
||||
web_thread.start()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue