Merge pull request #20 from j93es/feat/split-agent

Agent 모듈화 (feat. 코드베이스 전체 변경)
This commit is contained in:
seungyeoncherry 2025-06-26 22:25:55 +09:00 committed by GitHub
commit d8ec21c61b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 717 additions and 484 deletions

View file

@ -11,6 +11,9 @@ GOOGLE_MODEL=gemini-2.5-flash
INITIAL_BACKOFF=60
MAX_BACKOFF=600
#ENABLE_PLANNER_MODEL_OAUTH_LOGIN=true # OAuth 로그인 시 Planner 모델을 활성화합니다.
#ENABLE_PLANNER_MODEL_OAUTH_LIST=true # OAuth List를 찾을 때 Planner 모델을 활성화합니다.
# ========== Monitoring ==========
# 선택

3
.gitignore vendored
View file

@ -83,5 +83,6 @@ my.sh
log.txt
data/
!src/lib/utils/data
# End of https://www.toptal.com/developers/gitignore/api/macos,windows
# End of https://www.toptal.com/developers/gitignore/api/macos,windows

View file

@ -12,7 +12,7 @@
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
@ -20,6 +20,10 @@
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
현재 아래와 같은 환경에서 개발되며 테스트되고 있습니다.
- ✅ MacOS 26 Tahoe Developer Beta 2 (25A5295e) en-US aarch64
- ✅ Windows 11 Pro for Workstations 24H2 (26100.4351) en-US x86_64
- ✅ NixOS 25.05.804570.c7ab75210cb8 KDE 6 / Linux 6.15 x86_64
---
다음과 같은 명령어로 환경을 설정합니다.
@ -48,7 +52,7 @@ venv와 패키지가 설치가 됩니다.
스텔스 기능 때문에 Google Chrome이 필요합니다.
만약 설치가 되어 있지 않다면
만약 설치가 되어 있지 않다면
```
playwright install chrome
```
@ -76,7 +80,7 @@ uv run playwright open https://google.com/ --save-storage=./data/storage_state.j
`.sensitive.example.json``.sensitive.json`으로 복사해서
안에 있는 예시 내용을 참고해서 작성해주시면 됩니다.
더 자세한 내용은
더 자세한 내용은
[Sensitive Data - Browser Use](https://docs.browser-use.com/customize/sensitive-data)를 참고하시면 좋을 것 같습니다.
[Sensitive Data - Browser Use](https://docs.browser-use.com/customize/sensitive-data)에서도 권장하지 않는 방법인만큼 애매하긴 하지만 쿠키와 로컬 스토리지를 저장하기 어려운 경우나 일부 flow에서 접근이 어려운 경우 사용해주세요.
@ -86,6 +90,14 @@ uv run playwright open https://google.com/ --save-storage=./data/storage_state.j
---
# 윈도우 인코딩 이슈 해결
이거 해결 방법
![image](https://github.com/user-attachments/assets/01ca45c2-fda9-44fb-83fc-39daa7e52092)
![](./docs/encode.png)
이것도 setup.py 사용하면 반자동으로 할 수 있습니다.
못찾겠으면 intl.cpl 열어주세요.
# 실행
@ -96,10 +108,32 @@ curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
```
```sh
# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip}
uv run run.py 12540 13000 False
# uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh} {--no-download}
uv run run.py 1 100 --skh
```
# Prompt 확장 가이드
## 1. 파일 생성
`lib/llm/prompt` 폴더로
![](./docs/list.png)
fallback.py를 복사하여
원하는 프로바이더를 추가해줍니다. `ex) lib/llm/prompt/Google.py`
## 2. __init__.py 수정
![](./docs/guide.png)
Prompt에서 추가한 파일을 __init__.py에서 import합니다.
## 3. 파일 수정
생성한 파일에서 프롬프트를 수정합니다.
# 참고하면 좋을만한 것
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다.

BIN
docs/encode.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 643 KiB

BIN
docs/guide.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 MiB

BIN
docs/list.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

View file

@ -1,31 +0,0 @@
from lib.utils.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
# Create browser profile
async def GetProfile():
storage_state_path = await setup_storage_state()
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state_path,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)
return profile

View file

@ -1,11 +0,0 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str
class OAuthList(BaseModel):
oauth_providers: List[OAuth]

306
main.py
View file

@ -1,306 +0,0 @@
import asyncio
import json
import os
import csv
import argparse
from pathlib import Path
import signal
from dotenv import load_dotenv
from browser_use import (
Agent,
BrowserSession,
Controller,
ActionResult,
)
from patchright.async_api import async_playwright as async_patchright, Page
from pydantic import BaseModel
from lib.utils import env_cheker
from lib.utils.backend_client import notify_backend
from lib.utils.browser_use import model
from lib.utils.browser_use.clean_resources import clean_resources
from lib.utils.browser_use.func import setup_storage_state
from lib.utils.browser_use.sensitive_data import GetSensitiveData
from lib.utils.config import BACKEND_URL, GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
from lib.utils.is_html import is_html_url
from lib.utils.read_txt import read_lines_between
from lib.llm.prompt import extend_planner_system_message
from lib.utils.logger import logger
import lib.utils.browser_use as browser_use
from lib.llm import CreateChatGoogleGenerativeAI
load_dotenv(verbose=True, override=True)
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
env_cheker()
if os.getenv("LMNR_PROJECT_API_KEY"):
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
def save_progress():
"""현재 진행 상황을 파일에 저장"""
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except:
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
print("=" * 60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
# 시그널 핸들러 등록
signal.signal(signal.SIGINT, signal_handler)
# ── URL별로 Browser를 새로 띄우는 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False):
await setup_storage_state()
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 Starting scan for: {target_url}")
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# Backend에 스캔 시작을 알림
notify_backend(target_url)
agent = None
session = None
try_cnt = 0
while True:
# BrowserSession에 profile 전달
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await browser_use.GetProfile(),
)
# Agent 생성 및 실행 (단일 try-except with 백오프)
initial_actions = [{"open_tab": {"url": target_url}}]
controller = Controller(
output_model=model.BaseModel,
exclude_actions=["search_google", "unknown_action", "unkown"],
)
print("🤖 LLM 모델 초기화 및 스캔 시작...")
print("Available actions:", list(controller.registry.registry.actions.keys()))
try:
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
sensitive_data=GetSensitiveData(),
task=(
"Navigate to the login page, identify all OAuth provider buttons (excluding Passkey), "
"and for each one: click the button, follow the full OAuth login flow as far as possible "
"with a real user account (without using a fake or non-existent account), and capture the "
"final redirect URL after login. Do not stop at just collecting the initial authorization URL—"
"actually perform the login step like a real user would. "
"If the OAuth buttons do not appear immediately, wait briefly to allow the page to load completely before proceeding. "
"Always log out before starting the login process, and make sure to attempt the login again from a clean state."
),
llm=CreateChatGoogleGenerativeAI(GOOGLE_MODEL),
planner_llm=(
CreateChatGoogleGenerativeAI(GOOGLE_PLANNER_MODEL)
if GOOGLE_PLANNER_MODEL
else None
),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("final_result()가 None을 반환했습니다.")
except Exception as e:
await clean_resources(agent, session)
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
print(f"{url} 스캔 실패: API 쿼터 문제가 지속됩니다.")
logger(f"{url} 스캔 실패: API 쿼터 문제: {e}")
return
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
print(f"{url} 스캔 실패: 에러: {e}")
logger(f"{url} 스캔 실패: 에러: {e}")
return
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
# 스캔 결과 처리
data = json.loads(final_result)
try:
oauth_entries = [model.OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
print("-" * 50)
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(
f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n"
)
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
print("-" * 50)
# CSV에 저장 (append)
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
await clean_resources(agent, session)
break
async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
# 진행 상황 초기화
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
# 이전 진행 상황 확인
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print(f"📋 이전 진행 상황을 발견했습니다:")
print(
f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}"
)
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == "y":
current_progress["current_index"] = prev_progress["current_index"]
target_list = target_list[current_progress["current_index"] :]
print(f"{current_progress['current_index']}번째부터 재개합니다.")
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
# target_list[0] = "velog.io"
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
current_progress["current_index"] = actual_index
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 domains.txt의 {start_line + actual_index}번째 줄")
# URL들 사이에 API 쿼터 회복을 위한 대기 시간 추가
if actual_index > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
# 진행 상황 저장
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")
# 완료 후 진행 상황 파일 삭제
if os.path.exists(progress_file):
os.remove(progress_file)
def main():
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
# 커맨드라인 인자로 받을 옵션들 정의
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
type=bool,
default=False,
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)",
)
args = parser.parse_args()
# 인자값을 비동기 함수에 전달
asyncio.run(
loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
if __name__ == "__main__":
main()

View file

@ -6,6 +6,7 @@ readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"browser-use[memory]==0.3.2",
"chardet>=5.2.0",
"lmnr[all]>=0.6.10",
"patchright>=1.52.5",
]

21
run.py
View file

@ -8,7 +8,7 @@ import argparse
#!/usr/bin/env python3
# ── 설정 부분 ──
PYTHON_SCRIPT = "main.py"
PYTHON_SCRIPT = "./src/main.py"
DOMAIN_FILE = "./data/domains.txt"
# ─────────────
@ -35,13 +35,16 @@ def run_script(start_line, end_line, skh_option):
print(f"[{current_time}] Processing lines {start_line} to {end_line}...")
try:
subprocess.run([
command = [
"uv", "run", PYTHON_SCRIPT,
"-f", DOMAIN_FILE,
"-s", str(start_line),
"-e", str(end_line),
"-skh", str(skh_option)
], check=True)
]
if skh_option:
command.append("--skip-html-check")
subprocess.run(command, check=True)
except subprocess.CalledProcessError:
print("Python 스크립트 실행 실패")
sys.exit(1)
@ -52,9 +55,9 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
사용 예시:
python run.py 10000 11000 # 10000~11000 라인 처리
python run.py 10000 11000 --skh # SKH 옵션 활성화
python run.py 10000 11000 --no-download # 다운로드 생략
uv run run.py 10000 11000 # 10000~11000 라인 처리
uv run run.py 10000 11000 --skh # SKH 옵션 활성화
uv run run.py 10000 11000 --no-download # 다운로드 생략
"""
)
@ -70,8 +73,8 @@ def main():
print("라인 번호는 0 이상이어야 합니다.")
sys.exit(1)
if args.start_line >= args.end_line:
print("시작 라인은 종료 라인보다 아야 합니다.")
if args.start_line > args.end_line:
print("시작 라인은 종료 라인보다 크거나 같아야 합니다.")
sys.exit(1)
# 도메인 파일 다운로드

View file

@ -1,5 +1,6 @@
import os
import subprocess
import webbrowser
os.makedirs(os.path.dirname("./data"), exist_ok=True)
@ -9,7 +10,7 @@ def create_file_from_example(target: str, example: str) -> bool:
with open(example, 'r', encoding='utf-8') as example_file, \
open(target, 'w', encoding='utf-8') as target_file:
target_file.write(example_file.read())
os.startfile(target)
#os.startfile(target)
print(f"{target} 파일이 {example}에서 생성되었습니다.")
return True
else:
@ -36,6 +37,45 @@ def prompt_yes_no(message: str) -> bool:
print(message, end="")
return input().strip().lower() in ['y', 'yes']
def i_dont_like_windows():
# Windows인지 확인
if os.name != 'nt':
return
else:
# run (Get-ItemProperty "HKLM:\SYSTEM\CurrentControlSet\Control\Nls\CodePage").ACP
try:
result = subprocess.run(
['powershell', '-Command', '(Get-ItemProperty "HKLM:\\SYSTEM\\CurrentControlSet\\Control\\Nls\\CodePage").ACP'],
capture_output=True,
text=True,
check=True
)
acp = result.stdout.strip()
if acp == '65001':
print("현재 Active Code Page가 UTF-8로 설정되어 있습니다.")
return
else:
print("현재 Active Code Page가 UTF-8로 설정되어 있지 않습니다.")
except subprocess.CalledProcessError as e:
print(f"코드 페이지 확인 실패: {e}")
print("=======================================================")
print("\n⚠️ Windows에서는 인코딩 문제가 발생합니다.")
print("👉 엔터를 누르면 자동으로 intl.cpl이 열립니다.")
print("👉 자세한 내용은 README.md에서 \"윈도우 인코딩 해결\"을 참조해주세요.\n")
print("⚠️ 경고 : 이 작업은 윈도우에서 킹갓 대한민국의 프로그램들의 한글이 정상적으로 표시되지 않을 수 있습니다.")
# Pause
input("계속하려면 Enter 키를 누르세요...")
webbrowser.open('intl.cpl')
print("👉 intl.cpl가 열렸습니다.\n")
print("👉 관리자 옵션 -> 시스템 로켈 변경")
print("👀 Beta: 세계 언어 지원을 위해 Unicode UTF-8 사용")
print("👉 이 설정을 변경한 후, 시스템을 재시작하세요.\n")
print("⚠️ 이 작업은 시스템 언어 설정을 변경하므로 주의가 필요합니다.\n")
print("=======================================================")
input("계속하려면 Enter 키를 누르세요...")
def setup_storage():
print("\n🔧 쿠키와 로컬 스토리지를 설정하시겠습니까?")
@ -78,11 +118,15 @@ if __name__ == "__main__":
install_playwright_chrome()
print("=====================================================")
# 3. 쿠키와 로컬 스토리지 설정
# 3. Windows 인코딩 문제 해결
i_dont_like_windows()
print("=====================================================")
# 4. 쿠키와 로컬 스토리지 설정
setup_storage()
print("=====================================================")
# 4. .sensitive.json 생성
# 5. .sensitive.json 생성
setup_sensitive()
print("=====================================================")
print("🎉 초기 설정이 완료되었습니다! 이제 스크립트를 실행할 준비가 되었습니다.")

View file

@ -0,0 +1,7 @@
from lib.browser_use.clean_resources import *
from lib.browser_use.func import *
from lib.browser_use.model import *
from lib.browser_use.init_profile import *
from lib.browser_use.sensitive_data import *
from lib.browser_use.agents import *
from lib.browser_use.scanner import *

View file

@ -0,0 +1,167 @@
import asyncio
import os
import json
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use import (
GetProfile,
GetSensitiveData,
clean_resources,
)
from lib.utils import (
logger,
config,
)
from lib.llm import CreateChatGoogleGenerativeAI, get_prompt
import lib.browser_use.model as model
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
while try_cnt < 3:
try:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await GetProfile(),
)
agent = Agent(
browser_session=session,
**agent_config["agent_params"]
)
response = await agent.run()
await clean_resources(agent, session)
return response
except Exception as e:
await clean_resources(agent, session)
if "ResourceExhausted" in str(e) or "429" in str(e):
wait = min(INITIAL_BACKOFF * (2**try_cnt), MAX_BACKOFF)
print(f"⚠️ API 쿼터 에러: {e}. {wait}초 대기 후 재시도합니다...")
await asyncio.sleep(wait)
try_cnt += 1
if try_cnt >= 3:
error_msg = f"API 쿼터 문제가 지속됩니다."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
continue
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}")
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
return None
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
agent_config = {
"url": target_url,
"log_context": "OAuth 리스트 추출",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
else None
),
"controller": Controller(
output_model=model.OAuthList,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": get_prompt("auth"),
}
}
response = await _run_agent_with_retry(agent_config)
if not response:
return []
final_result = response.final_result()
if not final_result:
print("OAuth 리스트 추출 결과가 없습니다.")
return []
try:
data = json.loads(final_result)
oauth_providers = data.get("oauth_providers", [])
return [model.OAuth(provider=provider) for provider in oauth_providers]
except (json.JSONDecodeError, KeyError) as e:
print(f"❌ 결과 파싱 실패: {e}")
logger(f"{url} 결과 파싱 실패: {final_result}")
return []
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
agent_config = {
"url": target_url,
"log_context": f"{oauth_provider} 로그인",
"agent_params": {
"initial_actions": [{"open_tab": {"url": target_url}}],
"sensitive_data": GetSensitiveData(),
"task": (
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
"llm": CreateChatGoogleGenerativeAI(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogleGenerativeAI(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_planner_system_message": get_prompt(oauth_provider),
}
}
response = await _run_agent_with_retry(agent_config)
if response and response.final_result():
final_result = response.final_result()
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
print(f"{oauth_provider} 로그인 실패")
return False

View file

@ -1,11 +1,48 @@
import os
import json
from pathlib import Path
from dotenv import load_dotenv
from browser_use import BrowserProfile
import json
import os
# Load environment variables
load_dotenv(override=True)
async def setup_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_path = script_dir / "data" / "storage_state.json"
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
try:
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
with open(storage_state_path, 'r') as f:
storage_data = json.load(f)
with open(storage_state_temp_path, 'w') as f:
json.dump(storage_data, f, indent=4)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
except Exception as e:
print(f"⚠️ Error processing storage state: {e}")
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
return None
print("⚠️ No existing storage state found")
return None
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
@ -20,30 +57,6 @@ def setup_proxy():
return None
async def setup_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
script_dir = Path(__file__).parent.parent.parent.parent
storage_state_path = script_dir / "data" / "storage_state.json"
storage_state_temp_path = script_dir / "data" / "storage_state_temp.json"
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
storage_state_temp_path.write_text(
storage_state_path.read_text(encoding="utf-8"), encoding="utf-8"
)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
print("⚠️ No existing storage state found")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [

View file

@ -0,0 +1,46 @@
import os
from lib.browser_use.func import *
# Initialize configuration
proxy_url = setup_proxy()
async def GetProfile():
storage_state_path = await setup_storage_state()
# Handle potential encoding issues with storage state file
try:
if storage_state_path and os.path.exists(storage_state_path):
# Test if file can be read properly, if not, skip it
with open(storage_state_path, 'r', encoding='utf-8') as f:
f.read()
storage_state = storage_state_path
else:
print("⚠️ Storage state file not found or inaccessible, proceeding without it.")
storage_state = None
except (UnicodeDecodeError, FileNotFoundError):
# If there's an encoding error, don't use the storage state
storage_state = None
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)
return profile

View file

@ -0,0 +1,15 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel):
oauth_providers: List[str] # 이제 문자열 배열로 변경
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList

View file

@ -0,0 +1,106 @@
import asyncio
import os
import csv
from lib.utils import notify_backend, read_lines_between, is_html_url
from lib.browser_use.agents import extract_oauth_list, test_oauth_login
from lib.utils.progress import current_progress, load_progress, save_progress, progress_file
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(target_url)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry.provider}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, "", "pending"])
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(
f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry.provider}"
)
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry.provider)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry.provider} 로그인 결과: {status}")
async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == 'y':
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 {os.path.basename(filepath)}{start_line + actual_index}번째 줄")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
await asyncio.sleep(30)
await scan_one_url(url, skip_html_check=skip_html_check)
current_progress["current_index"] = actual_index + 1
save_progress()
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")

3
src/lib/llm/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from lib.llm.create import *
from lib.llm.prompt import *

View file

@ -21,5 +21,5 @@ def CreateChatGoogleGenerativeAI(model: str):
},
callbacks=[QuotaExhaustedHandler()],
# API 호출 간격 조정
temperature=0.1,
temperature=0.0,
)

View file

@ -0,0 +1,18 @@
# why this is isn't index
# 이 파일을 __init__.py로 만든 이유는
# 굳이 이 짧은 코드를 파일을 하나 더 만드는게 코드의 가독성을 떨어뜨린다고 판단했기 때문입니다.
def get_prompt(type:str) -> str:
"""
Prompt를 반환합니다.
:param type: 'auth' {Auth List} 또는 'google' {OAuth Provider}, 'meta' {OAuth Provider} 지정합니다.
:return: 해당하는 프롬프트 문자열
"""
if type.lower() == "auth":
from lib.llm.prompt.auth_list import extract_oauth_list_prompt
return extract_oauth_list_prompt
else:
from lib.llm.prompt.fallback import extend_planner_system_message
return extend_planner_system_message

View file

@ -0,0 +1,41 @@
# @file purpose: This file contains the prompt for extracting a list of OAuth providers from a web page.
# OAuth 리스트 추출용 프롬프트 (클릭하지 않고 단순 식별만)
extract_oauth_list_prompt = f"""
🎯 목적: 주어진 초기 URL 내에서 **OAuth 로그인 Provider** 찾아 아래 형식의 JSON으로 정리합니다.
📌 작업 목표:
- Google, GitHub, Discord, Facebook, Apple, Microsoft, Twitter, LinkedIn **OAuth 인증을 사용하는 외부 로그인 링크**에서 Provider 이름만 모두 수집합니다.
- 로그인 버튼, 링크 클릭 등을 통해 탐색을 진행할 있습니다.
- **같은 provider가 여러 나와도 하나만 저장**합니다.
🛑 제한 사항:
- 로그인 입력창이나 이메일/비밀번호 입력 방식은 제외합니다.
- 검색 엔진, 사이트 외부 탐색은 금지합니다.
- URL 추측이나 직접 입력은 금지합니다.
- OAuth가 없는 경우 배열 `[]` 반환합니다.
- OAuth가 아닌 일반 로그인은 무시합니다.
🔍 탐색 방법:
1. 초기 URL에 접속하여 **클라이언트용 로그인 페이지** 진입합니다.
2. 페이지가 정상적으로 로드되었다고 가정합니다.
3. 'Continue with X', 'Continue with Google'... 등의 버튼이나 링크를 식별합니다.
🧾 출력 형식 (예시):
```json
{{
"oauth_providers": [
"Google",
"GitHub",
"Discord"
]
}}
```
📌 주의:
결과가 없는 경우 배열 `[]` 반환합니다.
정확한 provider 이름을 포함해 주세요.
"""

View file

@ -1,8 +1,3 @@
import os
from dotenv import load_dotenv
load_dotenv(override=True)
# Extended planner prompt
extend_planner_system_message = f"""
🎯 목적: 자동화를 위한 **SSO 로그인 리디렉션 URL 수집**
@ -25,16 +20,7 @@ extend_planner_system_message = f"""
- 🔒 CAPTCHA는 통과 가능 (해결하고 계속 진행)
- 로그인 UI가 정상적으로 로드되지 않으면 중단
📤 차단 즉시 반환:
```json
[
{{
"provider": "Blocked",
"oauth_uri": "-"
}}
]
````
📤 차단 즉시 종료
---
@ -70,16 +56,14 @@ extend_planner_system_message = f"""
SSO 버튼에 대해 다음을 수행:
1. 버튼 클릭
2. 🌐 페이지가 이동되면, **현재 주소창(URL) 확인하여 리디렉션된 OAuth URL** `oauth_uri` 저장
: `https://accounts.google.com/o/oauth2/auth?...`
3. 로그인 진행:
2. 로그인 진행:
- 로그인 페이지에서 OAuth 인증을 완료합니다.
- sign in with your username(email) x_username and password is x_password
- 버튼같은게 안눌리면 새로고침을 해봐
- **로그인 완료 authorize 버튼이 있으면 클릭**
- GitHub같은 경우 Authorize 버튼이 뜨는데 오래걸릴 있음, 기다려야 수도 있음
- 만약 버튼을 눌러도 반응이 없을 경우 새로고침을 한번 해주세요.
- 로그인 실패 시에는 다음 SSO 버튼을 클릭합니다.
- **OAuth Flow가 완료되면 (callback URL 도달 또는 인증 완료) 즉시 작업 종료**
4. 로그인이 성공하면 모두 쿠키를 삭제하고 다음 SSO 버튼을 클릭합니다.
5. 다음 SSO 버튼으로 반복 진행
@ -90,19 +74,6 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
- 버튼 클릭 페이지 로딩만 기다리고 돌아가기
- URL 저장 없이 go_back() 호출
📤 로그인 다음 형식으로 결과 저장:
```json
[
{{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?client_id=..."
}}
]
````
````
---
### ✨ 추가 안전 장치: "뒤로가기(go_back) 호출 조건" 제한
@ -121,11 +92,7 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
* 유효한 SSO 버튼이 **전혀 없을 경우**
* 예외, 오류 발생
📤 즉시 중단 다음 형식으로 반환:
```json
[]
```
-> 즉시 중단
---
@ -134,9 +101,8 @@ chrome://settings/clearBrowserData에 들어가서 삭제해주세요.
* **모든 SSO 로그인은 반드시 실행** (가능한 버튼은 모두 클릭)
* 🔁 단계는 반드시 순서대로 진행
* 🔐 로그인은 쿠키/세션으로 유지된 상태에서 수행
* 🚫 직접 ID/PW 입력하지 않음
* 추측 URL 클릭 금지
* 예외 발생 반드시 규정된 JSON 포맷만 반환
* 👀 직접 OAuth Providor ID/PW를 입력하여도 가지고 있다면
* 추측한 URL은 접속하지 않음
---
"""
"""

View file

@ -0,0 +1,7 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.data import *
from lib.utils.config import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -4,7 +4,9 @@ from lib.utils.config import (
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
import os
from dotenv import load_dotenv
load_dotenv(override=True)
def show_info():
print("🔧 환경 설정:")
@ -38,3 +40,19 @@ def browser_use_version():
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN") or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
print(
"⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요."
)
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print(
"⚠️ 이 경고는 1초동안 정지합니다."
)
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

View file

@ -4,5 +4,5 @@ load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")

View file

@ -0,0 +1,2 @@
from lib.utils.data.backend_client import *
from lib.utils.data.logger import *

48
src/lib/utils/progress.py Normal file
View file

@ -0,0 +1,48 @@
import json
import os
import signal
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "=" * 60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(
f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄"
)
if current_progress['total'] > 0:
print(
f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)"
)
print("=" * 60)
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
exit(0)
def setup_signal_handler():
"""시그널 핸들러 등록"""
signal.signal(signal.SIGINT, signal_handler)

79
src/main.py Normal file
View file

@ -0,0 +1,79 @@
import asyncio
import argparse
import os
from dotenv import load_dotenv
from lib.utils import env_cheker
from lib.browser_use.scanner import main_loop
from lib.utils.progress import setup_signal_handler, progress_file
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
def main():
"""애플리케이션 메인 진입점"""
# 시그널 핸들러 설정
setup_signal_handler()
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
action='store_true', # 플래그 형식으로 변경
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
args = parser.parse_args()
try:
asyncio.run(
main_loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
except KeyboardInterrupt:
# signal_handler가 처리하므로 여기서는 별도 처리 불필요
pass
finally:
# 정상 종료 시 진행 상황 파일 삭제
if os.path.exists(progress_file):
try:
os.remove(progress_file)
except OSError as e:
print(f"오류: 진행 상황 파일을 삭제하지 못했습니다. {e}")
if __name__ == "__main__":
main()

52
temp.md
View file

@ -1,52 +0,0 @@
You are an AI model specialized in web crawling and analysis. Given a URI, perform the following tasks:
1. Navigate to the provided URI and locate the login page. If its not found, explore common auth-related pages like /login or /auth.
2. On the login page, identify all available social login buttons (OAuth-based) such as Google, GitHub, Facebook, etc.
3. Simulate clicking each social login button and follow the redirect to capture the full redirect URL (including query parameters).
4. From the redirect URL and parameters, extract:
- `client_id`
- `redirect_uri`
- `response_type`
- `scope`
5. Based on URL patterns, infer the OAuth method: Authorization Code, Implicit, PKCE, etc.
6. Return data in the following JSON format only:
```json
{
"oauths": [
{
"issue": "<site being tested, e.g., git.imnya.ng>",
"oauth_uri": "<original button href or URL triggered>"
}
]
}
````
7. If the login button says something like "Login with GitHub" or "Login with Google", follow the flow and use the **final redirect URL after clicking** as the value of `oauth_uri`.
**Examples:**
```json
{
"oauths": [
{
"issue": "git.imnya.ng",
"provider": "GitHub",
"client_id": "Iv1.xxxxx",
"redirect_uri": "https://git.imnya.ng/user/oauth2/callback",
"response_type": "code",
"scope": "read:user",
"oauth_uri": "https://github.com/login/oauth/authorize?client_id=Iv1.xxxxx&redirect_uri=https%3A%2F%2Fgit.imnya.ng%2Fuser%2Foauth2%2Fcallback&response_type=code&scope=read%3Auser"
}
]
}
```
**Constraints:**
* Simulate realistic interaction with buttons (e.g., clicking them to follow redirects).
* Ensure the output is strictly in the specified JSON format.
* Avoid any additional text or explanations outside the JSON response.
* If no OAuth logins are found, return an empty array.
* WebAuthn, PassKey is not OAuth, so do not include it in the results.

11
uv.lock generated
View file

@ -144,6 +144,7 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "browser-use", extra = ["memory"] },
{ name = "chardet" },
{ name = "lmnr", extra = ["all"] },
{ name = "patchright" },
]
@ -151,6 +152,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "browser-use", extras = ["memory"], specifier = "==0.3.2" },
{ name = "chardet", specifier = ">=5.2.0" },
{ name = "lmnr", extras = ["all"], specifier = ">=0.6.10" },
{ name = "patchright", specifier = ">=1.52.5" },
]
@ -211,6 +213,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009, upload-time = "2024-09-04T20:44:45.309Z" },
]
[[package]]
name = "chardet"
version = "5.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f3/0d/f7b6ab21ec75897ed80c17d79b15951a719226b9fababf1e40ea74d69079/chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7", size = 2069618, upload-time = "2023-08-01T19:23:02.662Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/6f/f5fbc992a329ee4e0f288c1fe0e2ad9485ed064cac731ed2fe47dcc38cbf/chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970", size = 199385, upload-time = "2023-08-01T19:23:00.661Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.2"