mirror of
https://github.com/j93es/oauth-backend.git
synced 2026-06-04 05:21:51 +09:00
수정/test 완료
This commit is contained in:
parent
c0c6743423
commit
e9c59ae6f6
2 changed files with 76 additions and 1 deletions
53
addon/ScopeDetection.py
Normal file
53
addon/ScopeDetection.py
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
import lib.target as target
|
||||||
|
from lib.report import save_report
|
||||||
|
|
||||||
|
class ScopeDetection:
|
||||||
|
def get_scope_from_query(self, query: str) -> str | None:
|
||||||
|
if not query:
|
||||||
|
return None
|
||||||
|
import urllib.parse
|
||||||
|
parsed = urllib.parse.parse_qs(query)
|
||||||
|
scope_values = parsed.get("scope", [])
|
||||||
|
if scope_values:
|
||||||
|
return scope_values[0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def check_scope(self, flow):
|
||||||
|
req = flow.request
|
||||||
|
res = flow.response
|
||||||
|
|
||||||
|
# req.query가 MultiDictView일 수 있으므로 문자열로 변환
|
||||||
|
if hasattr(req.query, "urlencode"):
|
||||||
|
query = req.query.urlencode()
|
||||||
|
else:
|
||||||
|
query = str(req.query) if req.query else ""
|
||||||
|
|
||||||
|
location = res.headers.get("location", "")
|
||||||
|
|
||||||
|
query_scope = self.get_scope_from_query(query)
|
||||||
|
location_scope = self.get_scope_from_query(location)
|
||||||
|
|
||||||
|
result = []
|
||||||
|
if query_scope in ["all", "*"]:
|
||||||
|
result.append(f"Scope value issue detected in request: {query_scope}")
|
||||||
|
if location_scope in ["all", "*"]:
|
||||||
|
result.append(f"Scope value issue detected in response location: {location_scope}")
|
||||||
|
|
||||||
|
return result if result else 0
|
||||||
|
|
||||||
|
async def test(self, flow):
|
||||||
|
req = flow.request
|
||||||
|
method = req.method
|
||||||
|
url = req.pretty_url
|
||||||
|
|
||||||
|
result = await self.check_scope(flow)
|
||||||
|
|
||||||
|
if result != 0:
|
||||||
|
report_data = [{
|
||||||
|
'target': target.load(),
|
||||||
|
'status': "WARNING",
|
||||||
|
'title': "OAuth scope value issue",
|
||||||
|
'description': f"{method} {url}: {', '.join(result)}",
|
||||||
|
'uri': url
|
||||||
|
}]
|
||||||
|
save_report(report_data)
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
from mitmproxy import http
|
from mitmproxy import http
|
||||||
import asyncio
|
import asyncio
|
||||||
from pkce_check import PKCEDowngradeChecker
|
from pkce_check import PKCEDowngradeChecker
|
||||||
|
from ScopeDetection import ScopeDetection
|
||||||
|
|
||||||
class PKCEAddon:
|
class PKCEAddon:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
@ -13,4 +14,25 @@ class PKCEAddon:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Addon failed: {e}")
|
print(f"[ERROR] Addon failed: {e}")
|
||||||
|
|
||||||
addons = [PKCEAddon()]
|
class ScopeAddon:
|
||||||
|
def __init__(self):
|
||||||
|
self.checker = ScopeDetection()
|
||||||
|
self._flow_map = {} # 요청 정보를 저장
|
||||||
|
|
||||||
|
async def request(self, flow: http.HTTPFlow):
|
||||||
|
self._flow_map[flow.id] = {
|
||||||
|
"method": flow.request.method,
|
||||||
|
"url": flow.request.pretty_url,
|
||||||
|
"query": flow.request.query,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def response(self, flow: http.HTTPFlow):
|
||||||
|
try:
|
||||||
|
await self.checker.test(flow)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] ScopeDetection failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
addons = [PKCEAddon(),
|
||||||
|
ScopeAddon()
|
||||||
|
]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue