From 0d81fdd49fcff832d20805ea5ac66131fcbef1a1 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Thu, 26 Jun 2025 19:07:35 +0900 Subject: [PATCH] =?UTF-8?q?[Refactor=20and=20Enhance]=20addon=20init.py?= =?UTF-8?q?=EC=9D=98=20=EB=B9=84=EB=8F=99=EA=B8=B0=20=EC=9E=91=EC=97=85?= =?UTF-8?q?=EC=9D=84=20=EB=8D=94=EC=9A=B1=20=ED=9A=A8=EC=9C=A8=EC=A0=81?= =?UTF-8?q?=EC=9C=BC=EB=A1=9C=20=EC=88=98=ED=96=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 43 +++++------- addon/access_token.py | 3 +- addon/csrf_check.py | 2 +- addon/init.py | 137 ++++++++++----------------------------- addon/pkce_check.py | 3 - addon/scope_detection.py | 24 ++----- lib/utils/try_catch.py | 5 ++ 7 files changed, 60 insertions(+), 157 deletions(-) create mode 100644 lib/utils/try_catch.py diff --git a/README.md b/README.md index f922164..3e4b4d5 100644 --- a/README.md +++ b/README.md @@ -51,47 +51,36 @@ http://localhost:11081로 백엔드 서버가 열리게 됩니다. `./addon/init.py` ```py -from example_check import Example - -class LoggerAddon: - def __init__(self): - self.checker = Example() - - def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 - # 오탐 검사하고 있을때는 검증하지 않음 +... + async def request(self, flow: http.HTTPFlow): if false_true_varifing_task.is_verifing_false_true(): return - self.checker.test(flow) - - def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것 - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - self.checker.test(flow) + tasks = [ + try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None, + try_catch(PKCEDowngradeChecker().test(flow)), + try_catch(Example().test(flow)) + ] + await asyncio.gather(*tasks) + ... ``` `./addon/example.py` ```py -import lib.target as target -from lib.report import save_report +from lib.report_vuln import report_vuln class Example: async def test(self, flow): - req = flow.request - method = req.method url = req.pretty_url # data/report.csv에 저장 - report_data = [{ - 'target': target.load(), - 'status': "CRITICAL", - 'title': "PKCE Downgrade Vulnerability", - 'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.", - 'uri': f"Original: {url}\nDowngraded: {downgraded_url}" - }] - save_report(report_data) + report_vuln( + title="PKCE Plain Method", + desc="PKCE method is set to 'plain'. Possible downgrade.", + status="CRITICAL", + uri=url, + ) ``` 이러한 예제를 참고하여 작성하여주세요. diff --git a/addon/access_token.py b/addon/access_token.py index 9c18bb6..4f7c3ad 100644 --- a/addon/access_token.py +++ b/addon/access_token.py @@ -1,6 +1,5 @@ import re -from dataclasses import dataclass, asdict -from typing import List, Dict, Optional, Any +from typing import Optional, Any import asyncio from mitmproxy.http import HTTPFlow diff --git a/addon/csrf_check.py b/addon/csrf_check.py index 91b7e4e..867191f 100644 --- a/addon/csrf_check.py +++ b/addon/csrf_check.py @@ -1,5 +1,5 @@ # csrf_check.py -from mitmproxy import http, ctx +from mitmproxy import http from urllib.parse import urlparse, parse_qs, unquote import httpx from typing import Optional, Union, List diff --git a/addon/init.py b/addon/init.py index 634d344..d820f92 100644 --- a/addon/init.py +++ b/addon/init.py @@ -9,6 +9,7 @@ from access_token import AccessTokenScanner from addon.google_login_hint import GoogleLoginHint import os from dotenv import load_dotenv +from lib.utils.try_catch import try_catch from lib.false_true_varifing_task import FalseTrueVarifingTask # Initialize the singleton task manager @@ -16,111 +17,39 @@ false_true_varifing_task = FalseTrueVarifingTask() load_dotenv(override=True) -class PKCEAddon: - def __init__(self): - self.checker = PKCEDowngradeChecker() - - async def request(self, flow: http.HTTPFlow): - print( - f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}" - ) - try: - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - - await self.checker.test(flow) - except Exception as e: - print(f"[ERROR] Addon failed: {e}") - pass - - -class CsrfAddon: - def __init__(self): - self.checker = CsrfChecker() - - async def response(self, flow: http.HTTPFlow): - try: - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - await self.checker.response(flow) - except Exception as e: - print(f"[ERROR] CSRF Addon failed: {e}") - pass - - -class ScopeAddon: - def __init__(self): - self.checker = ScopeDetection() - - async def response(self, flow: http.HTTPFlow): - try: - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - await self.checker.test(flow) - except Exception as e: - print(f"[ERROR] ScopeDetection failed: {e}") - - - -class NonceAddon: - def __init__(self): - self.checker = NonceChecker() - - async def response(self, flow: http.HTTPFlow): - try: - pass - # TODO id_token을 파싱하는 부분이 누락되어있습니다. - # await self.checker.check_nonce_in_id_token(flow) - except Exception as e: - print(f"[ERROR] NonceAddon failed: {e}") - pass - - - -class AccessTokenAddon: - def __init__(self): - self.checker = AccessTokenScanner() - - async def response(self, flow: http.HTTPFlow): - try: - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - await self.checker.scan(flow) - except Exception as e: - print(f"[ERROR] AccessToken Addon failed: {e}") - pass - -class RedirectBypassAddon: - def __init__(self): - self.checker = RedirectBypassChecker() - - # request 대신 response 로 바꿔 보세요: - async def response(self, flow: http.HTTPFlow): - try: - # 오탐 검사하고 있을때는 검증하지 않음 - if false_true_varifing_task.is_verifing_false_true(): - return - await self.checker.test(flow) - except Exception as e: - print(f"[ERROR] RedirectBypass Addon failed: {e}") - -class GoogleLoginHintAddon(): +class AddonBase: + """ + Base class for addons. + Each addon should implement its own request or response method. + """ + def __init__(self) -> None: if os.getenv('GOOGLE_ID'): - self.checker = GoogleLoginHint() + self.google_login_hint = GoogleLoginHint() else: - self.checker = None - - async def request(self, flow: http.HTTPFlow): - if self.checker is None: - return - try: - await self.checker.request(flow) - except Exception as e: - print(f"[ERROR] GoogleLoginHint Addon failed: {e}") + self.google_login_hint = None -addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), GoogleLoginHintAddon(), RedirectBypassAddon()] + async def request(self, flow: http.HTTPFlow): + if false_true_varifing_task.is_verifing_false_true(): + return + + tasks = [ + try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None, + try_catch(PKCEDowngradeChecker().test(flow)), + ] + await asyncio.gather(*tasks) + + async def response(self, flow: http.HTTPFlow): + if false_true_varifing_task.is_verifing_false_true(): + return + + tasks = [ + try_catch(CsrfChecker().response(flow)), + try_catch(ScopeDetection().test(flow)), + # try_catch(NonceChecker().check_nonce_in_request(flow)), + try_catch(AccessTokenScanner().scan(flow)), + try_catch(RedirectBypassChecker().test(flow)), + ] + await asyncio.gather(*tasks) + +addons = [AddonBase()] diff --git a/addon/pkce_check.py b/addon/pkce_check.py index 3466edc..8f922b9 100644 --- a/addon/pkce_check.py +++ b/addon/pkce_check.py @@ -1,9 +1,6 @@ from urllib.parse import urlparse, parse_qs, urlencode, urlunparse -import asyncio import httpx from typing import Dict, List - -import lib.cur_target_url as cur_target_url from lib.report_vuln import report_vuln diff --git a/addon/scope_detection.py b/addon/scope_detection.py index a5a2e2d..3ef088b 100644 --- a/addon/scope_detection.py +++ b/addon/scope_detection.py @@ -12,18 +12,16 @@ class ScopeDetection: return scope_values[0] return None - async def check_scope(self, flow): + async def test(self, flow): + if not is_oauth_uri(flow.request.pretty_url): + return + req = flow.request - res = flow.response parsed = urlparse(req.pretty_url) query = parsed.query - location = res.headers.get("Location", "") - location_query = urlparse(location).query - query_scope = self.get_scope_from_query(query) - location_scope = self.get_scope_from_query(location_query) if query_scope in ["all", "*"]: report_vuln( @@ -32,17 +30,3 @@ class ScopeDetection: status="WARNING", uri=req.pretty_url ) - if location_scope in ["all", "*"]: - report_vuln( - title="OAuth Scope Value Issue", - desc=f"Scope value issue detected in response location: {location_scope}", - status="WARNING", - uri=location - ) - - async def test(self, flow): - - if not is_oauth_uri(flow.request.pretty_url): - return - - await self.check_scope(flow) diff --git a/lib/utils/try_catch.py b/lib/utils/try_catch.py new file mode 100644 index 0000000..54bbc3b --- /dev/null +++ b/lib/utils/try_catch.py @@ -0,0 +1,5 @@ +async def try_catch(coro): + try: + return await coro + except Exception as e: + print(f"[ERROR] {coro} failed: {e}")