step1.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-crawl E-H gallery image links and save one JSON file per album.
Usage: python eh_crawler.py
"""
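# Input: data/targets.txt holds one gallery URL per line; blank lines and lines
# starting with '#' are ignored (see load_targets below). A purely illustrative
# file, with placeholder URLs, might look like:
#
#   # my reading list
#   https://e-hentai.org/g/<gallery_id>/<token>/
#   https://e-hentai.org/g/<gallery_id>/<token>/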
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional

import httpx
from aiopath import AsyncPath
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
# -------------------- Configurable constants --------------------
CONCURRENCY = 20                         # max galleries crawled concurrently
MAX_PAGE = 100                           # max pages per gallery
RETRY_PER_PAGE = 5                       # retries per page
TIMEOUT = httpx.Timeout(10.0)            # request timeout
IMG_SELECTOR = "#gdt"                    # image entry container
FAILED_RECORD = "data/failed_keys.json"
LOG_LEVEL = logging.INFO
# ----------------------------------------------------------------
if not os.path.exists("data"):
    os.mkdir("data")

logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("data/crawl.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("eh_crawler")

# Pre-compiled regex: characters that are illegal in folder names
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')


# -------------------- Helpers --------------------
def clean_folder_name(title: str) -> str:
    """Sanitize a title into a safe folder name (drops illegal chars, spaces and underscores)."""
    return ILLEGAL_CHARS.sub("_", title).replace(" ", "").replace("_", "").strip() or "gallery"
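# Quick sanity examples for the sanitizer above (worth re-checking in a REPL):
# illegal characters become '_', then spaces and all underscores are removed, so
#   clean_folder_name('Title: "A/B"?')  ->  'TitleAB'
#   clean_folder_name('???')            ->  'gallery'   (empty result falls back)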
def load_targets() -> List[str]:
    """Read gallery URLs from data/targets.txt (one per line, '#' marks a comment)."""
    tgt = Path("data/targets.txt")
    lines: List[str] = []
    for ln in tgt.read_text(encoding="utf-8").splitlines():
        url = ln.strip()
        if url and not url.startswith("#"):
            lines.append(url)
    if not lines:
        log.error("targets.txt is empty, please add gallery URLs first")
        return []
    return list(set(lines))  # de-duplicate
def load_failed() -> List[str]:
    """Load the list of gallery URLs that failed in the previous run."""
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Could not load the failure record -> {exc}")
    return []


def save_failed(keys: List[str]) -> None:
    """Persist failed gallery URLs so the next run retries them first."""
    Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")
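# The failure record is just a JSON array of gallery URLs, e.g. (placeholder URL):
#   [
#     "https://e-hentai.org/g/<gallery_id>/<token>/"
#   ]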
# -------------------- Crawler core --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Fetch a single page of HTML, retrying with exponential backoff."""
    for attempt in range(1, RETRY_PER_PAGE + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError as exc:
            log.error(f"[{attempt}/{RETRY_PER_PAGE}] request failed {url} -> {exc}")
            await asyncio.sleep(2 ** attempt)  # back off 2s, 4s, 8s, ...
    return None
async def crawl_single_gallery(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, gallery_url: str
) -> bool:
    """Crawl a single gallery; return True on success."""
    async with sem:
        base_url = gallery_url.rstrip("/")
        key = base_url.split("/")[-1]  # last path segment serves as the key
        json_name = f"{key}.json"
        folder_path: Optional[AsyncPath] = None
        json_data: Dict[str, str] = {}
        img_count = 1
        last_page = False
        for page in range(MAX_PAGE):
            if last_page:
                break
            url = f"{base_url}?p={page}"
            html = await fetch_page(client, url)
            if html is None:
                continue
            soup = BeautifulSoup(html, "lxml")
            title = soup.title.string if soup.title and soup.title.string else "gallery"
            clean_title = clean_folder_name(title)
            folder_path = AsyncPath("data/downloads") / clean_title
            await folder_path.mkdir(parents=True, exist_ok=True)
            # Skip the whole gallery if its JSON already exists
            json_path = folder_path / json_name
            if await json_path.exists():
                log.info(f"{json_name} already exists, skipping")
                return True
            log.info(f"Current page: {page + 1} {url}")
            selected = soup.select_one(IMG_SELECTOR)
            if not selected:
                log.warning(f"Selector {IMG_SELECTOR} not found")
                continue
            links = re.findall(r'<a href="(.*?)"', selected.prettify())
            if not links:
                log.info("No image entries on this page, treating it as the last page")
                last_page = True
                continue
            for img_entry in links:
                if img_entry in json_data.values():
                    # Entry already recorded: pagination has wrapped, stop here
                    last_page = True
                    break
                json_data[f"{img_count:04d}"] = img_entry
                img_count += 1
        if json_data:
            await json_path.write_text(
                json.dumps(json_data, ensure_ascii=False, indent=2), encoding="utf-8"
            )
            log.info(f"Saved -> {json_path} ({len(json_data)} images)")
            return True
        else:
            log.warning(f"{key}: no image links parsed")
            return False
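# The per-gallery JSON maps a zero-padded index to each entry link found under
# IMG_SELECTOR, roughly like this (values are placeholders):
#   {
#     "0001": "<image page URL>",
#     "0002": "<image page URL>"
#   }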
# -------------------- Main flow --------------------
async def main(proxy: str | None = None) -> None:
    targets = load_targets()
    failed = load_failed()
    if failed:
        log.info(f"Retrying previously failed galleries first: {len(failed)}")
    all_urls = list(set(targets + failed))
    log.info(f"Using proxy: {proxy}")
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(
        limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
    ) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[crawl_single_gallery(client, sem, u) for u in all_urls],
            desc="Galleries",
            total=len(all_urls),
        )
    # Persist failures
    new_failed = [u for u, ok in zip(all_urls, results) if not ok]
    if new_failed:
        save_failed(new_failed)
        log.warning(f"{len(new_failed)} galleries still failed this round; recorded in {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All galleries crawled successfully!")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user, stopping")
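# main() takes an optional proxy URL that is passed straight through to
# httpx.AsyncClient(proxies=...). To route traffic through a local proxy
# (the address below is a hypothetical example), run instead:
#
#   asyncio.run(main(proxy="http://127.0.0.1:7890"))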