1step.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-crawl kaizty image URLs and save them as JSON, one folder per album.

Usage: python crawl_urls.py
"""
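# Input sketch (illustrative, not taken from the original source): load_keys() below expects a
# plain-text keys.txt next to this script, one album key per line; each key is the slug that is
# substituted into https://www.kaizty.com/photos/{key}.html. The values shown are hypothetical.
#
#   # keys.txt
#   some-album-slug-1
#   some-album-slug-2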
from __future__ import annotations

import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio
# -------------------- Configurable constants --------------------
CONCURRENCY = 20                    # number of keys crawled concurrently
MAX_PAGE = 30                       # max pages per album
RETRY_PER_PAGE = 5                  # retries per page
TIMEOUT = httpx.Timeout(10.0)       # per-request timeout
PROXY = "http://127.0.0.1:7890"     # outbound proxy; leave empty if you don't need one
FAILED_RECORD = "failed_keys.json"  # file for persisting failed keys
LOG_LEVEL = logging.INFO            # DEBUG / INFO / WARNING
# ----------------------------------------------------------------
# Log to both console and file
logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("crawl.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("crawler")

# Pre-compiled regexes for speed
TITLE_RE = re.compile(r"<title>(.*?)</title>", re.S)
IMG_RE = re.compile(r'<meta itemprop="image" content="(.*?)">', re.S)
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
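# Illustrative match example (hypothetical HTML, not taken from the real site):
#   html = '<title>Some Album</title> ... <meta itemprop="image" content="https://example.com/a/001.jpg">'
#   TITLE_RE.search(html).group(1)  -> 'Some Album'
#   IMG_RE.findall(html)            -> ['https://example.com/a/001.jpg']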
# -------------------- Utility functions --------------------
def clean_folder_name(title: str) -> str:
    """Sanitize an album title into a Windows-legal folder name."""
    title = ILLEGAL_CHARS.sub("_", title)
    return title.replace(" ", "").replace("_", "").strip() or "default"
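# Worked example (hypothetical title):
#   clean_folder_name('Album: Spring / 2024')
#   -> illegal chars ':' and '/' become '_', then spaces and underscores are stripped
#   -> 'AlbumSpring2024'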
def load_keys() -> List[str]:
    keys_file = Path("keys.txt")
    if not keys_file.exists():
        log.warning("keys.txt not found; an empty one was created, please fill in keys first")
        keys_file.touch()
        sys.exit(0)
    lines = [ln.strip() for ln in keys_file.read_text(encoding="utf-8").splitlines() if ln.strip()]
    if not lines:
        log.warning("keys.txt is empty, please fill in keys first")
        sys.exit(0)
    return list(set(lines))  # deduplicate


def load_failed_keys() -> List[str]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"failed to load the failure record -> {exc}")
    return []


def save_failed_keys(keys: List[str]) -> None:
    Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")


# -------------------- Crawler core --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Fetch one page with retries and exponential backoff; return None if all attempts fail."""
    for attempt in range(1, RETRY_PER_PAGE + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError as exc:
            log.error(f"[{attempt}/{RETRY_PER_PAGE}] request failed {url} -> {exc}")
            await asyncio.sleep(2 ** attempt)
    return None
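# Backoff sketch: with RETRY_PER_PAGE = 5, a page that keeps failing waits
# 2 + 4 + 8 + 16 + 32 = 62 seconds in total before fetch_page gives up and returns None.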
async def crawl_one_key(key: str, client: httpx.AsyncClient, sem: asyncio.Semaphore) -> bool:
    """Crawl a single key; return True on success."""
    async with sem:
        base_url = f"https://www.kaizty.com/photos/{key}.html?page="
        data: Dict[str, str] = {}
        folder_name = "default"
        n = 1
        for page in range(1, MAX_PAGE + 1):
            url = base_url + str(page)
            html = await fetch_page(client, url)
            if html is None:
                continue
            if "EMPTY" in html:
                log.info(f"{key} page {page} is empty, stopping pagination")
                break
            # parse the album title from the first page
            if page == 1:
                title_match = TITLE_RE.search(html)
                if title_match:
                    folder_name = clean_folder_name(title_match.group(1))
                    log.info(f"{key} album title -> {folder_name}")
            links = IMG_RE.findall(html)
            if links:
                for link in links:
                    suffix = link.split(".")[-1]
                    img_name = f"{n:03d}.{suffix}"
                    data[img_name] = link
                    n += 1
        if not data:
            log.warning(f"{key} no image links parsed")
            return False
        # write the JSON file
        base = Path("downloads") / folder_name
        base.mkdir(parents=True, exist_ok=True)
        json_path = base / f"{key}.json"
        async with aiofiles.open(json_path, "w", encoding="utf-8") as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        log.info(f"{key} saved -> {json_path} ({len(data)} images)")
        return True
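# Output sketch (hypothetical values): each album ends up with one
# downloads/<album>/<key>.json mapping sequential file names to image URLs, e.g.
#   {
#     "001.jpg": "https://example.com/album/001.jpg",
#     "002.jpg": "https://example.com/album/002.jpg"
#   }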
# -------------------- Main flow --------------------
async def main() -> None:
    keys = load_keys()
    failed_keys = load_failed_keys()
    if failed_keys:
        log.info(f"retrying keys that failed last run first: {len(failed_keys)}")
    all_keys = list(set(keys + failed_keys))
    proxy = PROXY if PROXY else None
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(
        # note: newer httpx releases deprecate `proxies=` in favour of `proxy=`; adjust to your version
        limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
    ) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        tasks = [crawl_one_key(k, client, sem) for k in all_keys]
        results = await tqdm_asyncio.gather(*tasks, desc="Crawling")
    # collect failures
    new_failed = [k for k, ok in zip(all_keys, results) if not ok]
    if new_failed:
        save_failed_keys(new_failed)
        log.warning(f"{len(new_failed)} keys still failed this round; written to {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All keys crawled successfully!")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user, crawl stopped")