diff --git a/.env.example b/.env.example index b9c2c6e..d88025c 100644 --- a/.env.example +++ b/.env.example @@ -5,6 +5,10 @@ GOOGLE_API_KEY= GOOGLE_MODEL=gemini-2.5-flash-preview-05-20 GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20 +# min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)만큼 API가 실패시 대기합니다. +INITIAL_BACKOFF=60 +MAX_BACKOFF=600 + # 선택 PROXY_HOST=127.0.0.1 PROXY_PORT=11080 diff --git a/README.md b/README.md index f405fcf..76fc07f 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,12 @@ Environment는 .env.example에 따라 설정되어야합니다. .env.example을 .env로 복사하여서 사용해주세요. +# 쿠키와 로컬 스토리지 설정 방법 + +```sh +playwright open https://google.com/ --save-storage=./data/storage_state.json +``` + # 실행 ```sh diff --git a/login.py b/login.py deleted file mode 100644 index c4fc7db..0000000 --- a/login.py +++ /dev/null @@ -1,22 +0,0 @@ -from playwright.sync_api import sync_playwright - -def launch_browser_with_profile(): - p = sync_playwright().start() - browser = p.chromium.launch_persistent_context( - user_data_dir="./data/user_data", - headless=False - ) - page = browser.new_page() - return browser, page, p - -if __name__ == "__main__": - browser, page, playwright = launch_browser_with_profile() - page.goto("https://google.com") - print("Browser launched with user data profile.") - - # 브라우저가 열린 상태를 유지 - input("Press Enter to close the browser...") - - # 리소스 정리 - browser.close() - playwright.stop() \ No newline at end of file diff --git a/main.py b/main.py index ddeded9..63aefac 100644 --- a/main.py +++ b/main.py @@ -22,7 +22,11 @@ from lib.read_txt import read_lines_between from lib.prompt import extend_planner_system_message from lib.logger import logger -load_dotenv() +load_dotenv(verbose=True, override=True) + +# Exponential backoff settings +INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds +MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds if os.getenv("GOOGLE_API_KEY") is None: raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.") @@ -33,13 +37,19 @@ if os.getenv("GOOGLE_PLANNER_MODEL") is None: backend_url = os.getenv("BACKEND_URL", "http://localhost:11081") +print("🔧 환경 설정:") +print(f"🔗 Backend URL: {backend_url}") +api_key = os.getenv('GOOGLE_API_KEY') +print(f"🔑 Google API Key: {api_key[-4:] if api_key else None}") +print(f"🌐 Google Model: {os.getenv('GOOGLE_MODEL')}") +print(f"🌐 Google Planner Model: {os.getenv('GOOGLE_PLANNER_MODEL')}") # API 쿼터 처리를 위한 콜백 핸들러 class QuotaExhaustedHandler(BaseCallbackHandler): def on_llm_error(self, error, **kwargs): if "ResourceExhausted" in str(error) or "429" in str(error): - print("⚠️ API 쿼터가 소진되었습니다. 120초 대기 후 재시도합니다...") - time.sleep(120) + print("⚠️ API 쿼터가 소진되었습니다. 재시도 로직에 위임합니다...") + # backoff handled in scan_one_url def CreateChatGoogleGenerativeAI(model: str): @@ -112,6 +122,8 @@ async def scan_one_url(url: str, skip_html_check: bool = False): except Exception as e: print(f"⚠️ Failed to notify backend: {e}") + agent = None + session = None try_cnt = 0 while True: proxy_host = os.getenv("PROXY_HOST") @@ -127,12 +139,16 @@ async def scan_one_url(url: str, skip_html_check: bool = False): user_data_path = Path("./data/user_data").resolve() user_data_path.mkdir(parents=True, exist_ok=True) + storage_state_path = Path("./data/storage_state.json").resolve() + # BrowserProfile에 모든 설정 포함 profile = BrowserProfile( disable_security=True, stealth=True, headless=False, - user_data_dir=str(user_data_path), + #user_data_dir=str(user_data_path), + user_data_dir=None, + storage_state=str(storage_state_path) if storage_state_path.exists() else None, viewport={"width": 1600, "height": 900}, # 프록시 설정 proxy={"server": proxy_url} if proxy_url else None, @@ -156,112 +172,76 @@ async def scan_one_url(url: str, skip_html_check: bool = False): browser_profile=profile, ) - # Agent 생성 - initial_actions = [ - {"open_tab": {"url": target_url}}, - ] - + # Agent 생성 및 실행 (단일 try-except with 백오프) + initial_actions = [{"open_tab": {"url": target_url}}] controller = Controller(output_model=OAuthList) - - # API 쿼터 문제 해결을 위한 LLM 생성 - print("🤖 LLM 모델 초기화 중...") - + print("🤖 LLM 모델 초기화 및 스캔 시작...") try: agent = Agent( browser_session=session, initial_actions=initial_actions, - task=f"Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.", - llm=CreateChatGoogleGenerativeAI( - os.getenv("GOOGLE_MODEL") or "fallback" - ), - planner_llm=CreateChatGoogleGenerativeAI( - os.getenv("GOOGLE_PLANNER_MODEL") or "fallback" - ), + task="Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.", + llm=CreateChatGoogleGenerativeAI(os.getenv("GOOGLE_MODEL") or "fallback"), + planner_llm=CreateChatGoogleGenerativeAI(os.getenv("GOOGLE_PLANNER_MODEL") or "fallback"), controller=controller, extend_planner_system_message=extend_planner_system_message, ) - except Exception as e: - print(f"⚠️ Agent 생성 실패: {e}") - # API 쿼터 문제일 경우 더 긴 대기 - if "ResourceExhausted" in str(e) or "429" in str(e): - print("⚠️ API 쿼터 문제로 인한 Agent 생성 실패. 5분 대기 후 재시도...") - await asyncio.sleep(300) - await clean_resources(agent=None, session=session) - continue - - try: - # 4) 실제 스캔 실행 - print("🔍 스캔 시작...") response = await agent.run() final_result = response.final_result() if final_result is None: raise ValueError("final_result()가 None을 반환했습니다.") - - data = json.loads(final_result) - try: - oauth_entries: List[OAuth] = [ - OAuth(**entry) for entry in data["oauth_providers"] - ] - except Exception as e: - raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") - - # 5) 결과 출력 - print("-" * 50) - print(f"🔗 Scanned URL: {url}\n") - print("🔐 Detected OAuth Providers and URLs:") - for entry in oauth_entries: - if "<" in entry.oauth_uri or "..." in entry.oauth_uri: - print( - f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n" - ) - else: - print(f"- {entry.provider}: {entry.oauth_uri}") - print("-" * 50) - - # 6) CSV에 저장 (append) - csv_file = "./oauth_providers.csv" - file_exists = os.path.isfile(csv_file) - with open(csv_file, "a", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - if not file_exists: - writer.writerow(["issuer", "provider", "oauth_uri"]) - - # 실제 데이터 저장 - for entry in oauth_entries: - writer.writerow([url, entry.provider, entry.oauth_uri]) - await clean_resources(agent, session) - - # 성공적으로 처리했으므로 반복문 탈출 - break - except Exception as e: await clean_resources(agent, session) - # API 쿼터 문제인지 확인 if "ResourceExhausted" in str(e) or "429" in str(e): - print(f"⚠️ API 쿼터 소진 에러: {e}") - print("⚠️ 5분 대기 후 재시도합니다...") - await asyncio.sleep(300) # 5분 대기 + wait = min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF) + print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...") + await asyncio.sleep(wait) try_cnt += 1 if try_cnt >= 3: - print(f"❌ {url} 스캔에 실패했습니다. API 쿼터 문제가 지속됩니다.") - logger(f"❌ {url} 스캔에 실패했습니다. API 쿼터 문제: {e}") + print(f"❌ {url} 스캔 실패: API 쿼터 문제가 지속됩니다.") + logger(f"❌ {url} 스캔 실패: API 쿼터 문제: {e}") return continue - - if try_cnt >= 2: - print(f"❌ {url} 스캔에 실패했습니다. 에러: {e}") - logger(f"❌ {url} 스캔에 실패했습니다. 에러: {e}") - return - + # 일반 에러 처리 try_cnt += 1 + if try_cnt >= 3: + print(f"❌ {url} 스캔 실패: 에러: {e}") + logger(f"❌ {url} 스캔 실패: 에러: {e}") + return print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") - - # 일반 에러의 경우 짧게 대기 await asyncio.sleep(30) - # 반복문을 통해 재시도 continue + # 스캔 결과 처리 + data = json.loads(final_result) + try: + oauth_entries = [OAuth(**entry) for entry in data["oauth_providers"]] + except Exception as e: + raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") + + print("-" * 50) + print(f"🔗 Scanned URL: {url}\n") + print("🔐 Detected OAuth Providers and URLs:") + for entry in oauth_entries: + if "<" in entry.oauth_uri or "..." in entry.oauth_uri: + print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n") + else: + print(f"- {entry.provider}: {entry.oauth_uri}") + print("-" * 50) + + # CSV에 저장 (append) + csv_file = "./oauth_providers.csv" + file_exists = os.path.isfile(csv_file) + with open(csv_file, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not file_exists: + writer.writerow(["issuer", "provider", "oauth_uri"]) + for entry in oauth_entries: + writer.writerow([url, entry.provider, entry.oauth_uri]) + await clean_resources(agent, session) + break + async def loop( filepath: str, start_line: int, end_line: int, skip_html_check: bool = False diff --git a/main.py.bak b/main.py.bak deleted file mode 100644 index d629731..0000000 --- a/main.py.bak +++ /dev/null @@ -1,322 +0,0 @@ -import asyncio -import json -import os -import csv -import argparse -import requests -import time -from typing import List -from dotenv import load_dotenv -from pydantic import BaseModel -from langchain_google_genai import ChatGoogleGenerativeAI -from langchain.callbacks.base import BaseCallbackHandler -from browser_use import ( - Agent, - Browser, - BrowserConfig, - BrowserSession, - BrowserProfile, - Controller, -) -from browser_use.browser.context import BrowserContext, BrowserContextConfig -from playwright.async_api import async_playwright - -# from lib import browser_config -# from lib.browser_config import browser_config_kwargs -from lib.is_html import is_html_url -from lib.read_txt import read_lines_between -from lib.prompt import extend_planner_system_message -from lib.logger import logger - -load_dotenv() - -if os.getenv("GOOGLE_API_KEY") is None: - raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.") -if os.getenv("GOOGLE_MODEL") is None: - raise ValueError("GOOGLE_MODEL 환경변수가 설정되지 않았습니다.") -if os.getenv("GOOGLE_PLANNER_MODEL") is None: - raise ValueError("GOOGLE_PLANNER_MODEL 환경변수가 설정되지 않았습니다.") - -backend_url = os.getenv("BACKEND_URL", "http://localhost:11081") - - -# API 쿼터 처리를 위한 콜백 핸들러 -class QuotaExhaustedHandler(BaseCallbackHandler): - def on_llm_error(self, error, **kwargs): - if "ResourceExhausted" in str(error) or "429" in str(error): - print("⚠️ API 쿼터가 소진되었습니다. 60초 대기 후 재시도합니다...") - time.sleep(60) - - -def create_llm_with_retry(): - """재시도 로직이 포함된 LLM 생성""" - return ChatGoogleGenerativeAI( - model=os.getenv("GOOGLE_MODEL"), - max_retries=5, # 최대 재시도 횟수 증가 - request_timeout=120, # 타임아웃 시간 증가 - callbacks=[QuotaExhaustedHandler()], - # API 호출 간격 조정 - temperature=0.1, - ) - - -def create_planner_llm_with_retry(): - """플래너용 재시도 로직이 포함된 LLM 생성""" - return ChatGoogleGenerativeAI( - model=os.getenv("GOOGLE_PLANNER_MODEL"), - max_retries=5, # 최대 재시도 횟수 증가 - request_timeout=120, # 타임아웃 시간 증가 - callbacks=[QuotaExhaustedHandler()], - # API 호출 간격 조정 - temperature=0.1, - ) - - -# 출력 모델 -class OAuth(BaseModel): - provider: str - oauth_uri: str - - -class OAuthList(BaseModel): - oauth_providers: List[OAuth] - - -async def clean_resources(agent, session, browser, playwright): - """리소스를 정리하는 함수""" - try: - await agent.close() - except Exception as e: - print(f"⚠️ 에이전트 리소스 정리 실패: {e}") - try: - await session.close() - except Exception as e: - print(f"⚠️ 세션 리소스 정리 실패: {e}") - try: - await browser.close() - except Exception as e: - print(f"⚠️ 브라우저 리소스 정리 실패: {e}") - try: - await playwright.stop() - except Exception as e: - print(f"⚠️ Playwright 리소스 정리 실패: {e}") - - -# ── URL별로 Browser를 새로 띄우는 함수 ── -async def scan_one_url(url: str, skip_html_check: bool = False): - target_url = url if url.startswith("http") else f"https://{url}" - print(f"🚀 Starting scan for: {target_url}") - - # 1) URL이 HTML 페이지인지 확인 - if not is_html_url(target_url) and not skip_html_check: - print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.") - return - - # Backend에 스캔 시작을 알림 - try: - response = requests.post( - f"{backend_url}/start", params={"url": target_url}, timeout=5 - ) - if response.status_code == 200: - print(f"✅ Backend notified: {response.text}") - else: - print(f"⚠️ Backend notification failed: {response.status_code}") - except requests.exceptions.ConnectionError: - print( - f"⚠️ Backend server not available at {backend_url}. Continuing without notification." - ) - except requests.exceptions.Timeout: - print(f"⚠️ Backend notification timed out. Continuing without notification.") - except Exception as e: - print(f"⚠️ Failed to notify backend: {e}") - - try_cnt = 0 - while True: - proxy_host = os.getenv("PROXY_HOST") - proxy_port = os.getenv("PROXY_PORT") - proxy_url = None - if proxy_host and proxy_port: - proxy_url = f"http://{proxy_host}:{proxy_port}" - print(f"🔗 Using proxy: {proxy_host}:{proxy_port}") - else: - print("🔗 No proxy configured, using direct connection.") - break - - # 2) Browser + Context 생성 - playwright = await async_playwright().start() - browser = await playwright.chromium.launch( - proxy={"server": proxy_url} if proxy_url else None, - headless=False, # headless 모드 사용 여부 - args=[ - "--disable-web-security", - "--disable-features=VizDisplayCompositor", - "--disable-site-isolation-trials", - "--disable-features=IsolateOrigins,site-per-process", - "--disable-popup-blocking", - "--disable-dev-shm-usage", - f"--lang=" + os.getenv("LANG", "en_US"), - "--ignore-certificate-errors", - "--ignore-ssl-errors", - "--allow-running-insecure-content", - "--restore-last-session" - ], - ) - - - - os.makedirs("./data", exist_ok=True) - - profile = BrowserProfile( - stealth=True, - headless=False, # headless 모드 사용 여부 - user_data_dir="./data/user_data", - viewport={"width": 1600, "height": 900}, - ) - - # BrowserSession 생성 시 headless 옵션을 명시적으로 설정 - context = await browser.new_context() - - session = BrowserSession( - browser_context=await browser.new_context(), - ) - - # 3) Agent, Controller 생성 - initial_actions = [ - {"open_tab": {"url": target_url}}, - ] controller = Controller(output_model=OAuthList) - - # API 쿼터 문제 해결을 위한 LLM 생성 - print("🤖 LLM 모델 초기화 중...") - - agent = Agent( - browser_session=session, - browser_profile=profile, - browser_context=context, - - initial_actions=initial_actions, - task=f"Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.", - - llm=create_llm_with_retry(), - planner_llm=create_planner_llm_with_retry(), - controller=controller, - extend_planner_system_message=extend_planner_system_message, - retry_delay=120, # 재시도 간격을 2분으로 증가 - ) - - try: - # 4) 실제 스캔 실행 - response = await agent.run() - final_result = response.final_result() - if final_result is None: - raise ValueError("final_result()가 None을 반환했습니다.") - - data = json.loads(final_result) - try: - oauth_entries: List[OAuth] = [ - OAuth(**entry) for entry in data["oauth_providers"] - ] - except Exception as e: - raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") - - # 5) 결과 출력 - print("-" * 50) - print(f"🔗 Scanned URL: {url}\n") - print("🔐 Detected OAuth Providers and URLs:") - for entry in oauth_entries: - if "<" in entry.oauth_uri or "..." in entry.oauth_uri: - print( - f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n" - ) - else: - print(f"- {entry.provider}: {entry.oauth_uri}") - print("-" * 50) - - # 6) CSV에 저장 (append) - csv_file = "./oauth_providers.csv" - file_exists = os.path.isfile(csv_file) - with open(csv_file, "a", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - if not file_exists: - writer.writerow(["issuer", "provider", "oauth_uri"]) - await clean_resources(agent, session, browser, playwright) - - # 성공적으로 처리했으므로 반복문 탈출 - break - - except Exception as e: - await clean_resources(agent, session, browser, playwright) - - if try_cnt >= 1: - print(f"❌ {url} 스캔에 실패했습니다. 에러: {e}") - logger(f"❌ {url} 스캔에 실패했습니다. 에러: {e}") - return - try_cnt += 1 - print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") - - # 1분 대기 - await asyncio.sleep(5) - # 반복문을 통해 재시도 - continue - - -async def loop( - filepath: str, start_line: int, end_line: int, skip_html_check: bool = False -): - # 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성 - target_list = read_lines_between( - filepath=filepath, start_line=start_line, end_line=end_line - ) - - # (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다. - #target_list[0] = "velog.io" - - for url in target_list: - # scan_one_url은 외부에 정의된 비동기 함수라고 가정합니다. - # 실제로 scan_one_url이 정의된 위치를 import하거나 - # 모듈 수준에 구현해두셔야 합니다. - await scan_one_url(url, skip_html_check=skip_html_check) - - -def main(): - parser = argparse.ArgumentParser( - prog="domain_scanner", - description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.", - ) - - # 커맨드라인 인자로 받을 옵션들 정의 - parser.add_argument( - "-f", - "--file", - type=str, - required=True, - help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)", - ) - parser.add_argument( - "-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)" - ) - parser.add_argument( - "-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)" - ) - parser.add_argument( - "-skh", - "--skip-html-check", - type=bool, - default=False, - help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)", - ) - - args = parser.parse_args() - - # 인자값을 비동기 함수에 전달 - asyncio.run( - loop( - filepath=args.file, - start_line=args.start, - end_line=args.end, - skip_html_check=args.skip_html_check, - ) - ) - - -if __name__ == "__main__": - main()