diff --git a/src/lib/browser_use/agents.py b/src/lib/browser_use/agents.py index 3dd50e9..af424e4 100644 --- a/src/lib/browser_use/agents.py +++ b/src/lib/browser_use/agents.py @@ -1,6 +1,9 @@ import asyncio import os import json +from typing import Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime, timedelta from browser_use import Agent, BrowserSession, Controller from patchright.async_api import async_playwright as async_patchright @@ -20,6 +23,103 @@ from lib.llm import CreateChatGoogle, get_prompt INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds +@dataclass +class RetryTask: + """재시도할 작업을 나타내는 클래스""" + task_type: str # "oauth_list" or "oauth_login" + url: str + oauth_provider: Optional[str] = None + retry_count: int = 0 + next_retry_time: Optional[datetime] = None + max_retries: int = 5 + +# 전역 재시도 큐 +retry_queue: list[RetryTask] = [] +retry_queue_lock = asyncio.Lock() + +async def add_to_retry_queue(task: RetryTask): + """작업을 재시도 큐에 추가""" + async with retry_queue_lock: + # 중복 작업 확인 + existing_task = None + for existing in retry_queue: + if (existing.task_type == task.task_type and + existing.url == task.url and + existing.oauth_provider == task.oauth_provider): + existing_task = existing + break + + if existing_task: + # 기존 작업이 있으면 재시도 횟수 업데이트 + existing_task.retry_count = task.retry_count + existing_task.next_retry_time = task.next_retry_time + print(f"📝 기존 작업 업데이트: {task.task_type} - {task.url} (재시도: {task.retry_count})") + else: + # 새 작업 추가 + retry_queue.append(task) + print(f"➕ 재시도 큐에 작업 추가: {task.task_type} - {task.url} (재시도: {task.retry_count})") + +async def process_retry_queue(): + """재시도 큐 처리""" + async with retry_queue_lock: + now = datetime.now() + ready_tasks = [] + + for task in retry_queue[:]: # 복사본에서 반복 + if task.next_retry_time and task.next_retry_time <= now: + ready_tasks.append(task) + retry_queue.remove(task) + + if ready_tasks: + print(f"🔄 {len(ready_tasks)}개의 재시도 작업 처리 중...") + + for task in ready_tasks: + try: + if task.task_type == "oauth_list": + result = await _extract_oauth_list_internal(task.url) + if result: + print(f"✅ 재시도 성공: OAuth 리스트 추출 - {task.url}") + else: + await _handle_retry_failure(task) + elif task.task_type == "oauth_login": + result = await _test_oauth_login_internal(task.url, task.oauth_provider) + if result: + print(f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}") + else: + await _handle_retry_failure(task) + except Exception as e: + print(f"❌ 재시도 중 에러: {e}") + await _handle_retry_failure(task) + +async def _handle_retry_failure(task: RetryTask): + """재시도 실패 처리""" + if task.retry_count < task.max_retries: + task.retry_count += 1 + wait_time = min(INITIAL_BACKOFF * (2 ** task.retry_count), MAX_BACKOFF) + task.next_retry_time = datetime.now() + timedelta(seconds=wait_time) + await add_to_retry_queue(task) + print(f"⏰ {wait_time}초 후 재시도 예정: {task.task_type} - {task.url}") + else: + print(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}") + logger(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}") + +async def get_retry_queue_status(): + """재시도 큐 상태 조회""" + async with retry_queue_lock: + return { + "queue_length": len(retry_queue), + "tasks": [ + { + "task_type": task.task_type, + "url": task.url, + "oauth_provider": task.oauth_provider, + "retry_count": task.retry_count, + "next_retry_time": task.next_retry_time.isoformat() if task.next_retry_time else None + } + for task in retry_queue + ] + } + async def _run_agent_with_retry(agent_config): """Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)""" agent = None @@ -41,23 +141,27 @@ async def _run_agent_with_retry(agent_config): response = await agent.run() await clean_resources(agent, session) + + if any(keyword in str(response) for keyword in [ + "429", "resource_exhausted", "resourceexhausted", + "quota", "rate limit", "too many requests", + "exceeded", "limit reached" + ]): + print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}") + task = RetryTask( + task_type=agent_config.get("task_type", "unknown"), + url=url, + retry_count=try_cnt + 1, + next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF) + ) + await add_to_retry_queue(task) + return None + return response except Exception as e: await clean_resources(agent, session) - - if "ResourceExhausted" in str(e) or "429" in str(e): - wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF) - print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...") - await asyncio.sleep(wait) - try_cnt += 1 - if try_cnt >= 3: - error_msg = f"API 쿼터 문제가 지속됩니다." - logger(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}: {e}") - print(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}") - return None - continue - + # 일반 에러 처리 try_cnt += 1 if try_cnt >= 3: @@ -72,8 +176,8 @@ async def _run_agent_with_retry(agent_config): return None -async def extract_oauth_list(url: str): - """첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출""" +async def _extract_oauth_list_internal(url: str): + """OAuth 리스트 추출 내부 함수 (재시도 큐에서 사용)""" target_url = url if url.startswith("http") else f"https://{url}" print(f"🔎 OAuth 리스트 추출 시작: {target_url}") prompt, model = get_prompt("auth") @@ -93,13 +197,14 @@ async def extract_oauth_list(url: str): "llm": CreateChatGoogle(config.GOOGLE_MODEL), "planner_llm": ( CreateChatGoogle(config.GOOGLE_PLANNER_MODEL) - if config.GOOGLE_PLANNER_MODEL + if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST") else None ), "controller": Controller( output_model=model if not isinstance(model, str) else None, exclude_actions=["search_google", "unknown_action", "unkown"], ), + "extend_system_message": prompt, "extend_planner_system_message": prompt, } } @@ -130,8 +235,32 @@ async def extract_oauth_list(url: str): return [] -async def test_oauth_login(url: str, oauth_provider: str): - """두 번째 Agent: 특정 OAuth 제공자로 로그인 시도""" +async def extract_oauth_list(url: str): + """첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출""" + try: + return await _extract_oauth_list_internal(url) + except Exception as e: + error_str = str(e).lower() + if any(keyword in error_str for keyword in [ + "429", "resource_exhausted", "resourceexhausted", + "quota", "rate limit", "too many requests", + "exceeded", "limit reached" + ]): + print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}") + task = RetryTask( + task_type="oauth_list", + url=url, + retry_count=1, + next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF) + ) + await add_to_retry_queue(task) + return [] + else: + raise e + + +async def _test_oauth_login_internal(url: str, oauth_provider: str): + """OAuth 로그인 테스트 내부 함수 (재시도 큐에서 사용)""" target_url = url if url.startswith("http") else f"https://{url}" print(f"🔐 {oauth_provider} 로그인 시작: {target_url}") @@ -160,6 +289,7 @@ async def test_oauth_login(url: str, oauth_provider: str): output_model=model if not isinstance(model, str) else None, exclude_actions=["search_google", "unknown_action", "unkown"], ), + "extend_system_message": prompt, "extend_planner_system_message": prompt, } } @@ -173,4 +303,51 @@ async def test_oauth_login(url: str, oauth_provider: str): return True print(f"❌ {oauth_provider} 로그인 실패") - return False \ No newline at end of file + return False + + +async def test_oauth_login(url: str, oauth_provider: str): + """두 번째 Agent: 특정 OAuth 제공자로 로그인 시도""" + try: + return await _test_oauth_login_internal(url, oauth_provider) + except Exception as e: + error_str = str(e).lower() + if any(keyword in error_str for keyword in [ + "429", "resource_exhausted", "resourceexhausted", + "quota", "rate limit", "too many requests", + "exceeded", "limit reached" + ]): + print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {oauth_provider} - {url}") + task = RetryTask( + task_type="oauth_login", + url=url, + oauth_provider=oauth_provider, + retry_count=1, + next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF) + ) + await add_to_retry_queue(task) + return False + else: + raise e + +async def start_retry_queue_processor(): + """재시도 큐 처리기를 백그라운드에서 시작""" + async def queue_processor(): + while True: + try: + await process_retry_queue() + await asyncio.sleep(30) # 30초마다 큐 확인 + except Exception as e: + print(f"❌ 재시도 큐 처리 중 에러: {e}") + await asyncio.sleep(60) # 에러 발생 시 1분 대기 + + # 백그라운드 태스크로 실행 + asyncio.create_task(queue_processor()) + print("🔄 재시도 큐 처리기 시작됨") + +# 모듈 로딩 시 자동으로 백그라운드 처리기 시작 +# (실제 애플리케이션에서는 main 함수에서 호출하는 것이 좋음) +def init_retry_system(): + """재시도 시스템 초기화""" + print("🔧 재시도 시스템 초기화 중...") + # 이 함수는 메인 애플리케이션에서 호출해야 함 \ No newline at end of file diff --git a/src/lib/browser_use/scanner.py b/src/lib/browser_use/scanner.py index 4a586db..04a9371 100644 --- a/src/lib/browser_use/scanner.py +++ b/src/lib/browser_use/scanner.py @@ -3,7 +3,7 @@ import os import csv from lib.utils import notify_backend, read_lines_between, is_html_url -from lib.browser_use.agents import extract_oauth_list, test_oauth_login +from lib.browser_use.agents import extract_oauth_list, test_oauth_login, start_retry_queue_processor, get_retry_queue_status from lib.utils.progress import current_progress, load_progress, save_progress, progress_file async def scan_one_url(url: str, skip_html_check: bool = False): @@ -66,11 +66,16 @@ async def main_loop( filepath: str, start_line: int, end_line: int, skip_html_check: bool = False ): """지정된 URL 목록에 대해 스캔을 실행하는 메인 루프""" + # 재시도 큐 처리기 시작 + await start_retry_queue_processor() + target_list = read_lines_between( filepath=filepath, start_line=start_line, end_line=end_line ) - current_progress["total"] = len(target_list) + # 전체 목록 길이를 저장 (재개 시에도 유지되어야 함) + total_count = len(target_list) + current_progress["total"] = total_count current_progress["start_line"] = start_line current_progress["current_index"] = 0 @@ -84,15 +89,23 @@ async def main_loop( if resume == 'y': start_index = prev_progress.get("current_index", 0) current_progress["current_index"] = start_index + # 전체 개수는 원래 목록 길이로 유지 + current_progress["total"] = total_count target_list = target_list[start_index:] print(f"✅ {start_index}번째부터 재개합니다.") for i, url in enumerate(target_list): - actual_index = current_progress["current_index"] + i + # current_index는 전체 목록에서의 현재 위치를 나타냄 + current_url_index = current_progress["current_index"] current_progress["current_url"] = url - print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") - print(f"📍 {os.path.basename(filepath)}의 {start_line + actual_index}번째 줄") + print(f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}") + print(f"📍 {os.path.basename(filepath)}의 {start_line + current_url_index}번째 줄") + + # 재시도 큐 상태 확인 및 출력 + retry_status = await get_retry_queue_status() + if retry_status["queue_length"] > 0: + print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 대기 중") if i > 0: print("⏳ API 쿼터 보호를 위해 30초 대기 중...") @@ -100,7 +113,23 @@ async def main_loop( await scan_one_url(url, skip_html_check=skip_html_check) - current_progress["current_index"] = actual_index + 1 + # 스캔 완료 후 재시도 큐 상태 확인 + retry_status_after = await get_retry_queue_status() + if retry_status_after["queue_length"] > 0: + print(f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중") + + # 다음 URL로 진행 + current_progress["current_index"] = current_url_index + 1 save_progress() - print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)") \ No newline at end of file + # 모든 URL 처리 완료 후 재시도 큐가 빌 때까지 대기 + print("\n🔄 모든 URL 처리 완료. 재시도 큐 처리 대기 중...") + while True: + retry_status = await get_retry_queue_status() + if retry_status["queue_length"] == 0: + break + print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인...") + await asyncio.sleep(30) + + print(f"\n🎉 모든 스캔이 완료되었습니다! ({total_count}개 URL)") + print("🎉 재시도 큐도 모두 처리되었습니다!") \ No newline at end of file diff --git a/src/lib/llm/prompt/fallback/model.py b/src/lib/llm/prompt/fallback/model.py index d1322ba..832409c 100644 --- a/src/lib/llm/prompt/fallback/model.py +++ b/src/lib/llm/prompt/fallback/model.py @@ -2,5 +2,5 @@ from pydantic import BaseModel class model(BaseModel): msg: str | None = None - status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials" + status: str | None = None # "success", "mfa_required", "blocked", "sso_not_found", "login_page_not_found", "invalid_credentials" final_url: str | None = None diff --git a/src/lib/llm/prompt/fallback/prompt.py b/src/lib/llm/prompt/fallback/prompt.py index 87ad5ce..42b11e1 100644 --- a/src/lib/llm/prompt/fallback/prompt.py +++ b/src/lib/llm/prompt/fallback/prompt.py @@ -103,7 +103,7 @@ Return the result in the following format only: ```json {{ "msg": "Google login completed", - "status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found", + "status": "success" | "already_logged_in" | "mfa_required" | "window_blocked" | "idpw_required" | "blocked" | "sso_not_found" | "login_page_not_found", "final_url": "" }} ```