from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker
from nonce_check import NonceChecker
from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask

# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()

load_dotenv(override=True)


class AddonBase:
    """
    Base class for addons.
    Each addon should implement its own request or response method.

    This class fans each intercepted flow out to the individual checkers:
    ``request`` runs the optional Google login-hint addon and the PKCE
    downgrade check; ``response`` runs the CSRF, scope, access-token and
    redirect-URI checks. Both hooks do nothing while
    ``false_true_varifing_task.is_verifing_false_true()`` returns a truthy
    value.
    """

    def __init__(self) -> None:
        """Create the Google login-hint addon only when ``GOOGLE_ID`` is set.

        ``self.google_login_hint`` is a ``GoogleLoginHint`` instance when the
        ``GOOGLE_ID`` environment variable is non-empty, otherwise ``None``.
        """
        if os.getenv('GOOGLE_ID'):
            self.google_login_hint = GoogleLoginHint()
        else:
            self.google_login_hint = None

    async def request(self, flow: http.HTTPFlow) -> None:
        """Run the request-phase checks for *flow* concurrently.

        Args:
            flow: The flow passed to this hook by mitmproxy.
        """
        if false_true_varifing_task.is_verifing_false_true():
            return

        # Only real awaitables may be handed to asyncio.gather(): passing
        # ``None`` for a disabled addon makes gather() raise TypeError, which
        # previously aborted every request-phase check when GOOGLE_ID was
        # not configured.
        tasks = []
        if self.google_login_hint:
            tasks.append(try_catch(self.google_login_hint.request(flow)))
        # NOTE(review): try_catch presumably contains exceptions raised by
        # the wrapped coroutine -- confirm in lib.utils.try_catch.
        tasks.append(try_catch(PKCEDowngradeChecker().test(flow)))

        await asyncio.gather(*tasks)

    async def response(self, flow: http.HTTPFlow) -> None:
        """Run the response-phase checks for *flow* concurrently.

        Args:
            flow: The flow passed to this hook by mitmproxy.
        """
        if false_true_varifing_task.is_verifing_false_true():
            return

        tasks = [
            try_catch(CsrfChecker().response(flow)),
            try_catch(ScopeDetection().test(flow)),
            # The nonce check is currently disabled:
            # try_catch(NonceChecker().check_nonce_in_request(flow)),
            try_catch(AccessTokenScanner().scan(flow)),
            try_catch(RedirectBypassChecker().test(flow)),
        ]

        await asyncio.gather(*tasks)


# Module-level addon list; a single AddonBase instance is registered.
addons = [AddonBase()]