| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Async batch image downloader (httpx + asyncio).

Usage:
    python downloader.py
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List
import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio
# -------------------- Tunables --------------------
CONCURRENCY = 20                          # max concurrent downloads
RETRY_PER_URL = 3                         # retry attempts per URL
TIMEOUT = httpx.Timeout(15.0)             # per-request timeout
LOG_LEVEL = logging.INFO                  # DEBUG / INFO / WARNING
FAILED_RECORD = "failed_downloads.json"   # on-disk record of failed tasks
# --------------------------------------------------
# Log to both the console and a file.
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("img_downloader")
- # -------------------- 下载核心 --------------------
async def download_one(
    client: httpx.AsyncClient,
    sem: asyncio.Semaphore,
    item: Dict[str, str],
) -> bool:
    """Download one image; return True on success, False when retries are exhausted.

    Retry policy per URL (up to RETRY_PER_URL attempts):
      * HTTP 429  -> exponential backoff (1s, 2s, 4s, ...) then retry
      * other 4xx/5xx -> give up immediately (non-retryable)
      * any other exception (timeout, network) -> wait 1s then retry
    """
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    # `async with` guarantees release on every exit path, replacing the
    # manual acquire()/try/finally/release() dance.
    async with sem:
        for attempt in range(1, RETRY_PER_URL + 1):
            try:
                async with client.stream("GET", img_url) as resp:
                    resp.raise_for_status()
                    img_path.parent.mkdir(parents=True, exist_ok=True)
                    # Stream to disk in 64 KiB chunks to keep memory flat.
                    async with aiofiles.open(img_path, "wb") as fp:
                        async for chunk in resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)
                log.info(f"[OK] {img_path.name}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)  # exponential backoff
                    log.warning(f"[429] {img_url} 等待 {wait}s 后重试({attempt}/{RETRY_PER_URL})")
                    await asyncio.sleep(wait)
                else:
                    # Non-retryable status (404, 403, 500, ...): stop trying.
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                # Transient failure (timeout, connection reset, disk error):
                # log it instead of silently swallowing, then retry after 1s.
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_URL})")
                await asyncio.sleep(1)
        # Retries exhausted or non-retryable error.
        return False
- # -------------------- 扫描待下载列表 --------------------
def scan_tasks(root: Path) -> List[Dict[str, str]]:
    """Scan every *.json under *root* and return the not-yet-downloaded images.

    Each JSON file maps filename -> URL; images are saved next to their
    JSON file. Unreadable JSON files are logged and skipped.
    """
    tasks: List[Dict[str, str]] = []
    for json_file in root.rglob("*.json"):
        folder = json_file.parent
        try:
            with json_file.open(encoding="utf-8") as f:
                url_map: Dict[str, str] = json.load(f)
        except Exception as exc:
            log.warning(f"读取 json 失败 {json_file} -> {exc}")
            continue
        for filename, url in url_map.items():
            img_path = folder / filename
            if not img_path.exists():
                # Bug fix: this append had been commented out, so the scan
                # always returned an empty list and nothing was downloaded.
                tasks.append({"img_path": str(img_path), "img_url": url})
    return tasks
- # -------------------- 失败记录读写 --------------------
def load_failed() -> List[Dict[str, str]]:
    """Load the persisted failure record; return [] if absent or unreadable.

    A corrupt record is non-fatal: we log a warning (previously this was a
    silent `pass`) and start with an empty retry list.
    """
    if Path(FAILED_RECORD).exists():
        try:
            with open(FAILED_RECORD, encoding="utf-8") as f:
                return json.load(f)
        except Exception as exc:
            log.warning(f"加载失败记录失败 -> {exc}")
    return []
def save_failed(failed: List[Dict[str, str]]) -> None:
    """Persist the still-failing download tasks to FAILED_RECORD as JSON."""
    payload = json.dumps(failed, ensure_ascii=False, indent=2)
    Path(FAILED_RECORD).write_text(payload, encoding="utf-8")
- # -------------------- 主流程 --------------------
async def main() -> None:
    """Entry point: retry last run's failures first, then scan downloads/
    for new work, download everything concurrently, and persist whatever
    still failed so the next run can retry it."""
    proj_root = Path(__file__).resolve().parent
    download_root = proj_root / "downloads"
    if not download_root.exists():
        log.error(f"目录不存在: {download_root}")
        return

    # Previously failed tasks go to the front of the queue.
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"优先重试上次失败任务: {len(failed_tasks)} 张")
    tasks = failed_tasks + scan_tasks(download_root)
    if not tasks:
        log.info("没有需要下载的图片,收工!")
        return

    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, verify=True) as client:
        coros = [download_one(client, sem, task) for task in tasks]
        results = await tqdm_asyncio.gather(*coros, desc="Downloading", total=len(tasks))

    # Tally results and persist any remaining failures.
    failed_again = [task for task, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("全部下载完成!")
# Script entry point: run the async pipeline; Ctrl-C exits with a clean log line.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("用户中断,下载已结束")
|