import asyncio
import json
import os
import re

import httpx

def check_urls_json_exists(key):
    # Return True if {key}.json has already been saved somewhere under downloads/
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        if f"{key}.json" in files:
            json_path = os.path.basename(root)
            print(f"JSON file already exists in {json_path}")
            return True
    return False

def check_and_load_keys():
    # Read keys from keys.txt, one per line; drop blank lines and duplicates
    keys_file = os.path.join(os.getcwd(), "keys.txt")
    if not os.path.exists(keys_file):
        print("keys.txt does not exist.\nCreating an empty keys.txt file.")
        with open(keys_file, "w", encoding="utf-8") as f:
            f.write("")
        exit(0)
    with open(keys_file, "r", encoding="utf-8") as f:
        keys = [line.strip() for line in f if line.strip()]
    if keys:
        return list(set(keys))
    else:
        print("keys.txt is empty.\nPlease add keys to it.")
        exit(0)

async def fetch_page(client, url):
    try:
        response = await client.get(url)
        response.raise_for_status()  # Raise if the request did not succeed
        return response.text
    except httpx.HTTPError as e:
        print(f"Request failed: {e}")
        return None

def extract_image_links(content):
    # Extract image URLs from <meta itemprop="image"> tags with a regular expression
    pattern = r'<meta itemprop="image" content="(.*?)">'
    image_links = re.findall(pattern, content)
    return image_links

def clean_folder_name(title):
    # Sanitize the title so it is a legal Windows folder name
    invalid_chars = r'[<>:"/\\|?*\x00-\x1F]'
    title = re.sub(invalid_chars, '_', title)  # Replace illegal characters with underscores
    title = title.replace(" ", "")  # Remove spaces
    title = title.replace("_", "")  # Remove underscores (this also drops the replaced illegal characters)
    return title.strip()
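# For illustration, with a hypothetical title:
#   clean_folder_name("Some: Photo / Set 2024") -> "SomePhotoSet2024"
# ':' and '/' are first replaced by '_', then spaces and underscores are removed.
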
async def get_urls(key):
    # Skip keys that have already been crawled
    is_exists = check_urls_json_exists(key)
    if is_exists:
        print(f"{key}.json already exists, skipping crawl.")
        return
    base_url = f"https://www.kaizty.com/photos/{key}.html?page="
    data = {}
    folder_name = "default_folder"  # Default folder name
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        n = 1
        retry_count = 5
        for page in range(1, 30):
            url = base_url + str(page)
            print(f"Crawling page: {url}")
            content = await fetch_page(client, url)
            if content is None:
                print(f"Could not fetch page content: {url}")
                if retry_count > 0:
                    retry_count -= 1  # Spend one failure allowance and move on to the next page
                    continue
                else:
                    print(f"Crawling {key} failed, skipping")
                    break
            # Stop when the page reports no content
            if "EMPTY" in content:
                print("Page content is empty, stopping crawl.")
                break
            # Grab the title (first page only)
            if page == 1:
                title_pattern = r'<title>(.*?)</title>'
                title_match = re.search(title_pattern, content)
                if title_match:
                    title = title_match.group(1)
                    folder_name = clean_folder_name(title)
                    print(f"Page title: {title}")
                    print(f"Sanitized folder name: {folder_name}")
                else:
                    print("Could not read the page title, using the default folder name.")
            # Extract image links
            image_links = extract_image_links(content)
            if image_links:
                print(f"Image links found on page {url}:")
                for link in image_links:
                    print(link)
                    prefix = str(n).zfill(3)
                    suffix = link.split('.')[-1]
                    img_name = f'{prefix}.{suffix}'
                    data[img_name] = link
                    n += 1
            else:
                print(f"No image links found on page {url}.")
    # Save only if any data was collected; otherwise skip
    if not data:
        return {}
    # Create the folder and save the data
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    folder_path = os.path.join(downloads_path, folder_name)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created folder: {folder_path}")
    data_file_path = os.path.join(folder_path, f"{key}.json")
    with open(data_file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {data_file_path}")
    return [folder_name, data_file_path]
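# The saved {key}.json maps zero-padded file names to image URLs, for example
# (the URL below is hypothetical):
#   {"001.jpg": "https://example.com/path/photo-1.jpg", "002.jpg": "..."}
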
def load_imgs_url_and_patn():
    # Collect [img_path, img_url] pairs for every image that has not been downloaded yet
    result = []
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        for file in files:
            if file.endswith(".json"):
                json_path = os.path.join(root, file)
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                for img_name, img_url in data.items():
                    img_path = os.path.join(root, img_name)
                    if not os.path.exists(img_path):
                        result.append([img_path, img_url])
    return result
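# A minimal sketch of how the [img_path, img_url] pairs returned by
# load_imgs_url_and_patn() could be downloaded. The download_all() entry point,
# the semaphore limit, and the timeout are illustrative assumptions; the proxy
# address mirrors the one used in get_urls().
async def download_one(client, sem, img_path, img_url):
    async with sem:
        try:
            response = await client.get(img_url)
            response.raise_for_status()
        except httpx.HTTPError as e:
            print(f"Download failed: {img_url} ({e})")
            return
        with open(img_path, "wb") as f:
            f.write(response.content)
        print(f"Saved {img_path}")

async def download_all():
    pairs = load_imgs_url_and_patn()
    sem = asyncio.Semaphore(5)  # Assumed concurrency limit
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890", timeout=30) as client:
        await asyncio.gather(*(download_one(client, sem, path, url) for path, url in pairs))
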
def start_get_urls():
    keys = check_and_load_keys()
    # Create the downloads folder in the current directory if it does not exist yet
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    for key in keys:
        # Run the async crawler for this key
        result = asyncio.run(get_urls(key))
        if result:
            folder_name = result[0]
            data_file_path = result[1]
            print(f"Done. Folder: {folder_name}, data saved to: {data_file_path}")
        else:
            print("No data retrieved, skipping")
    print("URL data has been fetched for all keys")


if __name__ == "__main__":
    start_get_urls()