import asyncio import argparse import signal from dotenv import load_dotenv from lib.config import BACKEND_URL from lib.utils.notify_backend import notify_backend from lib.utils.is_html import is_html_url from lib.utils.read_txt import read_lines_between from lib.utils.progress_checker import save_progress, load_progress from lib.utils.env_checker import check_env_variables from lib.get_sso_list import get_sso_list load_dotenv() check_env_variables() backend_url = BACKEND_URL # ── URL별로 Browser를 새로 띄우는 함수 ── async def scan_one_url(url: str, skip_html_check: bool = False): target_url = url if url.startswith("http") else f"https://{url}" print(f"🚀 Starting scan for: {target_url}") # 1) URL이 HTML 페이지인지 확인 if not is_html_url(target_url) and not skip_html_check: print(f"❌ {target_url} 은(는) HTML이 아닙니다. 스킵합니다.") return # Backend에 스캔 시작 알림 notify_backend(target_url) print(await get_sso_list(target_url)) current_progress = {"current_index": 0, "total": 0, "current_url": "", "start_line": 0} def signal_handler(signum, frame): """Ctrl+C 시그널 핸들러""" print("\n" + "="*60) print("🛑 스캔이 중단되었습니다!") print(f"📊 진행 상황:") print(f" - 전체: {current_progress['total']}개 URL") print(f" - 완료: {current_progress['current_index']}개 URL") print(f" - 현재 처리 중: {current_progress['current_url']}") print(f" - domains.txt의 {current_progress['current_index']}번째 줄") print(f" - 진행률: {current_progress['current_index']}/{current_progress['total']} ({current_progress['current_index']/current_progress['total']*100:.1f}%)") print("="*60) save_progress(current_progress) exit(0) signal.signal(signal.SIGINT, signal_handler) async def loop( filepath: str, start_line: int, end_line: int, skip_html_check: bool = False ): # 인자값으로 받은 파일 경로와 줄 범위를 통해 도메인 리스트 생성 target_list = read_lines_between( filepath=filepath, start_line=start_line, end_line=end_line ) # 진행 상황 초기화 current_progress["total"] = len(target_list) current_progress["start_line"] = start_line current_progress["current_index"] = 0 # 이전 진행 상황 확인 prev_progress = load_progress() if prev_progress and prev_progress.get("start_line") == start_line: print(f"📋 이전 진행 상황을 발견했습니다:") print(f" - 이전 완료: {prev_progress['current_index']}/{prev_progress['total']}") print(f" - 마지막 처리: {prev_progress.get('current_url', 'N/A')}") resume = input("이어서 진행하시겠습니까? (Y/n): ").lower().strip() if resume != 'n': current_progress["current_index"] = prev_progress["current_index"] target_list = target_list[current_progress["current_index"]:] print(f"✅ {current_progress['current_index']}번째부터 재개합니다.") # (필요하다면) 강제 설정이 필요한 경우, 아래 주석을 해제하여 target_list[0] 등을 덮어쓸 수 있습니다. # target_list[0] = "velog.io" for i, url in enumerate(target_list): actual_index = current_progress["current_index"] + i current_progress["current_url"] = url current_progress["current_index"] = actual_index print(f"\n🔄 Processing {actual_index + 1}/{current_progress['total']}: {url}") print(f"📍 domains.txt의 {start_line + actual_index}번째 줄") await scan_one_url(url, skip_html_check=skip_html_check) # 진행 상황 저장 current_progress["current_index"] = actual_index + 1 save_progress(current_progress) print("⏳ API 쿼터 보호를 위해 10초 대기 중...") await asyncio.sleep(10) print(f"\n🎉 모든 스캔이 완료되었습니다! ({current_progress['total']}개 URL)") def main(): parser = argparse.ArgumentParser( prog="domain_scanner", description="도메인 목록 파일에서 지정한 줄 범위를 읽어 SSO 스캔을 수행합니다." ) # 커맨드라인 인자로 받을 옵션들 정의 parser.add_argument( "-f", "--file", type=str, required=True, help="도메인 목록이 들어 있는 텍스트 파일 경로 (예: ./domains.txt)" ) parser.add_argument( "-s", "--start", type=int, required=True, help="읽기 시작 줄 번호 (1-based)" ) parser.add_argument( "-e", "--end", type=int, required=True, help="읽기 종료 줄 번호 (1-based)" ) parser.add_argument( "-skh", "--skip-html-check", type=bool, default=False, help="HTML 페이지 체크를 건너뛰고 모든 URL을 스캔합니다. (기본값: False)" ) args = parser.parse_args() # 인자값을 비동기 함수에 전달 asyncio.run(loop( filepath=args.file, start_line=args.start, end_line=args.end, skip_html_check=args.skip_html_check )) if __name__ == "__main__": main()