#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-download the real images of EH galleries.
Usage: python download_images.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

# -------------------- Configurable constants --------------------
from config import config

CONCURRENCY = config.concurrency
RETRY_PER_IMG = config.retry_per_image
TIMEOUT = httpx.Timeout(config.image_timeout)
FAILED_RECORD = "data/failed_downloads.json"
LOG_LEVEL = getattr(logging, config.log_level.upper())
# ----------------------------------------------------

# Ensure the data directory exists before any record/log file is written.
# exist_ok avoids the check-then-create race of the os.path.exists variant.
Path("data").mkdir(exist_ok=True)

# Unified logging configuration.
from logger import get_logger
from realtime_logger import realtime_logger

log = get_logger("step2", "download.log")

# Pre-compiled regexes: the real image URL inside a gallery detail page,
# and the file extension at the end of that URL.
IMG_URL_RE = re.compile(r'<img id="img" src="(.*?)"', re.S)
EXT_RE = re.compile(r"\.(jpg|jpeg|png|gif|webp)$", re.I)
- # -------------------- 工具函数 --------------------
def load_failed() -> List[Dict[str, str]]:
    """Return the persisted failed-download list, or [] if absent or unreadable."""
    record = Path(FAILED_RECORD)
    if record.exists():
        try:
            return json.loads(record.read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"加载失败记录失败 -> {exc}")
    return []
def save_failed(failed: List[Dict[str, str]]) -> None:
    """Persist the failed-download list as pretty-printed UTF-8 JSON."""
    payload = json.dumps(failed, ensure_ascii=False, indent=2)
    Path(FAILED_RECORD).write_text(payload, encoding="utf-8")
- # -------------------- 下载核心 --------------------
async def download_one(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str]
) -> bool:
    """Download one image; return True on success.

    Two-step fetch: GET the gallery detail page, scrape the real image URL
    from it, then stream the image to disk. Retries up to RETRY_PER_IMG
    times, with exponential backoff on HTTP 429.

    Args:
        client: shared async HTTP client.
        sem: concurrency limiter shared across all downloads.
        item: dict with "img_path" (target path, no extension) and "img_url"
              (detail-page URL).
    """
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    # `async with sem` replaces the manual acquire()/finally-release() pair;
    # the semaphore is released on every exit path exactly as before.
    async with sem:
        for attempt in range(1, RETRY_PER_IMG + 1):
            try:
                # 1. Fetch the detail page and scrape the real image URL.
                resp = await client.get(img_url)
                resp.raise_for_status()
                real_url_match = IMG_URL_RE.search(resp.text)
                if not real_url_match:
                    # Unparseable page: retrying the same HTML won't help.
                    log.warning(f"未解析到真实图片链接: {img_url}")
                    return False
                real_url = real_url_match.group(1)
                # 2. Stream the real image to disk, suffixed by its extension
                #    (default jpg when the URL carries no recognizable one).
                ext_match = EXT_RE.search(real_url)
                ext = ext_match.group(1).lower() if ext_match else "jpg"
                final_path = img_path.with_suffix(f".{ext}")
                if final_path.exists():
                    log.info(f"已存在,跳过: {final_path.name}")
                    return True
                async with client.stream("GET", real_url) as img_resp:
                    img_resp.raise_for_status()
                    final_path.parent.mkdir(parents=True, exist_ok=True)
                    async with aiofiles.open(final_path, "wb") as fp:
                        async for chunk in img_resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)
                # Best-effort realtime notification; never fail the download over it.
                try:
                    realtime_logger.broadcast_log_sync(f"下载完成: {final_path.name}", "SUCCESS", "step2")
                except Exception as e:
                    log.warning(f"发送实时日志失败: {e}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)  # exponential backoff on rate limit
                    log.warning(f"[429] 等待 {wait}s 后重试({attempt}/{RETRY_PER_IMG})")
                    await asyncio.sleep(wait)
                else:
                    # Other HTTP errors are treated as non-retryable.
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})")
                await asyncio.sleep(1)
        return False
- # -------------------- 扫描待下载列表 --------------------
async def scan_tasks() -> List[Dict[str, str]]:
    """Walk data/downloads/ for gallery json files and collect images not yet on disk.

    Each json maps image names (stored without extension) to detail-page URLs;
    an image counts as present if any known extension variant exists.
    """
    root = Path("data/downloads")
    pending: List[Dict[str, str]] = []
    if not root.exists():
        return pending
    known_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp")
    for json_path in root.rglob("*.json"):
        folder = json_path.parent
        try:
            data: Dict[str, str] = json.loads(json_path.read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"读取 json 失败 {json_path} -> {exc}")
            continue
        for img_name, img_url in data.items():
            img_path = folder / img_name  # no suffix yet
            if not any(img_path.with_suffix(ext).exists() for ext in known_exts):
                pending.append({"img_path": str(img_path), "img_url": img_url})
    return pending
- # -------------------- 主流程 --------------------
async def main(proxy: str | None = None) -> None:
    """Retry previously failed downloads first, then everything newly scanned.

    Args:
        proxy: optional proxy URL passed straight to the HTTP client.
    """
    # 1. Previously failed items get another chance before fresh work.
    previous_failures = load_failed()
    if previous_failures:
        log.info(f"优先重试上次失败任务: {len(previous_failures)} 张")
    tasks = previous_failures + await scan_tasks()
    if not tasks:
        log.info("没有需要下载的图片,收工!")
        return
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        outcomes = await tqdm_asyncio.gather(
            *(download_one(client, sem, task) for task in tasks),
            desc="Downloading",
            total=len(tasks),
        )
        # Persist whatever still failed so the next run retries only those.
        still_failed = [task for task, ok in zip(tasks, outcomes) if not ok]
        if still_failed:
            save_failed(still_failed)
            log.warning(f"本轮仍有 {len(still_failed)} 张下载失败,已写入 {FAILED_RECORD}")
        else:
            Path(FAILED_RECORD).unlink(missing_ok=True)
            log.info("全部下载完成!")
# Script entry point: run the downloader, exiting cleanly on Ctrl-C
# instead of dumping a KeyboardInterrupt traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("用户中断,下载结束")