diff --git a/addon/client_secret.py b/addon/client_secret.py new file mode 100644 index 0000000..00c7386 --- /dev/null +++ b/addon/client_secret.py @@ -0,0 +1,29 @@ +from lib.report_vuln import report_vuln +from urllib.parse import urlparse, parse_qs + +class ClientSecret: + def get_target_from_query(self, query: str, target: str) -> str | None: + if not query: + return None + parsed = parse_qs(query) + scope_values = parsed.get(target, []) + if scope_values: + return scope_values[0] + return None + + async def test(self, flow): + req = flow.request + + parsed = urlparse(req.pretty_url) + query = parsed.query + + query_client_id = self.get_target_from_query(query, "client_id") + query_client_secret = self.get_target_from_query(query, "client_secret") + + if query_client_id and query_client_secret: + report_vuln( + title="OAuth Client Secret Exposure", + desc=f"Client ID and Secret found in request: {query_client_id}, {query_client_secret}", + status="CRITICAL", + uri=req.pretty_url + ) diff --git a/addon/google_login_hint.py b/addon/google_login_hint.py index fee0584..3abaf73 100644 --- a/addon/google_login_hint.py +++ b/addon/google_login_hint.py @@ -44,8 +44,8 @@ class GoogleLoginHint: # 요청 URL 수정 - URL과 호스트 모두 업데이트 flow.request.url = new_url - flow.request.pretty_url = new_url print(f"🔄 Modified URL: {new_url}") + def _is_google_oauth_url(self, url): """Google OAuth URL인지 확인""" google_oauth_domains = [ diff --git a/addon/google_response_type_token.py b/addon/google_response_type_token.py new file mode 100644 index 0000000..12cdf92 --- /dev/null +++ b/addon/google_response_type_token.py @@ -0,0 +1,52 @@ +from lib.report_vuln import report_vuln +import httpx +from lib.utils.is_oauth_uri import is_oauth_uri +from urllib.parse import urlparse, parse_qs + +class GoogleResponseTypeToken: + def get_taregt_from_query(self, query: str, target: str) -> str | None: + if not query: + return None + parsed = parse_qs(query) + scope_values = parsed.get(target, []) + if scope_values: + return scope_values[0] + return None + + async def test(self, flow): + req = flow.request + + if not is_oauth_uri(req.pretty_url): + return + + if req.pretty_host != "accounts.google.com": + return + + if "response_type=token" in req.pretty_url: + return + + url = f"{req.pretty_url}".replace("response_type=code", "response_type=token") + + async with httpx.AsyncClient(follow_redirects=True) as cli: + response = await cli.request( + method=req.method, + url=url, + headers=req.headers, + content=req.get_content(), + ) + + + if response.status_code >= 400: + return + + if "400." in response.text: + return + + if "response_type=token" in str(response.url): + report_vuln( + "Google Response Type Token", + f"Response type token allowed in {req.pretty_url}", + "HIGH", + str(response.url) + ) + diff --git a/addon/init.py b/addon/init.py index 632af8a..77fb210 100644 --- a/addon/init.py +++ b/addon/init.py @@ -3,10 +3,11 @@ import asyncio from pkce_check import PKCEDowngradeChecker from addon.scope_detection import ScopeDetection from csrf_check import CsrfChecker -from nonce_check import NonceChecker +from client_secret import ClientSecret from redirect_uri_check import RedirectBypassChecker from access_token import AccessTokenScanner from addon.google_login_hint import GoogleLoginHint +from addon.google_response_type_token import GoogleResponseTypeToken import os from dotenv import load_dotenv from lib.utils.try_catch import try_catch @@ -82,9 +83,10 @@ class AddonBase: tasks = [ try_catch(CsrfChecker().response(flow)), try_catch(ScopeDetection().test(flow)), - # try_catch(NonceChecker().check_nonce_in_request(flow)), + try_catch(ClientSecret().test(flow)), try_catch(AccessTokenScanner().scan(flow)), try_catch(RedirectBypassChecker().test(flow)), + try_catch(GoogleResponseTypeToken().test(flow)), ] await asyncio.gather(*tasks) diff --git a/addon/redirect_uri_check.py b/addon/redirect_uri_check.py index f6af225..83c0ac6 100644 --- a/addon/redirect_uri_check.py +++ b/addon/redirect_uri_check.py @@ -1296,6 +1296,8 @@ class RedirectBypassChecker: query = parse_qs(parsed.query) # location 헤더에 code가 없으면 스킵 + # TODO: 우리가 탐지하는 IdP가 제한적이기 때문에 각 IdP의 uri 패턴을 탐지하는 것도 좋아보임 + # Loaction 헤더에 담긴 인가 코드 뿐만 아니라, script나 다른 방식으로도 인가 코드가 전달될 수 있음 location = flow.response.headers.get("Location", "") if not self._is_code_in_location(location): return