From 4b3637b762f18ead1b0950f9690fe73ea586bea7 Mon Sep 17 00:00:00 2001 From: imnyang Date: Mon, 23 Jun 2025 00:15:03 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20OAuth=20=EB=A6=AC=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=B6=94=EC=B6=9C=20=EB=B0=8F=20=EB=A1=9C=EA=B7=B8=EC=9D=B8=20?= =?UTF-8?q?=EA=B8=B0=EB=8A=A5=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - README.md: uv 실행 명령어 수정 - lib/llm/prompt: OAuth 리스트 추출 및 fallback 프롬프트 추가 - lib/utils/browser_use: 프로필 생성 시 스토리지 상태 파일 처리 개선 - lib/utils/browser_use/func: 안전한 JSON 읽기 및 쓰기 함수 추가 - main.py: OAuth 리스트 추출 및 개별 로그인 시도 통합 - model.py: OAuth 모델 수정 --- README.md | 4 +- lib/llm/prompt/__init__.py | 150 ++------------------- lib/llm/prompt/auth_list.py | 58 +++++++++ lib/llm/prompt/fallback.py | 135 +++++++++++++++++++ lib/utils/browser_use/__init__.py | 21 ++- lib/utils/browser_use/func.py | 65 +++++++++- lib/utils/browser_use/model.py | 8 +- main.py | 209 ++++++++++++++++++++++-------- 8 files changed, 444 insertions(+), 206 deletions(-) create mode 100644 lib/llm/prompt/auth_list.py create mode 100644 lib/llm/prompt/fallback.py diff --git a/README.md b/README.md index 9128804..c9ade30 100644 --- a/README.md +++ b/README.md @@ -96,8 +96,8 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt ``` ```sh -# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip} -uv run run.py 12540 13000 False +# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh} +uv run run.py 1 100 --skh ``` # 참고하면 좋을만한 것 diff --git a/lib/llm/prompt/__init__.py b/lib/llm/prompt/__init__.py index 7f1c44c..314b2dd 100644 --- a/lib/llm/prompt/__init__.py +++ b/lib/llm/prompt/__init__.py @@ -3,140 +3,16 @@ from dotenv import load_dotenv load_dotenv(override=True) -# Extended planner prompt -extend_planner_system_message = f""" -🎯 목적: 웹 자동화를 위한 **SSO 로그인 리디렉션 URL 수집** - -📌 주의사항 (전제 조건) -- ❌ **검색 엔진(Google, Bing 등) 사용 금지** -- ✅ **초기 제공된 URL 내에서만 탐색** -- ❌ 직접 이동하거나 추측한 링크 클릭 금지 -- ⛔ 추측한 URL은 대답하거나 클릭하지 마세요 -- OAuth가 아닌 일반 로그인은 무시 -- OAuth가 없다면 **즉시 중단**하고 빈 배열 반환 - ---- - -## 🧩 Step 0: 페이지 차단(Block) 여부 확인 - -초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다: - -- 🚫 페이지 차단됨 (Firewall, Access Denied 등) → 즉시 중단 -- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행) -- ❗ 로그인 UI가 정상적으로 로드되지 않으면 중단 - -📤 차단 시 즉시 반환: - -```json -[ - {{ - "provider": "Blocked", - "oauth_uri": "-" - }} -] -```` - ---- - -## 🔍 Step 1: 로그인 페이지 탐색 - -* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다. -* 쿠키 동의, 개인정보 안내 등 팝업은 무시하거나 닫고 계속 진행하세요. -* 페이지가 정상 로드되었다고 가정합니다. - ---- - -## 👀 Step 2: SSO 로그인 버튼 식별 - -아래 **OAuth SSO 버튼들만** 유효합니다: - -* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver - -**유효한 버튼 기준**: - -* OAuth 인증 흐름을 실제로 트리거 -* `window.location` 또는 `` 또는 JS로 redirect가 발생 - -**제외 버튼들 (클릭 금지)**: - -* ❌ 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력 - ---- - -## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도 - -> 각 SSO 로그인 버튼을 클릭한 뒤 반드시 아래 절차를 **완전히 수행**해야 합니다. - -각 SSO 버튼에 대해 다음을 수행: - -1. 버튼 클릭 -2. 🌐 페이지가 이동되면, **현재 주소창(URL)을 확인하여 리디렉션된 OAuth URL**을 `oauth_uri`로 저장 - → 예: `https://accounts.google.com/o/oauth2/auth?...` -3. ✅ 로그인 진행: - - 로그인 페이지에서 OAuth 인증을 완료합니다. - - sign in with your username(email) x_username and password is x_password - - 버튼같은게 안눌리면 새로고침을 해봐 - - **로그인 완료 후 authorize 등 버튼이 있으면 클릭** - - GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 수 있음, 기다려야 할 수도 있음 - - 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요. - - 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다. -4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다. -5. 다음 SSO 버튼으로 반복 진행 - -쿠키 삭제 방법: -chrome://settings/clearBrowserData에 들어가서 삭제해주세요. - -🛑 절대 아래와 같이 해석하지 말 것: -- ❌ 버튼 클릭 후 페이지 로딩만 기다리고 돌아가기 -- ❌ URL 저장 없이 go_back() 호출 - -📤 각 로그인 후 다음 형식으로 결과 저장: - -```json -[ - {{ - "provider": "Google", - "oauth_uri": "https://example.com/auth/google?client_id=..." - }} -] -```` - -```` - ---- - -### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한 - -```text -🛑 뒤로가기(go_back)은 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 시, 뒤로가기 수행: -- ✅ 로그인 흐름이 완료됨 (예: redirect back to app, or callback URL) -- ✅ 현재 리디렉션 URL이 수집됨 -- ✅ 결과에 저장 후 다음 버튼 탐색을 위해 복귀 필요할 때 -``` - ---- - -## 🚫 Step 4: 버튼 없음 또는 예외 발생 시 - -* 유효한 SSO 버튼이 **전혀 없을 경우** -* 예외, 오류 등 발생 시 - -📤 즉시 중단 후 다음 형식으로 반환: - -```json -[] -``` - ---- - -## 📎 중요 규칙 요약 - -* ✅ **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭) -* 🔁 단계는 반드시 순서대로 진행 -* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행 -* 🚫 직접 ID/PW 입력하지 않음 -* ⛔ 추측 URL 클릭 금지 -* ❗ 예외 발생 시 반드시 규정된 JSON 포맷만 반환 - ---- -""" \ No newline at end of file +def get_prompt(type:str) -> str: + """ + Prompt를 반환합니다. + + :param type: 'extend_planner' 또는 'oauth_login' + :return: 해당하는 프롬프트 문자열 + """ + if type == "auth": + from lib.llm.prompt.auth_list import extract_oauth_list_prompt + return extract_oauth_list_prompt + else: + from lib.llm.prompt.fallback import extend_planner_system_message + return extend_planner_system_message \ No newline at end of file diff --git a/lib/llm/prompt/auth_list.py b/lib/llm/prompt/auth_list.py new file mode 100644 index 0000000..193c472 --- /dev/null +++ b/lib/llm/prompt/auth_list.py @@ -0,0 +1,58 @@ +# OAuth 리스트 추출용 프롬프트 (클릭하지 않고 단순 식별만) +extract_oauth_list_prompt = f""" +🎯 목적: 로그인 페이지에서 **OAuth 제공자 리스트 추출** + +📌 주요 규칙: +- ❌ **OAuth 버튼을 클릭하지 마세요** +- ✅ **OAuth 제공자만 식별하고 리스트 작성** +- ❌ 일반 로그인은 무시 +- ❌ 검색 엔진 사용 금지 + +--- + +## 🔍 Step 1: 로그인 페이지 접근 + +* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다. +* 쿠키 동의, 팝업 등은 무시하거나 닫고 계속 진행하세요. + +--- + +## 👀 Step 2: OAuth 제공자 식별 + +아래 **OAuth SSO 버튼들만** 식별합니다: + +**유효한 OAuth 제공자들**: +* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Kakao, Apple, Twitter/X +* ✅ "Continue with..." 또는 "Sign in with..." 버튼들 +* ✅ OAuth 아이콘이 있는 버튼들 + +**제외할 항목들**: +* ❌ 일반 로그인 (이메일/비밀번호 입력) +* ❌ 패스키 (Passkey) +* ❌ 전화번호 인증 +* ❌ 인증서 기반 로그인 + +--- + +## 📝 Step 3: 결과 반환 + +발견된 OAuth 제공자들을 다음 형식으로 반환: + +```json +{{ + "oauth_providers": [ + {{ + "provider": "Google", + "oauth_uri": "" + }}, + {{ + "provider": "GitHub", + "oauth_uri": "" + }} + ] +}} +``` + +⚠️ **중요**: 버튼을 클릭하지 마세요. 단순히 식별만 하면 됩니다. +""" + diff --git a/lib/llm/prompt/fallback.py b/lib/llm/prompt/fallback.py new file mode 100644 index 0000000..91dd959 --- /dev/null +++ b/lib/llm/prompt/fallback.py @@ -0,0 +1,135 @@ +# Extended planner prompt +extend_planner_system_message = f""" +🎯 목적: 웹 자동화를 위한 **SSO 로그인 리디렉션 URL 수집** + +📌 주의사항 (전제 조건) +- ❌ **검색 엔진(Google, Bing 등) 사용 금지** +- ✅ **초기 제공된 URL 내에서만 탐색** +- ❌ 직접 이동하거나 추측한 링크 클릭 금지 +- ⛔ 추측한 URL은 대답하거나 클릭하지 마세요 +- OAuth가 아닌 일반 로그인은 무시 +- OAuth가 없다면 **즉시 중단**하고 빈 배열 반환 + +--- + +## 🧩 Step 0: 페이지 차단(Block) 여부 확인 + +초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다: + +- 🚫 페이지 차단됨 (Firewall, Access Denied 등) → 즉시 중단 +- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행) +- ❗ 로그인 UI가 정상적으로 로드되지 않으면 중단 + +📤 차단 시 즉시 반환: + +```json +[ + {{ + "provider": "Blocked", + "oauth_uri": "-" + }} +] +```` + +--- + +## 🔍 Step 1: 로그인 페이지 탐색 + +* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다. +* 쿠키 동의, 개인정보 안내 등 팝업은 무시하거나 닫고 계속 진행하세요. +* 페이지가 정상 로드되었다고 가정합니다. + +--- + +## 👀 Step 2: SSO 로그인 버튼 식별 + +아래 **OAuth SSO 버튼들만** 유효합니다: + +* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver + +**유효한 버튼 기준**: + +* OAuth 인증 흐름을 실제로 트리거 +* `window.location` 또는 `` 또는 JS로 redirect가 발생 + +**제외 버튼들 (클릭 금지)**: + +* ❌ 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력 + +--- + +## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도 + +> 각 SSO 로그인 버튼을 클릭한 뒤 반드시 아래 절차를 **완전히 수행**해야 합니다. + +각 SSO 버튼에 대해 다음을 수행: + +1. 버튼 클릭 +2. 🌐 페이지가 이동되면, **현재 주소창(URL)을 확인하여 리디렉션된 OAuth URL**을 `oauth_uri`로 저장 + → 예: `https://accounts.google.com/o/oauth2/auth?...` +3. ✅ 로그인 진행: + - 로그인 페이지에서 OAuth 인증을 완료합니다. + - sign in with your username(email) x_username and password is x_password + - 버튼같은게 안눌리면 새로고침을 해봐 + - **로그인 완료 후 authorize 등 버튼이 있으면 클릭** + - GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 수 있음, 기다려야 할 수도 있음 + - 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요. + - 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다. +4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다. +5. 다음 SSO 버튼으로 반복 진행 + +쿠키 삭제 방법: +chrome://settings/clearBrowserData에 들어가서 삭제해주세요. + +🛑 절대 아래와 같이 해석하지 말 것: +- ❌ 버튼 클릭 후 페이지 로딩만 기다리고 돌아가기 +- ❌ URL 저장 없이 go_back() 호출 + +📤 각 로그인 후 다음 형식으로 결과 저장: + +```json +[ + {{ + "provider": "Google", + "oauth_uri": "https://example.com/auth/google?client_id=..." + }} +] +```` + +--- + +### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한 + +```text +🛑 뒤로가기(go_back)은 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 시, 뒤로가기 수행: +- ✅ 로그인 흐름이 완료됨 (예: redirect back to app, or callback URL) +- ✅ 현재 리디렉션 URL이 수집됨 +- ✅ 결과에 저장 후 다음 버튼 탐색을 위해 복귀 필요할 때 +``` + +--- + +## 🚫 Step 4: 버튼 없음 또는 예외 발생 시 + +* 유효한 SSO 버튼이 **전혀 없을 경우** +* 예외, 오류 등 발생 시 + +📤 즉시 중단 후 다음 형식으로 반환: + +```json +[] +``` + +--- + +## 📎 중요 규칙 요약 + +* ✅ **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭) +* 🔁 단계는 반드시 순서대로 진행 +* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행 +* 🚫 직접 ID/PW 입력하지 않음 +* ⛔ 추측 URL 클릭 금지 +* ❗ 예외 발생 시 반드시 규정된 JSON 포맷만 반환 + +--- +""" diff --git a/lib/utils/browser_use/__init__.py b/lib/utils/browser_use/__init__.py index c38ca03..8b85b50 100644 --- a/lib/utils/browser_use/__init__.py +++ b/lib/utils/browser_use/__init__.py @@ -1,11 +1,26 @@ +import os from lib.utils.browser_use.func import * # Initialize configuration proxy_url = setup_proxy() -# Create browser profile async def GetProfile(): storage_state_path = await setup_storage_state() + + # Handle potential encoding issues with storage state file + try: + if storage_state_path and os.path.exists(storage_state_path): + # Test if file can be read properly, if not, skip it + with open(storage_state_path, 'r', encoding='utf-8') as f: + f.read() + storage_state = storage_state_path + else: + print("⚠️ Storage state file not found or inaccessible, proceeding without it.") + storage_state = None + except (UnicodeDecodeError, FileNotFoundError): + # If there's an encoding error, don't use the storage state + storage_state = None + profile = BrowserProfile( # Security settings disable_security=True, @@ -19,7 +34,7 @@ async def GetProfile(): # Data persistence user_data_dir=None, - storage_state=storage_state_path, + storage_state=storage_state, # Network settings proxy={"server": proxy_url} if proxy_url else None, @@ -28,4 +43,4 @@ async def GetProfile(): args=get_browser_args(), ) - return profile \ No newline at end of file + return profile diff --git a/lib/utils/browser_use/func.py b/lib/utils/browser_use/func.py index 31eee31..afc6d52 100644 --- a/lib/utils/browser_use/func.py +++ b/lib/utils/browser_use/func.py @@ -1,4 +1,5 @@ import os +import json from pathlib import Path from dotenv import load_dotenv from browser_use import BrowserProfile @@ -6,6 +7,27 @@ from browser_use import BrowserProfile # Load environment variables load_dotenv(override=True) +def safe_json_read(file_path: Path) -> dict: + """Safely read JSON file with proper encoding handling.""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (UnicodeDecodeError, json.JSONDecodeError): + # Try with different encodings + for encoding in ['utf-8-sig', 'latin1', 'cp1252']: + try: + with open(file_path, 'r', encoding=encoding) as f: + return json.load(f) + except (UnicodeDecodeError, json.JSONDecodeError): + continue + return {} + +def safe_json_write(file_path: Path, data: dict): + """Safely write JSON file with proper encoding handling.""" + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=4) + + def setup_proxy(): """Configure proxy settings from environment variables.""" proxy_host = os.getenv("PROXY_HOST") @@ -31,14 +53,27 @@ async def setup_storage_state(): print(f"📂 Temp storage state path: {storage_state_temp_path}") if storage_state_path.exists(): - if storage_state_temp_path.exists(): - storage_state_temp_path.unlink() + try: + if storage_state_temp_path.exists(): + storage_state_temp_path.unlink() - storage_state_temp_path.write_text( - storage_state_path.read_text(encoding="utf-8"), encoding="utf-8" - ) - print(f"🔄 Using existing storage state: {storage_state_temp_path}") - return str(storage_state_temp_path) + # 안전한 JSON 파일 처리 (인코딩 문제 해결) + storage_data = safe_json_read(storage_state_path) + + if storage_data: # 데이터가 성공적으로 읽혔다면 + safe_json_write(storage_state_temp_path, storage_data) + print(f"🔄 Using existing storage state: {storage_state_temp_path}") + return str(storage_state_temp_path) + else: + print("⚠️ Storage state file is empty or corrupted") + return None + + except Exception as e: + print(f"⚠️ Error processing storage state: {e}") + # 문제가 있는 파일을 제거하고 새로 시작 + if storage_state_temp_path.exists(): + storage_state_temp_path.unlink() + return None print("⚠️ No existing storage state found") return None @@ -73,3 +108,19 @@ def get_browser_args(): # Language f"--lang={os.getenv('LANG', 'en_US')}", ] + +def cleanup_corrupted_storage_files(): + """Clean up corrupted storage state files.""" + script_dir = Path(__file__).parent.parent.parent.parent + storage_state_temp_path = script_dir / "data" / "storage_state_temp.json" + + if storage_state_temp_path.exists(): + try: + # Try to read the file to check if it's corrupted + with open(storage_state_temp_path, 'r', encoding='utf-8') as f: + json.load(f) + print(f"✅ Storage temp file is valid: {storage_state_temp_path}") + except (UnicodeDecodeError, json.JSONDecodeError) as e: + print(f"🗑️ Removing corrupted storage temp file: {e}") + storage_state_temp_path.unlink() + diff --git a/lib/utils/browser_use/model.py b/lib/utils/browser_use/model.py index e4397be..4d1078b 100644 --- a/lib/utils/browser_use/model.py +++ b/lib/utils/browser_use/model.py @@ -4,8 +4,12 @@ from pydantic import BaseModel # 출력 모델 class OAuth(BaseModel): provider: str - oauth_uri: str + oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음 class OAuthList(BaseModel): - oauth_providers: List[OAuth] \ No newline at end of file + oauth_providers: List[OAuth] + + +# 기존 모델 유지 (backward compatibility) +BaseModel = OAuthList \ No newline at end of file diff --git a/main.py b/main.py index 70f864e..cccb656 100644 --- a/main.py +++ b/main.py @@ -26,7 +26,7 @@ from lib.utils.browser_use.sensitive_data import GetSensitiveData from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL from lib.utils.is_html import is_html_url from lib.utils.read_txt import read_lines_between -from lib.llm.prompt import extend_planner_system_message +from lib.llm.prompt import get_prompt from lib.utils.logger import logger import lib.utils.browser_use as browser_use from lib.llm import CreateChatGoogleGenerativeAI @@ -89,52 +89,46 @@ def signal_handler(signum, frame): signal.signal(signal.SIGINT, signal_handler) -# ── URL별로 Browser를 새로 띄우는 함수 ── -async def scan_one_url(url: str, skip_html_check: bool = False): +# ── OAuth 리스트 추출 Agent ── +async def extract_oauth_list(url: str, skip_html_check: bool = False): + """첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출""" await setup_storage_state() target_url = url if url.startswith("http") else f"https://{url}" - print(f"🚀 Starting scan for: {target_url}") + print(f"� OAuth 리스트 추출 시작: {target_url}") # 1) URL이 HTML 페이지인지 확인 if not is_html_url(target_url) and not skip_html_check: print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.") - return - - # Backend에 스캔 시작을 알림 - notify_backend(target_url) + return [] agent = None session = None try_cnt = 0 + while True: - # BrowserSession에 profile 전달 session = BrowserSession( playwright=(await async_patchright().start()), browser_profile=await browser_use.GetProfile(), ) - # Agent 생성 및 실행 (단일 try-except with 백오프) initial_actions = [{"open_tab": {"url": target_url}}] controller = Controller( - output_model=model.BaseModel, + output_model=model.OAuthList, exclude_actions=["search_google", "unknown_action", "unkown"], ) - print("🤖 LLM 모델 초기화 및 스캔 시작...") - print("Available actions:", list(controller.registry.registry.actions.keys())) + print("🤖 OAuth 리스트 추출 Agent 초기화...") + try: agent = Agent( browser_session=session, initial_actions=initial_actions, sensitive_data=GetSensitiveData(), task=( - "Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), " - "and for each one: click the button, follow the full OAuth login flow as far as possible " - "with a real user account (without using a fake or non-existent account), and capture the " - "final redirect URL after login. Do not stop at just collecting the initial authorization URL—" - "actually perform the login step like a real user would. " - "If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. " - "Always log out before starting the login process, and make sure to attempt the login again from a clean state." + "Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). " + "DO NOT click any OAuth buttons or attempt to login. " + "Just find and list all available OAuth providers with their button texts or provider names. " + "Return a list of OAuth providers found on the login page." ), llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL), planner_llm=( @@ -143,13 +137,21 @@ async def scan_one_url(url: str, skip_html_check: bool = False): else None ), controller=controller, - extend_planner_system_message=extend_planner_system_message, + extend_planner_system_message=get_prompt("auth"), ) + response = await agent.run() final_result = response.final_result() if final_result is None: - raise ValueError("final_result()가 None을 반환했습니다.") + raise ValueError("OAuth 리스트 추출 결과가 None입니다.") + + data = json.loads(final_result) + oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]] + + await clean_resources(agent, session) + return oauth_entries + except Exception as e: await clean_resources(agent, session) # API 쿼터 문제인지 확인 @@ -159,52 +161,149 @@ async def scan_one_url(url: str, skip_html_check: bool = False): await asyncio.sleep(wait) try_cnt += 1 if try_cnt >= 3: - print(f"❌ {url} 스캔 실패: API 쿼터 문제가 지속됩니다.") - logger(f"❌ {url} 스캔 실패: API 쿼터 문제: {e}") - return + print(f"❌ {url} OAuth 리스트 추출 실패: API 쿼터 문제가 지속됩니다.") + logger(f"❌ {url} OAuth 리스트 추출 실패: API 쿼터 문제: {e}") + return [] continue # 일반 에러 처리 try_cnt += 1 if try_cnt >= 3: - print(f"❌ {url} 스캔 실패: 에러: {e}") - logger(f"❌ {url} 스캔 실패: 에러: {e}") - return + print(f"❌ {url} OAuth 리스트 추출 실패: 에러: {e}") + logger(f"❌ {url} OAuth 리스트 추출 실패: 에러: {e}") + return [] print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") await asyncio.sleep(30) continue - # 스캔 결과 처리 - data = json.loads(final_result) + +# ── 개별 OAuth 로그인 Agent ── +async def test_oauth_login(url: str, oauth_provider: str): + """두 번째 Agent: 특정 OAuth 제공자로 로그인 시도""" + await setup_storage_state() + target_url = url if url.startswith("http") else f"https://{url}" + print(f"🔐 {oauth_provider} 로그인 시작: {target_url}") + + agent = None + session = None + try_cnt = 0 + + while True: + session = BrowserSession( + playwright=(await async_patchright().start()), + browser_profile=await browser_use.GetProfile(), + ) + + initial_actions = [{"open_tab": {"url": target_url}}] + controller = Controller( + exclude_actions=["search_google", "unknown_action", "unkown"], + ) + + print(f"🤖 {oauth_provider} 로그인 Agent 초기화...") + try: - oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]] + agent = Agent( + browser_session=session, + initial_actions=initial_actions, + sensitive_data=GetSensitiveData(), + task=( + f"Navigate to the login page, find and click the {oauth_provider} OAuth button, " + f"then follow the complete OAuth login flow as far as possible with a real user account. " + f"Capture the final redirect URL after login completion. " + f"If login fails or encounters errors, report the issue. " + f"Focus only on {oauth_provider} - ignore other OAuth providers." + ), + llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL), + planner_llm=( + CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL) + if GOOGLE_PLANNER_MODEL + else None + ), + controller=controller, + extend_planner_system_message=get_prompt(oauth_provider), + ) + + response = await agent.run() + final_result = response.final_result() + + print(f"✅ {oauth_provider} 로그인 완료") + if final_result: + logger(f"✅ {url} - {oauth_provider} 로그인 결과: {final_result}") + + await clean_resources(agent, session) + return True + except Exception as e: - raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") + await clean_resources(agent, session) + # API 쿼터 문제인지 확인 + if "ResourceExhausted" in str(e) or "429" in str(e): + wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF) + print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...") + await asyncio.sleep(wait) + try_cnt += 1 + if try_cnt >= 3: + print(f"❌ {oauth_provider} 로그인 실패: API 쿼터 문제가 지속됩니다.") + logger(f"❌ {url} - {oauth_provider} 로그인 실패: API 쿼터 문제: {e}") + return False + continue + # 일반 에러 처리 + try_cnt += 1 + if try_cnt >= 3: + print(f"❌ {oauth_provider} 로그인 실패: 에러: {e}") + logger(f"❌ {url} - {oauth_provider} 로그인 실패: 에러: {e}") + return False + print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") + await asyncio.sleep(30) + continue - print("-" * 50) - print(f"🔗 Scanned URL: {url}\n") - print("🔐 Detected OAuth Providers and URLs:") + +# ── 통합 스캔 함수 ── +async def scan_one_url(url: str, skip_html_check: bool = False): + """URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도""" + target_url = url if url.startswith("http") else f"https://{url}" + print(f"🚀 스캔 시작: {target_url}") + + # Backend에 스캔 시작을 알림 + notify_backend(target_url) + + # 1단계: OAuth 리스트 추출 + oauth_entries = await extract_oauth_list(url, skip_html_check) + + if not oauth_entries: + print(f"❌ {target_url}에서 OAuth 제공자를 찾을 수 없습니다.") + return + + print("-" * 50) + print(f"🔗 스캔 URL: {url}") + print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}개") + for entry in oauth_entries: + print(f" - {entry.provider}") + print("-" * 50) + + # CSV에 OAuth 리스트 저장 + csv_file = "./data/oauth_providers.csv" + file_exists = os.path.isfile(csv_file) + with open(csv_file, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not file_exists: + writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"]) for entry in oauth_entries: - if "<" in entry.oauth_uri or "..." in entry.oauth_uri: - print( - f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n" - ) - else: - print(f"- {entry.provider}: {entry.oauth_uri}") - print("-" * 50) - - # CSV에 저장 (append) - csv_file = "./data/oauth_providers.csv" - file_exists = os.path.isfile(csv_file) - with open(csv_file, "a", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - if not file_exists: - writer.writerow(["issuer", "provider", "oauth_uri"]) - for entry in oauth_entries: - writer.writerow([url, entry.provider, entry.oauth_uri]) - await clean_resources(agent, session) - break - + writer.writerow([url, entry.provider, entry.oauth_uri, "pending"]) + # 2단계: 각 OAuth 제공자별로 개별 로그인 시도 + for i, oauth_entry in enumerate(oauth_entries): + print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}") + + # OAuth 간 대기 시간 + if i > 0: + print("⏳ OAuth 테스트 간 대기 중 (30초)...") + await asyncio.sleep(30) + + # 개별 OAuth 로그인 시도 + success = await test_oauth_login(url, oauth_entry.provider) + + # 결과를 CSV에 업데이트 (간단하게 로그만 남김) + status = "success" if success else "failed" + print(f"📝 {oauth_entry.provider} 로그인 결과: {status}") async def loop( filepath: str, start_line: int, end_line: int, skip_html_check: bool = False ):