step2.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 异步批量下载 EH 画廊真实图片
  5. python download_images.py
  6. """
  7. from __future__ import annotations
  8. import asyncio
  9. import json
  10. import logging
  11. import os
  12. import re
  13. import sys
  14. from pathlib import Path
  15. from typing import Dict, List
  16. import aiofiles
  17. import httpx
  18. from pathlib import Path
  19. from tqdm.asyncio import tqdm_asyncio
  20. # -------------------- 可配置常量 --------------------
  21. from config import config
  22. CONCURRENCY = config.concurrency
  23. RETRY_PER_IMG = config.retry_per_image
  24. TIMEOUT = httpx.Timeout(config.image_timeout)
  25. FAILED_RECORD = "data/failed_downloads.json"
  26. LOG_LEVEL = getattr(logging, config.log_level.upper())
  27. # ----------------------------------------------------
  28. # 确保数据目录存在
  29. if not os.path.exists("data"):
  30. os.mkdir("data")
  31. # 使用统一的日志配置
  32. from logger import get_logger
  33. from realtime_logger import realtime_logger
  34. log = get_logger("step2", "download.log")
  35. # 预编译正则
  36. IMG_URL_RE = re.compile(r'<img id="img" src="(.*?)"', re.S)
  37. EXT_RE = re.compile(r"\.(jpg|jpeg|png|gif|webp)$", re.I)
  38. # -------------------- 工具函数 --------------------
  39. def load_failed() -> List[Dict[str, str]]:
  40. if Path(FAILED_RECORD).exists():
  41. try:
  42. return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
  43. except Exception as exc:
  44. log.warning(f"加载失败记录失败 -> {exc}")
  45. return []
  46. def save_failed(failed: List[Dict[str, str]]) -> None:
  47. Path(FAILED_RECORD).write_text(json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8")
  48. # -------------------- 下载核心 --------------------
  49. async def download_one(
  50. client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str]
  51. ) -> bool:
  52. """下载单张图,成功返回 True"""
  53. img_path, img_url = Path(item["img_path"]), item["img_url"]
  54. await sem.acquire()
  55. try:
  56. for attempt in range(1, RETRY_PER_IMG + 1):
  57. try:
  58. # 1. 获取详情页
  59. resp = await client.get(img_url)
  60. resp.raise_for_status()
  61. real_url_match = IMG_URL_RE.search(resp.text)
  62. if not real_url_match:
  63. log.warning(f"未解析到真实图片链接: {img_url}")
  64. return False # <- 这里不会触发 await
  65. real_url = real_url_match.group(1)
  66. # 2. 下载真实图片(流式)
  67. ext_match = EXT_RE.search(real_url)
  68. ext = ext_match.group(1).lower() if ext_match else "jpg"
  69. final_path = img_path.with_suffix(f".{ext}")
  70. if final_path.exists():
  71. log.info(f"已存在,跳过: {final_path.name}")
  72. return True
  73. async with client.stream("GET", real_url) as img_resp:
  74. img_resp.raise_for_status()
  75. final_path.parent.mkdir(parents=True, exist_ok=True)
  76. async with aiofiles.open(final_path, "wb") as fp:
  77. async for chunk in img_resp.aiter_bytes(chunk_size=65536):
  78. await fp.write(chunk)
  79. # log.info(f"[OK] {final_path.name}")
  80. # 发送实时日志
  81. try:
  82. realtime_logger.broadcast_log_sync(f"下载完成: {final_path.name}", "SUCCESS", "step2")
  83. except Exception as e:
  84. log.warning(f"发送实时日志失败: {e}")
  85. return True
  86. except httpx.HTTPStatusError as exc:
  87. if exc.response.status_code == 429:
  88. wait = 2 ** (attempt - 1)
  89. log.warning(f"[429] 等待 {wait}s 后重试({attempt}/{RETRY_PER_IMG})")
  90. await asyncio.sleep(wait)
  91. else:
  92. log.error(f"[HTTP {exc.response.status_code}] {img_url}")
  93. break
  94. except Exception as exc:
  95. log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})")
  96. await asyncio.sleep(1)
  97. return False
  98. finally:
  99. sem.release()
  100. # -------------------- 扫描待下载列表 --------------------
  101. async def scan_tasks() -> List[Dict[str, str]]:
  102. """扫描 downloads/ 下所有 json,返回待下载列表"""
  103. result = []
  104. root = Path("data/downloads")
  105. if not root.exists():
  106. return result
  107. for json_path in root.rglob("*.json"):
  108. folder = json_path.parent
  109. try:
  110. data: Dict[str, str] = json.loads(json_path.read_text(encoding="utf-8"))
  111. except Exception as exc:
  112. log.warning(f"读取 json 失败 {json_path} -> {exc}")
  113. continue
  114. for img_name, img_url in data.items():
  115. img_path = folder / img_name # 无后缀
  116. # 判断任意后缀是否存在
  117. exists = False
  118. for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp"):
  119. if img_path.with_suffix(ext).exists():
  120. exists = True
  121. break
  122. if not exists:
  123. result.append({"img_path": str(img_path), "img_url": img_url})
  124. return result
  125. # -------------------- 主流程 --------------------
  126. async def main(proxy: str | None = None) -> None:
  127. # 1. 优先重试上次失败
  128. failed_tasks = load_failed()
  129. if failed_tasks:
  130. log.info(f"优先重试上次失败任务: {len(failed_tasks)} 张")
  131. tasks = failed_tasks + await scan_tasks()
  132. if not tasks:
  133. log.info("没有需要下载的图片,收工!")
  134. return
  135. limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
  136. async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client:
  137. sem = asyncio.Semaphore(CONCURRENCY)
  138. results = await tqdm_asyncio.gather(
  139. *[download_one(client, sem, t) for t in tasks],
  140. desc="Downloading",
  141. total=len(tasks),
  142. )
  143. # 统计 & 持久化新失败
  144. failed_again = [t for t, ok in zip(tasks, results) if not ok]
  145. if failed_again:
  146. save_failed(failed_again)
  147. log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}")
  148. else:
  149. Path(FAILED_RECORD).unlink(missing_ok=True)
  150. log.info("全部下载完成!")
# Script entry point: run the async main loop.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop; log it and exit quietly.
        log.info("用户中断,下载结束")