from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker
from nonce_check import NonceChecker
from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
# Shared by AddonBase.request / AddonBase.response: while it reports a
# false/true verification in progress, neither phase starts its checkers.
false_true_varifing_task = FalseTrueVarifingTask()

# Load settings from .env.  ``override=True`` makes .env values replace any
# variable already set in the process environment (GOOGLE_ID is read below).
load_dotenv(override=True)
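class AddonBase:
    """
    Base class for addons.

    Each addon should implement its own request or response method.

    This mitmproxy addon fans every intercepted request/response out to the
    OAuth checkers (PKCE downgrade, CSRF, scope detection, access-token
    scanning, redirect-URI bypass).  Every checker call is wrapped in
    ``try_catch`` so a failure in one does not abort the others inside
    ``asyncio.gather``.
    """

    # Hosts whose traffic is never inspected.  Each entry matches the host
    # itself and any of its subdomains (a leading '.' is optional).  Kept as a
    # class constant so the tuple is not rebuilt on every flow.
    IGNORE_DOMAINS: tuple[str, ...] = (
        ".googleapis.com",
        "android.clients.google.com",
        ".adtrafficquality.google",
        ".googlesyndication.com",
        "cdn.jsdelivr.net",
        "update.googleapis.com",
        ".google-analytics.com",
        ".gstatic.com",
    )

    # Path extensions treated as static assets (JS, CSS, fonts, images, ...).
    STATIC_EXTENSIONS: tuple[str, ...] = (
        '.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
        '.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
        '.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
        '.txt', '.csv',
    )

    def __init__(self) -> None:
        # The Google login-hint helper is only wired up when a client id is
        # configured; ``request`` checks for ``None`` before using it.
        if os.getenv('GOOGLE_ID'):
            self.google_login_hint = GoogleLoginHint()
        else:
            self.google_login_hint = None

    @classmethod
    def _host_is_ignored(cls, host: str) -> bool:
        """Return True if *host* is, or is a subdomain of, an ignored domain.

        Matching is done on the domain suffix (with a '.' boundary), not by
        substring containment, so a host that merely *contains* one of the
        ignored names somewhere in the middle (e.g. ``cdn.jsdelivr.net.example``)
        is still handed to the checkers.
        """
        normalized = host.lower().rstrip('.')
        for entry in cls.IGNORE_DOMAINS:
            suffix = entry if entry.startswith('.') else '.' + entry
            if normalized == suffix[1:] or normalized.endswith(suffix):
                return True
        return False

    def should_ignore(self, flow: http.HTTPFlow) -> bool:
        """Check if the request should be ignored.

        Returns True when the request host is on ``IGNORE_DOMAINS`` or when
        the request path (query string removed) ends in one of
        ``STATIC_EXTENSIONS``.
        """
        if self._host_is_ignored(flow.request.pretty_host):
            return True

        # Drop the query string before looking at the extension so that
        # ``/app.js?v=3`` is still recognised as a static asset.
        path = flow.request.path.split('?', 1)[0].lower()
        return path.endswith(self.STATIC_EXTENSIONS)

    async def request(self, flow: http.HTTPFlow) -> None:
        """Run the request-phase checkers on *flow*.

        The Google login-hint helper runs first when it is configured.  While
        ``false_true_varifing_task`` reports a verification in progress no
        checker is started (presumably so the task's own traffic is not
        re-analysed -- confirm in lib.false_true_varifing_task).
        """
        if self.google_login_hint:
            await try_catch(self.google_login_hint.request(flow))

        if false_true_varifing_task.is_verifing_false_true():
            return

        # NOTE(review): unlike ``response``, this phase does not consult
        # ``should_ignore``, so the PKCE check also sees static/ignored hosts.
        # Confirm whether that is intended before adding the filter here.
        await asyncio.gather(
            try_catch(PKCEDowngradeChecker().test(flow)),
        )

    async def response(self, flow: http.HTTPFlow) -> None:
        """Run the response-phase checkers concurrently on *flow*.

        Skipped while ``false_true_varifing_task`` reports a verification in
        progress and for flows filtered out by ``should_ignore``.  The
        ``NonceChecker`` entry is currently commented out of the fan-out;
        re-enable it there if nonce checking is wanted.
        """
        if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow):
            return

        await asyncio.gather(
            try_catch(CsrfChecker().response(flow)),
            try_catch(ScopeDetection().test(flow)),
            # try_catch(NonceChecker().check_nonce_in_request(flow)),
            try_catch(AccessTokenScanner().scan(flow)),
            try_catch(RedirectBypassChecker().test(flow)),
        )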
# Entry point for mitmproxy's script loader, which picks up the module-level
# ``addons`` list and dispatches request/response hooks to each instance.
addons = [AddonBase()]