#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-crawl kaizty image URLs and save them as JSON,
one folder per album.

Usage: python crawl_urls.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

# -------------------- Configurable constants --------------------
CONCURRENCY = 20                    # number of concurrent crawl tasks
MAX_PAGE = 30                       # maximum pages to walk per album
RETRY_PER_PAGE = 5                  # retries per page
TIMEOUT = httpx.Timeout(10.0)       # per-request timeout
PROXY = "http://127.0.0.1:7890"     # HTTP proxy; leave empty to disable
FAILED_RECORD = "failed_keys.json"  # file where failed keys are persisted
LOG_LEVEL = logging.INFO            # DEBUG / INFO / WARNING
# ----------------------------------------------------------------
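
# keys.txt (read by load_keys below) is expected to hold one album key per
# line; blank lines are ignored and duplicates are dropped, e.g.
# (hypothetical keys):
#   abc123
#   def456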

# Log to both the console and a file
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("crawl.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("crawler")

# Pre-compiled regexes for speed
TITLE_RE = re.compile(r"<title>(.*?)</title>", re.S)
IMG_RE = re.compile(r'<meta itemprop="image" content="(.*?)">', re.S)
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')

# -------------------- Helper functions --------------------
def clean_folder_name(title: str) -> str:
    """Sanitize a title into a valid Windows folder name."""
    title = ILLEGAL_CHARS.sub("_", title)
    return title.replace(" ", "").replace("_", "").strip() or "default"
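
# Quick illustration of clean_folder_name (hypothetical inputs): illegal
# characters are first replaced with "_", then spaces and underscores are
# stripped entirely.
#   clean_folder_name('My "Album": 2024/01')  ->  'MyAlbum202401'
#   clean_folder_name('???')                  ->  'default'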

def load_keys() -> List[str]:
    keys_file = Path("keys.txt")
    if not keys_file.exists():
        log.warning("keys.txt does not exist; an empty one was created, please fill in keys first")
        keys_file.touch()
        sys.exit(0)
    lines = [ln.strip() for ln in keys_file.read_text(encoding="utf-8").splitlines() if ln.strip()]
    if not lines:
        log.warning("keys.txt is empty, please fill in keys first")
        sys.exit(0)
    return list(set(lines))  # de-duplicate

def load_failed_keys() -> List[str]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Could not load the failure record -> {exc}")
    return []

def save_failed_keys(keys: List[str]) -> None:
    Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")
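
# failed_keys.json is simply a JSON array of the keys that still failed after
# a run, e.g. (hypothetical keys):
#   ["abc123", "def456"]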

# -------------------- Crawler core --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
    for attempt in range(1, RETRY_PER_PAGE + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError as exc:
            log.error(f"[{attempt}/{RETRY_PER_PAGE}] request failed {url} -> {exc}")
            await asyncio.sleep(2 ** attempt)
    return None
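
# The retry loop above backs off exponentially: 2 s after the first failure,
# then 4, 8, 16 and 32 s with RETRY_PER_PAGE = 5, before giving up on the
# page and returning None.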

async def crawl_one_key(key: str, client: httpx.AsyncClient, sem: asyncio.Semaphore) -> bool:
    """Crawl a single key; return True on success."""
    async with sem:
        base_url = f"https://www.kaizty.com/photos/{key}.html?page="
        data: Dict[str, str] = {}
        folder_name = "default"
        n = 1
        for page in range(1, MAX_PAGE + 1):
            url = base_url + str(page)
            html = await fetch_page(client, url)
            if html is None:
                continue
            if "EMPTY" in html:
                log.info(f"{key} page {page} is empty, stopping pagination")
                break
            # Parse the album title from the first page
            if page == 1:
                title_match = TITLE_RE.search(html)
                if title_match:
                    folder_name = clean_folder_name(title_match.group(1))
                    log.info(f"{key} album name -> {folder_name}")
            links = IMG_RE.findall(html)
            if links:
                for link in links:
                    suffix = link.split(".")[-1]
                    img_name = f"{n:03d}.{suffix}"
                    data[img_name] = link
                    n += 1
        if not data:
            log.warning(f"{key} no image links could be parsed")
            return False
        # Write the JSON file
        base = Path("downloads") / folder_name
        base.mkdir(parents=True, exist_ok=True)
        json_path = base / f"{key}.json"
        async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        log.info(f"{key} saved -> {json_path} ({len(data)} images)")
        return True
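
# Each saved downloads/<album>/<key>.json maps a zero-padded local file name
# to its source URL, e.g. (illustrative values, not real data):
#   {
#     "001.jpg": "https://example.com/path/a.jpg",
#     "002.jpg": "https://example.com/path/b.jpg"
#   }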

# -------------------- Main flow --------------------
async def main() -> None:
    keys = load_keys()
    failed_keys = load_failed_keys()
    if failed_keys:
        log.info(f"Retrying {len(failed_keys)} keys that failed last run")
    all_keys = list(set(keys + failed_keys))
    proxy = PROXY if PROXY else None
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    # Note: httpx >= 0.26 deprecates proxies= in favour of proxy=
    async with httpx.AsyncClient(
        limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
    ) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        tasks = [crawl_one_key(k, client, sem) for k in all_keys]
        # tqdm_asyncio.gather keeps results in input order, so they zip with all_keys
        results = await tqdm_asyncio.gather(*tasks, desc="Crawling")
    # Record failures
    new_failed = [k for k, ok in zip(all_keys, results) if not ok]
    if new_failed:
        save_failed_keys(new_failed)
        log.warning(f"{len(new_failed)} keys still failed this run, written to {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All keys crawled successfully!")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user, stopping crawl")