browser-use-oauth/lib/browser_use_utils/run_task.py
2025-06-27 09:45:50 +09:00

40 lines
1.5 KiB
Python

import json
from typing import Any
from pydantic import BaseModel
from browser_use import (
BrowserSession
)
from patchright.async_api import async_playwright as async_patchright
from lib.utils.logger import logger
from lib.browser_use_utils import get_profile, clean_session_resources, run_agent
async def run_task(target_url: str, ReturnModel: type[BaseModel], task: str) -> tuple[bool, BaseModel | None]:
    """Run an agent task against ``target_url`` and parse its result into ``ReturnModel``.

    A new ``BrowserSession`` is created, the agent is started with an initial
    action that opens ``target_url`` in a tab, and the agent's final result is
    decoded as JSON and used as keyword arguments for ``ReturnModel``.

    Args:
        target_url: URL opened in a new tab before the agent starts working.
        ReturnModel: Pydantic model class used to validate the agent's JSON output.
        task: Task description forwarded to ``run_agent``.

    Returns:
        ``(True, model_instance)`` when the agent succeeds and its output
        validates against ``ReturnModel``; ``(False, None)`` when the agent
        reports failure or its output cannot be parsed/validated.

    Note:
        ``clean_session_resources(session)`` is awaited exactly once on every
        exit path after the session has been created, including when
        ``run_agent`` raises (the exception still propagates to the caller).
    """
    session = BrowserSession(
        playwright=(await async_patchright().start()),
        browser_profile=await get_profile(),
    )
    try:
        initial_actions = [{"open_tab": {"url": target_url}}]
        success, msg, final_result = await run_agent(
            session=session,
            initial_actions=initial_actions,
            ReturnModel=ReturnModel,
            task=task,
        )

        if not success:
            failure = f"⚠️ LLM 실행 실패: {target_url} | {msg}"
            logger(failure)
            print(failure)
            return False, None

        try:
            data = json.loads(final_result)
            return True, ReturnModel(**data)
        except Exception as e:
            # Deliberately broad: any decode/validation problem is reported as a
            # soft failure rather than raised. Log the raw agent output
            # (`final_result`), which is always bound here; `data` may not exist
            # if JSON decoding itself failed, and a decoded dict has no `.msg`.
            failure = f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}"
            logger(failure)
            print(failure)
            return False, None
    finally:
        # Single cleanup point for success, soft failure, and unexpected errors.
        await clean_session_resources(session)