import asyncio
import re
import json
import os

import httpx


def check_urls_json_exists(key):
    # Check whether this key was already crawled, i.e. a {key}.json exists somewhere under downloads/.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        if f"{key}.json" in files:
            folder_name = os.path.basename(root)
            print(f"{key}.json already exists in {folder_name}")
            return True
    return False


def check_and_load_keys():
    # Read the keys from keys.txt; create the file and exit if it is missing or empty.
    keys_file = os.path.join(os.getcwd(), "keys.txt")
    if not os.path.exists(keys_file):
        print("keys.txt does not exist.\nCreating an empty keys.txt file.")
        with open(keys_file, "w", encoding="utf-8") as f:
            f.write("")
        exit(0)
    with open(keys_file, "r", encoding="utf-8") as f:
        keys = [line.strip() for line in f if line.strip()]
    if keys:
        return list(set(keys))
    else:
        print("keys.txt is empty.\nPlease add keys to it.")
        exit(0)


async def fetch_page(client, url):
    try:
        response = await client.get(url)
        response.raise_for_status()  # Raise if the request was not successful
        return response.text
    except httpx.HTTPError as e:
        print(f"Request failed: {e}")
        return None


def extract_image_links(content):
    # Extract image links with a regular expression.
    # NOTE: the pattern below is an assumed reconstruction; adjust it to the site's actual <img> markup.
    pattern = r'<img[^>]+src="([^"]+)"'
    image_links = re.findall(pattern, content)
    return image_links


def clean_folder_name(title):
    # Sanitize the title so it is a valid Windows folder name.
    invalid_chars = r'[<>:"/\\|?*\x00-\x1F]'
    title = re.sub(invalid_chars, '_', title)  # Replace invalid characters with underscores
    title = title.replace(" ", "")  # Remove spaces
    title = title.replace("_", "")  # Remove underscores
    return title.strip()


async def get_urls(key):
    # Skip keys that have already been crawled.
    if check_urls_json_exists(key):
        print(f"{key}.json already exists, skipping this key.")
        return None

    base_url = f"https://www.kaizty.com/photos/{key}.html?page="
    data = {}
    folder_name = "default_folder"  # Default folder name

    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        n = 1
        retry_count = 5
        for page in range(1, 30):
            url = base_url + str(page)
            print(f"Crawling page: {url}")
            content = await fetch_page(client, url)
            if content is None:
                print(f"Could not fetch page content: {url}")
                if retry_count > 0:
                    # Spend one retry credit and move on to the next page.
                    retry_count -= 1
                    continue
                else:
                    print(f"{key} crawl failed, skipping")
                    break

            # Stop when the page reports no content.
            if "EMPTY" in content:
                print("Page content is empty, stopping.")
                break

            # Grab the title (first page only).
            if page == 1:
                # NOTE: assumed pattern; adjust it to the site's actual title markup.
                title_pattern = r'<title>(.*?)</title>'
                title_match = re.search(title_pattern, content)
                if title_match:
                    title = title_match.group(1)
                    folder_name = clean_folder_name(title)
                    print(f"Page title: {title}")
                    print(f"Sanitized folder name: {folder_name}")
                else:
                    print("Could not find the page title, using the default folder name.")

            # Extract the image links.
            image_links = extract_image_links(content)
            if image_links:
                print(f"Image links found on page {url}:")
                for link in image_links:
                    print(link)
                    prefix = str(n).zfill(3)
                    suffix = link.split('.')[-1]
                    img_name = f'{prefix}.{suffix}'
                    data[img_name] = link
                    n += 1
            else:
                print(f"No image links found on page {url}.")

    # Only save if any data was collected, otherwise skip this key.
    if not data:
        return None

    # Create the folders and save the data.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    folder_path = os.path.join(downloads_path, folder_name)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created folder: {folder_path}")

    data_file_path = os.path.join(folder_path, f"{key}.json")
    with open(data_file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {data_file_path}")
    return [folder_name, data_file_path]


def load_imgs_url_and_patn():
    # Collect [img_path, img_url] pairs for every image that has not been downloaded yet.
    result = []
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        for file in files:
            if file.endswith(".json"):
                json_path = os.path.join(root, file)
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                for img_name, img_url in data.items():
                    img_path = os.path.join(root, img_name)
                    if not os.path.exists(img_path):
                        result.append([img_path, img_url])
    return result


def start_get_urls():
    keys = check_and_load_keys()
    # Make sure the downloads folder exists before crawling.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    for key in keys:
        # Run the async crawler for this key.
        result = asyncio.run(get_urls(key))
        if result:
            folder_name = result[0]
            data_file_path = result[1]
            print(f"Done. Folder: {folder_name}, data saved to: {data_file_path}")
        else:
            print("No data collected, skipping.")
    print("Finished collecting URL data for all keys.")


if __name__ == "__main__":
    start_get_urls()
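

# A minimal downloader sketch, assuming the [img_path, img_url] pairs returned by
# load_imgs_url_and_patn() are meant to be fetched in a follow-up step. The function
# name download_pending_images and the reuse of the same proxy address as get_urls()
# are assumptions; it could be invoked with asyncio.run(download_pending_images())
# after start_get_urls() has finished.
async def download_pending_images():
    pairs = load_imgs_url_and_patn()
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        for img_path, img_url in pairs:
            try:
                response = await client.get(img_url)
                response.raise_for_status()
            except httpx.HTTPError as e:
                print(f"Download failed: {img_url} ({e})")
                continue
            # The target folder already exists because the JSON file lives in it.
            with open(img_path, "wb") as f:
                f.write(response.content)
            print(f"Saved {img_path}")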