From a5186a7e44f5370bd9b7ec0e24f8ea27ddae1b4c Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Sun, 8 Jun 2025 01:13:07 +0900 Subject: [PATCH] =?UTF-8?q?[Add]=20csrf=20=ED=8F=AC=ED=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/csrf_check.py | 168 ++++++++++++++++++++++++++++++++++++++++++++ addon/init.py | 19 ++++- lib/report.py | 35 +++++---- main.py | 7 +- 4 files changed, 210 insertions(+), 19 deletions(-) create mode 100644 addon/csrf_check.py diff --git a/addon/csrf_check.py b/addon/csrf_check.py new file mode 100644 index 0000000..4e07407 --- /dev/null +++ b/addon/csrf_check.py @@ -0,0 +1,168 @@ +# csrf_check.py +from mitmproxy import http, ctx +from urllib.parse import urlparse, parse_qs, unquote +import httpx +from typing import Optional, Union, List + +import lib.target as target +from lib.report import save_report + +class CsrfChecker: + nonce_params = { + "state", "nonce", "as", "frame_id", "csrf_token", "csrf" + } + + def is_oauth_uri(self, uri: str) -> bool: + qs = parse_qs(urlparse(uri).query) + qs_keys = [*qs] + + if "client_id" in qs_keys and any(p in qs_keys for p in ( + "redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")): + return True + return False + + def get_header(self, headers: http.Headers, name: str) -> Optional[str]: + # mitmproxy Headers는 case-insensitive + raw = headers.get(name) + if raw is None: + return None + # percent-encoding 디코딩 (예: '%20' → ' ') + return unquote(raw) + + def get_query_param(self, uri: str, param: str) -> Optional[str]: + return parse_qs(urlparse(uri).query).get(param, [None])[0] + + def set_query_param(self, qs: dict, param: str, value: str) -> dict: + new_qs = dict(qs) + new_qs[param] = [value] + return new_qs + + def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: + code = flow.response.status_code + loc = self.get_header(flow.response.headers, "location") or "" + return 300 <= code < 400 and self.is_oauth_uri(loc) + + def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: + qs = parse_qs(urlparse(flow.request.url).query) + + for p in self.nonce_params: + val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값 + if val: + return True + return False + + def find_nonce_param(self, uri: str) -> Optional[str]: + qs_keys = parse_qs(urlparse(uri).query).keys() + for p in self.nonce_params: + if p in qs_keys: + return p + return None + + async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response: + # HTTPX로 비동기 재요청: 쿠키 제외 + headers = { + k: v for k, v in flow.request.headers.items() + if k.lower() != "cookie" + } + async with httpx.AsyncClient(follow_redirects=False) as cli: + return await cli.request( + method=flow.request.method, + url=flow.request.url, + headers=headers, + content=flow.request.get_content(), + ) + + def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: + # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 + if not (self.is_oauth_uri(flow.request.url) + and self.check_nonce_in_request(flow) + and self.is_oauth_redirect(flow)): + return 0 + + param = self.find_nonce_param(flow.request.url) + orig_nonce = self.get_query_param(flow.request.url, param) if param else None + loc = self.get_header(flow.response.headers, "location") or "" + resp_nonce = self.get_query_param(loc, param) if param else None + + if resp_nonce is None: + return ["Missing nonce in redirect"] + if orig_nonce != resp_nonce: + return ["Nonce mismatch request↔response"] + + return 0 + + async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: + # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 + if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): + return 0 + + + loc0 = self.get_header(flow.response.headers, "location") or "" + param = self.find_nonce_param(loc0) or "state" + qs0 = parse_qs(urlparse(loc0).query) + orig_nonce = qs0.get(param, [None])[0] + + # (1) 쿠키 없는 재요청 → 새 nonce + resp_no_cookie = await self.fetch_no_cookie(flow) + if resp_no_cookie.status_code >= 400: + return 0 + loc1 = resp_no_cookie.headers.get("location", "") + new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0] + if new_nonce is None: + return 0 + if new_nonce == orig_nonce: + return ["Nonce reused without cookies"] + + # (2) 두 번의 리다이렉트 비교 + async with httpx.AsyncClient(follow_redirects=False) as cli: + # 원본 쿼리 + req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) + # nonce 교체 쿼리 + qs0[param] = [new_nonce] + req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers) + + if ( + req1.status_code == req2.status_code + and 200 <= req1.status_code < 400 + and urlparse(req1.headers.get("location", "")).path + == urlparse(req2.headers.get("location", "")).path + ): + return ["Identical redirects on nonce swap → potential CSRF"] + + return 0 + + async def response(self, flow: http.HTTPFlow) -> None: + try: + msgs: List[str] = [] + + # 1) 요청에 nonce 없으면 + if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): + msgs.append("Missing state/nonce in request") + + # 2) 리다이렉트에서 nonce 검사 + r1 = self.check_redirect_nonce(flow) + if r1: + msgs.extend(r1 if isinstance(r1, list) else []) + + # 3) nonce 재사용 검사 + r2 = await self.check_nonce_reuse(flow) + if r2: + msgs.extend(r2 if isinstance(r2, list) else []) + + if msgs: + desc = " | ".join(msgs) + status = "MEDIUM" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "CSRF Risk", + 'description': desc, + 'uri': flow.request.url, + }] + save_report(report_data) + print(f"[INFO] CSRF Check: {desc}") + else: + pass + except Exception as e: + print(f"[ERROR] CSRF Check failed: {e}") + return diff --git a/addon/init.py b/addon/init.py index 25bebeb..2a5410a 100644 --- a/addon/init.py +++ b/addon/init.py @@ -1,16 +1,31 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker +from csrf_check import CsrfChecker class PKCEAddon: def __init__(self): self.checker = PKCEDowngradeChecker() async def request(self, flow: http.HTTPFlow): - print(f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}") + # print(f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}") try: await self.checker.test(flow) except Exception as e: print(f"[ERROR] Addon failed: {e}") + pass + + +class CsrfAddon: + def __init__(self): + self.checker = CsrfChecker() -addons = [PKCEAddon()] + async def response(self, flow: http.HTTPFlow): + # print(f"[DEBUG] Processing request for CSRF check: {flow.request.method} {flow.request.pretty_url}") + try: + await self.checker.response(flow) + except Exception as e: + print(f"[ERROR] CSRF Addon failed: {e}") + pass + +addons = [PKCEAddon(), CsrfAddon()] diff --git a/lib/report.py b/lib/report.py index ba8f1d3..a159cc2 100644 --- a/lib/report.py +++ b/lib/report.py @@ -1,23 +1,28 @@ -# save as data/report.csv import csv +import os from typing import List, Dict, Any -# target, status, title, description, uri - -# file path는 'data/report.csv'로 고정 -def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None: +def save_report( + report_data: List[Dict[str, Any]], + file_path: str = 'data/report.csv' +) -> None: """ - Save the report data to a CSV file. - - :param report_data: List of dictionaries containing report data. - :param file_path: Path to the CSV file where the report will be saved. + report_data 안의 각 레포트를 한 줄씩 CSV에 추가로 저장합니다. + 파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다. """ fieldnames = ['target', 'status', 'title', 'description', 'uri'] - - with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile: + file_exists = os.path.exists(file_path) + + with open(file_path, mode='a', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() + # 파일이 없던 새로 만들 때만 헤더 작성 + if not file_exists: + writer.writeheader() + for row in report_data: - # Replace actual newlines with literal \n strings - escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()} - writer.writerow(escaped_row) + # None 방지 & 줄바꿈 이스케이프 + escaped = { + k: str(v).replace('\n', '\\n') if v is not None else '' + for k, v in row.items() + } + writer.writerow(escaped) diff --git a/main.py b/main.py index c72ab4c..7795eb6 100644 --- a/main.py +++ b/main.py @@ -4,8 +4,11 @@ from flask import Flask, request import threading import lib.target as target +proxy_port = 11080 +server_port = 11081 + def main(): - sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"] + sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", f"{proxy_port}"] mitmdump() # get target from browser use web api @@ -21,7 +24,7 @@ def start(): return "No URL provided" def run_web_server(): - app.run(host='localhost', port=11081, debug=False) + app.run(host='localhost', port=server_port, debug=False) # Start web server in a separate thread web_thread = threading.Thread(target=run_web_server, daemon=True)