[ADD] 재시도 큐 시스템 추가 및 관련 함수 구현

This commit is contained in:
암냥 2025-06-30 21:01:01 +09:00
commit 18a575a8af
4 changed files with 234 additions and 28 deletions

View file

@ -1,6 +1,9 @@
import asyncio
import os
import json
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
@ -20,6 +23,103 @@ from lib.llm import CreateChatGoogle, get_prompt
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
@dataclass
class RetryTask:
"""재시도할 작업을 나타내는 클래스"""
task_type: str # "oauth_list" or "oauth_login"
url: str
oauth_provider: Optional[str] = None
retry_count: int = 0
next_retry_time: Optional[datetime] = None
max_retries: int = 5
# 전역 재시도 큐
retry_queue: list[RetryTask] = []
retry_queue_lock = asyncio.Lock()
async def add_to_retry_queue(task: RetryTask):
"""작업을 재시도 큐에 추가"""
async with retry_queue_lock:
# 중복 작업 확인
existing_task = None
for existing in retry_queue:
if (existing.task_type == task.task_type and
existing.url == task.url and
existing.oauth_provider == task.oauth_provider):
existing_task = existing
break
if existing_task:
# 기존 작업이 있으면 재시도 횟수 업데이트
existing_task.retry_count = task.retry_count
existing_task.next_retry_time = task.next_retry_time
print(f"📝 기존 작업 업데이트: {task.task_type} - {task.url} (재시도: {task.retry_count})")
else:
# 새 작업 추가
retry_queue.append(task)
print(f" 재시도 큐에 작업 추가: {task.task_type} - {task.url} (재시도: {task.retry_count})")
async def process_retry_queue():
"""재시도 큐 처리"""
async with retry_queue_lock:
now = datetime.now()
ready_tasks = []
for task in retry_queue[:]: # 복사본에서 반복
if task.next_retry_time and task.next_retry_time <= now:
ready_tasks.append(task)
retry_queue.remove(task)
if ready_tasks:
print(f"🔄 {len(ready_tasks)}개의 재시도 작업 처리 중...")
for task in ready_tasks:
try:
if task.task_type == "oauth_list":
result = await _extract_oauth_list_internal(task.url)
if result:
print(f"✅ 재시도 성공: OAuth 리스트 추출 - {task.url}")
else:
await _handle_retry_failure(task)
elif task.task_type == "oauth_login":
result = await _test_oauth_login_internal(task.url, task.oauth_provider)
if result:
print(f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}")
else:
await _handle_retry_failure(task)
except Exception as e:
print(f"❌ 재시도 중 에러: {e}")
await _handle_retry_failure(task)
async def _handle_retry_failure(task: RetryTask):
"""재시도 실패 처리"""
if task.retry_count < task.max_retries:
task.retry_count += 1
wait_time = min(INITIAL_BACKOFF * (2 ** task.retry_count), MAX_BACKOFF)
task.next_retry_time = datetime.now() + timedelta(seconds=wait_time)
await add_to_retry_queue(task)
print(f"{wait_time}초 후 재시도 예정: {task.task_type} - {task.url}")
else:
print(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
logger(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
async def get_retry_queue_status():
"""재시도 큐 상태 조회"""
async with retry_queue_lock:
return {
"queue_length": len(retry_queue),
"tasks": [
{
"task_type": task.task_type,
"url": task.url,
"oauth_provider": task.oauth_provider,
"retry_count": task.retry_count,
"next_retry_time": task.next_retry_time.isoformat() if task.next_retry_time else None
}
for task in retry_queue
]
}
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
@ -41,23 +141,27 @@ async def _run_agent_with_retry(agent_config):
response = await agent.run()
await clean_resources(agent, session)
if any(keyword in str(response) for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type=agent_config.get("task_type", "unknown"),
url=url,
retry_count=try_cnt + 1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
)
await add_to_retry_queue(task)
return None
return response
except Exception as e:
await clean_resources(agent, session)
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
error_msg = f"API 쿼터 문제가 지속됩니다."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
@ -72,8 +176,8 @@ async def _run_agent_with_retry(agent_config):
return None
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트 추출"""
async def _extract_oauth_list_internal(url: str):
"""OAuth 리스트 추출 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
prompt, model = get_prompt("auth")
@ -93,13 +197,14 @@ async def extract_oauth_list(url: str):
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
}
}
@ -130,8 +235,32 @@ async def extract_oauth_list(url: str):
return []
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
try:
return await _extract_oauth_list_internal(url)
except Exception as e:
error_str = str(e).lower()
if any(keyword in error_str for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type="oauth_list",
url=url,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
)
await add_to_retry_queue(task)
return []
else:
raise e
async def _test_oauth_login_internal(url: str, oauth_provider: str):
"""OAuth 로그인 테스트 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
@ -160,6 +289,7 @@ async def test_oauth_login(url: str, oauth_provider: str):
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
}
}
@ -173,4 +303,51 @@ async def test_oauth_login(url: str, oauth_provider: str):
return True
print(f"{oauth_provider} 로그인 실패")
return False
return False
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
try:
return await _test_oauth_login_internal(url, oauth_provider)
except Exception as e:
error_str = str(e).lower()
if any(keyword in error_str for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {oauth_provider} - {url}")
task = RetryTask(
task_type="oauth_login",
url=url,
oauth_provider=oauth_provider,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
)
await add_to_retry_queue(task)
return False
else:
raise e
async def start_retry_queue_processor():
"""재시도 큐 처리기를 백그라운드에서 시작"""
async def queue_processor():
while True:
try:
await process_retry_queue()
await asyncio.sleep(30) # 30초마다 큐 확인
except Exception as e:
print(f"❌ 재시도 큐 처리 중 에러: {e}")
await asyncio.sleep(60) # 에러 발생 시 1분 대기
# 백그라운드 태스크로 실행
asyncio.create_task(queue_processor())
print("🔄 재시도 큐 처리기 시작됨")
# 모듈 로딩 시 자동으로 백그라운드 처리기 시작
# (실제 애플리케이션에서는 main 함수에서 호출하는 것이 좋음)
def init_retry_system():
"""재시도 시스템 초기화"""
print("🔧 재시도 시스템 초기화 중...")
# 이 함수는 메인 애플리케이션에서 호출해야 함

View file

@ -3,7 +3,7 @@ import os
import csv
from lib.utils import notify_backend, read_lines_between, is_html_url
from lib.browser_use.agents import extract_oauth_list, test_oauth_login
from lib.browser_use.agents import extract_oauth_list, test_oauth_login, start_retry_queue_processor, get_retry_queue_status
from lib.utils.progress import current_progress, load_progress, save_progress, progress_file
async def scan_one_url(url: str, skip_html_check: bool = False):
@ -66,11 +66,16 @@ async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
# 재시도 큐 처리기 시작
await start_retry_queue_processor()
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
current_progress["total"] = len(target_list)
# 전체 목록 길이를 저장 (재개 시에도 유지되어야 함)
total_count = len(target_list)
current_progress["total"] = total_count
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
@ -84,15 +89,23 @@ async def main_loop(
if resume == 'y':
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
# 전체 개수는 원래 목록 길이로 유지
current_progress["total"] = total_count
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
# current_index는 전체 목록에서의 현재 위치를 나타냄
current_url_index = current_progress["current_index"]
current_progress["current_url"] = url
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + actual_index}번째 줄")
print(f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + current_url_index}번째 줄")
# 재시도 큐 상태 확인 및 출력
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] > 0:
print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 대기 중")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
@ -100,7 +113,23 @@ async def main_loop(
await scan_one_url(url, skip_html_check=skip_html_check)
current_progress["current_index"] = actual_index + 1
# 스캔 완료 후 재시도 큐 상태 확인
retry_status_after = await get_retry_queue_status()
if retry_status_after["queue_length"] > 0:
print(f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중")
# 다음 URL로 진행
current_progress["current_index"] = current_url_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")
# 모든 URL 처리 완료 후 재시도 큐가 빌 때까지 대기
print("\n🔄 모든 URL 처리 완료. 재시도 큐 처리 대기 중...")
while True:
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] == 0:
break
print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인...")
await asyncio.sleep(30)
print(f"\n🎉 모든 스캔이 완료되었습니다! ({total_count}개 URL)")
print("🎉 재시도 큐도 모두 처리되었습니다!")

View file

@ -2,5 +2,5 @@ from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
status: str | None = None # "success", "mfa_required", "blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None

View file

@ -103,7 +103,7 @@ Return the result in the following format only:
```json
{{
"msg": "Google login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
"status": "success" | "already_logged_in" | "mfa_required" | "window_blocked" | "idpw_required" | "blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```