| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- import time
- import asyncio
- import random
- import re
- import json
- import os
- import concurrent.futures
- import httpx
- max_workers = 2
- proxies="http://127.0.0.1:7890"
- def check_urls_json_exists(key):
- downloads_path = os.path.join(os.getcwd(), "downloads")
- for root, dirs, files in os.walk(downloads_path):
- if f"{key}.json" in files:
- json_path = root.split('/')[-1]
- print(f'json文件已存在 {json_path} 中')
- return True
- return False
- def check_and_load_keys():
- # 从 keys.txt 文件中读取 key
- keys = []
- keys_file = os.path.join(os.getcwd(), "keys.txt")
- if not os.path.exists(keys_file):
- print("keys.txt 文件不存在\n新建keys.txt文件。")
- with open(keys_file, "w", encoding="utf-8") as f:
- f.write("")
- exit(0)
- with open(keys_file, "r", encoding="utf-8") as f:
- keys = [line.strip() for line in f.readlines()]
- if keys:
- return keys
- else:
- print("keys.txt 文件为空\n请填写key。")
- exit(0)
- async def fetch_page(client, url):
- try:
- response = await client.get(url)
- response.raise_for_status() # 检查请求是否成功
- return response.text
- except httpx.HTTPError as e:
- print(f"请求失败: {e}")
- return None
- def extract_image_links(content):
- # 使用正则表达式提取图片链接
- pattern = r'<meta itemprop="image" content="(.*?)">'
- image_links = re.findall(pattern, content)
- return image_links
- def clean_folder_name(title):
- # 清洗标题,使其成为 Windows 文件夹合法字符
- invalid_chars = r'[<>:"/\\|?*\x00-\x1F]'
- title = re.sub(invalid_chars, '_', title) # 替换非法字符为下划线
- title = title.replace(" ", "") # 删除空格
- title = title.replace("_", "") # 删除下划线
- return title.strip()
- async def get_urls(key):
- # 这里判定一下, 这个 key 是否已经爬取过
- is_exists = check_urls_json_exists(key)
- if is_exists:
- print(f"{key}.json 文件已存在,跳过爬取。")
- return
- base_url = f"https://www.kaizty.com/photos/{key}.html?page="
- data = {}
- folder_name = "default_folder" # 默认文件夹名
- async with httpx.AsyncClient(proxies=proxies) as client:
- n = 1
- for page in range(1, 30):
- url = base_url + str(page)
- print(f"正在爬取页面: {url}")
- content = await fetch_page(client, url)
- if content is None:
- print(f"无法获取页面内容: {url}")
- continue
- # 检查页面内容是否为空
- if "EMPTY" in content:
- print("页面内容为空,停止爬取。")
- break
- # 获取标题(仅在第一页获取)
- if page == 1:
- title_pattern = r'<title>(.*?)</title>'
- title_match = re.search(title_pattern, content)
- if title_match:
- title = title_match.group(1)
- folder_name = clean_folder_name(title)
- print(f"页面标题: {title}")
- print(f"清洗后的文件夹名: {folder_name}")
- else:
- print("无法获取页面标题,使用默认文件夹名。")
- # 提取图片链接
- image_links = extract_image_links(content)
- if image_links:
- print(f"在页面 {url} 中找到图片链接:")
- for link in image_links:
- print(link)
- prefix = str(n).zfill(3)
- suffix = link.split('.')[-1]
- img_name = f'{prefix}.{suffix}'
- data[img_name] = link
- n += 1
- else:
- print(f"页面 {url} 中未找到图片链接。")
- # 创建文件夹并保存数据
- downloads_path = os.path.join(os.getcwd(), "downloads")
- if not os.path.exists(downloads_path):
- os.makedirs(downloads_path)
- print("创建了 downloads 文件夹。")
- folder_path = os.path.join(downloads_path, folder_name)
- if not os.path.exists(folder_path):
- os.makedirs(folder_path)
- print(f"创建了文件夹: {folder_path}")
- data_file_path = os.path.join(folder_path, f"{key}.json")
- with open(data_file_path, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=4)
- print(f"数据已保存到 {data_file_path}")
- return [folder_name, data_file_path]
- def load_imgs_url_and_patn():
- result = []
- downloads_path = os.path.join(os.getcwd(), "downloads")
- for root, dirs, files in os.walk(downloads_path):
- for file in files:
- if file.endswith(".json"):
- json_path = os.path.join(root, file)
- with open(json_path, "r", encoding="utf-8") as f:
- data = json.load(f)
- for img_name, img_url in data.items():
- img_path = os.path.join(root, img_name)
- if not os.path.exists(img_path):
- result.append([img_path, img_url])
- return result
- def save_img(client, img_path, img_url, max_retries=999):
- retries = 0
- headers = {
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
- "Accept-Encoding": "gzip, deflate, br, zstd",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Priority": "u=0, i",
- "Sec-CH-UA": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
- "Sec-CH-UA-Mobile": "?1",
- "Sec-CH-UA-Platform": '"Android"',
- "Sec-Fetch-Dest": "document",
- "Sec-Fetch-Mode": "navigate",
- "Sec-Fetch-Site": "none",
- "Sec-Fetch-User": "?1",
- "Upgrade-Insecure-Requests": "1",
- "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36"
- }
- while retries < max_retries:
- try:
- # 使用传入的 client 下载图片,并设置请求头
- response = client.get(img_url, headers=headers, timeout=10)
- response.raise_for_status() # 检查请求是否成功
- # 保存图片到指定路径
- os.makedirs(os.path.dirname(img_path), exist_ok=True)
- with open(img_path, "wb") as f:
- f.write(response.content)
- print(f"图片已下载并保存到 {img_path}")
- time.sleep(random.uniform(1, 1.5))
- return # 成功下载后退出函数
- except httpx.HTTPStatusError as e:
- switch_to_random_proxy()
- if e.response.status_code == 429:
- # 如果是 429 错误,获取 Retry-After 时间
- retry_after = int(e.response.headers.get('Retry-After', 3))
- print(f"遇到 429 错误,等待 {retry_after} 秒后重试...")
- time.sleep(retry_after)
- retries += 1
- else:
- print(f"下载图片失败: {img_path.split('/')[-1]},错误码: {e.response.status_code}")
- break
- except Exception as e:
- print(f"保存图片时发生错误: {e}")
- break
- if retries == max_retries:
- print(f"图片下载失败,已达到最大重试次数: {img_path}")
- def switch_to_random_proxy(clash_api_url="http://127.0.0.1:9090", group_name="GLOBAL"):
- """
- 随机切换代理组中的一个节点(排除当前节点和 DIRECT/REJECT)
- :param clash_api_url: Clash RESTful API 地址,默认为 "http://127.0.0.1:9090"
- :param group_name: 代理组名称,默认为 "GLOBAL"
- """
- try:
- # 获取代理组的所有节点
- response = httpx.get(f"{clash_api_url}/proxies")
- response.raise_for_status()
- proxies = response.json()
- if group_name not in proxies['proxies']:
- print(f"代理组 '{group_name}' 不存在")
- return
- group_info = proxies['proxies'][group_name]
- if group_info['type'] != 'Selector':
- print(f"'{group_name}' 不是 Selector 类型的代理组")
- return
- # 获取当前使用的节点
- current_node = group_info['now']
- print(f"当前节点: {current_node}")
- # 获取所有可选节点(排除 DIRECT 和 REJECT)
- nodes = [node for node in group_info['all'] if node not in ["DIRECT", "REJECT"]]
- if not nodes:
- print("没有可用的代理节点")
- return
- # 随机选择一个非当前节点的代理
- available_nodes = [node for node in nodes if node != current_node]
- if not available_nodes:
- print("没有其他可用的代理节点")
- return
- random_node = random.choice(available_nodes)
- print(f"正在切换到随机节点: {random_node}")
- # 切换节点
- switch_url = f"{clash_api_url}/proxies/{group_name}"
- response = httpx.put(switch_url, json={"name": random_node})
- if response.status_code == 204:
- print(f"成功切换到节点: {random_node}")
- else:
- print(f"切换节点失败: {response.status_code}")
- except httpx.exceptions.RequestException as e:
- print(f"请求失败: {e}")
- def main():
- keys = check_and_load_keys()
- # 在这里获取当前路径并且判定如果没有downloads文件夹的话,就创建一个
- downloads_path = os.path.join(os.getcwd(), "downloads")
- if not os.path.exists(downloads_path):
- os.makedirs(downloads_path)
- print("创建了 downloads 文件夹。")
- for key in keys:
- # 调用异步函数
- result = asyncio.run(get_urls(key))
- if result:
- folder_name = result[0]
- data_file_path = result[1]
- print(f"处理完成,文件夹名称: {folder_name}, 数据保存路径: {data_file_path}")
- print(f'已获取全部keys的url数据, 开始下载图片')
- time.sleep(0.1)
- all_data = load_imgs_url_and_patn()
- # 创建一个全局的 httpx.Client 实例
- with httpx.Client(proxies=proxies) as client:
- # 使用线程池并发下载图片
- with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
- futures = []
- for img_path, img_url in all_data:
- futures.append(executor.submit(save_img, client, img_path, img_url))
- # 等待所有线程完成
- for future in concurrent.futures.as_completed(futures):
- future.result() # 捕获异常(如果有)
- print("所有图片下载完成!")
- if __name__ == "__main__":
- main()
|