from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker from addon.scope_detection import ScopeDetection from csrf_check import CsrfChecker from nonce_check import NonceChecker from redirect_uri_check import RedirectBypassChecker from access_token import AccessTokenScanner from addon.google_login_hint import GoogleLoginHint import os from dotenv import load_dotenv from lib.utils.try_catch import try_catch from lib.false_true_varifing_task import FalseTrueVarifingTask # Initialize the singleton task manager false_true_varifing_task = FalseTrueVarifingTask() load_dotenv(override=True) class AddonBase: """ Base class for addons. Each addon should implement its own request or response method. """ def __init__(self) -> None: if os.getenv('GOOGLE_ID'): self.google_login_hint = GoogleLoginHint() else: self.google_login_hint = None def should_ignore(self, flow: http.HTTPFlow) -> bool: """Check if the request should be ignored.""" ignore_domains = [ ".googleapis.com", "android.clients.google.com", # Added missing comma here ".adtrafficquality.google", ".googlesyndication.com", "cdn.jsdelivr.net", "update.googleapis.com", ".google-analytics.com", ".gstatic.com" ] # Ignore .googleapis.com domains for domain in ignore_domains: if domain in flow.request.pretty_host: return True # Ignore static files (JS, CSS, fonts, images, etc.) # Split on '?' to remove query parameters before checking extension path = flow.request.path.split('?')[0].lower() static_extensions = [ '.js', '.css', '.woff2', '.woff', '.ttf', '.otf', '.svg', '.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico', '.bmp', '.tiff', '.tif', '.webm', '.mp4', '.avi', '.mov', '.pdf', '.md', '.txt', '.csv' ] if any(path.endswith(ext) for ext in static_extensions): return True return False async def request(self, flow: http.HTTPFlow): if self.google_login_hint: await try_catch(self.google_login_hint.request(flow)) if false_true_varifing_task.is_verifing_false_true(): return tasks = [ try_catch(PKCEDowngradeChecker().test(flow)), ] await asyncio.gather(*tasks) async def response(self, flow: http.HTTPFlow): if false_true_varifing_task.is_verifing_false_true() or self.should_ignore(flow): return tasks = [ try_catch(CsrfChecker().response(flow)), try_catch(ScopeDetection().test(flow)), # try_catch(NonceChecker().check_nonce_in_request(flow)), try_catch(AccessTokenScanner().scan(flow)), try_catch(RedirectBypassChecker().test(flow)), ] await asyncio.gather(*tasks) addons = [AddonBase()]