diff --git a/lib/agents/__init__.py b/lib/agents/__init__.py
new file mode 100644
index 0000000..c8eac1d
--- /dev/null
+++ b/lib/agents/__init__.py
@@ -0,0 +1,9 @@
+from lib.agents.get_sso_list import get_sso_list
+# 업데이트될 버전 import 아직 개발 중
+from lib.agents.get_sso_list_v2 import get_sso_list as get_sso_list_v2
+from lib.agents.login_google import login_google
+
+__all__ = [
+ "get_sso_list",
+ "login_google",
+]
\ No newline at end of file
diff --git a/lib/agents/find_login_page.py b/lib/agents/find_login_page.py
deleted file mode 100644
index f0c0dc7..0000000
--- a/lib/agents/find_login_page.py
+++ /dev/null
@@ -1,92 +0,0 @@ -import json -from pydantic import BaseModel -from browser_use import ( - Agent, - Controller, -) -from lib.agents.run_agent import run_agent -from lib.utils.logger import logger -from lib.browser_use_utils.create_google_ai import create_google_ai -from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL - -NOT_FOUND_LOGIN_PAGE = 0 -FOUND_LOGIN_PAGE = 1 - -class FindLoginPageResponse(BaseModel): - status: int = NOT_FOUND_LOGIN_PAGE # 0 if not found, 1 if found - msg: str | None = None - url: str | None = None - -async def find_login_page(target_url, session) -> tuple[bool, str | None]: - initial_actions = [{"open_tab": {"url": target_url}}] - task = """ - You are an expert in finding login pages. - - Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format. - - ※ You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs. - - 0. INITIAL BLOCK CHECK - - If the browser is blocked when trying to access the page — due to firewall, CAPTCHA, regional restrictions, or other access denials — immediately terminate the process and return the following JSON: - ```json - { - "status": 0, - "msg": "Blocked", - "url": "" - } - ``` - - Do NOT proceed to further steps in this case. - - 1. LOGIN PAGE NAVIGATION - - Navigate only to a **client-side (non-enterprise)** login page within the provided domain. - - Do NOT rely on external tools, search engines, or links not directly found on the site. - - If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding. - - Since step 0 confirmed access, assume the page now loads properly. - - 2. 
RETURN FORMAT - - Once the login page is reached, return a JSON object matching the following schema: - ```json - { - "status": 1, // 1 if login page is found, 0 otherwise - "msg": "Login page found", // Optional message - "url": "https://example.com/login" // Full URL of the login page if found - } - ``` - - If the login page cannot be found, return: - ```json - { - "status": 0, - "msg": "Login page not found", - "url": "" - } - ``` - - Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output. - """ - - - controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google']) - agent = Agent( - browser_session=session, - initial_actions=initial_actions, - task=task, - llm=create_google_ai(GOOGLE_MODEL), - controller=controller, - ) - - is_failed, final_result = await run_agent(agent) - if is_failed: - logger(f"⚠️ 스캔 실패: {target_url} | {final_result}") - print(f"⚠️ 스캔 실패: {target_url} | {final_result}") - return False, None; - - data = json.loads(final_result) - try: - resp = FindLoginPageResponse(**data) - if resp.status == FOUND_LOGIN_PAGE and len(resp.url) > 0: - return True, resp.url - else: - return False, resp.msg - except Exception as e: - logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") - print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") - return False, data.msg diff --git a/lib/agents/get_sso_list.py b/lib/agents/get_sso_list.py deleted file mode 100644 index 2276fc6..0000000 --- a/lib/agents/get_sso_list.py +++ /dev/null 
@@ -1,66 +0,0 @@ -import json -from pydantic import BaseModel -from browser_use import ( - Agent, - Controller, -) -from lib.agents.run_agent import run_agent -from lib.utils.logger import logger -from lib.browser_use_utils.create_google_ai import create_google_ai -from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL - -NOT_FOUND_SSO_LIST = 0 -FOUND_SSO_LIST = 1 - -class EachSSOProvider(BaseModel): - provider: str - oauth_uri: str | None = None - -class FindLoginPageResponse(BaseModel): - EachSSOProviders: list[EachSSOProvider] | None = None - status: int = NOT_FOUND_SSO_LIST # 0 if not found, - msg: str | None = None - -async def get_sso_list(target_url, session) -> tuple[bool, str | None]: - initial_actions = [{"open_tab": {"url": target_url}}] - task = "Navigate to the login page, and return the result in the specified format." - extend_planner_system_message = """ - You are an expert in finding login pages. - Your task is to navigate to the login page of the given URL. - Once you reach the login page, stop and return a JSON object that matches the following schema: - ```json - { - "status": 1, # 1 if login page found, 0 otherwise - "url": "https://example.com/login" # Full URL of the login page if found - } - Return only this JSON object. Do not include any explanation or additional text. 
- """ - - controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google']) - agent = Agent( - browser_session=session, - initial_actions=initial_actions, - task=task, - llm=create_google_ai(GOOGLE_MODEL), - planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL), - controller=controller, - extend_planner_system_message=extend_planner_system_message, - ) - - is_failed, final_result = await run_agent(agent) - if is_failed: - logger(f"⚠️ 스캔 실패: {target_url} | {final_result}") - print(f"⚠️ 스캔 실패: {target_url} | {final_result}") - return False, None; - - data = json.loads(final_result) - try: - resp = FindLoginPageResponse(**data) - if resp.status == FOUND_SSO_LIST: - return True, resp - else: - return False, None - except Exception as e: - logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") - print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") - return False, data.msg diff --git a/lib/agents/get_sso_list/__init__.py b/lib/agents/get_sso_list/__init__.py new file mode 100644 index 0000000..1bab0f1 --- /dev/null +++ b/lib/agents/get_sso_list/__init__.py @@ -0,0 +1,3 @@ +from lib.agents.get_sso_list.get_sso_list import get_sso_list + +__all__ = ["get_sso_list"] \ No newline at end of file diff --git a/lib/agents/get_sso_list/get_sso_list.py b/lib/agents/get_sso_list/get_sso_list.py new file mode 100644 index 0000000..40f417b --- /dev/null +++ b/lib/agents/get_sso_list/get_sso_list.py 
@@ -0,0 +1,22 @@
+from lib.agents.get_sso_list.prompt import get_sso_list_task, FindLoginPageResponse
+from lib.browser_use_utils.run_task import run_task
+
+
+NOT_FOUND_LOGIN_PAGE = 0
+FOUND_LOGIN_PAGE = 1
+
+async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
+
+ task = get_sso_list_task
+ ReturnModel = FindLoginPageResponse
+ success, response = await run_task(target_url, ReturnModel, task)
+ if not success:
+ return False, response
+ if isinstance(response, str):
+ return False, response
+
+ return True, response
+
+
+
+
diff --git a/lib/agents/get_sso_list/prompt.py b/lib/agents/get_sso_list/prompt.py
new file mode 100644
index 0000000..e8798df
--- /dev/null
+++ b/lib/agents/get_sso_list/prompt.py
@@ -0,0 +1,68 @@
+from pydantic import BaseModel
+
+class FindLoginPageResponse(BaseModel):
+ msg: str | None = None
+ url: str | None = None
+ sso_list: list[str] = [] # List of SSO providers found on the login page
+
+get_sso_list_task = """
+You are an expert in finding login pages.
+
+Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
+
+※ You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
+
+0. INITIAL BLOCK CHECK
+- If the browser is blocked when trying to access the page — due to firewall, CAPTCHA, regional restrictions, or other access denials — immediately terminate the process and return the following JSON:
+ ```json
+ {
+ "msg": "Blocked",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- Do NOT proceed to further steps in this case.
+
+1. LOGIN PAGE NAVIGATION
+- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
+- Do NOT rely on external tools, search engines, or links not directly found on the site.
+- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
+- Since step 0 confirmed access, assume the page now loads properly.
+
+2. SSO BUTTON IDENTIFICATION
+- On the login page, look for the following social login (SSO) buttons:
+ - Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc.
+- ✅ Proceed only if it is clearly an **actual SSO button**.
+- ❌ Exclude the following:
+ - Passkey-related buttons
+ - Username/password fields
+ - Email-based login
+ - Non-OAuth methods such as certificate or phone verification
+
+3. RETURN FORMAT
+- If the login page is successfully found, return:
+ ```json
+ {
+ "msg": "Login page found",
+ "url": "https://example.com/login",
+ "sso_list": ["Google", "GitHub"]
+ }
+ ```
+- If the login page cannot be found, return:
+ ```json
+ {
+ "msg": "Login page not found",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- If blocked (as in step 0), return:
+ ```json
+ {
+ "msg": "Blocked",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
+"""
\ No newline at end of file
diff --git a/lib/agents/get_sso_list_v2/__init__.py b/lib/agents/get_sso_list_v2/__init__.py
new file mode 100644
index 0000000..89002e4
--- /dev/null
+++ b/lib/agents/get_sso_list_v2/__init__.py
@@ -0,0 +1,3 @@
+from lib.agents.get_sso_list_v2 import get_sso_list
+
+__all__ = ["get_sso_list"]
\ No newline at end of file
diff --git a/lib/agents/get_sso_list_v2/get_sso_list.py b/lib/agents/get_sso_list_v2/get_sso_list.py
new file mode 100644
index 0000000..582f262
--- /dev/null
+++ b/lib/agents/get_sso_list_v2/get_sso_list.py
@@ -0,0 +1,20 @@
+from lib.agents.get_sso_list_v2.prompt import get_sso_list_task, FindLoginPageResponse
+from lib.browser_use_utils.run_task import run_task
+
+# TODO - Split find login page agent and get SSO list agent
+
+async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
+
+ task = get_sso_list_task
+ ReturnModel = FindLoginPageResponse
+ success, response = await run_task(target_url, ReturnModel, task)
+ if not success:
+ return False, response
+ if isinstance(response, str):
+ return False, response
+
+ return True, response
+
+
+
+
diff --git a/lib/agents/get_sso_list_v2/prompt.py b/lib/agents/get_sso_list_v2/prompt.py
new file mode 100644
index 0000000..e8798df
--- /dev/null
+++ b/lib/agents/get_sso_list_v2/prompt.py
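Review bot: In `lib/agents/get_sso_list_v2/__init__.py`, `from lib.agents.get_sso_list_v2 import get_sso_list` imports from the package that is still being initialised, so the name resolves to the `get_sso_list` submodule rather than the coroutine function. `lib/agents/__init__.py` then exposes that module as `get_sso_list_v2`, and calling it would raise `TypeError`. Mirror the v1 package and import from the submodule explicitly: `from lib.agents.get_sso_list_v2.get_sso_list import get_sso_list`. Because `lib/agents/__init__.py` imports this in-development package unconditionally, any failure inside it also breaks `from lib.agents import get_sso_list, login_google` in `main.py`.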
@@ -0,0 +1,68 @@
+from pydantic import BaseModel
+
+class FindLoginPageResponse(BaseModel):
+ msg: str | None = None
+ url: str | None = None
+ sso_list: list[str] = [] # List of SSO providers found on the login page
+
+get_sso_list_task = """
+You are an expert in finding login pages.
+
+Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
+
+※ You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
+
+0. INITIAL BLOCK CHECK
+- If the browser is blocked when trying to access the page — due to firewall, CAPTCHA, regional restrictions, or other access denials — immediately terminate the process and return the following JSON:
+ ```json
+ {
+ "msg": "Blocked",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- Do NOT proceed to further steps in this case.
+
+1. LOGIN PAGE NAVIGATION
+- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
+- Do NOT rely on external tools, search engines, or links not directly found on the site.
+- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
+- Since step 0 confirmed access, assume the page now loads properly.
+
+2. SSO BUTTON IDENTIFICATION
+- On the login page, look for the following social login (SSO) buttons:
+ - Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc.
+- ✅ Proceed only if it is clearly an **actual SSO button**.
+- ❌ Exclude the following:
+ - Passkey-related buttons
+ - Username/password fields
+ - Email-based login
+ - Non-OAuth methods such as certificate or phone verification
+
+3. RETURN FORMAT
+- If the login page is successfully found, return:
+ ```json
+ {
+ "msg": "Login page found",
+ "url": "https://example.com/login",
+ "sso_list": ["Google", "GitHub"]
+ }
+ ```
+- If the login page cannot be found, return:
+ ```json
+ {
+ "msg": "Login page not found",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- If blocked (as in step 0), return:
+ ```json
+ {
+ "msg": "Blocked",
+ "url": "",
+ "sso_list": []
+ }
+ ```
+- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
+"""
\ No newline at end of file
diff --git a/lib/agents/login_google/__init__.py b/lib/agents/login_google/__init__.py
new file mode 100644
index 0000000..68e515d
--- /dev/null
+++ b/lib/agents/login_google/__init__.py
@@ -0,0 +1,3 @@
+from lib.agents.login_google.login_google import login_google
+
+__all__ = ["login_google"]
\ No newline at end of file
diff --git a/lib/agents/login_google/login_google.py b/lib/agents/login_google/login_google.py
new file mode 100644
index 0000000..1fe171c
--- /dev/null
+++ b/lib/agents/login_google/login_google.py
@@ -0,0 +1,11 @@
+from lib.agents.login_google.prompt import login_google_task, LoginGoogleResponse
+from lib.browser_use_utils.run_task import run_task
+
+async def login_google(target_url) -> tuple[bool, str | LoginGoogleResponse | None]:
+ task = login_google_task
+ ReturnModel = LoginGoogleResponse
+ success, response = await run_task(target_url, ReturnModel, task)
+ if not success:
+ return False, None
+
+ return True, response
diff --git a/lib/agents/login_google/prompt.py b/lib/agents/login_google/prompt.py
new file mode 100644
index 0000000..98d34ec
--- /dev/null
+++ b/lib/agents/login_google/prompt.py
@@ -0,0 +1,63 @@
+from pydantic import BaseModel
+from lib.config import GOOGLE_ID, GOOGLE_PASSWORD
+
+class LoginGoogleResponse(BaseModel):
+ msg: str | None = None
+ status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
+ final_url: str | None = None
+
+login_google_task = f"""
+You are a web automation agent.
+
+Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below.
+
+▶ Target: Find a login page inside this domain that allows "Sign in with Google", and use it to complete login via Google.
+
+Instructions:
+
+1. If any cookie or privacy popups appear, dismiss or accept them.
+2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
+ - Only follow links within the same domain.
+3. On the login page, look for a clearly labeled **Google SSO button** — typically labeled as:
+ - "Continue with Google"
+ - "Sign in with Google"
+ - or a button with the Google 'G' icon
+4. Click the **Google login button**.
+ - ⚠️ The Google login flow MUST open in a **new browser tab** (not a new window or popup).
+ - ❌ If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
+5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen.
+ - ✅ If so, treat the login as successful and return immediately.
+6. If redirected to the Google login page:
+ - If a **CAPTCHA**, **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed.
+ - Immediately stop and return the appropriate status.
+7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
+
+Credentials to use for Google login:
+- Email: {GOOGLE_ID}
+- Password: {GOOGLE_PASSWORD}
+
+Constraints:
+- Do NOT use search engines or guess URLs.
+- Do NOT use autofill, saved sessions, or cookies.
+- Do NOT proceed with login if:
+ - The login opens in a new window (only tabs are allowed)
+ - CAPTCHA or MFA appears
+ - ID/password input is required
+- If the user is already logged in to Google and redirected back automatically, stop there and report success.
+- If the login page cannot be found, return "login_page_not_found".
+- If the Google login button is not found, return "sso_not_found".
+- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
+
+Final Output:
+Return the result in the following format only:
+
+```json
+{{
+ "msg": "Google login completed",
+ "status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
+ "final_url": ""
+}}
+```
+
+- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
+"""
\ No newline at end of file
diff --git a/lib/agents/run_agent.py b/lib/agents/run_agent.py
deleted file mode 100644
index 7bcb686..0000000
--- a/lib/agents/run_agent.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from lib.browser_use_utils.clean_resources import clean_agent_resources
-
-async def run_agent(agent) -> tuple[int, str]:
- try:
- response = await agent.run()
- final_result = response.final_result()
-
- if final_result is None:
- return -1, "최종 결과가 없습니다. 에이전트 실행 실패"
- return 0, final_result
- except Exception as e:
- # API 쿼터 문제인지 확인
- if "ResourceExhausted" in str(e) or "429" in str(e):
- return 1, "API 쿼터 에러로 인한 실패"
- # 일반 에러 처리
- else:
- return 2, "일반 에러로 인한 실패"
- finally:
- await clean_agent_resources(agent)
\ No newline at end of file
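Review bot: `GOOGLE_ID` and `GOOGLE_PASSWORD` are interpolated into `login_google_task` by the f-string in `lib/agents/login_google/prompt.py`, so the plaintext password becomes part of the task text sent to the model provider and of anything that records the task or the agent history. browser_use's `Agent` accepts a `sensitive_data` mapping for this case: keep only placeholder names in the prompt (for example `- Password: x_google_password`) and pass the real values through `run_task`/`run_agent` into `Agent(..., sensitive_data=...)`. The prompt is also self-contradictory, since it supplies credentials while steps 6 and "Constraints" tell the agent to stop whenever ID/password input is required. The `status` comment on `LoginGoogleResponse` lists `invalid_credentials`, which the prompt's status set does not contain, and omits `already_logged_in`, `captcha_triggered`, `window_blocked` and `idpw_required`; a `Literal[...]` annotation with the prompt's nine values would make the model reject anything else.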
diff --git a/lib/browser_use_utils/__init__.py b/lib/browser_use_utils/__init__.py
new file mode 100644
index 0000000..d7cb4c5
--- /dev/null
+++ b/lib/browser_use_utils/__init__.py
@@ -0,0 +1,15 @@
+from lib.browser_use_utils.clean_resources import clean_resources, clean_agent_resources, clean_session_resources
+from lib.browser_use_utils.create_google_ai import create_google_ai
+from lib.browser_use_utils.get_profile import get_profile
+from lib.browser_use_utils.run_agent import run_agent
+from lib.browser_use_utils.run_task import run_task
+
+__all__ = [
+ "clean_resources",
+ "clean_agent_resources",
+ "clean_session_resources",
+ "create_google_ai",
+ "get_profile",
+ "run_agent",
+ "run_task",
+]
diff --git a/lib/browser_use_utils/run_agent.py b/lib/browser_use_utils/run_agent.py
new file mode 100644
index 0000000..b18bca2
--- /dev/null
+++ b/lib/browser_use_utils/run_agent.py
@@ -0,0 +1,40 @@
+from typing import Any
+from pydantic import BaseModel
+from lib.browser_use_utils.clean_resources import clean_agent_resources
+from lib.config import GOOGLE_MODEL
+from browser_use import (
+ Agent,
+ Controller,
+)
+from lib.browser_use_utils.create_google_ai import create_google_ai
+
+
+async def run_agent(session, initial_actions, ReturnModel: type[BaseModel], task: str) -> tuple[bool, str, Any | None]:
+
+ controller = Controller(output_model=ReturnModel, exclude_actions=['search_google'])
+ agent = Agent(
+ browser_session=session,
+ initial_actions=initial_actions,
+ task=task,
+ llm=create_google_ai(GOOGLE_MODEL),
+ controller=controller,
+ )
+
+ try:
+ response = await agent.run()
+ final_result = response.final_result()
+
+ if final_result is None:
+ return False, "LLM이 반환한 최종 결과가 없습니다.", None
+ except Exception as e:
+ # API 쿼터 문제인지 확인
+ if "ResourceExhausted" in str(e) or "429" in str(e):
+ return False, "API 쿼터 에러로 인한 실패", None
+ # 일반 에러 처리
+ else:
+ return False, "일반 에러로 인한 실패", None
+ finally:
+ await clean_agent_resources(agent)
+
+ return True, "ok", final_result
+
diff --git a/lib/browser_use_utils/run_task.py b/lib/browser_use_utils/run_task.py
new file mode 100644
index 0000000..1604502
--- /dev/null
+++ b/lib/browser_use_utils/run_task.py
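Review bot: The `except Exception as e` branch in `run_agent` discards the exception after a substring test, so every failure other than a quota error reaches the caller and the log as the same fixed string and cannot be triaged afterwards. Record at least the exception type before returning, e.g. `logger(f"agent.run failed: {type(e).__name__}")` with `logger` imported from `lib.utils.logger`; logging only the type avoids echoing the task text, which for the Google login agent currently contains credentials. The quota test `"429" in str(e)` also matches any message that happens to contain those digits, such as a URL or port, so checking the exception class would be more reliable if the client library exposes one.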
@@ -0,0 +1,40 @@ +import json +from typing import Any +from pydantic import BaseModel +from browser_use import ( + BrowserSession +) +from patchright.async_api import async_playwright as async_patchright +from lib.utils.logger import logger +from lib.browser_use_utils import get_profile, clean_session_resources, run_agent + + +async def run_task(target_url: str, ReturnModel: type[BaseModel], task: str) -> tuple[bool, type[BaseModel] | None]: + session = BrowserSession( + playwright=(await async_patchright().start()), + browser_profile=await get_profile(), + ) + + initial_actions = [{"open_tab": {"url": target_url}}] + + seccess, msg, final_result = await run_agent(session=session, + initial_actions=initial_actions, + ReturnModel=ReturnModel, + task=task) + if not seccess: + logger(f"⚠️ LLM 실행 실패: {target_url} | {msg}") + print(f"⚠️ LLM 실행 실패: {target_url} | {msg}") + await clean_session_resources(session) + return False, None + + try: + data = json.loads(final_result) + resp = ReturnModel(**data) + return True, resp + except Exception as e: + logger(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + print(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + return False, None + finally: + await clean_session_resources(session) + diff --git a/lib/config.py b/lib/config.py index 9d1d5ac..5e8fc8d 100644 --- a/lib/config.py +++ b/lib/config.py @@ -4,5 +4,7 @@ load_dotenv(verbose=True, override=True) BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") -GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20") -GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05") \ No newline at end of file +GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash") + +GOOGLE_ID = os.getenv("GOOGLE_ID", "google") +GOOGLE_PASSWORD = os.getenv("GOOGLE_PASSWORD", "google") \ No newline at end of file 
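Review bot: In `run_task`, both messages in the `except Exception as e` block read `data.msg`, but `data` is unbound when `json.loads(final_result)` is the call that failed and is a plain `dict` with no `.msg` attribute when `ReturnModel(**data)` failed. The handler therefore raises itself, and the error escapes to `scan_one_url` instead of producing `(False, None)`. Interpolate the raw result instead, replacing `{data.msg}` with `{final_result!r}` in both the `logger(...)` and `print(...)` calls. The return annotation `tuple[bool, type[BaseModel] | None]` describes a class where an instance is returned and should read `tuple[bool, BaseModel | None]`, and `seccess` is a misspelling of `success`.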
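Review bot: `GOOGLE_ID` and `GOOGLE_PASSWORD` in `lib/config.py` fall back to the literal `"google"` when the variables are unset, so a missing `.env` entry silently becomes a credential pair that the Google login agent will try to submit. Drop the defaults (`GOOGLE_ID = os.getenv("GOOGLE_ID")`, likewise for the password) and add both names to `required_vars` in `lib/utils/env_checker.py`; the hunk for that file in this commit only removes `GOOGLE_PLANNER_MODEL`. Failing at startup is preferable to sending placeholder credentials to a third-party sign-in page during a scan.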
diff --git a/lib/find_sso_list.py b/lib/find_sso_list.py deleted file mode 100644 index 11fd608..0000000 --- a/lib/find_sso_list.py +++ /dev/null @@ -1,65 +0,0 @@ -import asyncio -from browser_use import Agent, BrowserSession -from patchright.async_api import async_playwright as async_patchright -from lib.agents.find_login_page import find_login_page -from lib.browser_use_utils.clean_resources import clean_session_resources -from lib.browser_use_utils.get_profile import get_profile -from lib.utils.save_oauth_providers import save_oauth_providers - -async def find_sso_list(target_url): - session = BrowserSession( - playwright=(await async_patchright().start()), - browser_profile=await get_profile(), - ) - - FIND_LOGIN_PAGE = 1 - FIND_SSO_LIST = 2 - SAVE_DATA = 3 - WHEN_ERROR = -1 - FINISH = 0 - - final_result = None - login_url = target_url - state = FIND_LOGIN_PAGE - while True: - if state == FIND_LOGIN_PAGE: - is_success, resp = await find_login_page( - target_url=target_url, - session=session, - ) - if not is_success: - print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {resp}") - state = WHEN_ERROR - login_url = resp if resp else target_url - state = FIND_SSO_LIST - - if state == FIND_SSO_LIST: - print(f"🔎 SSO 목록 찾는 중: {target_url}") - is_success, resp = await find_sso_list( - target_url=login_url, - session=session, - ) - if not is_success: - print(f"⚠️ SSO 목록 탐지 실패: {target_url} | {resp}") - state = WHEN_ERROR - final_result = "" - state = SAVE_DATA - - if state == SAVE_DATA: - print(f"💾 데이터 저장 중: {target_url}") - if not final_result: - print(f"⚠️ SSO 목록이 전달되지 않았습니다: {target_url}") - state = WHEN_ERROR - - save_oauth_providers(target_url, final_result) - state = FINISH - - if state == WHEN_ERROR: - print(f"⚠️ 에러 발생: {target_url} | 스캔을 중단합니다.") - return - - if state == FINISH: - print(f"✅ 스캔 완료: {target_url}") - break - - await clean_session_resources(session) \ No newline at end of file 
diff --git a/lib/utils/__init__.py b/lib/utils/__init__.py new file mode 100644 index 0000000..260fcda --- /dev/null +++ b/lib/utils/__init__.py @@ -0,0 +1,20 @@ +from lib.utils.env_checker import check_env_variables +from lib.utils.is_html import is_html_url +from lib.utils.logger import logger +from lib.utils.notify_backend import notify_backend +from lib.utils.progress_checker import save_progress, load_progress +# v2 import => 아직 개발 중 +from lib.utils.progress_checker_v2 import ProgressChecker +from lib.utils.read_txt import read_lines_between +from lib.utils.save_oauth_providers import save_oauth_providers + +__all__ = [ + "check_env_variables", + "is_html_url", + "logger", + "notify_backend", + "read_lines_between", + "save_progress", + "load_progress", + "save_oauth_providers", +] \ No newline at end of file diff --git a/lib/utils/env_checker.py b/lib/utils/env_checker.py index d92edd1..8255dde 100644 --- a/lib/utils/env_checker.py +++ b/lib/utils/env_checker.py @@ -8,7 +8,6 @@ def check_env_variables(): "BACKEND_URL", "GOOGLE_API_KEY", "GOOGLE_MODEL", - "GOOGLE_PLANNER_MODEL" ] for var in required_vars: diff --git a/lib/utils/progress_checker.py b/lib/utils/progress_checker.py index 3ee3d71..5cd5c4c 100644 --- a/lib/utils/progress_checker.py +++ b/lib/utils/progress_checker.py @@ -4,7 +4,6 @@ from pathlib import Path progress_file = Path("data/scan_progress.json") - def save_progress(current_progress): """현재 진행 상황을 파일에 저장""" with open(progress_file, 'w', encoding='utf-8') as f: diff --git a/lib/utils/progress_checker_v2.py b/lib/utils/progress_checker_v2.py new file mode 100644 index 0000000..9906321 --- /dev/null +++ b/lib/utils/progress_checker_v2.py 
@@ -0,0 +1,25 @@
+import json
+import os
+from pathlib import Path
+
+progress_file = Path("data/scan_progress.json")
+
+class ProgressChecker:
+ def __init__(self, filepath):
+ self.filepath = filepath
+ self.progress = self.load_progress()
+
+ def save(self):
+ """현재 진행 상황을 파일에 저장"""
+ with open(self.filepath, 'w', encoding='utf-8') as f:
+ json.dump(self.progress, f, ensure_ascii=False, indent=2)
+
+ def load(self):
+ """이전 진행 상황을 파일에서 불러오기"""
+ if os.path.exists(self.filepath):
+ try:
+ with open(self.filepath, 'r', encoding='utf-8') as f:
+ return json.load(f)
+ except:
+ return None
+ return None
diff --git a/lib/utils/prompt.py b/lib/utils/prompt.py
deleted file mode 100644
index d50a796..0000000
--- a/lib/utils/prompt.py
+++ /dev/null
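Review bot: `ProgressChecker.__init__` calls `self.load_progress()`, but the class only defines `load` and `save`, so constructing it raises `AttributeError`; the line should be `self.progress = self.load()`. The bare `except:` in `load` also turns a corrupt or unreadable progress file into `None` with no trace, and it catches `KeyboardInterrupt` as well. Narrow it to `except (OSError, json.JSONDecodeError):` and log the failure. The module-level `progress_file` is not used by the class, which takes `filepath` instead.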
@@ -1,90 +0,0 @@ -from dotenv import load_dotenv -import os - -load_dotenv() -google_id = os.getenv("GOOGLE_ID") -google_password = os.getenv("GOOGLE_PASSWORD") - -naver_id = os.getenv("NAVER_ID") -naver_password = os.getenv("NAVER_PASSWORD") - -facebook_id = os.getenv("FACEBOOK_ID") -facebook_password = os.getenv("FACEBOOK_PASSWORD") - -github_id = os.getenv("GITHUB_ID") -github_password = os.getenv("GITHUB_PASSWORD") - -# Extended planner prompt -extend_planner_system_message = f""" -🎯 Mission: Collect Initial SSO Redirect URLs (For Browser Automation) - -※ **모든 STEP에서 구글 검색, Bing 검색 등 어떤 외부 검색 기능도 절대 사용하지 않고, 초기에 주어진 URL에서 탐색하세요.** -※ **초기에 주어진 URL 내에서 실제로 확인되지 않은 URL로 직접 이동하는것은 허용되지 않습니다.** - -0. **초기 블록(Block) 체크** - - 브라우저가 로그인 페이지에 접근하려 할 때, **페이지가 차단(blocked)** 되거나 **방화벽, CAPTCHA, 접근 제한** 등으로 인해 정상적으로 로드되지 않으면 즉시 프로세스를 종료하고 아래 JSON만 반환해야 합니다. - ```json - [ - {{ - "provider": "Blocked", - "oauth_uri": "-" - }} - ] - ``` - - 이후 단계로 절대 넘어가지 않도록 합니다. - -1. **로그인 페이지 탐색** - - **클라이언트(비엔터프라이즈) 로그인 페이지**로 직접 이동합니다. **검색 엔진을 사용하여 찾아서는 안 됩니다.** - - 접근 후 **개인정보/쿠키/동의 팝업**이 뜨면, 이를 반드시 **닫거나(Dismiss)** 처리하고 계속 진행합니다. - - (이미 0단계에서 블록 여부를 확인했으므로, 이 단계에서는 페이지가 정상 로드되었다고 가정합니다.) - -2. **SSO 버튼 식별** - - 로그인 페이지에서 다음과 같은 소셜 로그인 버튼을 찾습니다: - - Google, GitHub, Facebook, Linkedin, Microsoft, Naver” - - ✅ **실제 SSO 버튼**임이 명확히 확인되는 경우에만 진행합니다. - - ❌ 제외 대상: - - “Passkey” 관련 버튼 - - 아이디/비밀번호 입력란 - - 이메일 기반 로그인 - - 인증서, 휴대폰 인증 등 비-OAuth 로그인 옵션 - -3. **SSO 버튼 클릭 및 로그인 시도** - - 유효한 SSO 버튼이 발견되면, 버튼을 클릭합니다. - - 클릭 후 **첫 번째로 리디렉션된 URL(쿼리 스트링 포함)**을 `oauth_uri`로 기록합니다. - - 공급자 페이지가 열리면, 아래 자격증명을 이용해 로그인을 시도합니다, 아래 자격증명에 포함되지 않는 SSO 버튼도 클릭까지는 시도합니다.: - - Google → `{google_id}` / `{google_password}` - - Naver → `{naver_id}` / `{naver_password}` - - GitHub → `{github_id}` / `{github_password}` - - facebook → `{facebook_id}` / `{facebook_password}` - - **자격증명이 주어진 SSO 버튼인 경우 로그인 과정을 꼭 진행합니다.** - - 로그인 과정이 모두 끝나거나 로그인이 되지 않는 경우 세션 및 쿠키를 모두 삭제하고 페이지를 새로고침합니다. 
- - 한번이라도 SSO 버튼을 클릭한 경우, 해당 버튼은 더 이상 탐색하지 않습니다. - - id/pw 입력 성공 시, 아직 로그인되지 않았다면, 최대 5초간 대기합니다. - - 아직 로그인을 시도하지 않은 SSO 버튼이 있다면 이전 단계인 1. **로그인 페이지 탐색**, 2. **SSO 버튼 식별**, 3. **SSO 버튼 클릭 및 로그인 시도** 로 돌아가 절차를 반복합니다. - - 최종 결과는 다음과 같이 기록합니다: -```json - [ - {{ - "provider": "Google", - "oauth_uri": "(optional) https://example.com/auth/google?client_id=...", - }}, - {{ - "provider": "Naver", - "oauth_uri": "(optional) https://example.com/auth/naver?client_id=...", - }} - ] - ``` - -4. **SSO 버튼 미발견 또는 오류 발생 시** - - 페이지 내부에 유효한 SSO 버튼이 전혀 없거나, 탐색 중 예기치 않은 오류가 발생하면 즉시 프로세스를 종료하고 **빈 배열**을 반환합니다: - ```json - [] - ``` - -5. **중요 사항** - - **반드시** 위의 단계들을 순서대로 수행해야 하며, 각 단계에서 발생하는 예외 상황을 정확히 처리해야 합니다. - - **반복 행동**이 감지되면 즉시 빈 배열을 반환하고, **블록된 페이지**는 초기 단계에서 처리하여 프로세스를 종료해야 합니다. - - **SSO 버튼이 발견되지 않거나, 오류가 발생한 경우에도 빈 배열을 반환해야 합니다.** - - **반드시** JSON 형식으로 결과를 반환해야 하며, 다른 형식은 허용되지 않습니다. - - 최대한 효율적인 단계로 진행하며, 불필요한 반복이나 검색 엔진 사용을 피해야 합니다. -""" \ No newline at end of file diff --git a/main.py b/main.py index 34a746a..d2ef63a 100644 --- a/main.py +++ b/main.py @@ -3,18 +3,18 @@ import argparse import signal from dotenv import load_dotenv from lib.config import BACKEND_URL -from lib.utils.notify_backend import notify_backend -from lib.utils.is_html import is_html_url -from lib.utils.read_txt import read_lines_between -from lib.utils.progress_checker import save_progress, load_progress -from lib.utils.env_checker import check_env_variables -from lib.find_sso_list import find_sso_list +from lib.utils import notify_backend, is_html_url, read_lines_between, save_progress, load_progress, check_env_variables +from lib.agents import get_sso_list, login_google load_dotenv() check_env_variables() backend_url = BACKEND_URL +login_agents = { + "google": login_google +} + # ── URL별로 Browser를 새로 띄우는 함수 ── async def scan_one_url(url: str, skip_html_check: bool = False): target_url = url if url.startswith("http") else f"https://{url}" 
@@ -28,33 +28,28 @@ async def scan_one_url(url: str, skip_html_check: bool = False): # Backend에 스캔 시작 알림 notify_backend(target_url) - await find_sso_list(target_url) + success, response = await get_sso_list(target_url) + if not success: + return + if len(response.sso_list) == 0: + return + for sso in response.sso_list: + target_login_agent = login_agents.get(sso.lower()) + if target_login_agent: + print(f"🔍 {target_url} 에서 SSO 발견: {sso}, 로그인 시도 중...") + success, login_response = await target_login_agent(target_url) + if not success: + print(f"⚠️ {target_url} 에서 {sso} 로그인 실패") + continue + + print(f"✅ {target_url} 에서 {sso} 로그인 성공: {login_response.final_url}") + else: + print(f"✅ {target_url} 에서 SSO 발견: {sso} | TODO") + + # Backend에 스캔 완료 알림 + # 오탐 검증 - - - - # # 5) 결과 출력 - # print("-" * 50) - # print(f"🔗 Scanned URL: {url}\n") - # print("🔐 Detected OAuth Providers and URLs:") - # for entry in oauth_entries: - # if "<" in entry.oauth_uri or "..." in entry.oauth_uri: - # print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n") - # else: - # print(f"- {entry.provider}: {entry.oauth_uri}") - # print("-" * 50) - - # # 6) CSV에 저장 (append) - # csv_file = "./oauth_providers.csv" - # file_exists = os.path.isfile(csv_file) - # with open(csv_file, "a", newline="", encoding="utf-8") as f: - # writer = csv.writer(f) - # if not file_exists: - # writer.writerow(["issuer", "provider", "oauth_uri"]) - # for entry in oauth_entries: - # writer.writerow([url, entry.provider, entry.oauth_uri]) - # print(f"✅ OAuth providers saved to {csv_file}\n") @@ -111,7 +106,7 @@ async def loop( current_progress["current_index"] = actual_index print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") - print(f"📍 domains.txt의 {start_line + actual_index}번째 줄") + print(f"📍 domains.txt의 {actual_index}번째 줄") await scan_one_url(url, skip_html_check=skip_html_check) diff --git a/uv.lock b/uv.lock index 964f411..1819596 100644 --- a/uv.lock 
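Review bot: The success message in `scan_one_url` prints `login_response.final_url` in full, and the URL a browser lands on after an SSO login can carry an authorization `code`, a `state` value or a token in its query string or fragment. If `login_google` records that redirect URL (its body is not shown here), those values end up in stdout and in any captured log. Printing only scheme, host and path avoids this: `safe_url = urlsplit(login_response.final_url)._replace(query="", fragment="").geturl()` with `from urllib.parse import urlsplit`, then `print(f"✅ {target_url} 에서 {sso} 로그인 성공: {safe_url}")`. Separately, the early `return`s on `not success` and on an empty `sso_list` leave before the `# Backend에 스캔 완료 알림` placeholder, so a completion notification added there later would be skipped for those sites. The function starts above this hunk.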
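Review bot: The line-number message in the `@@ -111,7 +106,7 @@` hunk now prints `actual_index`, while the progress line directly above prints `actual_index + 1` for the same entry, so the two disagree by one. If `actual_index` is a zero-based offset into domains.txt (its computation is outside the hunk), the reported line is one less than the 1-based line an editor shows; `print(f"📍 domains.txt의 {actual_index + 1}번째 줄")` would match the progress line. If it is already a 1-based line number, the progress line is the one that is off. `loop` begins and ends outside this hunk.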
+++ b/uv.lock 
@@ -75,30 +75,30 @@ wheels = [ [[package]] name = "boto3" -version = "1.38.41" +version = "1.38.44" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "botocore" }, { name = "jmespath" }, { name = "s3transfer" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/2f/3b/f421b30e32c33ce63f0de3b32ea12954039a4595c693db4ea4900babe742/boto3-1.38.41.tar.gz", hash = "sha256:c6710fc533c8e1f5d1f025c74ffe1222c3659094cd51c076ec50c201a54c8f22", size = 111835, upload-time = "2025-06-20T19:26:41.584Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7b/7f/ea50e25a049072c0078045437d25fc9c8eaec4bd58f2cc340e6ed52e55cd/boto3-1.38.44.tar.gz", hash = "sha256:af1769dfb2a8a30eec24d0b74a8c17db2accc5a6224d4fab39dd36df6590f741", size = 111899, upload-time = "2025-06-25T19:27:40.825Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0a/bb/541825bf9811eb7fe13a357e691dc4cfead56a5fed4556aa101dc62e06ca/boto3-1.38.41-py3-none-any.whl", hash = "sha256:6119e9f272b9f004f052ca78ce94d3fe10198bc159ae808f75c0e1b9c07518bd", size = 139922, upload-time = "2025-06-20T19:26:39.963Z" }, + { url = "https://files.pythonhosted.org/packages/17/73/4a1bbd696e492f17064e7404c49d4d3bafcc8b50239ec6624c10ea824dd1/boto3-1.38.44-py3-none-any.whl", hash = "sha256:73fcb2f8c7bec25d17e3f1940a1776c515b458b3da77ad3a31a177479591028b", size = 139923, upload-time = "2025-06-25T19:27:38.748Z" }, ] [[package]] name = "botocore" -version = "1.38.41" +version = "1.38.44" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "jmespath" }, { name = "python-dateutil" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/98/46/cb33f5a0b00086a97c4eebbc4e0211fe85d66d45e53a9545b33805f25b31/botocore-1.38.41.tar.gz", hash = "sha256:98e3fed636ebb519320c4b2d078db6fa6099b052b4bb9b5c66632a5a7fe72507", size = 14031081, upload-time = "2025-06-20T19:26:31.365Z" } +sdist = { url = 
"https://files.pythonhosted.org/packages/31/06/c6e652e8b449837218d83cedda9c54104cfd5d38dc97762044a40116b209/botocore-1.38.44.tar.gz", hash = "sha256:8d54795a084204e4cd7885d9307e4bfaccc96411dc0384f6ba240b515c45bf54", size = 14050056, upload-time = "2025-06-25T19:27:29.354Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ec/b7/37d9f1a633e72250408cb7d53d8915561ac6108b5c3a1973eb8f53ce2990/botocore-1.38.41-py3-none-any.whl", hash = "sha256:06069a06f1352accb1f6c9505d6e323753627112be80a9d2e057c6d9c9779ffd", size = 13690225, upload-time = "2025-06-20T19:26:26.014Z" }, + { url = "https://files.pythonhosted.org/packages/ad/85/e3cd7bf4237af134a90290c8e37bf7f786c5e58b9ff98eeb0495615e3985/botocore-1.38.44-py3-none-any.whl", hash = "sha256:d0171ac6ec0bfdf86083b41c801f212e2b2d5756a61ea1d45af2051f21dbf886", size = 13710700, upload-time = "2025-06-25T19:27:23.645Z" }, ] [[package]] @@ -156,7 +156,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "browser-use", extras = ["memory"], specifier = "==0.2.7" }, - { name = "patchright", specifier = ">=1.52.5" }, + { name = "patchright", specifier = "==1.52.5" }, ] [[package]] @@ -809,7 +809,7 @@ wheels = [ [[package]] name = "mem0ai" -version = "0.1.110" +version = "0.1.111" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "openai" }, 
@@ -819,9 +819,9 @@ dependencies = [ { name = "qdrant-client" }, { name = "sqlalchemy" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8d/fd/95c6285ad55a5fb78df17f15b5710273d59ae687c6ff79dcd03acb15e24f/mem0ai-0.1.110.tar.gz", hash = "sha256:8a9b6f45c2c4e5d97ce1aa096dc85991cd657acccde796422b65a52089ca7fcb", size = 107869, upload-time = "2025-06-20T15:01:56.754Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2d/93/ff302f96e02b5ac80a1ad18b94617985296f78aee212f86d83cba1c2a1a5/mem0ai-0.1.111.tar.gz", hash = "sha256:cc4b1a20cd4fd3b980cca4fd9f77ee4c9cff81b92e6f4d30014fd900dce59bba", size = 108299, upload-time = "2025-06-23T16:23:19.642Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/8a/d6/3d67909445682f5e73e910b187fc64ff84643709ac1956240d8a3834b1bd/mem0ai-0.1.110-py3-none-any.whl", hash = "sha256:4f69df6e633200b9d1b0177f82eaa96bf70a446aee8f40e56eedb67403f14395", size = 166820, upload-time = "2025-06-20T15:01:54.864Z" }, + { url = "https://files.pythonhosted.org/packages/2a/f5/185c88df177d0d9ae1226cc1ae75a2b2480280521a5c7690f1ca6a54b6af/mem0ai-0.1.111-py3-none-any.whl", hash = "sha256:53e8ce3551ffe1454b6e28ba90a8a88907280a9052edfeb872241662a4707f14", size = 168161, upload-time = "2025-06-23T16:23:18.146Z" }, ] [[package]]