feat: 코드베이스 리팩터링

* `run.py`에서 `main.py` 경로를 명시적으로 지정하고, 명령줄 인자를 보다 사용하기 쉽게 조정했습니다.
* 에이전트, 리소스 정리, 공통 함수, 모델 등을 포함하는 브라우저 유틸리티용 신규 모듈 구조를 만들었습니다.
* `agents.py`에 비동기 에이전트 실행 및 재시도 로직을 구현했습니다.
* `scanner.py`에 OAuth URL 추출 및 로그인 테스트 기능을 추가했습니다.
* 전반적인 코드베이스에 걸쳐 에러 핸들링 및 로깅을 강화했습니다.
* 백엔드 URL과 Google API 키 등의 관리를 위한 환경변수 기반 설정 시스템을 도입했습니다.
* 스캐닝 중 진행 상태 추적 및 시그널 핸들링을 통한 정상 종료 처리를 개선했습니다.
* 텍스트 파일 읽기 및 HTML 콘텐츠 여부 확인을 위한 유틸리티 함수를 추가했습니다.
* LLM과의 상호작용을 위한 구조화된 프롬프트 시스템을 구축했습니다.
This commit is contained in:
암냥 2025-06-26 21:44:31 +09:00
commit 069dbf446d
29 changed files with 453 additions and 452 deletions

View file

@ -11,6 +11,9 @@ GOOGLE_MODEL=gemini-2.5-flash
INITIAL_BACKOFF=60 INITIAL_BACKOFF=60
MAX_BACKOFF=600 MAX_BACKOFF=600
#ENABLE_PLANNER_MODEL_OAUTH_LOGIN=true # OAuth 로그인 시 Planner 모델을 활성화합니다.
#ENABLE_PLANNER_MODEL_OAUTH_LIST=true # OAuth List를 찾을 때 Planner 모델을 활성화합니다.
# ========== Monitoring ========== # ========== Monitoring ==========
# 선택 # 선택

3
.gitignore vendored
View file

@ -83,5 +83,6 @@ my.sh
log.txt log.txt
data/ data/
!src/lib/utils/data
# End of https://www.toptal.com/developers/gitignore/api/macos,windows # End of https://www.toptal.com/developers/gitignore/api/macos,windows

View file

@ -1,10 +0,0 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.backend_client import *
from lib.utils.config import *
from lib.utils.is_html import *
from lib.utils.logger import *
from lib.utils.read_txt import *
from lib.utils.browser_use import *

View file

@ -1,5 +0,0 @@
from lib.utils.browser_use.clean_resources import *
from lib.utils.browser_use.func import *
from lib.utils.browser_use.model import *
from lib.utils.browser_use.init_profile import *
from lib.utils.browser_use.sensitive_data import *

425
main.py
View file

@ -1,425 +0,0 @@
import asyncio
import json
import os
import csv
import argparse
from pathlib import Path
import signal
from dotenv import load_dotenv
from browser_use import (
Agent,
BrowserSession,
Controller,
)
from patchright.async_api import async_playwright as async_patchright, Page
from pydantic import BaseModel
from lib.utils import (
notify_backend,
read_lines_between,
is_html_url,
env_cheker,
logger,
config,
GetProfile
)
from lib.utils import (
GetSensitiveData,
setup_storage_state,
clean_resources
)
from lib.llm import (
CreateChatGoogleGenerativeAI,
get_prompt
)
import lib.utils.browser_use.model as model
load_dotenv(verbose=True, override=True)
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
env_cheker()
if os.getenv("LMNR_PROJECT_API_KEY"):
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
def save_progress():
"""현재 진행 상황을 파일에 저장"""
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except:
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
print("=" * 60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
# 시그널 핸들러 등록
signal.signal(signal.SIGINT, signal_handler)
# ── OAuth 리스트 추출 Agent ──
async def extract_oauth_list(url: str, skip_html_check: bool = False):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
print(f"<EFBFBD> OAuth 리스트 추출 시작: {target_url}")
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return []
agent = None
session = None
try_cnt = 0
while True:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await GetProfile(),
)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(
output_model=model.OAuthList,
exclude_actions=["search_google", "unknown_action", "unkown"],
)
print("🤖 OAuth 리스트 추출 Agent 초기화...")
try:
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
sensitive_data=GetSensitiveData(),
task=(
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
llm=CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
planner_llm=(
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
else None
),
controller=controller,
extend_planner_system_message=get_prompt("auth"),
)
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("OAuth 리스트 추출 결과가 None입니다.")
data = json.loads(final_result)
oauth_providers = data["oauth_providers"] # 이제 문자열 배열
oauth_entries = [model.OAuth(provider=provider) for provider in oauth_providers]
await clean_resources(agent, session)
return oauth_entries
except Exception as e:
await clean_resources(agent, session)
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
print(
f"{url} OAuth 리스트 추출 실패: API 쿼터 문제가 지속됩니다."
)
logger(f"{url} OAuth 리스트 추출 실패: API 쿼터 문제: {e}")
return []
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
print(f"{url} OAuth 리스트 추출 실패: 에러: {e}")
logger(f"{url} OAuth 리스트 추출 실패: 에러: {e}")
return []
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
# ── 개별 OAuth 로그인 Agent ──
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
agent = None
session = None
try_cnt = 0
while True:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await GetProfile(),
)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(
exclude_actions=["search_google", "unknown_action", "unkown"],
)
print(f"🤖 {oauth_provider} 로그인 Agent 초기화...")
try:
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
sensitive_data=GetSensitiveData(),
task=(
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
llm=CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
planner_llm=(
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
else None
),
controller=controller,
extend_planner_system_message=get_prompt(oauth_provider),
)
response = await agent.run()
final_result = response.final_result()
print(f"{oauth_provider} 로그인 완료")
if final_result:
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
await clean_resources(agent, session)
return True
except Exception as e:
await clean_resources(agent, session)
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
print(
f"{oauth_provider} 로그인 실패: API 쿼터 문제가 지속됩니다."
)
logger(
f"{url} - {oauth_provider} 로그인 실패: API 쿼터 문제: {e}"
)
return False
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
print(f"{oauth_provider} 로그인 실패: 에러: {e}")
logger(f"{url} - {oauth_provider} 로그인 실패: 에러: {e}")
return False
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
# ── 통합 스캔 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(url, skip_html_check)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry.provider}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, "", "pending"]) # oauth_uri는 빈 문자열
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(
f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}"
)
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry.provider)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
# 진행 상황 초기화
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
# 이전 진행 상황 확인
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print(f"📋 이전 진행 상황을 발견했습니다:")
print(
f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}"
)
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == "y":
current_progress["current_index"] = prev_progress["current_index"]
target_list = target_list[current_progress["current_index"] :]
print(f"{current_progress['current_index']}번째부터 재개합니다.")
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
# target_list[0] = "velog.io"
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
current_progress["current_index"] = actual_index
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 domains.txt의 {start_line + actual_index}번째 줄")
# URL들 사이에 API 쿼터 회복을 위한 대기 시간 추가
if actual_index > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
# 진행 상황 저장
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")
# 완료 후 진행 상황 파일 삭제
if os.path.exists(progress_file):
os.remove(progress_file)
def main():
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
# 커맨드라인 인자로 받을 옵션들 정의
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
type=bool,
default=False,
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)",
)
args = parser.parse_args()
# 인자값을 비동기 함수에 전달
asyncio.run(
loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
if __name__ == "__main__":
main()

17
run.py
View file

@ -8,7 +8,7 @@ import argparse
#!/usr/bin/env python3 #!/usr/bin/env python3
# ── 설정 부분 ── # ── 설정 부분 ──
PYTHON_SCRIPT = "main.py" PYTHON_SCRIPT = "./src/main.py"
DOMAIN_FILE = "./data/domains.txt" DOMAIN_FILE = "./data/domains.txt"
# ───────────── # ─────────────
@ -35,13 +35,16 @@ def run_script(start_line, end_line, skh_option):
print(f"[{current_time}] Processing lines {start_line} to {end_line}...") print(f"[{current_time}] Processing lines {start_line} to {end_line}...")
try: try:
subprocess.run([ command = [
"uv", "run", PYTHON_SCRIPT, "uv", "run", PYTHON_SCRIPT,
"-f", DOMAIN_FILE, "-f", DOMAIN_FILE,
"-s", str(start_line), "-s", str(start_line),
"-e", str(end_line), "-e", str(end_line),
"-skh", str(skh_option) ]
], check=True) if skh_option:
command.append("--skip-html-check")
subprocess.run(command, check=True)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
print("Python 스크립트 실행 실패") print("Python 스크립트 실행 실패")
sys.exit(1) sys.exit(1)
@ -52,9 +55,9 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
사용 예시: 사용 예시:
python run.py 10000 11000 # 10000~11000 라인 처리 uv run run.py 10000 11000 # 10000~11000 라인 처리
python run.py 10000 11000 --skh # SKH 옵션 활성화 uv run run.py 10000 11000 --skh # SKH 옵션 활성화
python run.py 10000 11000 --no-download # 다운로드 생략 uv run run.py 10000 11000 --no-download # 다운로드 생략
""" """
) )

View file

@ -0,0 +1,7 @@
from lib.browser_use.clean_resources import *
from lib.browser_use.func import *
from lib.browser_use.model import *
from lib.browser_use.init_profile import *
from lib.browser_use.sensitive_data import *
from lib.browser_use.agents import *
from lib.browser_use.scanner import *

View file

@ -0,0 +1,167 @@
import asyncio
import os
import json
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use import (
GetProfile,
GetSensitiveData,
clean_resources,
)
from lib.utils import (
logger,
config,
)
from lib.llm import CreateChatGoogleGenerativeAI, get_prompt
import lib.browser_use.model as model
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
while try_cnt < 3:
try:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await GetProfile(),
)
agent = Agent(
browser_session=session,
**agent_config["agent_params"]
)
response = await agent.run()
await clean_resources(agent, session)
return response
except Exception as e:
await clean_resources(agent, session)
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
error_msg = f"API 쿼터 문제가 지속됩니다."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
return None
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
agent_config = {
"url": target_url,
"log_context": "OAuth 리스트 추출",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
else None
),
"controller": Controller(
output_model=model.OAuthList,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": get_prompt("auth"),
}
}
response = await _run_agent_with_retry(agent_config)
if not response:
return []
final_result = response.final_result()
if not final_result:
print("OAuth 리스트 추출 결과가 없습니다.")
return []
try:
data = json.loads(final_result)
oauth_providers = data.get("oauth_providers", [])
return [model.OAuth(provider=provider) for provider in oauth_providers]
except (json.JSONDecodeError, KeyError) as e:
print(f"❌ 결과 파싱 실패: {e}")
logger(f"{url} 결과 파싱 실패: {final_result}")
return []
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
agent_config = {
"url": target_url,
"log_context": f"{oauth_provider} 로그인",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": get_prompt(oauth_provider),
}
}
response = await _run_agent_with_retry(agent_config)
if response and response.final_result():
final_result = response.final_result()
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
print(f"{oauth_provider} 로그인 실패")
return False

View file

@ -1,5 +1,5 @@
import os import os
from lib.utils.browser_use.func import * from lib.browser_use.func import *
# Initialize configuration # Initialize configuration
proxy_url = setup_proxy() proxy_url = setup_proxy()

View file

@ -0,0 +1,106 @@
import asyncio
import os
import csv
from lib.utils import notify_backend, read_lines_between, is_html_url
from lib.browser_use.agents import extract_oauth_list, test_oauth_login
from lib.utils.progress import current_progress, load_progress, save_progress, progress_file
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(target_url)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry.provider}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, "", "pending"])
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(
f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}"
)
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry.provider)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == 'y':
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + actual_index}번째 줄")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")

View file

@ -0,0 +1,7 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.data import *
from lib.utils.config import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -4,7 +4,9 @@ from lib.utils.config import (
GOOGLE_MODEL, GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL, GOOGLE_PLANNER_MODEL,
) )
import os
from dotenv import load_dotenv
load_dotenv(override=True)
def show_info(): def show_info():
print("🔧 환경 설정:") print("🔧 환경 설정:")
@ -38,3 +40,19 @@ def browser_use_version():
def env_cheker(): def env_cheker():
if GOOGLE_API_KEY is None: if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.") raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
print(
"⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요."
)
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print(
"⚠️ 이 경고는 1초동안 정지합니다."
)
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

View file

@ -4,5 +4,5 @@ load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20") GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05") GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")

View file

@ -0,0 +1,2 @@
from lib.utils.data.backend_client import *
from lib.utils.data.logger import *

48
src/lib/utils/progress.py Normal file
View file

@ -0,0 +1,48 @@
import json
import os
import signal
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
if current_progress['total'] > 0:
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
print("=" * 60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
def setup_signal_handler():
"""시그널 핸들러 등록"""
signal.signal(signal.SIGINT, signal_handler)

79
src/main.py Normal file
View file

@ -0,0 +1,79 @@
import asyncio
import argparse
import os
from dotenv import load_dotenv
from lib.utils import env_cheker
from lib.browser_use.scanner import main_loop
from lib.utils.progress import setup_signal_handler, progress_file
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
def main():
"""애플리케이션 메인 진입점"""
# 시그널 핸들러 설정
setup_signal_handler()
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
action='store_true', # 플래그 형식으로 변경
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
args = parser.parse_args()
try:
asyncio.run(
main_loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
except KeyboardInterrupt:
# signal_handler가 처리하므로 여기서는 별도 처리 불필요
pass
finally:
# 정상 종료 시 진행 상황 파일 삭제
if os.path.exists(progress_file):
try:
os.remove(progress_file)
except OSError as e:
print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}")
if __name__ == "__main__":
main()