feat/refect 가져오기

This commit is contained in:
tk 2025-06-27 23:06:35 +09:00
commit 54682cdb72
43 changed files with 702 additions and 520 deletions

View file

@ -0,0 +1,7 @@
from lib.browser_use.clean_resources import *
from lib.browser_use.func import *
from lib.browser_use.model import *
from lib.browser_use.init_profile import *
from lib.browser_use.sensitive_data import *
from lib.browser_use.agents import *
from lib.browser_use.scanner import *

View file

@ -0,0 +1,175 @@
import asyncio
import os
import json
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use import (
GetProfile,
GetSensitiveData,
clean_resources,
)
from lib.utils import (
logger,
config,
)
from lib.llm import CreateChatGoogleGenerativeAI, get_prompt
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
while try_cnt < 3:
try:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await GetProfile(),
)
agent = Agent(
browser_session=session,
**agent_config["agent_params"]
)
response = await agent.run()
await clean_resources(agent, session)
return response
except Exception as e:
await clean_resources(agent, session)
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
error_msg = f"API 쿼터 문제가 지속됩니다."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
return None
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
prompt, model = get_prompt("auth")
agent_config = {
"url": target_url,
"log_context": "OAuth 리스트 추출",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": prompt,
}
}
response = await _run_agent_with_retry(agent_config)
if not response:
return []
final_result = response.final_result()
if not final_result:
print("OAuth 리스트 추출 결과가 없습니다.")
return []
try:
data = json.loads(final_result)
oauth_providers = data.get("oauth_providers", [])
if not oauth_providers:
print("❌ OAuth 제공자가 없습니다.")
logger(f"{url} - OAuth 제공자 없음: {final_result}")
return []
print(f"✅ OAuth 제공자 추출 완료: {oauth_providers}")
return oauth_providers
except (json.JSONDecodeError, KeyError) as e:
print(f"❌ 결과 파싱 실패: {e}")
logger(f"{url} 결과 파싱 실패: {final_result}")
return []
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
prompt, model = get_prompt(oauth_provider)
agent_config = {
"url": target_url,
"log_context": f"{oauth_provider} 로그인",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": prompt,
}
}
response = await _run_agent_with_retry(agent_config)
if response and response.final_result():
final_result = response.final_result()
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
print(f"{oauth_provider} 로그인 실패")
return False

View file

@ -0,0 +1,25 @@
from pathlib import Path
async def clean_resources(agent=None, session=None):
"""리소스를 정리하는 함수"""
storage_state_temp_path = Path("./data/storage_state_temp.json").resolve()
if storage_state_temp_path.exists():
try:
# remove file
print(f"🗑️ 임시 스토리지 상태 파일 삭제 중: {storage_state_temp_path}")
# unlink removes the file
storage_state_temp_path.unlink()
print("🗑️ 임시 스토리지 상태 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 스토리지 상태 파일 삭제 실패: {e}")
if agent:
try:
await agent.close()
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
if session:
try:
await session.close()
except Exception as e:
print(f"⚠️ 세션 리소스 정리 실패: {e}")

View file

@ -0,0 +1,88 @@
import os
import json
from pathlib import Path
from dotenv import load_dotenv
from browser_use import BrowserProfile
import json
import os
# Load environment variables
load_dotenv(override=True)
async def setup_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_path = script_dir / "data" / "storage_state.json"
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
try:
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
with open(storage_state_path, 'r') as f:
storage_data = json.load(f)
with open(storage_state_temp_path, 'w') as f:
json.dump(storage_data, f, indent=4)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
except Exception as e:
print(f"⚠️ Error processing storage state: {e}")
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
return None
print("⚠️ No existing storage state found")
return None
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
proxy_url = f"http://{proxy_host}:{proxy_port}"
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
return proxy_url
else:
print("🔗 No proxy configured, using direct connection.")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [
# Security and isolation
"--disable-web-security",
"--disable-site-isolation-trials",
"--disable-features=IsolateOrigins,site-per-process",
"--ignore-certificate-errors",
"--ignore-ssl-errors",
"--allow-running-insecure-content",
# Performance and rendering
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage",
# Popup and automation
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled",
# Browser behavior
"--no-first-run",
"--no-service-autorun",
"--no-default-browser-check",
"--password-store=basic",
"--use-mock-keychain",
# Extensions
"--disable-extensions-file-access-check",
"--disable-extensions-http-throttling",
"--disable-component-extensions-with-background-pages",
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]

View file

@ -0,0 +1,46 @@
import os
from lib.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
async def GetProfile():
storage_state_path = await setup_storage_state()
# Handle potential encoding issues with storage state file
try:
if storage_state_path and os.path.exists(storage_state_path):
# Test if file can be read properly, if not, skip it
with open(storage_state_path, 'r', encoding='utf-8') as f:
f.read()
storage_state = storage_state_path
else:
print("⚠️ Storage state file not found or inaccessible, proceeding without it.")
storage_state = None
except (UnicodeDecodeError, FileNotFoundError):
# If there's an encoding error, don't use the storage state
storage_state = None
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)
return profile

View file

@ -0,0 +1,15 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel):
oauth_providers: List[str] # 이제 문자열 배열로 변경
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList

View file

@ -0,0 +1,106 @@
import asyncio
import os
import csv
from lib.utils import notify_backend, read_lines_between, is_html_url
from lib.browser_use.agents import extract_oauth_list, test_oauth_login
from lib.utils.progress import current_progress, load_progress, save_progress, progress_file
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(target_url)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry.provider}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, "", "pending"])
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(
f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}"
)
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry.provider)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == 'y':
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + actual_index}번째 줄")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")

View file

@ -0,0 +1,21 @@
# read json file .sensitive.json
import json
import os
def GetSensitiveData():
"""
Reads sensitive data from a .sensitive.json file in the current directory.
Returns:
dict: A dictionary containing the sensitive data.
"""
file_path = os.path.join(os.getcwd(), '.sensitive.json')
if not os.path.exists(file_path):
return None
with open(file_path, 'r') as file:
sensitive_data = json.load(file)
return sensitive_data

3
src/lib/llm/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from lib.llm.create import *
from lib.llm.prompt import *

25
src/lib/llm/create.py Normal file
View file

@ -0,0 +1,25 @@
from langchain.callbacks.base import BaseCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
class QuotaExhaustedHandler(BaseCallbackHandler):
def on_llm_error(self, error, **kwargs):
if "ResourceExhausted" in str(error) or "429" in str(error):
print("⚠️ API 쿼터가 소진되었습니다. 재시도 로직에 위임합니다...")
# backoff handled in scan_one_url
def CreateChatGoogleGenerativeAI(model: str):
"""재시도 로직이 포함된 LLM 생성"""
if model == "fallback":
print("⚠️ Fallback 모델을 사용합니다. Envorinment 변수를 확인하세요.")
print("⚠️ Model Gemini-2.0-flash-lite를 사용합니다.")
model = "gemini-2.0-flash-lite"
return ChatGoogleGenerativeAI(
model=model,
max_retries=10, # 최대 재시도 횟수 증가
model_kwargs={
"request_timeout": 120, # 타임아웃 시간 증가 (2분)
},
callbacks=[QuotaExhaustedHandler()],
# API 호출 간격 조정
temperature=0.0,
)

View file

@ -0,0 +1,25 @@
from typing import Union, Type
from pydantic import BaseModel
def get_prompt(type: str) -> tuple[str, Type[BaseModel]] | str:
"""
Prompt를 반환합니다.
:param type: 'auth' {Auth List} 또는 'google' {OAuth Provider}, 'meta' {OAuth Provider} 지정합니다.
:return: 해당하는 프롬프트 문자열 또는 (프롬프트, 모델) 튜플
"""
if type.lower() == "auth":
from lib.llm.prompt.get_oauth import prompt, model
return prompt, model
elif type.lower() in ["google", "google account"]:
from lib.llm.prompt.google import prompt, model
return prompt, model
elif type.lower() in ["microsoft", "microsoftonline"]:
from lib.llm.prompt.microsoft import prompt, model
return prompt, model
else:
from lib.llm.prompt.fallback import model, prompt
return prompt, model

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.fallback.prompt import prompt
from lib.llm.prompt.fallback.model import model

View file

@ -0,0 +1,6 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None

View file

@ -0,0 +1,122 @@
# Extended planner prompt
prompt = f"""
🎯 목적: 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
📌 주의사항 (전제 조건)
- **검색 엔진(Google, Bing ) 사용 금지**
- **초기 제공된 URL 내에서만 탐색**
- 직접 이동하거나 추측한 링크 클릭 금지
- 추측한 URL은 대답하거나 클릭하지 마세요
- OAuth가 아닌 일반 로그인은 무시
- OAuth가 없다면 **즉시 중단**하고 배열 반환
---
## 🧩 Step 0: 페이지 차단(Block) 여부 확인
초기 URL의 로그인 페이지에 접근하여 다음 사항을 점검합니다:
- 🚫 페이지 차단됨 (Firewall, Access Denied ) 즉시 중단
- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
- 로그인 UI가 정상적으로 로드되지 않으면 중단
📤 차단 즉시 종료
---
## 🔍 Step 1: 로그인 페이지 탐색
* 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
* 쿠키 동의, 개인정보 안내 팝업은 무시하거나 닫고 계속 진행하세요.
* 페이지가 정상 로드되었다고 가정합니다.
---
## 👀 Step 2: SSO 로그인 버튼 식별
아래 **OAuth SSO 버튼들만** 유효합니다:
* Google, GitHub, Facebook, LinkedIn, Microsoft, Naver
**유효한 버튼 기준**:
* OAuth 인증 흐름을 실제로 트리거
* `window.location` 또는 `<a href=...>` 또는 JS로 redirect가 발생
**제외 버튼들 (클릭 금지)**:
* 일반 로그인, 패스키, 이메일/전화번호, 인증서 기반, 비밀번호 입력
---
## ✅ Step 3: 모든 SSO 버튼 클릭 및 로그인 시도
> SSO 로그인 버튼을 클릭한 반드시 아래 절차를 **완전히 수행**해야 합니다.
SSO 버튼에 대해 다음을 수행:
1. 버튼 클릭
2. 로그인 진행:
- 로그인 페이지에서 OAuth 인증을 완료합니다.
- sign in with your username(email) x_username and password is x_password
- 버튼같은게 안눌리면 새로고침을 해봐
- **로그인 완료 authorize 버튼이 있으면 클릭**
- GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 있음, 기다려야 수도 있음
- 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
- **OAuth Flow가 완료되면 (callback URL 도달 또는 인증 완료) 즉시 작업 종료**
4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
5. 다음 SSO 버튼으로 반복 진행
쿠키 삭제 방법:
chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
🛑 절대 아래와 같이 해석하지 :
- 버튼 클릭 페이지 로딩만 기다리고 돌아가기
- URL 저장 없이 go_back() 호출
---
### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
```text
🛑 뒤로가기(go_back) 다음 조건이 모두 충족될 때만 사용 => 다만 로그인 실패 , 뒤로가기 수행:
- 로그인 흐름이 완료됨 (: redirect back to app, or callback URL)
- 현재 리디렉션 URL이 수집됨
- 결과에 저장 다음 버튼 탐색을 위해 복귀 필요할
```
---
## 🚫 Step 4: 버튼 없음 또는 예외 발생 시
* 유효한 SSO 버튼이 **전혀 없을 경우**
* 예외, 오류 발생
-> 즉시 중단
---
최종 반환:
Return the result in the following format only:
```json
{{
"msg": "Google login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
---
## 📎 중요 규칙 요약
* **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
* 🔁 단계는 반드시 순서대로 진행
* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
* 👀 직접 OAuth Providor ID/PW를 입력하여도 가지고 있다면
* 추측한 URL은 접속하지 않음
---
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.get_oauth.prompt import prompt
from lib.llm.prompt.get_oauth.model import model

View file

@ -0,0 +1,6 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page

View file

@ -0,0 +1,37 @@
prompt = """
🎯 목적: 주어진 초기 URL 내에서 **OAuth 로그인 Provider** 찾아 아래 형식의 JSON으로 정리합니다.
📌 작업 목표:
- Google, GitHub, Discord, Facebook, Apple, Microsoft, Twitter, LinkedIn **OAuth 인증을 사용하는 외부 로그인 링크**에서 Provider 이름만 모두 수집합니다.
- 로그인 버튼, 링크 클릭 등을 통해 탐색을 진행할 있습니다.
- **같은 provider가 여러 나와도 하나만 저장**합니다.
🛑 제한 사항:
- 로그인 입력창이나 이메일/비밀번호 입력 방식은 제외합니다.
- 검색 엔진, 사이트 외부 탐색은 금지합니다.
- URL 추측이나 직접 입력은 금지합니다.
- OAuth가 없는 경우 배열 `[]` 반환합니다.
- OAuth가 아닌 일반 로그인은 무시합니다.
🔍 탐색 방법:
1. 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
2. 페이지가 정상적으로 로드되었다고 가정합니다.
3. 'Continue with X', 'Continue with Google'... 등의 버튼이나 링크를 식별합니다.
🧾 출력 형식 (예시):
```json
{{
"oauth_providers": [
"Google",
"GitHub",
"Discord"
]
}}
```
📌 주의:
결과가 없는 경우 배열 `[]` 반환합니다.
정확한 provider 이름을 포함해 주세요.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.google.prompt import prompt
from lib.llm.prompt.google.model import model

View file

@ -0,0 +1,6 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None

View file

@ -0,0 +1,58 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Google", and use it to complete login via Google.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
3. On the login page, look for a clearly labeled **Google SSO button** typically labeled as:
- "Continue with Google"
- "Sign in with Google"
- or a button with the Google 'G' icon
4. Click the **Google login button**.
- The Google login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the Google login page:
- If a **CAPTCHA**, **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed.
- Immediately stop and return the appropriate status.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
Credentials to use for Google login:
- Email: {os.getenv("GOOGLE_EMAIL", "")}
- Password: {os.getenv("GOOGLE_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required
- If the user is already logged in to Google and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the Google login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Google login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.microsoft.prompt import prompt
from lib.llm.prompt.microsoft.model import model

View file

@ -0,0 +1,57 @@
import os
# This code snippet is used to generate a prompt for a web automation agent that performs Microsoft SSO login.
prompt = f"""
당신은 자동화 에이전트입니다.
당신의 임무는 주어진 도메인에 방문하여 아래에 엄격히 설명된 모든 단계를 따라 **Microsoft SSO 버튼** 통해 전체 로그인을 수행하는 것입니다.
목표: 도메인 내에서 "Microsoft로 로그인" 가능한 로그인 페이지를 찾아 Microsoft을 통해 로그인을 완료하세요.
지침:
1. 쿠키 또는 개인정보 팝업이 나타나면 닫거나 수락하세요.
2. 사이트의 UI를 탐색하여 **로그인 또는 로그인 페이지**(: "로그인", "Sign In", "Get Started" 같은 버튼) 찾으세요.
- 동일한 도메인 내의 링크만 따라가세요.
3. 로그인 페이지에서 명확하게 표시된 **Microsoft SSO 버튼** 찾으세요. 일반적으로 다음과 같이 표시됩니다:
- "Microsoft로 계속"
- "Microsoft로 로그인"
- 또는 Microsoft 'M' 아이콘이 있는 버튼
4. **Microsoft 로그인 버튼** 클릭하세요.
- Microsoft 로그인 플로우는 반드시 ** 브라우저 **에서 열려야 합니다 ( 창이나 팝업이 아님).
- 로그인이 ****이나 **팝업**에서 열리면, 즉시 중단하고 적절한 상태를 반환하세요.
5. 사용자가 **이미 Microsoft에 로그인되어 있고 즉시 원래 사이트로 리디렉션**된다면,
- 경우 로그인이 성공한 것으로 간주하고 즉시 반환하세요.
6. Microsoft 로그인 페이지로 리디렉션된 경우:
- **CAPTCHA**, **MFA 프롬프트**, 또는 **ID/비밀번호 입력** 요청이 나타나면 진행하지 마세요.
- 즉시 중단하고 적절한 상태를 반환하세요.
7. 로그인에 방해가 없다면, 원래 사이트로 리디렉션될 때까지 기다리고 최종 URL을 기록하세요.
Microsoft 로그인에 사용할 자격 증명:
- 이메일: {os.getenv("MICROSOFT_EMAIL", "")}
- 비밀번호: {os.getenv("MICROSOFT_PASSWORD", "")}
제약 사항:
- 검색 엔진을 사용하거나 URL을 추측하지 마세요.
- 자동완성, 저장된 세션 또는 쿠키를 사용하지 마세요.
- 다음과 같은 경우 로그인 절차를 진행하지 마세요:
- 로그인이 창에서 열릴 (탭만 허용)
- CAPTCHA 또는 MFA가 나타날
- ID/비밀번호 입력이 요구될
- 사용자가 이미 Microsoft에 로그인되어 자동으로 리디렉션된다면, 즉시 성공으로 보고 종료하세요.
- 로그인 페이지를 찾을 없으면 "login_page_not_found" 반환하세요.
- Microsoft 로그인 버튼을 찾을 없으면 "sso_not_found" 반환하세요.
- 회원가입 페이지와 같은 화면이 나타나면 성공적인 로그인으로 간주하고 즉시 종료하세요.
최종 출력:
다음 형식으로만 결과를 반환하세요:
```json
{{
"msg": "Microsoft login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "microsoft_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,6 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = None # "success", "mfa_required", "microsoft_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None

View file

@ -0,0 +1,7 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.data import *
from lib.utils.config import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -0,0 +1,58 @@
from lib.utils.config import (
BACKEND_URL,
GOOGLE_API_KEY,
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
import os
from dotenv import load_dotenv
load_dotenv(override=True)
def show_info():
print("🔧 환경 설정:")
print(browser_use_version())
print(f"🔗 Backend URL: {BACKEND_URL}")
print(
f"🔑 Google API Key: {'*' * (len(GOOGLE_API_KEY) - 4) + GOOGLE_API_KEY[-4:] if GOOGLE_API_KEY else None}"
)
print(f"🌐 Google Model: {GOOGLE_MODEL}")
print(f"🌐 Google Planner Model: {GOOGLE_PLANNER_MODEL}")
def browser_use_version():
try:
# run uv pip show browser-use
import subprocess
result = subprocess.run(
["uv", "pip", "show", "browser-use"],
capture_output=True,
text=True,
check=True,
)
print("📦 Browser Use 패키지 정보:")
return result.stdout.strip()
except ImportError:
return None
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
print(
"⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요."
)
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print(
"⚠️ 이 경고는 1초동안 정지합니다."
)
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

8
src/lib/utils/config.py Normal file
View file

@ -0,0 +1,8 @@
import os
from dotenv import load_dotenv
load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")

View file

@ -0,0 +1,2 @@
from lib.utils.data.backend_client import *
from lib.utils.data.logger import *

View file

@ -0,0 +1,22 @@
import requests
from lib.utils.config import BACKEND_URL
def notify_backend(target_url):
# Backend에 스캔 시작을 알림
try:
response = requests.post(
f"{BACKEND_URL}/start", params={"url": target_url}, timeout=5
)
if response.status_code == 200:
print(f"✅ Backend notified: {response.text}")
else:
print(f"⚠️ Backend notification failed: {response.status_code}")
except requests.exceptions.ConnectionError:
print(
f"⚠️ Backend server not available at {BACKEND_URL}. Continuing without notification."
)
except requests.exceptions.Timeout:
print(f"⚠️ Backend notification timed out. Continuing without notification.")
except Exception as e:
print(f"⚠️ Failed to notify backend: {e}")

View file

@ -0,0 +1,29 @@
from pathlib import Path
from datetime import datetime
# 미리 정해진 파일 경로
FILE_PATH = Path("data/log.txt")
def logger(msg: str) -> None:
try:
"""
msg 문자열을 파일 끝에 추가합니다.
- 파일이 없으면 새로 생성
- 디렉터리가 없으면 생성
"""
# 상위 디렉터리 생성 (이미 있으면 무시)
FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
# 현재 시각 구해서 포맷팅
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
# 메시지에 개행이 없으면 자동으로 붙이기
newline = "" if msg.endswith("\n") else "\n"
line = f"[{timestamp}] {msg}{newline}"
# 'a' 모드: 파일이 없으면 생성, 있으면 이어쓰기
with FILE_PATH.open(mode="a", encoding="utf-8") as f:
f.write(line)
except:
print(msg)

View file

@ -0,0 +1,36 @@
import requests
def is_html_url(url: str, timeout: float = 10.0) -> bool:
"""
주어진 URL에 HEAD 요청을 보내고, 응답 헤더의 Content-Type이 HTML인지 확인합니다.
- url: 검사할 URL 문자열
- timeout: 요청 타임아웃( 단위)
반환값:
- Content-Type이 'text/html' 시작하면 True, 그렇지 않으면 False
"""
try:
with requests.get(url, timeout=timeout, stream=True) as response:
# 응답 코드가 200번대가 아니면 False로 간주
if not response.ok:
return False
content_type = response.headers.get('Content-Type', '')
# Content-Type에 'text/html'이 포함되어 있으면 HTML로 간주
return content_type.lower().startswith('text/html')
except requests.RequestException:
return False
if __name__ == '__main__':
test_urls = [
'https://www.example.com',
'https://api.github.com', # JSON API라서 HTML이 아닐 확률이 높음
'https://raw.githubusercontent.com' # 텍스트 파일 등 다양한 타입
]
for url in test_urls:
if is_html_url(url):
print(f"[HTML] {url}")
else:
print(f"[Not HTML] {url}")

View file

@ -0,0 +1,36 @@
def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[str]:
"""
파일에서 start_line번 줄부터 end_line번 줄까지 읽어와
줄을 요소로 갖는 리스트를 반환하는 함수.
Parameters:
----------
filepath : str
읽을 텍스트 파일의 경로
start_line : int
읽기 시작할 번호 (1부터 시작)
end_line : int
읽을 마지막 번호 (start_line <= end_line)
Returns:
-------
list[str]
줄을 문자열로 저장한 리스트.
파일에 해당 범위의 줄이 없으면 가능한 만큼만 반환.
"""
if start_line < 1 or end_line < start_line:
raise ValueError("start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다.")
selected_lines: list[str] = []
with open(filepath, 'r', encoding='utf-8') as f:
for idx, line in enumerate(f, start=1):
if idx < start_line:
# 아직 읽기 시작 전
continue
if idx > end_line:
# 읽을 범위를 벗어났으므로 중단
break
# 줄 끝의 개행 문자를 제거하고 리스트에 추가
selected_lines.append(line.rstrip('\n'))
return selected_lines

48
src/lib/utils/progress.py Normal file
View file

@ -0,0 +1,48 @@
import json
import os
import signal
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
if current_progress['total'] > 0:
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
print("=" * 60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
def setup_signal_handler():
"""시그널 핸들러 등록"""
signal.signal(signal.SIGINT, signal_handler)

79
src/main.py Normal file
View file

@ -0,0 +1,79 @@
import asyncio
import argparse
import os
from dotenv import load_dotenv
from lib.utils import env_cheker
from lib.browser_use.scanner import main_loop
from lib.utils.progress import setup_signal_handler, progress_file
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
def main():
"""애플리케이션 메인 진입점"""
# 시그널 핸들러 설정
setup_signal_handler()
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
action='store_true', # 플래그 형식으로 변경
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
args = parser.parse_args()
try:
asyncio.run(
main_loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
except KeyboardInterrupt:
# signal_handler가 처리하므로 여기서는 별도 처리 불필요
pass
finally:
# 정상 종료 시 진행 상황 파일 삭제
if os.path.exists(progress_file):
try:
os.remove(progress_file)
except OSError as e:
print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}")
if __name__ == "__main__":
main()