#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronous batch image download script (httpx + asyncio).

Usage: python downloader.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

# -------------------- Tunable parameters --------------------
CONCURRENCY = 20                         # max concurrent downloads
RETRY_PER_URL = 3                        # retry attempts per URL
TIMEOUT = httpx.Timeout(15.0)            # per-request timeout
LOG_LEVEL = logging.INFO                 # DEBUG / INFO / WARNING
FAILED_RECORD = "failed_downloads.json"  # file where failed downloads are recorded
# -------------------------------------------------------------

# Log to both the console and a file.
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("img_downloader")

# -------------------- Download core --------------------
async def download_one(
    client: httpx.AsyncClient,
    sem: asyncio.Semaphore,
    item: Dict[str, str],
) -> bool:
    """Download a single image; return True on success."""
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    async with sem:  # hold a concurrency slot for the whole retry loop
        for attempt in range(1, RETRY_PER_URL + 1):
            try:
                async with client.stream("GET", img_url) as resp:
                    resp.raise_for_status()
                    img_path.parent.mkdir(parents=True, exist_ok=True)
                    async with aiofiles.open(img_path, "wb") as fp:
                        async for chunk in resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)
                log.info(f"[OK] {img_path.name}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)  # exponential backoff: 1s, 2s, 4s, ...
                    log.warning(
                        f"[429] {img_url} retrying in {wait}s ({attempt}/{RETRY_PER_URL})"
                    )
                    await asyncio.sleep(wait)
                else:
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break  # other HTTP errors are not retried
            except Exception as exc:
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_URL})")
                await asyncio.sleep(1)
        # Retries exhausted (or a non-retryable HTTP error).
        return False

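# Minimal standalone usage sketch for download_one (hypothetical URL and
# path, shown for illustration only):
#
#   async def _demo():
#       async with httpx.AsyncClient(timeout=TIMEOUT) as client:
#           ok = await download_one(
#               client,
#               asyncio.Semaphore(1),
#               {"img_path": "downloads/demo/001.jpg",
#                "img_url": "https://example.com/img/001.jpg"},
#           )
#           print("success:", ok)
#
#   asyncio.run(_demo())
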
# -------------------- Scan for pending downloads --------------------
def scan_tasks(root: Path) -> List[Dict[str, str]]:
    """Scan every JSON file under downloads/ and return the list of images to fetch."""
    tasks: List[Dict[str, str]] = []
    for json_file in root.rglob("*.json"):
        folder = json_file.parent
        try:
            with json_file.open(encoding="utf-8") as f:
                url_map: Dict[str, str] = json.load(f)
        except Exception as exc:
            log.warning(f"Failed to read JSON {json_file} -> {exc}")
            continue
        for filename, url in url_map.items():
            img_path = folder / filename
            if not img_path.exists():  # skip images already on disk
                tasks.append({"img_path": str(img_path), "img_url": url})
    return tasks

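# Example return value (hypothetical entries; the failure record file uses
# the same shape):
#   [{"img_path": "downloads/album1/001.jpg",
#     "img_url": "https://example.com/img/001.jpg"}]
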
# -------------------- Failure record I/O --------------------
def load_failed() -> List[Dict[str, str]]:
    if Path(FAILED_RECORD).exists():
        try:
            with open(FAILED_RECORD, encoding="utf-8") as f:
                return json.load(f)
        except Exception as exc:
            log.warning(f"Failed to load failure record -> {exc}")
    return []


def save_failed(failed: List[Dict[str, str]]) -> None:
    with open(FAILED_RECORD, "w", encoding="utf-8") as f:
        json.dump(failed, f, ensure_ascii=False, indent=2)

# -------------------- Main flow --------------------
async def main() -> None:
    proj_root = Path(__file__).resolve().parent
    download_root = proj_root / "downloads"
    if not download_root.exists():
        log.error(f"Directory not found: {download_root}")
        return

    # Retry the previous run's failures first.
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"Retrying {len(failed_tasks)} previously failed downloads first")

    # Dedupe by path: a failed image is still missing on disk, so scan_tasks
    # would list it again and we would download it twice concurrently.
    seen = set()
    tasks: List[Dict[str, str]] = []
    for t in failed_tasks + scan_tasks(download_root):
        if t["img_path"] not in seen:
            seen.add(t["img_path"])
            tasks.append(t)

    if not tasks:
        log.info("Nothing left to download -- all done!")
        return

    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, verify=True) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[download_one(client, sem, t) for t in tasks],
            desc="Downloading",
            total=len(tasks),
        )

    # Tally the results and persist any remaining failures.
    failed_again = [t for t, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(f"{len(failed_again)} downloads still failed; recorded in {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All downloads finished!")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user; download stopped")