1step.py 6.1 KB

import asyncio
import re
import json
import os
import httpx


def check_urls_json_exists(key):
    # Return True if a <key>.json file already exists anywhere under downloads/
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        if f"{key}.json" in files:
            json_path = os.path.basename(root)
            print(f"JSON file already exists in {json_path}")
            return True
    return False


def check_and_load_keys():
    # Read keys from keys.txt
    keys = []
    keys_file = os.path.join(os.getcwd(), "keys.txt")
    if not os.path.exists(keys_file):
        print("keys.txt does not exist\nCreating an empty keys.txt file.")
        with open(keys_file, "w", encoding="utf-8") as f:
            f.write("")
        exit(0)
    with open(keys_file, "r", encoding="utf-8") as f:
        # Ignore blank lines so they do not turn into empty keys
        keys = [line.strip() for line in f if line.strip()]
    if keys:
        return list(set(keys))
    else:
        print("keys.txt is empty\nPlease fill in the keys.")
        exit(0)


async def fetch_page(client, url):
    try:
        response = await client.get(url)
        response.raise_for_status()  # Raise on non-2xx responses
        return response.text
    except httpx.HTTPError as e:
        print(f"Request failed: {e}")
        return None


def extract_image_links(content):
    # Extract image links with a regular expression
    pattern = r'<meta itemprop="image" content="(.*?)">'
    image_links = re.findall(pattern, content)
    return image_links


def clean_folder_name(title):
    # Clean the title so it only contains characters legal in a Windows folder name
    invalid_chars = r'[<>:"/\\|?*\x00-\x1F]'
    title = re.sub(invalid_chars, '_', title)  # Replace illegal characters with underscores
    title = title.replace(" ", "")  # Remove spaces
    title = title.replace("_", "")  # Remove underscores
    return title.strip()
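
# Example with a hypothetical title, to show the combined effect of the rules above:
#     clean_folder_name('Some Title: <Gallery> 01')
# first becomes 'Some Title_ _Gallery_ 01' (illegal characters replaced), then
# spaces and underscores are stripped, giving 'SomeTitleGallery01'.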


async def get_urls(key):
    # Skip keys that have already been crawled
    is_exists = check_urls_json_exists(key)
    if is_exists:
        print(f"{key}.json already exists, skipping crawl.")
        return
    base_url = f"https://www.kaizty.com/photos/{key}.html?page="
    data = {}
    folder_name = "default_folder"  # Default folder name
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        n = 1
        retry_count = 5  # Tolerate up to 5 failed pages before giving up on this key
        for page in range(1, 30):
            url = base_url + str(page)
            print(f"Fetching page: {url}")
            content = await fetch_page(client, url)
            if content is None:
                print(f"Could not fetch page content: {url}")
                if retry_count > 0:
                    retry_count -= 1
                    continue
                else:
                    print(f"Crawling {key} failed, skipping")
                    break
            # Check whether the page reports no content
            if "EMPTY" in content:
                print("Page content is empty, stopping crawl.")
                break
            # Grab the title (first page only)
            if page == 1:
                title_pattern = r'<title>(.*?)</title>'
                title_match = re.search(title_pattern, content)
                if title_match:
                    title = title_match.group(1)
                    folder_name = clean_folder_name(title)
                    print(f"Page title: {title}")
                    print(f"Cleaned folder name: {folder_name}")
                else:
                    print("Could not get the page title, using the default folder name.")
            # Extract image links
            image_links = extract_image_links(content)
            if image_links:
                print(f"Image links found on page {url}:")
                for link in image_links:
                    print(link)
                    prefix = str(n).zfill(3)
                    suffix = link.split('.')[-1]
                    img_name = f'{prefix}.{suffix}'
                    data[img_name] = link
                    n += 1
            else:
                print(f"No image links found on page {url}.")
    # If no data was collected, skip saving
    if not data:
        return {}
    # Create the folder and save the data
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    folder_path = os.path.join(downloads_path, folder_name)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created folder: {folder_path}")
    data_file_path = os.path.join(folder_path, f"{key}.json")
    with open(data_file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {data_file_path}")
    return [folder_name, data_file_path]
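
# The JSON written above maps sequential file names to image URLs, for example
# (illustrative values, not real output):
# {
#     "001.jpg": "https://example.com/first-image.jpg",
#     "002.jpg": "https://example.com/second-image.jpg"
# }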


def load_imgs_url_and_patn():
    result = []
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        for file in files:
            if file.endswith(".json"):
                json_path = os.path.join(root, file)
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                for img_name, img_url in data.items():
                    img_path = os.path.join(root, img_name)
                    if not os.path.exists(img_path):
                        result.append([img_path, img_url])
    return result
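
# load_imgs_url_and_patn() walks downloads/, reads every <key>.json and returns
# [local_path, url] pairs for images that do not exist on disk yet, e.g.
# (illustrative): [["downloads/SomeTitle/001.jpg", "https://example.com/first-image.jpg"], ...]
# It is not called in this file; presumably a later step uses it to download the images.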


def start_get_urls():
    keys = check_and_load_keys()
    # Get the current path and create the downloads folder if it does not exist
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    for key in keys:
        # Run the async crawler for this key
        result = asyncio.run(get_urls(key))
        if result:
            folder_name = result[0]
            data_file_path = result[1]
            print(f"Finished, folder name: {folder_name}, data saved to: {data_file_path}")
        else:
            print("No data retrieved, skipping")
    print("Finished fetching URL data for all keys")


if __name__ == "__main__":
    start_get_urls()
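
# Usage sketch (assumptions: a local proxy listening on 127.0.0.1:7890, and a
# keys.txt in the working directory with one gallery key per line; "12345" below
# is a hypothetical key, not a real one):
#     keys.txt:
#         12345
#     python 1step.py
# Each key then produces downloads/<cleaned title>/<key>.json with its image URLs.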