  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
"""
Async batch image downloader (httpx + asyncio).

Usage: python downloader.py
"""
  7. from __future__ import annotations
  8. import asyncio
  9. import json
  10. import logging
  11. import sys
  12. from pathlib import Path
  13. from typing import Any, Dict, List
  14. import aiofiles
  15. import httpx
  16. from tqdm.asyncio import tqdm_asyncio
# -------------------- Tunable parameters --------------------
CONCURRENCY = 20                         # max concurrent downloads
RETRY_PER_URL = 3                        # retry attempts per URL
TIMEOUT = httpx.Timeout(15.0)            # per-request timeout (seconds)
LOG_LEVEL = logging.INFO                 # DEBUG / INFO / WARNING
FAILED_RECORD = "failed_downloads.json"  # where failed tasks are persisted
# ------------------------------------------------------------
# Log to both the console and a file.
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("img_downloader")
  34. # -------------------- 下载核心 --------------------
  35. async def download_one(
  36. client: httpx.AsyncClient,
  37. sem: asyncio.Semaphore,
  38. item: Dict[str, str],
  39. ) -> bool:
  40. """下载单张图,返回 True 表示成功"""
  41. img_path, img_url = Path(item["img_path"]), item["img_url"]
  42. await sem.acquire()
  43. try:
  44. for attempt in range(1, RETRY_PER_URL + 1):
  45. try:
  46. async with client.stream("GET", img_url) as resp:
  47. resp.raise_for_status()
  48. img_path.parent.mkdir(parents=True, exist_ok=True)
  49. async with aiofiles.open(img_path, "wb") as fp:
  50. async for chunk in resp.aiter_bytes(chunk_size=65536):
  51. await fp.write(chunk)
  52. pass
  53. # log.info(f"[OK] {img_path.name}")
  54. return True
  55. except httpx.HTTPStatusError as exc:
  56. if exc.response.status_code == 429:
  57. wait = 2 ** (attempt - 1)
  58. pass
  59. # log.warning(f"[429] {img_url} 等待 {wait}s 后重试({attempt}/{RETRY_PER_URL})")
  60. await asyncio.sleep(wait)
  61. else:
  62. log.error(f"[HTTP {exc.response.status_code}] {img_url}")
  63. break
  64. except Exception as exc:
  65. pass
  66. # log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_URL})")
  67. await asyncio.sleep(1)
  68. # 重试耗尽
  69. return False
  70. finally:
  71. sem.release()
  72. # -------------------- 扫描待下载列表 --------------------
  73. def scan_tasks(root: Path) -> List[Dict[str, str]]:
  74. """扫描 downloads/ 下所有 json,返回待下载列表"""
  75. tasks = []
  76. for json_file in root.rglob("*.json"):
  77. folder = json_file.parent
  78. try:
  79. with json_file.open(encoding="utf-8") as f:
  80. url_map: Dict[str, str] = json.load(f)
  81. except Exception as exc:
  82. log.warning(f"读取 json 失败 {json_file} -> {exc}")
  83. continue
  84. for filename, url in url_map.items():
  85. img_path = folder / filename
  86. if not img_path.exists():
  87. pass
  88. # tasks.append({"img_path": str(img_path), "img_url": url})
  89. return tasks
  90. # -------------------- 失败记录读写 --------------------
  91. def load_failed() -> List[Dict[str, str]]:
  92. if Path(FAILED_RECORD).exists():
  93. try:
  94. with open(FAILED_RECORD, encoding="utf-8") as f:
  95. return json.load(f)
  96. except Exception as exc:
  97. pass
  98. # log.warning(f"加载失败记录失败 -> {exc}")
  99. return []
  100. def save_failed(failed: List[Dict[str, str]]) -> None:
  101. with open(FAILED_RECORD, "w", encoding="utf-8") as f:
  102. json.dump(failed, f, ensure_ascii=False, indent=2)
  103. # -------------------- 主流程 --------------------
  104. async def main() -> None:
  105. proj_root = Path(__file__).resolve().parent
  106. download_root = proj_root / "downloads"
  107. if not download_root.exists():
  108. log.error(f"目录不存在: {download_root}")
  109. return
  110. # 1. 先重试上次失败列表
  111. failed_tasks = load_failed()
  112. if failed_tasks:
  113. log.info(f"优先重试上次失败任务: {len(failed_tasks)} 张")
  114. tasks = failed_tasks + scan_tasks(download_root)
  115. if not tasks:
  116. log.info("没有需要下载的图片,收工!")
  117. return
  118. limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
  119. async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, verify=True) as client:
  120. sem = asyncio.Semaphore(CONCURRENCY)
  121. results = await tqdm_asyncio.gather(
  122. *[download_one(client, sem, t) for t in tasks],
  123. desc="Downloading",
  124. total=len(tasks),
  125. )
  126. # 统计 & 持久化新失败
  127. failed_again = [t for t, ok in zip(tasks, results) if not ok]
  128. if failed_again:
  129. save_failed(failed_again)
  130. log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}")
  131. else:
  132. Path(FAILED_RECORD).unlink(missing_ok=True)
  133. log.info("全部下载完成!")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit quietly instead of dumping a traceback.
        log.info("用户中断,下载已结束")