mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 07:01:51 +09:00
[Add] csrf 포팅
This commit is contained in:
parent
c0c6743423
commit
a5186a7e44
4 changed files with 210 additions and 19 deletions
168
addon/csrf_check.py
Normal file
168
addon/csrf_check.py
Normal file
|
|
@ -0,0 +1,168 @@
|
||||||
|
# csrf_check.py
|
||||||
|
from mitmproxy import http, ctx
|
||||||
|
from urllib.parse import urlparse, parse_qs, unquote
|
||||||
|
import httpx
|
||||||
|
from typing import Optional, Union, List
|
||||||
|
|
||||||
|
import lib.target as target
|
||||||
|
from lib.report import save_report
|
||||||
|
|
||||||
|
class CsrfChecker:
|
||||||
|
nonce_params = {
|
||||||
|
"state", "nonce", "as", "frame_id", "csrf_token", "csrf"
|
||||||
|
}
|
||||||
|
|
||||||
|
def is_oauth_uri(self, uri: str) -> bool:
|
||||||
|
qs = parse_qs(urlparse(uri).query)
|
||||||
|
qs_keys = [*qs]
|
||||||
|
|
||||||
|
if "client_id" in qs_keys and any(p in qs_keys for p in (
|
||||||
|
"redirect_uri", "response_type", "grant_type", "scope", "state", "nonce")):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_header(self, headers: http.Headers, name: str) -> Optional[str]:
|
||||||
|
# mitmproxy Headers는 case-insensitive
|
||||||
|
raw = headers.get(name)
|
||||||
|
if raw is None:
|
||||||
|
return None
|
||||||
|
# percent-encoding 디코딩 (예: '%20' → ' ')
|
||||||
|
return unquote(raw)
|
||||||
|
|
||||||
|
def get_query_param(self, uri: str, param: str) -> Optional[str]:
|
||||||
|
return parse_qs(urlparse(uri).query).get(param, [None])[0]
|
||||||
|
|
||||||
|
def set_query_param(self, qs: dict, param: str, value: str) -> dict:
|
||||||
|
new_qs = dict(qs)
|
||||||
|
new_qs[param] = [value]
|
||||||
|
return new_qs
|
||||||
|
|
||||||
|
def is_oauth_redirect(self, flow: http.HTTPFlow) -> bool:
|
||||||
|
code = flow.response.status_code
|
||||||
|
loc = self.get_header(flow.response.headers, "location") or ""
|
||||||
|
return 300 <= code < 400 and self.is_oauth_uri(loc)
|
||||||
|
|
||||||
|
def check_nonce_in_request(self, flow: http.HTTPFlow) -> bool:
|
||||||
|
qs = parse_qs(urlparse(flow.request.url).query)
|
||||||
|
|
||||||
|
for p in self.nonce_params:
|
||||||
|
val = qs.get(p) # 값이 없으면 None, 있으면 리스트 혹은 단일값
|
||||||
|
if val:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def find_nonce_param(self, uri: str) -> Optional[str]:
|
||||||
|
qs_keys = parse_qs(urlparse(uri).query).keys()
|
||||||
|
for p in self.nonce_params:
|
||||||
|
if p in qs_keys:
|
||||||
|
return p
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def fetch_no_cookie(self, flow: http.HTTPFlow) -> httpx.Response:
|
||||||
|
# HTTPX로 비동기 재요청: 쿠키 제외
|
||||||
|
headers = {
|
||||||
|
k: v for k, v in flow.request.headers.items()
|
||||||
|
if k.lower() != "cookie"
|
||||||
|
}
|
||||||
|
async with httpx.AsyncClient(follow_redirects=False) as cli:
|
||||||
|
return await cli.request(
|
||||||
|
method=flow.request.method,
|
||||||
|
url=flow.request.url,
|
||||||
|
headers=headers,
|
||||||
|
content=flow.request.get_content(),
|
||||||
|
)
|
||||||
|
|
||||||
|
def check_redirect_nonce(self, flow: http.HTTPFlow) -> Union[List[str], int]:
|
||||||
|
# ① OAuth URI, ② 요청에 nonce, ③ 리다이렉트 응답
|
||||||
|
if not (self.is_oauth_uri(flow.request.url)
|
||||||
|
and self.check_nonce_in_request(flow)
|
||||||
|
and self.is_oauth_redirect(flow)):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
param = self.find_nonce_param(flow.request.url)
|
||||||
|
orig_nonce = self.get_query_param(flow.request.url, param) if param else None
|
||||||
|
loc = self.get_header(flow.response.headers, "location") or ""
|
||||||
|
resp_nonce = self.get_query_param(loc, param) if param else None
|
||||||
|
|
||||||
|
if resp_nonce is None:
|
||||||
|
return ["Missing nonce in redirect"]
|
||||||
|
if orig_nonce != resp_nonce:
|
||||||
|
return ["Nonce mismatch request↔response"]
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
async def check_nonce_reuse(self, flow: http.HTTPFlow) -> Union[int, List[str]]:
|
||||||
|
# OAuth Request가 아니면서, OAuth 리다이렉트 응답인 경우만 검사
|
||||||
|
if self.is_oauth_uri(flow.request.url) or not self.is_oauth_redirect(flow):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
loc0 = self.get_header(flow.response.headers, "location") or ""
|
||||||
|
param = self.find_nonce_param(loc0) or "state"
|
||||||
|
qs0 = parse_qs(urlparse(loc0).query)
|
||||||
|
orig_nonce = qs0.get(param, [None])[0]
|
||||||
|
|
||||||
|
# (1) 쿠키 없는 재요청 → 새 nonce
|
||||||
|
resp_no_cookie = await self.fetch_no_cookie(flow)
|
||||||
|
if resp_no_cookie.status_code >= 400:
|
||||||
|
return 0
|
||||||
|
loc1 = resp_no_cookie.headers.get("location", "")
|
||||||
|
new_nonce = parse_qs(urlparse(loc1).query).get(param, [None])[0]
|
||||||
|
if new_nonce is None:
|
||||||
|
return 0
|
||||||
|
if new_nonce == orig_nonce:
|
||||||
|
return ["Nonce reused without cookies"]
|
||||||
|
|
||||||
|
# (2) 두 번의 리다이렉트 비교
|
||||||
|
async with httpx.AsyncClient(follow_redirects=False) as cli:
|
||||||
|
# 원본 쿼리
|
||||||
|
req1 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
|
||||||
|
# nonce 교체 쿼리
|
||||||
|
qs0[param] = [new_nonce]
|
||||||
|
req2 = await cli.get(loc0, params=qs0, headers=flow.request.headers)
|
||||||
|
|
||||||
|
if (
|
||||||
|
req1.status_code == req2.status_code
|
||||||
|
and 200 <= req1.status_code < 400
|
||||||
|
and urlparse(req1.headers.get("location", "")).path
|
||||||
|
== urlparse(req2.headers.get("location", "")).path
|
||||||
|
):
|
||||||
|
return ["Identical redirects on nonce swap → potential CSRF"]
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
async def response(self, flow: http.HTTPFlow) -> None:
|
||||||
|
try:
|
||||||
|
msgs: List[str] = []
|
||||||
|
|
||||||
|
# 1) 요청에 nonce 없으면
|
||||||
|
if self.is_oauth_uri(flow.request.url) and not self.check_nonce_in_request(flow):
|
||||||
|
msgs.append("Missing state/nonce in request")
|
||||||
|
|
||||||
|
# 2) 리다이렉트에서 nonce 검사
|
||||||
|
r1 = self.check_redirect_nonce(flow)
|
||||||
|
if r1:
|
||||||
|
msgs.extend(r1 if isinstance(r1, list) else [])
|
||||||
|
|
||||||
|
# 3) nonce 재사용 검사
|
||||||
|
r2 = await self.check_nonce_reuse(flow)
|
||||||
|
if r2:
|
||||||
|
msgs.extend(r2 if isinstance(r2, list) else [])
|
||||||
|
|
||||||
|
if msgs:
|
||||||
|
desc = " | ".join(msgs)
|
||||||
|
status = "MEDIUM"
|
||||||
|
report_data = [{
|
||||||
|
'target': target.load(),
|
||||||
|
'status': status,
|
||||||
|
'title': "CSRF Risk",
|
||||||
|
'description': desc,
|
||||||
|
'uri': flow.request.url,
|
||||||
|
}]
|
||||||
|
save_report(report_data)
|
||||||
|
print(f"[INFO] CSRF Check: {desc}")
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] CSRF Check failed: {e}")
|
||||||
|
return
|
||||||
|
|
@ -1,16 +1,31 @@
|
||||||
from mitmproxy import http
|
from mitmproxy import http
|
||||||
import asyncio
|
import asyncio
|
||||||
from pkce_check import PKCEDowngradeChecker
|
from pkce_check import PKCEDowngradeChecker
|
||||||
|
from csrf_check import CsrfChecker
|
||||||
|
|
||||||
class PKCEAddon:
|
class PKCEAddon:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.checker = PKCEDowngradeChecker()
|
self.checker = PKCEDowngradeChecker()
|
||||||
|
|
||||||
async def request(self, flow: http.HTTPFlow):
|
async def request(self, flow: http.HTTPFlow):
|
||||||
print(f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}")
|
# print(f"[DEBUG] Processing request: {flow.request.method} {flow.request.pretty_url}")
|
||||||
try:
|
try:
|
||||||
await self.checker.test(flow)
|
await self.checker.test(flow)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Addon failed: {e}")
|
print(f"[ERROR] Addon failed: {e}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CsrfAddon:
|
||||||
|
def __init__(self):
|
||||||
|
self.checker = CsrfChecker()
|
||||||
|
|
||||||
addons = [PKCEAddon()]
|
async def response(self, flow: http.HTTPFlow):
|
||||||
|
# print(f"[DEBUG] Processing request for CSRF check: {flow.request.method} {flow.request.pretty_url}")
|
||||||
|
try:
|
||||||
|
await self.checker.response(flow)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] CSRF Addon failed: {e}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
addons = [PKCEAddon(), CsrfAddon()]
|
||||||
|
|
|
||||||
|
|
@ -1,23 +1,28 @@
|
||||||
# save as data/report.csv
|
|
||||||
import csv
|
import csv
|
||||||
|
import os
|
||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
|
|
||||||
# target, status, title, description, uri
|
def save_report(
|
||||||
|
report_data: List[Dict[str, Any]],
|
||||||
# file path는 'data/report.csv'로 고정
|
file_path: str = 'data/report.csv'
|
||||||
def save_report(report_data: List[Dict[str, Any]], file_path: str = 'data/report.csv') -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Save the report data to a CSV file.
|
report_data 안의 각 레포트를 한 줄씩 CSV에 추가로 저장합니다.
|
||||||
|
파일이 없으면 헤더를 먼저 쓰고, 있으면 레코드만 이어서 씁니다.
|
||||||
:param report_data: List of dictionaries containing report data.
|
|
||||||
:param file_path: Path to the CSV file where the report will be saved.
|
|
||||||
"""
|
"""
|
||||||
fieldnames = ['target', 'status', 'title', 'description', 'uri']
|
fieldnames = ['target', 'status', 'title', 'description', 'uri']
|
||||||
|
file_exists = os.path.exists(file_path)
|
||||||
with open(file_path, mode='w', newline='', encoding='utf-8') as csvfile:
|
|
||||||
|
with open(file_path, mode='a', newline='', encoding='utf-8') as csvfile:
|
||||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||||
writer.writeheader()
|
# 파일이 없던 새로 만들 때만 헤더 작성
|
||||||
|
if not file_exists:
|
||||||
|
writer.writeheader()
|
||||||
|
|
||||||
for row in report_data:
|
for row in report_data:
|
||||||
# Replace actual newlines with literal \n strings
|
# None 방지 & 줄바꿈 이스케이프
|
||||||
escaped_row = {k: str(v).replace('\n', '\\n') if v is not None else v for k, v in row.items()}
|
escaped = {
|
||||||
writer.writerow(escaped_row)
|
k: str(v).replace('\n', '\\n') if v is not None else ''
|
||||||
|
for k, v in row.items()
|
||||||
|
}
|
||||||
|
writer.writerow(escaped)
|
||||||
|
|
|
||||||
7
main.py
7
main.py
|
|
@ -4,8 +4,11 @@ from flask import Flask, request
|
||||||
import threading
|
import threading
|
||||||
import lib.target as target
|
import lib.target as target
|
||||||
|
|
||||||
|
proxy_port = 11080
|
||||||
|
server_port = 11081
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", "11080"]
|
sys.argv = ["mitmdump", "-s", "./addon/init.py", "--listen-port", f"{proxy_port}"]
|
||||||
mitmdump()
|
mitmdump()
|
||||||
|
|
||||||
# get target from browser use web api
|
# get target from browser use web api
|
||||||
|
|
@ -21,7 +24,7 @@ def start():
|
||||||
return "No URL provided"
|
return "No URL provided"
|
||||||
|
|
||||||
def run_web_server():
|
def run_web_server():
|
||||||
app.run(host='localhost', port=11081, debug=False)
|
app.run(host='localhost', port=server_port, debug=False)
|
||||||
|
|
||||||
# Start web server in a separate thread
|
# Start web server in a separate thread
|
||||||
web_thread = threading.Thread(target=run_web_server, daemon=True)
|
web_thread = threading.Thread(target=run_web_server, daemon=True)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue