diff --git a/README.md b/README.md
index 9128804..c9ade30 100644
--- a/README.md
+++ b/README.md
@@ -96,8 +96,8 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
```
```sh
-# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip}
-uv run run.py 12540 13000 False
+# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh}
+uv run run.py 1 100 --skh
```
# 참고하면 좋을만한 것
diff --git a/lib/llm/prompt/__init__.py b/lib/llm/prompt/__init__.py
index 7f1c44c..314b2dd 100644
--- a/lib/llm/prompt/__init__.py
+++ b/lib/llm/prompt/__init__.py
@@ -3,140 +3,16 @@ from dotenv import load_dotenv
load_dotenv(override=True)
-# Extended planner prompt
-extend_planner_system_message = f"""
-🎯 목적: 웹 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
-
-📌 주의사항 (전제 조건)
-- ❌ **검색 엔진(Google, Bing 등) 사용 금지**
-- ✅ **초기 제공된 URL 내에서만 탐색**
-- ❌ 직접 이동하거나 추측한 링크 클릭 금지
-- ⛔ 추측한 URL은 대답하거나 클릭하지 마세요
-- OAuth가 아닌 일반 로그인은 무시
-- OAuth가 없다면 **즉시 중단**하고 빈 배열 반환
-
----
-
-## 🧩 Step 0: 페이지 차단(Block) 여부 확인
-
-초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
-
-- 🚫 페이지 차단됨 (Firewall, Access Denied 등) → 즉시 중단
-- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
-- ❗ 로그인 UI가 정상적으로 로드되지 않으면 중단
-
-📤 차단 시 즉시 반환:
-
-```json
-[
- {{
- "provider": "Blocked",
- "oauth_uri": "-"
- }}
-]
-````
-
----
-
-## 🔍 Step 1: 로그인 페이지 탐색
-
-* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다.
-* 쿠키 동의, 개인정보 안내 등 팝업은 무시하거나 닫고 계속 진행하세요.
-* 페이지가 정상 로드되었다고 가정합니다.
-
----
-
-## 👀 Step 2: SSO 로그인 버튼 식별
-
-아래 **OAuth SSO 버튼들만** 유효합니다:
-
-* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
-
-**유효한 버튼 기준**:
-
-* OAuth 인증 흐름을 실제로 트리거
-* `window.location` 또는 `` 또는 JS로 redirect가 발생
-
-**제외 버튼들 (클릭 금지)**:
-
-* ❌ 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
-
----
-
-## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
-
-> 각 SSO 로그인 버튼을 클릭한 뒤 반드시 아래 절차를 **완전히 수행**해야 합니다.
-
-각 SSO 버튼에 대해 다음을 수행:
-
-1. 버튼 클릭
-2. 🌐 페이지가 이동되면, **현재 주소창(URL)을 확인하여 리디렉션된 OAuth URL**을 `oauth_uri`로 저장
- → 예: `https://accounts.google.com/o/oauth2/auth?...`
-3. ✅ 로그인 진행:
- - 로그인 페이지에서 OAuth 인증을 완료합니다.
- - sign in with your username(email) x_username and password is x_password
- - 버튼같은게 안눌리면 새로고침을 해봐
- - **로그인 완료 후 authorize 등 버튼이 있으면 클릭**
- - GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 수 있음, 기다려야 할 수도 있음
- - 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
- - 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다.
-4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
-5. 다음 SSO 버튼으로 반복 진행
-
-쿠키 삭제 방법:
-chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
-
-🛑 절대 아래와 같이 해석하지 말 것:
-- ❌ 버튼 클릭 후 페이지 로딩만 기다리고 돌아가기
-- ❌ URL 저장 없이 go_back() 호출
-
-📤 각 로그인 후 다음 형식으로 결과 저장:
-
-```json
-[
- {{
- "provider": "Google",
- "oauth_uri": "https://example.com/auth/google?client_id=..."
- }}
-]
-````
-
-````
-
----
-
-### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
-
-```text
-🛑 뒤로가기(go_back)은 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 시, 뒤로가기 수행:
-- ✅ 로그인 흐름이 완료됨 (예: redirect back to app, or callback URL)
-- ✅ 현재 리디렉션 URL이 수집됨
-- ✅ 결과에 저장 후 다음 버튼 탐색을 위해 복귀 필요할 때
-```
-
----
-
-## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
-
-* 유효한 SSO 버튼이 **전혀 없을 경우**
-* 예외, 오류 등 발생 시
-
-📤 즉시 중단 후 다음 형식으로 반환:
-
-```json
-[]
-```
-
----
-
-## 📎 중요 규칙 요약
-
-* ✅ **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
-* 🔁 단계는 반드시 순서대로 진행
-* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
-* 🚫 직접 ID/PW 입력하지 않음
-* ⛔ 추측 URL 클릭 금지
-* ❗ 예외 발생 시 반드시 규정된 JSON 포맷만 반환
-
----
-"""
\ No newline at end of file
+def get_prompt(type:str) -> str:
+ """
+ Prompt를 반환합니다.
+
+ :param type: 'extend_planner' 또는 'oauth_login'
+ :return: 해당하는 프롬프트 문자열
+ """
+ if type == "auth":
+ from lib.llm.prompt.auth_list import extract_oauth_list_prompt
+ return extract_oauth_list_prompt
+ else:
+ from lib.llm.prompt.fallback import extend_planner_system_message
+ return extend_planner_system_message
\ No newline at end of file
diff --git a/lib/llm/prompt/auth_list.py b/lib/llm/prompt/auth_list.py
new file mode 100644
index 0000000..193c472
--- /dev/null
+++ b/lib/llm/prompt/auth_list.py
@@ -0,0 +1,58 @@
+# OAuth 리스트 추출용 프롬프트 (클릭하지 않고 단순 식별만)
+extract_oauth_list_prompt = f"""
+🎯 목적: 로그인 페이지에서 **OAuth 제공자 리스트 추출**
+
+📌 주요 규칙:
+- ❌ **OAuth 버튼을 클릭하지 마세요**
+- ✅ **OAuth 제공자만 식별하고 리스트 작성**
+- ❌ 일반 로그인은 무시
+- ❌ 검색 엔진 사용 금지
+
+---
+
+## 🔍 Step 1: 로그인 페이지 접근
+
+* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다.
+* 쿠키 동의, 팝업 등은 무시하거나 닫고 계속 진행하세요.
+
+---
+
+## 👀 Step 2: OAuth 제공자 식별
+
+아래 **OAuth SSO 버튼들만** 식별합니다:
+
+**유효한 OAuth 제공자들**:
+* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Kakao, Apple, Twitter/X
+* ✅ "Continue with..." 또는 "Sign in with..." 버튼들
+* ✅ OAuth 아이콘이 있는 버튼들
+
+**제외할 항목들**:
+* ❌ 일반 로그인 (이메일/비밀번호 입력)
+* ❌ 패스키 (Passkey)
+* ❌ 전화번호 인증
+* ❌ 인증서 기반 로그인
+
+---
+
+## 📝 Step 3: 결과 반환
+
+발견된 OAuth 제공자들을 다음 형식으로 반환:
+
+```json
+{{
+ "oauth_providers": [
+ {{
+ "provider": "Google",
+ "oauth_uri": ""
+ }},
+ {{
+ "provider": "GitHub",
+ "oauth_uri": ""
+ }}
+ ]
+}}
+```
+
+⚠️ **중요**: 버튼을 클릭하지 마세요. 단순히 식별만 하면 됩니다.
+"""
+
diff --git a/lib/llm/prompt/fallback.py b/lib/llm/prompt/fallback.py
new file mode 100644
index 0000000..91dd959
--- /dev/null
+++ b/lib/llm/prompt/fallback.py
@@ -0,0 +1,135 @@
+# Extended planner prompt
+extend_planner_system_message = f"""
+🎯 목적: 웹 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
+
+📌 주의사항 (전제 조건)
+- ❌ **검색 엔진(Google, Bing 등) 사용 금지**
+- ✅ **초기 제공된 URL 내에서만 탐색**
+- ❌ 직접 이동하거나 추측한 링크 클릭 금지
+- ⛔ 추측한 URL은 대답하거나 클릭하지 마세요
+- OAuth가 아닌 일반 로그인은 무시
+- OAuth가 없다면 **즉시 중단**하고 빈 배열 반환
+
+---
+
+## 🧩 Step 0: 페이지 차단(Block) 여부 확인
+
+초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
+
+- 🚫 페이지 차단됨 (Firewall, Access Denied 등) → 즉시 중단
+- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
+- ❗ 로그인 UI가 정상적으로 로드되지 않으면 중단
+
+📤 차단 시 즉시 반환:
+
+```json
+[
+ {{
+ "provider": "Blocked",
+ "oauth_uri": "-"
+ }}
+]
+````
+
+---
+
+## 🔍 Step 1: 로그인 페이지 탐색
+
+* 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다.
+* 쿠키 동의, 개인정보 안내 등 팝업은 무시하거나 닫고 계속 진행하세요.
+* 페이지가 정상 로드되었다고 가정합니다.
+
+---
+
+## 👀 Step 2: SSO 로그인 버튼 식별
+
+아래 **OAuth SSO 버튼들만** 유효합니다:
+
+* ✅ Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
+
+**유효한 버튼 기준**:
+
+* OAuth 인증 흐름을 실제로 트리거
+* `window.location` 또는 `` 또는 JS로 redirect가 발생
+
+**제외 버튼들 (클릭 금지)**:
+
+* ❌ 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
+
+---
+
+## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
+
+> 각 SSO 로그인 버튼을 클릭한 뒤 반드시 아래 절차를 **완전히 수행**해야 합니다.
+
+각 SSO 버튼에 대해 다음을 수행:
+
+1. 버튼 클릭
+2. 🌐 페이지가 이동되면, **현재 주소창(URL)을 확인하여 리디렉션된 OAuth URL**을 `oauth_uri`로 저장
+ → 예: `https://accounts.google.com/o/oauth2/auth?...`
+3. ✅ 로그인 진행:
+ - 로그인 페이지에서 OAuth 인증을 완료합니다.
+ - sign in with your username(email) x_username and password is x_password
+ - 버튼같은게 안눌리면 새로고침을 해봐
+ - **로그인 완료 후 authorize 등 버튼이 있으면 클릭**
+ - GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 수 있음, 기다려야 할 수도 있음
+ - 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
+ - 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다.
+4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
+5. 다음 SSO 버튼으로 반복 진행
+
+쿠키 삭제 방법:
+chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
+
+🛑 절대 아래와 같이 해석하지 말 것:
+- ❌ 버튼 클릭 후 페이지 로딩만 기다리고 돌아가기
+- ❌ URL 저장 없이 go_back() 호출
+
+📤 각 로그인 후 다음 형식으로 결과 저장:
+
+```json
+[
+ {{
+ "provider": "Google",
+ "oauth_uri": "https://example.com/auth/google?client_id=..."
+ }}
+]
+````
+
+---
+
+### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
+
+```text
+🛑 뒤로가기(go_back)은 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 시, 뒤로가기 수행:
+- ✅ 로그인 흐름이 완료됨 (예: redirect back to app, or callback URL)
+- ✅ 현재 리디렉션 URL이 수집됨
+- ✅ 결과에 저장 후 다음 버튼 탐색을 위해 복귀 필요할 때
+```
+
+---
+
+## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
+
+* 유효한 SSO 버튼이 **전혀 없을 경우**
+* 예외, 오류 등 발생 시
+
+📤 즉시 중단 후 다음 형식으로 반환:
+
+```json
+[]
+```
+
+---
+
+## 📎 중요 규칙 요약
+
+* ✅ **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
+* 🔁 단계는 반드시 순서대로 진행
+* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
+* 🚫 직접 ID/PW 입력하지 않음
+* ⛔ 추측 URL 클릭 금지
+* ❗ 예외 발생 시 반드시 규정된 JSON 포맷만 반환
+
+---
+"""
diff --git a/lib/utils/browser_use/__init__.py b/lib/utils/browser_use/__init__.py
index c38ca03..8b85b50 100644
--- a/lib/utils/browser_use/__init__.py
+++ b/lib/utils/browser_use/__init__.py
@@ -1,11 +1,26 @@
+import os
from lib.utils.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
-# Create browser profile
async def GetProfile():
storage_state_path = await setup_storage_state()
+
+ # Handle potential encoding issues with storage state file
+ try:
+ if storage_state_path and os.path.exists(storage_state_path):
+ # Test if file can be read properly, if not, skip it
+ with open(storage_state_path, 'r', encoding='utf-8') as f:
+ f.read()
+ storage_state = storage_state_path
+ else:
+ print("⚠️ Storage state file not found or inaccessible, proceeding without it.")
+ storage_state = None
+ except (UnicodeDecodeError, FileNotFoundError):
+ # If there's an encoding error, don't use the storage state
+ storage_state = None
+
profile = BrowserProfile(
# Security settings
disable_security=True,
@@ -19,7 +34,7 @@ async def GetProfile():
# Data persistence
user_data_dir=None,
- storage_state=storage_state_path,
+ storage_state=storage_state,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
@@ -28,4 +43,4 @@ async def GetProfile():
args=get_browser_args(),
)
- return profile
\ No newline at end of file
+ return profile
diff --git a/lib/utils/browser_use/func.py b/lib/utils/browser_use/func.py
index 31eee31..afc6d52 100644
--- a/lib/utils/browser_use/func.py
+++ b/lib/utils/browser_use/func.py
@@ -1,4 +1,5 @@
import os
+import json
from pathlib import Path
from dotenv import load_dotenv
from browser_use import BrowserProfile
@@ -6,6 +7,27 @@ from browser_use import BrowserProfile
# Load environment variables
load_dotenv(override=True)
+def safe_json_read(file_path: Path) -> dict:
+ """Safely read JSON file with proper encoding handling."""
+ try:
+ with open(file_path, 'r', encoding='utf-8') as f:
+ return json.load(f)
+ except (UnicodeDecodeError, json.JSONDecodeError):
+ # Try with different encodings
+ for encoding in ['utf-8-sig', 'latin1', 'cp1252']:
+ try:
+ with open(file_path, 'r', encoding=encoding) as f:
+ return json.load(f)
+ except (UnicodeDecodeError, json.JSONDecodeError):
+ continue
+ return {}
+
+def safe_json_write(file_path: Path, data: dict):
+ """Safely write JSON file with proper encoding handling."""
+ with open(file_path, 'w', encoding='utf-8') as f:
+ json.dump(data, f, ensure_ascii=False, indent=4)
+
+
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
@@ -31,14 +53,27 @@ async def setup_storage_state():
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
- if storage_state_temp_path.exists():
- storage_state_temp_path.unlink()
+ try:
+ if storage_state_temp_path.exists():
+ storage_state_temp_path.unlink()
- storage_state_temp_path.write_text(
- storage_state_path.read_text(encoding="utf-8"), encoding="utf-8"
- )
- print(f"🔄 Using existing storage state: {storage_state_temp_path}")
- return str(storage_state_temp_path)
+ # 안전한 JSON 파일 처리 (인코딩 문제 해결)
+ storage_data = safe_json_read(storage_state_path)
+
+ if storage_data: # 데이터가 성공적으로 읽혔다면
+ safe_json_write(storage_state_temp_path, storage_data)
+ print(f"🔄 Using existing storage state: {storage_state_temp_path}")
+ return str(storage_state_temp_path)
+ else:
+ print("⚠️ Storage state file is empty or corrupted")
+ return None
+
+ except Exception as e:
+ print(f"⚠️ Error processing storage state: {e}")
+ # 문제가 있는 파일을 제거하고 새로 시작
+ if storage_state_temp_path.exists():
+ storage_state_temp_path.unlink()
+ return None
print("⚠️ No existing storage state found")
return None
@@ -73,3 +108,19 @@ def get_browser_args():
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]
+
+def cleanup_corrupted_storage_files():
+ """Clean up corrupted storage state files."""
+ script_dir = Path(__file__).parent.parent.parent.parent
+ storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
+
+ if storage_state_temp_path.exists():
+ try:
+ # Try to read the file to check if it's corrupted
+ with open(storage_state_temp_path, 'r', encoding='utf-8') as f:
+ json.load(f)
+ print(f"✅ Storage temp file is valid: {storage_state_temp_path}")
+ except (UnicodeDecodeError, json.JSONDecodeError) as e:
+ print(f"🗑️ Removing corrupted storage temp file: {e}")
+ storage_state_temp_path.unlink()
+
diff --git a/lib/utils/browser_use/model.py b/lib/utils/browser_use/model.py
index e4397be..4d1078b 100644
--- a/lib/utils/browser_use/model.py
+++ b/lib/utils/browser_use/model.py
@@ -4,8 +4,12 @@ from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
- oauth_uri: str
+ oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel):
- oauth_providers: List[OAuth]
\ No newline at end of file
+ oauth_providers: List[OAuth]
+
+
+# 기존 모델 유지 (backward compatibility)
+BaseModel = OAuthList
\ No newline at end of file
diff --git a/main.py b/main.py
index 70f864e..cccb656 100644
--- a/main.py
+++ b/main.py
@@ -26,7 +26,7 @@ from lib.utils.browser_use.sensitive_data import GetSensitiveData
from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
from lib.utils.is_html import is_html_url
from lib.utils.read_txt import read_lines_between
-from lib.llm.prompt import extend_planner_system_message
+from lib.llm.prompt import get_prompt
from lib.utils.logger import logger
import lib.utils.browser_use as browser_use
from lib.llm import CreateChatGoogleGenerativeAI
@@ -89,52 +89,46 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGINT, signal_handler)
-# ── URL별로 Browser를 새로 띄우는 함수 ──
-async def scan_one_url(url: str, skip_html_check: bool = False):
+# ── OAuth 리스트 추출 Agent ──
+async def extract_oauth_list(url: str, skip_html_check: bool = False):
+ """첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
- print(f"🚀 Starting scan for: {target_url}")
+ print(f"� OAuth 리스트 추출 시작: {target_url}")
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
- return
-
- # Backend에 스캔 시작을 알림
- notify_backend(target_url)
+ return []
agent = None
session = None
try_cnt = 0
+
while True:
- # BrowserSession에 profile 전달
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await browser_use.GetProfile(),
)
- # Agent 생성 및 실행 (단일 try-except with 백오프)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(
- output_model=model.BaseModel,
+ output_model=model.OAuthList,
exclude_actions=["search_google", "unknown_action", "unkown"],
)
- print("🤖 LLM 모델 초기화 및 스캔 시작...")
- print("Available actions:", list(controller.registry.registry.actions.keys()))
+ print("🤖 OAuth 리스트 추출 Agent 초기화...")
+
try:
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
sensitive_data=GetSensitiveData(),
task=(
- "Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), "
- "and for each one: click the button, follow the full OAuth login flow as far as possible "
- "with a real user account (without using a fake or non-existent account), and capture the "
- "final redirect URL after login. Do not stop at just collecting the initial authorization URL—"
- "actually perform the login step like a real user would. "
- "If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. "
- "Always log out before starting the login process, and make sure to attempt the login again from a clean state."
+ "Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
+ "DO NOT click any OAuth buttons or attempt to login. "
+ "Just find and list all available OAuth providers with their button texts or provider names. "
+ "Return a list of OAuth providers found on the login page."
),
llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
planner_llm=(
@@ -143,13 +137,21 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
else None
),
controller=controller,
- extend_planner_system_message=extend_planner_system_message,
+ extend_planner_system_message=get_prompt("auth"),
)
+
response = await agent.run()
final_result = response.final_result()
if final_result is None:
- raise ValueError("final_result()가 None을 반환했습니다.")
+ raise ValueError("OAuth 리스트 추출 결과가 None입니다.")
+
+ data = json.loads(final_result)
+ oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]]
+
+ await clean_resources(agent, session)
+ return oauth_entries
+
except Exception as e:
await clean_resources(agent, session)
# API 쿼터 문제인지 확인
@@ -159,52 +161,149 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
- print(f"❌ {url} 스캔 실패: API 쿼터 문제가 지속됩니다.")
- logger(f"❌ {url} 스캔 실패: API 쿼터 문제: {e}")
- return
+ print(f"❌ {url} OAuth 리스트 추출 실패: API 쿼터 문제가 지속됩니다.")
+ logger(f"❌ {url} OAuth 리스트 추출 실패: API 쿼터 문제: {e}")
+ return []
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
- print(f"❌ {url} 스캔 실패: 에러: {e}")
- logger(f"❌ {url} 스캔 실패: 에러: {e}")
- return
+ print(f"❌ {url} OAuth 리스트 추출 실패: 에러: {e}")
+ logger(f"❌ {url} OAuth 리스트 추출 실패: 에러: {e}")
+ return []
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
- # 스캔 결과 처리
- data = json.loads(final_result)
+
+# ── 개별 OAuth 로그인 Agent ──
+async def test_oauth_login(url: str, oauth_provider: str):
+ """두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
+ await setup_storage_state()
+ target_url = url if url.startswith("http") else f"https://{url}"
+ print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
+
+ agent = None
+ session = None
+ try_cnt = 0
+
+ while True:
+ session = BrowserSession(
+ playwright=(await async_patchright().start()),
+ browser_profile=await browser_use.GetProfile(),
+ )
+
+ initial_actions = [{"open_tab": {"url": target_url}}]
+ controller = Controller(
+ exclude_actions=["search_google", "unknown_action", "unkown"],
+ )
+
+ print(f"🤖 {oauth_provider} 로그인 Agent 초기화...")
+
try:
- oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]]
+ agent = Agent(
+ browser_session=session,
+ initial_actions=initial_actions,
+ sensitive_data=GetSensitiveData(),
+ task=(
+ f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
+ f"then follow the complete OAuth login flow as far as possible with a real user account. "
+ f"Capture the final redirect URL after login completion. "
+ f"If login fails or encounters errors, report the issue. "
+ f"Focus only on {oauth_provider} - ignore other OAuth providers."
+ ),
+ llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
+ planner_llm=(
+ CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL)
+ if GOOGLE_PLANNER_MODEL
+ else None
+ ),
+ controller=controller,
+ extend_planner_system_message=get_prompt(oauth_provider),
+ )
+
+ response = await agent.run()
+ final_result = response.final_result()
+
+ print(f"✅ {oauth_provider} 로그인 완료")
+ if final_result:
+ logger(f"✅ {url} - {oauth_provider} 로그인 결과: {final_result}")
+
+ await clean_resources(agent, session)
+ return True
+
except Exception as e:
- raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
+ await clean_resources(agent, session)
+ # API 쿼터 문제인지 확인
+ if "ResourceExhausted" in str(e) or "429" in str(e):
+ wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
+ print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
+ await asyncio.sleep(wait)
+ try_cnt += 1
+ if try_cnt >= 3:
+ print(f"❌ {oauth_provider} 로그인 실패: API 쿼터 문제가 지속됩니다.")
+ logger(f"❌ {url} - {oauth_provider} 로그인 실패: API 쿼터 문제: {e}")
+ return False
+ continue
+ # 일반 에러 처리
+ try_cnt += 1
+ if try_cnt >= 3:
+ print(f"❌ {oauth_provider} 로그인 실패: 에러: {e}")
+ logger(f"❌ {url} - {oauth_provider} 로그인 실패: 에러: {e}")
+ return False
+ print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
+ await asyncio.sleep(30)
+ continue
- print("-" * 50)
- print(f"🔗 Scanned URL: {url}\n")
- print("🔐 Detected OAuth Providers and URLs:")
+
+# ── 통합 스캔 함수 ──
+async def scan_one_url(url: str, skip_html_check: bool = False):
+ """URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
+ target_url = url if url.startswith("http") else f"https://{url}"
+ print(f"🚀 스캔 시작: {target_url}")
+
+ # Backend에 스캔 시작을 알림
+ notify_backend(target_url)
+
+ # 1단계: OAuth 리스트 추출
+ oauth_entries = await extract_oauth_list(url, skip_html_check)
+
+ if not oauth_entries:
+ print(f"❌ {target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
+ return
+
+ print("-" * 50)
+ print(f"🔗 스캔 URL: {url}")
+ print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}개")
+ for entry in oauth_entries:
+ print(f" - {entry.provider}")
+ print("-" * 50)
+
+ # CSV에 OAuth 리스트 저장
+ csv_file = "./data/oauth_providers.csv"
+ file_exists = os.path.isfile(csv_file)
+ with open(csv_file, "a", newline="", encoding="utf-8") as f:
+ writer = csv.writer(f)
+ if not file_exists:
+ writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
- if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
- print(
- f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
- )
- else:
- print(f"- {entry.provider}: {entry.oauth_uri}")
- print("-" * 50)
-
- # CSV에 저장 (append)
- csv_file = "./data/oauth_providers.csv"
- file_exists = os.path.isfile(csv_file)
- with open(csv_file, "a", newline="", encoding="utf-8") as f:
- writer = csv.writer(f)
- if not file_exists:
- writer.writerow(["issuer", "provider", "oauth_uri"])
- for entry in oauth_entries:
- writer.writerow([url, entry.provider, entry.oauth_uri])
- await clean_resources(agent, session)
- break
-
+ writer.writerow([url, entry.provider, entry.oauth_uri, "pending"])
+ # 2단계: 각 OAuth 제공자별로 개별 로그인 시도
+ for i, oauth_entry in enumerate(oauth_entries):
+ print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}")
+
+ # OAuth 간 대기 시간
+ if i > 0:
+ print("⏳ OAuth 테스트 간 대기 중 (30초)...")
+ await asyncio.sleep(30)
+
+ # 개별 OAuth 로그인 시도
+ success = await test_oauth_login(url, oauth_entry.provider)
+
+ # 결과를 CSV에 업데이트 (간단하게 로그만 남김)
+ status = "success" if success else "failed"
+ print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):