2step.py 6.3 KB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-download the real images of EH galleries.
Usage: python download_images.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, List

import aiofiles
import httpx
from aiopath import AsyncPath
from tqdm.asyncio import tqdm_asyncio

# -------------------- Configurable constants --------------------
CONCURRENCY = 20                        # number of concurrent downloads
RETRY_PER_IMG = 3                       # retries per image
TIMEOUT = httpx.Timeout(15.0)           # request timeout
PROXY = "http://127.0.0.1:7890"         # proxy for restricted networks; leave empty to go direct
FAILED_RECORD = "failed_downloads.json"
LOG_LEVEL = logging.INFO
# ----------------------------------------------------
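# The two usual PROXY settings (example values, assumed local setups):
#   PROXY = "http://127.0.0.1:7890"   # a local HTTP proxy, e.g. Clash's default port
#   PROXY = ""                        # direct connection, no proxy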
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("img_downloader")

# Precompiled regexes
IMG_URL_RE = re.compile(r'<img id="img" src="(.*?)"', re.S)
EXT_RE = re.compile(r"\.(jpg|jpeg|png|gif|webp)$", re.I)
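# What the regexes are meant to capture (sample markup is an assumption based
# on the typical EH image viewer page, not taken from this script):
#   IMG_URL_RE matches  <img id="img" src="https://host/ab/cd/123-1.jpg" ...>
#   EXT_RE then pulls "jpg" from the captured URL to pick the file suffix.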
# -------------------- Helpers --------------------
def load_failed() -> List[Dict[str, str]]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Could not load the failure record -> {exc}")
    return []


def save_failed(failed: List[Dict[str, str]]) -> None:
    Path(FAILED_RECORD).write_text(
        json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8"
    )
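# Shape of failed_downloads.json as written by save_failed(): a list of the
# same task dicts that download_one() consumes, e.g.
#   [{"img_path": "downloads/<gallery>/001", "img_url": "https://..."}]
# (<gallery> is a placeholder; the URL is whatever scan_tasks() recorded.)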
# -------------------- Download core --------------------
async def download_one(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str]
) -> bool:
    """Download a single image; return True on success."""
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    async with sem:  # released automatically, even on early return
        for attempt in range(1, RETRY_PER_IMG + 1):
            try:
                # 1. Fetch the viewer page
                resp = await client.get(img_url)
                resp.raise_for_status()
                real_url_match = IMG_URL_RE.search(resp.text)
                if not real_url_match:
                    log.warning(f"Could not parse the real image URL: {img_url}")
                    return False
                real_url = real_url_match.group(1)
                # 2. Stream the real image to disk
                ext_match = EXT_RE.search(real_url)
                ext = ext_match.group(1).lower() if ext_match else "jpg"
                final_path = img_path.with_suffix(f".{ext}")
                if await AsyncPath(final_path).exists():
                    log.info(f"Already exists, skipping: {final_path.name}")
                    return True
                async with client.stream("GET", real_url) as img_resp:
                    img_resp.raise_for_status()
                    await AsyncPath(final_path).parent.mkdir(parents=True, exist_ok=True)
                    async with aiofiles.open(final_path, "wb") as fp:
                        async for chunk in img_resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)
                log.info(f"[OK] {final_path.name}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)  # exponential backoff: 1s, 2s, 4s
                    log.warning(f"[429] Waiting {wait}s before retry ({attempt}/{RETRY_PER_IMG})")
                    await asyncio.sleep(wait)
                else:
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})")
                await asyncio.sleep(1)
        return False
# -------------------- Scan for pending downloads --------------------
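# Expected on-disk layout, inferred from scan_tasks() below: each gallery
# folder under downloads/ holds a JSON file mapping image names (without
# extension) to their EH viewer-page URLs, e.g.
#   downloads/<gallery>/meta.json -> {"001": "https://.../s/<token>/<gid>-1"}
# The name "meta.json" is illustrative; every *.json found is read.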
async def scan_tasks() -> List[Dict[str, str]]:
    """Scan every JSON file under downloads/ and return the pending list."""
    result: List[Dict[str, str]] = []
    root = AsyncPath("downloads")
    if not await root.exists():
        return result
    async for json_path in root.rglob("*.json"):
        folder = json_path.parent
        try:
            data: Dict[str, str] = json.loads(await json_path.read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Failed to read JSON {json_path} -> {exc}")
            continue
        for img_name, img_url in data.items():
            img_path = folder / img_name  # no suffix yet
            # Check asynchronously whether the image exists under any known suffix
            exists = False
            for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp"):
                if await img_path.with_suffix(ext).exists():
                    exists = True
                    break
            if not exists:
                result.append({"img_path": str(img_path), "img_url": img_url})
    return result
# -------------------- Main flow --------------------
async def main() -> None:
    # 1. Retry last run's failures first
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"Retrying previously failed tasks first: {len(failed_tasks)} images")
    tasks = failed_tasks + await scan_tasks()
    # Deduplicate by target path: a failed task's file does not exist yet,
    # so scan_tasks() would queue it a second time
    tasks = list({t["img_path"]: t for t in tasks}.values())
    if not tasks:
        log.info("Nothing to download, we're done!")
        return
    proxy = PROXY if PROXY else None
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    # Note: httpx renamed `proxies=` to `proxy=` in newer releases; adjust to your version.
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[download_one(client, sem, t) for t in tasks],
            desc="Downloading",
            total=len(tasks),
        )
    # Tally results and persist any new failures
    failed_again = [t for t, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(f"{len(failed_again)} images still failed this round; recorded in {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All downloads complete!")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user, stopping")
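
# Typical run (assuming the downloads/ layout sketched above):
#   $ python 2step.py
# Progress goes to stdout and download.log; anything that still fails is
# recorded in failed_downloads.json and retried first on the next run.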