- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Asynchronously batch-crawl image entry links from E-H galleries and save one JSON file per gallery.
- Usage: python eh_crawler.py  (gallery URLs are read from targets.txt, one per line)
- """
- from __future__ import annotations
- import asyncio
- import json
- import logging
- import re
- import sys
- from pathlib import Path
- from typing import Dict, List, Optional
- import httpx
- from bs4 import BeautifulSoup
- from tqdm.asyncio import tqdm_asyncio
- from aiopath import AsyncPath
- # -------------------- Configurable constants --------------------
- CONCURRENCY = 20 # max galleries crawled concurrently (semaphore size)
- MAX_PAGE = 100 # max pages to walk per gallery
- RETRY_PER_PAGE = 5 # retries per page request
- TIMEOUT = httpx.Timeout(10.0) # request timeout
- PROXY = "http://127.0.0.1:7890" # HTTP proxy; set to an empty string to connect directly
- IMG_SELECTOR = "#gdt" # CSS selector for the image entry area
- FAILED_RECORD = "failed_keys.json"
- LOG_LEVEL = logging.INFO
- # ----------------------------------------------------
- logging.basicConfig(
- level=LOG_LEVEL,
- format="[%(asctime)s] [%(levelname)s] %(message)s",
- handlers=[
- logging.StreamHandler(sys.stdout),
- logging.FileHandler("crawl.log", encoding="utf-8"),
- ],
- )
- log = logging.getLogger("eh_crawler")
- # Pre-compiled regex matching characters that are illegal in folder names
- ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
- # -------------------- Helper functions --------------------
- def clean_folder_name(title: str) -> str:
- """清洗文件夹名"""
- return ILLEGAL_CHARS.sub("_", title).replace(" ", "").replace("_", "").strip() or "gallery"
- def load_targets() -> List[str]:
- """读取 targets.txt"""
- tgt = Path("targets.txt")
- if not tgt.exists():
- log.error("targets.txt 不存在,已自动创建,请先填写 URL")
- tgt.touch()
- sys.exit(0)
- lines = [ln.strip() for ln in tgt.read_text(encoding="utf-8").splitlines() if ln.strip()]
- if not lines:
- log.error("targets.txt 为空,请先填写 URL")
- sys.exit(0)
- return list(set(lines)) # de-duplicate
- def load_failed() -> List[str]:
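- """Load gallery URLs recorded as failed on a previous run, if a record exists."""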
- if Path(FAILED_RECORD).exists():
- try:
- return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
- except Exception as exc:
- log.warning(f"加载失败记录失败 -> {exc}")
- return []
- def save_failed(keys: List[str]) -> None:
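- """Persist the failed gallery URLs so the next run can retry them first."""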
- Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")
- # -------------------- Crawler core --------------------
- async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
- """获取单页 HTML"""
- for attempt in range(1, RETRY_PER_PAGE + 1):
- try:
- resp = await client.get(url)
- resp.raise_for_status()
- return resp.text
- except httpx.HTTPError as exc:
- log.error(f"[{attempt}/{RETRY_PER_PAGE}] 请求失败 {url} -> {exc}")
- await asyncio.sleep(2 ** attempt)
- return None
- async def crawl_single_gallery(
- client: httpx.AsyncClient, sem: asyncio.Semaphore, gallery_url: str
- ) -> bool:
- """抓取单个画廊,成功返回 True"""
- async with sem:
- base_url = gallery_url.rstrip("/")
- key = base_url.split("/")[-1] # use the last URL segment as the key
- json_name = f"{key}.json"
- folder_path: Optional[AsyncPath] = None
- json_data: Dict[str, str] = {}
- img_count = 1
- last_page = False
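- # Walk the gallery page by page; stop when a page repeats an entry we have
- # already recorded or contains no image entries at all (treated as the last page).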
- for page in range(MAX_PAGE):
- if last_page:
- break
- url = f"{base_url}?p={page}"
- html = await fetch_page(client, url)
- if html is None:
- continue
- soup = BeautifulSoup(html, "lxml")
- title = soup.title.string if soup.title and soup.title.string else "gallery"
- clean_title = clean_folder_name(title)
- folder_path = AsyncPath("downloads") / clean_title
- await folder_path.mkdir(parents=True, exist_ok=True)
- # Skip the whole gallery if its JSON file already exists
- json_path = folder_path / json_name
- if await json_path.exists():
- log.info(f"{json_name} 已存在,跳过")
- return True
- log.info(f"当前页码:{page + 1} {url}")
- selected = soup.select_one(IMG_SELECTOR)
- if not selected:
- log.warning(f"未找到选择器 {IMG_SELECTOR}")
- continue
- links = [a["href"] for a in selected.select("a[href]")]  # image entry page links
- if not links:
- log.info("本页无图片入口,视为最后一页")
- last_page = True
- continue
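- # A link that is already in json_data means the site served a page we have
- # already processed, so mark this as the last page and stop collecting.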
- for img_entry in links:
- if img_entry in json_data.values():
- last_page = True
- break
- json_data[f"{img_count:04d}"] = img_entry
- img_count += 1
- if json_data:
- await json_path.write_text(
- json.dumps(json_data, ensure_ascii=False, indent=2), encoding="utf-8"
- )
- log.info(f"保存成功 -> {json_path} ({len(json_data)} 张)")
- return True
- else:
- log.warning(f"{key} 未解析到任何图片链接")
- return False
- # -------------------- Main flow --------------------
- async def main() -> None:
- targets = load_targets()
- failed = load_failed()
- if failed:
- log.info(f"优先重试上次失败画廊: {len(failed)} 个")
- all_urls = list(set(targets + failed))
- proxy = PROXY if PROXY else None
- limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
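- # Note: httpx 0.26+ renamed the proxies= keyword to proxy=; adjust below if you run a newer httpx.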
- async with httpx.AsyncClient(
- limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
- ) as client:
- sem = asyncio.Semaphore(CONCURRENCY)
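- # Each gallery task holds the semaphore for its whole run, so at most CONCURRENCY galleries run at once.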
- results = await tqdm_asyncio.gather(
- *[crawl_single_gallery(client, sem, u) for u in all_urls],
- desc="Galleries",
- total=len(all_urls),
- )
- # Persist failures for the next run
- new_failed = [u for u, ok in zip(all_urls, results) if not ok]
- if new_failed:
- save_failed(new_failed)
- log.warning(f"本轮仍有 {len(new_failed)} 个画廊失败,已写入 {FAILED_RECORD}")
- else:
- Path(FAILED_RECORD).unlink(missing_ok=True)
- log.info("全部画廊抓取完成!")
- if __name__ == "__main__":
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- log.info("用户中断,抓取结束")