mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 07:31:51 +09:00
126 lines
4 KiB
Python
126 lines
4 KiB
Python
from mitmproxy import http
|
|
import asyncio
|
|
from pkce_check import PKCEDowngradeChecker
|
|
from addon.scope_detection import ScopeDetection
|
|
from csrf_check import CsrfChecker
|
|
from nonce_check import NonceChecker
|
|
from redirect_uri_check import RedirectBypassChecker
|
|
from access_token import AccessTokenScanner
|
|
from addon.google_login_hint import GoogleLoginHint
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from lib.false_true_varifing_task import FalseTrueVarifingTask
|
|
|
|
# Module-wide FalseTrueVarifingTask instance, created once at import time.
# The addons below ask it whether a false-positive verification is running
# and skip their own checks while it is.
false_true_varifing_task = FalseTrueVarifingTask()

# Load settings from a .env file; override=True lets values from the file
# replace variables that are already present in the process environment.
load_dotenv(override=True)
|
|
|
|
class PKCEAddon:
    """Addon that forwards every request flow to a PKCEDowngradeChecker."""

    def __init__(self):
        """Create the checker this addon delegates to."""
        self.checker = PKCEDowngradeChecker()

    async def request(self, flow: http.HTTPFlow):
        """Log the request, then run the checker unless verification is active.

        Args:
            flow: The HTTP flow whose request is being processed.

        Any exception raised by the checker (or by the verification query)
        is reported on stdout and swallowed so the addon never propagates it.
        """
        # The debug line is emitted for every request, even when the check
        # below ends up being skipped.
        print(
            f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
        )
        try:
            # Do not run the check while a false-positive verification
            # is in progress.
            if not false_true_varifing_task.is_verifing_false_true():
                await self.checker.test(flow)
        except Exception as e:
            print(f"[ERROR] Addon failed: {e}")
|
|
|
|
|
|
class CsrfAddon:
    """Addon that forwards every response flow to a CsrfChecker."""

    def __init__(self):
        """Create the checker this addon delegates to."""
        self.checker = CsrfChecker()

    async def response(self, flow: http.HTTPFlow):
        """Run the CSRF checker on the flow unless verification is active.

        Args:
            flow: The HTTP flow whose response is being processed.

        Exceptions are printed and swallowed rather than propagated.
        """
        try:
            # Do not run the check while a false-positive verification
            # is in progress.
            if not false_true_varifing_task.is_verifing_false_true():
                await self.checker.response(flow)
        except Exception as e:
            print(f"[ERROR] CSRF Addon failed: {e}")
|
|
|
|
|
|
class ScopeAddon:
    """Addon that forwards every response flow to a ScopeDetection checker."""

    def __init__(self):
        """Create the checker this addon delegates to."""
        self.checker = ScopeDetection()

    async def response(self, flow: http.HTTPFlow):
        """Run scope detection on the flow unless verification is active.

        Args:
            flow: The HTTP flow whose response is being processed.

        Exceptions are printed and swallowed rather than propagated.
        """
        try:
            # Do not run the check while a false-positive verification
            # is in progress.
            if not false_true_varifing_task.is_verifing_false_true():
                await self.checker.test(flow)
        except Exception as e:
            print(f"[ERROR] ScopeDetection failed: {e}")
|
|
|
|
|
|
|
|
class NonceAddon:
    """Addon that owns a NonceChecker; its response hook is currently a stub."""

    def __init__(self):
        """Create the checker this addon will delegate to once enabled."""
        self.checker = NonceChecker()

    async def response(self, flow: http.HTTPFlow):
        """Do nothing for now; the nonce check is disabled pending a TODO.

        Args:
            flow: The HTTP flow whose response is being processed (unused
                until the check below is enabled).
        """
        try:
            # TODO: the part that parses the id_token is missing, so the
            # call below stays disabled until that exists.
            # await self.checker.check_nonce_in_id_token(flow)
            pass
        except Exception as e:
            print(f"[ERROR] NonceAddon failed: {e}")
|
|
|
|
|
|
|
|
class AccessTokenAddon:
    """Addon that forwards every response flow to an AccessTokenScanner."""

    def __init__(self):
        """Create the scanner this addon delegates to."""
        self.checker = AccessTokenScanner()

    async def response(self, flow: http.HTTPFlow):
        """Scan the flow with the scanner unless verification is active.

        Args:
            flow: The HTTP flow whose response is being processed.

        Exceptions are printed and swallowed rather than propagated.
        """
        try:
            # Do not run the scan while a false-positive verification
            # is in progress.
            if not false_true_varifing_task.is_verifing_false_true():
                await self.checker.scan(flow)
        except Exception as e:
            print(f"[ERROR] AccessToken Addon failed: {e}")
|
|
|
|
class RedirectBypassAddon:
    """Addon that forwards every response flow to a RedirectBypassChecker."""

    def __init__(self):
        """Create the checker this addon delegates to."""
        self.checker = RedirectBypassChecker()

    # Original author's note: "try changing this to response instead of
    # request" -- the hook below is the response variant.
    async def response(self, flow: http.HTTPFlow):
        """Run the redirect checker on the flow unless verification is active.

        Args:
            flow: The HTTP flow whose response is being processed.

        Exceptions are printed and swallowed rather than propagated.
        """
        try:
            # Do not run the check while a false-positive verification
            # is in progress.
            if not false_true_varifing_task.is_verifing_false_true():
                await self.checker.test(flow)
        except Exception as e:
            print(f"[ERROR] RedirectBypass Addon failed: {e}")
|
|
|
|
class GoogleLoginHintAddon:
    """Addon that forwards request flows to GoogleLoginHint when configured."""

    def __init__(self) -> None:
        """Create the GoogleLoginHint helper only if GOOGLE_ID is set.

        When the GOOGLE_ID environment variable is missing or empty,
        ``checker`` is None and the addon is inert.
        """
        self.checker = GoogleLoginHint() if os.getenv('GOOGLE_ID') else None

    async def request(self, flow: http.HTTPFlow):
        """Pass the flow to the helper's request handler, if one exists.

        Args:
            flow: The HTTP flow whose request is being processed.

        Exceptions raised by the helper are printed and swallowed.
        """
        if self.checker is not None:
            try:
                await self.checker.request(flow)
            except Exception as e:
                print(f"[ERROR] GoogleLoginHint Addon failed: {e}")
|
|
|
|
# Addon instances exposed at module level (presumably the list mitmproxy
# loads from this script). The order of the entries is preserved as written.
addons = [
    PKCEAddon(),
    ScopeAddon(),
    CsrfAddon(),
    NonceAddon(),
    AccessTokenAddon(),
    GoogleLoginHintAddon(),
    RedirectBypassAddon(),
]
|