Merge pull request #4 from j93es/imnyang

프록시와 통신하게 구현
This commit is contained in:
James 2025-06-07 15:16:19 +09:00 committed by GitHub
commit 714c0ebea7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 52542 additions and 110 deletions

View file

@ -1,9 +1,10 @@
ANONYMIZED_TELEMETRY=false
GOOGLE_API_KEY=
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20 # 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20 # 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
# 권장 (다른 모델로 교체 가능) [다른 모델로 교체시 성능 보장 불가]
GOOGLE_MODEL=gemini-2.5-flash-preview-05-20
GOOGLE_PLANNER_MODEL=gemini-2.5-flash-preview-05-20
# 선택
PROXY_HOST=127.0.0.1
PROXY_PORT=8080
PROXY_PORT=11080

2
.gitignore vendored
View file

@ -6,6 +6,8 @@ dist/
wheels/
*.egg-info
oauth_providers.csv
# Virtual environments
.venv

View file

@ -1,10 +1,11 @@
# 테스트한 영상
https://f.imnya.ng/.whs/teamproject/
# 참고하면 좋을만한 것
- [ ] 일부 웹사이트는 사용자의 언어에 따라 OAuth 옵션을 바꾸기도 합니다.
- [ ] https://docs.browser-use.com/customize/custom-functions
https://f.imnya.ng/.whs/tp-domains/data/domains/latest.txt
이거 도메인 리스트 HTML만 필터링 해둔거니까 이거 쓰세요.
# 환경 설정
이 프로젝트는 [uv](https://docs.astral.sh/uv/getting-started/installation/)라는 Python 패키지 관리자를 사용하여 설정해야합니다.
@ -14,6 +15,7 @@ uv 설치 후 다음과 같은 명령어를 입력합니다.
```
uv sync
```
venv와 패키지가 설치가 됩니다.
browser_use가 Playwright에 대한 의존성이 있어 브라우저 설치가 필요합니다
@ -29,20 +31,24 @@ uv run main.py
```
Environment에는 다음과 같은 값이 들어갑니다.
```
ANONYMIZED_TELEMETRY=false
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_BASE_URL=https://models.github.ai/inference # 선택
OPENAI_MODEL=openai/gpt-4o-mini # Github Models가 아닐시 gpt-4.1
GOOGLE_API_KEY=your_openai_api_key_here
GOOGLE_MODEL=gemini-2.5-flash-preview-04-17
GOOGLE_PLANNER_MODEL=gemini-2.0-flash-lite
# 선택
PROXY_HOST=127.0.0.1
PROXY_PORT=8080
```
`OPENAI_BASE_URL`은 GitHub Models가 아닐시 비워둡니다.
`OPENAI_MODEL`은 GitHub Models가 아닐시 `openai/`를 제거합니다.
`PROXY_HOST``PROXY_PORT`는 만약 Caido를 사용 중일 시 환경에 맞게 설정 후 설정합니다.
# 실행
```sh
# ./run.sh {domains.txt 시작 줄} {domains.txt 끝 줄}
./run.sh 12540 13000
```

16012
domains.txt Normal file

File diff suppressed because it is too large Load diff

71
is-html-fast/.gitignore vendored Normal file
View file

@ -0,0 +1,71 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# Generated by cargo mutants
# Contains mutation testing data
**/mutants.out*/
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# General
.DS_Store
.AppleDouble
.LSOverride
Icon[]
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# Windows thumbnail cache files
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
[Dd]esktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msix
*.msm
*.msp
# Windows shortcuts
*.lnk

1581
is-html-fast/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

8
is-html-fast/Cargo.toml Normal file
View file

@ -0,0 +1,8 @@
[package]
name = "is-html-fast"
version = "0.1.0"
edition = "2024"
[dependencies]
rayon = "1.10.0"
reqwest = { version = "0.12.19", features = ["blocking", "json"]}

2
is-html-fast/README.md Normal file
View file

@ -0,0 +1,2 @@
실제로 사용되진 않습니다.
일회용 코드입니다.

34377
is-html-fast/domains.txt Normal file

File diff suppressed because it is too large Load diff

92
is-html-fast/src/main.rs Normal file
View file

@ -0,0 +1,92 @@
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use rayon::prelude::*;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, USER_AGENT, ACCEPT, ACCEPT_LANGUAGE, ACCEPT_ENCODING, CONNECTION, UPGRADE_INSECURE_REQUESTS};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_file = File::open("domains.txt")?;
let reader = BufReader::new(input_file);
let domains: Vec<String> = reader.lines().filter_map(Result::ok).collect();
let total_count = domains.len();
let counter = Arc::new(AtomicUsize::new(0));
let html_count = Arc::new(AtomicUsize::new(0));
let failed_count = Arc::new(AtomicUsize::new(0));
let non_html_count = Arc::new(AtomicUsize::new(0));
let output_file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open("domains-filtered.txt")?;
let output = Arc::new(Mutex::new(output_file));
// 브라우저 헤더 세팅
let mut headers = HeaderMap::new();
headers.insert(USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0"));
headers.insert(ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"));
headers.insert(ACCEPT_LANGUAGE, HeaderValue::from_static("ko,en-US;q=0.7,en;q=0.3"));
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip, deflate, br"));
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(UPGRADE_INSECURE_REQUESTS, HeaderValue::from_static("1"));
let client = Arc::new(
Client::builder()
.timeout(Duration::from_secs(5))
.default_headers(headers)
.build()?,
);
domains.par_iter().for_each(|domain| {
let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
let url = format!("https://{}", domain);
let response = client.get(&url).send();
match response {
Ok(resp) => {
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
if let Ok(content_type_str) = content_type.to_str() {
if content_type_str.starts_with("text/html") {
if let Ok(mut file) = output.lock() {
writeln!(file, "{}", domain).ok();
}
html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ✅ HTML: {}", current, total_count, domain);
} else {
non_html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ❌ Not HTML: {} ({})", current, total_count, domain, content_type_str);
}
}
} else {
non_html_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ❌ No Content-Type: {}", current, total_count, domain);
}
}
Err(_) => {
failed_count.fetch_add(1, Ordering::SeqCst);
println!("[{}/{}] ⚠️ Failed to connect: {}", current, total_count, domain);
}
}
});
// Final results
let html_final = html_count.load(Ordering::SeqCst);
let failed_final = failed_count.load(Ordering::SeqCst);
let non_html_final = non_html_count.load(Ordering::SeqCst);
println!("\n=== Final Results ===");
println!("📊 Total domains: {}", total_count);
println!("✅ HTML domains: {} ({:.1}%)", html_final, (html_final as f64 / total_count as f64) * 100.0);
println!("❌ Non-HTML domains: {} ({:.1}%)", non_html_final, (non_html_final as f64 / total_count as f64) * 100.0);
println!("⚠️ Failed connections: {} ({:.1}%)", failed_final, (failed_final as f64 / total_count as f64) * 100.0);
println!("💾 HTML domains saved to: domains-filtered.txt");
Ok(())
}

View file

@ -15,12 +15,15 @@ def browser_config_kwargs(lang: str = "en_US") -> dict[str, Any]:
"--disable-features=IsolateOrigins,site-per-process",
"--disable-popup-blocking",
f"--lang={lang}",
"--ignore-certificate-errors"
],
}
proxy_host = os.getenv("PROXY_HOST")
proxy_port = os.getenv("PROXY_PORT")
if proxy_host and proxy_port:
browser_config_kwargs["proxy"] = {"server": f"http://{proxy_host}:{proxy_port}"}
browser_config_kwargs["extra_browser_args"].append(
f"--proxy-server=http={proxy_host}:{proxy_port};https={proxy_host}:{proxy_port}"
)
return browser_config_kwargs

36
lib/is_html.py Normal file
View file

@ -0,0 +1,36 @@
import requests
def is_html_url(url: str, timeout: float = 10.0) -> bool:
"""
주어진 URL에 HEAD 요청을 보내고, 응답 헤더의 Content-Type이 HTML인지 확인합니다.
- url: 검사할 URL 문자열
- timeout: 요청 타임아웃( 단위)
반환값:
- Content-Type이 'text/html' 시작하면 True, 그렇지 않으면 False
"""
try:
with requests.get(url, timeout=timeout, stream=True) as response:
# 응답 코드가 200번대가 아니면 False로 간주
if not response.ok:
return False
content_type = response.headers.get('Content-Type', '')
# Content-Type에 'text/html'이 포함되어 있으면 HTML로 간주
return content_type.lower().startswith('text/html')
except requests.RequestException:
return False
if __name__ == '__main__':
test_urls = [
'https://www.example.com',
'https://api.github.com', # JSON API라서 HTML이 아닐 확률이 높음
'https://raw.githubusercontent.com' # 텍스트 파일 등 다양한 타입
]
for url in test_urls:
if is_html_url(url):
print(f"[HTML] {url}")
else:
print(f"[Not HTML] {url}")

36
lib/read_txt.py Normal file
View file

@ -0,0 +1,36 @@
def read_lines_between(filepath: str, start_line: int, end_line: int) -> list[str]:
"""
파일에서 start_line번 줄부터 end_line번 줄까지 읽어와
줄을 요소로 갖는 리스트를 반환하는 함수.
Parameters:
----------
filepath : str
읽을 텍스트 파일의 경로
start_line : int
읽기 시작할 번호 (1부터 시작)
end_line : int
읽을 마지막 번호 (start_line <= end_line)
Returns:
-------
list[str]
줄을 문자열로 저장한 리스트.
파일에 해당 범위의 줄이 없으면 가능한 만큼만 반환.
"""
if start_line < 1 or end_line < start_line:
raise ValueError("start_line은 1 이상이어야 하며, end_line은 start_line 이상이어야 합니다.")
selected_lines: list[str] = []
with open(filepath, 'r', encoding='utf-8') as f:
for idx, line in enumerate(f, start=1):
if idx < start_line:
# 아직 읽기 시작 전
continue
if idx > end_line:
# 읽을 범위를 벗어났으므로 중단
break
# 줄 끝의 개행 문자를 제거하고 리스트에 추가
selected_lines.append(line.rstrip('\n'))
return selected_lines

310
main.py
View file

@ -1,6 +1,9 @@
import asyncio
import json
import os
import csv
import argparse
import requests
from typing import List
from dotenv import load_dotenv
from pydantic import BaseModel
@ -8,38 +11,21 @@ from langchain_google_genai import ChatGoogleGenerativeAI
from browser_use import Agent, Browser, BrowserConfig, Controller
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from lib.browser_config import browser_config_kwargs
import csv
from lib.is_html import is_html_url
from lib.read_txt import read_lines_between
load_dotenv()
# Check environment variables
if os.getenv("GOOGLE_API_KEY") is None:
raise ValueError("OPENAI_API_KEY environment variable not set.")
raise ValueError("GOOGLE_API_KEY 환경변수가 설정되지 않았습니다.")
if os.getenv("GOOGLE_MODEL") is None:
raise ValueError("OPENAI_MODEL environment variable not set.")
raise ValueError("GOOGLE_MODEL 환경변수가 설정되지 않았습니다.")
if os.getenv("GOOGLE_PLANNER_MODEL") is None:
raise ValueError("OPENAI_PLANNER_MODEL environment variable not set.")
raise ValueError("GOOGLE_PLANNER_MODEL 환경변수가 설정되지 않았습니다.")
# Configure browser
browser = Browser(
config=BrowserConfig(**browser_config_kwargs())
)
backend_url = os.getenv("BACKEND_URL", "http://localhost:4040")
# Set browser context
context = BrowserContext(
browser=browser,
config=BrowserContextConfig(
wait_for_network_idle_page_load_time=3.0,
window_width=1600,
window_height=900,
locale='en-US',
highlight_elements=True,
viewport_expansion=500,
keep_alive=True
)
)
# Output model: each result is one OAuth entry with metadata
# 출력 모델
class OAuth(BaseModel):
provider: str
oauth_uri: str
@ -47,51 +33,116 @@ class OAuth(BaseModel):
class OAuthList(BaseModel):
oauth_providers: List[OAuth]
controller = Controller(output_model=OAuthList)
# Controller는 매번 새로 생성해도 무방합니다.
def make_controller():
return Controller(output_model=OAuthList)
# Extended planner prompt
extend_planner_system_message = """
🎯 Your mission is to collect the real OAuth login URLs from the website.
🎯 Mission: Collect Initial SSO Redirect URLs (For Browser Automation)
1. First, go to the websites **login page**.
2. On the login page, look for OAuth login buttons. These usually say things like **"Continue with Google"**, **"Sign in with GitHub"**, etc.
3. **DO NOT collect or include "Passkey"** it is NOT an OAuth provider.
**절대로 구글 검색, Bing 검색 어떤 외부 검색 기능도 사용하지 말고, 주어진 로그인 페이지 URL을 직접 방문하여 탐색하세요.**
---
0. **초기 블록(Block) 체크**
- 브라우저가 로그인 페이지에 접근하려 , **페이지가 차단(blocked)** 되거나 **방화벽, CAPTCHA, 접근 제한** 등으로 인해 정상적으로 로드되지 않으면 즉시 프로세스를 종료하고 아래 JSON만 반환해야 합니다.
```json
[
{
"provider": "Blocked",
"oauth_uri": "-"
}
]
```
- 이후 단계로 절대 넘어가지 않도록 합니다.
For EACH OAuth button you find:
1. **로그인 페이지 탐색**
- **클라이언트(비엔터프라이즈) 로그인 페이지** 직접 이동합니다. (검색 엔진을 사용하여 찾아서는 됩니다.)
- 접근 **개인정보/쿠키/동의 팝업** 뜨면, 이를 반드시 **닫거나(Dismiss)** 처리하고 계속 진행합니다.
- (이미 0단계에서 블록 여부를 확인했으므로, 단계에서는 페이지가 정상 로드되었다고 가정합니다.)
- **Try opening it in a new tab**. If it redirects to an OAuth URL (e.g. `https://accounts.google.com/...`, `https://github.com/login/oauth/...`), copy that **exact final URL**.
- If it **doesnt open in a new tab**, **click the button** and wait for the redirect to happen.
- As soon as you see the redirected URL with **client_id**, **redirect_uri**, etc., copy that **entire URL without changing or hiding anything**.
- Then come back to the original tab (if needed) and continue with the next provider.
2. **SSO 버튼 식별**
- 로그인 페이지에서 다음과 같은 소셜 로그인 버튼을 찾습니다:
- Continue with Google
- Sign in with GitHub
- Login with Naver
- **실제 SSO 버튼**임이 명확히 확인되는 경우에만 진행합니다.
- 제외 대상:
- Passkey 관련 버튼
- 아이디/비밀번호 입력란
- 이메일 기반 로그인
- 인증서, 휴대폰 인증 -OAuth 로그인 옵션
---
3. **리디렉션 URL 캡처**
- 유효한 SSO 버튼을 하나 이상 찾았다면, 각각의 버튼을 ** 탭으로 열기** 시도하거나, 불가능할 경우 **직접 클릭**합니다.
- 클릭 번째로 **리디렉션된 URL(쿼리 스트링 포함)** 캡처합니다. URL은:
- 예시: `https://example.com/auth/google?include_all_params=...`
- **OAuth 공급자 자체 엔드포인트** (: `https://accounts.google.com/...`) 수집하지 않습니다.
- 만약 **반복 행동(looping)** 감지될 경우(: 동일한 버튼을 여러 열거나 페이지 반복 이동), 즉시 프로세스를 종료하고 ** 배열** 반환합니다:
```json
[]
```
- 정상적으로 리디렉션 URL을 획득했다면, 아래 형식으로 결과를 수집합니다:
```json
[
{
"provider": "Google",
"oauth_uri": "https://example.com/auth/google?include_all_params=..."
},
{
"provider": "GitHub",
"oauth_uri": "https://example.com/auth/github?include_all_params=..."
}
]
```
💡 **Do not guess** the OAuth URLs only collect them by actually interacting with the buttons.
🚫 **Do not redact or mask any part** of the URL, including `client_id`, `redirect_uri`, `state`, or any other parameters. Record them exactly as they appear.
Return a list of all OAuth providers and their **full raw redirect URLs** in this exact format:
```json
[
{
"provider": "Google",
"oauth_uri": "https://accounts.google.com/o/oauth2/v2/auth?client_id=...&redirect_uri=...&...",
},
{
"provider": "GitHub",
"oauth_uri": "https://github.com/login/oauth/authorize?client_id=...&redirect_uri=...",
}
]
```
4. **SSO 버튼 미발견 또는 오류 발생 **
- 페이지 내부에 유효한 SSO 버튼이 전혀 없거나, 탐색 예기치 않은 오류가 발생하면 즉시 프로세스를 종료하고 ** 배열** 반환합니다:
```json
[]
```
"""
# Main async runner
async def main():
url = "https://git.imnya.ng"
# ── URL별로 Browser를 새로 띄우는 함수 ──
async def scan_one_url(url: str, skip_html_check: bool = False):
# 1) URL이 HTML 페이지인지 확인
if not is_html_url(url) and not skip_html_check:
print(f"{url} 은(는) HTML이 아닙니다. 스킵합니다.")
return
target_url = url if url.startswith("http") else f"https://{url}"
print(f"🚀 Starting scan for: {target_url}")
# Backend에 스캔 시작을 알림
try:
response = requests.post(f"{backend_url}/start", params={"url": target_url}, timeout=5)
if response.status_code == 200:
print(f"✅ Backend notified: {response.text}")
else:
print(f"⚠️ Backend notification failed: {response.status_code}")
except requests.exceptions.ConnectionError:
print(f"⚠️ Backend server not available at {backend_url}. Continuing without notification.")
except requests.exceptions.Timeout:
print(f"⚠️ Backend notification timed out. Continuing without notification.")
except Exception as e:
print(f"⚠️ Failed to notify backend: {e}")
# 2) Browser + Context 생성
browser = Browser(config=BrowserConfig(**browser_config_kwargs()))
context = BrowserContext(
browser=browser,
config=BrowserContextConfig(
wait_for_network_idle_page_load_time=3.0,
window_width=1600,
window_height=900,
locale='en-US',
highlight_elements=True,
viewport_expansion=500,
keep_alive=False
)
)
# 3) Agent, Controller 생성
controller = make_controller()
agent = Agent(
browser_context=context,
browser=browser,
@ -101,52 +152,115 @@ async def main():
controller=controller,
extend_planner_system_message=extend_planner_system_message,
)
# Run the agent
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("final_result() returned None")
data = json.loads(final_result)
try:
oauth_entries: List[OAuth] = [OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"Failed to parse result: {e}\nRaw result: {final_result}")
# 4) 실제 스캔 실행
response = await agent.run()
final_result = response.final_result()
if final_result is None:
raise ValueError("final_result()가 None을 반환했습니다.")
# Clear terminal
#print("\033c", end="")
print("-" * 20)
data = json.loads(final_result)
try:
oauth_entries: List[OAuth] = [OAuth(**entry) for entry in data["oauth_providers"]]
except Exception as e:
raise ValueError(f"결과 파싱 실패: {e}\n원본 결과: {final_result}")
print(f"Raw result: {final_result}")
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
# Save the result to CSV (append mode, so you can continue later)
# 이거 좀 이상한데 나중에 고쳐야 할듯 파일이 수정이 안됨
csv_file = "oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
# 5) 결과 출력
print("-" * 50)
print(f"🔗 Scanned URL: {url}\n")
print("🔐 Detected OAuth Providers and URLs:")
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
print(f"\n✅ OAuth providers saved to {csv_file}")
if "<" in entry.oauth_uri or "..." in entry.oauth_uri:
print(f"⚠️ WARNING: {entry.provider} URL may be masked or incomplete:\n{entry.oauth_uri}\n")
else:
print(f"- {entry.provider}: {entry.oauth_uri}")
print("-" * 50)
# Save the result to JSON
with open(f"oauth_providers_{url}.json", "w") as f:
json.dump(data, f, indent=2)
print(f"✅ OAuth providers saved to oauth_providers_{url}.json")
# 6) CSV에 저장 (append)
csv_file = "./oauth_providers.csv"
file_exists = os.path.isfile(csv_file)
with open(csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["issuer", "provider", "oauth_uri"])
for entry in oauth_entries:
writer.writerow([url, entry.provider, entry.oauth_uri])
print(f"✅ OAuth providers saved to {csv_file}\n")
# 7) Agent와 Browser 닫기
await agent.close() # Agent 내부 작업 정리
await context.close() # 브라우저 컨텍스트 종료 (탭/세션 닫기)
await browser.close() # 실제 브라우저 프로세스 종료
except Exception as e:
print(f"❌ Error scanning {url}: {e}")
# 에러 발생 시에도 Agent와 Browser는 닫아야 합니다.
await agent.close()
await context.close()
await browser.close()
async def loop(filepath: str, start_line: int, end_line: int, skip_html_check: bool = False):
# 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성
target_list = read_lines_between(
filepath=filepath,
start_line=start_line,
end_line=end_line
)
# (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다.
# target_list[0] = "velog.io"
for url in target_list:
# scan_one_url은 외부에 정의된 비동기 함수라고 가정합니다.
# 실제로 scan_one_url이 정의된 위치를 import하거나
# 모듈 수준에 구현해두셔야 합니다.
await scan_one_url(f'http://{url}', skip_html_check=skip_html_check)
# Run it
asyncio.run(main())
def main():
parser = argparse.ArgumentParser(
prog="domain_scanner",
description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다."
)
# 커맨드라인 인자로 받을 옵션들 정의
parser.add_argument(
"-f", "--file",
type=str,
required=True,
help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)"
)
parser.add_argument(
"-s", "--start",
type=int,
required=True,
help="읽기 시작 줄 번호 (1-based)"
)
parser.add_argument(
"-e", "--end",
type=int,
required=True,
help="읽기 종료 줄 번호 (1-based)"
)
parser.add_argument(
"-skh", "--skip-html-check",
type=bool,
default=False,
help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)"
)
args = parser.parse_args()
# 인자값을 비동기 함수에 전달
asyncio.run(loop(
filepath=args.file,
start_line=args.start,
end_line=args.end,
skip_html_check=args.skip_html_check
))
if __name__ == "__main__":
main()

48
run.ps1
View file

@ -1 +1,47 @@
uv run ./main.py
# ── 설정 부분 ──
# 실행할 Python 스크립트 이름 (파일 확장자까지)
$PYTHON_SCRIPT = "main.py"
# 도메인 목록 파일 경로 (Python 스크립트 실행 시 -f 옵션에 전달)
$DOMAIN_FILE = "./domains.txt"
# 몇 줄씩(chunk) 나눠서 실행할지
$CHUNK_SIZE = 10
# ─────────────
# 인자 개수 확인 (2개 또는 3개)
if ($args.Count -lt 2 -or $args.Count -gt 3) {
Write-Host "Usage: $($MyInvocation.MyCommand.Name) <start_line> <end_line> [skip_header]"
Write-Host "예시) $($MyInvocation.MyCommand.Name) 10000 11000"
Write-Host "예시) $($MyInvocation.MyCommand.Name) 10000 11000 True"
exit 1
}
$START_LINE = [int]$args[0]
$END_LINE = [int]$args[1]
$SKIP_HEADER = if ($args.Count -eq 3) { $args[2] } else { "False" }
# START_LINE부터 END_LINE까지 CHUNK_SIZE 만큼씩 반복
$current = $START_LINE
while ($current -le $END_LINE) {
# 각 청크 구간의 마지막 줄 계산
$chunk_end = $current + $CHUNK_SIZE - 1
if ($chunk_end -gt $END_LINE) {
$chunk_end = $END_LINE
}
$timestamp = Get-Date -Format "yyyy-MM-dd HH:mm:ss"
Write-Host "[$timestamp] Processing lines $current to $chunk_end..."
# Python 스크립트 실행
# -f DOMAIN_FILE: 도메인 목록 파일 경로
# -s current : 읽기 시작 줄
# -e chunk_end: 읽기 끝 줄
# -skh SKIP_HEADER: 헤더 스킵 여부
uv run $PYTHON_SCRIPT -f $DOMAIN_FILE -s $current -e $chunk_end -skh $SKIP_HEADER
# 다음 청크의 시작 값 설정
$current = $chunk_end + 1
}
Write-Host "모든 청크 처리 완료."

45
run.sh Executable file
View file

@ -0,0 +1,45 @@
#!/bin/bash
# ── 설정 부분 ──
# 실행할 Python 스크립트 이름 (파일 확장자까지)
PYTHON_SCRIPT="main.py"
# 도메인 목록 파일 경로 (Python 스크립트 실행 시 -f 옵션에 전달)
DOMAIN_FILE="./domains.txt"
# 몇 줄씩(chunk) 나눠서 실행할지
CHUNK_SIZE=10
# ─────────────
# 인자 개수 확인
if [ $# -ne 2 ]; then
echo "Usage: $0 <start_line> <end_line>"
echo "예시) $0 10000 11000"
exit 1
fi
START_LINE=$1
END_LINE=$2
# START_LINE부터 END_LINE까지 CHUNK_SIZE 만큼씩 반복
current=$START_LINE
while [ "$current" -le "$END_LINE" ]; do
# 각 청크 구간의 마지막 줄 계산
chunk_end=$(( current + CHUNK_SIZE - 1 ))
if [ "$chunk_end" -gt "$END_LINE" ]; then
chunk_end=$END_LINE
fi
echo "[$(date '+%Y-%m-%d %H:%M:%S')] Processing lines ${current} to ${chunk_end}..."
# Python 스크립트 실행
# -f DOMAIN_FILE: 도메인 목록 파일 경로
# -s current : 읽기 시작 줄
# -e chunk_end: 읽기 끝 줄
# -skh True False: 추가 옵션
uv run "$PYTHON_SCRIPT" -f "$DOMAIN_FILE" -s "$current" -e "$chunk_end" -skh $3
# 다음 청크의 시작 값 설정
current=$(( chunk_end + 1 ))
done
echo "모든 청크 처리 완료."