mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 05:21:51 +09:00
nonceCheck
oidc flow인지 확인하고 id token을 디코딩한 후 nonce의 유무를 확인한다
This commit is contained in:
parent
39d2e7906c
commit
4f6f2519b3
4 changed files with 113 additions and 1 deletions
|
|
@ -3,6 +3,7 @@ import asyncio
|
|||
from pkce_check import PKCEDowngradeChecker
|
||||
from ScopeDetection import ScopeDetection
|
||||
from csrf_check import CsrfChecker
|
||||
from nonce_check import NonceChecker
|
||||
|
||||
class PKCEAddon:
|
||||
def __init__(self):
|
||||
|
|
@ -49,4 +50,15 @@ class ScopeAddon:
|
|||
except Exception as e:
|
||||
print(f"[ERROR] ScopeDetection failed: {e}")
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon()]
|
||||
class NonceAddon:
|
||||
def __init__(self):
|
||||
self.checker = NonceChecker()
|
||||
|
||||
async def response(self, flow: http.HTTPFlow):
|
||||
try:
|
||||
await self.checker.response(flow)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] NonceAddon failed: {e}")
|
||||
pass
|
||||
|
||||
addons = [PKCEAddon(), ScopeAddon(), CsrfAddon(), NonceAddon()]
|
||||
|
|
|
|||
88
addon/nonce_check.py
Normal file
88
addon/nonce_check.py
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
import jwt
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from typing import Union
|
||||
import httpx
|
||||
|
||||
import lib.target as target
|
||||
from lib.report import save_report
|
||||
|
||||
class NonceChecker:
|
||||
def is_oidc_flow(self, flow) -> bool:
|
||||
req = flow.request
|
||||
res = flow.response
|
||||
url = req.pretty_url
|
||||
parsed = urlparse(url)
|
||||
query = parse_qs(parsed.query)
|
||||
location = res.headers.get("location", "")
|
||||
content_type = res.headers.get("content-type", "")
|
||||
|
||||
if "/authorize" in url and "response_type" in query and "openid" in query.get("scope", [""])[0]:
|
||||
return True
|
||||
|
||||
if "application/json" in content_type:
|
||||
if "id_token" in res.text:
|
||||
return True
|
||||
|
||||
if res.status_code in [302, 303]:
|
||||
if isinstance(location, list):
|
||||
location = location[0]
|
||||
if "id_token=" in location:
|
||||
return True
|
||||
|
||||
if "/authorize" in url and "nonce" in query:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def extract_id_token(self, response) -> Union[str, None]:
|
||||
"""
|
||||
응답에서 id_token을 추출하는 함수.
|
||||
"""
|
||||
# 1. JSON 응답에 id_token 있음
|
||||
try:
|
||||
if "application/json" in response.headers.get("content-type", ""):
|
||||
data = response.json()
|
||||
return data.get("id_token")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2. Location 헤더에서 id_token 파싱 (예: #id_token=...&access_token=...)
|
||||
location = response.headers.get("location", "")
|
||||
if location:
|
||||
if "#" in location:
|
||||
fragment = location.split("#")[1]
|
||||
params = parse_qs(fragment)
|
||||
return params.get("id_token", [None])[0]
|
||||
elif "?" in location:
|
||||
query = location.split("?")[1]
|
||||
params = parse_qs(query)
|
||||
return params.get("id_token", [None])[0]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def decode_id_token(self, id_token: str) -> dict:
|
||||
try:
|
||||
return jwt.decode(id_token, options={"verify_signature": False})
|
||||
except Exception as e:
|
||||
return {}
|
||||
|
||||
|
||||
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
||||
decoded = self.decode_id_token(id_token)
|
||||
nonce = decoded.get("nonce")
|
||||
req = flow.request
|
||||
url = req.pretty_url
|
||||
if not nonce:
|
||||
report_data = [{
|
||||
'target': target.load(),
|
||||
'status': "CRITICAL",
|
||||
'title': "nonce is missing in id_token",
|
||||
'description': "Nonce is present in the request but missing in the id_token.",
|
||||
'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
|
||||
}]
|
||||
save_report(report_data)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
Loading…
Add table
Add a link
Reference in a new issue