Merge pull request #21 from j93es/feat/j93es

Feat/j93es
This commit is contained in:
James 2025-06-27 10:26:42 +09:00 committed by GitHub
commit 8c22a52774
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 451 additions and 378 deletions

9
lib/agents/__init__.py Normal file
View file

@ -0,0 +1,9 @@
from lib.agents.get_sso_list import get_sso_list
# 업데이트될 버전 import 아직 개발 중
from lib.agents.get_sso_list_v2 import get_sso_list as get_sso_list_v2
from lib.agents.login_google import login_google
__all__ = [
"get_sso_list",
"login_google",
]

View file

@ -1,92 +0,0 @@
import json
from pydantic import BaseModel
from browser_use import (
Agent,
Controller,
)
from lib.agents.run_agent import run_agent
from lib.utils.logger import logger
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
NOT_FOUND_LOGIN_PAGE = 0
FOUND_LOGIN_PAGE = 1
class FindLoginPageResponse(BaseModel):
status: int = NOT_FOUND_LOGIN_PAGE # 0 if not found, 1 if found
msg: str | None = None
url: str | None = None
async def find_login_page(target_url, session) -> tuple[bool, str | None]:
initial_actions = [{"open_tab": {"url": target_url}}]
task = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"status": 0,
"msg": "Blocked",
"url": ""
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. RETURN FORMAT
- Once the login page is reached, return a JSON object matching the following schema:
```json
{
"status": 1, // 1 if login page is found, 0 otherwise
"msg": "Login page found", // Optional message
"url": "https://example.com/login" // Full URL of the login page if found
}
```
- If the login page cannot be found, return:
```json
{
"status": 0,
"msg": "Login page not found",
"url": ""
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""
controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
controller=controller,
)
is_failed, final_result = await run_agent(agent)
if is_failed:
logger(f"⚠️ 스캔 실패: {target_url} | {final_result}")
print(f"⚠️ 스캔 실패: {target_url} | {final_result}")
return False, None;
data = json.loads(final_result)
try:
resp = FindLoginPageResponse(**data)
if resp.status == FOUND_LOGIN_PAGE and len(resp.url) > 0:
return True, resp.url
else:
return False, resp.msg
except Exception as e:
logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, data.msg

View file

@ -1,66 +0,0 @@
import json
from pydantic import BaseModel
from browser_use import (
Agent,
Controller,
)
from lib.agents.run_agent import run_agent
from lib.utils.logger import logger
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.config import GOOGLE_MODEL, GOOGLE_PLANNER_MODEL
NOT_FOUND_SSO_LIST = 0
FOUND_SSO_LIST = 1
class EachSSOProvider(BaseModel):
provider: str
oauth_uri: str | None = None
class FindLoginPageResponse(BaseModel):
EachSSOProviders: list[EachSSOProvider] | None = None
status: int = NOT_FOUND_SSO_LIST # 0 if not found,
msg: str | None = None
async def get_sso_list(target_url, session) -> tuple[bool, str | None]:
initial_actions = [{"open_tab": {"url": target_url}}]
task = "Navigate to the login page, and return the result in the specified format."
extend_planner_system_message = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL.
Once you reach the login page, stop and return a JSON object that matches the following schema:
```json
{
"status": 1, # 1 if login page found, 0 otherwise
"url": "https://example.com/login" # Full URL of the login page if found
}
Return only this JSON object. Do not include any explanation or additional text.
"""
controller = Controller(output_model=FindLoginPageResponse, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
planner_llm=create_google_ai(GOOGLE_PLANNER_MODEL),
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
is_failed, final_result = await run_agent(agent)
if is_failed:
logger(f"⚠️ 스캔 실패: {target_url} | {final_result}")
print(f"⚠️ 스캔 실패: {target_url} | {final_result}")
return False, None;
data = json.loads(final_result)
try:
resp = FindLoginPageResponse(**data)
if resp.status == FOUND_SSO_LIST:
return True, resp
else:
return False, None
except Exception as e:
logger(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, data.msg

View file

@ -0,0 +1,3 @@
from lib.agents.get_sso_list.get_sso_list import get_sso_list
__all__ = ["get_sso_list"]

View file

@ -0,0 +1,22 @@
from lib.agents.get_sso_list.prompt import get_sso_list_task, FindLoginPageResponse
from lib.browser_use_utils.run_task import run_task
NOT_FOUND_LOGIN_PAGE = 0
FOUND_LOGIN_PAGE = 1
async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
task = get_sso_list_task
ReturnModel = FindLoginPageResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, response
if isinstance(response, str):
return False, response
return True, response

View file

@ -0,0 +1,68 @@
from pydantic import BaseModel
class FindLoginPageResponse(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page
get_sso_list_task = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. SSO BUTTON IDENTIFICATION
- On the login page, look for the following social login (SSO) buttons:
- Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc.
- Proceed only if it is clearly an **actual SSO button**.
- Exclude the following:
- Passkey-related buttons
- Username/password fields
- Email-based login
- Non-OAuth methods such as certificate or phone verification
3. RETURN FORMAT
- If the login page is successfully found, return:
```json
{
"msg": "Login page found",
"url": "https://example.com/login",
"sso_list": ["Google", "GitHub"]
}
```
- If the login page cannot be found, return:
```json
{
"msg": "Login page not found",
"url": "",
"sso_list": []
}
```
- If blocked (as in step 0), return:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,3 @@
from lib.agents.get_sso_list_v2 import get_sso_list
__all__ = ["get_sso_list"]

View file

@ -0,0 +1,20 @@
from lib.agents.get_sso_list_v2.prompt import get_sso_list_task, FindLoginPageResponse
from lib.browser_use_utils.run_task import run_task
# TODO - Split find login page agent and get SSO list agent
async def get_sso_list(target_url) -> tuple[bool, str | FindLoginPageResponse | None]:
task = get_sso_list_task
ReturnModel = FindLoginPageResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, response
if isinstance(response, str):
return False, response
return True, response

View file

@ -0,0 +1,68 @@
from pydantic import BaseModel
class FindLoginPageResponse(BaseModel):
msg: str | None = None
url: str | None = None
sso_list: list[str] = [] # List of SSO providers found on the login page
get_sso_list_task = """
You are an expert in finding login pages.
Your task is to navigate to the login page of the given URL. Follow the steps below strictly and return results only in the specified format.
You are NOT allowed to navigate to URLs that are not directly discoverable within the initial domain. Do NOT use search engines or guess external login URLs.
0. INITIAL BLOCK CHECK
- If the browser is blocked when trying to access the page due to firewall, CAPTCHA, regional restrictions, or other access denials immediately terminate the process and return the following JSON:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Do NOT proceed to further steps in this case.
1. LOGIN PAGE NAVIGATION
- Navigate only to a **client-side (non-enterprise)** login page within the provided domain.
- Do NOT rely on external tools, search engines, or links not directly found on the site.
- If a consent popup (e.g. for privacy/cookies) appears, you MUST dismiss or close it before proceeding.
- Since step 0 confirmed access, assume the page now loads properly.
2. SSO BUTTON IDENTIFICATION
- On the login page, look for the following social login (SSO) buttons:
- Google, GitHub, Facebook, LinkedIn, Microsoft, Naver, Slack, Etc.
- Proceed only if it is clearly an **actual SSO button**.
- Exclude the following:
- Passkey-related buttons
- Username/password fields
- Email-based login
- Non-OAuth methods such as certificate or phone verification
3. RETURN FORMAT
- If the login page is successfully found, return:
```json
{
"msg": "Login page found",
"url": "https://example.com/login",
"sso_list": ["Google", "GitHub"]
}
```
- If the login page cannot be found, return:
```json
{
"msg": "Login page not found",
"url": "",
"sso_list": []
}
```
- If blocked (as in step 0), return:
```json
{
"msg": "Blocked",
"url": "",
"sso_list": []
}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -0,0 +1,3 @@
from lib.agents.login_google.login_google import login_google
__all__ = ["login_google"]

View file

@ -0,0 +1,11 @@
from lib.agents.login_google.prompt import login_google_task, LoginGoogleResponse
from lib.browser_use_utils.run_task import run_task
async def login_google(target_url) -> tuple[bool, str | LoginGoogleResponse | None]:
task = login_google_task
ReturnModel = LoginGoogleResponse
success, response = await run_task(target_url, ReturnModel, task)
if not success:
return False, None
return True, response

View file

@ -0,0 +1,63 @@
from pydantic import BaseModel
from lib.config import GOOGLE_ID, GOOGLE_PASSWORD
class LoginGoogleResponse(BaseModel):
msg: str | None = None
status: str | None = None # "success", "mfa_required", "google_blocked", "sso_not_found", "login_page_not_found", "invalid_credentials"
final_url: str | None = None
login_google_task = f"""
You are a web automation agent.
Your task is to visit the given domain and perform a full login via the **Google SSO button**, following all steps strictly as described below.
Target: Find a login page inside this domain that allows "Sign in with Google", and use it to complete login via Google.
Instructions:
1. If any cookie or privacy popups appear, dismiss or accept them.
2. Navigate through the site's UI to find the **login or sign-in page** (e.g., via buttons like "Log In", "Sign In", "Get Started").
- Only follow links within the same domain.
3. On the login page, look for a clearly labeled **Google SSO button** typically labeled as:
- "Continue with Google"
- "Sign in with Google"
- or a button with the Google 'G' icon
4. Click the **Google login button**.
- The Google login flow MUST open in a **new browser tab** (not a new window or popup).
- If the login opens in a new **window** or **popup**, do NOT continue. Immediately stop and return the appropriate status.
5. Check if the user is **already logged in to Google and immediately redirected back to the original site** without showing a Google login screen.
- If so, treat the login as successful and return immediately.
6. If redirected to the Google login page:
- If a **CAPTCHA**, **MFA prompt**, or a request for **ID/password entry** appears, do NOT proceed.
- Immediately stop and return the appropriate status.
7. If login proceeds without interruptions, wait for redirection back to the original site and record the final URL.
Credentials to use for Google login:
- Email: {GOOGLE_ID}
- Password: {GOOGLE_PASSWORD}
Constraints:
- Do NOT use search engines or guess URLs.
- Do NOT use autofill, saved sessions, or cookies.
- Do NOT proceed with login if:
- The login opens in a new window (only tabs are allowed)
- CAPTCHA or MFA appears
- ID/password input is required
- If the user is already logged in to Google and redirected back automatically, stop there and report success.
- If the login page cannot be found, return "login_page_not_found".
- If the Google login button is not found, return "sso_not_found".
- If a page such as a sign-up page appears, consider it a successful login and terminate immediately.
Final Output:
Return the result in the following format only:
```json
{{
"msg": "Google login completed",
"status": "success" | "already_logged_in" | "mfa_required" | "captcha_triggered" | "window_blocked" | "idpw_required" | "google_blocked" | "sso_not_found" | "login_page_not_found",
"final_url": "<url_after_login_redirect or empty string>"
}}
```
- Return ONLY the JSON object. Do NOT include any explanation, logging, or extra output.
"""

View file

@ -1,19 +0,0 @@
from lib.browser_use_utils.clean_resources import clean_agent_resources
async def run_agent(agent) -> tuple[int, str]:
try:
response = await agent.run()
final_result = response.final_result()
if final_result is None:
return -1, "최종 결과가 없습니다. 에이전트 실행 실패"
return 0, final_result
except Exception as e:
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
return 1, "API 쿼터 에러로 인한 실패"
# 일반 에러 처리
else:
return 2, "일반 에러로 인한 실패"
finally:
await clean_agent_resources(agent)

View file

@ -0,0 +1,15 @@
from lib.browser_use_utils.clean_resources import clean_resources, clean_agent_resources, clean_session_resources
from lib.browser_use_utils.create_google_ai import create_google_ai
from lib.browser_use_utils.get_profile import get_profile
from lib.browser_use_utils.run_agent import run_agent
from lib.browser_use_utils.run_task import run_task
__all__ = [
"clean_resources",
"clean_agent_resources",
"clean_session_resources",
"create_google_ai",
"get_profile",
"run_agent",
"run_task",
]

View file

@ -0,0 +1,40 @@
from typing import Any
from pydantic import BaseModel
from lib.browser_use_utils.clean_resources import clean_agent_resources
from lib.config import GOOGLE_MODEL
from browser_use import (
Agent,
Controller,
)
from lib.browser_use_utils.create_google_ai import create_google_ai
async def run_agent(session, initial_actions, ReturnModel: type[BaseModel], task: str) -> tuple[bool, str, Any | None]:
controller = Controller(output_model=ReturnModel, exclude_actions=['search_google'])
agent = Agent(
browser_session=session,
initial_actions=initial_actions,
task=task,
llm=create_google_ai(GOOGLE_MODEL),
controller=controller,
)
try:
response = await agent.run()
final_result = response.final_result()
if final_result is None:
return False, "LLM이 반환한 최종 결과가 없습니다.", None
except Exception as e:
# API 쿼터 문제인지 확인
if "ResourceExhausted" in str(e) or "429" in str(e):
return False, "API 쿼터 에러로 인한 실패", None
# 일반 에러 처리
else:
return False, "일반 에러로 인한 실패", None
finally:
await clean_agent_resources(agent)
return True, "ok", final_result

View file

@ -0,0 +1,40 @@
import json
from typing import Any
from pydantic import BaseModel
from browser_use import (
BrowserSession
)
from patchright.async_api import async_playwright as async_patchright
from lib.utils.logger import logger
from lib.browser_use_utils import get_profile, clean_session_resources, run_agent
async def run_task(target_url: str, ReturnModel: type[BaseModel], task: str) -> tuple[bool, type[BaseModel] | None]:
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await get_profile(),
)
initial_actions = [{"open_tab": {"url": target_url}}]
seccess, msg, final_result = await run_agent(session=session,
initial_actions=initial_actions,
ReturnModel=ReturnModel,
task=task)
if not seccess:
logger(f"⚠️ LLM 실행 실패: {target_url} | {msg}")
print(f"⚠️ LLM 실행 실패: {target_url} | {msg}")
await clean_session_resources(session)
return False, None
try:
data = json.loads(final_result)
resp = ReturnModel(**data)
return True, resp
except Exception as e:
logger(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
print(f"⚠️ LLM 응답 결과 파싱 실패: {target_url} | {e}\n원본 결과: {data.msg}")
return False, None
finally:
await clean_session_resources(session)

View file

@ -4,5 +4,7 @@ load_dotenv(verbose=True, override=True)
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:11081")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash-preview-05-20") GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
GOOGLE_PLANNER_MODEL = os.getenv("GOOGLE_PLANNER_MODEL", "gemini-2.5-pro-preview-06-05")
GOOGLE_ID = os.getenv("GOOGLE_ID", "google")
GOOGLE_PASSWORD = os.getenv("GOOGLE_PASSWORD", "google")

View file

@ -1,65 +0,0 @@
import asyncio
from browser_use import Agent, BrowserSession
from patchright.async_api import async_playwright as async_patchright
from lib.agents.find_login_page import find_login_page
from lib.browser_use_utils.clean_resources import clean_session_resources
from lib.browser_use_utils.get_profile import get_profile
from lib.utils.save_oauth_providers import save_oauth_providers
async def find_sso_list(target_url):
session = BrowserSession(
playwright=(await async_patchright().start()),
browser_profile=await get_profile(),
)
FIND_LOGIN_PAGE = 1
FIND_SSO_LIST = 2
SAVE_DATA = 3
WHEN_ERROR = -1
FINISH = 0
final_result = None
login_url = target_url
state = FIND_LOGIN_PAGE
while True:
if state == FIND_LOGIN_PAGE:
is_success, resp = await find_login_page(
target_url=target_url,
session=session,
)
if not is_success:
print(f"⚠️ 로그인 페이지 탐지 실패: {target_url} | {resp}")
state = WHEN_ERROR
login_url = resp if resp else target_url
state = FIND_SSO_LIST
if state == FIND_SSO_LIST:
print(f"🔎 SSO 목록 찾는 중: {target_url}")
is_success, resp = await find_sso_list(
target_url=login_url,
session=session,
)
if not is_success:
print(f"⚠️ SSO 목록 탐지 실패: {target_url} | {resp}")
state = WHEN_ERROR
final_result = ""
state = SAVE_DATA
if state == SAVE_DATA:
print(f"💾 데이터 저장 중: {target_url}")
if not final_result:
print(f"⚠️ SSO 목록이 전달되지 않았습니다: {target_url}")
state = WHEN_ERROR
save_oauth_providers(target_url, final_result)
state = FINISH
if state == WHEN_ERROR:
print(f"⚠️ 에러 발생: {target_url} | 스캔을 중단합니다.")
return
if state == FINISH:
print(f"✅ 스캔 완료: {target_url}")
break
await clean_session_resources(session)

20
lib/utils/__init__.py Normal file
View file

@ -0,0 +1,20 @@
from lib.utils.env_checker import check_env_variables
from lib.utils.is_html import is_html_url
from lib.utils.logger import logger
from lib.utils.notify_backend import notify_backend
from lib.utils.progress_checker import save_progress, load_progress
# v2 import => 아직 개발 중
from lib.utils.progress_checker_v2 import ProgressChecker
from lib.utils.read_txt import read_lines_between
from lib.utils.save_oauth_providers import save_oauth_providers
__all__ = [
"check_env_variables",
"is_html_url",
"logger",
"notify_backend",
"read_lines_between",
"save_progress",
"load_progress",
"save_oauth_providers",
]

View file

@ -8,7 +8,6 @@ def check_env_variables():
"BACKEND_URL", "BACKEND_URL",
"GOOGLE_API_KEY", "GOOGLE_API_KEY",
"GOOGLE_MODEL", "GOOGLE_MODEL",
"GOOGLE_PLANNER_MODEL"
] ]
for var in required_vars: for var in required_vars:

View file

@ -4,7 +4,6 @@ from pathlib import Path
progress_file = Path("data/scan_progress.json") progress_file = Path("data/scan_progress.json")
def save_progress(current_progress): def save_progress(current_progress):
"""현재 진행 상황을 파일에 저장""" """현재 진행 상황을 파일에 저장"""
with open(progress_file, 'w', encoding='utf-8') as f: with open(progress_file, 'w', encoding='utf-8') as f:

View file

@ -0,0 +1,25 @@
import json
import os
from pathlib import Path
progress_file = Path("data/scan_progress.json")
class ProgressChecker:
def __init__(self, filepath):
self.filepath = filepath
self.progress = self.load_progress()
def save(self):
"""현재 진행 상황을 파일에 저장"""
with open(self.filepath, 'w', encoding='utf-8') as f:
json.dump(self.progress, f, ensure_ascii=False, indent=2)
def load(self):
"""이전 진행 상황을 파일에서 불러오기"""
if os.path.exists(self.filepath):
try:
with open(self.filepath, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return None
return None

View file

@ -1,90 +0,0 @@
from dotenv import load_dotenv
import os
load_dotenv()
google_id = os.getenv("GOOGLE_ID")
google_password = os.getenv("GOOGLE_PASSWORD")
naver_id = os.getenv("NAVER_ID")
naver_password = os.getenv("NAVER_PASSWORD")
facebook_id = os.getenv("FACEBOOK_ID")
facebook_password = os.getenv("FACEBOOK_PASSWORD")
github_id = os.getenv("GITHUB_ID")
github_password = os.getenv("GITHUB_PASSWORD")
# Extended planner prompt
extend_planner_system_message = f"""
🎯 Mission: Collect Initial SSO Redirect URLs (For Browser Automation)
**모든 STEP에서 구글 검색, Bing 검색 어떤 외부 검색 기능도 절대 사용하지 않고, 초기에 주어진 URL에서 탐색하세요.**
**초기에 주어진 URL 내에서 실제로 확인되지 않은 URL로 직접 이동하는것은 허용되지 않습니다.**
0. **초기 블록(Block) 체크**
- 브라우저가 로그인 페이지에 접근하려 , **페이지가 차단(blocked)** 되거나 **방화벽, CAPTCHA, 접근 제한** 등으로 인해 정상적으로 로드되지 않으면 즉시 프로세스를 종료하고 아래 JSON만 반환해야 합니다.
```json
[
{{
"provider": "Blocked",
"oauth_uri": "-"
}}
]
```
- 이후 단계로 절대 넘어가지 않도록 합니다.
1. **로그인 페이지 탐색**
- **클라이언트(비엔터프라이즈) 로그인 페이지** 직접 이동합니다. **검색 엔진을 사용하여 찾아서는 됩니다.**
- 접근 **개인정보/쿠키/동의 팝업** 뜨면, 이를 반드시 **닫거나(Dismiss)** 처리하고 계속 진행합니다.
- (이미 0단계에서 블록 여부를 확인했으므로, 단계에서는 페이지가 정상 로드되었다고 가정합니다.)
2. **SSO 버튼 식별**
- 로그인 페이지에서 다음과 같은 소셜 로그인 버튼을 찾습니다:
- Google, GitHub, Facebook, Linkedin, Microsoft, Naver
- **실제 SSO 버튼**임이 명확히 확인되는 경우에만 진행합니다.
- 제외 대상:
- Passkey 관련 버튼
- 아이디/비밀번호 입력란
- 이메일 기반 로그인
- 인증서, 휴대폰 인증 -OAuth 로그인 옵션
3. **SSO 버튼 클릭 로그인 시도**
- 유효한 SSO 버튼이 발견되면, 버튼을 클릭합니다.
- 클릭 ** 번째로 리디렉션된 URL(쿼리 스트링 포함)** `oauth_uri` 기록합니다.
- 공급자 페이지가 열리면, 아래 자격증명을 이용해 로그인을 시도합니다, 아래 자격증명에 포함되지 않는 SSO 버튼도 클릭까지는 시도합니다.:
- Google `{google_id}` / `{google_password}`
- Naver `{naver_id}` / `{naver_password}`
- GitHub `{github_id}` / `{github_password}`
- facebook `{facebook_id}` / `{facebook_password}`
- **자격증명이 주어진 SSO 버튼인 경우 로그인 과정을 진행합니다.**
- 로그인 과정이 모두 끝나거나 로그인이 되지 않는 경우 세션 쿠키를 모두 삭제하고 페이지를 새로고침합니다.
- 한번이라도 SSO 버튼을 클릭한 경우, 해당 버튼은 이상 탐색하지 않습니다.
- id/pw 입력 성공 , 아직 로그인되지 않았다면, 최대 5초간 대기합니다.
- 아직 로그인을 시도하지 않은 SSO 버튼이 있다면 이전 단계인 1. **로그인 페이지 탐색**, 2. **SSO 버튼 식별**, 3. **SSO 버튼 클릭 로그인 시도** 돌아가 절차를 반복합니다.
- 최종 결과는 다음과 같이 기록합니다:
```json
[
{{
"provider": "Google",
"oauth_uri": "(optional) https://example.com/auth/google?client_id=...",
}},
{{
"provider": "Naver",
"oauth_uri": "(optional) https://example.com/auth/naver?client_id=...",
}}
]
```
4. **SSO 버튼 미발견 또는 오류 발생 **
- 페이지 내부에 유효한 SSO 버튼이 전혀 없거나, 탐색 예기치 않은 오류가 발생하면 즉시 프로세스를 종료하고 ** 배열** 반환합니다:
```json
[]
```
5. **중요 사항**
- **반드시** 위의 단계들을 순서대로 수행해야 하며, 단계에서 발생하는 예외 상황을 정확히 처리해야 합니다.
- **반복 행동** 감지되면 즉시 배열을 반환하고, **블록된 페이지** 초기 단계에서 처리하여 프로세스를 종료해야 합니다.
- **SSO 버튼이 발견되지 않거나, 오류가 발생한 경우에도 배열을 반환해야 합니다.**
- **반드시** JSON 형식으로 결과를 반환해야 하며, 다른 형식은 허용되지 않습니다.
- 최대한 효율적인 단계로 진행하며, 불필요한 반복이나 검색 엔진 사용을 피해야 합니다.
"""

59
main.py
View file

@ -3,18 +3,18 @@ import argparse
import signal import signal
from dotenv import load_dotenv from dotenv import load_dotenv
from lib.config import BACKEND_URL from lib.config import BACKEND_URL
from lib.utils.notify_backend import notify_backend from lib.utils import notify_backend, is_html_url, read_lines_between, save_progress, load_progress, check_env_variables
from lib.utils.is_html import is_html_url from lib.agents import get_sso_list, login_google
from lib.utils.read_txt import read_lines_between
from lib.utils.progress_checker import save_progress, load_progress
from lib.utils.env_checker import check_env_variables
from lib.find_sso_list import find_sso_list
load_dotenv() load_dotenv()
check_env_variables() check_env_variables()
backend_url = BACKEND_URL backend_url = BACKEND_URL
login_agents = {
"google": login_google
}
# ── URL별로 Browser를 새로 띄우는 함수 ── # ── URL별로 Browser를 새로 띄우는 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False): async def scan_one_url(url: str, skip_html_check: bool = False):
target_url = url if url.startswith("http") else f"https://{url}" target_url = url if url.startswith("http") else f"https://{url}"
@ -28,33 +28,28 @@ async def scan_one_url(url: str, skip_html_check: bool = False):
# Backend에 스캔 시작 알림 # Backend에 스캔 시작 알림
notify_backend(target_url) notify_backend(target_url)
await find_sso_list(target_url) success, response = await get_sso_list(target_url)
if not success:
return
if len(response.sso_list) == 0:
return
for sso in response.sso_list:
target_login_agent = login_agents.get(sso.lower())
if target_login_agent:
print(f"🔍 {target_url} 에서 SSO 발견: {sso}, 로그인 시도 중...")
success, login_response = await target_login_agent(target_url)
if not success:
print(f"⚠️ {target_url} 에서 {sso} 로그인 실패")
continue
print(f"{target_url} 에서 {sso} 로그인 성공: {login_response.final_url}")
else:
print(f"{target_url} 에서 SSO 발견: {sso} | TODO")
# Backend에 스캔 완료 알림
# 오탐 검증
# # 5) 결과 출력
# print("-" * 50)
# print(f"🔗 Scanned URL: {url}\n")
# print("🔐 Detected OAuth Providers and URLs:")
# for entry in oauth_entries:
# if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
# print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
# else:
# print(f"- {entry.provider}: {entry.oauth_uri}")
# print("-" * 50)
# # 6) CSV에 저장 (append)
# csv_file = "./oauth_providers.csv"
# file_exists = os.path.isfile(csv_file)
# with open(csv_file, "a", newline="", encoding="utf-8") as f:
# writer = csv.writer(f)
# if not file_exists:
# writer.writerow(["issuer", "provider", "oauth_uri"])
# for entry in oauth_entries:
# writer.writerow([url, entry.provider, entry.oauth_uri])
# print(f"✅ OAuth providers saved to {csv_file}\n")
@ -111,7 +106,7 @@ async def loop(
current_progress["current_index"] = actual_index current_progress["current_index"] = actual_index
print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}")
print(f"📍 domains.txt의 {start_line + actual_index}번째 줄") print(f"📍 domains.txt의 {actual_index}번째 줄")
await scan_one_url(url, skip_html_check=skip_html_check) await scan_one_url(url, skip_html_check=skip_html_check)

20
uv.lock generated
View file

@ -75,30 +75,30 @@ wheels = [
[[package]] [[package]]
name = "boto3" name = "boto3"
version = "1.38.41" version = "1.38.44"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "botocore" }, { name = "botocore" },
{ name = "jmespath" }, { name = "jmespath" },
{ name = "s3transfer" }, { name = "s3transfer" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/2f/3b/f421b30e32c33ce63f0de3b32ea12954039a4595c693db4ea4900babe742/boto3-1.38.41.tar.gz", hash = "sha256:c6710fc533c8e1f5d1f025c74ffe1222c3659094cd51c076ec50c201a54c8f22", size = 111835, upload-time = "2025-06-20T19:26:41.584Z" } sdist = { url = "https://files.pythonhosted.org/packages/7b/7f/ea50e25a049072c0078045437d25fc9c8eaec4bd58f2cc340e6ed52e55cd/boto3-1.38.44.tar.gz", hash = "sha256:af1769dfb2a8a30eec24d0b74a8c17db2accc5a6224d4fab39dd36df6590f741", size = 111899, upload-time = "2025-06-25T19:27:40.825Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/0a/bb/541825bf9811eb7fe13a357e691dc4cfead56a5fed4556aa101dc62e06ca/boto3-1.38.41-py3-none-any.whl", hash = "sha256:6119e9f272b9f004f052ca78ce94d3fe10198bc159ae808f75c0e1b9c07518bd", size = 139922, upload-time = "2025-06-20T19:26:39.963Z" }, { url = "https://files.pythonhosted.org/packages/17/73/4a1bbd696e492f17064e7404c49d4d3bafcc8b50239ec6624c10ea824dd1/boto3-1.38.44-py3-none-any.whl", hash = "sha256:73fcb2f8c7bec25d17e3f1940a1776c515b458b3da77ad3a31a177479591028b", size = 139923, upload-time = "2025-06-25T19:27:38.748Z" },
] ]
[[package]] [[package]]
name = "botocore" name = "botocore"
version = "1.38.41" version = "1.38.44"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "jmespath" }, { name = "jmespath" },
{ name = "python-dateutil" }, { name = "python-dateutil" },
{ name = "urllib3" }, { name = "urllib3" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/98/46/cb33f5a0b00086a97c4eebbc4e0211fe85d66d45e53a9545b33805f25b31/botocore-1.38.41.tar.gz", hash = "sha256:98e3fed636ebb519320c4b2d078db6fa6099b052b4bb9b5c66632a5a7fe72507", size = 14031081, upload-time = "2025-06-20T19:26:31.365Z" } sdist = { url = "https://files.pythonhosted.org/packages/31/06/c6e652e8b449837218d83cedda9c54104cfd5d38dc97762044a40116b209/botocore-1.38.44.tar.gz", hash = "sha256:8d54795a084204e4cd7885d9307e4bfaccc96411dc0384f6ba240b515c45bf54", size = 14050056, upload-time = "2025-06-25T19:27:29.354Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/b7/37d9f1a633e72250408cb7d53d8915561ac6108b5c3a1973eb8f53ce2990/botocore-1.38.41-py3-none-any.whl", hash = "sha256:06069a06f1352accb1f6c9505d6e323753627112be80a9d2e057c6d9c9779ffd", size = 13690225, upload-time = "2025-06-20T19:26:26.014Z" }, { url = "https://files.pythonhosted.org/packages/ad/85/e3cd7bf4237af134a90290c8e37bf7f786c5e58b9ff98eeb0495615e3985/botocore-1.38.44-py3-none-any.whl", hash = "sha256:d0171ac6ec0bfdf86083b41c801f212e2b2d5756a61ea1d45af2051f21dbf886", size = 13710700, upload-time = "2025-06-25T19:27:23.645Z" },
] ]
[[package]] [[package]]
@ -156,7 +156,7 @@ dependencies = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "browser-use", extras = ["memory"], specifier = "==0.2.7" }, { name = "browser-use", extras = ["memory"], specifier = "==0.2.7" },
{ name = "patchright", specifier = ">=1.52.5" }, { name = "patchright", specifier = "==1.52.5" },
] ]
[[package]] [[package]]
@ -809,7 +809,7 @@ wheels = [
[[package]] [[package]]
name = "mem0ai" name = "mem0ai"
version = "0.1.110" version = "0.1.111"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "openai" }, { name = "openai" },
@ -819,9 +819,9 @@ dependencies = [
{ name = "qdrant-client" }, { name = "qdrant-client" },
{ name = "sqlalchemy" }, { name = "sqlalchemy" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/8d/fd/95c6285ad55a5fb78df17f15b5710273d59ae687c6ff79dcd03acb15e24f/mem0ai-0.1.110.tar.gz", hash = "sha256:8a9b6f45c2c4e5d97ce1aa096dc85991cd657acccde796422b65a52089ca7fcb", size = 107869, upload-time = "2025-06-20T15:01:56.754Z" } sdist = { url = "https://files.pythonhosted.org/packages/2d/93/ff302f96e02b5ac80a1ad18b94617985296f78aee212f86d83cba1c2a1a5/mem0ai-0.1.111.tar.gz", hash = "sha256:cc4b1a20cd4fd3b980cca4fd9f77ee4c9cff81b92e6f4d30014fd900dce59bba", size = 108299, upload-time = "2025-06-23T16:23:19.642Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/d6/3d67909445682f5e73e910b187fc64ff84643709ac1956240d8a3834b1bd/mem0ai-0.1.110-py3-none-any.whl", hash = "sha256:4f69df6e633200b9d1b0177f82eaa96bf70a446aee8f40e56eedb67403f14395", size = 166820, upload-time = "2025-06-20T15:01:54.864Z" }, { url = "https://files.pythonhosted.org/packages/2a/f5/185c88df177d0d9ae1226cc1ae75a2b2480280521a5c7690f1ca6a54b6af/mem0ai-0.1.111-py3-none-any.whl", hash = "sha256:53e8ce3551ffe1454b6e28ba90a8a88907280a9052edfeb872241662a4707f14", size = 168161, upload-time = "2025-06-23T16:23:18.146Z" },
] ]
[[package]] [[package]]