Merge pull request #12 from j93es/feat/storage-state

Browser Use Latest && 쿠키 로컬스토리지 추가 등
This commit is contained in:
암냥 2025-06-16 08:22:14 +09:00 committed by GitHub
commit c5ff066c65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1543 additions and 369 deletions

View file

@ -1,11 +1,56 @@
ANONYMIZED_TELEMETRY=false
# ========== LLM ==========
GOOGLE_API_KEY=
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20
#GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20
# min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)만큼 API가 실패시 대기합니다.
INITIAL_BACKOFF=60
MAX_BACKOFF=600
# ========== Monitoring ==========
# 선택
PROXY_HOST=127.0.0.1
PROXY_PORT=11080
BACKEND_URL=http://localhost:11081
BACKEND_URL=http://localhost:11081
# https://docs.browser-use.com/development/observability
# Lmnr 계정이 필요합니다.
# https://lmnr.ai/
LMNR_PROJECT_API_KEY=
# 브라우저 언어 설정
LANG=en_US
# ========= Account ==========
# 필수 뒤에 있는 이메일 주소는 Google 계정의 로그인 힌트로 사용됩니다.
# 이메일의 전체를 입력해주세요
GOOGLE_ID=bot.imnya.ng@gmail.com
# provider 계정 (본인이 사용하지 않는 계정 권장) (Github, apple, kakao등 다른 계정 추가 가능)
# PROVIDOR_CREDENTIALS_IN_LLM는 True로 설정시, 아래 계정 정보가 LLM에 포함되어 사용됩니다.
# 쿠키와 로컬스토리지 사용은 제외됩니다.
PROVIDOR_CREDENTIALS_IN_LLM=False
GOOGLE_ID=
GOOGLE_PASSWORD=
NAVER_ID=
NAVER_PASSWORD=
FACEBOOK_ID=
FACEBOOK_PASSWORD=
GITGUB_ID=
GITHUB_PASSWORD=
LinkedIn_ID=
LinkedIn_PASSWORD=
Microsoft_ID=
Microsoft_PASSWORD=

View file

@ -6,25 +6,39 @@
# 환경 설정
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
또한 [oauth-backend](https://github.com/j93es/oauth-backend)가 설정된 상태여야만 합니다.
또한 [oauth-backend](https://github.com/j93es/oauth-backend)가 설정되길 권장합니다.
> 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
>
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
>
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
---
uv 설치 후 다음과 같은 명령어를 입력합니다.
```
```sh
uv sync
```
venv와 패키지가 설치가 됩니다.
browser_use가 Playwright에 대한 의존성이 있어 브라우저 설치가 필요합니다
~~browser_use가 Playwright에 대한 의존성이 있어 브라우저 설치가 필요합니다~~
스텔스 기능 때문에 Chrome이 필요합니다.
```
playwright install chromium --with-deps --no-shell
```
다음과 같은 명령어로 실행합니다.
```
```sh
uv run main.py
```
@ -32,6 +46,12 @@ Environment는 .env.example에 따라 설정되어야합니다.
.env.example을 .env로 복사하여서 사용해주세요.
# 쿠키와 로컬 스토리지 설정 방법
```sh
uv run playwright open https://google.com/ --save-storage=./data/storage_state.json
```
# 실행
```sh
@ -44,6 +64,8 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
```pwsh
# domains.txt 받기
curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
# ./run.ps1 {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip}
./run.ps1 12540 13000 False
```

View file

@ -1,29 +0,0 @@
from browser_use.browser.context import BrowserContextConfig
from pathlib import Path
import os
from typing import Any
def browser_config_kwargs(lang: str = "en_US") -> dict[str, Any]:
browser_config_kwargs: dict[str, Any] = {
"keep_alive": True,
"browser_type": "chromium",
"headless": False,
"disable_security": True,
"extra_browser_args": [
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-popup-blocking",
f"--lang={lang}",
"--ignore-certificate-errors"
],
}
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
browser_config_kwargs["extra_browser_args"].append(
f"--proxy-server=http={proxy_host}:{proxy_port};https={proxy_host}:{proxy_port}"
)
return browser_config_kwargs

25
lib/llm/__init__.py Normal file
View file

@ -0,0 +1,25 @@
from langchain.callbacks.base import BaseCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
class QuotaExhaustedHandler(BaseCallbackHandler):
def on_llm_error(self, error, **kwargs):
if "ResourceExhausted" in str(error) or "429" in str(error):
print("⚠️ API 쿼터가 소진되었습니다. 재시도 로직에 위임합니다...")
# backoff handled in scan_one_url
def CreateChatGoogleGenerativeAI(model: str):
"""재시도 로직이 포함된 LLM 생성"""
if model == "fallback":
print("⚠️ Fallback 모델을 사용합니다. Envorinment 변수를 확인하세요.")
print("⚠️ Model Gemini-2.0-flash-lite를 사용합니다.")
model = "gemini-2.0-flash-lite"
return ChatGoogleGenerativeAI(
model=model,
max_retries=10, # 최대 재시도 횟수 증가
model_kwargs={
"request_timeout": 120, # 타임아웃 시간 증가 (2분)
},
callbacks=[QuotaExhaustedHandler()],
# API 호출 간격 조정
temperature=0.1,
)

View file

@ -0,0 +1,12 @@
from dotenv import load_dotenv
import os
from lib.llm.prompt import llm_login, session
load_dotenv(override=True)
def extend_planner_system_message():
if os.getenv("PROVIDOR_CREDENTIALS_IN_LLM", "False").lower() == "true":
return llm_login.extend_planner_system_message
else:
return session.extend_planner_system_message

View file

@ -1,17 +1,34 @@
from dotenv import load_dotenv
import os
load_dotenv(override=True)
google_id = os.getenv("GOOGLE_ID", "")
google_password = os.getenv("GOOGLE_PASSWORD", "")
naver_id = os.getenv("NAVER_ID", "")
naver_password = os.getenv("NAVER_PASSWORD", "")
facebook_id = os.getenv("FACEBOOK_ID", "")
facebook_password = os.getenv("FACEBOOK_PASSWORD", "")
github_id = os.getenv("GITHUB_ID", "")
github_password = os.getenv("GITHUB_PASSWORD", "")
# Extended planner prompt
extend_planner_system_message = """
extend_planner_system_message = f"""
🎯 Mission: Collect Initial SSO Redirect URLs (For Browser Automation)
**모든 STEP에서 구글 검색, Bing 검색 어떤 외부 검색 기능도 절대 사용하지 않고, 초기에 주어진 URL에서 탐색하세요.**
**초기에 주어진 URL 내에서 실제로 확인되지 않은 URL로 직접 이동하는것은 허용되지 않습니다.**
0. **초기 블록(Block) 체크**
- 브라우저가 로그인 페이지에 접근하려 , **페이지가 차단(blocked)** 되거나 **방화벽, CAPTCHA, 접근 제한** 등으로 인해 정상적으로 로드되지 않으면 즉시 프로세스를 종료하고 아래 JSON만 반환해야 합니다.
```json
[
{
{{
"provider": "Blocked",
"oauth_uri": "-"
}
}}
]
```
- 이후 단계로 절대 넘어가지 않도록 합니다.
@ -23,9 +40,7 @@ extend_planner_system_message = """
2. **SSO 버튼 식별**
- 로그인 페이지에서 다음과 같은 소셜 로그인 버튼을 찾습니다:
- Continue with Google
- Sign in with GitHub
- Login with Naver
- Google, GitHub, Facebook, Linkedin, Microsoft, Naver
- **실제 SSO 버튼**임이 명확히 확인되는 경우에만 진행합니다.
- 제외 대상:
- Passkey 관련 버튼
@ -33,28 +48,29 @@ extend_planner_system_message = """
- 이메일 기반 로그인
- 인증서, 휴대폰 인증 -OAuth 로그인 옵션
3. **리디렉션 URL 캡처**
- 유효한 SSO 버튼을 하나 이상 찾았다면, 각각의 버튼을 ** 탭으로 열기** 시도하거나, 불가능할 경우 **직접 클릭**합니다.
- 클릭 번째로 **리디렉션된 URL(쿼리 스트링 포함)** 캡처합니다. URL은:
- 예시: `https://example.com/auth/google?include_all_params=...`
- **OAuth 공급자 자체 엔드포인트** (: `https://accounts.google.com/...`) 수집하지 않습니다.
- 만약 **반복 행동(looping)** 감지될 경우(: 동일한 버튼을 여러 열거나 페이지 반복 이동), 즉시 프로세스를 종료하고 ** 배열** 반환합니다:
```json
[]
```
- 정상적으로 리디렉션 URL을 획득했다면, 아래 형식으로 결과를 수집합니다:
```json
[
{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?include_all_params=..."
},
{
"provider": "GitHub",
"oauth_uri": "https://example.com/auth/github?include_all_params=..."
}
]
```
3. **SSO 버튼 클릭 로그인 시도**
- 유효한 SSO 버튼이 발견되면, 버튼을 클릭합니다.
- 클릭 ** 번째로 리디렉션된 URL(쿼리 스트링 포함)** `oauth_uri` 기록합니다.
- 공급자 페이지가 열리면, 아래 자격증명을 이용해 로그인을 시도합니다, 아래 자격증명에 포함되지 않는 SSO 버튼도 클릭까지는 시도합니다.:
- Google `{google_id}` / `{google_password}`
- Naver `{naver_id}` / `{naver_password}`
- GitHub `{github_id}` / `{github_password}`
- 자격증명이 주어진 SSO 버튼인 경우 로그인 과정을 진행합니다.
- 로그인 과정이 모두 끝나거나 로그인이 되지 않는 경우 세션 쿠키를 모두 삭제하고 페이지를 새로고침합니다.
- 아직 로그인을 시도하지 않은 SSO 버튼이 있다면 이전 단계인 1. **로그인 페이지 탐색**, 2. **SSO 버튼 식별**, 3. **SSO 버튼 클릭 로그인 시도** 돌아가 절차를 반복합니다.
- 최종 결과는 다음과 같이 기록합니다:
```json
[
{{
"provider": "Google",
"oauth_uri": "(optional) https://example.com/auth/google?client_id=...",
}},
{{
"provider": "Naver",
"oauth_uri": "(optional) https://example.com/auth/naver?client_id=...",
}}
]
```
4. **SSO 버튼 미발견 또는 오류 발생 **
- 페이지 내부에 유효한 SSO 버튼이 전혀 없거나, 탐색 예기치 않은 오류가 발생하면 즉시 프로세스를 종료하고 ** 배열** 반환합니다:

136
lib/llm/prompt/session.py Normal file
View file

@ -0,0 +1,136 @@
import os
from dotenv import load_dotenv
load_dotenv(override=True)
# Extended planner prompt
extend_planner_system_message = f"""
🎯 목적: 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
📌 주의사항 (전제 조건)
- **검색 엔진(Google, Bing ) 사용 금지**
- **초기 제공된 URL 내에서만 탐색**
- 직접 이동하거나 추측한 링크 클릭 금지
- 추측한 URL은 대답하거나 클릭하지 마세요
---
## 🧩 Step 0: 페이지 차단(Block) 여부 확인
초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
- 🚫 페이지 차단됨 (Firewall, Access Denied ) 즉시 중단
- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
- 로그인 UI가 정상적으로 로드되지 않으면 중단
📤 차단 즉시 반환:
```json
[
{{
"provider": "Blocked",
"oauth_uri": "-"
}}
]
````
---
## 🔍 Step 1: 로그인 페이지 탐색
* 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
* 쿠키 동의, 개인정보 안내 팝업은 무시하거나 닫고 계속 진행하세요.
* 페이지가 정상 로드되었다고 가정합니다.
---
## 👀 Step 2: SSO 로그인 버튼 식별
아래 **OAuth SSO 버튼들만** 유효합니다:
* Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
**유효한 버튼 기준**:
* OAuth 인증 흐름을 실제로 트리거
* `window.location` 또는 `<a href=...>` 또는 JS로 redirect가 발생
**제외 버튼들 (클릭 금지)**:
* 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
---
## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
> SSO 로그인 버튼을 클릭한 반드시 아래 절차를 **완전히 수행**해야 합니다.
SSO 버튼에 대해 다음을 수행:
1. 버튼 클릭
2. 🌐 페이지가 이동되면, **현재 주소창(URL) 확인하여 리디렉션된 OAuth URL** `oauth_uri` 저장
: `https://accounts.google.com/o/oauth2/auth?...`
3. 로그인 진행:
- 세션 쿠키에 따라 이미 로그인된 상태로 간주하고 진행
- Google OAuth인 경우 URL에 `&login_hint={str(os.getenv('GOOGLE_ID'))}` 추가
- 버튼같은게 안눌리면 새로고침을 해봐
- **로그인 완료 authorize 버튼이 있으면 클릭**
- GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 있음, 기다려야 수도 있음
- 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
4. 로그인 성공 원래 페이지로 돌아오고, 해당 OAuth URL은 결과에 저장
5. 다음 SSO 버튼으로 반복 진행
🛑 절대 아래와 같이 해석하지 :
- 버튼 클릭 페이지 로딩만 기다리고 돌아가기
- URL 저장 없이 go_back() 호출
📤 로그인 다음 형식으로 결과 저장:
```json
[
{{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?client_id=..."
}}
]
````
````
---
### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
```text
🛑 뒤로가기(go_back) 다음 조건이 모두 충족될 때만 사용:
- 로그인 흐름이 완료됨 (: redirect back to app, or callback URL)
- 현재 리디렉션 URL이 수집됨
- 결과에 저장 다음 버튼 탐색을 위해 복귀 필요할
```
---
## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
* 유효한 SSO 버튼이 **전혀 없을 경우**
* 예외, 오류 발생
📤 즉시 중단 다음 형식으로 반환:
```json
[]
```
---
## 📎 중요 규칙 요약
* **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
* 🔁 단계는 반드시 순서대로 진행
* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
* 🚫 직접 ID/PW 입력하지 않음
* 추측 URL 클릭 금지
* 예외 발생 반드시 규정된 JSON 포맷만 반환
---
"""

40
lib/utils/__init__.py Normal file
View file

@ -0,0 +1,40 @@
from lib.utils.config import (
BACKEND_URL,
GOOGLE_API_KEY,
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
def show_info():
print("🔧 환경 설정:")
print(browser_use_version())
print(f"🔗 Backend URL: {BACKEND_URL}")
print(
f"🔑 Google API Key: {'*' * (len(GOOGLE_API_KEY) - 4) + GOOGLE_API_KEY[-4:] if GOOGLE_API_KEY else None}"
)
print(f"🌐 Google Model: {GOOGLE_MODEL}")
print(f"🌐 Google Planner Model: {GOOGLE_PLANNER_MODEL}")
def browser_use_version():
try:
# run uv pip show browser-use
import subprocess
result = subprocess.run(
["uv", "pip", "show", "browser-use"],
capture_output=True,
text=True,
check=True,
)
print("📦 Browser Use 패키지 정보:")
return result.stdout.strip()
except ImportError:
return None
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")

View file

@ -0,0 +1,22 @@
import requests
from lib.utils.config import BACKEND_URL
def notify_backend(target_url):
# Backend에 스캔 시작을 알림
try:
response = requests.post(
f"{BACKEND_URL}/start", params={"url": target_url}, timeout=5
)
if response.status_code == 200:
print(f"✅ Backend notified: {response.text}")
else:
print(f"⚠️ Backend notification failed: {response.status_code}")
except requests.exceptions.ConnectionError:
print(
f"⚠️ Backend server not available at {BACKEND_URL}. Continuing without notification."
)
except requests.exceptions.Timeout:
print(f"⚠️ Backend notification timed out. Continuing without notification.")
except Exception as e:
print(f"⚠️ Failed to notify backend: {e}")

View file

@ -0,0 +1,31 @@
from lib.utils.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
# Create browser profile
async def GetProfile():
storage_state_path = await setup_storage_state()
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state_path,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)
return profile

View file

@ -0,0 +1,25 @@
from pathlib import Path
async def clean_resources(agent=None, session=None):
"""리소스를 정리하는 함수"""
storage_state_temp_path = Path("./data/storage_state_temp.json").resolve()
if storage_state_temp_path.exists():
try:
# remove file
print(f"🗑️ 임시 스토리지 상태 파일 삭제 중: {storage_state_temp_path}")
# unlink removes the file
storage_state_temp_path.unlink()
print("🗑️ 임시 스토리지 상태 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 스토리지 상태 파일 삭제 실패: {e}")
if agent:
try:
await agent.close()
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
if session:
try:
await session.close()
except Exception as e:
print(f"⚠️ 세션 리소스 정리 실패: {e}")

View file

@ -0,0 +1,75 @@
import os
from pathlib import Path
from dotenv import load_dotenv
from browser_use import BrowserProfile
# Load environment variables
load_dotenv(override=True)
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
proxy_url = f"http://{proxy_host}:{proxy_port}"
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
return proxy_url
else:
print("🔗 No proxy configured, using direct connection.")
return None
async def setup_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_path = script_dir / "data" / "storage_state.json"
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
storage_state_temp_path.write_text(
storage_state_path.read_text(encoding="utf-8"), encoding="utf-8"
)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
print("⚠️ No existing storage state found")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [
# Security and isolation
"--disable-web-security",
"--disable-site-isolation-trials",
"--disable-features=IsolateOrigins,site-per-process",
"--ignore-certificate-errors",
"--ignore-ssl-errors",
"--allow-running-insecure-content",
# Performance and rendering
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage",
# Popup and automation
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled",
# Browser behavior
"--no-first-run",
"--no-service-autorun",
"--no-default-browser-check",
"--password-store=basic",
"--use-mock-keychain",
# Extensions
"--disable-extensions-file-access-check",
"--disable-extensions-http-throttling",
"--disable-component-extensions-with-background-pages",
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]

View file

@ -0,0 +1,11 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str
class OAuthList(BaseModel):
oauth_providers: List[OAuth]

8
lib/utils/config.py Normal file
View file

@ -0,0 +1,8 @@
import os
from dotenv import load_dotenv
load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05")

374
main.py
View file

@ -3,228 +3,284 @@ import json
import os
import csv
import argparse
import requests
from typing import List
from pathlib import Path
import signal
from dotenv import load_dotenv
from pydantic import BaseModel
from langchain_google_genai import ChatGoogleGenerativeAI
from browser_use import Agent, Browser, BrowserConfig, Controller
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from lib.browser_config import browser_config_kwargs
from lib.is_html import is_html_url
from lib.read_txt import read_lines_between
from lib.prompt import extend_planner_system_message
from lib.logger import logger
load_dotenv()
from browser_use import (
Agent,
BrowserSession,
Controller,
)
from patchright.async_api import async_playwright as async_patchright
if os.getenv("GOOGLE_API_KEY") is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if os.getenv("GOOGLE_MODEL") is None:
raise ValueError("GOOGLE_MODEL 환경변수가 설정되지 않았습니다.")
if os.getenv("GOOGLE_PLANNER_MODEL") is None:
raise ValueError("GOOGLE_PLANNER_MODEL 환경변수가 설정되지 않았습니다.")
from lib.utils import env_cheker
from lib.utils.backend_client import notify_backend
from lib.utils.browser_use import model
from lib.utils.browser_use.clean_resources import clean_resources
from lib.utils.browser_use.func import setup_storage_state
from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
from lib.utils.is_html import is_html_url
from lib.utils.read_txt import read_lines_between
from lib.llm.prompt import extend_planner_system_message
from lib.utils.logger import logger
import lib.utils.browser_use as browser_use
from lib.llm import CreateChatGoogleGenerativeAI
backend_url = os.getenv("BACKEND_URL", "http://localhost:11081")
load_dotenv(verbose=True, override=True)
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
env_cheker()
if os.getenv("LMNR_PROJECT_API_KEY"):
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
def save_progress():
"""현재 진행 상황을 파일에 저장"""
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "="*60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄")
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
print("="*60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
# 시그널 핸들러 등록
signal.signal(signal.SIGINT, signal_handler)
class OAuthList(BaseModel):
oauth_providers: List[OAuth]
async def clean_resources(agent, context, browser):
"""리소스를 정리하는 함수"""
try:
await agent.close()
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
try:
await context.close()
except Exception as e:
print(f"⚠️ 컨텍스트 리소스 정리 실패: {e}")
try:
await browser.close()
except Exception as e:
print(f"⚠️ 브라우저 리소스 정리 실패: {e}")
# ── URL별로 Browser를 새로 띄우는 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False):
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 Starting scan for: {target_url}")
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# Backend에 스캔 시작을 알림
try:
response = requests.post(f"{backend_url}/start", params={"url": target_url}, timeout=5)
if response.status_code == 200:
print(f"✅ Backend notified: {response.text}")
else:
print(f"⚠️ Backend notification failed: {response.status_code}")
except requests.exceptions.ConnectionError:
print(f"⚠️ Backend server not available at {backend_url}. Continuing without notification.")
except requests.exceptions.Timeout:
print(f"⚠️ Backend notification timed out. Continuing without notification.")
except Exception as e:
print(f"⚠️ Failed to notify backend: {e}")
notify_backend(target_url)
agent = None
session = None
try_cnt = 0
while True:
# 2) Browser + Context 생성
browser = Browser(config=BrowserConfig(**browser_config_kwargs()))
context = BrowserContext(
browser=browser,
config=BrowserContextConfig(
wait_for_network_idle_page_load_time=3.0,
window_width=1600,
window_height=900,
locale='en-US',
highlight_elements=True,
viewport_expansion=500,
keep_alive=False
)
)
# 3) Agent, Controller 생성
initial_actions = [
{'open_tab': {'url': target_url}},
]
controller = Controller(output_model=OAuthList)
agent = Agent(
browser_context=context,
browser=browser,
initial_actions=initial_actions,
task=f"Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.",
llm=ChatGoogleGenerativeAI(model=os.getenv("GOOGLE_MODEL")),
planner_llm=ChatGoogleGenerativeAI(model=os.getenv("GOOGLE_PLANNER_MODEL")),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
retry_delay=60,
# BrowserSession에 profile 전달
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await browser_use.GetProfile(),
)
# Agent 생성 및 실행 (단일 try-except with 백오프)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(output_model=model.BaseModel)
print("🤖 LLM 모델 초기화 및 스캔 시작...")
try:
# 4) 실제 스캔 실행
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=(
"Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), "
"and for each one: click the button, follow the full OAuth login flow as far as possible "
"with a real user account (without using a fake or non-existent account), and capture the "
"final redirect URL after login. Do not stop at just collecting the initial authorization URL—"
"actually perform the login step like a real user would. "
"If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. "
"Always log out before starting the login process, and make sure to attempt the login again from a clean state."
),
llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
planner_llm=CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL),
controller=controller,
extend_planner_system_message=extend_planner_system_message(),
)
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("final_result()가 None을 반환했습니다.")
data = json.loads(final_result)
try:
oauth_entries: List[OAuth] = [OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
# 5) 결과 출력
print("-" * 50)
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
print("-" * 50)
# 6) CSV에 저장 (append)
csv_file = "./oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
print(f"✅ OAuth providers saved to {csv_file}\n")
await clean_resources(agent, context, browser)
# 성공적으로 처리했으므로 반복문 탈출
break
except Exception as e:
await clean_resources(agent, context, browser)
if try_cnt >= 1:
print(f"{url} 스캔에 실패했습니다. 에러: {e}")
logger(f"{url} 스캔에 실패했습니다. 에러: {e}")
return
await clean_resources(agent, session)
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
print(f"{url} 스캔 실패: API 쿼터 문제가 지속됩니다.")
logger(f"{url} 스캔 실패: API 쿼터 문제: {e}")
return
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
print(f"{url} 스캔 실패: 에러: {e}")
logger(f"{url} 스캔 실패: 에러: {e}")
return
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
# 1분 대기
await asyncio.sleep(5)
# 반복문을 통해 재시도
await asyncio.sleep(30)
continue
async def loop(filepath: str, start_line: int, end_line: int, skip_html_check: bool = False):
# 스캔 결과 처리
data = json.loads(final_result)
try:
oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
print("-" * 50)
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(
f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
)
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
print("-" * 50)
# CSV에 저장 (append)
csv_file = "./oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
await clean_resources(agent, session)
break
async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
target_list = read_lines_between(
filepath=filepath,
start_line=start_line,
end_line=end_line
filepath=filepath, start_line=start_line, end_line=end_line
)
# 진행 상황 초기화
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
# 이전 진행 상황 확인
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print(f"📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == 'y':
current_progress["current_index"] = prev_progress["current_index"]
target_list = target_list[current_progress["current_index"]:]
print(f"{current_progress['current_index']}번째부터 재개합니다.")
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
# target_list[0] = "velog.io"
for url in target_list:
# scan_one_url은 외부에 정의된 비동기 함수라고 가정합니다.
# 실제로 scan_one_url이 정의된 위치를 import하거나
# 모듈 수준에 구현해두셔야 합니다.
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
current_progress["current_index"] = actual_index
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 domains.txt의 {start_line + actual_index}번째 줄")
# URL들 사이에 API 쿼터 회복을 위한 대기 시간 추가
if actual_index > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
# 진행 상황 저장
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")
# 완료 후 진행 상황 파일 삭제
if os.path.exists(progress_file):
os.remove(progress_file)
def main():
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다."
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
# 커맨드라인 인자로 받을 옵션들 정의
parser.add_argument(
"-f", "--file",
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)"
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start",
type=int,
required=True,
help="읽기 시작 줄 번호 (1-based)"
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end",
type=int,
required=True,
help="읽기 종료 줄 번호 (1-based)"
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh", "--skip-html-check",
"-skh",
"--skip-html-check",
type=bool,
default=False,
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)"
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)",
)
args = parser.parse_args()
# 인자값을 비동기 함수에 전달
asyncio.run(loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check
))
asyncio.run(
loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
if __name__ == "__main__":

View file

@ -5,5 +5,7 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"browser-use[memory]>=0.1.48",
"browser-use[memory]>=0.2.7",
"lmnr[all]>=0.6.10",
"patchright>=1.52.5",
]

34
run.ps1
View file

@ -4,9 +4,6 @@ $PYTHON_SCRIPT = "main.py"
# 도메인 목록 파일 경로 (Python 스크립트 실행 시 -f 옵션에 전달)
$DOMAIN_FILE = "./domains.txt"
# 몇 줄씩(chunk) 나눠서 실행할지
$CHUNK_SIZE = 10
# ─────────────
# https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt
@ -26,27 +23,14 @@ $START_LINE = [int]$args[0]
$END_LINE = [int]$args[1]
$SKIP_HEADER = if ($args.Count -eq 3) { $args[2] } else { "False" }
# START_LINE부터 END_LINE까지 CHUNK_SIZE 만큼씩 반복
$current = $START_LINE
while ($current -le $END_LINE) {
# 각 청크 구간의 마지막 줄 계산
$chunk_end = $current + $CHUNK_SIZE - 1
if ($chunk_end -gt $END_LINE) {
$chunk_end = $END_LINE
}
$timestamp = Get-Date -Format "yyyy-MM-dd HH:mm:ss"
Write-Host "[$timestamp] Processing lines $START_LINE to $END_LINE..."
$timestamp = Get-Date -Format "yyyy-MM-dd HH:mm:ss"
Write-Host "[$timestamp] Processing lines $current to $chunk_end..."
# Python 스크립트 실행
# -f DOMAIN_FILE: 도메인 목록 파일 경로
# -s current : 읽기 시작 줄
# -e chunk_end: 읽기 끝 줄
# -skh SKIP_HEADER: 헤더 스킵 여부
uv run $PYTHON_SCRIPT -f $DOMAIN_FILE -s $current -e $chunk_end -skh $SKIP_HEADER
# Python 스크립트 실행
# -f DOMAIN_FILE: 도메인 목록 파일 경로
# -s START_LINE : 읽기 시작 줄
# -e END_LINE : 읽기 끝 줄
# -skh SKIP_HEADER: 헤더 스킵 여부
uv run $PYTHON_SCRIPT -f $DOMAIN_FILE -s $START_LINE -e $END_LINE -skh $SKIP_HEADER
# 다음 청크의 시작 값 설정
$current = $chunk_end + 1
}
Write-Host "모든 청크 처리 완료."
Write-Host "처리 완료."

18
run.sh
View file

@ -3,7 +3,6 @@
# ── 설정 부분 ──
PYTHON_SCRIPT="main.py"
DOMAIN_FILE="./domains.txt"
CHUNK_SIZE=10
# ─────────────
curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o $DOMAIN_FILE
@ -23,18 +22,7 @@ if [ -z "$SKH_OPTION" ]; then
SKH_OPTION="False"
fi
current=$START_LINE
while [ "$current" -le "$END_LINE" ]; do
chunk_end=$(( current + CHUNK_SIZE - 1 ))
if [ "$chunk_end" -gt "$END_LINE" ]; then
chunk_end=$END_LINE
fi
echo "[$(date '+%Y-%m-%d %H:%M:%S')] Processing lines ${START_LINE} to ${END_LINE}..."
uv run "$PYTHON_SCRIPT" -f "$DOMAIN_FILE" -s "$START_LINE" -e "$END_LINE" -skh $SKH_OPTION
echo "[$(date '+%Y-%m-%d %H:%M:%S')] Processing lines ${current} to ${chunk_end}..."
uv run "$PYTHON_SCRIPT" -f "$DOMAIN_FILE" -s "$current" -e "$chunk_end" -skh $SKH_OPTION
current=$(( chunk_end + 1 ))
sleep 1 # 1초 대기
done
echo "모든 청크 처리 완료."
echo "처리 완료."

899
uv.lock generated

File diff suppressed because it is too large Load diff