oauth-backend/addon/csrf_check.py

169 lines
6.1 KiB
Python

# csrf_check.py
from mitmproxy import http
from urllib.parse import urlparse, parse_qs, unquote
import httpx
from typing import Optional, Union, List
from lib.report_vuln import report_vuln
from lib.utils.is_oauth_uri import is_oauth_uri
class CsrfChecker:
nonce_params = {
"state", "nonce", "as", "frame_id", "csrf_token", "csrf"
}
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
# mitmproxy Headers는 case-insensitive
raw = headers.get(name)
if raw is None:
return None
# percent-encoding 디코딩 (예: '%20' → ' ')
return unquote(raw)
def get_query_param(self, uri: str, param: str) -> Optional[str]:
return parse_qs(urlparse(uri).query).get(param, [None])[0]
def set_query_param(self, qs: dict, param: str, value: str) -> dict:
new_qs = dict(qs)
new_qs[param] = [value]
return new_qs
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
code = flow.response.status_code
loc = self.get_header(flow.response.headers, "location") or ""
return 300 <= code < 400 and is_oauth_uri(loc)
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
qs = parse_qs(urlparse(flow.request.url).query)
for p in self.nonce_params:
val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값
if val:
return True
return False
def find_nonce_param(self, uri: str) -> Optional[str]:
qs_keys = parse_qs(urlparse(uri).query).keys()
for p in self.nonce_params:
if p in qs_keys:
return p
return None
async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response:
# HTTPX로 비동기 재요청: 쿠키 제외
headers = {
k: v for k, v in flow.request.headers.items()
if k.lower() != "cookie"
}
async with httpx.AsyncClient(follow_redirects=False) as cli:
return await cli.request(
method=flow.request.method,
url=flow.request.url,
headers=headers,
content=flow.request.get_content(),
)
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
if not (is_oauth_uri(flow.request.url)
and self.check_nonce_in_request(flow)
and self.is_oauth_redirect(flow)):
return 0
param = self.find_nonce_param(flow.request.url)
orig_nonce = self.get_query_param(flow.request.url, param) if param else None
loc = self.get_header(flow.response.headers, "location") or ""
resp_nonce = self.get_query_param(loc, param) if param else None
if resp_nonce is None:
report_vuln(
title="CSRF Risk",
desc="Missing nonce in redirect response",
status="CRITICAL",
uri=flow.request.url
)
return 1
if orig_nonce != resp_nonce:
report_vuln(
title="CSRF Risk",
desc="Nonce mismatch request↔response",
status="HIGH",
uri=flow.request.url
)
return 1
return 0
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
if is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
return 0
loc0 = self.get_header(flow.response.headers, "location") or ""
param = self.find_nonce_param(loc0) or "state"
qs0 = parse_qs(urlparse(loc0).query)
orig_nonce = qs0.get(param, [None])[0]
# (1) 쿠키 없는 재요청 → 새 nonce
resp_no_cookie = await self.fetch_no_cookie(flow)
if resp_no_cookie.status_code >= 400:
return 0
loc1 = resp_no_cookie.headers.get("location", "")
new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0]
if new_nonce is None:
return 0
if new_nonce == orig_nonce:
report_vuln(
title="CSRF Risk",
desc="Nonce reused without cookies",
status="CRITICAL",
uri=flow.request.url
)
return 1
# (2) 두 번의 리다이렉트 비교
async with httpx.AsyncClient(follow_redirects=True) as cli:
# 원본 쿼리
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
# nonce 교체 쿼리
qs0[param] = [new_nonce]
req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
if (
req1.status_code == req2.status_code
and 200 <= req1.status_code < 400
and urlparse(req1.headers.get("location", "")).path
== urlparse(req2.headers.get("location", "")).path
):
report_vuln(
title="CSRF Risk",
desc="Identical redirects on nonce swap → potential CSRF",
status="NOT-VERIFIED-HIGH",
uri=flow.request.url
)
return 1
return 0
async def response(self, flow: http.HTTPFlow) -> None:
try:
# 1) 요청에 nonce 없으면
if is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
report_vuln(
title="CSRF Risk",
desc="Missing nonce in OAuth request",
status="CRITICAL",
uri=flow.request.url
)
return
# 2) 리다이렉트에서 nonce 검사
r1 = self.check_redirect_nonce(flow)
# 3) nonce 재사용 검사
r2 = await self.check_nonce_reuse(flow)
except Exception as e:
print(f"[ERROR] CSRF Check failed: {e}")
return