# csrf_check.py
from mitmproxy import http
from urllib.parse import urlparse, parse_qs, unquote, urljoin
import httpx
from typing import Optional, Union, List

from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri


class CsrfChecker:
    """mitmproxy addon that looks for CSRF weaknesses in OAuth-style flows.

    Three checks run from the ``response`` hook:

    1. an OAuth request that carries no nonce-like query parameter,
    2. an OAuth redirect that drops or alters the request's nonce,
    3. a redirect into an OAuth URI whose nonce is reused, or whose target
       answers the same way when the nonce is swapped.

    Findings are passed to ``report_vuln``.
    """

    # Query parameter names treated as anti-CSRF nonces.
    nonce_params = {
        "state",
        "nonce",
        "csrf_token",
        "csrf",
    }

    # Lookup order used when several nonce parameters appear in one URL.
    # ``nonce_params`` is a set, so iterating it directly would make the
    # chosen parameter depend on hash ordering.
    _nonce_priority = ("state", "nonce", "csrf_token", "csrf")

    # Headers that describe the original message or connection and must not
    # be copied onto a replayed GET aimed at a different URL.
    _replay_skip_headers = frozenset(
        {"host", "content-length", "transfer-encoding"}
    )

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _ordered_nonce_params(self) -> List[str]:
        """Return ``nonce_params`` in a deterministic order.

        Names listed in ``_nonce_priority`` come first (in that order); any
        other names present in ``nonce_params`` follow alphabetically.
        """
        preferred = [p for p in self._nonce_priority if p in self.nonce_params]
        remaining = sorted(set(self.nonce_params) - set(preferred))
        return preferred + remaining

    def get_header(
        self,
        headers: http.Headers,
        name: str,
        *,
        decode: bool = True,
    ) -> Optional[str]:
        """Return header ``name`` from ``headers``, or ``None`` if absent.

        Args:
            headers: Header collection to read from (the original code notes
                that mitmproxy ``Headers`` lookups are case-insensitive).
            name: Header name.
            decode: When true (the default, matching the historical
                behaviour) the value is percent-decoded, e.g. ``'%20'``
                becomes ``' '``. Pass ``False`` for values that will be
                parsed as URLs, otherwise the query string gets decoded
                twice.
        """
        raw = headers.get(name)
        if raw is None:
            return None
        return unquote(raw) if decode else raw

    def _raw_location(self, flow: http.HTTPFlow) -> str:
        """Return the response's ``Location`` header undecoded ('' if absent)."""
        return self.get_header(flow.response.headers, "location", decode=False) or ""

    def get_query_param(self, uri: str, param: str) -> Optional[str]:
        """Return the first value of query parameter ``param`` in ``uri``.

        Returns ``None`` when the parameter is missing (``parse_qs`` also
        drops parameters whose value is blank).
        """
        return parse_qs(urlparse(uri).query).get(param, [None])[0]

    def set_query_param(self, qs: dict, param: str, value: str) -> dict:
        """Return a shallow copy of ``qs`` with ``param`` set to ``[value]``.

        ``qs`` itself is not modified.
        """
        new_qs = dict(qs)
        new_qs[param] = [value]
        return new_qs

    def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
        """Tell whether the response is a 3xx whose Location is an OAuth URI."""
        code = flow.response.status_code
        # The Location value is passed undecoded, the same way request URLs
        # are handed to is_oauth_uri elsewhere in this class.
        return 300 <= code < 400 and is_oauth_uri(self._raw_location(flow))

    def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
        """Return True if the request URL has a non-empty nonce parameter."""
        qs = parse_qs(urlparse(flow.request.url).query)
        # parse_qs yields a list of values per key; a missing key gives None.
        return any(qs.get(p) for p in self.nonce_params)

    def find_nonce_param(self, uri: str) -> Optional[str]:
        """Return the name of the nonce parameter present in ``uri``.

        When several are present the first one in priority order wins
        (``state`` before ``nonce`` ...), so the result is stable between
        runs. Returns ``None`` when no nonce parameter is present.
        """
        present = parse_qs(urlparse(uri).query)
        for candidate in self._ordered_nonce_params():
            if candidate in present:
                return candidate
        return None

    async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response:
        """Replay the original request with httpx, omitting the Cookie header.

        Redirects are not followed, so the caller can inspect the immediate
        response.
        """
        headers = {
            k: v
            for k, v in flow.request.headers.items()
            if k.lower() != "cookie"
        }
        async with httpx.AsyncClient(follow_redirects=False) as cli:
            return await cli.request(
                method=flow.request.method,
                url=flow.request.url,
                headers=headers,
                content=flow.request.get_content(),
            )

    def _replay_headers(self, flow: http.HTTPFlow, target: str) -> dict:
        """Build headers for replaying a GET against ``target``.

        Copies the original request headers minus the ones listed in
        ``_replay_skip_headers``. Cookies are dropped as well when
        ``target`` is on a different host than the original request, so they
        are not sent to another origin.
        """
        skip = set(self._replay_skip_headers)
        if urlparse(target).netloc != urlparse(flow.request.url).netloc:
            skip.add("cookie")
        return {
            k: v
            for k, v in flow.request.headers.items()
            if k.lower() not in skip
        }

    @staticmethod
    def _landing_path(resp: httpx.Response) -> str:
        """Return the path a replayed request ended up at.

        Uses the ``Location`` header when the response has one; otherwise
        falls back to the URL of the final response. The fallback matters
        because the client follows redirects, so the final response normally
        has no ``Location`` header at all.
        """
        return urlparse(resp.headers.get("location") or str(resp.url)).path

    # ------------------------------------------------------------------
    # Checks
    # ------------------------------------------------------------------
    def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
        """Check that an OAuth redirect echoes the request's nonce.

        Only applies when (1) the request is an OAuth URI, (2) the request
        carries a nonce and (3) the response is an OAuth redirect.

        Returns:
            1 if a finding was reported, otherwise 0.
        """
        applicable = (
            is_oauth_uri(flow.request.url)
            and self.check_nonce_in_request(flow)
            and self.is_oauth_redirect(flow)
        )
        if not applicable:
            return 0

        param = self.find_nonce_param(flow.request.url)
        orig_nonce = self.get_query_param(flow.request.url, param) if param else None
        # Read Location undecoded: get_query_param decodes the value itself.
        loc = self._raw_location(flow)
        resp_nonce = self.get_query_param(loc, param) if param else None

        if resp_nonce is None:
            report_vuln(
                title="CSRF Risk",
                desc="Missing nonce in redirect response",
                status="CRITICAL",
                uri=flow.request.url,
            )
            return 1

        if orig_nonce != resp_nonce:
            report_vuln(
                title="CSRF Risk",
                desc="Nonce mismatch request↔response",
                status="HIGH",
                uri=flow.request.url,
            )
            return 1

        return 0

    async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
        """Check whether the nonce issued in an OAuth redirect is reused.

        Only applies when the request is *not* an OAuth URI but the response
        redirects to one.

        Step 1 replays the request without cookies; getting the same nonce
        back is reported as CRITICAL. Step 2 requests the redirect target
        once with the original nonce and once with the fresh one; if both
        requests end with the same status and path, it is reported as
        NOT-VERIFIED-HIGH.

        Returns:
            1 if a finding was reported, otherwise 0.
        """
        if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
            return 0

        loc0 = self._raw_location(flow)
        param = self.find_nonce_param(loc0) or "state"
        qs0 = parse_qs(urlparse(loc0).query)
        orig_nonce = qs0.get(param, [None])[0]

        # (1) Replay without cookies -> a fresh nonce is expected.
        resp_no_cookie = await self.fetch_no_cookie(flow)
        if resp_no_cookie.status_code >= 400:
            return 0

        loc1 = resp_no_cookie.headers.get("location", "")
        new_nonce = self.get_query_param(loc1, param)
        if new_nonce is None:
            return 0

        if new_nonce == orig_nonce:
            report_vuln(
                title="CSRF Risk",
                desc="Nonce reused without cookies",
                status="CRITICAL",
                uri=flow.request.url,
            )
            return 1

        # (2) Compare the outcome of the redirect target with the original
        #     nonce against the outcome with the nonce swapped.
        # A Location header may be relative; resolve it against the request.
        target = urljoin(flow.request.url, loc0)
        replay_headers = self._replay_headers(flow, target)
        swapped_qs = self.set_query_param(qs0, param, new_nonce)

        async with httpx.AsyncClient(follow_redirects=True) as cli:
            # Original query.
            resp_orig = await cli.get(target, params=qs0, headers=replay_headers)
            # Query with the nonce replaced.
            resp_swapped = await cli.get(
                target, params=swapped_qs, headers=replay_headers
            )

        if (
            resp_orig.status_code == resp_swapped.status_code
            and 200 <= resp_orig.status_code < 400
            and self._landing_path(resp_orig) == self._landing_path(resp_swapped)
        ):
            report_vuln(
                title="CSRF Risk",
                desc="Identical redirects on nonce swap → potential CSRF",
                status="NOT-VERIFIED-HIGH",
                uri=flow.request.url,
            )
            return 1

        return 0

    # ------------------------------------------------------------------
    # mitmproxy hook
    # ------------------------------------------------------------------
    async def response(self, flow: http.HTTPFlow) -> None:
        """mitmproxy ``response`` hook: run all CSRF checks on ``flow``.

        Any exception raised by a check is caught and printed so that a
        failing check does not propagate out of the hook.
        """
        try:
            # 1) OAuth request without any nonce: report and stop.
            if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
                report_vuln(
                    title="CSRF Risk",
                    desc="Missing nonce in OAuth request",
                    status="CRITICAL",
                    uri=flow.request.url,
                )
                return

            # 2) Nonce must survive the redirect unchanged.
            self.check_redirect_nonce(flow)

            # 3) Nonce reuse / nonce swap check.
            await self.check_nonce_reuse(flow)

        except Exception as e:
            # Best-effort boundary: report the failure and carry on.
            print(f"[ERROR] CSRF Check failed: {e}")
            return