diff --git a/.env.example b/.env.example index 7a97499..99f4232 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,9 @@ GOOGLE_MODEL=gemini-2.5-flash INITIAL_BACKOFF=60 MAX_BACKOFF=600 +#ENABLE_PLANNER_MODEL_OAUTH_LOGIN=true # OAuth 로그인 시 Planner 모델을 활성화합니다. +#ENABLE_PLANNER_MODEL_OAUTH_LIST=true # OAuth List를 찾을 때 Planner 모델을 활성화합니다. + # ========== Monitoring ========== # 선택 diff --git a/.gitignore b/.gitignore index d53f32f..ef2b333 100644 --- a/.gitignore +++ b/.gitignore @@ -83,5 +83,6 @@ my.sh log.txt data/ +!src/lib/utils/data -# End of https://www.toptal.com/developers/gitignore/api/macos,windows \ No newline at end of file +# End of https://www.toptal.com/developers/gitignore/api/macos,windows \ No newline at end of file diff --git a/README.md b/README.md index 9128804..8d8cec8 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ > 그렇지 않으면 실행되지 않습니다. > > 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다. -> +> > Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다. > > MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다. @@ -20,6 +20,10 @@ > 다른 플렛폼은 수동으로 설정되어야만 합니다. > https://docs.mitmproxy.org/stable/concepts/certificates/ +현재 아래와 같은 환경에서 개발되며 테스트되고 있습니다. +- ✅ MacOS 26 Tahoe Developer Beta 2 (25A5295e) en-US aarch64 +- ✅ Windows 11 Pro for Workstations 24H2 (26100.4351) en-US x86_64 +- ✅ NixOS 25.05.804570.c7ab75210cb8 KDE 6 / Linux 6.15 x86_64 --- 다음과 같은 명령어로 환경을 설정합니다. @@ -48,7 +52,7 @@ venv와 패키지가 설치가 됩니다. 스텔스 기능 때문에 Google Chrome이 필요합니다. -만약 설치가 되어 있지 않다면 +만약 설치가 되어 있지 않다면 ``` playwright install chrome ``` @@ -76,7 +80,7 @@ uv run playwright open https://google.com/ --save-storage=./data/storage_state.j `.sensitive.example.json`을 `.sensitive.json`으로 복사해서 안에 있는 예시 내용을 참고해서 작성해주시면 됩니다. -더 자세한 내용은 +더 자세한 내용은 [Sensitive Data - Browser Use](https://docs.browser-use.com/customize/sensitive-data)를 참고하시면 좋을 것 같습니다. [Sensitive Data - Browser Use](https://docs.browser-use.com/customize/sensitive-data)에서도 권장하지 않는 방법인만큼 애매하긴 하지만 쿠키와 로컬 스토리지를 저장하기 어려운 경우나 일부 flow에서 접근이 어려운 경우 사용해주세요. @@ -86,6 +90,14 @@ uv run playwright open https://google.com/ --save-storage=./data/storage_state.j --- +# 윈도우 인코딩 이슈 해결 +이거 해결 방법 +![image](https://github.com/user-attachments/assets/01ca45c2-fda9-44fb-83fc-39daa7e52092) + +![](./docs/encode.png) +이것도 setup.py 사용하면 반자동으로 할 수 있습니다. + +못찾겠으면 intl.cpl 열어주세요. # 실행 @@ -96,10 +108,32 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt ``` ```sh -# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip} -uv run run.py 12540 13000 False +# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh} {--no-download} +uv run run.py 1 100 --skh ``` +# Prompt 확장 가이드 + +## 1. 파일 생성 + +`lib/llm/prompt` 폴더로 + +![](./docs/list.png) + +fallback.py를 복사하여 + +원하는 프로바이더를 추가해줍니다. `ex) lib/llm/prompt/Google.py` + +## 2. __init__.py 수정 + +![](./docs/guide.png) + +Prompt에서 추가한 파일을 __init__.py에서 import합니다. + +## 3. 파일 수정 + +생성한 파일에서 프롬프트를 수정합니다. + # 참고하면 좋을만한 것 - [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다. diff --git a/docs/encode.png b/docs/encode.png new file mode 100644 index 0000000..5eb0a20 Binary files /dev/null and b/docs/encode.png differ diff --git a/docs/guide.png b/docs/guide.png new file mode 100644 index 0000000..0ddc9e7 Binary files /dev/null and b/docs/guide.png differ diff --git a/docs/list.png b/docs/list.png new file mode 100644 index 0000000..1a005de Binary files /dev/null and b/docs/list.png differ diff --git a/lib/utils/browser_use/__init__.py b/lib/utils/browser_use/__init__.py deleted file mode 100644 index c38ca03..0000000 --- a/lib/utils/browser_use/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -from lib.utils.browser_use.func import * - -# Initialize configuration -proxy_url = setup_proxy() - -# Create browser profile -async def GetProfile(): - storage_state_path = await setup_storage_state() - profile = BrowserProfile( - # Security settings - disable_security=True, - stealth=True, - - # Display settings - headless=False, - device_scale_factor=1, - window_size={"width": 1600, "height": 900}, - viewport={"width": 1600, "height": 900}, - - # Data persistence - user_data_dir=None, - storage_state=storage_state_path, - - # Network settings - proxy={"server": proxy_url} if proxy_url else None, - - # Additional arguments - args=get_browser_args(), - ) - - return profile \ No newline at end of file diff --git a/lib/utils/browser_use/model.py b/lib/utils/browser_use/model.py deleted file mode 100644 index e4397be..0000000 --- a/lib/utils/browser_use/model.py +++ /dev/null @@ -1,11 +0,0 @@ -from typing import List -from pydantic import BaseModel - -# 출력 모델 -class OAuth(BaseModel): - provider: str - oauth_uri: str - - -class OAuthList(BaseModel): - oauth_providers: List[OAuth] \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 70f864e..0000000 --- a/main.py +++ /dev/null @@ -1,306 +0,0 @@ -import asyncio -import json -import os -import csv -import argparse -from pathlib import Path -import signal - -from dotenv import load_dotenv - -from browser_use import ( - Agent, - BrowserSession, - Controller, - ActionResult, -) -from patchright.async_api import async_playwright as async_patchright, Page -from pydantic import BaseModel - -from lib.utils import env_cheker -from lib.utils.backend_client import notify_backend -from lib.utils.browser_use import model -from lib.utils.browser_use.clean_resources import clean_resources -from lib.utils.browser_use.func import setup_storage_state -from lib.utils.browser_use.sensitive_data import GetSensitiveData -from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL -from lib.utils.is_html import is_html_url -from lib.utils.read_txt import read_lines_between -from lib.llm.prompt import extend_planner_system_message -from lib.utils.logger import logger -import lib.utils.browser_use as browser_use -from lib.llm import CreateChatGoogleGenerativeAI - -load_dotenv(verbose=True, override=True) - -# Exponential backoff settings -INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds -MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds - -# 진행 상황 추적을 위한 전역 변수 -current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0} -progress_file = Path("data/scan_progress.json") - -env_cheker() -if os.getenv("LMNR_PROJECT_API_KEY"): - from lmnr import Laminar - - Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY")) - - -def save_progress(): - """현재 진행 상황을 파일에 저장""" - with open(progress_file, "w", encoding="utf-8") as f: - json.dump(current_progress, f, ensure_ascii=False, indent=2) - - -def load_progress(): - """이전 진행 상황을 파일에서 불러오기""" - if os.path.exists(progress_file): - try: - with open(progress_file, "r", encoding="utf-8") as f: - return json.load(f) - except: - return None - return None - - -def signal_handler(signum, frame): - """Ctrl+C 시그널 핸들러""" - print("\n" + "=" * 60) - print("🛑 스캔이 중단되었습니다!") - print(f"📊 진행 상황:") - print(f" - 전체: {current_progress['total']}개 URL") - print(f" - 완료: {current_progress['current_index']}개 URL") - print(f" - 현재 처리 중: {current_progress['current_url']}") - print( - f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄" - ) - print( - f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)" - ) - print("=" * 60) - save_progress() - print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.") - exit(0) - - -# 시그널 핸들러 등록 -signal.signal(signal.SIGINT, signal_handler) - - -# ── URL별로 Browser를 새로 띄우는 함수 ── -async def scan_one_url(url: str, skip_html_check: bool = False): - await setup_storage_state() - target_url = url if url.startswith("http") else f"https://{url}" - print(f"🚀 Starting scan for: {target_url}") - - # 1) URL이 HTML 페이지인지 확인 - if not is_html_url(target_url) and not skip_html_check: - print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.") - return - - # Backend에 스캔 시작을 알림 - notify_backend(target_url) - - agent = None - session = None - try_cnt = 0 - while True: - # BrowserSession에 profile 전달 - session = BrowserSession( - playwright=(await async_patchright().start()), - browser_profile=await browser_use.GetProfile(), - ) - - # Agent 생성 및 실행 (단일 try-except with 백오프) - initial_actions = [{"open_tab": {"url": target_url}}] - controller = Controller( - output_model=model.BaseModel, - exclude_actions=["search_google", "unknown_action", "unkown"], - ) - - print("🤖 LLM 모델 초기화 및 스캔 시작...") - print("Available actions:", list(controller.registry.registry.actions.keys())) - try: - agent = Agent( - browser_session=session, - initial_actions=initial_actions, - sensitive_data=GetSensitiveData(), - task=( - "Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), " - "and for each one: click the button, follow the full OAuth login flow as far as possible " - "with a real user account (without using a fake or non-existent account), and capture the " - "final redirect URL after login. Do not stop at just collecting the initial authorization URL—" - "actually perform the login step like a real user would. " - "If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. " - "Always log out before starting the login process, and make sure to attempt the login again from a clean state." - ), - llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL), - planner_llm=( - CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL) - if GOOGLE_PLANNER_MODEL - else None - ), - controller=controller, - extend_planner_system_message=extend_planner_system_message, - ) - response = await agent.run() - final_result = response.final_result() - - if final_result is None: - raise ValueError("final_result()가 None을 반환했습니다.") - except Exception as e: - await clean_resources(agent, session) - # API 쿼터 문제인지 확인 - if "ResourceExhausted" in str(e) or "429" in str(e): - wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF) - print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...") - await asyncio.sleep(wait) - try_cnt += 1 - if try_cnt >= 3: - print(f"❌ {url} 스캔 실패: API 쿼터 문제가 지속됩니다.") - logger(f"❌ {url} 스캔 실패: API 쿼터 문제: {e}") - return - continue - # 일반 에러 처리 - try_cnt += 1 - if try_cnt >= 3: - print(f"❌ {url} 스캔 실패: 에러: {e}") - logger(f"❌ {url} 스캔 실패: 에러: {e}") - return - print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") - await asyncio.sleep(30) - continue - - # 스캔 결과 처리 - data = json.loads(final_result) - try: - oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]] - except Exception as e: - raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}") - - print("-" * 50) - print(f"🔗 Scanned URL: {url}\n") - print("🔐 Detected OAuth Providers and URLs:") - for entry in oauth_entries: - if "<" in entry.oauth_uri or "..." in entry.oauth_uri: - print( - f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n" - ) - else: - print(f"- {entry.provider}: {entry.oauth_uri}") - print("-" * 50) - - # CSV에 저장 (append) - csv_file = "./data/oauth_providers.csv" - file_exists = os.path.isfile(csv_file) - with open(csv_file, "a", newline="", encoding="utf-8") as f: - writer = csv.writer(f) - if not file_exists: - writer.writerow(["issuer", "provider", "oauth_uri"]) - for entry in oauth_entries: - writer.writerow([url, entry.provider, entry.oauth_uri]) - await clean_resources(agent, session) - break - - -async def loop( - filepath: str, start_line: int, end_line: int, skip_html_check: bool = False -): - # 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성 - target_list = read_lines_between( - filepath=filepath, start_line=start_line, end_line=end_line - ) - - # 진행 상황 초기화 - current_progress["total"] = len(target_list) - current_progress["start_line"] = start_line - current_progress["current_index"] = 0 - - # 이전 진행 상황 확인 - prev_progress = load_progress() - if prev_progress and prev_progress.get("start_line") == start_line: - print(f"📋 이전 진행 상황을 발견했습니다:") - print( - f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}" - ) - print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}") - - resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip() - if resume == "y": - current_progress["current_index"] = prev_progress["current_index"] - target_list = target_list[current_progress["current_index"] :] - print(f"✅ {current_progress['current_index']}번째부터 재개합니다.") - - # (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다. - # target_list[0] = "velog.io" - - for i, url in enumerate(target_list): - actual_index = current_progress["current_index"] + i - current_progress["current_url"] = url - current_progress["current_index"] = actual_index - - print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") - print(f"📍 domains.txt의 {start_line + actual_index}번째 줄") - - # URL들 사이에 API 쿼터 회복을 위한 대기 시간 추가 - if actual_index > 0: - print("⏳ API 쿼터 보호를 위해 30초 대기 중...") - await asyncio.sleep(30) - - await scan_one_url(url, skip_html_check=skip_html_check) - - # 진행 상황 저장 - current_progress["current_index"] = actual_index + 1 - save_progress() - - print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)") - # 완료 후 진행 상황 파일 삭제 - if os.path.exists(progress_file): - os.remove(progress_file) - - -def main(): - parser = argparse.ArgumentParser( - prog="domain_scanner", - description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.", - ) - - # 커맨드라인 인자로 받을 옵션들 정의 - parser.add_argument( - "-f", - "--file", - type=str, - required=True, - help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)", - ) - parser.add_argument( - "-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)" - ) - parser.add_argument( - "-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)" - ) - parser.add_argument( - "-skh", - "--skip-html-check", - type=bool, - default=False, - help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)", - ) - - args = parser.parse_args() - - # 인자값을 비동기 함수에 전달 - asyncio.run( - loop( - filepath=args.file, - start_line=args.start, - end_line=args.end, - skip_html_check=args.skip_html_check, - ) - ) - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index 1b2f537..4c453ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.13" dependencies = [ "browser-use[memory]==0.3.2", + "chardet>=5.2.0", "lmnr[all]>=0.6.10", "patchright>=1.52.5", ] diff --git a/run.py b/run.py index 30e369e..e3b27b4 100644 --- a/run.py +++ b/run.py @@ -8,7 +8,7 @@ import argparse #!/usr/bin/env python3 # ── 설정 부분 ── -PYTHON_SCRIPT = "main.py" +PYTHON_SCRIPT = "./src/main.py" DOMAIN_FILE = "./data/domains.txt" # ───────────── @@ -35,13 +35,16 @@ def run_script(start_line, end_line, skh_option): print(f"[{current_time}] Processing lines {start_line} to {end_line}...") try: - subprocess.run([ + command = [ "uv", "run", PYTHON_SCRIPT, "-f", DOMAIN_FILE, "-s", str(start_line), "-e", str(end_line), - "-skh", str(skh_option) - ], check=True) + ] + if skh_option: + command.append("--skip-html-check") + + subprocess.run(command, check=True) except subprocess.CalledProcessError: print("Python 스크립트 실행 실패") sys.exit(1) @@ -52,9 +55,9 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 사용 예시: - python run.py 10000 11000 # 10000~11000 라인 처리 - python run.py 10000 11000 --skh # SKH 옵션 활성화 - python run.py 10000 11000 --no-download # 다운로드 생략 + uv run run.py 10000 11000 # 10000~11000 라인 처리 + uv run run.py 10000 11000 --skh # SKH 옵션 활성화 + uv run run.py 10000 11000 --no-download # 다운로드 생략 """ ) @@ -70,8 +73,8 @@ def main(): print("라인 번호는 0 이상이어야 합니다.") sys.exit(1) - if args.start_line >= args.end_line: - print("시작 라인은 종료 라인보다 작아야 합니다.") + if args.start_line > args.end_line: + print("시작 라인은 종료 라인보다 크거나 같아야 합니다.") sys.exit(1) # 도메인 파일 다운로드 diff --git a/setup.py b/setup.py index 5823705..6cf42d8 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ import os import subprocess +import webbrowser os.makedirs(os.path.dirname("./data"), exist_ok=True) @@ -9,7 +10,7 @@ def create_file_from_example(target: str, example: str) -> bool: with open(example, 'r', encoding='utf-8') as example_file, \ open(target, 'w', encoding='utf-8') as target_file: target_file.write(example_file.read()) - os.startfile(target) + #os.startfile(target) print(f"✅ {target} 파일이 {example}에서 생성되었습니다.") return True else: @@ -36,6 +37,45 @@ def prompt_yes_no(message: str) -> bool: print(message, end="") return input().strip().lower() in ['y', 'yes'] +def i_dont_like_windows(): + # Windows인지 확인 + if os.name != 'nt': + return + else: + # run (Get-ItemProperty "HKLM:\SYSTEM\CurrentControlSet\Control\Nls\CodePage").ACP + try: + result = subprocess.run( + ['powershell', '-Command', '(Get-ItemProperty "HKLM:\\SYSTEM\\CurrentControlSet\\Control\\Nls\\CodePage").ACP'], + capture_output=True, + text=True, + check=True + ) + acp = result.stdout.strip() + if acp == '65001': + print("현재 Active Code Page가 UTF-8로 설정되어 있습니다.") + return + else: + print("현재 Active Code Page가 UTF-8로 설정되어 있지 않습니다.") + except subprocess.CalledProcessError as e: + print(f"코드 페이지 확인 실패: {e}") + print("=======================================================") + print("\n⚠️ Windows에서는 인코딩 문제가 발생합니다.") + print("👉 엔터를 누르면 자동으로 intl.cpl이 열립니다.") + print("👉 자세한 내용은 README.md에서 \"윈도우 인코딩 해결\"을 참조해주세요.\n") + print("⚠️ 경고 : 이 작업은 윈도우에서 킹갓 대한민국의 프로그램들의 한글이 정상적으로 표시되지 않을 수 있습니다.") + # Pause + input("계속하려면 Enter 키를 누르세요...") + + webbrowser.open('intl.cpl') + + print("👉 intl.cpl가 열렸습니다.\n") + print("👉 관리자 옵션 -> 시스템 로켈 변경") + print("👀 Beta: 세계 언어 지원을 위해 Unicode UTF-8 사용") + print("👉 이 설정을 변경한 후, 시스템을 재시작하세요.\n") + print("⚠️ 이 작업은 시스템 언어 설정을 변경하므로 주의가 필요합니다.\n") + print("=======================================================") + input("계속하려면 Enter 키를 누르세요...") + def setup_storage(): print("\n🔧 쿠키와 로컬 스토리지를 설정하시겠습니까?") @@ -78,11 +118,15 @@ if __name__ == "__main__": install_playwright_chrome() print("=====================================================") - # 3. 쿠키와 로컬 스토리지 설정 + # 3. Windows 인코딩 문제 해결 + i_dont_like_windows() + print("=====================================================") + + # 4. 쿠키와 로컬 스토리지 설정 setup_storage() print("=====================================================") - # 4. .sensitive.json 생성 + # 5. .sensitive.json 생성 setup_sensitive() print("=====================================================") print("🎉 초기 설정이 완료되었습니다! 이제 스크립트를 실행할 준비가 되었습니다.") diff --git a/src/lib/browser_use/__init__.py b/src/lib/browser_use/__init__.py new file mode 100644 index 0000000..6814f2a --- /dev/null +++ b/src/lib/browser_use/__init__.py @@ -0,0 +1,7 @@ +from lib.browser_use.clean_resources import * +from lib.browser_use.func import * +from lib.browser_use.model import * +from lib.browser_use.init_profile import * +from lib.browser_use.sensitive_data import * +from lib.browser_use.agents import * +from lib.browser_use.scanner import * \ No newline at end of file diff --git a/src/lib/browser_use/agents.py b/src/lib/browser_use/agents.py new file mode 100644 index 0000000..9711e3c --- /dev/null +++ b/src/lib/browser_use/agents.py @@ -0,0 +1,167 @@ +import asyncio +import os +import json + +from browser_use import Agent, BrowserSession, Controller +from patchright.async_api import async_playwright as async_patchright + +from lib.browser_use import ( + GetProfile, + GetSensitiveData, + clean_resources, +) +from lib.utils import ( + logger, + config, +) +from lib.llm import CreateChatGoogleGenerativeAI, get_prompt +import lib.browser_use.model as model + +# Exponential backoff settings +INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds +MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds + +async def _run_agent_with_retry(agent_config): + """Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)""" + agent = None + session = None + try_cnt = 0 + url = agent_config["url"] + + while try_cnt < 3: + try: + session = BrowserSession( + playwright=(await async_patchright().start()), + browser_profile=await GetProfile(), + ) + + agent = Agent( + browser_session=session, + **agent_config["agent_params"] + ) + + response = await agent.run() + await clean_resources(agent, session) + return response + + except Exception as e: + await clean_resources(agent, session) + + if "ResourceExhausted" in str(e) or "429" in str(e): + wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF) + print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...") + await asyncio.sleep(wait) + try_cnt += 1 + if try_cnt >= 3: + error_msg = f"API 쿼터 문제가 지속됩니다." + logger(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}: {e}") + print(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}") + return None + continue + + # 일반 에러 처리 + try_cnt += 1 + if try_cnt >= 3: + error_msg = f"최대 재시도 횟수 초과." + logger(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}: {e}") + print(f"❌ {url} - {agent_config['log_context']} 실패: {error_msg}") + return None + + print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...") + await asyncio.sleep(30) + continue + return None + + +async def extract_oauth_list(url: str): + """첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출""" + target_url = url if url.startswith("http") else f"https://{url}" + print(f"🔎 OAuth 리스트 추출 시작: {target_url}") + + agent_config = { + "url": target_url, + "log_context": "OAuth 리스트 추출", + "agent_params": { + "initial_actions": [{"open_tab": {"url": target_url}}], + "sensitive_data": GetSensitiveData(), + "task": ( + "Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). " + "DO NOT click any OAuth buttons or attempt to login. " + "Just find and list all available OAuth providers with their button texts or provider names. " + "Return a list of OAuth providers found on the login page." + ), + "llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL), + "planner_llm": ( + CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL) + if config.GOOGLE_PLANNER_MODEL + else None + ), + "controller": Controller( + output_model=model.OAuthList, + exclude_actions=["search_google", "unknown_action", "unkown"], + ), + "extend_planner_system_message": get_prompt("auth"), + } + } + + response = await _run_agent_with_retry(agent_config) + + if not response: + return [] + + final_result = response.final_result() + if not final_result: + print("OAuth 리스트 추출 결과가 없습니다.") + return [] + + try: + data = json.loads(final_result) + oauth_providers = data.get("oauth_providers", []) + return [model.OAuth(provider=provider) for provider in oauth_providers] + except (json.JSONDecodeError, KeyError) as e: + print(f"❌ 결과 파싱 실패: {e}") + logger(f"❌ {url} 결과 파싱 실패: {final_result}") + return [] + + +async def test_oauth_login(url: str, oauth_provider: str): + """두 번째 Agent: 특정 OAuth 제공자로 로그인 시도""" + target_url = url if url.startswith("http") else f"https://{url}" + print(f"🔐 {oauth_provider} 로그인 시작: {target_url}") + + agent_config = { + "url": target_url, + "log_context": f"{oauth_provider} 로그인", + "agent_params": { + "initial_actions": [{"open_tab": {"url": target_url}}], + "sensitive_data": GetSensitiveData(), + "task": ( + f"Navigate to the login page, find and click the {oauth_provider} OAuth button, " + f"then follow the complete OAuth login flow as far as possible with a real user account. " + f"Capture the final redirect URL after login completion. " + f"If login fails or encounters errors, report the issue. " + f"Focus only on {oauth_provider} - ignore other OAuth providers." + ), + "llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL), + "planner_llm": ( + CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL) + if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") + else None + ), + "controller": Controller( + exclude_actions=["search_google", "unknown_action", "unkown"], + ), + "extend_planner_system_message": get_prompt(oauth_provider), + } + } + + response = await _run_agent_with_retry(agent_config) + + if response and response.final_result(): + final_result = response.final_result() + print(f"✅ {oauth_provider} 로그인 완료") + logger(f"✅ {url} - {oauth_provider} 로그인 결과: {final_result}") + return True + + print(f"❌ {oauth_provider} 로그인 실패") + return False \ No newline at end of file diff --git a/lib/utils/browser_use/clean_resources.py b/src/lib/browser_use/clean_resources.py similarity index 100% rename from lib/utils/browser_use/clean_resources.py rename to src/lib/browser_use/clean_resources.py diff --git a/lib/utils/browser_use/func.py b/src/lib/browser_use/func.py similarity index 75% rename from lib/utils/browser_use/func.py rename to src/lib/browser_use/func.py index 31eee31..5c2faa8 100644 --- a/lib/utils/browser_use/func.py +++ b/src/lib/browser_use/func.py @@ -1,11 +1,48 @@ import os +import json from pathlib import Path from dotenv import load_dotenv from browser_use import BrowserProfile +import json +import os # Load environment variables load_dotenv(override=True) +async def setup_storage_state(): + """Setup browser storage state for session persistence.""" + # Get the script directory to ensure correct path resolution + script_dir = Path(__file__).parent.parent.parent.parent + storage_state_path = script_dir / "data" / "storage_state.json" + storage_state_temp_path = script_dir / "data" / "storage_state_temp.json" + + print(f"📂 Storage state path: {storage_state_path}") + print(f"📂 Temp storage state path: {storage_state_temp_path}") + + if storage_state_path.exists(): + try: + if storage_state_temp_path.exists(): + storage_state_temp_path.unlink() + + with open(storage_state_path, 'r') as f: + storage_data = json.load(f) + + with open(storage_state_temp_path, 'w') as f: + json.dump(storage_data, f, indent=4) + + print(f"🔄 Using existing storage state: {storage_state_temp_path}") + return str(storage_state_temp_path) + + except Exception as e: + print(f"⚠️ Error processing storage state: {e}") + if storage_state_temp_path.exists(): + storage_state_temp_path.unlink() + return None + + print("⚠️ No existing storage state found") + return None + + def setup_proxy(): """Configure proxy settings from environment variables.""" proxy_host = os.getenv("PROXY_HOST") @@ -20,30 +57,6 @@ def setup_proxy(): return None -async def setup_storage_state(): - """Setup browser storage state for session persistence.""" - # Get the script directory to ensure correct path resolution - script_dir = Path(__file__).parent.parent.parent.parent - storage_state_path = script_dir / "data" / "storage_state.json" - storage_state_temp_path = script_dir / "data" / "storage_state_temp.json" - - print(f"📂 Storage state path: {storage_state_path}") - print(f"📂 Temp storage state path: {storage_state_temp_path}") - - if storage_state_path.exists(): - if storage_state_temp_path.exists(): - storage_state_temp_path.unlink() - - storage_state_temp_path.write_text( - storage_state_path.read_text(encoding="utf-8"), encoding="utf-8" - ) - print(f"🔄 Using existing storage state: {storage_state_temp_path}") - return str(storage_state_temp_path) - - print("⚠️ No existing storage state found") - return None - - def get_browser_args(): """Get browser arguments for enhanced compatibility and security.""" return [ diff --git a/src/lib/browser_use/init_profile.py b/src/lib/browser_use/init_profile.py new file mode 100644 index 0000000..dd3f3b8 --- /dev/null +++ b/src/lib/browser_use/init_profile.py @@ -0,0 +1,46 @@ +import os +from lib.browser_use.func import * + +# Initialize configuration +proxy_url = setup_proxy() + +async def GetProfile(): + storage_state_path = await setup_storage_state() + + # Handle potential encoding issues with storage state file + try: + if storage_state_path and os.path.exists(storage_state_path): + # Test if file can be read properly, if not, skip it + with open(storage_state_path, 'r', encoding='utf-8') as f: + f.read() + storage_state = storage_state_path + else: + print("⚠️ Storage state file not found or inaccessible, proceeding without it.") + storage_state = None + except (UnicodeDecodeError, FileNotFoundError): + # If there's an encoding error, don't use the storage state + storage_state = None + + profile = BrowserProfile( + # Security settings + disable_security=True, + stealth=True, + + # Display settings + headless=False, + device_scale_factor=1, + window_size={"width": 1600, "height": 900}, + viewport={"width": 1600, "height": 900}, + + # Data persistence + user_data_dir=None, + storage_state=storage_state, + + # Network settings + proxy={"server": proxy_url} if proxy_url else None, + + # Additional arguments + args=get_browser_args(), + ) + + return profile diff --git a/src/lib/browser_use/model.py b/src/lib/browser_use/model.py new file mode 100644 index 0000000..6a1178f --- /dev/null +++ b/src/lib/browser_use/model.py @@ -0,0 +1,15 @@ +from typing import List +from pydantic import BaseModel + +# 출력 모델 +class OAuth(BaseModel): + provider: str + oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음 + + +class OAuthList(BaseModel): + oauth_providers: List[str] # 이제 문자열 배열로 변경 + + +# 기존 모델 유지 (backward compatibility) +BaseModel = OAuthList \ No newline at end of file diff --git a/src/lib/browser_use/scanner.py b/src/lib/browser_use/scanner.py new file mode 100644 index 0000000..460adfc --- /dev/null +++ b/src/lib/browser_use/scanner.py @@ -0,0 +1,106 @@ +import asyncio +import os +import csv + +from lib.utils import notify_backend, read_lines_between, is_html_url +from lib.browser_use.agents import extract_oauth_list, test_oauth_login +from lib.utils.progress import current_progress, load_progress, save_progress, progress_file + +async def scan_one_url(url: str, skip_html_check: bool = False): + """URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도""" + target_url = url if url.startswith("http") else f"https://{url}" + print(f"🚀 스캔 시작: {target_url}") + + # Backend에 스캔 시작을 알림 + notify_backend(target_url) + + # 1) URL이 HTML 페이지인지 확인 + if not is_html_url(target_url) and not skip_html_check: + print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.") + return + + # 1단계: OAuth 리스트 추출 + oauth_entries = await extract_oauth_list(target_url) + + if not oauth_entries: + print(f"❌ {target_url}에서 OAuth 제공자를 찾을 수 없습니다.") + return + + print("-" * 50) + print(f"🔗 스캔 URL: {url}") + print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}개") + for entry in oauth_entries: + print(f" - {entry.provider}") + print("-" * 50) + + # CSV에 OAuth 리스트 저장 + csv_file = "./data/oauth_providers.csv" + file_exists = os.path.isfile(csv_file) + with open(csv_file, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not file_exists: + writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"]) + for entry in oauth_entries: + writer.writerow([url, entry.provider, "", "pending"]) + + # 2단계: 각 OAuth 제공자별로 개별 로그인 시도 + for i, oauth_entry in enumerate(oauth_entries): + print( + f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}" + ) + + # OAuth 간 대기 시간 + if i > 0: + print("⏳ OAuth 테스트 간 대기 중 (30초)...") + await asyncio.sleep(30) + + # 개별 OAuth 로그인 시도 + success = await test_oauth_login(url, oauth_entry.provider) + + # 결과를 CSV에 업데이트 (간단하게 로그만 남김) + status = "success" if success else "failed" + print(f"📝 {oauth_entry.provider} 로그인 결과: {status}") + + +async def main_loop( + filepath: str, start_line: int, end_line: int, skip_html_check: bool = False +): + """지정된 URL 목록에 대해 스캔을 실행하는 메인 루프""" + target_list = read_lines_between( + filepath=filepath, start_line=start_line, end_line=end_line + ) + + current_progress["total"] = len(target_list) + current_progress["start_line"] = start_line + current_progress["current_index"] = 0 + + prev_progress = load_progress() + if prev_progress and prev_progress.get("start_line") == start_line: + print("📋 이전 진행 상황을 발견했습니다:") + print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}") + print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}") + + resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip() + if resume == 'y': + start_index = prev_progress.get("current_index", 0) + current_progress["current_index"] = start_index + target_list = target_list[start_index:] + print(f"✅ {start_index}번째부터 재개합니다.") + + for i, url in enumerate(target_list): + actual_index = current_progress["current_index"] + i + current_progress["current_url"] = url + + print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") + print(f"📍 {os.path.basename(filepath)}의 {start_line + actual_index}번째 줄") + + if i > 0: + print("⏳ API 쿼터 보호를 위해 30초 대기 중...") + await asyncio.sleep(30) + + await scan_one_url(url, skip_html_check=skip_html_check) + + current_progress["current_index"] = actual_index + 1 + save_progress() + + print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)") \ No newline at end of file diff --git a/lib/utils/browser_use/sensitive_data.py b/src/lib/browser_use/sensitive_data.py similarity index 100% rename from lib/utils/browser_use/sensitive_data.py rename to src/lib/browser_use/sensitive_data.py diff --git a/src/lib/llm/__init__.py b/src/lib/llm/__init__.py new file mode 100644 index 0000000..9acc135 --- /dev/null +++ b/src/lib/llm/__init__.py @@ -0,0 +1,3 @@ +from lib.llm.create import * + +from lib.llm.prompt import * \ No newline at end of file diff --git a/lib/llm/__init__.py b/src/lib/llm/create.py similarity index 97% rename from lib/llm/__init__.py rename to src/lib/llm/create.py index 2c41b3f..4463eff 100644 --- a/lib/llm/__init__.py +++ b/src/lib/llm/create.py @@ -21,5 +21,5 @@ def CreateChatGoogleGenerativeAI(model: str): }, callbacks=[QuotaExhaustedHandler()], # API 호출 간격 조정 - temperature=0.1, + temperature=0.0, ) diff --git a/src/lib/llm/prompt/__init__.py b/src/lib/llm/prompt/__init__.py new file mode 100644 index 0000000..5fa38d2 --- /dev/null +++ b/src/lib/llm/prompt/__init__.py @@ -0,0 +1,18 @@ +# why this is isn't index +# 이 파일을 __init__.py로 만든 이유는 +# 굳이 이 짧은 코드를 파일을 하나 더 만드는게 코드의 가독성을 떨어뜨린다고 판단했기 때문입니다. + +def get_prompt(type:str) -> str: + """ + Prompt를 반환합니다. + + :param type: 'auth' {Auth List} 또는 'google' {OAuth Provider}, 'meta' {OAuth Provider}을 지정합니다. + :return: 해당하는 프롬프트 문자열 + """ + if type.lower() == "auth": + from lib.llm.prompt.auth_list import extract_oauth_list_prompt + return extract_oauth_list_prompt + + else: + from lib.llm.prompt.fallback import extend_planner_system_message + return extend_planner_system_message diff --git a/src/lib/llm/prompt/auth_list.py b/src/lib/llm/prompt/auth_list.py new file mode 100644 index 0000000..bea1212 --- /dev/null +++ b/src/lib/llm/prompt/auth_list.py @@ -0,0 +1,41 @@ +# @file purpose: This file contains the prompt for extracting a list of OAuth providers from a web page. +# OAuth 리스트 추출용 프롬프트 (클릭하지 않고 단순 식별만) +extract_oauth_list_prompt = f""" +🎯 목적: 주어진 초기 URL 내에서 **OAuth 로그인 Provider**를 찾아 아래 형식의 JSON으로 정리합니다. + +📌 작업 목표: +- Google, GitHub, Discord, Facebook, Apple, Microsoft, Twitter, LinkedIn 등 **OAuth 인증을 사용하는 외부 로그인 링크**에서 Provider 이름만 모두 수집합니다. +- 로그인 버튼, 링크 클릭 등을 통해 탐색을 진행할 수 있습니다. +- **같은 provider가 여러 번 나와도 하나만 저장**합니다. + +🛑 제한 사항: +- ❌ 로그인 입력창이나 이메일/비밀번호 입력 방식은 제외합니다. +- ❌ 검색 엔진, 사이트 외부 탐색은 금지합니다. +- ❌ URL 추측이나 직접 입력은 금지합니다. +- ❌ OAuth가 없는 경우 빈 배열 `[]`로 반환합니다. +- ❌ OAuth가 아닌 일반 로그인은 무시합니다. + +🔍 탐색 방법: +1. 초기 URL에 접속하여 **클라이언트용 로그인 페이지**로 진입합니다. +2. 페이지가 정상적으로 로드되었다고 가정합니다. +3. 'Continue with X', 'Continue with Google'... 등의 버튼이나 링크를 식별합니다. + + +🧾 출력 형식 (예시): + +```json +{{ + "oauth_providers": [ + "Google", + "GitHub", + "Discord" + ] +}} +``` + +📌 주의: + 결과가 없는 경우 빈 배열 `[]`로 반환합니다. + 정확한 provider 이름을 포함해 주세요. + +""" + diff --git a/lib/llm/prompt/__init__.py b/src/lib/llm/prompt/fallback.py similarity index 80% rename from lib/llm/prompt/__init__.py rename to src/lib/llm/prompt/fallback.py index 7f1c44c..d6954b3 100644 --- a/lib/llm/prompt/__init__.py +++ b/src/lib/llm/prompt/fallback.py @@ -1,8 +1,3 @@ -import os -from dotenv import load_dotenv - -load_dotenv(override=True) - # Extended planner prompt extend_planner_system_message = f""" 🎯 목적: 웹 자동화를 위한 **SSO 로그인 리디렉션 URL 수집** @@ -25,16 +20,7 @@ extend_planner_system_message = f""" - 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행) - ❗ 로그인 UI가 정상적으로 로드되지 않으면 중단 -📤 차단 시 즉시 반환: - -```json -[ - {{ - "provider": "Blocked", - "oauth_uri": "-" - }} -] -```` +📤 차단 시 즉시 종료 --- @@ -70,16 +56,14 @@ extend_planner_system_message = f""" 각 SSO 버튼에 대해 다음을 수행: 1. 버튼 클릭 -2. 🌐 페이지가 이동되면, **현재 주소창(URL)을 확인하여 리디렉션된 OAuth URL**을 `oauth_uri`로 저장 - → 예: `https://accounts.google.com/o/oauth2/auth?...` -3. ✅ 로그인 진행: +2. ✅ 로그인 진행: - 로그인 페이지에서 OAuth 인증을 완료합니다. - sign in with your username(email) x_username and password is x_password - 버튼같은게 안눌리면 새로고침을 해봐 - **로그인 완료 후 authorize 등 버튼이 있으면 클릭** - GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 수 있음, 기다려야 할 수도 있음 - 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요. - - 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다. + - **OAuth Flow가 완료되면 (callback URL 도달 또는 인증 완료) 즉시 작업 종료** 4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다. 5. 다음 SSO 버튼으로 반복 진행 @@ -90,19 +74,6 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요. - ❌ 버튼 클릭 후 페이지 로딩만 기다리고 돌아가기 - ❌ URL 저장 없이 go_back() 호출 -📤 각 로그인 후 다음 형식으로 결과 저장: - -```json -[ - {{ - "provider": "Google", - "oauth_uri": "https://example.com/auth/google?client_id=..." - }} -] -```` - -```` - --- ### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한 @@ -121,11 +92,7 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요. * 유효한 SSO 버튼이 **전혀 없을 경우** * 예외, 오류 등 발생 시 -📤 즉시 중단 후 다음 형식으로 반환: - -```json -[] -``` +-> 즉시 중단 --- @@ -134,9 +101,8 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요. * ✅ **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭) * 🔁 단계는 반드시 순서대로 진행 * 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행 -* 🚫 직접 ID/PW 입력하지 않음 -* ⛔ 추측 URL 클릭 금지 -* ❗ 예외 발생 시 반드시 규정된 JSON 포맷만 반환 +* 👀 직접 OAuth Providor ID/PW를 입력하여도 됨 가지고 있다면 +* ⛔ 추측한 URL은 접속하지 않음 --- -""" \ No newline at end of file +""" diff --git a/src/lib/utils/__init__.py b/src/lib/utils/__init__.py new file mode 100644 index 0000000..4a068ce --- /dev/null +++ b/src/lib/utils/__init__.py @@ -0,0 +1,7 @@ +# export from show_info + +from lib.utils.agent_info import * +from lib.utils.data import * +from lib.utils.config import * +from lib.utils.parsing.is_html import * +from lib.utils.parsing.read_txt import * diff --git a/lib/utils/__init__.py b/src/lib/utils/agent_info.py similarity index 51% rename from lib/utils/__init__.py rename to src/lib/utils/agent_info.py index d2f3a8a..ea56116 100644 --- a/lib/utils/__init__.py +++ b/src/lib/utils/agent_info.py @@ -4,7 +4,9 @@ from lib.utils.config import ( GOOGLE_MODEL, GOOGLE_PLANNER_MODEL, ) - +import os +from dotenv import load_dotenv +load_dotenv(override=True) def show_info(): print("🔧 환경 설정:") @@ -38,3 +40,19 @@ def browser_use_version(): def env_cheker(): if GOOGLE_API_KEY is None: raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.") + if GOOGLE_PLANNER_MODEL != None and (not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")): + print( + "⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다." + ) + print( + "⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요." + ) + print( + "‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다." + ) + print( + "⚠️ 이 경고는 1초동안 정지합니다." + ) + # 이 경고는 1초동안 sleep + import time + time.sleep(1) diff --git a/lib/utils/config.py b/src/lib/utils/config.py similarity index 54% rename from lib/utils/config.py rename to src/lib/utils/config.py index 9d1d5ac..9066ad6 100644 --- a/lib/utils/config.py +++ b/src/lib/utils/config.py @@ -4,5 +4,5 @@ load_dotenv(verbose=True, override=True) BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") -GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20") -GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05") \ No newline at end of file +GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash") +GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL") \ No newline at end of file diff --git a/src/lib/utils/data/__init__.py b/src/lib/utils/data/__init__.py new file mode 100644 index 0000000..1be0b0a --- /dev/null +++ b/src/lib/utils/data/__init__.py @@ -0,0 +1,2 @@ +from lib.utils.data.backend_client import * +from lib.utils.data.logger import * diff --git a/lib/utils/backend_client.py b/src/lib/utils/data/backend_client.py similarity index 100% rename from lib/utils/backend_client.py rename to src/lib/utils/data/backend_client.py diff --git a/lib/utils/logger.py b/src/lib/utils/data/logger.py similarity index 100% rename from lib/utils/logger.py rename to src/lib/utils/data/logger.py diff --git a/lib/utils/is_html.py b/src/lib/utils/parsing/is_html.py similarity index 100% rename from lib/utils/is_html.py rename to src/lib/utils/parsing/is_html.py diff --git a/lib/utils/read_txt.py b/src/lib/utils/parsing/read_txt.py similarity index 100% rename from lib/utils/read_txt.py rename to src/lib/utils/parsing/read_txt.py diff --git a/src/lib/utils/progress.py b/src/lib/utils/progress.py new file mode 100644 index 0000000..1803ad4 --- /dev/null +++ b/src/lib/utils/progress.py @@ -0,0 +1,48 @@ +import json +import os +import signal +from pathlib import Path + +# 진행 상황 추적을 위한 전역 변수 +current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0} +progress_file = Path("data/scan_progress.json") + +def save_progress(): + """현재 진행 상황을 파일에 저장""" + progress_file.parent.mkdir(parents=True, exist_ok=True) + with open(progress_file, "w", encoding="utf-8") as f: + json.dump(current_progress, f, ensure_ascii=False, indent=2) + +def load_progress(): + """이전 진행 상황을 파일에서 불러오기""" + if os.path.exists(progress_file): + try: + with open(progress_file, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return None + return None + +def signal_handler(signum, frame): + """Ctrl+C 시그널 핸들러""" + print("\n" + "=" * 60) + print("🛑 스캔이 중단되었습니다!") + print(f"📊 진행 상황:") + print(f" - 전체: {current_progress['total']}개 URL") + print(f" - 완료: {current_progress['current_index']}개 URL") + print(f" - 현재 처리 중: {current_progress['current_url']}") + print( + f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄" + ) + if current_progress['total'] > 0: + print( + f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)" + ) + print("=" * 60) + save_progress() + print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.") + exit(0) + +def setup_signal_handler(): + """시그널 핸들러 등록""" + signal.signal(signal.SIGINT, signal_handler) \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..c692f28 --- /dev/null +++ b/src/main.py @@ -0,0 +1,79 @@ +import asyncio +import argparse +import os +from dotenv import load_dotenv + +from lib.utils import env_cheker +from lib.browser_use.scanner import main_loop +from lib.utils.progress import setup_signal_handler, progress_file + +# .env 파일 로드 +load_dotenv(verbose=True, override=True) + +# 환경 변수 체크 +env_cheker() + +# Laminar 초기화 (선택적) +if os.getenv("LMNR_PROJECT_API_KEY"): + try: + from lmnr import Laminar + Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY")) + except ImportError: + print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.") + + +def main(): + """애플리케이션 메인 진입점""" + # 시그널 핸들러 설정 + setup_signal_handler() + + parser = argparse.ArgumentParser( + prog="domain_scanner", + description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.", + ) + + parser.add_argument( + "-f", + "--file", + type=str, + required=True, + help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)", + ) + parser.add_argument( + "-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)" + ) + parser.add_argument( + "-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)" + ) + parser.add_argument( + "-skh", + "--skip-html-check", + action='store_true', # 플래그 형식으로 변경 + help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.", + ) + + args = parser.parse_args() + + try: + asyncio.run( + main_loop( + filepath=args.file, + start_line=args.start, + end_line=args.end, + skip_html_check=args.skip_html_check, + ) + ) + except KeyboardInterrupt: + # signal_handler가 처리하므로 여기서는 별도 처리 불필요 + pass + finally: + # 정상 종료 시 진행 상황 파일 삭제 + if os.path.exists(progress_file): + try: + os.remove(progress_file) + except OSError as e: + print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/temp.md b/temp.md deleted file mode 100644 index adc7749..0000000 --- a/temp.md +++ /dev/null @@ -1,52 +0,0 @@ - -You are an AI model specialized in web crawling and analysis. Given a URI, perform the following tasks: - -1. Navigate to the provided URI and locate the login page. If it’s not found, explore common auth-related pages like /login or /auth. -2. On the login page, identify all available social login buttons (OAuth-based) such as Google, GitHub, Facebook, etc. -3. Simulate clicking each social login button and follow the redirect to capture the full redirect URL (including query parameters). -4. From the redirect URL and parameters, extract: - - `client_id` - - `redirect_uri` - - `response_type` - - `scope` -5. Based on URL patterns, infer the OAuth method: Authorization Code, Implicit, PKCE, etc. -6. Return data in the following JSON format only: - -```json -{ - "oauths": [ - { - "issue": "", - "oauth_uri": "" - } - ] -} -```` - -7. If the login button says something like "Login with GitHub" or "Login with Google", follow the flow and use the **final redirect URL after clicking** as the value of `oauth_uri`. - -**Examples:** - -```json -{ - "oauths": [ - { - "issue": "git.imnya.ng", - "provider": "GitHub", - "client_id": "Iv1.xxxxx", - "redirect_uri": "https://git.imnya.ng/user/oauth2/callback", - "response_type": "code", - "scope": "read:user", - "oauth_uri": "https://github.com/login/oauth/authorize?client_id=Iv1.xxxxx&redirect_uri=https%3A%2F%2Fgit.imnya.ng%2Fuser%2Foauth2%2Fcallback&response_type=code&scope=read%3Auser" - } - ] -} -``` - -**Constraints:** - -* Simulate realistic interaction with buttons (e.g., clicking them to follow redirects). -* Ensure the output is strictly in the specified JSON format. -* Avoid any additional text or explanations outside the JSON response. -* If no OAuth logins are found, return an empty array. -* WebAuthn, PassKey is not OAuth, so do not include it in the results. \ No newline at end of file diff --git a/uv.lock b/uv.lock index c125c7a..02e9d9b 100644 --- a/uv.lock +++ b/uv.lock @@ -144,6 +144,7 @@ version = "0.1.0" source = { virtual = "." } dependencies = [ { name = "browser-use", extra = ["memory"] }, + { name = "chardet" }, { name = "lmnr", extra = ["all"] }, { name = "patchright" }, ] @@ -151,6 +152,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "browser-use", extras = ["memory"], specifier = "==0.3.2" }, + { name = "chardet", specifier = ">=5.2.0" }, { name = "lmnr", extras = ["all"], specifier = ">=0.6.10" }, { name = "patchright", specifier = ">=1.52.5" }, ] @@ -211,6 +213,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009, upload-time = "2024-09-04T20:44:45.309Z" }, ] +[[package]] +name = "chardet" +version = "5.2.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f3/0d/f7b6ab21ec75897ed80c17d79b15951a719226b9fababf1e40ea74d69079/chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7", size = 2069618, upload-time = "2023-08-01T19:23:02.662Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/6f/f5fbc992a329ee4e0f288c1fe0e2ad9485ed064cac731ed2fe47dcc38cbf/chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970", size = 199385, upload-time = "2023-08-01T19:23:00.661Z" }, +] + [[package]] name = "charset-normalizer" version = "3.4.2"