mirror of
https://github.com/j93es/browser-use-oauth.git
synced 2026-06-04 03:31:51 +09:00
40 lines
1.5 KiB
Python
40 lines
1.5 KiB
Python
import json
|
|
from typing import Any
|
|
from pydantic import BaseModel
|
|
from browser_use import (
|
|
BrowserSession
|
|
)
|
|
from patchright.async_api import async_playwright as async_patchright
|
|
from lib.utils.logger import logger
|
|
from lib.browser_use_utils import get_profile, clean_session_resources, run_agent
|
|
|
|
|
|
async def run_task(target_url: str, ReturnModel: type[BaseModel], task: str) -> tuple[bool, BaseModel | None]:
    """Run an agent task against ``target_url`` and parse its result.

    A fresh ``BrowserSession`` is created (patchright-backed playwright
    instance plus the profile from ``get_profile()``), the agent is run with
    a single ``open_tab`` initial action for ``target_url``, and the agent's
    final result is decoded as JSON and used to construct ``ReturnModel``.

    Args:
        target_url: URL passed to the ``open_tab`` initial action.
        ReturnModel: Pydantic model class used both as the agent's requested
            output model and to validate the decoded result.
        task: Task description forwarded to ``run_agent``.

    Returns:
        ``(True, model_instance)`` when the agent succeeds and its result
        parses into ``ReturnModel``; ``(False, None)`` when the agent reports
        failure or the result cannot be decoded/validated.

    Raises:
        Whatever ``run_agent`` raises; the session is still cleaned up first.
    """
    session = BrowserSession(
        playwright=(await async_patchright().start()),
        browser_profile=await get_profile(),
    )

    # One outer try/finally guarantees the session is released exactly once on
    # every exit path, including an exception escaping from run_agent.
    try:
        success, msg, final_result = await run_agent(
            session=session,
            initial_actions=[{"open_tab": {"url": target_url}}],
            ReturnModel=ReturnModel,
            task=task,
        )

        if not success:
            failure = f"⚠️ LLM 실행 실패: {target_url} | {msg}"
            logger(failure)
            print(failure)
            return False, None

        try:
            return True, ReturnModel(**json.loads(final_result))
        except Exception as e:
            # Report the raw agent output: the decoded payload may not exist
            # (JSON decoding failed) and, being a dict, has no ``.msg``
            # attribute, so referencing it here would raise inside the handler.
            failure = f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {final_result}"
            logger(failure)
            print(failure)
            return False, None
    finally:
        await clean_session_resources(session)
|
|
|