oauth-backend/addon/init.py

55 lines
1.8 KiB
Python

from mitmproxy import http
import asyncio
from pkce_check import PKCEDowngradeChecker
from addon.scope_detection import ScopeDetection
from csrf_check import CsrfChecker
from nonce_check import NonceChecker
from redirect_uri_check import RedirectBypassChecker
from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint
import os
from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager
false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True)
class AddonBase:
"""
Base class for addons.
Each addon should implement its own request or response method.
"""
def __init__(self) -> None:
if os.getenv('GOOGLE_ID'):
self.google_login_hint = GoogleLoginHint()
else:
self.google_login_hint = None
async def request(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
]
await asyncio.gather(*tasks)
async def response(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(CsrfChecker().response(flow)),
try_catch(ScopeDetection().test(flow)),
# try_catch(NonceChecker().check_nonce_in_request(flow)),
try_catch(AccessTokenScanner().scan(flow)),
try_catch(RedirectBypassChecker().test(flow)),
]
await asyncio.gather(*tasks)
addons = [AddonBase()]