Compare commits

..

No commits in common. "main" and "gemini" have entirely different histories.

61 changed files with 1011 additions and 5613 deletions

View file

@ -1,38 +1,9 @@
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
ANONYMIZED_TELEMETRY=false ANONYMIZED_TELEMETRY=false
# ========== LLM ==========
GOOGLE_API_KEY= GOOGLE_API_KEY=
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가] GOOGLE_MODEL=gemini-2.5-flash-preview-04-17 # 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
GOOGLE_MODEL=gemini-2.5-flash GOOGLE_PLANNER_MODEL=gemini-2.0-flash-lite # 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
#GOOGLE_PLANNER_MODEL=gemini-2.5-flash # 왜 비활성화 되었나요? // Planner 모델이 오히려 문제를 일으키는 경우가 있어 비활성화했습니다. 필요시 활성화하세요.
# min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)만큼 API가 실패시 대기합니다.
INITIAL_BACKOFF=60
MAX_BACKOFF=600
#ENABLE_PLANNER_MODEL_OAUTH_LOGIN=true # OAuth 로그인 시 Planner 모델을 활성화합니다.
#ENABLE_PLANNER_MODEL_OAUTH_LIST=true # OAuth List를 찾을 때 Planner 모델을 활성화합니다.
# ========== Monitoring ==========
# 선택 # 선택
PROXY_HOST=127.0.0.1 PROXY_HOST=127.0.0.1
PROXY_PORT=11080 PROXY_PORT=8080
BACKEND_URL=http://localhost:11081
# https://docs.browser-use.com/development/observability - 선택
# Lmnr 계정이 필요합니다.
# https://lmnr.ai/
LMNR_PROJECT_API_KEY=
# 브라우저 언어 설정
LANG=en_US
HEADLESS=False # 브라우저를 헤드리스 모드로 실행할지 여부. True로 설정하면 브라우저가 보이지 않습니다.
# ========= Account ==========
# 필수 뒤에 있는 이메일 주소는 Google 계정의 로그인 힌트로 사용됩니다.
# 이메일의 전체를 입력해주세요
GOOGLE_ID=whs.imnya.ng@gmail.com

77
.gitignore vendored
View file

@ -6,83 +6,8 @@ dist/
wheels/ wheels/
*.egg-info *.egg-info
oauth_providers.csv
# Virtual environments # Virtual environments
.venv .venv
.env .env
.sensitive.json log_*.log
log_*.log
domains.txt
# Created by https://www.toptal.com/developers/gitignore/api/macos,windows
# Edit at https://www.toptal.com/developers/gitignore?templates=macos,windows
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
### Windows ###
# Windows thumbnail cache files
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
[Dd]esktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msix
*.msm
*.msp
# Windows shortcuts
*.lnk
my.sh
log.txt
data/
!src/lib/utils/data
# End of https://www.toptal.com/developers/gitignore/api/macos,windows

View file

@ -1,42 +0,0 @@
{
"google.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "Vb1Mz9pgjY8JVs"
},
"accounts.google.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "Vb1Mz9pgjY8JVs"
},
"naver.com": {
"x_username": "oauth-test-test",
"x_password": "gx^AKz-289d3/7B"
},
"nid.naver.com": {
"x_username": "oauth-test-test",
"x_password": "gx^AKz-289d3/7B"
},
"github.com": {
"x_username": "imnyang-bot",
"x_password": "6PuVXCH9tpQLNm"
},
"apple.com": {
"x_username": "",
"x_password": ""
},
"appleid.apple.com": {
"x_username": "",
"x_password": ""
},
"microsoft.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "WHS123987"
},
"login.microsoftonline.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "WHS123987"
},
"facebook.com": {
"x_username": "01047183675",
"x_password": "whs3oauth@"
}
}

View file

@ -1,3 +0,0 @@
{
"rust-analyzer.initializeStopped": true
}

116
README.md
View file

@ -1,106 +1,48 @@
# 테스트한 영상
https://f.imnya.ng/.whs/teamproject/
# 참고하면 좋을만한 것
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다.
- [ ] https://docs.browser-use.com/customize/custom-functions
# 환경 설정 # 환경 설정
요구 사항 이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
- [uv](https://docs.astral.sh/uv/getting-started/installation/) - Python Package Manager Written by Rust
- [oauth-backend](https://github.com/j93es/oauth-backend)
- [Google Chrome](https://www.google.com/intl/ko_kr/chrome/)
---
> [oauth-backend](https://github.com/j93es/oauth-backend) 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
>
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
>
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
현재 아래와 같은 환경에서 개발되며 테스트되고 있습니다.
- ✅ MacOS 26 Tahoe Developer Beta 2 (25A5295e) en-US aarch64
- ✅ Windows 11 Pro for Workstations 24H2 (26100.4351) en-US x86_64
- ✅ NixOS 25.05.804570.c7ab75210cb8 KDE 6 / Linux 6.15 x86_64
---
다음과 같은 명령어로 환경을 설정합니다.
설명하는 가이드를 잘 따라가면 설정할 수 있습니다.
```sh
uv run setup.py
```
uv 설치 후 다음과 같은 명령어를 입력합니다. uv 설치 후 다음과 같은 명령어를 입력합니다.
```sh ```
uv sync uv sync
``` ```
venv와 패키지가 설치가 됩니다. venv와 패키지가 설치가 됩니다.
--- browser_use가 Playwright에 대한 의존성이 있어 브라우저 설치가 필요합니다
`uv run setup.py`로 환경을 설정합니다. ```
playwright install chromium --with-deps --no-shell
---
# 윈도우 인코딩 이슈 해결
이거 해결 방법
![image](https://github.com/user-attachments/assets/01ca45c2-fda9-44fb-83fc-39daa7e52092)
![image](https://github.com/user-attachments/assets/55c502f5-0bf7-44f8-bbb4-1dc1e0c8c3c3)
이것도 setup.py 사용하면 반자동으로 할 수 있습니다.
못찾겠으면 intl.cpl 열어주세요.
# 실행
domains.txt는 실행시 자동으로 다운로드 됩니다.
```sh
curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
``` ```
```sh 다음과 같은 명령어로 실행합니다.
# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh} {--no-download}
uv run run.py 1 100 --skh ```
uv run main.py
``` ```
# Prompt 확장 가이드 Environment에는 다음과 같은 값이 들어갑니다.
```
ANONYMIZED_TELEMETRY=false
## 1. 파일 생성 OPENAI_API_KEY=your_openai_api_key_here
OPENAI_BASE_URL=https://models.github.ai/inference # 선택
OPENAI_MODEL=openai/gpt-4o-mini # Github Models가 아닐시 gpt-4.1
`lib/llm/prompt` 폴더에서 fallback 폴더를 복사하여 # 선택
PROXY_HOST=127.0.0.1
원하는 프로바이더를 추가해줍니다. `ex) lib/llm/prompt/Google/` PROXY_PORT=8080
## 2. prompt.py 수정
Prompt에서 추가한 파일을 prompt.py에서 수정합니다.
만약 로그인 정보를 넣고 싶다면 Sensitive
`Log into example.com as user x_username with password x_password`
## 3. model.py
응답할 때 원하는 리턴 값을 `dict`로 받습니다.
## 4. \_\_init\_\_.py 수정
![image](https://github.com/user-attachments/assets/3aed2243-92d5-4359-8515-6d2f9bfa100b)
추가한 prompt에 따라 import합니다.
## 5. 사용 방법
```py
from lib.llm.prompt.fallback import prompt, model
``` ```
# 참고하면 좋을만한 것 `OPENAI_BASE_URL`은 GitHub Models가 아닐시 비워둡니다.
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다. `OPENAI_MODEL`은 GitHub Models가 아닐시 `openai/`를 제거합니다.
- [ ] https://docs.browser-use.com/customize/custom-functions
`PROXY_HOST``PROXY_PORT`는 만약 Caido를 사용 중일 시 환경에 맞게 설정 후 설정합니다.

View file

@ -1,71 +0,0 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# Generated by cargo mutants
# Contains mutation testing data
**/mutants.out*/
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# General
.DS_Store
.AppleDouble
.LSOverride
Icon[]
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# Windows thumbnail cache files
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
[Dd]esktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msix
*.msm
*.msp
# Windows shortcuts
*.lnk

1581
is-html-fast/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,8 +0,0 @@
[package]
name = "is-html-fast"
version = "0.1.0"
edition = "2024"
[dependencies]
rayon = "1.10.0"
reqwest = { version = "0.12.19", features = ["blocking", "json"]}

View file

@ -1,2 +0,0 @@
실제로 사용되진 않습니다.
일회용 코드입니다.

View file

@ -1,92 +0,0 @@
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use rayon::prelude::*;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, USER_AGENT, ACCEPT, ACCEPT_LANGUAGE, ACCEPT_ENCODING, CONNECTION, UPGRADE_INSECURE_REQUESTS};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_file = File::open("domains.txt")?;
let reader = BufReader::new(input_file);
let domains: Vec<String> = reader.lines().filter_map(Result::ok).collect();
let total_count = domains.len();
let counter = Arc::new(AtomicUsize::new(0));
let html_count = Arc::new(AtomicUsize::new(0));
let failed_count = Arc::new(AtomicUsize::new(0));
let non_html_count = Arc::new(AtomicUsize::new(0));
let output_file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open("domains-filtered.txt")?;
let output = Arc::new(Mutex::new(output_file));
// 브라우저 헤더 세팅
let mut headers = HeaderMap::new();
headers.insert(USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0"));
headers.insert(ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"));
headers.insert(ACCEPT_LANGUAGE, HeaderValue::from_static("ko,en-US;q=0.7,en;q=0.3"));
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip, deflate, br"));
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(UPGRADE_INSECURE_REQUESTS, HeaderValue::from_static("1"));
let client = Arc::new(
Client::builder()
.timeout(Duration::from_secs(5))
.default_headers(headers)
.build()?,
);
domains.par_iter().for_each(|domain| {
let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
let url = format!("https://{}", domain);
let response = client.get(&url).send();
match response {
Ok(resp) => {
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
if let Ok(content_type_str) = content_type.to_str() {
if content_type_str.starts_with("text/html") {
if let Ok(mut file) = output.lock() {
writeln!(file, "{}", domain).ok();
}
html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ✅ HTML: {}", current, total_count, domain);
} else {
non_html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ❌ Not HTML: {} ({})", current, total_count, domain, content_type_str);
}
}
} else {
non_html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ❌ No Content-Type: {}", current, total_count, domain);
}
}
Err(_) => {
failed_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ⚠️ Failed to connect: {}", current, total_count, domain);
}
}
});
// Final results
let html_final = html_count.load(Ordering::SeqCst);
let failed_final = failed_count.load(Ordering::SeqCst);
let non_html_final = non_html_count.load(Ordering::SeqCst);
println!("\n=== Final Results ===");
println!("📊 Total domains: {}", total_count);
println!("✅ HTML domains: {} ({:.1}%)", html_final, (html_final as f64 / total_count as f64) * 100.0);
println!("❌ Non-HTML domains: {} ({:.1}%)", non_html_final, (non_html_final as f64 / total_count as f64) * 100.0);
println!("⚠️ Failed connections: {} ({:.1}%)", failed_final, (failed_final as f64 / total_count as f64) * 100.0);
println!("💾 HTML domains saved to: domains-filtered.txt");
Ok(())
}

26
lib/browser_config.py Normal file
View file

@ -0,0 +1,26 @@
from browser_use.browser.context import BrowserContextConfig
from pathlib import Path
import os
from typing import Any
def browser_config_kwargs(lang: str = "en_US") -> dict[str, Any]:
browser_config_kwargs: dict[str, Any] = {
"keep_alive": True,
"browser_type": "chromium",
"headless": False,
"disable_security": True,
"extra_browser_args": [
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-popup-blocking",
f"--lang={lang}",
],
}
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
browser_config_kwargs["proxy"] = {"server": f"http://{proxy_host}:{proxy_port}"}
return browser_config_kwargs

152
main.py Normal file
View file

@ -0,0 +1,152 @@
import asyncio
import json
import os
from typing import List
from dotenv import load_dotenv
from pydantic import BaseModel
from langchain_google_genai import ChatGoogleGenerativeAI
from browser_use import Agent, Browser, BrowserConfig, Controller
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from lib.browser_config import browser_config_kwargs
import csv
load_dotenv()
# Check environment variables
if os.getenv("GOOGLE_API_KEY") is None:
raise ValueError("OPENAI_API_KEY environment variable not set.")
if os.getenv("GOOGLE_MODEL") is None:
raise ValueError("OPENAI_MODEL environment variable not set.")
if os.getenv("GOOGLE_PLANNER_MODEL") is None:
raise ValueError("OPENAI_PLANNER_MODEL environment variable not set.")
# Configure browser
browser = Browser(
config=BrowserConfig(**browser_config_kwargs())
)
# Set browser context
context = BrowserContext(
browser=browser,
config=BrowserContextConfig(
wait_for_network_idle_page_load_time=3.0,
window_width=1600,
window_height=900,
locale='en-US',
highlight_elements=True,
viewport_expansion=500,
keep_alive=True
)
)
# Output model: each result is one OAuth entry with metadata
class OAuth(BaseModel):
provider: str
oauth_uri: str
class OAuthList(BaseModel):
oauth_providers: List[OAuth]
controller = Controller(output_model=OAuthList)
# Extended planner prompt
extend_planner_system_message = """
🎯 Your mission is to collect the real OAuth login URLs from the website.
1. First, go to the websites **login page**.
2. On the login page, look for OAuth login buttons. These usually say things like **"Continue with Google"**, **"Sign in with GitHub"**, etc.
3. **DO NOT collect or include "Passkey"** it is NOT an OAuth provider.
---
For EACH OAuth button you find:
- **Try opening it in a new tab**. If it redirects to an OAuth URL (e.g. `https://accounts.google.com/...`, `https://github.com/login/oauth/...`), copy that **exact final URL**.
- If it **doesnt open in a new tab**, **click the button** and wait for the redirect to happen.
- As soon as you see the redirected URL with **client_id**, **redirect_uri**, etc., copy that **entire URL without changing or hiding anything**.
- Then come back to the original tab (if needed) and continue with the next provider.
---
💡 **Do not guess** the OAuth URLs only collect them by actually interacting with the buttons.
🚫 **Do not redact or mask any part** of the URL, including `client_id`, `redirect_uri`, `state`, or any other parameters. Record them exactly as they appear.
Return a list of all OAuth providers and their **full raw redirect URLs** in this exact format:
```json
[
{
"provider": "Google",
"oauth_uri": "https://accounts.google.com/o/oauth2/v2/auth?client_id=...&redirect_uri=...&...",
},
{
"provider": "GitHub",
"oauth_uri": "https://github.com/login/oauth/authorize?client_id=...&redirect_uri=...",
}
]
```
"""
# Main async runner
async def main():
url = "https://git.imnya.ng"
agent = Agent(
browser_context=context,
browser=browser,
task=f"Go to {url}, navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.",
llm=ChatGoogleGenerativeAI(model=os.getenv("GOOGLE_MODEL")),
planner_llm=ChatGoogleGenerativeAI(model=os.getenv("GOOGLE_PLANNER_MODEL")),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
# Run the agent
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("final_result() returned None")
data = json.loads(final_result)
try:
oauth_entries: List[OAuth] = [OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"Failed to parse result: {e}\nRaw result: {final_result}")
# Clear terminal
#print("\033c", end="")
print("-" * 20)
print(f"Raw result: {final_result}")
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
# Save the result to CSV (append mode, so you can continue later)
# 이거 좀 이상한데 나중에 고쳐야 할듯 파일이 수정이 안됨
csv_file = "oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
print(f"\n✅ OAuth providers saved to {csv_file}")
# Save the result to JSON
with open(f"oauth_providers_{url}.json", "w") as f:
json.dump(data, f, indent=2)
print(f"✅ OAuth providers saved to oauth_providers_{url}.json")
# Run it
asyncio.run(main())

View file

@ -5,10 +5,5 @@ description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"black>=25.1.0", "browser-use[memory]>=0.1.48",
"browser-use[memory]==0.5.3",
"chardet>=5.2.0",
"isort>=6.0.1",
"lmnr[all]>=0.6.10",
"patchright>=1.52.5",
] ]

1
run.ps1 Normal file
View file

@ -0,0 +1 @@
uv run ./main.py

165
run.py
View file

@ -1,165 +0,0 @@
import argparse
import os
import signal
import subprocess
import sys
from datetime import datetime
import requests
#!/usr/bin/env python3
# ── 설정 부분 ──
PYTHON_SCRIPT = "./src/main.py"
DOMAIN_FILE = "./data/domains.txt"
# ─────────────
def download_domains():
"""도메인 파일 다운로드"""
try:
print("도메인 파일 다운로드 중...")
response = requests.get(
"https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt"
)
response.raise_for_status()
# 디렉토리가 없으면 생성
os.makedirs(os.path.dirname("./data"), exist_ok=True)
with open(DOMAIN_FILE, "w", encoding="utf-8") as f:
f.write(response.text)
print("도메인 파일 다운로드 완료")
except requests.RequestException as e:
print(f"도메인 파일 다운로드 실패: {e}")
sys.exit(1)
def run_script(start_line, end_line, skh_option):
"""Python 스크립트 실행"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] Processing lines {start_line} to {end_line}...")
process = None
signal_handled = False
def signal_handler(sig, frame):
nonlocal signal_handled
if signal_handled:
return
signal_handled = True
print("\n🛑 종료 신호를 받았습니다. 정리 작업을 진행합니다...")
if process:
try:
# 자식 프로세스에 SIGTERM 전송
print("📤 서브프로세스에 종료 신호를 전달합니다...")
process.terminate()
# 5초간 대기
process.wait(timeout=5)
print("✅ 서브프로세스가 정상적으로 종료되었습니다.")
except subprocess.TimeoutExpired:
print("⚠️ 서브프로세스가 응답하지 않아 강제 종료합니다...")
process.kill()
try:
process.wait(timeout=3)
print("✅ 서브프로세스가 강제 종료되었습니다.")
except subprocess.TimeoutExpired:
print("❌ 서브프로세스 강제 종료 실패")
except Exception as e:
print(f"❌ 프로세스 종료 중 오류: {e}")
print("✅ 런처 종료 완료.")
sys.exit(0)
# 원래 시그널 핸들러 저장
original_sigint = signal.signal(signal.SIGINT, signal_handler)
original_sigterm = signal.signal(signal.SIGTERM, signal_handler)
try:
command = [
"uv",
"run",
PYTHON_SCRIPT,
"-f",
DOMAIN_FILE,
"-s",
str(start_line),
"-e",
str(end_line),
]
if skh_option:
command.append("--skip-html-check")
process = subprocess.Popen(command)
returncode = process.wait()
if returncode != 0:
print(f"❌ Python 스크립트가 오류 코드 {returncode}로 종료되었습니다.")
sys.exit(returncode)
except KeyboardInterrupt:
signal_handler(signal.SIGINT, None)
except Exception as e:
print(f"❌ 스크립트 실행 중 오류: {e}")
if process:
try:
process.terminate()
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
sys.exit(1)
finally:
# 시그널 핸들러 복원
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
def main():
parser = argparse.ArgumentParser(
description="도메인 처리 스크립트 실행기",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
사용 예시:
uv run run.py 10000 11000 # 10000~11000 라인 처리
uv run run.py 10000 11000 --skh # SKH 옵션 활성화
uv run run.py 10000 11000 --no-download # 다운로드 생략
""",
)
parser.add_argument("start_line", type=int, help="시작 라인 번호")
parser.add_argument("end_line", type=int, help="종료 라인 번호")
parser.add_argument("--skh", action="store_true", help="SKH 옵션 활성화")
parser.add_argument(
"--no-download", action="store_true", help="도메인 파일 다운로드 생략"
)
args = parser.parse_args()
# 라인 범위 검증
if args.start_line < 0 or args.end_line < 0:
print("라인 번호는 0 이상이어야 합니다.")
sys.exit(1)
if args.start_line > args.end_line:
print("시작 라인은 종료 라인보다 크거나 같아야 합니다.")
sys.exit(1)
# 도메인 파일 다운로드
if not args.no_download:
download_domains()
elif not os.path.exists(DOMAIN_FILE):
print(
f"도메인 파일({DOMAIN_FILE})이 존재하지 않습니다. --no-download 옵션을 제거하거나 파일을 준비해주세요."
)
sys.exit(1)
# 스크립트 실행
run_script(args.start_line, args.end_line, args.skh)
print("처리 완료.")
if __name__ == "__main__":
main()

263
setup.py
View file

@ -1,263 +0,0 @@
import os
import subprocess
import webbrowser
import asyncio
from browser_use import BrowserProfile, Agent
from browser_use.llm import ChatGoogle
from dotenv import load_dotenv
import threading
load_dotenv(verbose=True, override=True)
os.makedirs(os.path.dirname("./data"), exist_ok=True)
def create_file_from_example(target: str, example: str) -> bool:
if not os.path.exists(target):
if os.path.exists(example):
with (
open(example, "r", encoding="utf-8") as example_file,
open(target, "w", encoding="utf-8") as target_file,
):
target_file.write(example_file.read())
# os.startfile(target)
print(f"{target} 파일이 {example}에서 생성되었습니다.")
return True
else:
print(
f"⚠️ {example} 파일이 존재하지 않습니다. {target} 생성에 실패했습니다."
)
else:
print(f" {target} 파일이 이미 존재합니다.")
return False
def install_playwright_chrome():
print("\n🛠️ Playwright의 Chromium을 설치 중입니다...")
print("👉 이 작업은 시간이 걸릴 수 있습니다. 잠시 기다려주세요.")
try:
subprocess.run(["uv", "run", "playwright", "install", "chromium"], check=True)
print("✅ Playwright Chrome 설치 완료.")
except subprocess.CalledProcessError as e:
if "already" in e.stdout.decode():
print(" Chrome이 이미 설치되어 있습니다.")
else:
print(f"❌ Playwright 설치 실패: {e}")
print("\n")
def prompt_yes_no(message: str) -> bool:
print(message, end="")
return input().strip().lower() in ["y", "yes"]
def i_dont_like_windows():
# Windows인지 확인
if os.name != "nt":
return
else:
# run (Get-ItemProperty "HKLM:\SYSTEM\CurrentControlSet\Control\Nls\CodePage").ACP
try:
result = subprocess.run(
[
"powershell",
"-Command",
'(Get-ItemProperty "HKLM:\\SYSTEM\\CurrentControlSet\\Control\\Nls\\CodePage").ACP',
],
capture_output=True,
text=True,
check=True,
)
acp = result.stdout.strip()
if acp == "65001":
print("현재 Active Code Page가 UTF-8로 설정되어 있습니다.")
return
else:
print("현재 Active Code Page가 UTF-8로 설정되어 있지 않습니다.")
except subprocess.CalledProcessError as e:
print(f"코드 페이지 확인 실패: {e}")
print("=======================================================")
print("\n⚠️ Windows에서는 인코딩 문제가 발생합니다.")
print("👉 엔터를 누르면 자동으로 intl.cpl이 열립니다.")
print('👉 자세한 내용은 README.md에서 "윈도우 인코딩 해결"을 참조해주세요.\n')
print(
"⚠️ 경고 : 이 작업은 윈도우에서 킹갓 대한민국의 프로그램들의 한글이 정상적으로 표시되지 않을 수 있습니다."
)
# Pause
input("계속하려면 Enter 키를 누르세요...")
webbrowser.open("intl.cpl")
print("👉 intl.cpl가 열렸습니다.\n")
print("👉 관리자 옵션 -> 시스템 로켈 변경")
print("👀 Beta: 세계 언어 지원을 위해 Unicode UTF-8 사용")
print("👉 이 설정을 변경한 후, 시스템을 재시작하세요.\n")
print("⚠️ 이 작업은 시스템 언어 설정을 변경하므로 주의가 필요합니다.\n")
print("=======================================================")
input("계속하려면 Enter 키를 누르세요...")
async def setup_user_data():
print("\n📂 사용자 데이터 디렉토리를 설정하시겠습니까?")
print("⚠️ 사용자 데이터 디렉토리는 브라우저의 프로필 데이터를 저장하는 곳입니다.")
print("✅ 이 작업은 Google API Key를 설정하고 나서 진행해야만합니다.")
if prompt_yes_no("\033[1m\033[33m선택하시려면 y를 입력하세요 (y/n):\033[0m "):
if os.getenv("GOOGLE_API_KEY") is None:
print(
"⚠️ Google API Key가 설정되어 있지 않습니다. 먼저 Google API Key를 설정해주세요."
)
return
print("======================================================")
llm = ChatGoogle(
model="gemini-2.0-flash",
)
initial_actions = [
{"go_to_url": {"url": "https://www.google.com", "new_tab": False}},
{"wait": {"seconds": 2147483647}},
]
agent = Agent(
task="Just Wait",
llm=llm,
use_vision=False,
initial_actions=initial_actions,
browser_profile=BrowserProfile(
disable_security=True,
# stealth=True,
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
user_data_dir="./data/user_data",
args=[
# "--disable-features=Translate,PasswordManagerDefaultEnabled",
],
ignore_default_args=[
"--disable-datasaver-prompt",
"--disable-component-extensions-with-background-pages",
"--disable-prompt-on-repost",
"--safeBrowse-disable-auto-update",
"--install-autogenerated-theme=0,0,0",
"--disable-speech-synthesis-api",
"--ash-no-nudges",
"--test-type=gpu",
"--noerrdialogs",
"--disable-external-intent-requests",
"--disable-breakpad",
"--disable-backgrounding-occluded-windows",
"--export-tagged-pdf",
"--disable-focus-on-load",
"--suppress-message-center-popups",
"--disable-renderer-backgrounding",
"--hide-crash-restore-bubble",
"--disable-back-forward-cache",
"--allow-legacy-extension-manifests",
# "--disable-field-trial-config", # 왜 이걸 끄면 웹사이트가 압축된 형태로 보이는 진 모르곘음
"--disable-popup-blocking",
"--disable-background-networking",
"--no-first-run",
"--disable-blink-features=AutomationControlled",
"--password-store=basic",
"--enable-network-information-downlink-max",
"--allow-pre-commit-input",
"--enable-features=NetworkService,NetworkServiceInProcess",
"--metrics-recording-only",
"--silent-debugger-extension-api",
"--disable-features=AcceptCHFrame,AutoExpandDetailsElement,AvoidUnnecessaryBeforeUnloadCheckSync,CertificateTransparencyComponentUpdater,DestroyProfileOnBrowserClose,DialMediaRouteProvider,ExtensionManifestV2Disabled,GlobalMediaControls,HttpsUpgrades,ImprovedCookieControls,LazyFrameLoading,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutomationControlled,BackForwardCache,OptimizationHints,ProcessPerSiteUpToMainFrameThreshold,InterestFeedContentSuggestions,CalculateNativeWinOcclusion,HeavyAdPrivacyMitigations,PrivacySandboxSettings4,AutofillServerCommunication,CrashReporting,OverscrollHistoryNavigation,InfiniteSessionRestore,ExtensionDisableUnsupportedDeveloper",
"--disable-ipc-flooding-protection",
"--disable-hang-monitor",
"--disable-dev-shm-usage",
"--disable-client-side-phishing-detection",
"--log-level=2",
"--generate-pdf-document-outline",
"--disable-speech-api",
"--disable-search-engine-choice-screen",
"--no-service-autorun",
"--no-pings",
"--disable-component-update",
'--simulate-outdated-no-au="Tue, 31 Dec 2099 23:59:59 GMT"',
"--disable-background-timer-throttling",
"--use-mock-keychain",
"--disable-features=IsolateOrigins,site-per-process",
# 아래는 기존 예시에 있던 인자들입니다. 필요에 따라 유지하거나 제거하세요.
"--enable-automation",
"--disable-extensions",
"--hide-scrollbars",
],
),
)
print("======================================================\n")
print(
"👉 브라우저가 열립니다. 필요한 로그인을 완료한 후 엔터키를 눌러 다음 단계로 진행하세요."
)
input("계속하려면 Enter 키를 누르세요...\n")
print("======================================================")
# 브라우저를 백그라운드에서 시작
def run_agent():
asyncio.run(agent.run())
agent_thread = threading.Thread(target=run_agent)
agent_thread.daemon = True
agent_thread.start()
# 사용자가 'n'을 입력할 때까지 대기
while True:
user_input = input("").strip().lower()
if user_input == "":
agent.stop()
break
print("======================================================")
print("✅ 설정이 완료되었습니다.")
else:
print("🚫 설정이 취소되었습니다.")
print("======================================================")
print(
"⚠️ 이후에 USER_DATA_DIR을 설정하려면, .env 파일을 참고하여 USER_DATA_DIR을 설정하세요.\n"
)
def setup_sensitive():
print("\n🔐 Sensitive Data을 설정하시겠습니까?")
print("👉 이미 세션을 설정했다면, 이 작업은 **선택사항**입니다.")
print(
"⚠️ 민감 정보 파일은 오류를 유발하거나 문제가 될 수 있으므로 가급적 세션 사용을 권장합니다."
)
if prompt_yes_no("\033[1m\033[33m선택하시려면 y를 입력하세요 (y/n):\033[0m "):
print("======================================================")
print("👀 .sensitive.json 파일을 생성합니다.")
print("💾 Browser Use의 문서를 참조하여 수정을 수정해주세요.")
print("https://docs.browser-use.com/customize/sensitive-data")
create_file_from_example(".sensitive.json", ".sensitive.example.json")
print("======================================================")
print("✅ .sensitive.json 파일이 생성되었습니다.")
else:
print("🚫 .sensitive.json 생성이 취소되었습니다.")
print("======================================================")
print(
"⚠️ 이후에 민감 정보 파일을 설정하려면, .sensitive.example.json 파일을 참고하여 .sensitive.json 파일을 생성하세요.\n"
)
if __name__ == "__main__":
# 1. .env 생성
create_file_from_example(".env", ".env.example")
print("=====================================================")
# 2. Playwright용 Chrome 설치
install_playwright_chrome()
print("=====================================================")
# 3. Windows 인코딩 문제 해결
# i_dont_like_windows()
# print("=====================================================")
# 4. Setup User Data
asyncio.run(setup_user_data())
print("=====================================================")
# 5. .sensitive.json 생성
# setup_sensitive()
print("=====================================================")
print("🎉 초기 설정이 완료되었습니다! 이제 스크립트를 실행할 준비가 되었습니다.")

View file

@ -1,6 +0,0 @@
from lib.browser_use.agents import *
from lib.browser_use.func import *
from lib.browser_use.init_profile import *
from lib.browser_use.model import *
from lib.browser_use.scanner import *
from lib.browser_use.sensitive_data import *

View file

@ -1,417 +0,0 @@
import asyncio
import json
import os
import shutil
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use.init_profile import GetProfile
from lib.browser_use.sensitive_data import GetSensitiveData
from lib.llm import CreateChatGoogle, get_prompt
from lib.utils import config, logger
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
@dataclass
class RetryTask:
"""재시도할 작업을 나타내는 클래스"""
task_type: str # "oauth_list" or "oauth_login"
url: str
oauth_provider: Optional[str] = None
retry_count: int = 0
next_retry_time: Optional[datetime] = None
max_retries: int = 5
# 전역 재시도 큐
retry_queue: list[RetryTask] = []
retry_queue_lock = asyncio.Lock()
async def add_to_retry_queue(task: RetryTask):
"""작업을 재시도 큐에 추가"""
async with retry_queue_lock:
# 중복 작업 확인
existing_task = None
for existing in retry_queue:
if (
existing.task_type == task.task_type
and existing.url == task.url
and existing.oauth_provider == task.oauth_provider
):
existing_task = existing
break
if existing_task:
# 기존 작업이 있으면 재시도 횟수 업데이트
existing_task.retry_count = task.retry_count
existing_task.next_retry_time = task.next_retry_time
print(
f"📝 기존 작업 업데이트: {task.task_type} - {task.url} (재시도: {task.retry_count})"
)
else:
# 새 작업 추가
retry_queue.append(task)
print(
f" 재시도 큐에 작업 추가: {task.task_type} - {task.url} (재시도: {task.retry_count})"
)
async def process_retry_queue():
"""재시도 큐 처리"""
async with retry_queue_lock:
now = datetime.now()
ready_tasks = []
for task in retry_queue[:]: # 복사본에서 반복
if task.next_retry_time and task.next_retry_time <= now:
ready_tasks.append(task)
retry_queue.remove(task)
if ready_tasks:
print(f"🔄 {len(ready_tasks)}개의 재시도 작업 처리 중...")
for task in ready_tasks:
try:
if task.task_type == "oauth_list":
result = await _extract_oauth_list_internal(task.url)
if result:
print(f"✅ 재시도 성공: OAuth 리스트 추출 - {task.url}")
else:
await _handle_retry_failure(task)
elif task.task_type == "oauth_login":
result = await _test_oauth_login_internal(
task.url, task.oauth_provider
)
if result:
print(
f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}"
)
else:
await _handle_retry_failure(task)
except Exception as e:
print(f"❌ 재시도 중 에러: {e}")
await _handle_retry_failure(task)
async def _handle_retry_failure(task: RetryTask):
"""재시도 실패 처리"""
if task.retry_count < task.max_retries:
task.retry_count += 1
wait_time = min(INITIAL_BACKOFF * (2**task.retry_count), MAX_BACKOFF)
task.next_retry_time = datetime.now() + timedelta(seconds=wait_time)
await add_to_retry_queue(task)
print(f"{wait_time}초 후 재시도 예정: {task.task_type} - {task.url}")
else:
print(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
logger(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
async def get_retry_queue_status():
"""재시도 큐 상태 조회"""
async with retry_queue_lock:
return {
"queue_length": len(retry_queue),
"tasks": [
{
"task_type": task.task_type,
"url": task.url,
"oauth_provider": task.oauth_provider,
"retry_count": task.retry_count,
"next_retry_time": (
task.next_retry_time.isoformat()
if task.next_retry_time
else None
),
}
for task in retry_queue
],
}
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
headless = os.getenv("HEADLESS", "False").lower() == "true"
while try_cnt < 3:
try:
Profile = await GetProfile(headless=headless)
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=Profile[0],
)
agent = Agent(browser_session=session, **agent_config["agent_params"])
response = await agent.run()
if any(
keyword in str(response)
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type=agent_config.get("task_type", "unknown"),
url=url,
retry_count=try_cnt + 1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return None
# remove profile
print(Profile)
if Profile[1] and isinstance(Profile[1], str):
print(1)
shutil.rmtree(Profile[1], ignore_errors=True)
print(f"🗑️ 임시 프로필 디렉토리 삭제 완료: {Profile[1]}")
return response
except Exception as e:
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(
f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}"
)
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
return None
async def _extract_oauth_list_internal(url: str):
"""OAuth 리스트 추출 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
prompt, model = get_prompt("auth")
agent_config = {
"url": target_url,
"log_context": "OAuth 리스트 추출",
"agent_params": {
"initial_actions": [{"go_to_url": {"url": target_url, 'new_tab': False}}],
"sensitive_data": GetSensitiveData(),
"task": (
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
},
}
response = await _run_agent_with_retry(agent_config)
if not response:
return []
final_result = response.final_result()
if not final_result:
print("OAuth 리스트 추출 결과가 없습니다.")
return []
try:
data = json.loads(final_result)
print(final_result)
oauth_providers = data.get("sso_list", [])
if not oauth_providers:
print("❌ OAuth 제공자가 없습니다.")
logger(f"{url} - OAuth 제공자 없음: {final_result}")
return []
print(f"✅ OAuth 제공자 추출 완료: {oauth_providers}")
return oauth_providers
except (json.JSONDecodeError, KeyError) as e:
print(f"❌ 결과 파싱 실패: {e}")
logger(f"{url} 결과 파싱 실패: {final_result}")
return []
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
try:
return await _extract_oauth_list_internal(url)
except Exception as e:
error_str = str(e).lower()
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type="oauth_list",
url=url,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return []
else:
raise e
async def _test_oauth_login_internal(url: str, oauth_provider: str):
"""OAuth 로그인 테스트 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
prompt, model = get_prompt(oauth_provider)
agent_config = {
"url": target_url,
"log_context": f"{oauth_provider} 로그인",
"agent_params": {
"initial_actions": [{"go_to_url": {"url": target_url, 'new_tab': False}}],
"sensitive_data": GetSensitiveData(),
"task": (
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
},
}
response = await _run_agent_with_retry(agent_config)
if response and response.final_result():
final_result = response.final_result()
try:
import json
result_data = json.loads(final_result)
status = result_data.get("status", "")
if status == "success":
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
else:
print(f"{oauth_provider} 로그인 실패: {status}")
logger(f"{url} - {oauth_provider} 로그인 실패: {final_result}")
return False
except (json.JSONDecodeError, KeyError):
print(f"{oauth_provider} 결과 파싱 실패")
return False
print(f"{oauth_provider} 로그인 실패")
return False
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
try:
return await _test_oauth_login_internal(url, oauth_provider)
except Exception as e:
error_str = str(e).lower()
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {oauth_provider} - {url}")
task = RetryTask(
task_type="oauth_login",
url=url,
oauth_provider=oauth_provider,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return False
else:
raise e
async def start_retry_queue_processor():
"""재시도 큐 처리기를 백그라운드에서 시작"""
async def queue_processor():
while True:
try:
await process_retry_queue()
await asyncio.sleep(30) # 30초마다 큐 확인
except Exception as e:
print(f"❌ 재시도 큐 처리 중 에러: {e}")
await asyncio.sleep(60) # 에러 발생 시 1분 대기
# 백그라운드 태스크로 실행
asyncio.create_task(queue_processor())
print("🔄 재시도 큐 처리기 시작됨")
# 모듈 로딩 시 자동으로 백그라운드 처리기 시작
# (실제 애플리케이션에서는 main 함수에서 호출하는 것이 좋음)
def init_retry_system():
"""재시도 시스템 초기화"""
print("🔧 재시도 시스템 초기화 중...")
# 이 함수는 메인 애플리케이션에서 호출해야 함

View file

@ -1,110 +0,0 @@
"""
브라우저 리소스 정리를 위한 모듈
"""
import os
import shutil
import asyncio
from pathlib import Path
async def cleanup_browser_resources(agent=None, session=None, user_data_dir=None):
"""브라우저 관련 리소스를 정리하는 함수"""
print("🔄 브라우저 리소스 정리를 시작합니다...")
# 에이전트 리소스 정리
if agent:
try:
print("<EFBFBD> 에이전트 리소스 정리 중...")
# 브라우저 종료 대기 시간 설정
await asyncio.wait_for(agent.close(), timeout=10.0)
print("✅ 에이전트 리소스 정리 완료.")
except asyncio.TimeoutError:
print("⚠️ 에이전트 종료 시간 초과. 강제 종료합니다.")
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
# 세션 리소스 정리
if session:
try:
print("🔄 세션 리소스 정리 중...")
await asyncio.wait_for(session.close(), timeout=5.0)
print("✅ 세션 리소스 정리 완료.")
except asyncio.TimeoutError:
print("⚠️ 세션 종료 시간 초과.")
except Exception as e:
print(f"⚠️ 세션 리소스 정리 실패: {e}")
# 임시 스토리지 상태 파일 삭제
storage_state_temp_path = Path("./data/storage_state_temp.json").resolve()
if storage_state_temp_path.exists():
try:
print(f"<EFBFBD> 임시 스토리지 상태 파일 삭제 중: {storage_state_temp_path}")
storage_state_temp_path.unlink()
print("✅ 임시 스토리지 상태 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 스토리지 상태 파일 삭제 실패: {e}")
# 임시 사용자 데이터 디렉토리 정리
if user_data_dir and os.path.exists(user_data_dir):
try:
print(f"🗑️ 임시 사용자 데이터 디렉토리 삭제 중: {user_data_dir}")
await asyncio.sleep(0.5) # 브라우저가 완전히 종료될 시간 제공
shutil.rmtree(user_data_dir)
print("✅ 임시 사용자 데이터 디렉토리 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 사용자 데이터 디렉토리 삭제 실패: {e}")
# userdata.dump 파일에서 기록된 디렉토리 정리
log_file = "./data/userdata.dump"
if os.path.exists(log_file):
try:
with open(log_file, "r") as f:
tmp_user_data_dir = f.read().strip()
if tmp_user_data_dir and os.path.exists(tmp_user_data_dir):
print(f"🗑️ 기록된 임시 사용자 데이터 디렉토리 삭제 중: {tmp_user_data_dir}")
await asyncio.sleep(0.5) # 브라우저가 완전히 종료될 시간 제공
shutil.rmtree(tmp_user_data_dir)
print("✅ 기록된 임시 사용자 데이터 디렉토리 삭제 완료.")
os.remove(log_file)
print("✅ userdata.dump 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ userdata.dump 관련 정리 실패: {e}")
print("✅ 브라우저 리소스 정리가 완료되었습니다.")
def cleanup_all_running_tasks():
"""실행 중인 모든 asyncio 태스크를 정리"""
try:
loop = asyncio.get_running_loop()
tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
if tasks:
print(f"🔄 {len(tasks)}개의 실행 중인 태스크를 정리합니다...")
for task in tasks:
task.cancel()
# 태스크들이 정리될 때까지 잠시 대기
async def wait_for_tasks():
await asyncio.gather(*tasks, return_exceptions=True)
asyncio.create_task(wait_for_tasks())
print("✅ 모든 태스크 정리 완료.")
except RuntimeError:
# 이벤트 루프가 실행 중이 아닌 경우
pass
except Exception as e:
print(f"⚠️ 태스크 정리 중 오류: {e}")
async def emergency_cleanup():
"""긴급 종료 시 최소한의 리소스 정리"""
print("🚨 긴급 리소스 정리 실행 중...")
# 모든 태스크 취소
cleanup_all_running_tasks()
# 기본 리소스 정리
await cleanup_browser_resources()
print("✅ 긴급 리소스 정리 완료.")

View file

@ -1,54 +0,0 @@
import json
import os
from pathlib import Path
from browser_use import BrowserProfile
from dotenv import load_dotenv
# Load environment variables
load_dotenv(override=True)
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
proxy_url = f"http://{proxy_host}:{proxy_port}"
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
return proxy_url
else:
print("🔗 No proxy configured, using direct connection.")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [
# Security and isolation
"--disable-web-security",
"--disable-site-isolation-trials",
"--disable-features=IsolateOrigins,site-per-process",
"--ignore-certificate-errors",
"--ignore-ssl-errors",
"--allow-running-insecure-content",
# Performance and rendering
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage",
# Popup and automation
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled",
# Browser behavior
"--no-first-run",
"--no-service-autorun",
"--no-default-browser-check",
"--password-store=basic",
"--use-mock-keychain",
# Extensions
"--disable-extensions-file-access-check",
"--disable-extensions-http-throttling",
"--disable-component-extensions-with-background-pages",
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]

View file

@ -1,134 +0,0 @@
import os
import shutil
import tempfile
from lib.browser_use.func import *
from lib.utils.config import USER_DATA_DIR
# Initialize configuration
proxy_url = setup_proxy()
async def GetProfile(headless=False):
"""브라우저 프로필을 생성하고 임시 사용자 데이터 디렉토리를 관리합니다."""
user_data_dir = None
tmp_user_data_dir = None
if USER_DATA_DIR and os.path.isdir(USER_DATA_DIR):
try:
tmp_user_data_dir = tempfile.mkdtemp(prefix="browser_use_")
print(f"🔧 기본 사용자 데이터 디렉토리: {USER_DATA_DIR}")
print(f"🔧 임시 사용자 데이터 디렉토리: {tmp_user_data_dir}")
log_file = os.path.join("./data", "userdata.dump")
if not os.path.exists("./data"):
os.makedirs("./data")
# 기존 로그 파일이 있다면 해당 디렉토리 정리
if os.path.exists(log_file):
try:
with open(log_file, "r") as f:
old_tmp_dir = f.read().strip()
if old_tmp_dir and os.path.exists(old_tmp_dir):
shutil.rmtree(old_tmp_dir)
print(f"🗑️ 이전 임시 디렉토리 정리: {old_tmp_dir}")
except Exception as e:
print(f"⚠️ 이전 임시 디렉토리 정리 실패: {e}")
os.remove(log_file)
# 새 임시 디렉토리 경로 로깅
with open(log_file, "w") as f:
f.write(tmp_user_data_dir)
# 사용자 데이터 디렉토리 복사
if os.path.exists(tmp_user_data_dir):
shutil.rmtree(tmp_user_data_dir)
shutil.copytree(
USER_DATA_DIR,
tmp_user_data_dir,
dirs_exist_ok=False,
ignore_dangling_symlinks=True,
)
user_data_dir = tmp_user_data_dir
print(f"✅ 사용자 데이터 디렉토리 복사 완료: {user_data_dir}")
except Exception as e:
print(f"❌ 사용자 데이터 디렉토리 복사 실패: {e}")
# 실패 시 임시 디렉토리 정리
if tmp_user_data_dir and os.path.exists(tmp_user_data_dir):
try:
shutil.rmtree(tmp_user_data_dir)
except Exception:
pass
tmp_user_data_dir = None
user_data_dir = None
print(proxy_url)
profile = BrowserProfile(
# Security settings
# disable_security=True,
# Display settings
headless=headless,
# Data persistence
user_data_dir=user_data_dir,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=[
"--proxy-server=" + proxy_url if proxy_url else "",
# "--disable-features=Translate,PasswordManagerDefaultEnabled",
],
ignore_default_args=[
# "--disable-datasaver-prompt",
# "--disable-component-extensions-with-background-pages",
# "--disable-prompt-on-repost",
# "--safeBrowse-disable-auto-update",
# "--install-autogenerated-theme=0,0,0",
# "--disable-speech-synthesis-api",
# "--ash-no-nudges",
# "--test-type=gpu",
# "--noerrdialogs",
# "--disable-external-intent-requests",
# "--disable-breakpad",
# "--disable-backgrounding-occluded-windows",
# "--export-tagged-pdf",
# "--disable-focus-on-load",
# "--suppress-message-center-popups",
# "--disable-renderer-backgrounding",
# "--hide-crash-restore-bubble",
# "--disable-back-forward-cache",
# "--allow-legacy-extension-manifests",
# # "--disable-field-trial-config", # 왜 이걸 끄면 웹사이트가 압축된 형태로 보이는 진 모르곘음
# "--disable-popup-blocking",
# "--disable-background-networking",
# "--no-first-run",
# "--disable-blink-features=AutomationControlled",
# "--password-store=basic",
# "--enable-network-information-downlink-max",
# "--allow-pre-commit-input",
# "--enable-features=NetworkService,NetworkServiceInProcess",
# "--metrics-recording-only",
# "--silent-debugger-extension-api",
# "--disable-features=AcceptCHFrame,AutoExpandDetailsElement,AvoidUnnecessaryBeforeUnloadCheckSync,CertificateTransparencyComponentUpdater,DestroyProfileOnBrowserClose,DialMediaRouteProvider,ExtensionManifestV2Disabled,GlobalMediaControls,HttpsUpgrades,ImprovedCookieControls,LazyFrameLoading,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutomationControlled,BackForwardCache,OptimizationHints,ProcessPerSiteUpToMainFrameThreshold,InterestFeedContentSuggestions,CalculateNativeWinOcclusion,HeavyAdPrivacyMitigations,PrivacySandboxSettings4,AutofillServerCommunication,CrashReporting,OverscrollHistoryNavigation,InfiniteSessionRestore,ExtensionDisableUnsupportedDeveloper",
# "--disable-ipc-flooding-protection",
# "--disable-hang-monitor",
# "--disable-dev-shm-usage",
# "--disable-client-side-phishing-detection",
# "--log-level=2",
# "--generate-pdf-document-outline",
# "--disable-speech-api",
# "--disable-search-engine-choice-screen",
# "--no-service-autorun",
# "--no-pings",
# "--disable-component-update",
# '--simulate-outdated-no-au="Tue, 31 Dec 2099 23:59:59 GMT"',
# "--disable-background-timer-throttling",
# "--use-mock-keychain",
# "--disable-features=IsolateOrigins,site-per-process",
# 아래는 기존 예시에 있던 인자들입니다. 필요에 따라 유지하거나 제거하세요.
"--enable-automation",
"--disable-extensions",
"--hide-scrollbars",
],
)
return [profile, tmp_user_data_dir] if tmp_user_data_dir else [profile]

View file

@ -1,17 +0,0 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel):
oauth_providers: List[str] # 이제 문자열 배열로 변경
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList

View file

@ -1,191 +0,0 @@
import asyncio
import csv
import os
from lib.browser_use.agents import (
extract_oauth_list,
get_retry_queue_status,
start_retry_queue_processor,
test_oauth_login,
)
from lib.browser_use.cleanup import cleanup_browser_resources
from lib.utils import is_html_url, notify_backend, read_lines_between
from lib.utils.progress import (
current_progress,
is_shutdown_requested,
load_progress,
progress_file,
save_progress,
)
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(target_url)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry, "", "pending"])
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry}")
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry} 로그인 결과: {status}")
async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
try:
# 재시도 큐 처리기 시작
await start_retry_queue_processor()
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
# 전체 목록 길이를 저장 (재개 시에도 유지되어야 함)
total_count = len(target_list)
current_progress["total"] = total_count
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(
f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}"
)
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == "y":
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
# 전체 개수는 원래 목록 길이로 유지
current_progress["total"] = total_count
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
# 종료 요청 체크
if is_shutdown_requested():
print("🛑 종료 요청으로 인해 스캔을 중단합니다.")
break
# current_index는 전체 목록에서의 현재 위치를 나타냄
current_url_index = current_progress["current_index"]
current_progress["current_url"] = url
print(
f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}"
)
print(
f"📍 {os.path.basename(filepath)}{start_line + current_url_index}번째 줄"
)
# 재시도 큐 상태 확인 및 출력
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] > 0:
print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 대기 중")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
# 대기 중에도 종료 요청 체크
for _ in range(30):
if is_shutdown_requested():
print("🛑 대기 중 종료 요청으로 스캔을 중단합니다.")
return
await asyncio.sleep(1)
try:
await scan_one_url(url, skip_html_check=skip_html_check)
except Exception as e:
print(f"{url} 스캔 중 오류 발생: {e}")
continue
# 스캔 완료 후 재시도 큐 상태 확인
retry_status_after = await get_retry_queue_status()
if retry_status_after["queue_length"] > 0:
print(
f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중"
)
# 다음 URL로 진행
current_progress["current_index"] = current_url_index + 1
save_progress()
# 모든 URL 처리 완료 후 재시도 큐가 빌 때까지 대기
if not is_shutdown_requested():
print("\n🔄 모든 URL 처리 완료. 재시도 큐 처리 대기 중...")
while True:
if is_shutdown_requested():
print("🛑 재시도 큐 대기 중 종료 요청으로 중단합니다.")
break
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] == 0:
break
print(
f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인..."
)
# 대기 중에도 종료 요청 체크
for _ in range(30):
if is_shutdown_requested():
print("🛑 재시도 큐 대기 중 종료 요청으로 중단합니다.")
break
await asyncio.sleep(1)
if not is_shutdown_requested():
print(f"\n🎉 모든 스캔이 완료되었습니다! ({total_count}개 URL)")
print("🎉 재시도 큐도 모두 처리되었습니다!")
else:
print("\n🛑 종료 요청으로 인해 스캔이 중단되었습니다.")
else:
print("\n🛑 종료 요청으로 인해 스캔이 중단되었습니다.")
finally:
# 항상 리소스 정리
print("🔄 브라우저 리소스를 정리합니다...")
await cleanup_browser_resources()

View file

@ -1,22 +0,0 @@
# read json file .sensitive.json
import json
import os
def GetSensitiveData():
"""
Reads sensitive data from a .sensitive.json file in the current directory.
Returns:
dict: A dictionary containing the sensitive data.
"""
file_path = os.path.join(os.getcwd(), ".sensitive.json")
if not os.path.exists(file_path):
return None
with open(file_path, "r") as file:
sensitive_data = json.load(file)
return sensitive_data

View file

@ -1,2 +0,0 @@
from lib.llm.create import *
from lib.llm.prompt import *

View file

@ -1,19 +0,0 @@
from browser_use.llm import ChatGoogle
from dotenv import load_dotenv
# 환경 변수 로드 (GOOGLE_API_KEY 필요)
load_dotenv(override=True)
def CreateChatGoogle(model: str):
"""Browser Use용 Google 모델 생성"""
if model == "fallback":
print("⚠️ Fallback 모델을 사용합니다. Environment 변수를 확인하세요.")
print("⚠️ Model gemini-2.0-flash-lite를 사용합니다.")
model = "gemini-2.0-flash-lite"
return ChatGoogle(
model=model,
temperature=0.0
# Browser Use는 내부적으로 재시도 로직을 처리합니다
)

View file

@ -1,46 +0,0 @@
from typing import Type, Union
from pydantic import BaseModel
def get_prompt(type: str) -> tuple[str, Type[BaseModel]] | str:
"""
Prompt를 반환합니다.
:param type: 'auth' {Auth List} 또는 'google' {OAuth Provider}, 'meta' {OAuth Provider} 지정합니다.
:return: 해당하는 프롬프트 문자열 또는 (프롬프트, 모델) 튜플
"""
if type.lower() == "auth":
from lib.llm.prompt._get_oauth import model, prompt
return prompt, model
elif type.lower() in ["google", "google account"]:
from lib.llm.prompt.google import model, prompt
return prompt, model
elif type.lower() in ["microsoft", "microsoftonline"]:
from lib.llm.prompt.microsoft import model, prompt
return prompt, model
elif type.lower() in ["meta", "facebook"]:
from lib.llm.prompt.facebook import model, prompt
return prompt, model
elif type.lower() in ["apple"]:
from lib.llm.prompt.apple import model, prompt
return prompt, model
elif type.lower() in ["github"]:
from lib.llm.prompt.github import model, prompt
return prompt, model
else:
from lib.llm.prompt._fallback import model, prompt
return prompt, model

View file

@ -1,2 +0,0 @@
from lib.llm.prompt._fallback.model import model
from lib.llm.prompt._fallback.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,66 +0,0 @@
from dotenv import load_dotenv
import os
load_dotenv()
google_id = os.getenv("GOOGLE_ID")
google_password = os.getenv("GOOGLE_PASSWORD")
naver_id = os.getenv("NAVER_ID")
naver_password = os.getenv("NAVER_PASSWORD")
facebook_id = os.getenv("FACEBOOK_ID")
facebook_password = os.getenv("FACEBOOK_PASSWORD")
github_id = os.getenv("GITHUB_ID")
github_password = os.getenv("GITHUB_PASSWORD")
microsoft_id = os.getenv("MICROSOFT_ID")
microsoft_password = os.getenv("MICROSOFT_PASSWORD")
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **SSO Login button**, following all steps strictly as described below.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
3. Click the **SSO login button**.
4. Check if the user is **already logged and immediately redirected back to the original site** without showing a login screen.
- If so, treat the login as successful and return immediately.
5. If login proceeds without interruptions, complete the login and **immediately close the browser window**. Do not perform any further actions.
6. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for login:
- Google `{google_id}` / `{google_password}`
- Naver `{naver_id}` / `{naver_password}`
- GitHub `{github_id}` / `{github_password}`
- facebook `{facebook_id}` / `{facebook_password}`
- Microsoft `{microsoft_id}` / `{microsoft_password}`
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT proceed with login if:
- CAPTCHA or MFA appears
- If the user is already logged and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt._get_oauth.model import model
from lib.llm.prompt._get_oauth.prompt import prompt

View file

@ -1,7 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page

View file

@ -1,61 +0,0 @@
prompt = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. SSO BUTTON IDENTIFICATION
- On the login page, look for the following social login (SSO) buttons:
- Google, GitHub, Facebook, Microsoft, Naver, Etc.
- Proceed only if it is clearly an **actual SSO button**.
- Exclude the following:
- Passkey-related buttons
- Username/password fields
- Email-based login
- Non-OAuth methods such as certificate or phone verification
3. RETURN FORMAT
- If the login page is successfully found, return:
```json
{
"msg": "Login page found",
"url": "https://example.com/login",
"sso_list": ["Google", "GitHub"]
}
```
- If the login page cannot be found, return:
```json
{
"msg": "Login page not found",
"url": "",
"sso_list": []
}
```
- If blocked (as in step 0), return:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt.apple.model import model
from lib.llm.prompt.apple.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "apple_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,62 +0,0 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Apple SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Apple", and use it to complete login via Apple.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
3. On the login page, look for a clearly labeled **Apple SSO button** typically labeled as:
- "Continue with Apple"
- "Sign in with Apple"
- or a button with the Apple icon
4. Click the **Apple login button**.
- The Apple login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to Apple and immediately redirected back to the original site** without showing a Apple login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the Apple login page:
a. If a **CAPTCHA**, complete it.
b. If a **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed - Immediately stop and return the appropriate status.
- If a **"Continue"**, **"Trust"**, **"Authorize"**, or **"Allow"** button is displayed, click it to grant consent.
7. If login proceeds without interruptions, complete the login and **immediately close the browser window**. Do not perform any further actions.
8. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Apple login:
- Email: {os.getenv("APPLE_EMAIL", "")}
- Password: {os.getenv("APPLE_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required
- If the user is already logged in to Apple and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the Apple login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Apple login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "apple_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt.facebook.model import model
from lib.llm.prompt.facebook.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "facebook_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,71 +0,0 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Facebook SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Facebook", and use it to complete login via Facebook.
Instructions:
1. **CRITICAL**: Wait 3 seconds for page to load and interactive elements to appear
2. If any cookie or privacy popups appear, dismiss or accept them by looking for buttons like "Accept All", "Accept", "Allow All"
3. If any promotional/discount popups appear, dismiss them by looking for "Close", "X", or "No Thanks" buttons
4. Navigate through the site's UI to find the **login or sign-in page**:
- Look for "My Account", "Log In", "Sign In", "Get Started" buttons
- Only follow links within the same domain
5. On the login page, look for a clearly labeled **Facebook SSO button** typically labeled as:
- "Continue with Facebook"
- "Sign in with Facebook"
- "Login with Facebook"
- or a button with the Facebook 'f' icon
6. Click the **Facebook login button**.
- The Facebook login flow will open in a new browser tab or window (this is normal OAuth behavior)
- Continue with the Facebook login process in the new tab/window
7. **Facebook Login Flow Handling:**
- **If already logged in**: You may see a consent screen with buttons like "Continue as [Name]", "Continue", "Allow" - CLICK these buttons to proceed
- **If CAPTCHA appears on registration form**: Try clicking "Create account" button multiple times (up to 5 attempts) as some CAPTCHAs may not block form submission
- **If form submission fails after 5 attempts**: Report "success" anyway if all fields were filled and Facebook OAuth was completed successfully
- **Registration completion is optional**: The main goal is Facebook OAuth completion, not site registration
8. **After Facebook OAuth consent/login**:
- Wait for redirection back to the original site
- **CRITICAL**: If redirected to a registration/signup form with fields like:
- "Last name", "Email address", "Password"
- "Create account" or "Sign up" button
- ANY additional information form
- **MUST complete the registration process:**
- Fill in required fields with appropriate test data:
- Last name: "TestUser"
- Email: Use the Facebook email or "test@example.com"
- Password: "TestPassword123!"
- Click "Create account", "Sign up", or "Complete registration" button
- Only after completing ALL steps, record the final URL as successful login
9. If all steps are completed successfully, close your browser window.
10. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Facebook login (if needed):
- Email/Phone: {os.getenv("FACEBOOK_EMAIL", "")}
- Password: {os.getenv("FACEBOOK_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs
- Do NOT use autofill, saved sessions, or cookies
- Do NOT proceed with login if CAPTCHA or MFA appears
- **ALWAYS complete any additional registration forms** after Facebook OAuth
- **Fill required fields** with test data if signup form appears
- **Only return "success" after completing ALL registration steps**
- If the login page cannot be found, return "login_page_not_found"
- If the Facebook login button is not found, return "sso_not_found"
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Facebook login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "idpw_required" | "facebook_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt.github.model import model
from lib.llm.prompt.github.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "github_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,81 +0,0 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **GitHub SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with GitHub", and use it to complete login via GitHub.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
- If a "Sign Up" or "Create Account" page appears instead, it is acceptable **as long as it includes a GitHub SSO option**.
3. On the login or sign-up page, look for a clearly labeled **GitHub SSO button** typically labeled as:
- "Continue with GitHub"
- "Sign in with GitHub"
- or a button with the GitHub logo
4. Click the **GitHub login button**.
- The GitHub login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to GitHub and immediately redirected back to the original site** without showing a GitHub login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the GitHub login page:
a. Wait for the username or email input field, then enter the email: {os.getenv("GITHUB_EMAIL", "")}
b. Click the "Continue" or "Next" button if present.
c. Enter the password: {os.getenv("GITHUB_PASSWORD", "")}
d. Click the "Sign in" button.
e. If a page appears asking to "Authorize" access for the application, click the "Authorize" button.
- GitHub may take a while to redirect after authorization, so please wait patiently.
- If a CAPTCHA, MFA prompt, or other interruption appears, do NOT proceed.
- If login fails due to incorrect credentials or authentication errors, treat as `"idpw_required"` and stop.
- Immediately stop and return the appropriate status.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
8. Close your browser window after the login is completed.
9. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for GitHub login:
- Email: {os.getenv("GITHUB_EMAIL", "")}
- Password: {os.getenv("GITHUB_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required and cannot be autofilled
- If the user is already logged in to GitHub and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the GitHub login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "GitHub login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "github_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt.google.model import model
from lib.llm.prompt.google.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,63 +0,0 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Google", and use it to complete login via Google.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
3. On the login page, look for a clearly labeled **Google SSO button** typically labeled as:
- "Continue with Google"
- "Sign in with Google"
- or a button with the Google 'G' icon
4. Click the **Google login button**.
- The Google login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the Google login page:
a. Wait for the username or email input field, then enter the email: {os.getenv("GOOGLE_EMAIL", "")}
b. Click the "Continue" or "Next" button if present. (If still on the same page, reapeat step a)
c. Wait for the password input field, then enter the password: {os.getenv("GOOGLE_PASSWORD", "")}
d. Click the "Sign in" or "Next" button.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
8. Close your browser window after the login is completed.
9. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Google login:
- Email: {os.getenv("GOOGLE_EMAIL", "")}
- Password: {os.getenv("GOOGLE_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- If the user is already logged in to Google and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the Google login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Google login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,2 +0,0 @@
from lib.llm.prompt.microsoft.model import model
from lib.llm.prompt.microsoft.prompt import prompt

View file

@ -1,9 +0,0 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "microsoft_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,60 +0,0 @@
import os
# This code snippet is used to generate a prompt for a web automation agent that performs Microsoft SSO login.
prompt = f"""
당신은 자동화 에이전트입니다.
당신의 임무는 주어진 도메인에 방문하여 아래에 엄격히 설명된 모든 단계를 따라 **Microsoft SSO 버튼** 통해 전체 로그인을 수행하는 것입니다.
목표: 도메인 내에서 "Microsoft로 로그인" 가능한 로그인 페이지를 찾아 Microsoft을 통해 로그인을 완료하세요.
지침:
1. 쿠키 또는 개인정보 팝업이 나타나면 닫거나 수락하세요.
2. 사이트의 UI를 탐색하여 **로그인 또는 로그인 페이지**(: "로그인", "Sign In", "Get Started" 같은 버튼) 찾으세요.
- 동일한 도메인 내의 링크만 따라가세요.
3. 로그인 페이지에서 명확하게 표시된 **Microsoft SSO 버튼** 찾으세요. 일반적으로 다음과 같이 표시됩니다:
- "Continue with Microsoft"
- "Sign in with Microsoft"
- or a button with the Microsoft logo (usually four squares)
4. **Microsoft 로그인 버튼** 클릭하세요.
- Microsoft 로그인 플로우는 반드시 ** 브라우저 **에서 열려야 합니다 ( 창이나 팝업이 아님).
- 로그인이 ****이나 **팝업**에서 열리면, 즉시 중단하고 적절한 상태를 반환하세요.
5. 사용자가 **이미 Microsoft에 로그인되어 있고 즉시 원래 사이트로 리디렉션**된다면,
- 경우 로그인이 성공한 것으로 간주하고 즉시 반환하세요.
6. Microsoft 로그인 페이지로 리디렉션된 경우:
- **CAPTCHA**, **MFA 프롬프트** 요청이 나타나면 진행하지 마세요.
- 즉시 중단하고 적절한 상태를 반환하세요.
7. 로그인에 방해가 없다면, 원래 사이트로 리디렉션될 때까지 기다리고 최종 URL을 기록하세요.
8. 로그인 되어있지 않으면 아래의 EMAIL과 PASSWORD를 사용하여 로그인하세요:
- Email: {os.getenv("MICROSOFT_EMAIL", "")}
- Password: {os.getenv("MICROSOFT_PASSWORD", "")}
9. 로그인 완료 브라우저 창을 닫으세요.
10. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
제약 사항:
- 검색 엔진을 사용하거나 URL을 추측하지 마세요.
- 자동완성, 저장된 세션 또는 쿠키를 사용하지 마세요.
- 다음과 같은 경우 로그인 절차를 진행하지 마세요:
- 로그인이 창에서 열릴 (탭만 허용)
- CAPTCHA 또는 MFA가 나타날
- ID/비밀번호 입력이 필요하지만 자동입력이 불가한 경우
- 사용자가 이미 Microsoft에 로그인되어 자동으로 리디렉션된다면, 즉시 성공으로 보고 종료하세요.
- 로그인 페이지를 찾을 없으면 "login_page_not_found" 반환하세요.
- Microsoft 로그인 버튼을 찾을 없으면 "sso_not_found" 반환하세요.
- 회원가입 페이지와 같은 화면이 나타나면 성공적인 로그인으로 간주하고 즉시 종료하세요.
최종 출력:
다음 형식으로만 결과를 반환하세요:
```json
{{
"msg": "Microsoft login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "microsoft_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,7 +0,0 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.config import *
from lib.utils.data import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -1,64 +0,0 @@
import os
from dotenv import load_dotenv
from lib.utils.config import (
BACKEND_URL,
GOOGLE_API_KEY,
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
load_dotenv(override=True)
def show_info():
print("🔧 환경 설정:")
print(browser_use_version())
print(f"🔗 Backend URL: {BACKEND_URL}")
print(
f"🔑 Google API Key: {'*' * (len(GOOGLE_API_KEY) - 4) + GOOGLE_API_KEY[-4:] if GOOGLE_API_KEY else None}"
)
print(f"🌐 Google Model: {GOOGLE_MODEL}")
print(f"🌐 Google Planner Model: {GOOGLE_PLANNER_MODEL}")
def browser_use_version():
try:
# run uv pip show browser-use
import subprocess
result = subprocess.run(
["uv", "pip", "show", "browser-use"],
capture_output=True,
text=True,
check=True,
)
print("📦 Browser Use 패키지 정보:")
return result.stdout.strip()
except ImportError:
return None
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (
not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
print(
"⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요."
)
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print("⚠️ 이 경고는 1초동안 정지합니다.")
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

View file

@ -1,11 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")
USER_DATA_DIR = os.getenv("USER_DATA_DIR", "./data/user_data")

View file

@ -1,2 +0,0 @@
from lib.utils.data.backend_client import *
from lib.utils.data.logger import *

View file

@ -1,23 +0,0 @@
import requests
from lib.utils.config import BACKEND_URL
def notify_backend(target_url):
# Backend에 스캔 시작을 알림
try:
response = requests.post(
f"{BACKEND_URL}/start", params={"url": target_url}, timeout=5
)
if response.status_code == 200:
print(f"✅ Backend notified: {response.text}")
else:
print(f"⚠️ Backend notification failed: {response.status_code}")
except requests.exceptions.ConnectionError:
print(
f"⚠️ Backend server not available at {BACKEND_URL}. Continuing without notification."
)
except requests.exceptions.Timeout:
print(f"⚠️ Backend notification timed out. Continuing without notification.")
except Exception as e:
print(f"⚠️ Failed to notify backend: {e}")

View file

@ -1,30 +0,0 @@
from datetime import datetime
from pathlib import Path
# 미리 정해진 파일 경로
FILE_PATH = Path("data/log.txt")
def logger(msg: str) -> None:
try:
"""
msg 문자열을 파일 끝에 추가합니다.
- 파일이 없으면 새로 생성
- 디렉터리가 없으면 생성
"""
# 상위 디렉터리 생성 (이미 있으면 무시)
FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
# 현재 시각 구해서 포맷팅
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
# 메시지에 개행이 없으면 자동으로 붙이기
newline = "" if msg.endswith("\n") else "\n"
line = f"[{timestamp}] {msg}{newline}"
# 'a' 모드: 파일이 없으면 생성, 있으면 이어쓰기
with FILE_PATH.open(mode="a", encoding="utf-8") as f:
f.write(line)
except:
print(msg)

View file

@ -1,38 +0,0 @@
import requests
def is_html_url(url: str, timeout: float = 10.0) -> bool:
"""
주어진 URL에 HEAD 요청을 보내고, 응답 헤더의 Content-Type이 HTML인지 확인합니다.
- url: 검사할 URL 문자열
- timeout: 요청 타임아웃( 단위)
반환값:
- Content-Type이 'text/html' 시작하면 True, 그렇지 않으면 False
"""
try:
with requests.get(url, timeout=timeout, stream=True) as response:
# 응답 코드가 200번대가 아니면 False로 간주
if not response.ok:
return False
content_type = response.headers.get("Content-Type", "")
# Content-Type에 'text/html'이 포함되어 있으면 HTML로 간주
return content_type.lower().startswith("text/html")
except requests.RequestException:
return False
if __name__ == "__main__":
test_urls = [
"https://www.example.com",
"https://api.github.com", # JSON API라서 HTML이 아닐 확률이 높음
"https://raw.githubusercontent.com", # 텍스트 파일 등 다양한 타입
]
for url in test_urls:
if is_html_url(url):
print(f"[HTML] {url}")
else:
print(f"[Not HTML] {url}")

View file

@ -1,38 +0,0 @@
def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[str]:
"""
파일에서 start_line번 줄부터 end_line번 줄까지 읽어와
줄을 요소로 갖는 리스트를 반환하는 함수.
Parameters:
----------
filepath : str
읽을 텍스트 파일의 경로
start_line : int
읽기 시작할 번호 (1부터 시작)
end_line : int
읽을 마지막 번호 (start_line <= end_line)
Returns:
-------
list[str]
줄을 문자열로 저장한 리스트.
파일에 해당 범위의 줄이 없으면 가능한 만큼만 반환.
"""
if start_line < 1 or end_line < start_line:
raise ValueError(
"start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다."
)
selected_lines: list[str] = []
with open(filepath, "r", encoding="utf-8") as f:
for idx, line in enumerate(f, start=1):
if idx < start_line:
# 아직 읽기 시작 전
continue
if idx > end_line:
# 읽을 범위를 벗어났으므로 중단
break
# 줄 끝의 개행 문자를 제거하고 리스트에 추가
selected_lines.append(line.rstrip("\n"))
return selected_lines

View file

@ -1,119 +0,0 @@
import json
import os, sys
import signal
import time
import threading
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
# Ctrl+C 처리를 위한 전역 변수
ctrl_c_count = 0
last_ctrl_c_time = 0
shutdown_requested = False
shutdown_lock = threading.Lock()
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러 - browser-use pause 기능과 호환"""
global shutdown_requested, ctrl_c_count, last_ctrl_c_time
current_time = time.time()
with shutdown_lock:
# 연속된 Ctrl+C 감지 (2초 내에 두 번 누르면 강제 종료)
if current_time - last_ctrl_c_time < 2.0:
ctrl_c_count += 1
else:
ctrl_c_count = 1
last_ctrl_c_time = current_time
# 두 번째 Ctrl+C이거나 이미 종료 요청이 있었다면 강제 종료
if ctrl_c_count >= 2 or shutdown_requested:
print("\n⚡ 강제 종료합니다!")
import asyncio
try:
loop = asyncio.get_running_loop()
for task in asyncio.all_tasks(loop):
task.cancel()
except RuntimeError:
pass
os._exit(1)
# 첫 번째 Ctrl+C: 정상 종료 요청
shutdown_requested = True
print("\n" + "=" * 60)
print("🛑 종료 신호를 받았습니다!")
print(f"📊 현재 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
if current_progress.get('start_line'):
print(f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄")
if current_progress["total"] > 0:
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
print("=" * 60)
# 진행 상황 저장
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
print("다음에 같은 명령어로 실행하면 이어서 진행할 수 있습니다.")
print("<EFBFBD> 2초 내에 Ctrl+C를 다시 누르면 강제 종료됩니다.")
# 정상적인 종료를 위해 KeyboardInterrupt 발생
raise KeyboardInterrupt()
def is_shutdown_requested():
"""종료 요청 상태를 확인하는 함수"""
with shutdown_lock:
return shutdown_requested
def request_shutdown():
"""외부에서 종료를 요청할 수 있는 함수"""
global shutdown_requested
with shutdown_lock:
if not shutdown_requested:
shutdown_requested = True
print("\n🛑 종료가 요청되었습니다.")
print(f"📊 현재 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
if current_progress.get('start_line'):
print(f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄")
if current_progress["total"] > 0:
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
print("다음에 같은 명령어로 실행하면 이어서 진행할 수 있습니다.")
def setup_signal_handler():
"""시그널 핸들러 등록 - browser-use와의 호환성을 위해 비활성화"""
# browser-use 라이브러리가 자체적으로 Ctrl+C 처리를 하므로
# 우리의 signal handler는 등록하지 않음
pass

View file

@ -1,116 +0,0 @@
"""
종료 처리를 위한 개선된 모듈
browser-use의 pause 기능과 호환되도록 설계
"""
import json
import os
import signal
import time
import threading
import asyncio
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
# 종료 관리를 위한 전역 변수
shutdown_requested = False
shutdown_lock = threading.Lock()
original_handler = None
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def request_shutdown():
"""종료 요청 함수 - 외부에서 호출 가능"""
global shutdown_requested
with shutdown_lock:
if not shutdown_requested:
shutdown_requested = True
print("\n🛑 종료가 요청되었습니다. 현재 작업을 완료한 후 종료합니다...")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
def is_shutdown_requested():
"""종료 요청 상태를 확인하는 함수"""
with shutdown_lock:
return shutdown_requested
def cleanup_signal_handler():
"""signal handler를 정리하고 원래 상태로 복원"""
global original_handler
if original_handler is not None:
signal.signal(signal.SIGINT, original_handler)
original_handler = None
def setup_minimal_signal_handler():
"""최소한의 signal handler만 설정 - browser-use와 충돌 방지"""
global original_handler
# 원래 핸들러 저장
original_handler = signal.signal(signal.SIGINT, signal.SIG_DFL)
def graceful_signal_handler(signum, frame):
"""우아한 종료를 위한 최소한의 signal handler"""
print("\n🛑 종료 신호를 받았습니다...")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
# 원래 핸들러로 복원하고 신호를 다시 발생시킴
signal.signal(signal.SIGINT, original_handler)
os.kill(os.getpid(), signal.SIGINT)
signal.signal(signal.SIGINT, graceful_signal_handler)
class GracefulShutdown:
"""컨텍스트 매니저로 사용할 수 있는 우아한 종료 클래스"""
def __init__(self):
self.original_handler = None
def __enter__(self):
self.original_handler = signal.signal(signal.SIGINT, self._signal_handler)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.original_handler is not None:
signal.signal(signal.SIGINT, self.original_handler)
def _signal_handler(self, signum, frame):
"""내부 signal handler"""
request_shutdown()
# 원래 핸들러 복원 후 신호 재전송
signal.signal(signal.SIGINT, self.original_handler)
os.kill(os.getpid(), signal.SIGINT)
# 기존 함수들과의 호환성을 위한 별칭
def setup_signal_handler():
"""기존 코드와의 호환성을 위한 함수"""
pass # browser-use의 signal handler를 방해하지 않음
def signal_handler(signum, frame):
"""기존 코드와의 호환성을 위한 함수"""
request_shutdown()

View file

@ -1,121 +0,0 @@
import argparse
import asyncio
import os
import sys
from dotenv import load_dotenv
from lib.browser_use.scanner import main_loop
from lib.utils import env_cheker
from lib.utils.progress import progress_file, setup_signal_handler
def setup_environment():
"""환경 변수 로드 및 관련 라이브러리를 초기화합니다."""
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
else:
print("⚠️ LMNR_PROJECT_API_KEY 환경 변수가 설정되지 않았습니다. Laminar 기능이 비활성화됩니다.")
def parse_arguments():
"""커맨드 라인 인자를 파싱합니다."""
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
action="store_true",
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
return parser.parse_args()
def main():
"""애플리케이션 메인 진입점"""
setup_environment()
setup_signal_handler()
args = parse_arguments()
# read and remove user data path
log_file = os.path.join("./data", "userdata.dump")
if not os.path.exists("./data"):
os.makedirs("./data")
if os.path.exists(log_file):
with open(log_file, "r") as f:
tmp_user_data_dir = f.read().strip()
try:
import shutil
if os.path.exists(tmp_user_data_dir):
shutil.rmtree(tmp_user_data_dir)
print(f"🔧 이전 실행의 임시 사용자 데이터 디렉토리 {tmp_user_data_dir}를 삭제하였습니다.")
except (PermissionError, FileNotFoundError, OSError) as e:
print(f"⚠️ 임시 사용자 데이터 디렉토리 삭제 실패: {e}")
try:
os.remove(log_file)
except OSError:
pass
try:
asyncio.run(
main_loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
except KeyboardInterrupt:
print("\n🛑 사용자에 의해 중단되었습니다.")
# 진행 상황 저장
from lib.utils.progress import save_progress, request_shutdown
request_shutdown()
print("✅ 정리 완료.")
sys.exit(0)
except Exception as e:
print(f"\n❌ 예상치 못한 오류가 발생했습니다: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# 정상 종료 시에만 진행 상황 파일 삭제
from lib.utils.progress import is_shutdown_requested
if not is_shutdown_requested() and os.path.exists(progress_file):
try:
os.remove(progress_file)
print("✅ 진행 상황 파일이 삭제되었습니다.")
except OSError as e:
print(f"⚠️ 진행 상황 파일 삭제 실패: {e}", file=sys.stderr)
if __name__ == "__main__":
main()

52
temp.md Normal file
View file

@ -0,0 +1,52 @@
You are an AI model specialized in web crawling and analysis. Given a URI, perform the following tasks:
1. Navigate to the provided URI and locate the login page. If its not found, explore common auth-related pages like /login or /auth.
2. On the login page, identify all available social login buttons (OAuth-based) such as Google, GitHub, Facebook, etc.
3. Simulate clicking each social login button and follow the redirect to capture the full redirect URL (including query parameters).
4. From the redirect URL and parameters, extract:
- `client_id`
- `redirect_uri`
- `response_type`
- `scope`
5. Based on URL patterns, infer the OAuth method: Authorization Code, Implicit, PKCE, etc.
6. Return data in the following JSON format only:
```json
{
"oauths": [
{
"issue": "<site being tested, e.g., git.imnya.ng>",
"oauth_uri": "<original button href or URL triggered>"
}
]
}
````
7. If the login button says something like "Login with GitHub" or "Login with Google", follow the flow and use the **final redirect URL after clicking** as the value of `oauth_uri`.
**Examples:**
```json
{
"oauths": [
{
"issue": "git.imnya.ng",
"provider": "GitHub",
"client_id": "Iv1.xxxxx",
"redirect_uri": "https://git.imnya.ng/user/oauth2/callback",
"response_type": "code",
"scope": "read:user",
"oauth_uri": "https://github.com/login/oauth/authorize?client_id=Iv1.xxxxx&redirect_uri=https%3A%2F%2Fgit.imnya.ng%2Fuser%2Foauth2%2Fcallback&response_type=code&scope=read%3Auser"
}
]
}
```
**Constraints:**
* Simulate realistic interaction with buttons (e.g., clicking them to follow redirects).
* Ensure the output is strictly in the specified JSON format.
* Avoid any additional text or explanations outside the JSON response.
* If no OAuth logins are found, return an empty array.
* WebAuthn, PassKey is not OAuth, so do not include it in the results.

2083
uv.lock generated

File diff suppressed because it is too large Load diff