#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronous batch image downloader (httpx + asyncio).

Usage:
    python downloader.py

Dependencies: httpx, aiofiles, tqdm
"""
from __future__ import annotations

import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

# -------------------- Tunable parameters --------------------
CONCURRENCY = 20                 # maximum concurrent downloads
RETRY_PER_URL = 3                # retry attempts per URL
TIMEOUT = httpx.Timeout(15.0)    # per-request timeout
LOG_LEVEL = logging.INFO         # DEBUG / INFO / WARNING
FAILED_RECORD = "failed_downloads.json"  # on-disk record of failed downloads
# -------------------------------------------------------------

# Log to both the console and a file.
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("img_downloader")


# -------------------- Download core --------------------
async def download_one(
    client: httpx.AsyncClient,
    sem: asyncio.Semaphore,
    item: Dict[str, str],
) -> bool:
    """Download a single image; return True on success."""
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    async with sem:  # bound concurrency; released automatically on exit
        for attempt in range(1, RETRY_PER_URL + 1):
            try:
                async with client.stream("GET", img_url) as resp:
                    resp.raise_for_status()
                    img_path.parent.mkdir(parents=True, exist_ok=True)
                    async with aiofiles.open(img_path, "wb") as fp:
                        async for chunk in resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)
                log.info(f"[OK] {img_path.name}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)  # exponential backoff: 1s, 2s, 4s...
                    log.warning(
                        f"[429] {img_url} waiting {wait}s before retry "
                        f"({attempt}/{RETRY_PER_URL})"
                    )
                    await asyncio.sleep(wait)
                else:
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                # Remove any partially written file so a later rescan retries it.
                img_path.unlink(missing_ok=True)
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_URL})")
                await asyncio.sleep(1)
        # All retries exhausted.
        return False


# -------------------- Scan for pending downloads --------------------
def scan_tasks(root: Path) -> List[Dict[str, str]]:
    """Scan every JSON manifest under downloads/ and return the pending list."""
    tasks: List[Dict[str, str]] = []
    for json_file in root.rglob("*.json"):
        folder = json_file.parent
        try:
            with json_file.open(encoding="utf-8") as f:
                url_map: Dict[str, str] = json.load(f)
        except Exception as exc:
            log.warning(f"Failed to read JSON {json_file} -> {exc}")
            continue
        for filename, url in url_map.items():
            img_path = folder / filename
            if not img_path.exists():
                tasks.append({"img_path": str(img_path), "img_url": url})
    return tasks


# -------------------- Failure-record persistence --------------------
def load_failed() -> List[Dict[str, str]]:
    if Path(FAILED_RECORD).exists():
        try:
            with open(FAILED_RECORD, encoding="utf-8") as f:
                return json.load(f)
        except Exception as exc:
            log.warning(f"Failed to load the failure record -> {exc}")
    return []


def save_failed(failed: List[Dict[str, str]]) -> None:
    with open(FAILED_RECORD, "w", encoding="utf-8") as f:
        json.dump(failed, f, ensure_ascii=False, indent=2)


# -------------------- Main flow --------------------
async def main() -> None:
    proj_root = Path(__file__).resolve().parent
    download_root = proj_root / "downloads"
    if not download_root.exists():
        log.error(f"Directory does not exist: {download_root}")
        return

    # 1. Retry the previously failed tasks first.
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"Retrying {len(failed_tasks)} previously failed downloads first")

    # De-duplicate by target path: a failed file is usually still missing on
    # disk, so scan_tasks() would otherwise list it a second time.
    seen: set[str] = set()
    tasks: List[Dict[str, str]] = []
    for t in failed_tasks + scan_tasks(download_root):
        if t["img_path"] not in seen:
            seen.add(t["img_path"])
            tasks.append(t)
    if not tasks:
        log.info("Nothing to download, all done!")
        return

    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, verify=True) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[download_one(client, sem, t) for t in tasks],
            desc="Downloading",
            total=len(tasks),
        )

    # Tally results & persist the new failures.
    failed_again = [t for t, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(
            f"{len(failed_again)} downloads still failed this round; "
            f"written to {FAILED_RECORD}"
        )
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)

    log.info("All downloads finished!")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user; download stopped")
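

# -------------------- Expected input layout (illustrative) --------------------
# The manifest format below is an assumption inferred from scan_tasks(): each
# *.json file under downloads/ is taken to map an output filename to its source
# URL, and images are saved into the same folder as the manifest that lists
# them. The names "album_01", "manifest.json", and the example.com URLs are
# hypothetical placeholders, not part of the original script.
#
#   downloads/
#   └── album_01/
#       └── manifest.json      # any *.json name works; rglob("*.json") finds it
#
#   manifest.json:
#   {
#       "cover.jpg": "https://example.com/img/cover.jpg",
#       "page_001.png": "https://example.com/img/page_001.png"
#   }
#
# With that layout, `python downloader.py` would download both files into
# downloads/album_01/, skipping any image that already exists on disk.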