[Refactor and Enhance] addon init.py의 비동기 작업을 더욱 효율적으로 수행

This commit is contained in:
tv0924@icloud.com 2025-06-26 19:07:35 +09:00
commit 0d81fdd49f
7 changed files with 58 additions and 155 deletions

View file

@ -51,47 +51,36 @@ http://localhost:11081로 백엔드 서버가 열리게 됩니다.
`./addon/init.py` `./addon/init.py`
```py ```py
from example_check import Example ...
async def request(self, flow: http.HTTPFlow):
class LoggerAddon:
def __init__(self):
self.checker = Example()
def request(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true(): if false_true_varifing_task.is_verifing_false_true():
return return
self.checker.test(flow)
def response(self, flow: http.HTTPFlow): # 비동기가 필요할 경우 async def로 할 것
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
self.checker.test(flow)
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
try_catch(Example().test(flow))
]
await asyncio.gather(*tasks)
...
``` ```
`./addon/example.py` `./addon/example.py`
```py ```py
import lib.target as target from lib.report_vuln import report_vuln
from lib.report import save_report
class Example: class Example:
async def test(self, flow): async def test(self, flow):
req = flow.request
method = req.method
url = req.pretty_url url = req.pretty_url
# data/report.csv에 저장 # data/report.csv에 저장
report_data = [{ report_vuln(
'target': target.load(), title="PKCE Plain Method",
'status': "CRITICAL", desc="PKCE method is set to 'plain'. Possible downgrade.",
'title': "PKCE Downgrade Vulnerability", status="CRITICAL",
'description': "PKCE downgrade vulnerability detected! Both URLs returned authorization code.", uri=url,
'uri': f"Original: {url}\nDowngraded: {downgraded_url}" )
}]
save_report(report_data)
``` ```
이러한 예제를 참고하여 작성하여주세요. 이러한 예제를 참고하여 작성하여주세요.

View file

@ -1,6 +1,5 @@
import re import re
from dataclasses import dataclass, asdict from typing import Optional, Any
from typing import List, Dict, Optional, Any
import asyncio import asyncio
from mitmproxy.http import HTTPFlow from mitmproxy.http import HTTPFlow

View file

@ -1,5 +1,5 @@
# csrf_check.py # csrf_check.py
from mitmproxy import http, ctx from mitmproxy import http
from urllib.parse import urlparse, parse_qs, unquote from urllib.parse import urlparse, parse_qs, unquote
import httpx import httpx
from typing import Optional, Union, List from typing import Optional, Union, List

View file

@ -9,6 +9,7 @@ from access_token import AccessTokenScanner
from addon.google_login_hint import GoogleLoginHint from addon.google_login_hint import GoogleLoginHint
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from lib.utils.try_catch import try_catch
from lib.false_true_varifing_task import FalseTrueVarifingTask from lib.false_true_varifing_task import FalseTrueVarifingTask
# Initialize the singleton task manager # Initialize the singleton task manager
@ -16,111 +17,39 @@ false_true_varifing_task = FalseTrueVarifingTask()
load_dotenv(override=True) load_dotenv(override=True)
class PKCEAddon: class AddonBase:
def __init__(self): """
self.checker = PKCEDowngradeChecker() Base class for addons.
Each addon should implement its own request or response method.
async def request(self, flow: http.HTTPFlow): """
print(
f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}"
)
try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] Addon failed: {e}")
pass
class CsrfAddon:
def __init__(self):
self.checker = CsrfChecker()
async def response(self, flow: http.HTTPFlow):
try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.response(flow)
except Exception as e:
print(f"[ERROR] CSRF Addon failed: {e}")
pass
class ScopeAddon:
def __init__(self):
self.checker = ScopeDetection()
async def response(self, flow: http.HTTPFlow):
try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] ScopeDetection failed: {e}")
class NonceAddon:
def __init__(self):
self.checker = NonceChecker()
async def response(self, flow: http.HTTPFlow):
try:
pass
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
# await self.checker.check_nonce_in_id_token(flow)
except Exception as e:
print(f"[ERROR] NonceAddon failed: {e}")
pass
class AccessTokenAddon:
def __init__(self):
self.checker = AccessTokenScanner()
async def response(self, flow: http.HTTPFlow):
try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.scan(flow)
except Exception as e:
print(f"[ERROR] AccessToken Addon failed: {e}")
pass
class RedirectBypassAddon:
def __init__(self):
self.checker = RedirectBypassChecker()
# request 대신 response 로 바꿔 보세요:
async def response(self, flow: http.HTTPFlow):
try:
# 오탐 검사하고 있을때는 검증하지 않음
if false_true_varifing_task.is_verifing_false_true():
return
await self.checker.test(flow)
except Exception as e:
print(f"[ERROR] RedirectBypass Addon failed: {e}")
class GoogleLoginHintAddon():
def __init__(self) -> None: def __init__(self) -> None:
if os.getenv('GOOGLE_ID'): if os.getenv('GOOGLE_ID'):
self.checker = GoogleLoginHint() self.google_login_hint = GoogleLoginHint()
else: else:
self.checker = None self.google_login_hint = None
async def request(self, flow: http.HTTPFlow):
if self.checker is None:
return
try:
await self.checker.request(flow)
except Exception as e:
print(f"[ERROR] GoogleLoginHint Addon failed: {e}")
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon(), AccessTokenAddon(), GoogleLoginHintAddon(), RedirectBypassAddon()] async def request(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(self.google_login_hint.request(flow)) if self.google_login_hint else None,
try_catch(PKCEDowngradeChecker().test(flow)),
]
await asyncio.gather(*tasks)
async def response(self, flow: http.HTTPFlow):
if false_true_varifing_task.is_verifing_false_true():
return
tasks = [
try_catch(CsrfChecker().response(flow)),
try_catch(ScopeDetection().test(flow)),
# try_catch(NonceChecker().check_nonce_in_request(flow)),
try_catch(AccessTokenScanner().scan(flow)),
try_catch(RedirectBypassChecker().test(flow)),
]
await asyncio.gather(*tasks)
addons = [AddonBase()]

View file

@ -1,9 +1,6 @@
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import asyncio
import httpx import httpx
from typing import Dict, List from typing import Dict, List
import lib.cur_target_url as cur_target_url
from lib.report_vuln import report_vuln from lib.report_vuln import report_vuln

View file

@ -12,18 +12,16 @@ class ScopeDetection:
return scope_values[0] return scope_values[0]
return None return None
async def check_scope(self, flow): async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
req = flow.request req = flow.request
res = flow.response
parsed = urlparse(req.pretty_url) parsed = urlparse(req.pretty_url)
query = parsed.query query = parsed.query
location = res.headers.get("Location", "")
location_query = urlparse(location).query
query_scope = self.get_scope_from_query(query) query_scope = self.get_scope_from_query(query)
location_scope = self.get_scope_from_query(location_query)
if query_scope in ["all", "*"]: if query_scope in ["all", "*"]:
report_vuln( report_vuln(
@ -32,17 +30,3 @@ class ScopeDetection:
status="WARNING", status="WARNING",
uri=req.pretty_url uri=req.pretty_url
) )
if location_scope in ["all", "*"]:
report_vuln(
title="OAuth Scope Value Issue",
desc=f"Scope value issue detected in response location: {location_scope}",
status="WARNING",
uri=location
)
async def test(self, flow):
if not is_oauth_uri(flow.request.pretty_url):
return
await self.check_scope(flow)

5
lib/utils/try_catch.py Normal file
View file

@ -0,0 +1,5 @@
async def try_catch(coro):
try:
return await coro
except Exception as e:
print(f"[ERROR] {coro} failed: {e}")