From 5fe33564d607a4437b3f4c9c2162f9b7450f0e63 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Sun, 8 Jun 2025 12:39:24 +0900 Subject: [PATCH] [Update] csrf --- .DS_Store | Bin 0 -> 6148 bytes addon/csrf_check.py | 168 ++++++++++++++++++++++++++++++++++++++++++++ addon/init.py | 19 ++++- lib/report.py | 25 ++++--- main.py | 2 +- 5 files changed, 200 insertions(+), 14 deletions(-) create mode 100644 .DS_Store create mode 100644 addon/csrf_check.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..1a71cb0776ca1e4c312bf3287e5b925ef4942c2c GIT binary patch literal 6148 zcmZQzU|@7AO)+F(5MW?n;9!8z45|#6fRTZLfrTN3A(5ekA+apDC@&{JFCCbS3Ltr!nhHD5gvbY4hIDsln96kiqxd~7? z5F1n#GlJ@9h#qiN3~K~1K?W2hpvpnjJ-8}nWPsG bool: + qs = parse_qs(urlparse(uri).query) + qs_keys = [*qs] + + if "client_id" in qs_keys and any(p in qs_keys for p in ( + "redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")): + return True + return False + + def get_header(self, headers: http.Headers, name: str) -> Optional[str]: + # mitmproxy Headers는 case-insensitive + raw = headers.get(name) + if raw is None: + return None + # percent-encoding 디코딩 (예: '%20' → ' ') + return unquote(raw) + + def get_query_param(self, uri: str, param: str) -> Optional[str]: + return parse_qs(urlparse(uri).query).get(param, [None])[0] + + def set_query_param(self, qs: dict, param: str, value: str) -> dict: + new_qs = dict(qs) + new_qs[param] = [value] + return new_qs + + def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool: + code = flow.response.status_code + loc = self.get_header(flow.response.headers, "location") or "" + return 300 <= code < 400 and self.is_oauth_uri(loc) + + def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool: + qs = parse_qs(urlparse(flow.request.url).query) + + for p in self.nonce_params: + val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값 + if val: + return True + return False + + def find_nonce_param(self, uri: str) -> Optional[str]: + qs_keys = parse_qs(urlparse(uri).query).keys() + for p in self.nonce_params: + if p in qs_keys: + return p + return None + + async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response: + # HTTPX로 비동기 재요청: 쿠키 제외 + headers = { + k: v for k, v in flow.request.headers.items() + if k.lower() != "cookie" + } + async with httpx.AsyncClient(follow_redirects=False) as cli: + return await cli.request( + method=flow.request.method, + url=flow.request.url, + headers=headers, + content=flow.request.get_content(), + ) + + def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]: + # ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답 + if not (self.is_oauth_uri(flow.request.url) + and self.check_nonce_in_request(flow) + and self.is_oauth_redirect(flow)): + return 0 + + param = self.find_nonce_param(flow.request.url) + orig_nonce = self.get_query_param(flow.request.url, param) if param else None + loc = self.get_header(flow.response.headers, "location") or "" + resp_nonce = self.get_query_param(loc, param) if param else None + + if resp_nonce is None: + return ["Missing nonce in redirect"] + if orig_nonce != resp_nonce: + return ["Nonce mismatch request↔response"] + + return 0 + + async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]: + # OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사 + if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow): + return 0 + + + loc0 = self.get_header(flow.response.headers, "location") or "" + param = self.find_nonce_param(loc0) or "state" + qs0 = parse_qs(urlparse(loc0).query) + orig_nonce = qs0.get(param, [None])[0] + + # (1) 쿠키 없는 재요청 → 새 nonce + resp_no_cookie = await self.fetch_no_cookie(flow) + if resp_no_cookie.status_code >= 400: + return 0 + loc1 = resp_no_cookie.headers.get("location", "") + new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0] + if new_nonce is None: + return 0 + if new_nonce == orig_nonce: + return ["Nonce reused without cookies"] + + # (2) 두 번의 리다이렉트 비교 + async with httpx.AsyncClient(follow_redirects=False) as cli: + # 원본 쿼리 + req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers) + # nonce 교체 쿼리 + qs0[param] = [new_nonce] + req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers) + + if ( + req1.status_code == req2.status_code + and 200 <= req1.status_code < 400 + and urlparse(req1.headers.get("location", "")).path + == urlparse(req2.headers.get("location", "")).path + ): + return ["Identical redirects on nonce swap → potential CSRF"] + + return 0 + + async def response(self, flow: http.HTTPFlow) -> None: + try: + msgs: List[str] = [] + + # 1) 요청에 nonce 없으면 + if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow): + msgs.append("Missing state/nonce in request") + + # 2) 리다이렉트에서 nonce 검사 + r1 = self.check_redirect_nonce(flow) + if r1: + msgs.extend(r1 if isinstance(r1, list) else []) + + # 3) nonce 재사용 검사 + r2 = await self.check_nonce_reuse(flow) + if r2: + msgs.extend(r2 if isinstance(r2, list) else []) + + if msgs: + desc = " | ".join(msgs) + status = "MEDIUM" + report_data = [{ + 'target': target.load(), + 'status': status, + 'title': "CSRF Risk", + 'description': desc, + 'uri': flow.request.url, + }] + save_report(report_data) + print(f"[INFO] CSRF Check: {desc}") + else: + pass + except Exception as e: + print(f"[ERROR] CSRF Check failed: {e}") + return diff --git a/addon/init.py b/addon/init.py index e31fe1e..7419987 100644 --- a/addon/init.py +++ b/addon/init.py @@ -2,7 +2,7 @@ from mitmproxy import http import asyncio from pkce_check import PKCEDowngradeChecker from ScopeDetection import ScopeDetection - +from csrf_check import CsrfChecker class PKCEAddon: def __init__(self): @@ -16,7 +16,21 @@ class PKCEAddon: await self.checker.test(flow) except Exception as e: print(f"[ERROR] Addon failed: {e}") + pass + + +class CsrfAddon: + def __init__(self): + self.checker = CsrfChecker() + async def response(self, flow: http.HTTPFlow): + try: + await self.checker.response(flow) + except Exception as e: + print(f"[ERROR] CSRF Addon failed: {e}") + pass + + class ScopeAddon: def __init__(self): self.checker = ScopeDetection() @@ -35,5 +49,4 @@ class ScopeAddon: except Exception as e: print(f"[ERROR] ScopeDetection failed: {e}") - -addons = [PKCEAddon(), ScopeAddon()] +addons = [PKCEAddon(), ScopeAddon(), CsrfAddon()] diff --git a/lib/report.py b/lib/report.py index e6198a7..40925d0 100644 --- a/lib/report.py +++ b/lib/report.py @@ -11,17 +11,22 @@ def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report """ - Save the report data to a CSV file. - - :param report_data: List of dictionaries containing report data. - :param file_path: Path to the CSV file where the report will be saved. + report_data 안의 각 레포트를 한 줄씩 CSV에 추가로 저장합니다. + 파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다. """ fieldnames = ['target', 'status', 'title', 'description', 'uri'] - - with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile: + file_exists = os.path.exists(file_path) + + with open(file_path, mode='a', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() + # 파일이 없던 새로 만들 때만 헤더 작성 + if not file_exists: + writer.writeheader() + for row in report_data: - # Replace actual newlines with literal \n strings - escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()} - writer.writerow(escaped_row) + # None 방지 & 줄바꿈 이스케이프 + escaped = { + k: str(v).replace('\n', '\\n') if v is not None else '' + for k, v in row.items() + } + writer.writerow(escaped) diff --git a/main.py b/main.py index b3a64a8..cd9f6bb 100644 --- a/main.py +++ b/main.py @@ -18,4 +18,4 @@ if __name__ == "__main__": # Run mitmdump proxy run_proxy() finally: - server_process.terminate() + server_process.terminate() \ No newline at end of file