Compare commits

..

83 commits

Author SHA1 Message Date
gyuu04
20578d7f7b
Merge pull request #39 from j93es/feat/some
Browser Use Proxy 미사용 문제 해결
2025-07-13 15:52:27 +09:00
2d3677fe4a ignore_default_args에서 주석 처리된 옵션 정리 2025-07-13 14:35:44 +09:00
ba1e81177b 코드 정리 및 개선: 파일 생성 로직 및 Playwright 설치 과정에서의 예외 처리 개선, 사용자 데이터 디렉토리 복사 시 로그 추가 2025-07-13 14:33:55 +09:00
9858d7acd2
Merge pull request #38 from j93es/feat/some
feat: Browser use v0.5.3 업데이트 및 프로세스 안정성 개선
2025-07-11 23:51:51 +09:00
d6803ad20e 브라우저 리소스 정리 및 종료 처리 개선, 진행 상황 저장 기능 추가 2025-07-11 18:37:31 +09:00
0f5ab6dea1 browser use 버전 업데이트 및 프롬프트 개선 및 임시 파일 삭제 구조 개선 2025-07-11 15:51:29 +09:00
gyuu04
657d5370b9
Merge pull request #35 from j93es/feat/github
Github 프롬프트 테스트 완료
2025-07-10 13:55:26 +09:00
gyuu04
628c994f22
Merge pull request #34 from j93es/feat/microsoft
microsoft prompt 정교화
2025-07-10 13:55:14 +09:00
sultanofdisco
a8165e9cdc
Merge pull request #37 from j93es/fix-goolgle
프롬프트 수정
2025-07-09 12:51:48 +00:00
sultanofdisco
90f0487dd7
Merge pull request #36 from j93es/facebook-prompt
페이스북 프롬프트 테스트 완료
2025-07-09 12:51:25 +00:00
sultanofdisco
0904f282ff 프롬프트 수정 2025-07-09 19:06:07 +09:00
gyuu04
d01a78d442 B
페북 프롬프트 정교화

- agents.py에서 응답만 있으면 성공으로 처리하던 문제를 수정 -> status가 "success"인 경우만 성공으로 반환
- 페북 로그인 프롬프트 정교화
2025-07-09 17:04:45 +09:00
seungyeoncherry
52b10446f7 프롬프트 수정 2025-07-09 16:33:57 +09:00
tk
ff54e8d4e3 프롬프트 수정(테스트 O) 2025-07-08 18:37:21 +09:00
08e7e34b9f
Merge pull request #33 from j93es/feat/chromium
크로미움 사용, user data 복사 수정
2025-07-05 15:33:21 +09:00
1742afcb5e
Merge pull request #32 from j93es/0705
[Update] naver id
2025-07-05 15:29:37 +09:00
68eedc3fa4 Chromium 설치 메시지 수정 및 사용자 데이터 복사 로직 개선 2025-07-05 15:28:57 +09:00
2b805df001 env error 2025-07-05 15:14:17 +09:00
tv0924@icloud.com
264f29ffb9 [Update] naver id 2025-07-05 15:01:12 +09:00
김민곤
8d1aa3df1a
Merge pull request #31 from j93es/imnyang-patch-2
Update .env.example
2025-07-04 21:25:43 +09:00
952db8d2b9
Update .env.example 2025-07-04 21:24:47 +09:00
2fdd187f9a
Merge pull request #30 from j93es/feat/user-data
User Data로 세션 제어, 여러 문제 수정
2025-07-03 15:22:05 +09:00
a9f3bc7233 Browser Use Downgrade 2025-07-02 23:29:53 +09:00
735e90739c Merge branch 'main' into feat/user-data 2025-07-02 20:52:46 +09:00
72f784dec5 docs: 레거시 설치 및 설정 섹션 삭제 2025-07-02 20:34:06 +09:00
b3b5f05697
Update README.md 2025-07-02 20:30:13 +09:00
c5e0a8c2f1
Delete .legacy directory 2025-07-02 20:27:10 +09:00
c3a0132489
Delete docs directory 2025-07-02 20:26:40 +09:00
b45daebc81
Update README.md 2025-07-02 20:26:29 +09:00
f5ee676468 feat: 리팩토링, User Data 2025-07-02 20:18:01 +09:00
8cfb6488d1 feat: .env.example에 HEADLESS 옵션 추가 2025-07-02 20:17:44 +09:00
3199a53a44 refect: 코드 가독성 해결 2025-07-02 19:10:58 +09:00
146c187b05 refect: OAuth 프롬프트 및 모델 구조 개선 2025-07-02 19:02:43 +09:00
92eea9c0c9
Merge pull request #29 from j93es/temp/mingon
[FEAT]: env로 로그인 구현
2025-07-01 18:34:03 +09:00
KMINGON
b2aedf53db [FEAT]: env로 로그인 구현 2025-07-01 01:05:07 +09:00
65c865b620
Merge pull request #26 from j93es/feat/microsoft
Feat/microsoft
2025-06-30 22:18:34 +09:00
54e923ae95
Merge pull request #25 from j93es/feat/facebook
facebook prompt 추가
2025-06-30 22:18:28 +09:00
13a2798fa4
Merge pull request #24 from j93es/feat/github
github prompt 추가
2025-06-30 22:18:21 +09:00
3e2b598298
Merge pull request #28 from j93es/chore/hotfix
HotFix
2025-06-30 21:08:33 +09:00
18a575a8af [ADD] 재시도 큐 시스템 추가 및 관련 함수 구현 2025-06-30 21:01:01 +09:00
seungyeoncherry
1c43e63cca Merge branch 'main' of https://github.com/j93es/browser-use-oauth into feat/facebook 2025-06-30 17:54:00 +09:00
seungyeoncherry
d1737f27a3 Merge branch 'main' of https://github.com/j93es/browser-use-oauth into feat/github 2025-06-30 17:49:23 +09:00
tk
ba0bb61ef3 마소 프젝 계정 업데이트 2025-06-30 17:48:11 +09:00
tk
fe003013f4 파일 정리 2025-06-30 17:38:13 +09:00
seungyeoncherry
91c18e12c7 facebook prompt 추가 2025-06-30 17:20:50 +09:00
sultanofdisco
686333e75c
Merge pull request #23 from j93es/feat/apple
Feat/apple
2025-06-30 16:51:55 +09:00
seungyeoncherry
79c9dffa08 github prompt 추가 2025-06-30 16:39:24 +09:00
sultanofdisco
1d7d9c8ad5 Update prompt.py 2025-06-30 02:43:35 +09:00
sultanofdisco
812a26a103 Update prompt.py 2025-06-30 02:35:53 +09:00
sultanofdisco
b7df8cffcd Update .sensitive.example.json 2025-06-29 23:56:39 +09:00
sultanofdisco
bcca364021 [ADD]apple 프롬프트 추가 2025-06-29 22:37:36 +09:00
tk
e1c07c4a1e microsoft 제작 2025-06-29 21:05:07 +09:00
tk
b26c47d1ad Merge branch 'main' of https://github.com/j93es/browser-use-oauth into tk 2025-06-29 14:29:36 +09:00
2202a1a2d8
Merge pull request #22 from j93es/feat/refect
Breaking Changes: 코드 전체 리팩토링
2025-06-29 11:53:48 +09:00
bc9b598993 feat: Browser Use Update 2025-06-29 11:53:16 +09:00
353e98e28c feat: OAuth 제공자 추출 및 로그인 테스트 개선 2025-06-28 11:51:19 +09:00
tk
54682cdb72 feat/refect 가져오기 2025-06-27 23:06:35 +09:00
tk
ce70191d49 microsoft 추가 2025-06-27 22:01:59 +09:00
20601cec76 feat: 구글 로그인 프롬프트 및 모델 추가
Co-authored-by: James <j93es.jung@gmail.com>
2025-06-27 20:31:19 +09:00
70e8bdbbde feat: Prompt 구조 개선 2025-06-27 20:28:12 +09:00
seungyeoncherry
d8ec21c61b
Merge pull request #20 from j93es/feat/split-agent
Agent 모듈화 (feat. 코드베이스 전체 변경)
2025-06-26 22:25:55 +09:00
20ac0ccc06 chore: README.md 2025-06-26 22:13:06 +09:00
069dbf446d feat: 코드베이스 리팩터링
* `run.py`에서 `main.py` 경로를 명시적으로 지정하고, 명령줄 인자를 보다 사용하기 쉽게 조정했습니다.
* 에이전트, 리소스 정리, 공통 함수, 모델 등을 포함하는 브라우저 유틸리티용 신규 모듈 구조를 만들었습니다.
* `agents.py`에 비동기 에이전트 실행 및 재시도 로직을 구현했습니다.
* `scanner.py`에 OAuth URL 추출 및 로그인 테스트 기능을 추가했습니다.
* 전반적인 코드베이스에 걸쳐 에러 핸들링 및 로깅을 강화했습니다.
* 백엔드 URL과 Google API 키 등의 관리를 위한 환경변수 기반 설정 시스템을 도입했습니다.
* 스캐닝 중 진행 상태 추적 및 시그널 핸들링을 통한 정상 종료 처리를 개선했습니다.
* 텍스트 파일 읽기 및 HTML 콘텐츠 여부 확인을 위한 유틸리티 함수를 추가했습니다.
* LLM과의 상호작용을 위한 구조화된 프롬프트 시스템을 구축했습니다.
2025-06-26 21:44:31 +09:00
1ddc3c41bc
Update README.md 2025-06-25 22:58:22 +09:00
4261bdc7d6
Update README.md 2025-06-25 22:57:34 +09:00
5c816baf67 chore: 윈도우 인코딩 이슈 해결 2025-06-25 13:43:15 +09:00
3d55b6275e Update docs and remove unused prompt files
The commit updates documentation with a guide image and removes unused
prompt files for Google and Meta logins.
2025-06-24 23:57:38 +09:00
5535515dbd Merge branch 'feat/split-agent' of https://github.com/j93es/browser-use-oauth into feat/split-agent 2025-06-24 22:44:55 +09:00
c2e610ec54 temp_commit: 프롬프트 확장 가이드 추가 및 Google, Meta 프로바이더에 대한 SSO 로그인 리디렉션 URL 수집 로직 구현 2025-06-24 22:44:51 +09:00
32f2ce486e temp_commit: 프롬프트 확장 가이드 추가 및 Google, Meta 프로바이더에 대한 SSO 로그인 리디렉션 URL 수집 로직 구현 2025-06-24 22:44:09 +09:00
27192dab3a feat: LLM 및 유틸리티 모듈 구조 개선 및 불필요한 코드 제거 2025-06-24 22:25:28 +09:00
b0a4727e13 fix: get_prompt 함수에서 type 비교 시 대소문자 구분 없애기 2025-06-23 20:44:54 +09:00
a5d8d674b4 Merge branch 'main' of https://github.com/j93es/browser-use-oauth into feat/split-agent 2025-06-23 20:42:38 +09:00
9af67fbd49 fix: 로그인 차단 시 반환 형식 수정 및 즉시 종료로 변경 2025-06-23 20:39:28 +09:00
e85c128ed8 fix: SSO 로그인 절차 간소화 및 불필요한 JSON 반환 형식 제거 2025-06-23 20:39:00 +09:00
1767cd5861 feat: OAuth 리스트 추출 및 모델 수정, 파일 생성 및 설정 과정 개선
- Windows에서 storage_state 이슈가 일어납니다.
2025-06-23 20:38:02 +09:00
6ddeed2173 fix: 온도 설정 및 OAuth 제공자 식별 프롬프트 개선 2025-06-23 19:59:51 +09:00
e1319a108d chore: storage_state.json 관련해서 수정함
- Windows 싫어요
2025-06-23 19:19:56 +09:00
4b3637b762 feat: OAuth 리스트 추출 및 로그인 기능 개선
- README.md: uv 실행 명령어 수정
- lib/llm/prompt: OAuth 리스트 추출 및 fallback 프롬프트 추가
- lib/utils/browser_use: 프로필 생성 시 스토리지 상태 파일 처리 개선
- lib/utils/browser_use/func: 안전한 JSON 읽기 및 쓰기 함수 추가
- main.py: OAuth 리스트 추출 및 개별 로그인 시도 통합
- model.py: OAuth 모델 수정
2025-06-23 00:15:03 +09:00
c1ade99b8a
Merge pull request #19 from j93es/feat/setup
chore: 환경 설정 및 크로스 플랫폼 실행 파일
2025-06-22 23:45:27 +09:00
46a169f1d0
HotFix #1 2025-06-22 23:44:57 +09:00
4f90285bdd modified: README.md 2025-06-22 23:13:39 +09:00
3dd86982d7 chore: 환경 설정 및 크로스 플랫폼 실행 파일 2025-06-22 23:13:26 +09:00
75 changed files with 4152 additions and 2435 deletions

View file

@ -1,30 +1,38 @@
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
ANONYMIZED_TELEMETRY=false ANONYMIZED_TELEMETRY=false
# ========== LLM ==========
GOOGLE_API_KEY= GOOGLE_API_KEY=
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가] # 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20 GOOGLE_MODEL=gemini-2.5-flash
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20 #GOOGLE_PLANNER_MODEL=gemini-2.5-flash # 왜 비활성화 되었나요? // Planner 모델이 오히려 문제를 일으키는 경우가 있어 비활성화했습니다. 필요시 활성화하세요.
# min(INITIAL_BACKOFF * (2 ** try_cnt), MAX_BACKOFF)만큼 API가 실패시 대기합니다.
INITIAL_BACKOFF=60
MAX_BACKOFF=600
#ENABLE_PLANNER_MODEL_OAUTH_LOGIN=true # OAuth 로그인 시 Planner 모델을 활성화합니다.
#ENABLE_PLANNER_MODEL_OAUTH_LIST=true # OAuth List를 찾을 때 Planner 모델을 활성화합니다.
# ========== Monitoring ==========
# 선택 # 선택
PROXY_HOST=127.0.0.1 PROXY_HOST=127.0.0.1
PROXY_PORT=11080 PROXY_PORT=11080
BACKEND_URL=http://localhost:11081 BACKEND_URL=http://localhost:11081
# provider 계정 (본인이 사용하지 않는 계정 권장) (Github, apple, kakao등 다른 계정 추가 가능) # https://docs.browser-use.com/development/observability - 선택
GOOGLE_ID= # Lmnr 계정이 필요합니다.
GOOGLE_PASSWORD= # https://lmnr.ai/
LMNR_PROJECT_API_KEY=
NAVER_ID= # 브라우저 언어 설정
NAVER_PASSWORD= LANG=en_US
HEADLESS=False # 브라우저를 헤드리스 모드로 실행할지 여부. True로 설정하면 브라우저가 보이지 않습니다.
FACEBOOK_ID= # ========= Account ==========
FACEBOOK_PASSWORD=
GITGUB_ID= # 필수 뒤에 있는 이메일 주소는 Google 계정의 로그인 힌트로 사용됩니다.
GITHUB_PASSWORD= # 이메일의 전체를 입력해주세요
GOOGLE_ID=whs.imnya.ng@gmail.com
LinkedIn_ID=
LinkedIn_PASSWORD=
Microsoft_ID=
Microsoft_PASSWORD=

2
.gitignore vendored
View file

@ -12,6 +12,7 @@ oauth_providers.csv
.venv .venv
.env .env
.sensitive.json
log_*.log log_*.log
domains.txt domains.txt
@ -82,5 +83,6 @@ my.sh
log.txt log.txt
data/ data/
!src/lib/utils/data
# End of https://www.toptal.com/developers/gitignore/api/macos,windows # End of https://www.toptal.com/developers/gitignore/api/macos,windows

42
.sensitive.example.json Normal file
View file

@ -0,0 +1,42 @@
{
"google.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "Vb1Mz9pgjY8JVs"
},
"accounts.google.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "Vb1Mz9pgjY8JVs"
},
"naver.com": {
"x_username": "oauth-test-test",
"x_password": "gx^AKz-289d3/7B"
},
"nid.naver.com": {
"x_username": "oauth-test-test",
"x_password": "gx^AKz-289d3/7B"
},
"github.com": {
"x_username": "imnyang-bot",
"x_password": "6PuVXCH9tpQLNm"
},
"apple.com": {
"x_username": "",
"x_password": ""
},
"appleid.apple.com": {
"x_username": "",
"x_password": ""
},
"microsoft.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "WHS123987"
},
"login.microsoftonline.com": {
"x_username": "whs.imnya.ng@gmail.com",
"x_password": "WHS123987"
},
"facebook.com": {
"x_username": "01047183675",
"x_password": "whs3oauth@"
}
}

3
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,3 @@
{
"rust-analyzer.initializeStopped": true
}

109
README.md
View file

@ -1,51 +1,106 @@
# 참고하면 좋을만한 것
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다.
- [ ] https://docs.browser-use.com/customize/custom-functions
# 환경 설정 # 환경 설정
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다. 요구 사항
또한 [oauth-backend](https://github.com/j93es/oauth-backend)가 설정된 상태여야만 합니다. - [uv](https://docs.astral.sh/uv/getting-started/installation/) - Python Package Manager Written by Rust
- [oauth-backend](https://github.com/j93es/oauth-backend)
- [Google Chrome](https://www.google.com/intl/ko_kr/chrome/)
---
> [oauth-backend](https://github.com/j93es/oauth-backend) 프록시를 사용한다면 이 가이드에 따라 인증서 또한 설정되어야만 합니다.
>
> 그렇지 않으면 실행되지 않습니다.
>
> 윈도우 환경에서는 `sudo certutil -addstore root mitmproxy-ca-cert.cer`로 인증합니다.
>
> Sudo가 활성화되어있지 않은 환경에서는 관리자로 상향된 쉘에서 실행합니다.
>
> MacOS 환경에서는 `sudo security add-trusted-cert -d -p ssl -p basic -k /Library/Keychains/System.keychain ~/.mitmproxy/mitmproxy-ca-cert.pem`으로 인증합니다.
>
> 다른 플렛폼은 수동으로 설정되어야만 합니다.
> https://docs.mitmproxy.org/stable/concepts/certificates/
현재 아래와 같은 환경에서 개발되며 테스트되고 있습니다.
- ✅ MacOS 26 Tahoe Developer Beta 2 (25A5295e) en-US aarch64
- ✅ Windows 11 Pro for Workstations 24H2 (26100.4351) en-US x86_64
- ✅ NixOS 25.05.804570.c7ab75210cb8 KDE 6 / Linux 6.15 x86_64
---
다음과 같은 명령어로 환경을 설정합니다.
설명하는 가이드를 잘 따라가면 설정할 수 있습니다.
```sh
uv run setup.py
```
uv 설치 후 다음과 같은 명령어를 입력합니다. uv 설치 후 다음과 같은 명령어를 입력합니다.
``` ```sh
uv sync uv sync
``` ```
venv와 패키지가 설치가 됩니다. venv와 패키지가 설치가 됩니다.
browser_use가 Playwright에 대한 의존성이 있어 브라우저 설치가 필요합니다 ---
``` `uv run setup.py`로 환경을 설정합니다.
playwright install chromium --with-deps --no-shell
```
다음과 같은 명령어로 실행합니다. ---
``` # 윈도우 인코딩 이슈 해결
uv run main.py 이거 해결 방법
``` ![image](https://github.com/user-attachments/assets/01ca45c2-fda9-44fb-83fc-39daa7e52092)
Environment는 .env.example에 따라 설정되어야합니다. ![image](https://github.com/user-attachments/assets/55c502f5-0bf7-44f8-bbb4-1dc1e0c8c3c3)
.env.example을 .env로 복사하여서 사용해주세요. 이것도 setup.py 사용하면 반자동으로 할 수 있습니다.
로그인을 수행하지 않을 OAuth Provider는 prompt에서 제거합니다. 못찾겠으면 intl.cpl 열어주세요.
# 실행 # 실행
domains.txt는 실행시 자동으로 다운로드 됩니다.
```sh ```sh
# domains.txt 받기
curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o domains.txt
# ./run.sh {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip}
./run.sh 12540 13000 False
``` ```
```sh
```pwsh # uv run run.py {domains.txt 시작 줄} {domains.txt 끝 줄} {--skh} {--no-download}
# ./run.ps1 {domains.txt 시작 줄} {domains.txt 끝 줄} {HTML 검사 Skip} uv run run.py 1 100 --skh
./run.ps1 12540 13000 False
``` ```
# Prompt 확장 가이드
## 1. 파일 생성
`lib/llm/prompt` 폴더에서 fallback 폴더를 복사하여
원하는 프로바이더를 추가해줍니다. `ex) lib/llm/prompt/Google/`
## 2. prompt.py 수정
Prompt에서 추가한 파일을 prompt.py에서 수정합니다.
만약 로그인 정보를 넣고 싶다면 Sensitive
`Log into example.com as user x_username with password x_password`
## 3. model.py
응답할 때 원하는 리턴 값을 `dict`로 받습니다.
## 4. \_\_init\_\_.py 수정
![image](https://github.com/user-attachments/assets/3aed2243-92d5-4359-8515-6d2f9bfa100b)
추가한 prompt에 따라 import합니다.
## 5. 사용 방법
```py
from lib.llm.prompt.fallback import prompt, model
```
# 참고하면 좋을만한 것
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다.
- [ ] https://docs.browser-use.com/customize/custom-functions

View file

@ -1,9 +0,0 @@
from lib.agents.get_sso_list import get_sso_list
# 업데이트될 버전 import 아직 개발 중
from lib.agents.get_sso_list_v2 import get_sso_list as get_sso_list_v2
from lib.agents.login_google import login_google
__all__ = [
"get_sso_list",
"login_google",
]

View file

@ -1,3 +0,0 @@
from lib.agents.get_sso_list.get_sso_list import get_sso_list
__all__ = ["get_sso_list"]

View file

@ -1,22 +0,0 @@
from lib.agents.get_sso_list.prompt import get_sso_list_task, FindLoginPageResponse
from lib.browser_use_utils.run_task import run_task
NOT_FOUND_LOGIN_PAGE = 0
FOUND_LOGIN_PAGE = 1
async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
task = get_sso_list_task
ReturnModel = FindLoginPageResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, response
if isinstance(response, str):
return False, response
return True, response

View file

@ -1,68 +0,0 @@
from pydantic import BaseModel
class FindLoginPageResponse(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page
get_sso_list_task = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. SSO BUTTON IDENTIFICATION
- On the login page, look for the following social login (SSO) buttons:
- Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc.
- Proceed only if it is clearly an **actual SSO button**.
- Exclude the following:
- Passkey-related buttons
- Username/password fields
- Email-based login
- Non-OAuth methods such as certificate or phone verification
3. RETURN FORMAT
- If the login page is successfully found, return:
```json
{
"msg": "Login page found",
"url": "https://example.com/login",
"sso_list": ["Google", "GitHub"]
}
```
- If the login page cannot be found, return:
```json
{
"msg": "Login page not found",
"url": "",
"sso_list": []
}
```
- If blocked (as in step 0), return:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,3 +0,0 @@
from lib.agents.get_sso_list_v2 import get_sso_list
__all__ = ["get_sso_list"]

View file

@ -1,20 +0,0 @@
from lib.agents.get_sso_list_v2.prompt import get_sso_list_task, FindLoginPageResponse
from lib.browser_use_utils.run_task import run_task
# TODO - Split find login page agent and get SSO list agent
async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
task = get_sso_list_task
ReturnModel = FindLoginPageResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, response
if isinstance(response, str):
return False, response
return True, response

View file

@ -1,3 +0,0 @@
from lib.agents.login_google.login_google import login_google
__all__ = ["login_google"]

View file

@ -1,11 +0,0 @@
from lib.agents.login_google.prompt import login_google_task, LoginGoogleResponse
from lib.browser_use_utils.run_task import run_task
async def login_google(target_url) -> tuple[bool, str | LoginGoogleResponse | None]:
task = login_google_task
ReturnModel = LoginGoogleResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, None
return True, response

View file

@ -1,15 +0,0 @@
from lib.browser_use_utils.clean_resources import clean_resources, clean_agent_resources, clean_session_resources
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.browser_use_utils.get_profile import get_profile
from lib.browser_use_utils.run_agent import run_agent
from lib.browser_use_utils.run_task import run_task
__all__ = [
"clean_resources",
"clean_agent_resources",
"clean_session_resources",
"create_google_ai",
"get_profile",
"run_agent",
"run_task",
]

View file

@ -1,21 +0,0 @@
async def clean_agent_resources(agent=None):
"""에이전트 리소스를 정리하는 함수"""
if agent:
try:
await agent.close()
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
async def clean_session_resources(session=None):
"""브라우저 리소스를 정리하는 함수"""
if session:
try:
await session.close()
except Exception as e:
print(f"⚠️ 브라우저 리소스 정리 실패: {e}")
async def clean_resources(agent=None, session=None):
"""리소스를 정리하는 함수"""
await clean_agent_resources(agent)
await clean_session_resources(session)

View file

@ -1,25 +0,0 @@
from langchain.callbacks.base import BaseCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
class QuotaExhaustedHandler(BaseCallbackHandler):
def on_llm_error(self, error, **kwargs):
if "ResourceExhausted" in str(error) or "429" in str(error):
print("⚠️ API 쿼터가 소진되었습니다. 재시도 로직에 위임합니다...")
# backoff handled in scan_one_url
def create_google_ai(model: str):
"""재시도 로직이 포함된 LLM 생성"""
if model == "fallback":
print("⚠️ Fallback 모델을 사용합니다. Envorinment 변수를 확인하세요.")
print("⚠️ Model Gemini-2.0-flash-lite를 사용합니다.")
model = "gemini-2.0-flash-lite"
return ChatGoogleGenerativeAI(
model=model,
max_retries=10, # 최대 재시도 횟수 증가
model_kwargs={
"request_timeout": 120, # 타임아웃 시간 증가 (2분)
},
callbacks=[QuotaExhaustedHandler()],
# API 호출 간격 조정
temperature=0.1,
)

View file

@ -1,97 +0,0 @@
import os
from pathlib import Path
from browser_use import BrowserProfile
async def get_storage_state():
"""Setup browser storage state for session persistence."""
# Get the script directory to ensure correct path resolution
storage_state_path = Path("data/storage_state.json")
storage_state_temp_path = Path("data/storage_state_temp.json")
print(f"📂 Storage state path: {storage_state_path}")
print(f"📂 Temp storage state path: {storage_state_temp_path}")
if storage_state_path.exists():
if storage_state_temp_path.exists():
storage_state_temp_path.unlink()
storage_state_temp_path.write_text(
storage_state_path.read_text(encoding="utf-8"), encoding="utf-8"
)
print(f"🔄 Using existing storage state: {storage_state_temp_path}")
return str(storage_state_temp_path)
print("⚠️ No existing storage state found")
return None
def get_proxy_url():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
proxy_url = f"http://{proxy_host}:{proxy_port}"
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
return proxy_url
else:
print("🔗 No proxy configured, using direct connection.")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [
# Security and isolation
"--disable-web-security",
"--disable-site-isolation-trials",
"--disable-features=IsolateOrigins,site-per-process",
"--ignore-certificate-errors",
"--ignore-ssl-errors",
"--allow-running-insecure-content",
# Performance and rendering
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage",
# Popup and automation
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled",
# Browser behavior
"--no-first-run",
"--no-service-autorun",
"--no-default-browser-check",
"--password-store=basic",
"--use-mock-keychain",
# Extensions
"--disable-extensions-file-access-check",
"--disable-extensions-http-throttling",
"--disable-component-extensions-with-background-pages",
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]
async def get_profile():
proxy_url = get_proxy_url()
storage_state_path = await get_storage_state()
profile = BrowserProfile(
# Security settings
disable_security=True,
stealth=True,
# Display settings
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
# Data persistence
user_data_dir=None,
storage_state=storage_state_path,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=get_browser_args(),
)
return profile

View file

@ -1,40 +0,0 @@
from typing import Any
from pydantic import BaseModel
from lib.browser_use_utils.clean_resources import clean_agent_resources
from lib.config import GOOGLE_MODEL
from browser_use import (
Agent,
Controller,
)
from lib.browser_use_utils.create_google_ai import create_google_ai
async def run_agent(session, initial_actions, ReturnModel: type[BaseModel], task: str) -> tuple[bool, str, Any | None]:
controller = Controller(output_model=ReturnModel, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
controller=controller,
)
try:
response = await agent.run()
final_result = response.final_result()
if final_result is None:
return False, "LLM이 반환한 최종 결과가 없습니다.", None
except Exception as e:
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
return False, "API 쿼터 에러로 인한 실패", None
# 일반 에러 처리
else:
return False, "일반 에러로 인한 실패", None
finally:
await clean_agent_resources(agent)
return True, "ok", final_result

View file

@ -1,40 +0,0 @@
import json
from typing import Any
from pydantic import BaseModel
from browser_use import (
BrowserSession
)
from patchright.async_api import async_playwright as async_patchright
from lib.utils.logger import logger
from lib.browser_use_utils import get_profile, clean_session_resources, run_agent
async def run_task(target_url: str, ReturnModel: type[BaseModel], task: str) -> tuple[bool, type[BaseModel] | None]:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await get_profile(),
)
initial_actions = [{"open_tab": {"url": target_url}}]
seccess, msg, final_result = await run_agent(session=session,
initial_actions=initial_actions,
ReturnModel=ReturnModel,
task=task)
if not seccess:
logger(f"⚠️ LLM 실행 실패: {target_url} | {msg}")
print(f"⚠️ LLM 실행 실패: {target_url} | {msg}")
await clean_session_resources(session)
return False, None
try:
data = json.loads(final_result)
resp = ReturnModel(**data)
return True, resp
except Exception as e:
logger(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, None
finally:
await clean_session_resources(session)

View file

@ -1,20 +0,0 @@
from lib.utils.env_checker import check_env_variables
from lib.utils.is_html import is_html_url
from lib.utils.logger import logger
from lib.utils.notify_backend import notify_backend
from lib.utils.progress_checker import save_progress, load_progress
# v2 import => 아직 개발 중
from lib.utils.progress_checker_v2 import ProgressChecker
from lib.utils.read_txt import read_lines_between
from lib.utils.save_oauth_providers import save_oauth_providers
__all__ = [
"check_env_variables",
"is_html_url",
"logger",
"notify_backend",
"read_lines_between",
"save_progress",
"load_progress",
"save_oauth_providers",
]

View file

@ -1,15 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv()
def check_env_variables():
"""환경변수 체크 함수"""
required_vars = [
"BACKEND_URL",
"GOOGLE_API_KEY",
"GOOGLE_MODEL",
]
for var in required_vars:
if os.getenv(var) is None:
raise ValueError(f"{var} 환경변수가 설정되지 않았습니다.")

View file

@ -1,21 +0,0 @@
import json
import os
from pathlib import Path
progress_file = Path("data/scan_progress.json")
def save_progress(current_progress):
"""현재 진행 상황을 파일에 저장"""
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return None
return None

View file

@ -1,25 +0,0 @@
import json
import os
from pathlib import Path
progress_file = Path("data/scan_progress.json")
class ProgressChecker:
def __init__(self, filepath):
self.filepath = filepath
self.progress = self.load_progress()
def save(self):
"""현재 진행 상황을 파일에 저장"""
with open(self.filepath, 'w', encoding='utf-8') as f:
json.dump(self.progress, f, ensure_ascii=False, indent=2)
def load(self):
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(self.filepath):
try:
with open(self.filepath, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return None
return None

View file

@ -1,13 +0,0 @@
import csv
import os
def save_oauth_providers(url, oauth_entries):
csv_file = "./oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider or None, entry.oauth_uri or None])
print(f"✅ OAuth providers saved to {csv_file}\n")

167
main.py
View file

@ -1,167 +0,0 @@
import asyncio
import argparse
import signal
from dotenv import load_dotenv
from lib.config import BACKEND_URL
from lib.utils import notify_backend, is_html_url, read_lines_between, save_progress, load_progress, check_env_variables
from lib.agents import get_sso_list, login_google
load_dotenv()
check_env_variables()
backend_url = BACKEND_URL
login_agents = {
"google": login_google
}
# ── URL별로 Browser를 새로 띄우는 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False):
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 Starting scan for: {target_url}")
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# Backend에 스캔 시작 알림
notify_backend(target_url)
success, response = await get_sso_list(target_url)
if not success:
return
if len(response.sso_list) == 0:
return
for sso in response.sso_list:
target_login_agent = login_agents.get(sso.lower())
if target_login_agent:
print(f"🔍 {target_url} 에서 SSO 발견: {sso}, 로그인 시도 중...")
success, login_response = await target_login_agent(target_url)
if not success:
print(f"⚠️ {target_url} 에서 {sso} 로그인 실패")
continue
print(f"{target_url} 에서 {sso} 로그인 성공: {login_response.final_url}")
else:
print(f"{target_url} 에서 SSO 발견: {sso} | TODO")
# Backend에 스캔 완료 알림
# 오탐 검증
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러"""
print("\n" + "="*60)
print("🛑 스캔이 중단되었습니다!")
print(f"📊 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
print(f" - domains.txt의 {current_progress['current_index']}번째 줄")
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
print("="*60)
save_progress(current_progress)
exit(0)
signal.signal(signal.SIGINT, signal_handler)
async def loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
# 진행 상황 초기화
current_progress["total"] = len(target_list)
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
# 이전 진행 상황 확인
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print(f"📋 이전 진행 상황을 발견했습니다:")
print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}")
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (Y/n): ").lower().strip()
if resume != 'n':
current_progress["current_index"] = prev_progress["current_index"]
target_list = target_list[current_progress["current_index"]:]
print(f"{current_progress['current_index']}번째부터 재개합니다.")
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
# target_list[0] = "velog.io"
for i, url in enumerate(target_list):
actual_index = current_progress["current_index"] + i
current_progress["current_url"] = url
current_progress["current_index"] = actual_index
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 domains.txt의 {actual_index}번째 줄")
await scan_one_url(url, skip_html_check=skip_html_check)
# 진행 상황 저장
current_progress["current_index"] = actual_index + 1
save_progress(current_progress)
print("⏳ API 쿼터 보호를 위해 10초 대기 중...")
await asyncio.sleep(10)
print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)")
def main():
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다."
)
# 커맨드라인 인자로 받을 옵션들 정의
parser.add_argument(
"-f", "--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)"
)
parser.add_argument(
"-s", "--start",
type=int,
required=True,
help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end",
type=int,
required=True,
help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh", "--skip-html-check",
type=bool,
default=False,
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)"
)
args = parser.parse_args()
# 인자값을 비동기 함수에 전달
asyncio.run(loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check
))
if __name__ == "__main__":
main()

View file

@ -5,6 +5,10 @@ description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"browser-use[memory]==0.2.7", "black>=25.1.0",
"patchright==1.52.5", "browser-use[memory]==0.5.3",
"chardet>=5.2.0",
"isort>=6.0.1",
"lmnr[all]>=0.6.10",
"patchright>=1.52.5",
] ]

52
run.ps1
View file

@ -1,52 +0,0 @@
# ── 설정 부분 ──
# 실행할 Python 스크립트 이름 (파일 확장자까지)
$PYTHON_SCRIPT = "main.py"
# 도메인 목록 파일 경로 (Python 스크립트 실행 시 -f 옵션에 전달)
$DOMAIN_FILE = "./domains.txt"
# 몇 줄씩(chunk) 나눠서 실행할지
$CHUNK_SIZE = 10
# ─────────────
# https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt
# domains.txt 파일을 다운로드하는 명령어
curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o $DOMAIN_FILE
# 인자 개수 확인 (2개 또는 3개)
if ($args.Count -lt 2 -or $args.Count -gt 3) {
Write-Host "Usage: $($MyInvocation.MyCommand.Name) <start_line> <end_line> [skip_header]"
Write-Host "예시) $($MyInvocation.MyCommand.Name) 10000 11000"
Write-Host "예시) $($MyInvocation.MyCommand.Name) 10000 11000 True"
exit 1
}
$START_LINE = [int]$args[0]
$END_LINE = [int]$args[1]
$SKIP_HEADER = if ($args.Count -eq 3) { $args[2] } else { "False" }
# START_LINE부터 END_LINE까지 CHUNK_SIZE 만큼씩 반복
$current = $START_LINE
while ($current -le $END_LINE) {
# 각 청크 구간의 마지막 줄 계산
$chunk_end = $current + $CHUNK_SIZE - 1
if ($chunk_end -gt $END_LINE) {
$chunk_end = $END_LINE
}
$timestamp = Get-Date -Format "yyyy-MM-dd HH:mm:ss"
Write-Host "[$timestamp] Processing lines $current to $chunk_end..."
# Python 스크립트 실행
# -f DOMAIN_FILE: 도메인 목록 파일 경로
# -s current : 읽기 시작 줄
# -e chunk_end: 읽기 끝 줄
# -skh SKIP_HEADER: 헤더 스킵 여부
uv run $PYTHON_SCRIPT -f $DOMAIN_FILE -s $current -e $chunk_end -skh $SKIP_HEADER
# 다음 청크의 시작 값 설정
$current = $chunk_end + 1
}
Write-Host "모든 청크 처리 완료."

165
run.py Normal file
View file

@ -0,0 +1,165 @@
import argparse
import os
import signal
import subprocess
import sys
from datetime import datetime
import requests
#!/usr/bin/env python3
# ── 설정 부분 ──
PYTHON_SCRIPT = "./src/main.py"
DOMAIN_FILE = "./data/domains.txt"
# ─────────────
def download_domains():
"""도메인 파일 다운로드"""
try:
print("도메인 파일 다운로드 중...")
response = requests.get(
"https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt"
)
response.raise_for_status()
# 디렉토리가 없으면 생성
os.makedirs(os.path.dirname("./data"), exist_ok=True)
with open(DOMAIN_FILE, "w", encoding="utf-8") as f:
f.write(response.text)
print("도메인 파일 다운로드 완료")
except requests.RequestException as e:
print(f"도메인 파일 다운로드 실패: {e}")
sys.exit(1)
def run_script(start_line, end_line, skh_option):
"""Python 스크립트 실행"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] Processing lines {start_line} to {end_line}...")
process = None
signal_handled = False
def signal_handler(sig, frame):
nonlocal signal_handled
if signal_handled:
return
signal_handled = True
print("\n🛑 종료 신호를 받았습니다. 정리 작업을 진행합니다...")
if process:
try:
# 자식 프로세스에 SIGTERM 전송
print("📤 서브프로세스에 종료 신호를 전달합니다...")
process.terminate()
# 5초간 대기
process.wait(timeout=5)
print("✅ 서브프로세스가 정상적으로 종료되었습니다.")
except subprocess.TimeoutExpired:
print("⚠️ 서브프로세스가 응답하지 않아 강제 종료합니다...")
process.kill()
try:
process.wait(timeout=3)
print("✅ 서브프로세스가 강제 종료되었습니다.")
except subprocess.TimeoutExpired:
print("❌ 서브프로세스 강제 종료 실패")
except Exception as e:
print(f"❌ 프로세스 종료 중 오류: {e}")
print("✅ 런처 종료 완료.")
sys.exit(0)
# 원래 시그널 핸들러 저장
original_sigint = signal.signal(signal.SIGINT, signal_handler)
original_sigterm = signal.signal(signal.SIGTERM, signal_handler)
try:
command = [
"uv",
"run",
PYTHON_SCRIPT,
"-f",
DOMAIN_FILE,
"-s",
str(start_line),
"-e",
str(end_line),
]
if skh_option:
command.append("--skip-html-check")
process = subprocess.Popen(command)
returncode = process.wait()
if returncode != 0:
print(f"❌ Python 스크립트가 오류 코드 {returncode}로 종료되었습니다.")
sys.exit(returncode)
except KeyboardInterrupt:
signal_handler(signal.SIGINT, None)
except Exception as e:
print(f"❌ 스크립트 실행 중 오류: {e}")
if process:
try:
process.terminate()
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
sys.exit(1)
finally:
# 시그널 핸들러 복원
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
def main():
parser = argparse.ArgumentParser(
description="도메인 처리 스크립트 실행기",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
사용 예시:
uv run run.py 10000 11000 # 10000~11000 라인 처리
uv run run.py 10000 11000 --skh # SKH 옵션 활성화
uv run run.py 10000 11000 --no-download # 다운로드 생략
""",
)
parser.add_argument("start_line", type=int, help="시작 라인 번호")
parser.add_argument("end_line", type=int, help="종료 라인 번호")
parser.add_argument("--skh", action="store_true", help="SKH 옵션 활성화")
parser.add_argument(
"--no-download", action="store_true", help="도메인 파일 다운로드 생략"
)
args = parser.parse_args()
# 라인 범위 검증
if args.start_line < 0 or args.end_line < 0:
print("라인 번호는 0 이상이어야 합니다.")
sys.exit(1)
if args.start_line > args.end_line:
print("시작 라인은 종료 라인보다 크거나 같아야 합니다.")
sys.exit(1)
# 도메인 파일 다운로드
if not args.no_download:
download_domains()
elif not os.path.exists(DOMAIN_FILE):
print(
f"도메인 파일({DOMAIN_FILE})이 존재하지 않습니다. --no-download 옵션을 제거하거나 파일을 준비해주세요."
)
sys.exit(1)
# 스크립트 실행
run_script(args.start_line, args.end_line, args.skh)
print("처리 완료.")
if __name__ == "__main__":
main()

40
run.sh
View file

@ -1,40 +0,0 @@
#!/bin/bash
# ── 설정 부분 ──
PYTHON_SCRIPT="main.py"
DOMAIN_FILE="./domains.txt"
CHUNK_SIZE=10
# ─────────────
# curl "https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt" -o $DOMAIN_FILE
# 인자 개수 확인
if [ $# -lt 2 ]; then
echo "Usage: $0 <start_line> <end_line> [skh_option]"
echo "예시) $0 10000 11000 True"
exit 1
fi
START_LINE=$1
END_LINE=$2
SKH_OPTION=$3
if [ -z "$SKH_OPTION" ]; then
SKH_OPTION="False"
fi
current=$START_LINE
while [ "$current" -le "$END_LINE" ]; do
chunk_end=$(( current + CHUNK_SIZE - 1 ))
if [ "$chunk_end" -gt "$END_LINE" ]; then
chunk_end=$END_LINE
fi
echo "[$(date '+%Y-%m-%d %H:%M:%S')] Processing lines ${current} to ${chunk_end}..."
uv run "$PYTHON_SCRIPT" -f "$DOMAIN_FILE" -s "$current" -e "$chunk_end" -skh $SKH_OPTION
current=$(( chunk_end + 1 ))
sleep 1 # 1초 대기
done
echo "모든 청크 처리 완료."

263
setup.py Normal file
View file

@ -0,0 +1,263 @@
import os
import subprocess
import webbrowser
import asyncio
from browser_use import BrowserProfile, Agent
from browser_use.llm import ChatGoogle
from dotenv import load_dotenv
import threading
load_dotenv(verbose=True, override=True)
os.makedirs(os.path.dirname("./data"), exist_ok=True)
def create_file_from_example(target: str, example: str) -> bool:
if not os.path.exists(target):
if os.path.exists(example):
with (
open(example, "r", encoding="utf-8") as example_file,
open(target, "w", encoding="utf-8") as target_file,
):
target_file.write(example_file.read())
# os.startfile(target)
print(f"{target} 파일이 {example}에서 생성되었습니다.")
return True
else:
print(
f"⚠️ {example} 파일이 존재하지 않습니다. {target} 생성에 실패했습니다."
)
else:
print(f" {target} 파일이 이미 존재합니다.")
return False
def install_playwright_chrome():
print("\n🛠️ Playwright의 Chromium을 설치 중입니다...")
print("👉 이 작업은 시간이 걸릴 수 있습니다. 잠시 기다려주세요.")
try:
subprocess.run(["uv", "run", "playwright", "install", "chromium"], check=True)
print("✅ Playwright Chrome 설치 완료.")
except subprocess.CalledProcessError as e:
if "already" in e.stdout.decode():
print(" Chrome이 이미 설치되어 있습니다.")
else:
print(f"❌ Playwright 설치 실패: {e}")
print("\n")
def prompt_yes_no(message: str) -> bool:
print(message, end="")
return input().strip().lower() in ["y", "yes"]
def i_dont_like_windows():
# Windows인지 확인
if os.name != "nt":
return
else:
# run (Get-ItemProperty "HKLM:\SYSTEM\CurrentControlSet\Control\Nls\CodePage").ACP
try:
result = subprocess.run(
[
"powershell",
"-Command",
'(Get-ItemProperty "HKLM:\\SYSTEM\\CurrentControlSet\\Control\\Nls\\CodePage").ACP',
],
capture_output=True,
text=True,
check=True,
)
acp = result.stdout.strip()
if acp == "65001":
print("현재 Active Code Page가 UTF-8로 설정되어 있습니다.")
return
else:
print("현재 Active Code Page가 UTF-8로 설정되어 있지 않습니다.")
except subprocess.CalledProcessError as e:
print(f"코드 페이지 확인 실패: {e}")
print("=======================================================")
print("\n⚠️ Windows에서는 인코딩 문제가 발생합니다.")
print("👉 엔터를 누르면 자동으로 intl.cpl이 열립니다.")
print('👉 자세한 내용은 README.md에서 "윈도우 인코딩 해결"을 참조해주세요.\n')
print(
"⚠️ 경고 : 이 작업은 윈도우에서 킹갓 대한민국의 프로그램들의 한글이 정상적으로 표시되지 않을 수 있습니다."
)
# Pause
input("계속하려면 Enter 키를 누르세요...")
webbrowser.open("intl.cpl")
print("👉 intl.cpl가 열렸습니다.\n")
print("👉 관리자 옵션 -> 시스템 로켈 변경")
print("👀 Beta: 세계 언어 지원을 위해 Unicode UTF-8 사용")
print("👉 이 설정을 변경한 후, 시스템을 재시작하세요.\n")
print("⚠️ 이 작업은 시스템 언어 설정을 변경하므로 주의가 필요합니다.\n")
print("=======================================================")
input("계속하려면 Enter 키를 누르세요...")
async def setup_user_data():
print("\n📂 사용자 데이터 디렉토리를 설정하시겠습니까?")
print("⚠️ 사용자 데이터 디렉토리는 브라우저의 프로필 데이터를 저장하는 곳입니다.")
print("✅ 이 작업은 Google API Key를 설정하고 나서 진행해야만합니다.")
if prompt_yes_no("\033[1m\033[33m선택하시려면 y를 입력하세요 (y/n):\033[0m "):
if os.getenv("GOOGLE_API_KEY") is None:
print(
"⚠️ Google API Key가 설정되어 있지 않습니다. 먼저 Google API Key를 설정해주세요."
)
return
print("======================================================")
llm = ChatGoogle(
model="gemini-2.0-flash",
)
initial_actions = [
{"go_to_url": {"url": "https://www.google.com", "new_tab": False}},
{"wait": {"seconds": 2147483647}},
]
agent = Agent(
task="Just Wait",
llm=llm,
use_vision=False,
initial_actions=initial_actions,
browser_profile=BrowserProfile(
disable_security=True,
# stealth=True,
headless=False,
device_scale_factor=1,
window_size={"width": 1600, "height": 900},
viewport={"width": 1600, "height": 900},
user_data_dir="./data/user_data",
args=[
# "--disable-features=Translate,PasswordManagerDefaultEnabled",
],
ignore_default_args=[
"--disable-datasaver-prompt",
"--disable-component-extensions-with-background-pages",
"--disable-prompt-on-repost",
"--safeBrowse-disable-auto-update",
"--install-autogenerated-theme=0,0,0",
"--disable-speech-synthesis-api",
"--ash-no-nudges",
"--test-type=gpu",
"--noerrdialogs",
"--disable-external-intent-requests",
"--disable-breakpad",
"--disable-backgrounding-occluded-windows",
"--export-tagged-pdf",
"--disable-focus-on-load",
"--suppress-message-center-popups",
"--disable-renderer-backgrounding",
"--hide-crash-restore-bubble",
"--disable-back-forward-cache",
"--allow-legacy-extension-manifests",
# "--disable-field-trial-config", # 왜 이걸 끄면 웹사이트가 압축된 형태로 보이는 진 모르곘음
"--disable-popup-blocking",
"--disable-background-networking",
"--no-first-run",
"--disable-blink-features=AutomationControlled",
"--password-store=basic",
"--enable-network-information-downlink-max",
"--allow-pre-commit-input",
"--enable-features=NetworkService,NetworkServiceInProcess",
"--metrics-recording-only",
"--silent-debugger-extension-api",
"--disable-features=AcceptCHFrame,AutoExpandDetailsElement,AvoidUnnecessaryBeforeUnloadCheckSync,CertificateTransparencyComponentUpdater,DestroyProfileOnBrowserClose,DialMediaRouteProvider,ExtensionManifestV2Disabled,GlobalMediaControls,HttpsUpgrades,ImprovedCookieControls,LazyFrameLoading,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutomationControlled,BackForwardCache,OptimizationHints,ProcessPerSiteUpToMainFrameThreshold,InterestFeedContentSuggestions,CalculateNativeWinOcclusion,HeavyAdPrivacyMitigations,PrivacySandboxSettings4,AutofillServerCommunication,CrashReporting,OverscrollHistoryNavigation,InfiniteSessionRestore,ExtensionDisableUnsupportedDeveloper",
"--disable-ipc-flooding-protection",
"--disable-hang-monitor",
"--disable-dev-shm-usage",
"--disable-client-side-phishing-detection",
"--log-level=2",
"--generate-pdf-document-outline",
"--disable-speech-api",
"--disable-search-engine-choice-screen",
"--no-service-autorun",
"--no-pings",
"--disable-component-update",
'--simulate-outdated-no-au="Tue, 31 Dec 2099 23:59:59 GMT"',
"--disable-background-timer-throttling",
"--use-mock-keychain",
"--disable-features=IsolateOrigins,site-per-process",
# 아래는 기존 예시에 있던 인자들입니다. 필요에 따라 유지하거나 제거하세요.
"--enable-automation",
"--disable-extensions",
"--hide-scrollbars",
],
),
)
print("======================================================\n")
print(
"👉 브라우저가 열립니다. 필요한 로그인을 완료한 후 엔터키를 눌러 다음 단계로 진행하세요."
)
input("계속하려면 Enter 키를 누르세요...\n")
print("======================================================")
# 브라우저를 백그라운드에서 시작
def run_agent():
asyncio.run(agent.run())
agent_thread = threading.Thread(target=run_agent)
agent_thread.daemon = True
agent_thread.start()
# 사용자가 'n'을 입력할 때까지 대기
while True:
user_input = input("").strip().lower()
if user_input == "":
agent.stop()
break
print("======================================================")
print("✅ 설정이 완료되었습니다.")
else:
print("🚫 설정이 취소되었습니다.")
print("======================================================")
print(
"⚠️ 이후에 USER_DATA_DIR을 설정하려면, .env 파일을 참고하여 USER_DATA_DIR을 설정하세요.\n"
)
def setup_sensitive():
print("\n🔐 Sensitive Data을 설정하시겠습니까?")
print("👉 이미 세션을 설정했다면, 이 작업은 **선택사항**입니다.")
print(
"⚠️ 민감 정보 파일은 오류를 유발하거나 문제가 될 수 있으므로 가급적 세션 사용을 권장합니다."
)
if prompt_yes_no("\033[1m\033[33m선택하시려면 y를 입력하세요 (y/n):\033[0m "):
print("======================================================")
print("👀 .sensitive.json 파일을 생성합니다.")
print("💾 Browser Use의 문서를 참조하여 수정을 수정해주세요.")
print("https://docs.browser-use.com/customize/sensitive-data")
create_file_from_example(".sensitive.json", ".sensitive.example.json")
print("======================================================")
print("✅ .sensitive.json 파일이 생성되었습니다.")
else:
print("🚫 .sensitive.json 생성이 취소되었습니다.")
print("======================================================")
print(
"⚠️ 이후에 민감 정보 파일을 설정하려면, .sensitive.example.json 파일을 참고하여 .sensitive.json 파일을 생성하세요.\n"
)
if __name__ == "__main__":
# 1. .env 생성
create_file_from_example(".env", ".env.example")
print("=====================================================")
# 2. Playwright용 Chrome 설치
install_playwright_chrome()
print("=====================================================")
# 3. Windows 인코딩 문제 해결
# i_dont_like_windows()
# print("=====================================================")
# 4. Setup User Data
asyncio.run(setup_user_data())
print("=====================================================")
# 5. .sensitive.json 생성
# setup_sensitive()
print("=====================================================")
print("🎉 초기 설정이 완료되었습니다! 이제 스크립트를 실행할 준비가 되었습니다.")

View file

@ -0,0 +1,6 @@
from lib.browser_use.agents import *
from lib.browser_use.func import *
from lib.browser_use.init_profile import *
from lib.browser_use.model import *
from lib.browser_use.scanner import *
from lib.browser_use.sensitive_data import *

View file

@ -0,0 +1,417 @@
import asyncio
import json
import os
import shutil
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from browser_use import Agent, BrowserSession, Controller
from patchright.async_api import async_playwright as async_patchright
from lib.browser_use.init_profile import GetProfile
from lib.browser_use.sensitive_data import GetSensitiveData
from lib.llm import CreateChatGoogle, get_prompt
from lib.utils import config, logger
# Exponential backoff settings
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", "60")) # seconds
MAX_BACKOFF = int(os.getenv("MAX_BACKOFF", "600")) # seconds
@dataclass
class RetryTask:
"""재시도할 작업을 나타내는 클래스"""
task_type: str # "oauth_list" or "oauth_login"
url: str
oauth_provider: Optional[str] = None
retry_count: int = 0
next_retry_time: Optional[datetime] = None
max_retries: int = 5
# 전역 재시도 큐
retry_queue: list[RetryTask] = []
retry_queue_lock = asyncio.Lock()
async def add_to_retry_queue(task: RetryTask):
"""작업을 재시도 큐에 추가"""
async with retry_queue_lock:
# 중복 작업 확인
existing_task = None
for existing in retry_queue:
if (
existing.task_type == task.task_type
and existing.url == task.url
and existing.oauth_provider == task.oauth_provider
):
existing_task = existing
break
if existing_task:
# 기존 작업이 있으면 재시도 횟수 업데이트
existing_task.retry_count = task.retry_count
existing_task.next_retry_time = task.next_retry_time
print(
f"📝 기존 작업 업데이트: {task.task_type} - {task.url} (재시도: {task.retry_count})"
)
else:
# 새 작업 추가
retry_queue.append(task)
print(
f" 재시도 큐에 작업 추가: {task.task_type} - {task.url} (재시도: {task.retry_count})"
)
async def process_retry_queue():
"""재시도 큐 처리"""
async with retry_queue_lock:
now = datetime.now()
ready_tasks = []
for task in retry_queue[:]: # 복사본에서 반복
if task.next_retry_time and task.next_retry_time <= now:
ready_tasks.append(task)
retry_queue.remove(task)
if ready_tasks:
print(f"🔄 {len(ready_tasks)}개의 재시도 작업 처리 중...")
for task in ready_tasks:
try:
if task.task_type == "oauth_list":
result = await _extract_oauth_list_internal(task.url)
if result:
print(f"✅ 재시도 성공: OAuth 리스트 추출 - {task.url}")
else:
await _handle_retry_failure(task)
elif task.task_type == "oauth_login":
result = await _test_oauth_login_internal(
task.url, task.oauth_provider
)
if result:
print(
f"✅ 재시도 성공: {task.oauth_provider} 로그인 - {task.url}"
)
else:
await _handle_retry_failure(task)
except Exception as e:
print(f"❌ 재시도 중 에러: {e}")
await _handle_retry_failure(task)
async def _handle_retry_failure(task: RetryTask):
"""재시도 실패 처리"""
if task.retry_count < task.max_retries:
task.retry_count += 1
wait_time = min(INITIAL_BACKOFF * (2**task.retry_count), MAX_BACKOFF)
task.next_retry_time = datetime.now() + timedelta(seconds=wait_time)
await add_to_retry_queue(task)
print(f"{wait_time}초 후 재시도 예정: {task.task_type} - {task.url}")
else:
print(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
logger(f"❌ 최대 재시도 횟수 초과: {task.task_type} - {task.url}")
async def get_retry_queue_status():
"""재시도 큐 상태 조회"""
async with retry_queue_lock:
return {
"queue_length": len(retry_queue),
"tasks": [
{
"task_type": task.task_type,
"url": task.url,
"oauth_provider": task.oauth_provider,
"retry_count": task.retry_count,
"next_retry_time": (
task.next_retry_time.isoformat()
if task.next_retry_time
else None
),
}
for task in retry_queue
],
}
async def _run_agent_with_retry(agent_config):
"""Agent 실행을 위한 내부 헬퍼 함수 (재시도 로직 포함)"""
agent = None
session = None
try_cnt = 0
url = agent_config["url"]
headless = os.getenv("HEADLESS", "False").lower() == "true"
while try_cnt < 3:
try:
Profile = await GetProfile(headless=headless)
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=Profile[0],
)
agent = Agent(browser_session=session, **agent_config["agent_params"])
response = await agent.run()
if any(
keyword in str(response)
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type=agent_config.get("task_type", "unknown"),
url=url,
retry_count=try_cnt + 1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return None
# remove profile
print(Profile)
if Profile[1] and isinstance(Profile[1], str):
print(1)
shutil.rmtree(Profile[1], ignore_errors=True)
print(f"🗑️ 임시 프로필 디렉토리 삭제 완료: {Profile[1]}")
return response
except Exception as e:
# 일반 에러 처리
try_cnt += 1
if try_cnt >= 3:
error_msg = f"최대 재시도 횟수 초과."
logger(
f"{url} - {agent_config['log_context']} 실패: {error_msg}: {e}"
)
print(f"{url} - {agent_config['log_context']} 실패: {error_msg}")
return None
print(f"⚠️ 에러 발생: {e}. {try_cnt}번째 재시도 중...")
await asyncio.sleep(30)
continue
return None
async def _extract_oauth_list_internal(url: str):
"""OAuth 리스트 추출 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔎 OAuth 리스트 추출 시작: {target_url}")
prompt, model = get_prompt("auth")
agent_config = {
"url": target_url,
"log_context": "OAuth 리스트 추출",
"agent_params": {
"initial_actions": [{"go_to_url": {"url": target_url, 'new_tab': False}}],
"sensitive_data": GetSensitiveData(),
"task": (
"Navigate to the login page and identify all OAuth provider buttons (excluding Passkey). "
"DO NOT click any OAuth buttons or attempt to login. "
"Just find and list all available OAuth providers with their button texts or provider names. "
"Return a list of OAuth providers found on the login page."
),
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
},
}
response = await _run_agent_with_retry(agent_config)
if not response:
return []
final_result = response.final_result()
if not final_result:
print("OAuth 리스트 추출 결과가 없습니다.")
return []
try:
data = json.loads(final_result)
print(final_result)
oauth_providers = data.get("sso_list", [])
if not oauth_providers:
print("❌ OAuth 제공자가 없습니다.")
logger(f"{url} - OAuth 제공자 없음: {final_result}")
return []
print(f"✅ OAuth 제공자 추출 완료: {oauth_providers}")
return oauth_providers
except (json.JSONDecodeError, KeyError) as e:
print(f"❌ 결과 파싱 실패: {e}")
logger(f"{url} 결과 파싱 실패: {final_result}")
return []
async def extract_oauth_list(url: str):
"""첫 번째 Agent: 로그인 페이지를 찾고 OAuth 리스트만 추출"""
try:
return await _extract_oauth_list_internal(url)
except Exception as e:
error_str = str(e).lower()
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {url}")
task = RetryTask(
task_type="oauth_list",
url=url,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return []
else:
raise e
async def _test_oauth_login_internal(url: str, oauth_provider: str):
"""OAuth 로그인 테스트 내부 함수 (재시도 큐에서 사용)"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🔐 {oauth_provider} 로그인 시작: {target_url}")
prompt, model = get_prompt(oauth_provider)
agent_config = {
"url": target_url,
"log_context": f"{oauth_provider} 로그인",
"agent_params": {
"initial_actions": [{"go_to_url": {"url": target_url, 'new_tab': False}}],
"sensitive_data": GetSensitiveData(),
"task": (
f"Navigate to the login page, find and click the {oauth_provider} OAuth button, "
f"then follow the complete OAuth login flow as far as possible with a real user account. "
f"Capture the final redirect URL after login completion. "
f"If login fails or encounters errors, report the issue. "
f"Focus only on {oauth_provider} - ignore other OAuth providers."
),
"llm": CreateChatGoogle(config.GOOGLE_MODEL),
"planner_llm": (
CreateChatGoogle(config.GOOGLE_PLANNER_MODEL)
if config.GOOGLE_PLANNER_MODEL
and os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
else None
),
"controller": Controller(
output_model=model if not isinstance(model, str) else None,
exclude_actions=["search_google", "unknown_action", "unkown"],
),
"extend_system_message": prompt,
"extend_planner_system_message": prompt,
},
}
response = await _run_agent_with_retry(agent_config)
if response and response.final_result():
final_result = response.final_result()
try:
import json
result_data = json.loads(final_result)
status = result_data.get("status", "")
if status == "success":
print(f"{oauth_provider} 로그인 완료")
logger(f"{url} - {oauth_provider} 로그인 결과: {final_result}")
return True
else:
print(f"{oauth_provider} 로그인 실패: {status}")
logger(f"{url} - {oauth_provider} 로그인 실패: {final_result}")
return False
except (json.JSONDecodeError, KeyError):
print(f"{oauth_provider} 결과 파싱 실패")
return False
print(f"{oauth_provider} 로그인 실패")
return False
async def test_oauth_login(url: str, oauth_provider: str):
"""두 번째 Agent: 특정 OAuth 제공자로 로그인 시도"""
try:
return await _test_oauth_login_internal(url, oauth_provider)
except Exception as e:
error_str = str(e).lower()
if any(
keyword in error_str
for keyword in [
"429",
"resource_exhausted",
"resourceexhausted",
"quota",
"rate limit",
"too many requests",
"exceeded",
"limit reached",
]
):
print(f"⚠️ API 쿼터 에러 발생, 재시도 큐에 추가: {oauth_provider} - {url}")
task = RetryTask(
task_type="oauth_login",
url=url,
oauth_provider=oauth_provider,
retry_count=1,
next_retry_time=datetime.now() + timedelta(seconds=INITIAL_BACKOFF),
)
await add_to_retry_queue(task)
return False
else:
raise e
async def start_retry_queue_processor():
"""재시도 큐 처리기를 백그라운드에서 시작"""
async def queue_processor():
while True:
try:
await process_retry_queue()
await asyncio.sleep(30) # 30초마다 큐 확인
except Exception as e:
print(f"❌ 재시도 큐 처리 중 에러: {e}")
await asyncio.sleep(60) # 에러 발생 시 1분 대기
# 백그라운드 태스크로 실행
asyncio.create_task(queue_processor())
print("🔄 재시도 큐 처리기 시작됨")
# 모듈 로딩 시 자동으로 백그라운드 처리기 시작
# (실제 애플리케이션에서는 main 함수에서 호출하는 것이 좋음)
def init_retry_system():
"""재시도 시스템 초기화"""
print("🔧 재시도 시스템 초기화 중...")
# 이 함수는 메인 애플리케이션에서 호출해야 함

View file

@ -0,0 +1,110 @@
"""
브라우저 리소스 정리를 위한 모듈
"""
import os
import shutil
import asyncio
from pathlib import Path
async def cleanup_browser_resources(agent=None, session=None, user_data_dir=None):
"""브라우저 관련 리소스를 정리하는 함수"""
print("🔄 브라우저 리소스 정리를 시작합니다...")
# 에이전트 리소스 정리
if agent:
try:
print("<EFBFBD> 에이전트 리소스 정리 중...")
# 브라우저 종료 대기 시간 설정
await asyncio.wait_for(agent.close(), timeout=10.0)
print("✅ 에이전트 리소스 정리 완료.")
except asyncio.TimeoutError:
print("⚠️ 에이전트 종료 시간 초과. 강제 종료합니다.")
except Exception as e:
print(f"⚠️ 에이전트 리소스 정리 실패: {e}")
# 세션 리소스 정리
if session:
try:
print("🔄 세션 리소스 정리 중...")
await asyncio.wait_for(session.close(), timeout=5.0)
print("✅ 세션 리소스 정리 완료.")
except asyncio.TimeoutError:
print("⚠️ 세션 종료 시간 초과.")
except Exception as e:
print(f"⚠️ 세션 리소스 정리 실패: {e}")
# 임시 스토리지 상태 파일 삭제
storage_state_temp_path = Path("./data/storage_state_temp.json").resolve()
if storage_state_temp_path.exists():
try:
print(f"<EFBFBD> 임시 스토리지 상태 파일 삭제 중: {storage_state_temp_path}")
storage_state_temp_path.unlink()
print("✅ 임시 스토리지 상태 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 스토리지 상태 파일 삭제 실패: {e}")
# 임시 사용자 데이터 디렉토리 정리
if user_data_dir and os.path.exists(user_data_dir):
try:
print(f"🗑️ 임시 사용자 데이터 디렉토리 삭제 중: {user_data_dir}")
await asyncio.sleep(0.5) # 브라우저가 완전히 종료될 시간 제공
shutil.rmtree(user_data_dir)
print("✅ 임시 사용자 데이터 디렉토리 삭제 완료.")
except Exception as e:
print(f"⚠️ 임시 사용자 데이터 디렉토리 삭제 실패: {e}")
# userdata.dump 파일에서 기록된 디렉토리 정리
log_file = "./data/userdata.dump"
if os.path.exists(log_file):
try:
with open(log_file, "r") as f:
tmp_user_data_dir = f.read().strip()
if tmp_user_data_dir and os.path.exists(tmp_user_data_dir):
print(f"🗑️ 기록된 임시 사용자 데이터 디렉토리 삭제 중: {tmp_user_data_dir}")
await asyncio.sleep(0.5) # 브라우저가 완전히 종료될 시간 제공
shutil.rmtree(tmp_user_data_dir)
print("✅ 기록된 임시 사용자 데이터 디렉토리 삭제 완료.")
os.remove(log_file)
print("✅ userdata.dump 파일 삭제 완료.")
except Exception as e:
print(f"⚠️ userdata.dump 관련 정리 실패: {e}")
print("✅ 브라우저 리소스 정리가 완료되었습니다.")
def cleanup_all_running_tasks():
"""실행 중인 모든 asyncio 태스크를 정리"""
try:
loop = asyncio.get_running_loop()
tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
if tasks:
print(f"🔄 {len(tasks)}개의 실행 중인 태스크를 정리합니다...")
for task in tasks:
task.cancel()
# 태스크들이 정리될 때까지 잠시 대기
async def wait_for_tasks():
await asyncio.gather(*tasks, return_exceptions=True)
asyncio.create_task(wait_for_tasks())
print("✅ 모든 태스크 정리 완료.")
except RuntimeError:
# 이벤트 루프가 실행 중이 아닌 경우
pass
except Exception as e:
print(f"⚠️ 태스크 정리 중 오류: {e}")
async def emergency_cleanup():
"""긴급 종료 시 최소한의 리소스 정리"""
print("🚨 긴급 리소스 정리 실행 중...")
# 모든 태스크 취소
cleanup_all_running_tasks()
# 기본 리소스 정리
await cleanup_browser_resources()
print("✅ 긴급 리소스 정리 완료.")

View file

@ -0,0 +1,54 @@
import json
import os
from pathlib import Path
from browser_use import BrowserProfile
from dotenv import load_dotenv
# Load environment variables
load_dotenv(override=True)
def setup_proxy():
"""Configure proxy settings from environment variables."""
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
proxy_url = f"http://{proxy_host}:{proxy_port}"
print(f"🔗 Using proxy: {proxy_host}:{proxy_port}")
return proxy_url
else:
print("🔗 No proxy configured, using direct connection.")
return None
def get_browser_args():
"""Get browser arguments for enhanced compatibility and security."""
return [
# Security and isolation
"--disable-web-security",
"--disable-site-isolation-trials",
"--disable-features=IsolateOrigins,site-per-process",
"--ignore-certificate-errors",
"--ignore-ssl-errors",
"--allow-running-insecure-content",
# Performance and rendering
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage",
# Popup and automation
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled",
# Browser behavior
"--no-first-run",
"--no-service-autorun",
"--no-default-browser-check",
"--password-store=basic",
"--use-mock-keychain",
# Extensions
"--disable-extensions-file-access-check",
"--disable-extensions-http-throttling",
"--disable-component-extensions-with-background-pages",
# Language
f"--lang={os.getenv('LANG', 'en_US')}",
]

View file

@ -0,0 +1,134 @@
import os
import shutil
import tempfile
from lib.browser_use.func import *
from lib.utils.config import USER_DATA_DIR
# Initialize configuration
proxy_url = setup_proxy()
async def GetProfile(headless=False):
"""브라우저 프로필을 생성하고 임시 사용자 데이터 디렉토리를 관리합니다."""
user_data_dir = None
tmp_user_data_dir = None
if USER_DATA_DIR and os.path.isdir(USER_DATA_DIR):
try:
tmp_user_data_dir = tempfile.mkdtemp(prefix="browser_use_")
print(f"🔧 기본 사용자 데이터 디렉토리: {USER_DATA_DIR}")
print(f"🔧 임시 사용자 데이터 디렉토리: {tmp_user_data_dir}")
log_file = os.path.join("./data", "userdata.dump")
if not os.path.exists("./data"):
os.makedirs("./data")
# 기존 로그 파일이 있다면 해당 디렉토리 정리
if os.path.exists(log_file):
try:
with open(log_file, "r") as f:
old_tmp_dir = f.read().strip()
if old_tmp_dir and os.path.exists(old_tmp_dir):
shutil.rmtree(old_tmp_dir)
print(f"🗑️ 이전 임시 디렉토리 정리: {old_tmp_dir}")
except Exception as e:
print(f"⚠️ 이전 임시 디렉토리 정리 실패: {e}")
os.remove(log_file)
# 새 임시 디렉토리 경로 로깅
with open(log_file, "w") as f:
f.write(tmp_user_data_dir)
# 사용자 데이터 디렉토리 복사
if os.path.exists(tmp_user_data_dir):
shutil.rmtree(tmp_user_data_dir)
shutil.copytree(
USER_DATA_DIR,
tmp_user_data_dir,
dirs_exist_ok=False,
ignore_dangling_symlinks=True,
)
user_data_dir = tmp_user_data_dir
print(f"✅ 사용자 데이터 디렉토리 복사 완료: {user_data_dir}")
except Exception as e:
print(f"❌ 사용자 데이터 디렉토리 복사 실패: {e}")
# 실패 시 임시 디렉토리 정리
if tmp_user_data_dir and os.path.exists(tmp_user_data_dir):
try:
shutil.rmtree(tmp_user_data_dir)
except Exception:
pass
tmp_user_data_dir = None
user_data_dir = None
print(proxy_url)
profile = BrowserProfile(
# Security settings
# disable_security=True,
# Display settings
headless=headless,
# Data persistence
user_data_dir=user_data_dir,
# Network settings
proxy={"server": proxy_url} if proxy_url else None,
# Additional arguments
args=[
"--proxy-server=" + proxy_url if proxy_url else "",
# "--disable-features=Translate,PasswordManagerDefaultEnabled",
],
ignore_default_args=[
# "--disable-datasaver-prompt",
# "--disable-component-extensions-with-background-pages",
# "--disable-prompt-on-repost",
# "--safeBrowse-disable-auto-update",
# "--install-autogenerated-theme=0,0,0",
# "--disable-speech-synthesis-api",
# "--ash-no-nudges",
# "--test-type=gpu",
# "--noerrdialogs",
# "--disable-external-intent-requests",
# "--disable-breakpad",
# "--disable-backgrounding-occluded-windows",
# "--export-tagged-pdf",
# "--disable-focus-on-load",
# "--suppress-message-center-popups",
# "--disable-renderer-backgrounding",
# "--hide-crash-restore-bubble",
# "--disable-back-forward-cache",
# "--allow-legacy-extension-manifests",
# # "--disable-field-trial-config", # 왜 이걸 끄면 웹사이트가 압축된 형태로 보이는 진 모르곘음
# "--disable-popup-blocking",
# "--disable-background-networking",
# "--no-first-run",
# "--disable-blink-features=AutomationControlled",
# "--password-store=basic",
# "--enable-network-information-downlink-max",
# "--allow-pre-commit-input",
# "--enable-features=NetworkService,NetworkServiceInProcess",
# "--metrics-recording-only",
# "--silent-debugger-extension-api",
# "--disable-features=AcceptCHFrame,AutoExpandDetailsElement,AvoidUnnecessaryBeforeUnloadCheckSync,CertificateTransparencyComponentUpdater,DestroyProfileOnBrowserClose,DialMediaRouteProvider,ExtensionManifestV2Disabled,GlobalMediaControls,HttpsUpgrades,ImprovedCookieControls,LazyFrameLoading,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutomationControlled,BackForwardCache,OptimizationHints,ProcessPerSiteUpToMainFrameThreshold,InterestFeedContentSuggestions,CalculateNativeWinOcclusion,HeavyAdPrivacyMitigations,PrivacySandboxSettings4,AutofillServerCommunication,CrashReporting,OverscrollHistoryNavigation,InfiniteSessionRestore,ExtensionDisableUnsupportedDeveloper",
# "--disable-ipc-flooding-protection",
# "--disable-hang-monitor",
# "--disable-dev-shm-usage",
# "--disable-client-side-phishing-detection",
# "--log-level=2",
# "--generate-pdf-document-outline",
# "--disable-speech-api",
# "--disable-search-engine-choice-screen",
# "--no-service-autorun",
# "--no-pings",
# "--disable-component-update",
# '--simulate-outdated-no-au="Tue, 31 Dec 2099 23:59:59 GMT"',
# "--disable-background-timer-throttling",
# "--use-mock-keychain",
# "--disable-features=IsolateOrigins,site-per-process",
# 아래는 기존 예시에 있던 인자들입니다. 필요에 따라 유지하거나 제거하세요.
"--enable-automation",
"--disable-extensions",
"--hide-scrollbars",
],
)
return [profile, tmp_user_data_dir] if tmp_user_data_dir else [profile]

View file

@ -0,0 +1,17 @@
from typing import List
from pydantic import BaseModel
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str = "" # OAuth 리스트 추출 단계에서는 URI가 없을 수 있음
class OAuthList(BaseModel):
oauth_providers: List[str] # 이제 문자열 배열로 변경
# 기존 모델 유지 (backward compatibility)
BaseModel = OAuthList

View file

@ -0,0 +1,191 @@
import asyncio
import csv
import os
from lib.browser_use.agents import (
extract_oauth_list,
get_retry_queue_status,
start_retry_queue_processor,
test_oauth_login,
)
from lib.browser_use.cleanup import cleanup_browser_resources
from lib.utils import is_html_url, notify_backend, read_lines_between
from lib.utils.progress import (
current_progress,
is_shutdown_requested,
load_progress,
progress_file,
save_progress,
)
async def scan_one_url(url: str, skip_html_check: bool = False):
"""URL 스캔 통합 함수: OAuth 리스트 추출 → 개별 OAuth 로그인 시도"""
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 스캔 시작: {target_url}")
# Backend에 스캔 시작을 알림
notify_backend(target_url)
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(target_url) and not skip_html_check:
print(f"{target_url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
# 1단계: OAuth 리스트 추출
oauth_entries = await extract_oauth_list(target_url)
if not oauth_entries:
print(f"{target_url}에서 OAuth 제공자를 찾을 수 없습니다.")
return
print("-" * 50)
print(f"🔗 스캔 URL: {url}")
print(f"🔐 발견된 OAuth 제공자들: {len(oauth_entries)}")
for entry in oauth_entries:
print(f" - {entry}")
print("-" * 50)
# CSV에 OAuth 리스트 저장
csv_file = "./data/oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri", "login_tested"])
for entry in oauth_entries:
writer.writerow([url, entry, "", "pending"])
# 2단계: 각 OAuth 제공자별로 개별 로그인 시도
for i, oauth_entry in enumerate(oauth_entries):
print(f"\n🔄 OAuth 로그인 테스트 {i+1}/{len(oauth_entries)}: {oauth_entry}")
# OAuth 간 대기 시간
if i > 0:
print("⏳ OAuth 테스트 간 대기 중 (30초)...")
await asyncio.sleep(30)
# 개별 OAuth 로그인 시도
success = await test_oauth_login(url, oauth_entry)
# 결과를 CSV에 업데이트 (간단하게 로그만 남김)
status = "success" if success else "failed"
print(f"📝 {oauth_entry} 로그인 결과: {status}")
async def main_loop(
filepath: str, start_line: int, end_line: int, skip_html_check: bool = False
):
"""지정된 URL 목록에 대해 스캔을 실행하는 메인 루프"""
try:
# 재시도 큐 처리기 시작
await start_retry_queue_processor()
target_list = read_lines_between(
filepath=filepath, start_line=start_line, end_line=end_line
)
# 전체 목록 길이를 저장 (재개 시에도 유지되어야 함)
total_count = len(target_list)
current_progress["total"] = total_count
current_progress["start_line"] = start_line
current_progress["current_index"] = 0
prev_progress = load_progress()
if prev_progress and prev_progress.get("start_line") == start_line:
print("📋 이전 진행 상황을 발견했습니다:")
print(
f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}"
)
print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}")
resume = input("이어서 진행하시겠습니까? (y/n): ").lower().strip()
if resume == "y":
start_index = prev_progress.get("current_index", 0)
current_progress["current_index"] = start_index
# 전체 개수는 원래 목록 길이로 유지
current_progress["total"] = total_count
target_list = target_list[start_index:]
print(f"{start_index}번째부터 재개합니다.")
for i, url in enumerate(target_list):
# 종료 요청 체크
if is_shutdown_requested():
print("🛑 종료 요청으로 인해 스캔을 중단합니다.")
break
# current_index는 전체 목록에서의 현재 위치를 나타냄
current_url_index = current_progress["current_index"]
current_progress["current_url"] = url
print(
f"\n🔄 Processing {current_url_index + 1}/{current_progress['total']}: {url}"
)
print(
f"📍 {os.path.basename(filepath)}{start_line + current_url_index}번째 줄"
)
# 재시도 큐 상태 확인 및 출력
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] > 0:
print(f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 대기 중")
if i > 0:
print("⏳ API 쿼터 보호를 위해 30초 대기 중...")
# 대기 중에도 종료 요청 체크
for _ in range(30):
if is_shutdown_requested():
print("🛑 대기 중 종료 요청으로 스캔을 중단합니다.")
return
await asyncio.sleep(1)
try:
await scan_one_url(url, skip_html_check=skip_html_check)
except Exception as e:
print(f"{url} 스캔 중 오류 발생: {e}")
continue
# 스캔 완료 후 재시도 큐 상태 확인
retry_status_after = await get_retry_queue_status()
if retry_status_after["queue_length"] > 0:
print(
f"📊 스캔 완료 후 재시도 큐 상태: {retry_status_after['queue_length']}개 작업 대기 중"
)
# 다음 URL로 진행
current_progress["current_index"] = current_url_index + 1
save_progress()
# 모든 URL 처리 완료 후 재시도 큐가 빌 때까지 대기
if not is_shutdown_requested():
print("\n🔄 모든 URL 처리 완료. 재시도 큐 처리 대기 중...")
while True:
if is_shutdown_requested():
print("🛑 재시도 큐 대기 중 종료 요청으로 중단합니다.")
break
retry_status = await get_retry_queue_status()
if retry_status["queue_length"] == 0:
break
print(
f"⏳ 재시도 큐에 {retry_status['queue_length']}개 작업 남음. 30초 후 다시 확인..."
)
# 대기 중에도 종료 요청 체크
for _ in range(30):
if is_shutdown_requested():
print("🛑 재시도 큐 대기 중 종료 요청으로 중단합니다.")
break
await asyncio.sleep(1)
if not is_shutdown_requested():
print(f"\n🎉 모든 스캔이 완료되었습니다! ({total_count}개 URL)")
print("🎉 재시도 큐도 모두 처리되었습니다!")
else:
print("\n🛑 종료 요청으로 인해 스캔이 중단되었습니다.")
else:
print("\n🛑 종료 요청으로 인해 스캔이 중단되었습니다.")
finally:
# 항상 리소스 정리
print("🔄 브라우저 리소스를 정리합니다...")
await cleanup_browser_resources()

View file

@ -0,0 +1,22 @@
# read json file .sensitive.json
import json
import os
def GetSensitiveData():
"""
Reads sensitive data from a .sensitive.json file in the current directory.
Returns:
dict: A dictionary containing the sensitive data.
"""
file_path = os.path.join(os.getcwd(), ".sensitive.json")
if not os.path.exists(file_path):
return None
with open(file_path, "r") as file:
sensitive_data = json.load(file)
return sensitive_data

2
src/lib/llm/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from lib.llm.create import *
from lib.llm.prompt import *

19
src/lib/llm/create.py Normal file
View file

@ -0,0 +1,19 @@
from browser_use.llm import ChatGoogle
from dotenv import load_dotenv
# 환경 변수 로드 (GOOGLE_API_KEY 필요)
load_dotenv(override=True)
def CreateChatGoogle(model: str):
"""Browser Use용 Google 모델 생성"""
if model == "fallback":
print("⚠️ Fallback 모델을 사용합니다. Environment 변수를 확인하세요.")
print("⚠️ Model gemini-2.0-flash-lite를 사용합니다.")
model = "gemini-2.0-flash-lite"
return ChatGoogle(
model=model,
temperature=0.0
# Browser Use는 내부적으로 재시도 로직을 처리합니다
)

View file

@ -0,0 +1,46 @@
from typing import Type, Union
from pydantic import BaseModel
def get_prompt(type: str) -> tuple[str, Type[BaseModel]] | str:
"""
Prompt를 반환합니다.
:param type: 'auth' {Auth List} 또는 'google' {OAuth Provider}, 'meta' {OAuth Provider} 지정합니다.
:return: 해당하는 프롬프트 문자열 또는 (프롬프트, 모델) 튜플
"""
if type.lower() == "auth":
from lib.llm.prompt._get_oauth import model, prompt
return prompt, model
elif type.lower() in ["google", "google account"]:
from lib.llm.prompt.google import model, prompt
return prompt, model
elif type.lower() in ["microsoft", "microsoftonline"]:
from lib.llm.prompt.microsoft import model, prompt
return prompt, model
elif type.lower() in ["meta", "facebook"]:
from lib.llm.prompt.facebook import model, prompt
return prompt, model
elif type.lower() in ["apple"]:
from lib.llm.prompt.apple import model, prompt
return prompt, model
elif type.lower() in ["github"]:
from lib.llm.prompt.github import model, prompt
return prompt, model
else:
from lib.llm.prompt._fallback import model, prompt
return prompt, model

View file

@ -0,0 +1,2 @@
from lib.llm.prompt._fallback.model import model
from lib.llm.prompt._fallback.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -0,0 +1,66 @@
from dotenv import load_dotenv
import os
load_dotenv()
google_id = os.getenv("GOOGLE_ID")
google_password = os.getenv("GOOGLE_PASSWORD")
naver_id = os.getenv("NAVER_ID")
naver_password = os.getenv("NAVER_PASSWORD")
facebook_id = os.getenv("FACEBOOK_ID")
facebook_password = os.getenv("FACEBOOK_PASSWORD")
github_id = os.getenv("GITHUB_ID")
github_password = os.getenv("GITHUB_PASSWORD")
microsoft_id = os.getenv("MICROSOFT_ID")
microsoft_password = os.getenv("MICROSOFT_PASSWORD")
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **SSO Login button**, following all steps strictly as described below.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
3. Click the **SSO login button**.
4. Check if the user is **already logged and immediately redirected back to the original site** without showing a login screen.
- If so, treat the login as successful and return immediately.
5. If login proceeds without interruptions, complete the login and **immediately close the browser window**. Do not perform any further actions.
6. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for login:
- Google `{google_id}` / `{google_password}`
- Naver `{naver_id}` / `{naver_password}`
- GitHub `{github_id}` / `{github_password}`
- facebook `{facebook_id}` / `{facebook_password}`
- Microsoft `{microsoft_id}` / `{microsoft_password}`
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT proceed with login if:
- CAPTCHA or MFA appears
- If the user is already logged and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt._get_oauth.model import model
from lib.llm.prompt._get_oauth.prompt import prompt

View file

@ -0,0 +1,7 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page

View file

@ -1,11 +1,4 @@
from pydantic import BaseModel prompt = """
class FindLoginPageResponse(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page
get_sso_list_task = """
You are an expert in finding login pages. You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format. Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
@ -31,7 +24,7 @@ Your task is to navigate to the login page of the given URL. Follow the steps be
2. SSO BUTTON IDENTIFICATION 2. SSO BUTTON IDENTIFICATION
- On the login page, look for the following social login (SSO) buttons: - On the login page, look for the following social login (SSO) buttons:
- Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc. - Google, GitHub, Facebook, Microsoft, Naver, Etc.
- Proceed only if it is clearly an **actual SSO button**. - Proceed only if it is clearly an **actual SSO button**.
- Exclude the following: - Exclude the following:
- Passkey-related buttons - Passkey-related buttons

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.apple.model import model
from lib.llm.prompt.apple.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "apple_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -0,0 +1,62 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Apple SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Apple", and use it to complete login via Apple.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
3. On the login page, look for a clearly labeled **Apple SSO button** typically labeled as:
- "Continue with Apple"
- "Sign in with Apple"
- or a button with the Apple icon
4. Click the **Apple login button**.
- The Apple login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to Apple and immediately redirected back to the original site** without showing a Apple login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the Apple login page:
a. If a **CAPTCHA**, complete it.
b. If a **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed - Immediately stop and return the appropriate status.
- If a **"Continue"**, **"Trust"**, **"Authorize"**, or **"Allow"** button is displayed, click it to grant consent.
7. If login proceeds without interruptions, complete the login and **immediately close the browser window**. Do not perform any further actions.
8. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Apple login:
- Email: {os.getenv("APPLE_EMAIL", "")}
- Password: {os.getenv("APPLE_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required
- If the user is already logged in to Apple and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the Apple login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Apple login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "apple_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.facebook.model import model
from lib.llm.prompt.facebook.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "facebook_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -0,0 +1,71 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Facebook SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Facebook", and use it to complete login via Facebook.
Instructions:
1. **CRITICAL**: Wait 3 seconds for page to load and interactive elements to appear
2. If any cookie or privacy popups appear, dismiss or accept them by looking for buttons like "Accept All", "Accept", "Allow All"
3. If any promotional/discount popups appear, dismiss them by looking for "Close", "X", or "No Thanks" buttons
4. Navigate through the site's UI to find the **login or sign-in page**:
- Look for "My Account", "Log In", "Sign In", "Get Started" buttons
- Only follow links within the same domain
5. On the login page, look for a clearly labeled **Facebook SSO button** typically labeled as:
- "Continue with Facebook"
- "Sign in with Facebook"
- "Login with Facebook"
- or a button with the Facebook 'f' icon
6. Click the **Facebook login button**.
- The Facebook login flow will open in a new browser tab or window (this is normal OAuth behavior)
- Continue with the Facebook login process in the new tab/window
7. **Facebook Login Flow Handling:**
- **If already logged in**: You may see a consent screen with buttons like "Continue as [Name]", "Continue", "Allow" - CLICK these buttons to proceed
- **If CAPTCHA appears on registration form**: Try clicking "Create account" button multiple times (up to 5 attempts) as some CAPTCHAs may not block form submission
- **If form submission fails after 5 attempts**: Report "success" anyway if all fields were filled and Facebook OAuth was completed successfully
- **Registration completion is optional**: The main goal is Facebook OAuth completion, not site registration
8. **After Facebook OAuth consent/login**:
- Wait for redirection back to the original site
- **CRITICAL**: If redirected to a registration/signup form with fields like:
- "Last name", "Email address", "Password"
- "Create account" or "Sign up" button
- ANY additional information form
- **MUST complete the registration process:**
- Fill in required fields with appropriate test data:
- Last name: "TestUser"
- Email: Use the Facebook email or "test@example.com"
- Password: "TestPassword123!"
- Click "Create account", "Sign up", or "Complete registration" button
- Only after completing ALL steps, record the final URL as successful login
9. If all steps are completed successfully, close your browser window.
10. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Facebook login (if needed):
- Email/Phone: {os.getenv("FACEBOOK_EMAIL", "")}
- Password: {os.getenv("FACEBOOK_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs
- Do NOT use autofill, saved sessions, or cookies
- Do NOT proceed with login if CAPTCHA or MFA appears
- **ALWAYS complete any additional registration forms** after Facebook OAuth
- **Fill required fields** with test data if signup form appears
- **Only return "success" after completing ALL registration steps**
- If the login page cannot be found, return "login_page_not_found"
- If the Facebook login button is not found, return "sso_not_found"
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Facebook login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "idpw_required" | "facebook_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.github.model import model
from lib.llm.prompt.github.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "github_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -0,0 +1,81 @@
import os
# Extended planner prompt
prompt = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **GitHub SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with GitHub", and use it to complete login via GitHub.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
- If a "Sign Up" or "Create Account" page appears instead, it is acceptable **as long as it includes a GitHub SSO option**.
3. On the login or sign-up page, look for a clearly labeled **GitHub SSO button** typically labeled as:
- "Continue with GitHub"
- "Sign in with GitHub"
- or a button with the GitHub logo
4. Click the **GitHub login button**.
- The GitHub login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to GitHub and immediately redirected back to the original site** without showing a GitHub login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the GitHub login page:
a. Wait for the username or email input field, then enter the email: {os.getenv("GITHUB_EMAIL", "")}
b. Click the "Continue" or "Next" button if present.
c. Enter the password: {os.getenv("GITHUB_PASSWORD", "")}
d. Click the "Sign in" button.
e. If a page appears asking to "Authorize" access for the application, click the "Authorize" button.
- GitHub may take a while to redirect after authorization, so please wait patiently.
- If a CAPTCHA, MFA prompt, or other interruption appears, do NOT proceed.
- If login fails due to incorrect credentials or authentication errors, treat as `"idpw_required"` and stop.
- Immediately stop and return the appropriate status.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
8. Close your browser window after the login is completed.
9. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for GitHub login:
- Email: {os.getenv("GITHUB_EMAIL", "")}
- Password: {os.getenv("GITHUB_PASSWORD", "")}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required and cannot be autofilled
- If the user is already logged in to GitHub and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the GitHub login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "GitHub login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "github_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.google.model import model
from lib.llm.prompt.google.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -1,12 +1,7 @@
from pydantic import BaseModel import os
from lib.config import GOOGLE_ID, GOOGLE_PASSWORD
class LoginGoogleResponse(BaseModel): # Extended planner prompt
msg: str | None = None prompt = f"""
status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None
login_google_task = f"""
You are a web automation agent. You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below. Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below.
@ -28,13 +23,19 @@ Instructions:
5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen. 5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen.
- If so, treat the login as successful and return immediately. - If so, treat the login as successful and return immediately.
6. If redirected to the Google login page: 6. If redirected to the Google login page:
- If a **CAPTCHA**, **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed. a. Wait for the username or email input field, then enter the email: {os.getenv("GOOGLE_EMAIL", "")}
- Immediately stop and return the appropriate status. b. Click the "Continue" or "Next" button if present. (If still on the same page, reapeat step a)
c. Wait for the password input field, then enter the password: {os.getenv("GOOGLE_PASSWORD", "")}
d. Click the "Sign in" or "Next" button.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL. 7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
8. Close your browser window after the login is completed.
9. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Credentials to use for Google login: Credentials to use for Google login:
- Email: {GOOGLE_ID} - Email: {os.getenv("GOOGLE_EMAIL", "")}
- Password: {GOOGLE_PASSWORD} - Password: {os.getenv("GOOGLE_PASSWORD", "")}
Constraints: Constraints:
- Do NOT use search engines or guess URLs. - Do NOT use search engines or guess URLs.
@ -42,7 +43,6 @@ Constraints:
- Do NOT proceed with login if: - Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed) - The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears - CAPTCHA or MFA appears
- ID/password input is required
- If the user is already logged in to Google and redirected back automatically, stop there and report success. - If the user is already logged in to Google and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found". - If the login page cannot be found, return "login_page_not_found".
- If the Google login button is not found, return "sso_not_found". - If the Google login button is not found, return "sso_not_found".

View file

@ -0,0 +1,2 @@
from lib.llm.prompt.microsoft.model import model
from lib.llm.prompt.microsoft.prompt import prompt

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class model(BaseModel):
msg: str | None = None
status: str | None = (
None # "success", "mfa_required", "microsoft_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
)
final_url: str | None = None

View file

@ -0,0 +1,60 @@
import os
# This code snippet is used to generate a prompt for a web automation agent that performs Microsoft SSO login.
prompt = f"""
당신은 자동화 에이전트입니다.
당신의 임무는 주어진 도메인에 방문하여 아래에 엄격히 설명된 모든 단계를 따라 **Microsoft SSO 버튼** 통해 전체 로그인을 수행하는 것입니다.
목표: 도메인 내에서 "Microsoft로 로그인" 가능한 로그인 페이지를 찾아 Microsoft을 통해 로그인을 완료하세요.
지침:
1. 쿠키 또는 개인정보 팝업이 나타나면 닫거나 수락하세요.
2. 사이트의 UI를 탐색하여 **로그인 또는 로그인 페이지**(: "로그인", "Sign In", "Get Started" 같은 버튼) 찾으세요.
- 동일한 도메인 내의 링크만 따라가세요.
3. 로그인 페이지에서 명확하게 표시된 **Microsoft SSO 버튼** 찾으세요. 일반적으로 다음과 같이 표시됩니다:
- "Continue with Microsoft"
- "Sign in with Microsoft"
- or a button with the Microsoft logo (usually four squares)
4. **Microsoft 로그인 버튼** 클릭하세요.
- Microsoft 로그인 플로우는 반드시 ** 브라우저 **에서 열려야 합니다 ( 창이나 팝업이 아님).
- 로그인이 ****이나 **팝업**에서 열리면, 즉시 중단하고 적절한 상태를 반환하세요.
5. 사용자가 **이미 Microsoft에 로그인되어 있고 즉시 원래 사이트로 리디렉션**된다면,
- 경우 로그인이 성공한 것으로 간주하고 즉시 반환하세요.
6. Microsoft 로그인 페이지로 리디렉션된 경우:
- **CAPTCHA**, **MFA 프롬프트** 요청이 나타나면 진행하지 마세요.
- 즉시 중단하고 적절한 상태를 반환하세요.
7. 로그인에 방해가 없다면, 원래 사이트로 리디렉션될 때까지 기다리고 최종 URL을 기록하세요.
8. 로그인 되어있지 않으면 아래의 EMAIL과 PASSWORD를 사용하여 로그인하세요:
- Email: {os.getenv("MICROSOFT_EMAIL", "")}
- Password: {os.getenv("MICROSOFT_PASSWORD", "")}
9. 로그인 완료 브라우저 창을 닫으세요.
10. Login is considered successful if:
- You are redirected to a page that indicates successful login (e.g., a welcome page, dashboard, or account page).
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
제약 사항:
- 검색 엔진을 사용하거나 URL을 추측하지 마세요.
- 자동완성, 저장된 세션 또는 쿠키를 사용하지 마세요.
- 다음과 같은 경우 로그인 절차를 진행하지 마세요:
- 로그인이 창에서 열릴 (탭만 허용)
- CAPTCHA 또는 MFA가 나타날
- ID/비밀번호 입력이 필요하지만 자동입력이 불가한 경우
- 사용자가 이미 Microsoft에 로그인되어 자동으로 리디렉션된다면, 즉시 성공으로 보고 종료하세요.
- 로그인 페이지를 찾을 없으면 "login_page_not_found" 반환하세요.
- Microsoft 로그인 버튼을 찾을 없으면 "sso_not_found" 반환하세요.
- 회원가입 페이지와 같은 화면이 나타나면 성공적인 로그인으로 간주하고 즉시 종료하세요.
최종 출력:
다음 형식으로만 결과를 반환하세요:
```json
{{
"msg": "Microsoft login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "microsoft_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,7 @@
# export from show_info
from lib.utils.agent_info import *
from lib.utils.config import *
from lib.utils.data import *
from lib.utils.parsing.is_html import *
from lib.utils.parsing.read_txt import *

View file

@ -0,0 +1,64 @@
import os
from dotenv import load_dotenv
from lib.utils.config import (
BACKEND_URL,
GOOGLE_API_KEY,
GOOGLE_MODEL,
GOOGLE_PLANNER_MODEL,
)
load_dotenv(override=True)
def show_info():
print("🔧 환경 설정:")
print(browser_use_version())
print(f"🔗 Backend URL: {BACKEND_URL}")
print(
f"🔑 Google API Key: {'*' * (len(GOOGLE_API_KEY) - 4) + GOOGLE_API_KEY[-4:] if GOOGLE_API_KEY else None}"
)
print(f"🌐 Google Model: {GOOGLE_MODEL}")
print(f"🌐 Google Planner Model: {GOOGLE_PLANNER_MODEL}")
def browser_use_version():
try:
# run uv pip show browser-use
import subprocess
result = subprocess.run(
["uv", "pip", "show", "browser-use"],
capture_output=True,
text=True,
check=True,
)
print("📦 Browser Use 패키지 정보:")
return result.stdout.strip()
except ImportError:
return None
def env_cheker():
if GOOGLE_API_KEY is None:
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if GOOGLE_PLANNER_MODEL != None and (
not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LOGIN")
or not os.getenv("ENABLE_PLANNER_MODEL_OAUTH_LIST")
):
print(
"⚠️ GOOGLE_PLANNER_MODEL이 설정되어 있지만, ENABLE_PLANNER_MODEL_OAUTH_LOGIN 또는 ENABLE_PLANNER_MODEL_OAUTH_LIST가 활성화되지 않았습니다."
)
print(
"⚠️ Planner 모델을 사용하려면 .env 파일에서 ENABLE_PLANNER_MODEL_OAUTH_LOGIN과 ENABLE_PLANNER_MODEL_OAUTH_LIST를 true로 설정하세요."
)
print(
"‼️ 하지만 현재 Planner 모델을 사용하는 것이 권장되지 않습니다. 이 기능은 오작동을 일으킬 수 있습니다."
)
print("⚠️ 이 경고는 1초동안 정지합니다.")
# 이 경고는 1초동안 sleep
import time
time.sleep(1)

View file

@ -1,10 +1,11 @@
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv(verbose=True, override=True) load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash") GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL")
GOOGLE_ID = os.getenv("GOOGLE_ID", "google") USER_DATA_DIR = os.getenv("USER_DATA_DIR", "./data/user_data")
GOOGLE_PASSWORD = os.getenv("GOOGLE_PASSWORD", "google")

View file

@ -0,0 +1,2 @@
from lib.utils.data.backend_client import *
from lib.utils.data.logger import *

View file

@ -1,6 +1,7 @@
import requests import requests
from lib.config import BACKEND_URL from lib.utils.config import BACKEND_URL
def notify_backend(target_url): def notify_backend(target_url):
# Backend에 스캔 시작을 알림 # Backend에 스캔 시작을 알림

View file

@ -1,9 +1,10 @@
from pathlib import Path
from datetime import datetime from datetime import datetime
from pathlib import Path
# 미리 정해진 파일 경로 # 미리 정해진 파일 경로
FILE_PATH = Path("data/log.txt") FILE_PATH = Path("data/log.txt")
def logger(msg: str) -> None: def logger(msg: str) -> None:
try: try:
""" """

View file

@ -1,5 +1,6 @@
import requests import requests
def is_html_url(url: str, timeout: float = 10.0) -> bool: def is_html_url(url: str, timeout: float = 10.0) -> bool:
""" """
주어진 URL에 HEAD 요청을 보내고, 응답 헤더의 Content-Type이 HTML인지 확인합니다. 주어진 URL에 HEAD 요청을 보내고, 응답 헤더의 Content-Type이 HTML인지 확인합니다.
@ -16,17 +17,18 @@ def is_html_url(url: str, timeout: float = 10.0) -> bool:
if not response.ok: if not response.ok:
return False return False
content_type = response.headers.get('Content-Type', '') content_type = response.headers.get("Content-Type", "")
# Content-Type에 'text/html'이 포함되어 있으면 HTML로 간주 # Content-Type에 'text/html'이 포함되어 있으면 HTML로 간주
return content_type.lower().startswith('text/html') return content_type.lower().startswith("text/html")
except requests.RequestException: except requests.RequestException:
return False return False
if __name__ == '__main__':
if __name__ == "__main__":
test_urls = [ test_urls = [
'https://www.example.com', "https://www.example.com",
'https://api.github.com', # JSON API라서 HTML이 아닐 확률이 높음 "https://api.github.com", # JSON API라서 HTML이 아닐 확률이 높음
'https://raw.githubusercontent.com' # 텍스트 파일 등 다양한 타입 "https://raw.githubusercontent.com", # 텍스트 파일 등 다양한 타입
] ]
for url in test_urls: for url in test_urls:

View file

@ -20,10 +20,12 @@ def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[st
""" """
if start_line < 1 or end_line < start_line: if start_line < 1 or end_line < start_line:
raise ValueError("start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다.") raise ValueError(
"start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다."
)
selected_lines: list[str] = [] selected_lines: list[str] = []
with open(filepath, 'r', encoding='utf-8') as f: with open(filepath, "r", encoding="utf-8") as f:
for idx, line in enumerate(f, start=1): for idx, line in enumerate(f, start=1):
if idx < start_line: if idx < start_line:
# 아직 읽기 시작 전 # 아직 읽기 시작 전
@ -32,5 +34,5 @@ def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[st
# 읽을 범위를 벗어났으므로 중단 # 읽을 범위를 벗어났으므로 중단
break break
# 줄 끝의 개행 문자를 제거하고 리스트에 추가 # 줄 끝의 개행 문자를 제거하고 리스트에 추가
selected_lines.append(line.rstrip('\n')) selected_lines.append(line.rstrip("\n"))
return selected_lines return selected_lines

119
src/lib/utils/progress.py Normal file
View file

@ -0,0 +1,119 @@
import json
import os, sys
import signal
import time
import threading
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
# Ctrl+C 처리를 위한 전역 변수
ctrl_c_count = 0
last_ctrl_c_time = 0
shutdown_requested = False
shutdown_lock = threading.Lock()
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def signal_handler(signum, frame):
"""Ctrl+C 시그널 핸들러 - browser-use pause 기능과 호환"""
global shutdown_requested, ctrl_c_count, last_ctrl_c_time
current_time = time.time()
with shutdown_lock:
# 연속된 Ctrl+C 감지 (2초 내에 두 번 누르면 강제 종료)
if current_time - last_ctrl_c_time < 2.0:
ctrl_c_count += 1
else:
ctrl_c_count = 1
last_ctrl_c_time = current_time
# 두 번째 Ctrl+C이거나 이미 종료 요청이 있었다면 강제 종료
if ctrl_c_count >= 2 or shutdown_requested:
print("\n⚡ 강제 종료합니다!")
import asyncio
try:
loop = asyncio.get_running_loop()
for task in asyncio.all_tasks(loop):
task.cancel()
except RuntimeError:
pass
os._exit(1)
# 첫 번째 Ctrl+C: 정상 종료 요청
shutdown_requested = True
print("\n" + "=" * 60)
print("🛑 종료 신호를 받았습니다!")
print(f"📊 현재 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
if current_progress.get('start_line'):
print(f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄")
if current_progress["total"] > 0:
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
print("=" * 60)
# 진행 상황 저장
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
print("다음에 같은 명령어로 실행하면 이어서 진행할 수 있습니다.")
print("<EFBFBD> 2초 내에 Ctrl+C를 다시 누르면 강제 종료됩니다.")
# 정상적인 종료를 위해 KeyboardInterrupt 발생
raise KeyboardInterrupt()
def is_shutdown_requested():
"""종료 요청 상태를 확인하는 함수"""
with shutdown_lock:
return shutdown_requested
def request_shutdown():
"""외부에서 종료를 요청할 수 있는 함수"""
global shutdown_requested
with shutdown_lock:
if not shutdown_requested:
shutdown_requested = True
print("\n🛑 종료가 요청되었습니다.")
print(f"📊 현재 진행 상황:")
print(f" - 전체: {current_progress['total']}개 URL")
print(f" - 완료: {current_progress['current_index']}개 URL")
print(f" - 현재 처리 중: {current_progress['current_url']}")
if current_progress.get('start_line'):
print(f" - domains.txt의 {current_progress['start_line'] + current_progress['current_index']}번째 줄")
if current_progress["total"] > 0:
print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
print("다음에 같은 명령어로 실행하면 이어서 진행할 수 있습니다.")
def setup_signal_handler():
"""시그널 핸들러 등록 - browser-use와의 호환성을 위해 비활성화"""
# browser-use 라이브러리가 자체적으로 Ctrl+C 처리를 하므로
# 우리의 signal handler는 등록하지 않음
pass

View file

@ -0,0 +1,116 @@
"""
종료 처리를 위한 개선된 모듈
browser-use의 pause 기능과 호환되도록 설계
"""
import json
import os
import signal
import time
import threading
import asyncio
from pathlib import Path
# 진행 상황 추적을 위한 전역 변수
current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0}
progress_file = Path("data/scan_progress.json")
# 종료 관리를 위한 전역 변수
shutdown_requested = False
shutdown_lock = threading.Lock()
original_handler = None
def save_progress():
"""현재 진행 상황을 파일에 저장"""
progress_file.parent.mkdir(parents=True, exist_ok=True)
with open(progress_file, "w", encoding="utf-8") as f:
json.dump(current_progress, f, ensure_ascii=False, indent=2)
def load_progress():
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(progress_file):
try:
with open(progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
return None
def request_shutdown():
"""종료 요청 함수 - 외부에서 호출 가능"""
global shutdown_requested
with shutdown_lock:
if not shutdown_requested:
shutdown_requested = True
print("\n🛑 종료가 요청되었습니다. 현재 작업을 완료한 후 종료합니다...")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
def is_shutdown_requested():
"""종료 요청 상태를 확인하는 함수"""
with shutdown_lock:
return shutdown_requested
def cleanup_signal_handler():
"""signal handler를 정리하고 원래 상태로 복원"""
global original_handler
if original_handler is not None:
signal.signal(signal.SIGINT, original_handler)
original_handler = None
def setup_minimal_signal_handler():
"""최소한의 signal handler만 설정 - browser-use와 충돌 방지"""
global original_handler
# 원래 핸들러 저장
original_handler = signal.signal(signal.SIGINT, signal.SIG_DFL)
def graceful_signal_handler(signum, frame):
"""우아한 종료를 위한 최소한의 signal handler"""
print("\n🛑 종료 신호를 받았습니다...")
save_progress()
print(f"💾 진행 상황이 {progress_file}에 저장되었습니다.")
# 원래 핸들러로 복원하고 신호를 다시 발생시킴
signal.signal(signal.SIGINT, original_handler)
os.kill(os.getpid(), signal.SIGINT)
signal.signal(signal.SIGINT, graceful_signal_handler)
class GracefulShutdown:
"""컨텍스트 매니저로 사용할 수 있는 우아한 종료 클래스"""
def __init__(self):
self.original_handler = None
def __enter__(self):
self.original_handler = signal.signal(signal.SIGINT, self._signal_handler)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.original_handler is not None:
signal.signal(signal.SIGINT, self.original_handler)
def _signal_handler(self, signum, frame):
"""내부 signal handler"""
request_shutdown()
# 원래 핸들러 복원 후 신호 재전송
signal.signal(signal.SIGINT, self.original_handler)
os.kill(os.getpid(), signal.SIGINT)
# 기존 함수들과의 호환성을 위한 별칭
def setup_signal_handler():
"""기존 코드와의 호환성을 위한 함수"""
pass # browser-use의 signal handler를 방해하지 않음
def signal_handler(signum, frame):
"""기존 코드와의 호환성을 위한 함수"""
request_shutdown()

121
src/main.py Normal file
View file

@ -0,0 +1,121 @@
import argparse
import asyncio
import os
import sys
from dotenv import load_dotenv
from lib.browser_use.scanner import main_loop
from lib.utils import env_cheker
from lib.utils.progress import progress_file, setup_signal_handler
def setup_environment():
"""환경 변수 로드 및 관련 라이브러리를 초기화합니다."""
# .env 파일 로드
load_dotenv(verbose=True, override=True)
# 환경 변수 체크
env_cheker()
# Laminar 초기화 (선택적)
if os.getenv("LMNR_PROJECT_API_KEY"):
try:
from lmnr import Laminar
Laminar.initialize(project_api_key=os.getenv("LMNR_PROJECT_API_KEY"))
except ImportError:
print("⚠️ Laminar 라이브러리가 설치되지 않았습니다. 관련 기능이 비활성화됩니다.")
else:
print("⚠️ LMNR_PROJECT_API_KEY 환경 변수가 설정되지 않았습니다. Laminar 기능이 비활성화됩니다.")
def parse_arguments():
"""커맨드 라인 인자를 파싱합니다."""
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다.",
)
parser.add_argument(
"-f",
"--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)",
)
parser.add_argument(
"-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh",
"--skip-html-check",
action="store_true",
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다.",
)
return parser.parse_args()
def main():
"""애플리케이션 메인 진입점"""
setup_environment()
setup_signal_handler()
args = parse_arguments()
# read and remove user data path
log_file = os.path.join("./data", "userdata.dump")
if not os.path.exists("./data"):
os.makedirs("./data")
if os.path.exists(log_file):
with open(log_file, "r") as f:
tmp_user_data_dir = f.read().strip()
try:
import shutil
if os.path.exists(tmp_user_data_dir):
shutil.rmtree(tmp_user_data_dir)
print(f"🔧 이전 실행의 임시 사용자 데이터 디렉토리 {tmp_user_data_dir}를 삭제하였습니다.")
except (PermissionError, FileNotFoundError, OSError) as e:
print(f"⚠️ 임시 사용자 데이터 디렉토리 삭제 실패: {e}")
try:
os.remove(log_file)
except OSError:
pass
try:
asyncio.run(
main_loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check,
)
)
except KeyboardInterrupt:
print("\n🛑 사용자에 의해 중단되었습니다.")
# 진행 상황 저장
from lib.utils.progress import save_progress, request_shutdown
request_shutdown()
print("✅ 정리 완료.")
sys.exit(0)
except Exception as e:
print(f"\n❌ 예상치 못한 오류가 발생했습니다: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# 정상 종료 시에만 진행 상황 파일 삭제
from lib.utils.progress import is_shutdown_requested
if not is_shutdown_requested() and os.path.exists(progress_file):
try:
os.remove(progress_file)
print("✅ 진행 상황 파일이 삭제되었습니다.")
except OSError as e:
print(f"⚠️ 진행 상황 파일 삭제 실패: {e}", file=sys.stderr)
if __name__ == "__main__":
main()

52
temp.md
View file

@ -1,52 +0,0 @@
You are an AI model specialized in web crawling and analysis. Given a URI, perform the following tasks:
1. Navigate to the provided URI and locate the login page. If its not found, explore common auth-related pages like /login or /auth.
2. On the login page, identify all available social login buttons (OAuth-based) such as Google, GitHub, Facebook, etc.
3. Simulate clicking each social login button and follow the redirect to capture the full redirect URL (including query parameters).
4. From the redirect URL and parameters, extract:
- `client_id`
- `redirect_uri`
- `response_type`
- `scope`
5. Based on URL patterns, infer the OAuth method: Authorization Code, Implicit, PKCE, etc.
6. Return data in the following JSON format only:
```json
{
"oauths": [
{
"issue": "<site being tested, e.g., git.imnya.ng>",
"oauth_uri": "<original button href or URL triggered>"
}
]
}
````
7. If the login button says something like "Login with GitHub" or "Login with Google", follow the flow and use the **final redirect URL after clicking** as the value of `oauth_uri`.
**Examples:**
```json
{
"oauths": [
{
"issue": "git.imnya.ng",
"provider": "GitHub",
"client_id": "Iv1.xxxxx",
"redirect_uri": "https://git.imnya.ng/user/oauth2/callback",
"response_type": "code",
"scope": "read:user",
"oauth_uri": "https://github.com/login/oauth/authorize?client_id=Iv1.xxxxx&redirect_uri=https%3A%2F%2Fgit.imnya.ng%2Fuser%2Foauth2%2Fcallback&response_type=code&scope=read%3Auser"
}
]
}
```
**Constraints:**
* Simulate realistic interaction with buttons (e.g., clicking them to follow redirects).
* Ensure the output is strictly in the specified JSON format.
* Avoid any additional text or explanations outside the JSON response.
* If no OAuth logins are found, return an empty array.
* WebAuthn, PassKey is not OAuth, so do not include it in the results.

3533
uv.lock generated

File diff suppressed because it is too large Load diff