This commit is contained in:
tv0924@icloud.com 2025-06-22 22:19:30 +09:00
commit 495b3a52da
5 changed files with 161 additions and 30 deletions

View file

@ -6,46 +6,87 @@ from browser_use import (
)
from lib.agents.run_agent import run_agent
from lib.utils.logger import logger
from lib.browser_use_utils.clean_resources import clean_agent_resources
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
NOT_FOUND_LOGIN_PAGE = 0
FOUND_LOGIN_PAGE = 1
class IsFound(BaseModel):
status: int
class FindLoginPageResponse(BaseModel):
status: int = NOT_FOUND_LOGIN_PAGE # 0 if not found, 1 if found
msg: str | None = None
url: str | None = None
async def find_login_page(target_url, session):
async def find_login_page(target_url, session) -> tuple[bool, str | None]:
initial_actions = [{"open_tab": {"url": target_url}}]
task = "Navigate to the login page, and stop"
extend_planner_system_message = "You are an expert in finding login pages. Your task is to navigate to the login page of the given URL and stop there."
task = """
You are an expert in finding login pages.
controller = Controller(output_model=IsFound, exclude_actions=['search_google'])
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"status": 0,
"msg": "Blocked",
"url": ""
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. RETURN FORMAT
- Once the login page is reached, return a JSON object matching the following schema:
```json
{
"status": 1, // 1 if login page is found, 0 otherwise
"msg": "Login page found", // Optional message
"url": "https://example.com/login" // Full URL of the login page if found
}
```
- If the login page cannot be found, return:
```json
{
"status": 0,
"msg": "Login page not found",
"url": ""
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
status, final_result = await run_agent(agent)
if status:
is_failed, final_result = await run_agent(agent)
if is_failed:
logger(f"⚠️ 스캔 실패: {target_url} | {final_result}")
print(f"⚠️ 스캔 실패: {target_url} | {final_result}")
return False, None;
data = json.loads(final_result)
try:
is_found = IsFound(**data)
if is_found.status == NOT_FOUND_LOGIN_PAGE:
return False, "로그인 페이지를 찾을 수 없습니다."
resp = FindLoginPageResponse(**data)
if resp.status == FOUND_LOGIN_PAGE and len(resp.url) > 0:
return True, resp.url
else:
return True, "로그인 페이지를 찾았습니다."
return False, resp.msg
except Exception as e:
logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}")
print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}")
return False, "결과 파싱 실패"
logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, data.msg

View file

@ -0,0 +1,66 @@
import json
from pydantic import BaseModel
from browser_use import (
Agent,
Controller,
)
from lib.agents.run_agent import run_agent
from lib.utils.logger import logger
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
NOT_FOUND_SSO_LIST = 0
FOUND_SSO_LIST = 1
class EachSSOProvider(BaseModel):
provider: str
oauth_uri: str | None = None
class FindLoginPageResponse(BaseModel):
EachSSOProviders: list[EachSSOProvider] | None = None
status: int = NOT_FOUND_SSO_LIST # 0 if not found,
msg: str | None = None
async def get_sso_list(target_url, session) -> tuple[bool, str | None]:
initial_actions = [{"open_tab": {"url": target_url}}]
task = "Navigate to the login page, and return the result in the specified format."
extend_planner_system_message = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL.
Once you reach the login page, stop and return a JSON object that matches the following schema:
```json
{
"status": 1, # 1 if login page found, 0 otherwise
"url": "https://example.com/login" # Full URL of the login page if found
}
Return only this JSON object. Do not include any explanation or additional text.
"""
controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
is_failed, final_result = await run_agent(agent)
if is_failed:
logger(f"⚠️ 스캔 실패: {target_url} | {final_result}")
print(f"⚠️ 스캔 실패: {target_url} | {final_result}")
return False, None;
data = json.loads(final_result)
try:
resp = FindLoginPageResponse(**data)
if resp.status == FOUND_SSO_LIST:
return True, resp
else:
return False, None
except Exception as e:
logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, data.msg

View file

@ -1,6 +1,6 @@
from lib.browser_use_utils.clean_resources import clean_agent_resources
async def run_agent(agent):
async def run_agent(agent) -> tuple[int, str]:
try:
response = await agent.run()
final_result = response.final_result()
@ -17,4 +17,3 @@ async def run_agent(agent):
return 2, "일반 에러로 인한 실패"
finally:
await clean_agent_resources(agent)
print("리소스 정리 완료")

View file

@ -4,6 +4,7 @@ from patchright.async_api import async_playwright as async_patchright
from lib.agents.find_login_page import find_login_page
from lib.browser_use_utils.clean_resources import clean_session_resources
from lib.browser_use_utils.get_profile import get_profile
from lib.utils.save_oauth_providers import save_oauth_providers
async def find_sso_list(target_url):
session = BrowserSession(
@ -18,29 +19,40 @@ async def find_sso_list(target_url):
FINISH = 0
final_result = None
task_queue = []
# find SSO
login_url = target_url
state = FIND_LOGIN_PAGE
while True:
if state == FIND_LOGIN_PAGE:
status, response = await find_login_page(
is_success, resp = await find_login_page(
target_url=target_url,
session=session,
)
if not status:
print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {response}")
if not is_success:
print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {resp}")
state = WHEN_ERROR
login_url = resp if resp else target_url
state = FIND_SSO_LIST
if state == FIND_SSO_LIST:
print(f"🔎 SSO 목록 찾는 중: {target_url}")
await asyncio.sleep(10) # 잠시 대기 후 다음 단계로 넘어감
break
is_success, resp = await find_sso_list(
target_url=login_url,
session=session,
)
if not is_success:
print(f"⚠️ SSO 목록 탐지 실패: {target_url} | {resp}")
state = WHEN_ERROR
final_result = ""
state = SAVE_DATA
if state == SAVE_DATA:
print(f"💾 데이터 저장 중: {target_url}")
break
if not final_result:
print(f"⚠️ SSO 목록이 전달되지 않았습니다: {target_url}")
state = WHEN_ERROR
save_oauth_providers(target_url, final_result)
state = FINISH
if state == WHEN_ERROR:
print(f"⚠️ 에러 발생: {target_url} | 스캔을 중단합니다.")

View file

@ -0,0 +1,13 @@
import csv
import os
def save_oauth_providers(url, oauth_entries):
csv_file = "./oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider or None, entry.oauth_uri or None])
print(f"✅ OAuth providers saved to {csv_file}\n")