mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 03:51:51 +09:00
[Update] csrf
This commit is contained in:
parent
e91b2738e1
commit
5fe33564d6
5 changed files with 200 additions and 14 deletions
168
addon/csrf_check.py
Normal file
168
addon/csrf_check.py
Normal file
|
|
@ -0,0 +1,168 @@
|
|||
# csrf_check.py
|
||||
from mitmproxy import http, ctx
|
||||
from urllib.parse import urlparse, parse_qs, unquote
|
||||
import httpx
|
||||
from typing import Optional, Union, List
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
class CsrfChecker:
|
||||
nonce_params = {
|
||||
"state", "nonce", "as", "frame_id", "csrf_token", "csrf"
|
||||
}
|
||||
|
||||
def is_oauth_uri(self, uri: str) -> bool:
|
||||
qs = parse_qs(urlparse(uri).query)
|
||||
qs_keys = [*qs]
|
||||
|
||||
if "client_id" in qs_keys and any(p in qs_keys for p in (
|
||||
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
|
||||
# mitmproxy Headers는 case-insensitive
|
||||
raw = headers.get(name)
|
||||
if raw is None:
|
||||
return None
|
||||
# percent-encoding 디코딩 (예: '%20' → ' ')
|
||||
return unquote(raw)
|
||||
|
||||
def get_query_param(self, uri: str, param: str) -> Optional[str]:
|
||||
return parse_qs(urlparse(uri).query).get(param, [None])[0]
|
||||
|
||||
def set_query_param(self, qs: dict, param: str, value: str) -> dict:
|
||||
new_qs = dict(qs)
|
||||
new_qs[param] = [value]
|
||||
return new_qs
|
||||
|
||||
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
|
||||
code = flow.response.status_code
|
||||
loc = self.get_header(flow.response.headers, "location") or ""
|
||||
return 300 <= code < 400 and self.is_oauth_uri(loc)
|
||||
|
||||
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
|
||||
qs = parse_qs(urlparse(flow.request.url).query)
|
||||
|
||||
for p in self.nonce_params:
|
||||
val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값
|
||||
if val:
|
||||
return True
|
||||
return False
|
||||
|
||||
def find_nonce_param(self, uri: str) -> Optional[str]:
|
||||
qs_keys = parse_qs(urlparse(uri).query).keys()
|
||||
for p in self.nonce_params:
|
||||
if p in qs_keys:
|
||||
return p
|
||||
return None
|
||||
|
||||
async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response:
|
||||
# HTTPX로 비동기 재요청: 쿠키 제외
|
||||
headers = {
|
||||
k: v for k, v in flow.request.headers.items()
|
||||
if k.lower() != "cookie"
|
||||
}
|
||||
async with httpx.AsyncClient(follow_redirects=False) as cli:
|
||||
return await cli.request(
|
||||
method=flow.request.method,
|
||||
url=flow.request.url,
|
||||
headers=headers,
|
||||
content=flow.request.get_content(),
|
||||
)
|
||||
|
||||
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
|
||||
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
|
||||
if not (self.is_oauth_uri(flow.request.url)
|
||||
and self.check_nonce_in_request(flow)
|
||||
and self.is_oauth_redirect(flow)):
|
||||
return 0
|
||||
|
||||
param = self.find_nonce_param(flow.request.url)
|
||||
orig_nonce = self.get_query_param(flow.request.url, param) if param else None
|
||||
loc = self.get_header(flow.response.headers, "location") or ""
|
||||
resp_nonce = self.get_query_param(loc, param) if param else None
|
||||
|
||||
if resp_nonce is None:
|
||||
return ["Missing nonce in redirect"]
|
||||
if orig_nonce != resp_nonce:
|
||||
return ["Nonce mismatch request↔response"]
|
||||
|
||||
return 0
|
||||
|
||||
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
|
||||
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
|
||||
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
|
||||
return 0
|
||||
|
||||
|
||||
loc0 = self.get_header(flow.response.headers, "location") or ""
|
||||
param = self.find_nonce_param(loc0) or "state"
|
||||
qs0 = parse_qs(urlparse(loc0).query)
|
||||
orig_nonce = qs0.get(param, [None])[0]
|
||||
|
||||
# (1) 쿠키 없는 재요청 → 새 nonce
|
||||
resp_no_cookie = await self.fetch_no_cookie(flow)
|
||||
if resp_no_cookie.status_code >= 400:
|
||||
return 0
|
||||
loc1 = resp_no_cookie.headers.get("location", "")
|
||||
new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0]
|
||||
if new_nonce is None:
|
||||
return 0
|
||||
if new_nonce == orig_nonce:
|
||||
return ["Nonce reused without cookies"]
|
||||
|
||||
# (2) 두 번의 리다이렉트 비교
|
||||
async with httpx.AsyncClient(follow_redirects=False) as cli:
|
||||
# 원본 쿼리
|
||||
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
|
||||
# nonce 교체 쿼리
|
||||
qs0[param] = [new_nonce]
|
||||
req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
|
||||
|
||||
if (
|
||||
req1.status_code == req2.status_code
|
||||
and 200 <= req1.status_code < 400
|
||||
and urlparse(req1.headers.get("location", "")).path
|
||||
== urlparse(req2.headers.get("location", "")).path
|
||||
):
|
||||
return ["Identical redirects on nonce swap → potential CSRF"]
|
||||
|
||||
return 0
|
||||
|
||||
async def response(self, flow: http.HTTPFlow) -> None:
|
||||
try:
|
||||
msgs: List[str] = []
|
||||
|
||||
# 1) 요청에 nonce 없으면
|
||||
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
|
||||
msgs.append("Missing state/nonce in request")
|
||||
|
||||
# 2) 리다이렉트에서 nonce 검사
|
||||
r1 = self.check_redirect_nonce(flow)
|
||||
if r1:
|
||||
msgs.extend(r1 if isinstance(r1, list) else [])
|
||||
|
||||
# 3) nonce 재사용 검사
|
||||
r2 = await self.check_nonce_reuse(flow)
|
||||
if r2:
|
||||
msgs.extend(r2 if isinstance(r2, list) else [])
|
||||
|
||||
if msgs:
|
||||
desc = " | ".join(msgs)
|
||||
status = "MEDIUM"
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': status,
|
||||
'title': "CSRF Risk",
|
||||
'description': desc,
|
||||
'uri': flow.request.url,
|
||||
}]
|
||||
save_report(report_data)
|
||||
print(f"[INFO] CSRF Check: {desc}")
|
||||
else:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[ERROR] CSRF Check failed: {e}")
|
||||
return
|
||||
|
|
@ -2,7 +2,7 @@ from mitmproxy import http
|
|||
import asyncio
|
||||
from pkce_check import PKCEDowngradeChecker
|
||||
from ScopeDetection import ScopeDetection
|
||||
|
||||
from csrf_check import CsrfChecker
|
||||
|
||||
class PKCEAddon:
|
||||
def __init__(self):
|
||||
|
|
@ -16,7 +16,21 @@ class PKCEAddon:
|
|||
await self.checker.test(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Addon failed: {e}")
|
||||
pass
|
||||
|
||||
|
||||
class CsrfAddon:
|
||||
def __init__(self):
|
||||
self.checker = CsrfChecker()
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.response(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] CSRF Addon failed: {e}")
|
||||
pass
|
||||
|
||||
|
||||
class ScopeAddon:
|
||||
def __init__(self):
|
||||
self.checker = ScopeDetection()
|
||||
|
|
@ -35,5 +49,4 @@ class ScopeAddon:
|
|||
except Exception as e:
|
||||
print(f"[ERROR] ScopeDetection failed: {e}")
|
||||
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon()]
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon()]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue