feat: OAuth 리스트 추출 및 로그인 기능 개선

- README.md: uv 실행 명령어 수정
- lib/llm/prompt: OAuth 리스트 추출 및 fallback 프롬프트 추가
- lib/utils/browser_use: 프로필 생성 시 스토리지 상태 파일 처리 개선
- lib/utils/browser_use/func: 안전한 JSON 읽기 및 쓰기 함수 추가
- main.py: OAuth 리스트 추출 및 개별 로그인 시도 통합
- model.py: OAuth 모델 수정
This commit is contained in:
암냥 2025-06-23 00:15:03 +09:00
commit 4b3637b762
8 changed files with 444 additions and 206 deletions

View file

@ -96,8 +96,8 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
``` ```
```sh ```sh
# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip} # uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh}
uv run run.py 12540 13000 False uv run run.py 1 100 --skh
``` ```
# 참고하면 좋을만한 것 # 참고하면 좋을만한 것

View file

@ -3,140 +3,16 @@ from dotenv import load_dotenv
load_dotenv(override=True) load_dotenv(override=True)
# Extended planner prompt def get_prompt(type:str) -> str:
extend_planner_system_message = f""" """
🎯 목적: 자동화를 위한 **SSO 로그인 리디렉션 URL 수집** Prompt를 반환합니다.
📌 주의사항 (전제 조건) :param type: 'extend_planner' 또는 'oauth_login'
- **검색 엔진(Google, Bing ) 사용 금지** :return: 해당하는 프롬프트 문자열
- **초기 제공된 URL 내에서만 탐색** """
- 직접 이동하거나 추측한 링크 클릭 금지 if type == "auth":
- 추측한 URL은 대답하거나 클릭하지 마세요 from lib.llm.prompt.auth_list import extract_oauth_list_prompt
- OAuth가 아닌 일반 로그인은 무시 return extract_oauth_list_prompt
- OAuth가 없다면 **즉시 중단**하고 배열 반환 else:
from lib.llm.prompt.fallback import extend_planner_system_message
--- return extend_planner_system_message
## 🧩 Step 0: 페이지 차단(Block) 여부 확인
초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
- 🚫 페이지 차단됨 (Firewall, Access Denied ) 즉시 중단
- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
- 로그인 UI가 정상적으로 로드되지 않으면 중단
📤 차단 즉시 반환:
```json
[
{{
"provider": "Blocked",
"oauth_uri": "-"
}}
]
````
---
## 🔍 Step 1: 로그인 페이지 탐색
* 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
* 쿠키 동의, 개인정보 안내 팝업은 무시하거나 닫고 계속 진행하세요.
* 페이지가 정상 로드되었다고 가정합니다.
---
## 👀 Step 2: SSO 로그인 버튼 식별
아래 **OAuth SSO 버튼들만** 유효합니다:
* Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
**유효한 버튼 기준**:
* OAuth 인증 흐름을 실제로 트리거
* `window.location` 또는 `<a href=...>` 또는 JS로 redirect가 발생
**제외 버튼들 (클릭 금지)**:
* 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
---
## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
> SSO 로그인 버튼을 클릭한 반드시 아래 절차를 **완전히 수행**해야 합니다.
SSO 버튼에 대해 다음을 수행:
1. 버튼 클릭
2. 🌐 페이지가 이동되면, **현재 주소창(URL) 확인하여 리디렉션된 OAuth URL** `oauth_uri` 저장
: `https://accounts.google.com/o/oauth2/auth?...`
3. 로그인 진행:
- 로그인 페이지에서 OAuth 인증을 완료합니다.
- sign in with your username(email) x_username and password is x_password
- 버튼같은게 안눌리면 새로고침을 해봐
- **로그인 완료 authorize 버튼이 있으면 클릭**
- GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 있음, 기다려야 수도 있음
- 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
- 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다.
4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
5. 다음 SSO 버튼으로 반복 진행
쿠키 삭제 방법:
chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
🛑 절대 아래와 같이 해석하지 :
- 버튼 클릭 페이지 로딩만 기다리고 돌아가기
- URL 저장 없이 go_back() 호출
📤 로그인 다음 형식으로 결과 저장:
```json
[
{{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?client_id=..."
}}
]
````
````
---
### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
```text
🛑 뒤로가기(go_back) 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 , 뒤로가기 수행:
- 로그인 흐름이 완료됨 (: redirect back to app, or callback URL)
- 현재 리디렉션 URL이 수집됨
- 결과에 저장 다음 버튼 탐색을 위해 복귀 필요할
```
---
## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
* 유효한 SSO 버튼이 **전혀 없을 경우**
* 예외, 오류 발생
📤 즉시 중단 다음 형식으로 반환:
```json
[]
```
---
## 📎 중요 규칙 요약
* **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
* 🔁 단계는 반드시 순서대로 진행
* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
* 🚫 직접 ID/PW 입력하지 않음
* 추측 URL 클릭 금지
* 예외 발생 반드시 규정된 JSON 포맷만 반환
---
"""

View file

@ -0,0 +1,58 @@
# OAuth 리스트 추출용 프롬프트 (클릭하지 않고 단순 식별만)
extract_oauth_list_prompt = f"""
🎯 목적: 로그인 페이지에서 **OAuth 제공자 리스트 추출**
📌 주요 규칙:
- **OAuth 버튼을 클릭하지 마세요**
- **OAuth 제공자만 식별하고 리스트 작성**
- 일반 로그인은 무시
- 검색 엔진 사용 금지
---
## 🔍 Step 1: 로그인 페이지 접근
* 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
* 쿠키 동의, 팝업 등은 무시하거나 닫고 계속 진행하세요.
---
## 👀 Step 2: OAuth 제공자 식별
아래 **OAuth SSO 버튼들만** 식별합니다:
**유효한 OAuth 제공자들**:
* Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Kakao, Apple, Twitter/X
* "Continue with..." 또는 "Sign in with..." 버튼들
* OAuth 아이콘이 있는 버튼들
**제외할 항목들**:
* 일반 로그인 (이메일/비밀번호 입력)
* 패스키 (Passkey)
* 전화번호 인증
* 인증서 기반 로그인
---
## 📝 Step 3: 결과 반환
발견된 OAuth 제공자들을 다음 형식으로 반환:
```json
{{
"oauth_providers": [
{{
"provider": "Google",
"oauth_uri": ""
}},
{{
"provider": "GitHub",
"oauth_uri": ""
}}
]
}}
```
**중요**: 버튼을 클릭하지 마세요. 단순히 식별만 하면 됩니다.
"""

135
lib/llm/prompt/fallback.py Normal file
View file

@ -0,0 +1,135 @@
# Extended planner prompt
extend_planner_system_message = f"""
🎯 목적: 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
📌 주의사항 (전제 조건)
- **검색 엔진(Google, Bing ) 사용 금지**
- **초기 제공된 URL 내에서만 탐색**
- 직접 이동하거나 추측한 링크 클릭 금지
- 추측한 URL은 대답하거나 클릭하지 마세요
- OAuth가 아닌 일반 로그인은 무시
- OAuth가 없다면 **즉시 중단**하고 배열 반환
---
## 🧩 Step 0: 페이지 차단(Block) 여부 확인
초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
- 🚫 페이지 차단됨 (Firewall, Access Denied ) 즉시 중단
- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
- 로그인 UI가 정상적으로 로드되지 않으면 중단
📤 차단 즉시 반환:
```json
[
{{
"provider": "Blocked",
"oauth_uri": "-"
}}
]
````
---
## 🔍 Step 1: 로그인 페이지 탐색
* 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
* 쿠키 동의, 개인정보 안내 팝업은 무시하거나 닫고 계속 진행하세요.
* 페이지가 정상 로드되었다고 가정합니다.
---
## 👀 Step 2: SSO 로그인 버튼 식별
아래 **OAuth SSO 버튼들만** 유효합니다:
* Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
**유효한 버튼 기준**:
* OAuth 인증 흐름을 실제로 트리거
* `window.location` 또는 `<a href=...>` 또는 JS로 redirect가 발생
**제외 버튼들 (클릭 금지)**:
* 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
---
## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
> SSO 로그인 버튼을 클릭한 반드시 아래 절차를 **완전히 수행**해야 합니다.
SSO 버튼에 대해 다음을 수행:
1. 버튼 클릭
2. 🌐 페이지가 이동되면, **현재 주소창(URL) 확인하여 리디렉션된 OAuth URL** `oauth_uri` 저장
: `https://accounts.google.com/o/oauth2/auth?...`
3. 로그인 진행:
- 로그인 페이지에서 OAuth 인증을 완료합니다.
- sign in with your username(email) x_username and password is x_password
- 버튼같은게 안눌리면 새로고침을 해봐
- **로그인 완료 authorize 버튼이 있으면 클릭**
- GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 있음, 기다려야 수도 있음
- 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
- 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다.
4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
5. 다음 SSO 버튼으로 반복 진행
쿠키 삭제 방법:
chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
🛑 절대 아래와 같이 해석하지 :
- 버튼 클릭 페이지 로딩만 기다리고 돌아가기
- URL 저장 없이 go_back() 호출
📤 로그인 다음 형식으로 결과 저장:
```json
[
{{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?client_id=..."
}}
]
````
---
### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
```text
🛑 뒤로가기(go_back) 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 , 뒤로가기 수행:
- 로그인 흐름이 완료됨 (: redirect back to app, or callback URL)
- 현재 리디렉션 URL이 수집됨
- 결과에 저장 다음 버튼 탐색을 위해 복귀 필요할
```
---
## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
* 유효한 SSO 버튼이 **전혀 없을 경우**
* 예외, 오류 발생
📤 즉시 중단 다음 형식으로 반환:
```json
[]
```
---
## 📎 중요 규칙 요약
* **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
* 🔁 단계는 반드시 순서대로 진행
* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
* 🚫 직접 ID/PW 입력하지 않음
* 추측 URL 클릭 금지
* 예외 발생 반드시 규정된 JSON 포맷만 반환
---
"""

View file

@ -1,11 +1,26 @@
import os
from lib.utils.browser_use.func import * from lib.utils.browser_use.func import *
# Initialize configuration # Initialize configuration
proxy_url = setup_proxy() proxy_url = setup_proxy()
# Create browser profile
async def GetProfile(): async def GetProfile():
storage_state_path = await setup_storage_state() storage_state_path = await setup_storage_state()
# Handle potential encoding issues with storage state file
try:
if storage_state_path and os.path.exists(storage_state_path):
# Test if file can be read properly, if not, skip it
with open(storage_state_path, 'r', encoding='utf-8') as f:
f.read()
storage_state = storage_state_path
else:
print("⚠️ Storage state file not found or inaccessible, proceeding without it.")
storage_state = None
except (UnicodeDecodeError, FileNotFoundError):
# If there's an encoding error, don't use the storage state
storage_state = None
profile = BrowserProfile( profile = BrowserProfile(
# Security settings # Security settings
disable_security=True, disable_security=True,
@ -19,7 +34,7 @@ async def GetProfile():
# Data persistence # Data persistence
user_data_dir=None, user_data_dir=None,
storage_state=storage_state_path, storage_state=storage_state,
# Network settings # Network settings
proxy={"server": proxy_url} if proxy_url else None, proxy={"server": proxy_url} if proxy_url else None,
@ -28,4 +43,4 @@ async def GetProfile():
args=get_browser_args(), args=get_browser_args(),
) )
return profile return profile

View file

@ -1,4 +1,5 @@
import os import os
import json
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
from browser_use import BrowserProfile from browser_use import BrowserProfile
@ -6,6 +7,27 @@ from browser_use import BrowserProfile
# Load environment variables # Load environment variables
load_dotenv(override=True) load_dotenv(override=True)
def safe_json_read(file_path: Path) -> dict:
"""Safely read JSON file with proper encoding handling."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (UnicodeDecodeError, json.JSONDecodeError):
# Try with different encodings
for encoding in ['utf-8-sig', 'latin1', 'cp1252']:
try:
with open(file_path, 'r', encoding=encoding) as f:
return json.load(f)
except (UnicodeDecodeError, json.JSONDecodeError):
continue
return {}
def safe_json_write(file_path: Path, data: dict):
"""Safely write JSON file with proper encoding handling."""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def setup_proxy(): def setup_proxy():
"""Configure proxy settings from environment variables.""" """Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST") proxy_host = os.getenv("PROXY_HOST")
@ -31,14 +53,27 @@ async def setup_storage_state():
print(f"📂 Temp storage state path: {storage_state_temp_path}") print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists(): if storage_state_path.exists():
if storage_state_temp_path.exists(): try:
storage_state_temp_path.unlink() if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
storage_state_temp_path.write_text( # 안전한 JSON 파일 처리 (인코딩 문제 해결)
storage_state_path.read_text(encoding="utf-8"), encoding="utf-8" storage_data = safe_json_read(storage_state_path)
)
print(f"🔄 Using existing storage state: {storage_state_temp_path}") if storage_data: # 데이터가 성공적으로 읽혔다면
return str(storage_state_temp_path) safe_json_write(storage_state_temp_path, storage_data)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
else:
print("⚠️ Storage state file is empty or corrupted")
return None
except Exception as e:
print(f"⚠️ Error processing storage state: {e}")
# 문제가 있는 파일을 제거하고 새로 시작
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
return None
print("⚠️ No existing storage state found") print("⚠️ No existing storage state found")
return None return None
@ -73,3 +108,19 @@ def get_browser_args():
# Language # Language
f"--lang={os.getenv('LANG', 'en_US')}", f"--lang={os.getenv('LANG', 'en_US')}",
] ]
def cleanup_corrupted_storage_files():
"""Clean up corrupted storage state files."""
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
if storage_state_temp_path.exists():
try:
# Try to read the file to check if it's corrupted
with open(storage_state_temp_path, 'r', encoding='utf-8') as f:
json.load(f)
print(f"✅ Storage temp file is valid: {storage_state_temp_path}")
except (UnicodeDecodeError, json.JSONDecodeError) as e:
print(f"🗑️ Removing corrupted storage temp file: {e}")
storage_state_temp_path.unlink()

View file

@ -4,8 +4,12 @@ from pydantic import BaseModel
# 출력 모델 # 출력 모델
class OAuth(BaseModel): class OAuth(BaseModel):
provider: str provider: str
oauth_uri: str oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel): class OAuthList(BaseModel):
oauth_providers: List[OAuth] oauth_providers: List[OAuth]
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList

209
main.py
View file

@ -26,7 +26,7 @@ from lib.utils.browser_use.sensitive_data import GetSensitiveData
from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
from lib.utils.is_html import is_html_url from lib.utils.is_html import is_html_url
from lib.utils.read_txt import read_lines_between from lib.utils.read_txt import read_lines_between
from lib.llm.prompt import extend_planner_system_message from lib.llm.prompt import get_prompt
from lib.utils.logger import logger from lib.utils.logger import logger
import lib.utils.browser_use as browser_use import lib.utils.browser_use as browser_use
from lib.llm import CreateChatGoogleGenerativeAI from lib.llm import CreateChatGoogleGenerativeAI
@ -89,52 +89,46 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
# ── URL별로 Browser를 새로 띄우는 함수 ── # ── OAuth 리스트 추출 Agent ──
async def scan_one_url(url: str, skip_html_check: bool = False): async def extract_oauth_list(url: str, skip_html_check: bool = False):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
await setup_storage_state() await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}" target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 Starting scan for: {target_url}") print(f"<EFBFBD> OAuth 리스트 추출 시작: {target_url}")
# 1) URL이 HTML 페이지인지 확인 # 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check: if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.") print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return return []
# Backend에 스캔 시작을 알림
notify_backend(target_url)
agent = None agent = None
session = None session = None
try_cnt = 0 try_cnt = 0
while True: while True:
# BrowserSession에 profile 전달
session = BrowserSession( session = BrowserSession(
playwright=(await async_patchright().start()), playwright=(await async_patchright().start()),
browser_profile=await browser_use.GetProfile(), browser_profile=await browser_use.GetProfile(),
) )
# Agent 생성 및 실행 (단일 try-except with 백오프)
initial_actions = [{"open_tab": {"url": target_url}}] initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller( controller = Controller(
output_model=model.BaseModel, output_model=model.OAuthList,
exclude_actions=["search_google", "unknown_action", "unkown"], exclude_actions=["search_google", "unknown_action", "unkown"],
) )
print("🤖 LLM 모델 초기화 및 스캔 시작...") print("🤖 OAuth 리스트 추출 Agent 초기화...")
print("Available actions:", list(controller.registry.registry.actions.keys()))
try: try:
agent = Agent( agent = Agent(
browser_session=session, browser_session=session,
initial_actions=initial_actions, initial_actions=initial_actions,
sensitive_data=GetSensitiveData(), sensitive_data=GetSensitiveData(),
task=( task=(
"Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), " "Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"and for each one: click the button, follow the full OAuth login flow as far as possible " "DO NOT click any OAuth buttons or attempt to login. "
"with a real user account (without using a fake or non-existent account), and capture the " "Just find and list all available OAuth providers with their button texts or provider names. "
"final redirect URL after login. Do not stop at just collecting the initial authorization URL—" "Return a list of OAuth providers found on the login page."
"actually perform the login step like a real user would. "
"If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. "
"Always log out before starting the login process, and make sure to attempt the login again from a clean state."
), ),
llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL), llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
planner_llm=( planner_llm=(
@ -143,13 +137,21 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
else None else None
), ),
controller=controller, controller=controller,
extend_planner_system_message=extend_planner_system_message, extend_planner_system_message=get_prompt("auth"),
) )
response = await agent.run() response = await agent.run()
final_result = response.final_result() final_result = response.final_result()
if final_result is None: if final_result is None:
raise ValueError("final_result()가 None을 반환했습니다.") raise ValueError("OAuth 리스트 추출 결과가 None입니다.")
data = json.loads(final_result)
oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]]
await clean_resources(agent, session)
return oauth_entries
except Exception as e: except Exception as e:
await clean_resources(agent, session) await clean_resources(agent, session)
# API 쿼터 문제인지 확인 # API 쿼터 문제인지 확인
@ -159,52 +161,149 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
await asyncio.sleep(wait) await asyncio.sleep(wait)
try_cnt += 1 try_cnt += 1
if try_cnt >= 3: if try_cnt >= 3:
print(f"{url} 스캔 실패: API 쿼터 문제가 지속됩니다.") print(f"{url} OAuth 리스트 추출 실패: API 쿼터 문제가 지속됩니다.")
logger(f"{url} 스캔 실패: API 쿼터 문제: {e}") logger(f"{url} OAuth 리스트 추출 실패: API 쿼터 문제: {e}")
return return []
continue continue
# 일반 에러 처리 # 일반 에러 처리
try_cnt += 1 try_cnt += 1
if try_cnt >= 3: if try_cnt >= 3:
print(f"{url} 스캔 실패: 에러: {e}") print(f"{url} OAuth 리스트 추출 실패: 에러: {e}")
logger(f"{url} 스캔 실패: 에러: {e}") logger(f"{url} OAuth 리스트 추출 실패: 에러: {e}")
return return []
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30) await asyncio.sleep(30)
continue continue
# 스캔 결과 처리
data = json.loads(final_result) # ── 개별 OAuth 로그인 Agent ──
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
agent = None
session = None
try_cnt = 0
while True:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await browser_use.GetProfile(),
)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(
exclude_actions=["search_google", "unknown_action", "unkown"],
)
print(f"🤖 {oauth_provider} 로그인 Agent 초기화...")
try: try:
oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]] agent = Agent(
browser_session=session,
initial_actions=initial_actions,
sensitive_data=GetSensitiveData(),
task=(
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
planner_llm=(
CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL)
if GOOGLE_PLANNER_MODEL
else None
),
controller=controller,
extend_planner_system_message=get_prompt(oauth_provider),
)
response = await agent.run()
final_result = response.final_result()
print(f"{oauth_provider} 로그인 완료")
if final_result:
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
await clean_resources(agent, session)
return True
except Exception as e: except Exception as e:
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") await clean_resources(agent, session)
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
print(f"{oauth_provider} 로그인 실패: API 쿼터 문제가 지속됩니다.")
logger(f"{url} - {oauth_provider} 로그인 실패: API 쿼터 문제: {e}")
return False
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
print(f"{oauth_provider} 로그인 실패: 에러: {e}")
logger(f"{url} - {oauth_provider} 로그인 실패: 에러: {e}")
return False
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
print("-" * 50)
print(f"🔗 Scanned URL: {url}\n") # ── 통합 스캔 함수 ──
print("🔐 Detected OAuth Providers and URLs:") async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(url, skip_html_check)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry.provider}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries: for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri: writer.writerow([url, entry.provider, entry.oauth_uri, "pending"])
print(
f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
)
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
print("-" * 50)
# CSV에 저장 (append)
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
await clean_resources(agent, session)
break
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}")
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry.provider)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def loop( async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
): ):