refactor: 코드 가독성 개선

This commit is contained in:
암냥 2025-07-02 19:10:58 +09:00
commit 3199a53a44
52 changed files with 389 additions and 3246 deletions

View file

@ -1,7 +1,7 @@
from lib.browser_use.agents import *
from lib.browser_use.clean_resources import *
from lib.browser_use.func import *
from lib.browser_use.model import *
from lib.browser_use.init_profile import *
from lib.browser_use.model import *
from lib.browser_use.scanner import *
from lib.browser_use.sensitive_data import *
from lib.browser_use.agents import *
from lib.browser_use.scanner import *

View file

@ -1,31 +1,26 @@
import asyncio
import os
import json
from typing import Dict, Any, Optional
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use import (
GetProfile,
GetSensitiveData,
clean_resources,
)
from lib.utils import (
logger,
config,
)
from lib.browser_use import GetProfile, GetSensitiveData, clean_resources
from lib.llm import CreateChatGoogle, get_prompt
from lib.utils import config, logger
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
@dataclass
class RetryTask:
"""재시도할 작업을 나타내는 클래스"""
task_type: str # "oauth_list" or "oauth_login"
url: str
oauth_provider: Optional[str] = None
@ -33,46 +28,55 @@ class RetryTask:
next_retry_time: Optional[datetime] = None
max_retries: int = 5
# 전역 재시도 큐
retry_queue: list[RetryTask] = []
retry_queue_lock = asyncio.Lock()
async def add_to_retry_queue(task: RetryTask):
    """Put *task* on the global retry queue, or refresh its existing entry.

    Two tasks are treated as the same job when their ``task_type``, ``url``
    and ``oauth_provider`` all match. In that case the queued entry keeps its
    place and only its ``retry_count`` / ``next_retry_time`` are overwritten
    with the values carried by *task*; otherwise *task* is appended.

    The whole operation runs while holding ``retry_queue_lock``.
    """
    async with retry_queue_lock:
        # Look for an already-queued entry describing the same job.
        identity = (task.task_type, task.url, task.oauth_provider)
        queued = next(
            (
                candidate
                for candidate in retry_queue
                if (candidate.task_type, candidate.url, candidate.oauth_provider)
                == identity
            ),
            None,
        )

        if queued is None:
            # Nothing matched: enqueue as a new job.
            retry_queue.append(task)
            print(
                f" 재시도 큐에 작업 추가: {task.task_type} - {task.url} (재시도: {task.retry_count})"
            )
            return

        # Same job already waiting: only refresh its retry bookkeeping.
        queued.retry_count = task.retry_count
        queued.next_retry_time = task.next_retry_time
        print(
            f"📝 기존 작업 업데이트: {task.task_type} - {task.url} (재시도: {task.retry_count})"
        )
async def process_retry_queue():
"""재시도 큐 처리"""
async with retry_queue_lock:
now = datetime.now()
ready_tasks = []
for task in retry_queue[:]: # 복사본에서 반복
if task.next_retry_time and task.next_retry_time <= now:
ready_tasks.append(task)
retry_queue.remove(task)
if ready_tasks:
print(f"🔄 {len(ready_tasks)}개의 재시도 작업 처리 중...")
for task in ready_tasks:
try:
if task.task_type == "oauth_list":
@ -82,20 +86,25 @@ async def process_retry_queue():
else:
await _handle_retry_failure(task)
elif task.task_type == "oauth_login":
result = await _test_oauth_login_internal(task.url, task.oauth_provider)
result = await _test_oauth_login_internal(
task.url, task.oauth_provider
)
if result:
print(f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}")
print(
f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}"
)
else:
await _handle_retry_failure(task)
except Exception as e:
print(f"❌ 재시도 중 에러: {e}")
await _handle_retry_failure(task)
async def _handle_retry_failure(task: RetryTask):
"""재시도 실패 처리"""
if task.retry_count < task.max_retries:
task.retry_count += 1
wait_time = min(INITIAL_BACKOFF * (2 ** task.retry_count), MAX_BACKOFF)
wait_time = min(INITIAL_BACKOFF * (2**task.retry_count), MAX_BACKOFF)
task.next_retry_time = datetime.now() + timedelta(seconds=wait_time)
await add_to_retry_queue(task)
print(f"{wait_time}초 후 재시도 예정: {task.task_type} - {task.url}")
@ -103,6 +112,7 @@ async def _handle_retry_failure(task: RetryTask):
print(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
logger(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
async def get_retry_queue_status():
"""재시도 큐 상태 조회"""
async with retry_queue_lock:
@ -114,19 +124,24 @@ async def get_retry_queue_status():
"url": task.url,
"oauth_provider": task.oauth_provider,
"retry_count": task.retry_count,
"next_retry_time": task.next_retry_time.isoformat() if task.next_retry_time else None
"next_retry_time": (
task.next_retry_time.isoformat()
if task.next_retry_time
else None
),
}
for task in retry_queue
]
],
}
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
while try_cnt < 3:
try:
session = BrowserSession(
@ -134,25 +149,30 @@ async def _run_agent_with_retry(agent_config):
browser_profile=await GetProfile(),
)
agent = Agent(
browser_session=session,
**agent_config["agent_params"]
)
agent = Agent(browser_session=session, **agent_config["agent_params"])
response = await agent.run()
await clean_resources(agent, session)
if any(keyword in str(response) for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
if any(
keyword in str(response)
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type=agent_config.get("task_type", "unknown"),
url=url,
retry_count=try_cnt + 1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return None
@ -166,10 +186,12 @@ async def _run_agent_with_retry(agent_config):
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
logger(
f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}"
)
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
@ -197,7 +219,8 @@ async def _extract_oauth_list_internal(url: str):
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
else None
),
"controller": Controller(
@ -206,7 +229,7 @@ async def _extract_oauth_list_internal(url: str):
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
}
},
}
response = await _run_agent_with_retry(agent_config)
@ -241,17 +264,25 @@ async def extract_oauth_list(url: str):
return await _extract_oauth_list_internal(url)
except Exception as e:
error_str = str(e).lower()
if any(keyword in error_str for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type="oauth_list",
url=url,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return []
@ -282,7 +313,8 @@ async def _test_oauth_login_internal(url: str, oauth_provider: str):
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
@ -291,7 +323,7 @@ async def _test_oauth_login_internal(url: str, oauth_provider: str):
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
}
},
}
response = await _run_agent_with_retry(agent_config)
@ -301,7 +333,7 @@ async def _test_oauth_login_internal(url: str, oauth_provider: str):
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
print(f"{oauth_provider} 로그인 실패")
return False
@ -312,26 +344,36 @@ async def test_oauth_login(url: str, oauth_provider: str):
return await _test_oauth_login_internal(url, oauth_provider)
except Exception as e:
error_str = str(e).lower()
if any(keyword in error_str for keyword in [
"429", "resource_exhausted", "resourceexhausted",
"quota", "rate limit", "too many requests",
"exceeded", "limit reached"
]):
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {oauth_provider} - {url}")
task = RetryTask(
task_type="oauth_login",
url=url,
oauth_provider=oauth_provider,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF)
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return False
else:
raise e
async def start_retry_queue_processor():
"""재시도 큐 처리기를 백그라운드에서 시작"""
async def queue_processor():
while True:
try:
@ -340,14 +382,15 @@ async def start_retry_queue_processor():
except Exception as e:
print(f"❌ 재시도 큐 처리 중 에러: {e}")
await asyncio.sleep(60) # 에러 발생 시 1분 대기
# 백그라운드 태스크로 실행
asyncio.create_task(queue_processor())
print("🔄 재시도 큐 처리기 시작됨")
# 모듈 로딩 시 자동으로 백그라운드 처리기 시작
# (실제 애플리케이션에서는 main 함수에서 호출하는 것이 좋음)
def init_retry_system():
    """Announce that the retry system is being initialised.

    Only prints a status line; it does not start the queue processor itself.
    """
    # NOTE: per the original author's comment, this function is meant to be
    # called from the main application.
    banner = "🔧 재시도 시스템 초기화 중..."
    print(banner)

View file

@ -1,5 +1,6 @@
from pathlib import Path
async def clean_resources(agent=None, session=None):
"""리소스를 정리하는 함수"""
storage_state_temp_path = Path("./data/storage_state_temp.json").resolve()

View file

@ -1,21 +1,21 @@
import os
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from browser_use import BrowserProfile
import json
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv(override=True)
async def setup_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_path = script_dir / "data" / "storage_state.json"
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
@ -24,15 +24,15 @@ async def setup_storage_state():
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
with open(storage_state_path, 'r') as f:
with open(storage_state_path, "r") as f:
storage_data = json.load(f)
with open(storage_state_temp_path, 'w') as f:
with open(storage_state_temp_path, "w") as f:
json.dump(storage_data, f, indent=4)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
except Exception as e:
print(f"⚠️ Error processing storage state: {e}")
if storage_state_temp_path.exists():

View file

@ -1,44 +1,44 @@
import os
from lib.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
async def GetProfile():
storage_state_path = await setup_storage_state()
# Handle potential encoding issues with storage state file
try:
if storage_state_path and os.path.exists(storage_state_path):
# Test if file can be read properly, if not, skip it
with open(storage_state_path, 'r', encoding='utf-8') as f:
with open(storage_state_path, "r", encoding="utf-8") as f:
f.read()
storage_state = storage_state_path
else:
print("⚠️ Storage state file not found or inaccessible, proceeding without it.")
print(
"⚠️ Storage state file not found or inaccessible, proceeding without it."
)
storage_state = None
except (UnicodeDecodeError, FileNotFoundError):
# If there's an encoding error, don't use the storage state
storage_state = None
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)

View file

@ -1,6 +1,8 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
@ -12,4 +14,4 @@ class OAuthList(BaseModel):
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList
BaseModel = OAuthList

View file

@ -1,10 +1,21 @@
import asyncio
import os
import csv
import os
from lib.browser_use.agents import (
extract_oauth_list,
get_retry_queue_status,
start_retry_queue_processor,
test_oauth_login,
)
from lib.utils import is_html_url, notify_backend, read_lines_between
from lib.utils.progress import (
current_progress,
load_progress,
progress_file,
save_progress,
)
from lib.utils import notify_backend, read_lines_between, is_html_url
from lib.browser_use.agents import extract_oauth_list, test_oauth_login, start_retry_queue_processor, get_retry_queue_status
from lib.utils.progress import current_progress, load_progress, save_progress, progress_file
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
@ -45,9 +56,7 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(
f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry}"
)
print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry}")
# OAuth 간 대기 시간
if i > 0:
@ -68,7 +77,7 @@ async def main_loop(
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
# 재시도 큐 처리기 시작
await start_retry_queue_processor()
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
@ -82,11 +91,13 @@ async def main_loop(
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(
f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}"
)
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == 'y':
if resume == "y":
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
# 전체 개수는 원래 목록 길이로 유지
@ -98,9 +109,13 @@ async def main_loop(
# current_index는 전체 목록에서의 현재 위치를 나타냄
current_url_index = current_progress["current_index"]
current_progress["current_url"] = url
print(f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + current_url_index}번째 줄")
print(
f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}"
)
print(
f"📍 {os.path.basename(filepath)}{start_line + current_url_index}번째 줄"
)
# 재시도 큐 상태 확인 및 출력
retry_status = await get_retry_queue_status()
@ -116,7 +131,9 @@ async def main_loop(
# 스캔 완료 후 재시도 큐 상태 확인
retry_status_after = await get_retry_queue_status()
if retry_status_after["queue_length"] > 0:
print(f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중")
print(
f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중"
)
# 다음 URL로 진행
current_progress["current_index"] = current_url_index + 1
@ -128,8 +145,10 @@ async def main_loop(
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] == 0:
break
print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인...")
print(
f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인..."
)
await asyncio.sleep(30)
print(f"\n🎉 모든 스캔이 완료되었습니다! ({total_count}개 URL)")
print("🎉 재시도 큐도 모두 처리되었습니다!")
print("🎉 재시도 큐도 모두 처리되었습니다!")

View file

@ -3,19 +3,20 @@
import json
import os
def GetSensitiveData():
    """Read sensitive data from ``.sensitive.json`` in the current working directory.

    Returns:
        The parsed JSON content (expected to be a dictionary), or ``None``
        when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    file_path = os.path.join(os.getcwd(), ".sensitive.json")

    if not os.path.exists(file_path):
        return None

    # Decode explicitly as UTF-8: without it open() falls back to the locale
    # encoding (e.g. cp949 on Korean Windows), which breaks or mis-decodes
    # non-ASCII values stored in the JSON file.
    with open(file_path, "r", encoding="utf-8") as file:
        return json.load(file)

View file

@ -1,3 +1,2 @@
from lib.llm.create import *
from lib.llm.prompt import *
from lib.llm.prompt import *

View file

@ -4,15 +4,16 @@ from dotenv import load_dotenv
# 환경 변수 로드 (GOOGLE_API_KEY 필요)
load_dotenv(override=True)
def CreateChatGoogle(model: str):
    """Create the Google chat model instance used by Browser Use.

    Args:
        model: Model name. The special value ``"fallback"`` is replaced by
            ``"gemini-2.0-flash-lite"`` after printing two warnings.

    Returns:
        A ``ChatGoogle`` instance configured with ``temperature=0.0``.
    """
    is_fallback = model == "fallback"
    if is_fallback:
        for notice in (
            "⚠️ Fallback 모델을 사용합니다. Environment 변수를 확인하세요.",
            "⚠️ Model gemini-2.0-flash-lite를 사용합니다.",
        ):
            print(notice)

    resolved_model = "gemini-2.0-flash-lite" if is_fallback else model

    # Original author's note: Browser Use handles retry logic internally,
    # so no retry options are passed here.
    return ChatGoogle(model=resolved_model, temperature=0.0)

View file

@ -1,6 +1,8 @@
from typing import Union, Type
from typing import Type, Union
from pydantic import BaseModel
def get_prompt(type: str) -> tuple[str, Type[BaseModel]] | str:
"""
Prompt를 반환합니다.
@ -9,29 +11,36 @@ def get_prompt(type: str) -> tuple[str, Type[BaseModel]] | str:
:return: 해당하는 프롬프트 문자열 또는 (프롬프트, 모델) 튜플
"""
if type.lower() == "auth":
from lib.llm.prompt._get_oauth import prompt, model
from lib.llm.prompt._get_oauth import model, prompt
return prompt, model
elif type.lower() in ["google", "google account"]:
from lib.llm.prompt.google import prompt, model
from lib.llm.prompt.google import model, prompt
return prompt, model
elif type.lower() in ["microsoft", "microsoftonline"]:
from lib.llm.prompt.microsoft import prompt, model
from lib.llm.prompt.microsoft import model, prompt
return prompt, model
elif type.lower() in ["meta", "facebook"]:
from lib.llm.prompt.facebook import prompt, model
from lib.llm.prompt.facebook import model, prompt
return prompt, model
elif type.lower() in ["apple"]:
from lib.llm.prompt.apple import prompt, model
from lib.llm.prompt.apple import model, prompt
return prompt, model
elif type.lower() in ["github"]:
from lib.llm.prompt.github import prompt, model
from lib.llm.prompt.github import model, prompt
return prompt, model
else:
from lib.llm.prompt._fallback import model, prompt
return prompt, model

View file

@ -1,2 +1,2 @@
from lib.llm.prompt._fallback.prompt import prompt
from lib.llm.prompt._fallback.model import model
from lib.llm.prompt._fallback.prompt import prompt

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -1,2 +1,2 @@
from lib.llm.prompt._get_oauth.prompt import prompt
from lib.llm.prompt._get_oauth.model import model
from lib.llm.prompt._get_oauth.prompt import prompt

View file

@ -1,5 +1,6 @@
from pydantic import BaseModel
# Structured result schema: both fields are optional and default to None.
class model(BaseModel):
    msg: str | None = None  # optional free-form message
    url: str | None = None  # optional URL string

View file

@ -1,2 +1,2 @@
from lib.llm.prompt.apple.prompt import prompt
from lib.llm.prompt.apple.model import model
from lib.llm.prompt.apple.prompt import prompt

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "apple_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -56,4 +56,4 @@ Return the result in the following format only:
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
"""

View file

@ -1,2 +1,2 @@
from lib.llm.prompt.facebook.model import model
from lib.llm.prompt.facebook.prompt import prompt
from lib.llm.prompt.facebook.model import model

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "facebook_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -1,4 +1,5 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
@ -47,4 +48,4 @@ Return the result in the following format only:
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
"""

View file

@ -1,2 +1,2 @@
from lib.llm.prompt.github.model import model
from lib.llm.prompt.github.prompt import prompt
from lib.llm.prompt.github.model import model

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "github_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -67,4 +67,4 @@ Return the result in the following format only:
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
"""

View file

@ -1,2 +1,2 @@
from lib.llm.prompt.google.prompt import prompt
from lib.llm.prompt.google.model import model
from lib.llm.prompt.google.prompt import prompt

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -55,4 +55,4 @@ Return the result in the following format only:
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
"""

View file

@ -1,2 +1,2 @@
from lib.llm.prompt.microsoft.prompt import prompt
from lib.llm.prompt.microsoft.model import model
from lib.llm.prompt.microsoft.prompt import prompt

View file

@ -1,6 +1,9 @@
from pydantic import BaseModel
# Structured result schema: every field is optional and defaults to None.
class model(BaseModel):
    # Optional free-form message.
    msg: str | None = None
    # Values listed by the original author:
    # "success", "mfa_required", "microsoft_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
    status: str | None = None
    # Optional final URL string.
    final_url: str | None = None

View file

@ -54,4 +54,4 @@ Microsoft 로그인에 사용할 자격 증명:
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
"""

View file

@ -1,7 +1,7 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.data import *
from lib.utils.config import *
from lib.utils.data import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -1,13 +1,17 @@
import os
from dotenv import load_dotenv
from lib.utils.config import (
BACKEND_URL,
GOOGLE_API_KEY,
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
import os
from dotenv import load_dotenv
load_dotenv(override=True)
def show_info():
print("🔧 환경 설정:")
print(browser_use_version())
@ -40,7 +44,10 @@ def browser_use_version():
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")):
if GOOGLE_PLANNER_MODEL != None and (
not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
@ -50,9 +57,8 @@ def env_cheker():
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print(
"⚠️ 이 경고는 1초동안 정지합니다."
)
print("⚠️ 이 경고는 1초동안 정지합니다.")
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

View file

@ -1,8 +1,10 @@
import os
from dotenv import load_dotenv
load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")

View file

@ -2,6 +2,7 @@ import requests
from lib.utils.config import BACKEND_URL
def notify_backend(target_url):
# Backend에 스캔 시작을 알림
try:

View file

@ -1,9 +1,10 @@
from pathlib import Path
from datetime import datetime
from pathlib import Path
# 미리 정해진 파일 경로
FILE_PATH = Path("data/log.txt")
def logger(msg: str) -> None:
try:
"""
@ -13,7 +14,7 @@ def logger(msg: str) -> None:
"""
# 상위 디렉터리 생성 (이미 있으면 무시)
FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
# 현재 시각 구해서 포맷팅
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
@ -26,4 +27,4 @@ def logger(msg: str) -> None:
with FILE_PATH.open(mode="a", encoding="utf-8") as f:
f.write(line)
except:
print(msg)
print(msg)

View file

@ -1,32 +1,34 @@
import requests
def is_html_url(url: str, timeout: float = 10.0) -> bool:
    """Report whether *url* answers with an HTML ``Content-Type``.

    A streamed GET request is issued (the body is not read here) and only the
    response status and headers are inspected.

    Args:
        url: URL to probe.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        ``True`` when ``response.ok`` is true and the ``Content-Type`` header
        starts with ``text/html`` (case-insensitive). ``False`` otherwise,
        including when the request raises ``requests.RequestException``.
    """
    try:
        with requests.get(url, timeout=timeout, stream=True) as resp:
            # None marks "response was not OK"; a missing header becomes "".
            header = resp.headers.get("Content-Type", "") if resp.ok else None
    except requests.RequestException:
        return False

    if header is None:
        return False
    return header.lower().startswith("text/html")
if __name__ == '__main__':
if __name__ == "__main__":
test_urls = [
'https://www.example.com',
'https://api.github.com', # JSON API라서 HTML이 아닐 확률이 높음
'https://raw.githubusercontent.com' # 텍스트 파일 등 다양한 타입
"https://www.example.com",
"https://api.github.com", # JSON API라서 HTML이 아닐 확률이 높음
"https://raw.githubusercontent.com", # 텍스트 파일 등 다양한 타입
]
for url in test_urls:

View file

@ -1,6 +1,6 @@
def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[str]:
"""
파일에서 start_line번 줄부터 end_line번 줄까지 읽어와
파일에서 start_line번 줄부터 end_line번 줄까지 읽어와
줄을 요소로 갖는 리스트를 반환하는 함수.
Parameters:
@ -15,15 +15,17 @@ def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[st
Returns:
-------
list[str]
줄을 문자열로 저장한 리스트.
줄을 문자열로 저장한 리스트.
파일에 해당 범위의 줄이 없으면 가능한 만큼만 반환.
"""
if start_line < 1 or end_line < start_line:
raise ValueError("start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다.")
raise ValueError(
"start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다."
)
selected_lines: list[str] = []
with open(filepath, 'r', encoding='utf-8') as f:
with open(filepath, "r", encoding="utf-8") as f:
for idx, line in enumerate(f, start=1):
if idx < start_line:
# 아직 읽기 시작 전
@ -32,5 +34,5 @@ def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[st
# 읽을 범위를 벗어났으므로 중단
break
# 줄 끝의 개행 문자를 제거하고 리스트에 추가
selected_lines.append(line.rstrip('\n'))
selected_lines.append(line.rstrip("\n"))
return selected_lines

View file

@ -7,12 +7,14 @@ from pathlib import Path
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
def save_progress():
    """Write the current progress dictionary to ``progress_file`` as JSON.

    The parent directory is created first if it is missing. The file is
    written as UTF-8 with non-ASCII characters kept as-is and a 2-space indent.
    """
    target_dir = progress_file.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    dump_options = {"ensure_ascii": False, "indent": 2}
    with open(progress_file, "w", encoding="utf-8") as out:
        json.dump(current_progress, out, **dump_options)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
@ -23,6 +25,7 @@ def load_progress():
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
@ -34,7 +37,7 @@ def signal_handler(signum, frame):
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
if current_progress['total'] > 0:
if current_progress["total"] > 0:
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
@ -43,6 +46,7 @@ def signal_handler(signum, frame):
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
def setup_signal_handler():
    """Register ``signal_handler`` as the handler for SIGINT (Ctrl+C)."""
    interrupt_signal = signal.SIGINT
    signal.signal(interrupt_signal, signal_handler)

View file

@ -1,32 +1,35 @@
import asyncio
import argparse
import asyncio
import os
import sys
from dotenv import load_dotenv
from lib.utils import env_cheker
from lib.browser_use.scanner import main_loop
from lib.utils.progress import setup_signal_handler, progress_file
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
from lib.utils import env_cheker
from lib.utils.progress import progress_file, setup_signal_handler
def main():
"""애플리케이션 메인 진입점"""
# 시그널 핸들러 설정
setup_signal_handler()
def setup_environment():
    """Load environment variables and initialise optional libraries.

    Steps, in order:
      1. Load the ``.env`` file, overriding already-set variables.
      2. Run ``env_cheker()`` to validate the environment.
      3. If ``LMNR_PROJECT_API_KEY`` is set, initialise Laminar with it; when
         the ``lmnr`` package cannot be imported, print a warning instead.
    """
    load_dotenv(verbose=True, override=True)
    env_cheker()

    # Laminar is optional: nothing to do when no API key is configured.
    if not os.getenv("LMNR_PROJECT_API_KEY"):
        return

    try:
        from lmnr import Laminar

        Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
    except ImportError:
        print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
def parse_arguments():
"""커맨드 라인 인자를 파싱합니다."""
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
@ -48,11 +51,18 @@ def main():
parser.add_argument(
"-skh",
"--skip-html-check",
action='store_true', # 플래그 형식으로 변경
action="store_true",
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
args = parser.parse_args()
return parser.parse_args()
def main():
"""애플리케이션 메인 진입점"""
setup_environment()
setup_signal_handler()
args = parse_arguments()
try:
asyncio.run(
@ -64,16 +74,17 @@ def main():
)
)
except KeyboardInterrupt:
# signal_handler가 처리하므로 여기서는 별도 처리 불필요
pass
print("\n프로그램이 사용자에 의해 중단되었습니다.")
sys.exit(1)
finally:
# 정상 종료 시 진행 상황 파일 삭제
if os.path.exists(progress_file):
try:
os.remove(progress_file)
print("진행 상황 파일이 삭제되었습니다.")
except OSError as e:
print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}")
print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}", file=sys.stderr)
if __name__ == "__main__":
main()
main()