mirror of
https://github.com/j93es/browser-use-oauth.git
synced 2026-06-04 22:51:27 +09:00
[Refactor] 환경 설정 및 API 재시도 로직 개선, 불필요한 파일 삭제
This commit is contained in:
parent
00558603a7
commit
8df6811cd0
5 changed files with 75 additions and 429 deletions
|
|
@ -5,6 +5,10 @@ GOOGLE_API_KEY=
|
||||||
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20
|
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20
|
||||||
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20
|
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20
|
||||||
|
|
||||||
|
# min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)만큼 API가 실패시 대기합니다.
|
||||||
|
INITIAL_BACKOFF=60
|
||||||
|
MAX_BACKOFF=600
|
||||||
|
|
||||||
# 선택
|
# 선택
|
||||||
PROXY_HOST=127.0.0.1
|
PROXY_HOST=127.0.0.1
|
||||||
PROXY_PORT=11080
|
PROXY_PORT=11080
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,12 @@ Environment는 .env.example에 따라 설정되어야합니다.
|
||||||
|
|
||||||
.env.example을 .env로 복사하여서 사용해주세요.
|
.env.example을 .env로 복사하여서 사용해주세요.
|
||||||
|
|
||||||
|
# 쿠키와 로컬 스토리지 설정 방법
|
||||||
|
|
||||||
|
```sh
|
||||||
|
playwright open https://google.com/ --save-storage=./data/storage_state.json
|
||||||
|
```
|
||||||
|
|
||||||
# 실행
|
# 실행
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
|
|
|
||||||
22
login.py
22
login.py
|
|
@ -1,22 +0,0 @@
|
||||||
from playwright.sync_api import sync_playwright
|
|
||||||
|
|
||||||
def launch_browser_with_profile():
|
|
||||||
p = sync_playwright().start()
|
|
||||||
browser = p.chromium.launch_persistent_context(
|
|
||||||
user_data_dir="./data/user_data",
|
|
||||||
headless=False
|
|
||||||
)
|
|
||||||
page = browser.new_page()
|
|
||||||
return browser, page, p
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
browser, page, playwright = launch_browser_with_profile()
|
|
||||||
page.goto("https://google.com")
|
|
||||||
print("Browser launched with user data profile.")
|
|
||||||
|
|
||||||
# 브라우저가 열린 상태를 유지
|
|
||||||
input("Press Enter to close the browser...")
|
|
||||||
|
|
||||||
# 리소스 정리
|
|
||||||
browser.close()
|
|
||||||
playwright.stop()
|
|
||||||
124
main.py
124
main.py
|
|
@ -22,7 +22,11 @@ from lib.read_txt import read_lines_between
|
||||||
from lib.prompt import extend_planner_system_message
|
from lib.prompt import extend_planner_system_message
|
||||||
from lib.logger import logger
|
from lib.logger import logger
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv(verbose=True, override=True)
|
||||||
|
|
||||||
|
# Exponential backoff settings
|
||||||
|
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
|
||||||
|
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
|
||||||
|
|
||||||
if os.getenv("GOOGLE_API_KEY") is None:
|
if os.getenv("GOOGLE_API_KEY") is None:
|
||||||
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
|
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
|
||||||
|
|
@ -33,13 +37,19 @@ if os.getenv("GOOGLE_PLANNER_MODEL") is None:
|
||||||
|
|
||||||
backend_url = os.getenv("BACKEND_URL", "http://localhost:11081")
|
backend_url = os.getenv("BACKEND_URL", "http://localhost:11081")
|
||||||
|
|
||||||
|
print("🔧 환경 설정:")
|
||||||
|
print(f"🔗 Backend URL: {backend_url}")
|
||||||
|
api_key = os.getenv('GOOGLE_API_KEY')
|
||||||
|
print(f"🔑 Google API Key: {api_key[-4:] if api_key else None}")
|
||||||
|
print(f"🌐 Google Model: {os.getenv('GOOGLE_MODEL')}")
|
||||||
|
print(f"🌐 Google Planner Model: {os.getenv('GOOGLE_PLANNER_MODEL')}")
|
||||||
|
|
||||||
# API 쿼터 처리를 위한 콜백 핸들러
|
# API 쿼터 처리를 위한 콜백 핸들러
|
||||||
class QuotaExhaustedHandler(BaseCallbackHandler):
|
class QuotaExhaustedHandler(BaseCallbackHandler):
|
||||||
def on_llm_error(self, error, **kwargs):
|
def on_llm_error(self, error, **kwargs):
|
||||||
if "ResourceExhausted" in str(error) or "429" in str(error):
|
if "ResourceExhausted" in str(error) or "429" in str(error):
|
||||||
print("⚠️ API 쿼터가 소진되었습니다. 120초 대기 후 재시도합니다...")
|
print("⚠️ API 쿼터가 소진되었습니다. 재시도 로직에 위임합니다...")
|
||||||
time.sleep(120)
|
# backoff handled in scan_one_url
|
||||||
|
|
||||||
|
|
||||||
def CreateChatGoogleGenerativeAI(model: str):
|
def CreateChatGoogleGenerativeAI(model: str):
|
||||||
|
|
@ -112,6 +122,8 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"⚠️ Failed to notify backend: {e}")
|
print(f"⚠️ Failed to notify backend: {e}")
|
||||||
|
|
||||||
|
agent = None
|
||||||
|
session = None
|
||||||
try_cnt = 0
|
try_cnt = 0
|
||||||
while True:
|
while True:
|
||||||
proxy_host = os.getenv("PROXY_HOST")
|
proxy_host = os.getenv("PROXY_HOST")
|
||||||
|
|
@ -127,12 +139,16 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
|
||||||
user_data_path = Path("./data/user_data").resolve()
|
user_data_path = Path("./data/user_data").resolve()
|
||||||
user_data_path.mkdir(parents=True, exist_ok=True)
|
user_data_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
storage_state_path = Path("./data/storage_state.json").resolve()
|
||||||
|
|
||||||
# BrowserProfile에 모든 설정 포함
|
# BrowserProfile에 모든 설정 포함
|
||||||
profile = BrowserProfile(
|
profile = BrowserProfile(
|
||||||
disable_security=True,
|
disable_security=True,
|
||||||
stealth=True,
|
stealth=True,
|
||||||
headless=False,
|
headless=False,
|
||||||
user_data_dir=str(user_data_path),
|
#user_data_dir=str(user_data_path),
|
||||||
|
user_data_dir=None,
|
||||||
|
storage_state=str(storage_state_path) if storage_state_path.exists() else None,
|
||||||
viewport={"width": 1600, "height": 900},
|
viewport={"width": 1600, "height": 900},
|
||||||
# 프록시 설정
|
# 프록시 설정
|
||||||
proxy={"server": proxy_url} if proxy_url else None,
|
proxy={"server": proxy_url} if proxy_url else None,
|
||||||
|
|
@ -156,112 +172,76 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
|
||||||
browser_profile=profile,
|
browser_profile=profile,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Agent 생성
|
# Agent 생성 및 실행 (단일 try-except with 백오프)
|
||||||
initial_actions = [
|
initial_actions = [{"open_tab": {"url": target_url}}]
|
||||||
{"open_tab": {"url": target_url}},
|
|
||||||
]
|
|
||||||
|
|
||||||
controller = Controller(output_model=OAuthList)
|
controller = Controller(output_model=OAuthList)
|
||||||
|
print("🤖 LLM 모델 초기화 및 스캔 시작...")
|
||||||
# API 쿼터 문제 해결을 위한 LLM 생성
|
|
||||||
print("🤖 LLM 모델 초기화 중...")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
agent = Agent(
|
agent = Agent(
|
||||||
browser_session=session,
|
browser_session=session,
|
||||||
initial_actions=initial_actions,
|
initial_actions=initial_actions,
|
||||||
task=f"Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.",
|
task="Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.",
|
||||||
llm=CreateChatGoogleGenerativeAI(
|
llm=CreateChatGoogleGenerativeAI(os.getenv("GOOGLE_MODEL") or "fallback"),
|
||||||
os.getenv("GOOGLE_MODEL") or "fallback"
|
planner_llm=CreateChatGoogleGenerativeAI(os.getenv("GOOGLE_PLANNER_MODEL") or "fallback"),
|
||||||
),
|
|
||||||
planner_llm=CreateChatGoogleGenerativeAI(
|
|
||||||
os.getenv("GOOGLE_PLANNER_MODEL") or "fallback"
|
|
||||||
),
|
|
||||||
controller=controller,
|
controller=controller,
|
||||||
extend_planner_system_message=extend_planner_system_message,
|
extend_planner_system_message=extend_planner_system_message,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ Agent 생성 실패: {e}")
|
|
||||||
# API 쿼터 문제일 경우 더 긴 대기
|
|
||||||
if "ResourceExhausted" in str(e) or "429" in str(e):
|
|
||||||
print("⚠️ API 쿼터 문제로 인한 Agent 생성 실패. 5분 대기 후 재시도...")
|
|
||||||
await asyncio.sleep(300)
|
|
||||||
await clean_resources(agent=None, session=session)
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 4) 실제 스캔 실행
|
|
||||||
print("🔍 스캔 시작...")
|
|
||||||
response = await agent.run()
|
response = await agent.run()
|
||||||
final_result = response.final_result()
|
final_result = response.final_result()
|
||||||
if final_result is None:
|
if final_result is None:
|
||||||
raise ValueError("final_result()가 None을 반환했습니다.")
|
raise ValueError("final_result()가 None을 반환했습니다.")
|
||||||
|
except Exception as e:
|
||||||
|
await clean_resources(agent, session)
|
||||||
|
# API 쿼터 문제인지 확인
|
||||||
|
if "ResourceExhausted" in str(e) or "429" in str(e):
|
||||||
|
wait = min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)
|
||||||
|
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
|
||||||
|
await asyncio.sleep(wait)
|
||||||
|
try_cnt += 1
|
||||||
|
if try_cnt >= 3:
|
||||||
|
print(f"❌ {url} 스캔 실패: API 쿼터 문제가 지속됩니다.")
|
||||||
|
logger(f"❌ {url} 스캔 실패: API 쿼터 문제: {e}")
|
||||||
|
return
|
||||||
|
continue
|
||||||
|
# 일반 에러 처리
|
||||||
|
try_cnt += 1
|
||||||
|
if try_cnt >= 3:
|
||||||
|
print(f"❌ {url} 스캔 실패: 에러: {e}")
|
||||||
|
logger(f"❌ {url} 스캔 실패: 에러: {e}")
|
||||||
|
return
|
||||||
|
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 스캔 결과 처리
|
||||||
data = json.loads(final_result)
|
data = json.loads(final_result)
|
||||||
try:
|
try:
|
||||||
oauth_entries: List[OAuth] = [
|
oauth_entries = [OAuth(**entry) for entry in data["oauth_providers"]]
|
||||||
OAuth(**entry) for entry in data["oauth_providers"]
|
|
||||||
]
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
|
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
|
||||||
|
|
||||||
# 5) 결과 출력
|
|
||||||
print("-" * 50)
|
print("-" * 50)
|
||||||
print(f"🔗 Scanned URL: {url}\n")
|
print(f"🔗 Scanned URL: {url}\n")
|
||||||
print("🔐 Detected OAuth Providers and URLs:")
|
print("🔐 Detected OAuth Providers and URLs:")
|
||||||
for entry in oauth_entries:
|
for entry in oauth_entries:
|
||||||
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
|
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
|
||||||
print(
|
print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
|
||||||
f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
print(f"- {entry.provider}: {entry.oauth_uri}")
|
print(f"- {entry.provider}: {entry.oauth_uri}")
|
||||||
print("-" * 50)
|
print("-" * 50)
|
||||||
|
|
||||||
# 6) CSV에 저장 (append)
|
# CSV에 저장 (append)
|
||||||
csv_file = "./oauth_providers.csv"
|
csv_file = "./oauth_providers.csv"
|
||||||
file_exists = os.path.isfile(csv_file)
|
file_exists = os.path.isfile(csv_file)
|
||||||
with open(csv_file, "a", newline="", encoding="utf-8") as f:
|
with open(csv_file, "a", newline="", encoding="utf-8") as f:
|
||||||
writer = csv.writer(f)
|
writer = csv.writer(f)
|
||||||
if not file_exists:
|
if not file_exists:
|
||||||
writer.writerow(["issuer", "provider", "oauth_uri"])
|
writer.writerow(["issuer", "provider", "oauth_uri"])
|
||||||
|
|
||||||
# 실제 데이터 저장
|
|
||||||
for entry in oauth_entries:
|
for entry in oauth_entries:
|
||||||
writer.writerow([url, entry.provider, entry.oauth_uri])
|
writer.writerow([url, entry.provider, entry.oauth_uri])
|
||||||
await clean_resources(agent, session)
|
await clean_resources(agent, session)
|
||||||
|
|
||||||
# 성공적으로 처리했으므로 반복문 탈출
|
|
||||||
break
|
break
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
await clean_resources(agent, session)
|
|
||||||
|
|
||||||
# API 쿼터 문제인지 확인
|
|
||||||
if "ResourceExhausted" in str(e) or "429" in str(e):
|
|
||||||
print(f"⚠️ API 쿼터 소진 에러: {e}")
|
|
||||||
print("⚠️ 5분 대기 후 재시도합니다...")
|
|
||||||
await asyncio.sleep(300) # 5분 대기
|
|
||||||
try_cnt += 1
|
|
||||||
if try_cnt >= 3:
|
|
||||||
print(f"❌ {url} 스캔에 실패했습니다. API 쿼터 문제가 지속됩니다.")
|
|
||||||
logger(f"❌ {url} 스캔에 실패했습니다. API 쿼터 문제: {e}")
|
|
||||||
return
|
|
||||||
continue
|
|
||||||
|
|
||||||
if try_cnt >= 2:
|
|
||||||
print(f"❌ {url} 스캔에 실패했습니다. 에러: {e}")
|
|
||||||
logger(f"❌ {url} 스캔에 실패했습니다. 에러: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
try_cnt += 1
|
|
||||||
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
|
|
||||||
|
|
||||||
# 일반 에러의 경우 짧게 대기
|
|
||||||
await asyncio.sleep(30)
|
|
||||||
# 반복문을 통해 재시도
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
|
||||||
async def loop(
|
async def loop(
|
||||||
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
|
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
|
||||||
|
|
|
||||||
322
main.py.bak
322
main.py.bak
|
|
@ -1,322 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import csv
|
|
||||||
import argparse
|
|
||||||
import requests
|
|
||||||
import time
|
|
||||||
from typing import List
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from langchain_google_genai import ChatGoogleGenerativeAI
|
|
||||||
from langchain.callbacks.base import BaseCallbackHandler
|
|
||||||
from browser_use import (
|
|
||||||
Agent,
|
|
||||||
Browser,
|
|
||||||
BrowserConfig,
|
|
||||||
BrowserSession,
|
|
||||||
BrowserProfile,
|
|
||||||
Controller,
|
|
||||||
)
|
|
||||||
from browser_use.browser.context import BrowserContext, BrowserContextConfig
|
|
||||||
from playwright.async_api import async_playwright
|
|
||||||
|
|
||||||
# from lib import browser_config
|
|
||||||
# from lib.browser_config import browser_config_kwargs
|
|
||||||
from lib.is_html import is_html_url
|
|
||||||
from lib.read_txt import read_lines_between
|
|
||||||
from lib.prompt import extend_planner_system_message
|
|
||||||
from lib.logger import logger
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
if os.getenv("GOOGLE_API_KEY") is None:
|
|
||||||
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
|
|
||||||
if os.getenv("GOOGLE_MODEL") is None:
|
|
||||||
raise ValueError("GOOGLE_MODEL 환경변수가 설정되지 않았습니다.")
|
|
||||||
if os.getenv("GOOGLE_PLANNER_MODEL") is None:
|
|
||||||
raise ValueError("GOOGLE_PLANNER_MODEL 환경변수가 설정되지 않았습니다.")
|
|
||||||
|
|
||||||
backend_url = os.getenv("BACKEND_URL", "http://localhost:11081")
|
|
||||||
|
|
||||||
|
|
||||||
# API 쿼터 처리를 위한 콜백 핸들러
|
|
||||||
class QuotaExhaustedHandler(BaseCallbackHandler):
|
|
||||||
def on_llm_error(self, error, **kwargs):
|
|
||||||
if "ResourceExhausted" in str(error) or "429" in str(error):
|
|
||||||
print("⚠️ API 쿼터가 소진되었습니다. 60초 대기 후 재시도합니다...")
|
|
||||||
time.sleep(60)
|
|
||||||
|
|
||||||
|
|
||||||
def create_llm_with_retry():
|
|
||||||
"""재시도 로직이 포함된 LLM 생성"""
|
|
||||||
return ChatGoogleGenerativeAI(
|
|
||||||
model=os.getenv("GOOGLE_MODEL"),
|
|
||||||
max_retries=5, # 최대 재시도 횟수 증가
|
|
||||||
request_timeout=120, # 타임아웃 시간 증가
|
|
||||||
callbacks=[QuotaExhaustedHandler()],
|
|
||||||
# API 호출 간격 조정
|
|
||||||
temperature=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def create_planner_llm_with_retry():
|
|
||||||
"""플래너용 재시도 로직이 포함된 LLM 생성"""
|
|
||||||
return ChatGoogleGenerativeAI(
|
|
||||||
model=os.getenv("GOOGLE_PLANNER_MODEL"),
|
|
||||||
max_retries=5, # 최대 재시도 횟수 증가
|
|
||||||
request_timeout=120, # 타임아웃 시간 증가
|
|
||||||
callbacks=[QuotaExhaustedHandler()],
|
|
||||||
# API 호출 간격 조정
|
|
||||||
temperature=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# 출력 모델
|
|
||||||
class OAuth(BaseModel):
|
|
||||||
provider: str
|
|
||||||
oauth_uri: str
|
|
||||||
|
|
||||||
|
|
||||||
class OAuthList(BaseModel):
|
|
||||||
oauth_providers: List[OAuth]
|
|
||||||
|
|
||||||
|
|
||||||
async def clean_resources(agent, session, browser, playwright):
|
|
||||||
"""리소스를 정리하는 함수"""
|
|
||||||
try:
|
|
||||||
await agent.close()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
|
|
||||||
try:
|
|
||||||
await session.close()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ 세션 리소스 정리 실패: {e}")
|
|
||||||
try:
|
|
||||||
await browser.close()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ 브라우저 리소스 정리 실패: {e}")
|
|
||||||
try:
|
|
||||||
await playwright.stop()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ Playwright 리소스 정리 실패: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
# ── URL별로 Browser를 새로 띄우는 함수 ──
|
|
||||||
async def scan_one_url(url: str, skip_html_check: bool = False):
|
|
||||||
target_url = url if url.startswith("http") else f"https://{url}"
|
|
||||||
print(f"🚀 Starting scan for: {target_url}")
|
|
||||||
|
|
||||||
# 1) URL이 HTML 페이지인지 확인
|
|
||||||
if not is_html_url(target_url) and not skip_html_check:
|
|
||||||
print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Backend에 스캔 시작을 알림
|
|
||||||
try:
|
|
||||||
response = requests.post(
|
|
||||||
f"{backend_url}/start", params={"url": target_url}, timeout=5
|
|
||||||
)
|
|
||||||
if response.status_code == 200:
|
|
||||||
print(f"✅ Backend notified: {response.text}")
|
|
||||||
else:
|
|
||||||
print(f"⚠️ Backend notification failed: {response.status_code}")
|
|
||||||
except requests.exceptions.ConnectionError:
|
|
||||||
print(
|
|
||||||
f"⚠️ Backend server not available at {backend_url}. Continuing without notification."
|
|
||||||
)
|
|
||||||
except requests.exceptions.Timeout:
|
|
||||||
print(f"⚠️ Backend notification timed out. Continuing without notification.")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"⚠️ Failed to notify backend: {e}")
|
|
||||||
|
|
||||||
try_cnt = 0
|
|
||||||
while True:
|
|
||||||
proxy_host = os.getenv("PROXY_HOST")
|
|
||||||
proxy_port = os.getenv("PROXY_PORT")
|
|
||||||
proxy_url = None
|
|
||||||
if proxy_host and proxy_port:
|
|
||||||
proxy_url = f"http://{proxy_host}:{proxy_port}"
|
|
||||||
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
|
|
||||||
else:
|
|
||||||
print("🔗 No proxy configured, using direct connection.")
|
|
||||||
break
|
|
||||||
|
|
||||||
# 2) Browser + Context 생성
|
|
||||||
playwright = await async_playwright().start()
|
|
||||||
browser = await playwright.chromium.launch(
|
|
||||||
proxy={"server": proxy_url} if proxy_url else None,
|
|
||||||
headless=False, # headless 모드 사용 여부
|
|
||||||
args=[
|
|
||||||
"--disable-web-security",
|
|
||||||
"--disable-features=VizDisplayCompositor",
|
|
||||||
"--disable-site-isolation-trials",
|
|
||||||
"--disable-features=IsolateOrigins,site-per-process",
|
|
||||||
"--disable-popup-blocking",
|
|
||||||
"--disable-dev-shm-usage",
|
|
||||||
f"--lang=" + os.getenv("LANG", "en_US"),
|
|
||||||
"--ignore-certificate-errors",
|
|
||||||
"--ignore-ssl-errors",
|
|
||||||
"--allow-running-insecure-content",
|
|
||||||
"--restore-last-session"
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
os.makedirs("./data", exist_ok=True)
|
|
||||||
|
|
||||||
profile = BrowserProfile(
|
|
||||||
stealth=True,
|
|
||||||
headless=False, # headless 모드 사용 여부
|
|
||||||
user_data_dir="./data/user_data",
|
|
||||||
viewport={"width": 1600, "height": 900},
|
|
||||||
)
|
|
||||||
|
|
||||||
# BrowserSession 생성 시 headless 옵션을 명시적으로 설정
|
|
||||||
context = await browser.new_context()
|
|
||||||
|
|
||||||
session = BrowserSession(
|
|
||||||
browser_context=await browser.new_context(),
|
|
||||||
)
|
|
||||||
|
|
||||||
# 3) Agent, Controller 생성
|
|
||||||
initial_actions = [
|
|
||||||
{"open_tab": {"url": target_url}},
|
|
||||||
] controller = Controller(output_model=OAuthList)
|
|
||||||
|
|
||||||
# API 쿼터 문제 해결을 위한 LLM 생성
|
|
||||||
print("🤖 LLM 모델 초기화 중...")
|
|
||||||
|
|
||||||
agent = Agent(
|
|
||||||
browser_session=session,
|
|
||||||
browser_profile=profile,
|
|
||||||
browser_context=context,
|
|
||||||
|
|
||||||
initial_actions=initial_actions,
|
|
||||||
task=f"Navigate to the login page, and collect the OAuth provider buttons and their login URLs. Ignore Passkey.",
|
|
||||||
|
|
||||||
llm=create_llm_with_retry(),
|
|
||||||
planner_llm=create_planner_llm_with_retry(),
|
|
||||||
controller=controller,
|
|
||||||
extend_planner_system_message=extend_planner_system_message,
|
|
||||||
retry_delay=120, # 재시도 간격을 2분으로 증가
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 4) 실제 스캔 실행
|
|
||||||
response = await agent.run()
|
|
||||||
final_result = response.final_result()
|
|
||||||
if final_result is None:
|
|
||||||
raise ValueError("final_result()가 None을 반환했습니다.")
|
|
||||||
|
|
||||||
data = json.loads(final_result)
|
|
||||||
try:
|
|
||||||
oauth_entries: List[OAuth] = [
|
|
||||||
OAuth(**entry) for entry in data["oauth_providers"]
|
|
||||||
]
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
|
|
||||||
|
|
||||||
# 5) 결과 출력
|
|
||||||
print("-" * 50)
|
|
||||||
print(f"🔗 Scanned URL: {url}\n")
|
|
||||||
print("🔐 Detected OAuth Providers and URLs:")
|
|
||||||
for entry in oauth_entries:
|
|
||||||
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
|
|
||||||
print(
|
|
||||||
f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
print(f"- {entry.provider}: {entry.oauth_uri}")
|
|
||||||
print("-" * 50)
|
|
||||||
|
|
||||||
# 6) CSV에 저장 (append)
|
|
||||||
csv_file = "./oauth_providers.csv"
|
|
||||||
file_exists = os.path.isfile(csv_file)
|
|
||||||
with open(csv_file, "a", newline="", encoding="utf-8") as f:
|
|
||||||
writer = csv.writer(f)
|
|
||||||
if not file_exists:
|
|
||||||
writer.writerow(["issuer", "provider", "oauth_uri"])
|
|
||||||
await clean_resources(agent, session, browser, playwright)
|
|
||||||
|
|
||||||
# 성공적으로 처리했으므로 반복문 탈출
|
|
||||||
break
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
await clean_resources(agent, session, browser, playwright)
|
|
||||||
|
|
||||||
if try_cnt >= 1:
|
|
||||||
print(f"❌ {url} 스캔에 실패했습니다. 에러: {e}")
|
|
||||||
logger(f"❌ {url} 스캔에 실패했습니다. 에러: {e}")
|
|
||||||
return
|
|
||||||
try_cnt += 1
|
|
||||||
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
|
|
||||||
|
|
||||||
# 1분 대기
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
# 반복문을 통해 재시도
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
|
||||||
async def loop(
|
|
||||||
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
|
|
||||||
):
|
|
||||||
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
|
|
||||||
target_list = read_lines_between(
|
|
||||||
filepath=filepath, start_line=start_line, end_line=end_line
|
|
||||||
)
|
|
||||||
|
|
||||||
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
|
|
||||||
#target_list[0] = "velog.io"
|
|
||||||
|
|
||||||
for url in target_list:
|
|
||||||
# scan_one_url은 외부에 정의된 비동기 함수라고 가정합니다.
|
|
||||||
# 실제로 scan_one_url이 정의된 위치를 import하거나
|
|
||||||
# 모듈 수준에 구현해두셔야 합니다.
|
|
||||||
await scan_one_url(url, skip_html_check=skip_html_check)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
prog="domain_scanner",
|
|
||||||
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
|
|
||||||
)
|
|
||||||
|
|
||||||
# 커맨드라인 인자로 받을 옵션들 정의
|
|
||||||
parser.add_argument(
|
|
||||||
"-f",
|
|
||||||
"--file",
|
|
||||||
type=str,
|
|
||||||
required=True,
|
|
||||||
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-skh",
|
|
||||||
"--skip-html-check",
|
|
||||||
type=bool,
|
|
||||||
default=False,
|
|
||||||
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)",
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# 인자값을 비동기 함수에 전달
|
|
||||||
asyncio.run(
|
|
||||||
loop(
|
|
||||||
filepath=args.file,
|
|
||||||
start_line=args.start,
|
|
||||||
end_line=args.end,
|
|
||||||
skip_html_check=args.skip_html_check,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue