mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 03:31:51 +09:00
Compare commits
6 commits
main
...
nonce_chec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8226df5ac0 | ||
|
|
79295badd1 | ||
|
|
ac32d5d604 | ||
|
|
78a377414d | ||
|
|
2bb887939a | ||
|
|
e5b7eea42f |
2 changed files with 40 additions and 20 deletions
|
|
@ -58,9 +58,7 @@ class NonceAddon:
|
||||||
|
|
||||||
async def response(self, flow: http.HTTPFlow):
|
async def response(self, flow: http.HTTPFlow):
|
||||||
try:
|
try:
|
||||||
pass
|
await self.checker.check_nonce_in_id_token(flow)
|
||||||
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
|
||||||
# await self.checker.check_nonce_in_id_token(flow)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] NonceAddon failed: {e}")
|
print(f"[ERROR] NonceAddon failed: {e}")
|
||||||
pass
|
pass
|
||||||
|
|
|
||||||
|
|
@ -8,20 +8,19 @@ from lib.report import save_report
|
||||||
|
|
||||||
class NonceChecker:
|
class NonceChecker:
|
||||||
def is_oidc_flow(self, flow) -> bool:
|
def is_oidc_flow(self, flow) -> bool:
|
||||||
req = flow.request
|
|
||||||
res = flow.response
|
res = flow.response
|
||||||
url = req.pretty_url
|
url = res.url
|
||||||
parsed = urlparse(url)
|
parsed = urlparse(url)
|
||||||
query = parse_qs(parsed.query)
|
fragment_params = parse_qs(parsed.fragment)
|
||||||
location = res.headers.get("location", "")
|
location = res.headers.get("location", "")
|
||||||
content_type = res.headers.get("content-type", "")
|
content_type = res.headers.get("content-type", "")
|
||||||
|
|
||||||
if "/authorize" in url and "response_type" in query and "openid" in query.get("scope", [""])[0]:
|
|
||||||
return True
|
|
||||||
|
|
||||||
if "application/json" in content_type:
|
if "application/json" in content_type:
|
||||||
if "id_token" in res.text:
|
if "id_token" in res.text:
|
||||||
return True
|
return True
|
||||||
|
if self.extract_id_token(self, flow):
|
||||||
|
return True
|
||||||
|
|
||||||
if res.status_code in [302, 303]:
|
if res.status_code in [302, 303]:
|
||||||
if isinstance(location, list):
|
if isinstance(location, list):
|
||||||
|
|
@ -29,26 +28,28 @@ class NonceChecker:
|
||||||
if "id_token=" in location:
|
if "id_token=" in location:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if "/authorize" in url and "nonce" in query:
|
if "id_token" in fragment_params:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def extract_id_token(self, response) -> Union[str, None]:
|
def extract_id_token(self, flow) -> Union[str, None]:
|
||||||
"""
|
"""
|
||||||
응답에서 id_token을 추출하는 함수.
|
응답에서 id_token을 추출하는 함수.
|
||||||
"""
|
"""
|
||||||
|
res = flow.response
|
||||||
# 1. JSON 응답에 id_token 있음
|
# 1. JSON 응답에 id_token 있음
|
||||||
try:
|
if "application/json" in res.headers.get("content-type", ""):
|
||||||
if "application/json" in response.headers.get("content-type", ""):
|
try:
|
||||||
data = response.json()
|
data = res.json()
|
||||||
return data.get("id_token")
|
return data.get("id_token")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# 2. Location 헤더에서 id_token 파싱 (예: #id_token=...&access_token=...)
|
# 2. Location 헤더에서 id_token 파싱 (예: #id_token=...&access_token=...)
|
||||||
location = response.headers.get("location", "")
|
location = res.headers.get("location", "")
|
||||||
if location:
|
if location:
|
||||||
if "#" in location:
|
if "#" in location:
|
||||||
fragment = location.split("#")[1]
|
fragment = location.split("#")[1]
|
||||||
|
|
@ -62,22 +63,43 @@ class NonceChecker:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def decode_id_token(self, id_token: str) -> dict:
|
def decode_id_token(self, flow) -> dict:
|
||||||
|
res = flow.response
|
||||||
|
id_token = self.extract_id_token(flow)
|
||||||
|
if not id_token:
|
||||||
|
return {}
|
||||||
try:
|
try:
|
||||||
return jwt.decode(id_token, options={"verify_signature": False})
|
return jwt.decode(id_token, options={"verify_signature": False})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def check_nonce_in_id_token(self, flow) -> bool:
|
||||||
|
if not flow.response or not self.is_oidc_flow(flow):
|
||||||
|
# OIDC 플로우가 아니거나 응답이 없으면 nonce 체크를 건너뜀
|
||||||
|
return True
|
||||||
|
|
||||||
|
res = flow.response
|
||||||
|
url = res.url
|
||||||
|
parsed = urlparse(url)
|
||||||
|
fragment_params = parse_qs(parsed.fragment)
|
||||||
|
|
||||||
|
if "id_token" in fragment_params:
|
||||||
|
# id_token이 fragment에 있는 경우
|
||||||
|
id_token = fragment_params["id_token"][0]
|
||||||
|
return True
|
||||||
|
|
||||||
|
id_token = self.extract_id_token(flow)
|
||||||
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
# TODO id_token을 파싱하는 부분이 누락되어있습니다.
|
||||||
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
def check_nonce_in_id_token(self, flow, id_token: str) -> bool:
|
||||||
decoded = self.decode_id_token(id_token)
|
decoded = self.decode_id_token(id_token)
|
||||||
nonce = decoded.get("nonce")
|
nonce = decoded.get("nonce")
|
||||||
req = flow.request
|
|
||||||
url = req.pretty_url
|
|
||||||
if not nonce:
|
if not nonce:
|
||||||
report_data = [{
|
report_data = [{
|
||||||
'target': target.load(),
|
'target': target.load(),
|
||||||
'status': "CRITICAL",
|
'status': "MEDIUM",
|
||||||
'title': "nonce is missing in id_token",
|
'title': "nonce is missing in id_token",
|
||||||
'description': "Nonce is present in the request but missing in the id_token.",
|
'description': "Nonce is present in the request but missing in the id_token.",
|
||||||
'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
|
'uri': f"Original: {url}\nDecoded ID Token: {decoded}",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue