oauth-backend/addon/init.py

93 lines
3.1 KiB
Python

from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker
from client_secret import ClientSecret
from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
from addon.google_response_type_token import GoogleResponseTypeToken
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Module-level task tracker shared by the request/response hooks below.
# The original note describes it as a singleton; FalseTrueVarifingTask is
# defined in lib.false_true_varifing_task -- confirm the singleton behavior there.
false_true_varifing_task = FalseTrueVarifingTask()
# Load variables from a .env file. override=True lets values from .env replace
# variables that are already set in the process environment.
# AddonBase.__init__ reads GOOGLE_ID through os.getenv after this has run.
load_dotenv(override=True)
class AddonBase:
    """mitmproxy addon that hands each intercepted flow to the checker classes.

    ``request`` and ``response`` are the hook coroutines; each one builds
    the checker coroutines for the flow, wraps them in ``try_catch`` and
    awaits them together. ``should_ignore`` filters out hosts and static
    assets that the response checkers should not look at.
    """

    # Hosts whose traffic is skipped by the response checks.
    # Entries starting with "." match any sub-domain of that suffix;
    # other entries match the host itself or any of its sub-domains.
    _IGNORED_DOMAINS = (
        ".googleapis.com",
        "android.clients.google.com",
        ".adtrafficquality.google",
        ".googlesyndication.com",
        "cdn.jsdelivr.net",
        "update.googleapis.com",
        ".google-analytics.com",
        ".gstatic.com",
    )

    # Path suffixes treated as static files (scripts, styles, fonts,
    # images, media, documents).
    _STATIC_EXTENSIONS = (
        '.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg',
        '.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp',
        '.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md',
        '.txt', '.csv',
    )

    def __init__(self) -> None:
        """Create the Google login-hint helper only when GOOGLE_ID is set."""
        # google_login_hint stays None when the environment variable is
        # missing or empty, which disables that step in request().
        if os.getenv('GOOGLE_ID'):
            self.google_login_hint = GoogleLoginHint()
        else:
            self.google_login_hint = None

    def should_ignore(self, flow: http.HTTPFlow) -> bool:
        """Return True when the flow targets an ignored host or a static file.

        Args:
            flow: The intercepted flow; only ``flow.request.pretty_host``
                and ``flow.request.path`` are read.

        Returns:
            True if the host matches ``_IGNORED_DOMAINS`` or the path
            (query string removed, case-insensitive) ends with one of
            ``_STATIC_EXTENSIONS``; False otherwise.
        """
        # Normalise case and a trailing root dot so that the comparison is
        # made on the bare host name.
        host = flow.request.pretty_host.lower().rstrip('.')
        for domain in self._IGNORED_DOMAINS:
            if domain.startswith('.'):
                matched = host.endswith(domain)
            else:
                matched = host == domain or host.endswith('.' + domain)
            # Suffix matching (instead of a substring test) keeps hosts such
            # as "cdn.jsdelivr.net.evil.example" from being skipped.
            if matched:
                return True

        # Drop the query string before looking at the file extension.
        path = flow.request.path.partition('?')[0].lower()
        return path.endswith(self._STATIC_EXTENSIONS)

    async def request(self, flow: http.HTTPFlow) -> None:
        """Request hook: apply the login hint, then run the request checks.

        The login-hint step runs first (when configured). The remaining
        checks are skipped while ``false_true_varifing_task`` reports that a
        verification is in progress.
        """
        if self.google_login_hint:
            await try_catch(self.google_login_hint.request(flow))

        if false_true_varifing_task.is_verifing_false_true():
            return

        # NOTE(review): try_catch is assumed to absorb exceptions raised by
        # the wrapped coroutine -- confirm in lib.utils.try_catch.
        tasks = [
            try_catch(PKCEDowngradeChecker().test(flow)),
        ]
        await asyncio.gather(*tasks)

    async def response(self, flow: http.HTTPFlow) -> None:
        """Response hook: run every response checker on the flow.

        Nothing runs while a verification is in progress or when
        ``should_ignore`` rejects the flow.
        """
        if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow):
            return

        # A fresh checker instance is created per flow; all checks are
        # awaited together.
        tasks = [
            try_catch(CsrfChecker().response(flow)),
            try_catch(ScopeDetection().test(flow)),
            try_catch(ClientSecret().test(flow)),
            try_catch(AccessTokenScanner().scan(flow)),
            try_catch(RedirectBypassChecker().test(flow)),
            try_catch(GoogleResponseTypeToken().test(flow)),
        ]
        await asyncio.gather(*tasks)
# mitmproxy loads the addon instances listed in a script's module-level
# `addons` variable, so this registers a single AddonBase instance.
addons = [AddonBase()]