From 495b3a52daf8dbfc6f762cab1c8eb429a04e9fa3 Mon Sep 17 00:00:00 2001 From: "tv0924@icloud.com" Date: Sun, 22 Jun 2025 22:19:30 +0900 Subject: [PATCH] [Update] --- lib/agents/find_login_page.py | 77 +++++++++++++++++++++++-------- lib/agents/get_sso_list.py | 66 ++++++++++++++++++++++++++ lib/agents/run_agent.py | 5 +- lib/find_sso_list.py | 30 ++++++++---- lib/utils/save_oauth_providers.py | 13 ++++++ 5 files changed, 161 insertions(+), 30 deletions(-) create mode 100644 lib/agents/get_sso_list.py create mode 100644 lib/utils/save_oauth_providers.py diff --git a/lib/agents/find_login_page.py b/lib/agents/find_login_page.py index d363a04..f0c0dc7 100644 --- a/lib/agents/find_login_page.py +++ b/lib/agents/find_login_page.py @@ -6,46 +6,87 @@ from browser_use import ( ) from lib.agents.run_agent import run_agent from lib.utils.logger import logger -from lib.browser_use_utils.clean_resources import clean_agent_resources from lib.browser_use_utils.create_google_ai import create_google_ai from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL NOT_FOUND_LOGIN_PAGE = 0 FOUND_LOGIN_PAGE = 1 -class IsFound(BaseModel): - status: int +class FindLoginPageResponse(BaseModel): + status: int = NOT_FOUND_LOGIN_PAGE # 0 if not found, 1 if found + msg: str | None = None + url: str | None = None -async def find_login_page(target_url, session): +async def find_login_page(target_url, session) -> tuple[bool, str | None]: initial_actions = [{"open_tab": {"url": target_url}}] - task = "Navigate to the login page, and stop" - extend_planner_system_message = "You are an expert in finding login pages. Your task is to navigate to the login page of the given URL and stop there." + task = """ + You are an expert in finding login pages. - controller = Controller(output_model=IsFound, exclude_actions=['search_google']) + Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format. + + ※ You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs. + + 0. INITIAL BLOCK CHECK + - If the browser is blocked when trying to access the page — due to firewall, CAPTCHA, regional restrictions, or other access denials — immediately terminate the process and return the following JSON: + ```json + { + "status": 0, + "msg": "Blocked", + "url": "" + } + ``` + - Do NOT proceed to further steps in this case. + + 1. LOGIN PAGE NAVIGATION + - Navigate only to a **client-side (non-enterprise)** login page within the provided domain. + - Do NOT rely on external tools, search engines, or links not directly found on the site. + - If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding. + - Since step 0 confirmed access, assume the page now loads properly. + + 2. RETURN FORMAT + - Once the login page is reached, return a JSON object matching the following schema: + ```json + { + "status": 1, // 1 if login page is found, 0 otherwise + "msg": "Login page found", // Optional message + "url": "https://example.com/login" // Full URL of the login page if found + } + ``` + - If the login page cannot be found, return: + ```json + { + "status": 0, + "msg": "Login page not found", + "url": "" + } + ``` + - Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output. + """ + + + controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google']) agent = Agent( browser_session=session, initial_actions=initial_actions, task=task, llm=create_google_ai(GOOGLE_MODEL), - planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL), controller=controller, - extend_planner_system_message=extend_planner_system_message, ) - status, final_result = await run_agent(agent) - if status: + is_failed, final_result = await run_agent(agent) + if is_failed: logger(f"⚠️ 스캔 실패: {target_url} | {final_result}") print(f"⚠️ 스캔 실패: {target_url} | {final_result}") return False, None; data = json.loads(final_result) try: - is_found = IsFound(**data) - if is_found.status == NOT_FOUND_LOGIN_PAGE: - return False, "로그인 페이지를 찾을 수 없습니다." + resp = FindLoginPageResponse(**data) + if resp.status == FOUND_LOGIN_PAGE and len(resp.url) > 0: + return True, resp.url else: - return True, "로그인 페이지를 찾았습니다." + return False, resp.msg except Exception as e: - logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}") - print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}") - return False, "결과 파싱 실패" + logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + return False, data.msg diff --git a/lib/agents/get_sso_list.py b/lib/agents/get_sso_list.py new file mode 100644 index 0000000..2276fc6 --- /dev/null +++ b/lib/agents/get_sso_list.py @@ -0,0 +1,66 @@ +import json +from pydantic import BaseModel +from browser_use import ( + Agent, + Controller, +) +from lib.agents.run_agent import run_agent +from lib.utils.logger import logger +from lib.browser_use_utils.create_google_ai import create_google_ai +from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL + +NOT_FOUND_SSO_LIST = 0 +FOUND_SSO_LIST = 1 + +class EachSSOProvider(BaseModel): + provider: str + oauth_uri: str | None = None + +class FindLoginPageResponse(BaseModel): + EachSSOProviders: list[EachSSOProvider] | None = None + status: int = NOT_FOUND_SSO_LIST # 0 if not found, + msg: str | None = None + +async def get_sso_list(target_url, session) -> tuple[bool, str | None]: + initial_actions = [{"open_tab": {"url": target_url}}] + task = "Navigate to the login page, and return the result in the specified format." + extend_planner_system_message = """ + You are an expert in finding login pages. + Your task is to navigate to the login page of the given URL. + Once you reach the login page, stop and return a JSON object that matches the following schema: + ```json + { + "status": 1, # 1 if login page found, 0 otherwise + "url": "https://example.com/login" # Full URL of the login page if found + } + Return only this JSON object. Do not include any explanation or additional text. + """ + + controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google']) + agent = Agent( + browser_session=session, + initial_actions=initial_actions, + task=task, + llm=create_google_ai(GOOGLE_MODEL), + planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL), + controller=controller, + extend_planner_system_message=extend_planner_system_message, + ) + + is_failed, final_result = await run_agent(agent) + if is_failed: + logger(f"⚠️ 스캔 실패: {target_url} | {final_result}") + print(f"⚠️ 스캔 실패: {target_url} | {final_result}") + return False, None; + + data = json.loads(final_result) + try: + resp = FindLoginPageResponse(**data) + if resp.status == FOUND_SSO_LIST: + return True, resp + else: + return False, None + except Exception as e: + logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}") + return False, data.msg diff --git a/lib/agents/run_agent.py b/lib/agents/run_agent.py index fe9160c..7bcb686 100644 --- a/lib/agents/run_agent.py +++ b/lib/agents/run_agent.py @@ -1,6 +1,6 @@ from lib.browser_use_utils.clean_resources import clean_agent_resources -async def run_agent(agent): +async def run_agent(agent) -> tuple[int, str]: try: response = await agent.run() final_result = response.final_result() @@ -16,5 +16,4 @@ async def run_agent(agent): else: return 2, "일반 에러로 인한 실패" finally: - await clean_agent_resources(agent) - print("리소스 정리 완료") \ No newline at end of file + await clean_agent_resources(agent) \ No newline at end of file diff --git a/lib/find_sso_list.py b/lib/find_sso_list.py index d13c8ad..11fd608 100644 --- a/lib/find_sso_list.py +++ b/lib/find_sso_list.py @@ -4,6 +4,7 @@ from patchright.async_api import async_playwright as async_patchright from lib.agents.find_login_page import find_login_page from lib.browser_use_utils.clean_resources import clean_session_resources from lib.browser_use_utils.get_profile import get_profile +from lib.utils.save_oauth_providers import save_oauth_providers async def find_sso_list(target_url): session = BrowserSession( @@ -18,29 +19,40 @@ async def find_sso_list(target_url): FINISH = 0 final_result = None - task_queue = [] - - # find SSO + login_url = target_url state = FIND_LOGIN_PAGE while True: if state == FIND_LOGIN_PAGE: - status, response = await find_login_page( + is_success, resp = await find_login_page( target_url=target_url, session=session, ) - if not status: - print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {response}") + if not is_success: + print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {resp}") state = WHEN_ERROR + login_url = resp if resp else target_url state = FIND_SSO_LIST if state == FIND_SSO_LIST: print(f"🔎 SSO 목록 찾는 중: {target_url}") - await asyncio.sleep(10) # 잠시 대기 후 다음 단계로 넘어감 - break + is_success, resp = await find_sso_list( + target_url=login_url, + session=session, + ) + if not is_success: + print(f"⚠️ SSO 목록 탐지 실패: {target_url} | {resp}") + state = WHEN_ERROR + final_result = "" + state = SAVE_DATA if state == SAVE_DATA: print(f"💾 데이터 저장 중: {target_url}") - break + if not final_result: + print(f"⚠️ SSO 목록이 전달되지 않았습니다: {target_url}") + state = WHEN_ERROR + + save_oauth_providers(target_url, final_result) + state = FINISH if state == WHEN_ERROR: print(f"⚠️ 에러 발생: {target_url} | 스캔을 중단합니다.") diff --git a/lib/utils/save_oauth_providers.py b/lib/utils/save_oauth_providers.py new file mode 100644 index 0000000..892d3ac --- /dev/null +++ b/lib/utils/save_oauth_providers.py @@ -0,0 +1,13 @@ +import csv +import os + +def save_oauth_providers(url, oauth_entries): + csv_file = "./oauth_providers.csv" + file_exists = os.path.isfile(csv_file) + with open(csv_file, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not file_exists: + writer.writerow(["issuer", "provider", "oauth_uri"]) + for entry in oauth_entries: + writer.writerow([url, entry.provider or None, entry.oauth_uri or None]) + print(f"✅ OAuth providers saved to {csv_file}\n") \ No newline at end of file