import asyncio
import re
import json
import os
import httpx
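# Scrapes kaizty.com photo pages for every key listed in keys.txt, collects the
# image URLs page by page, and saves them as downloads/<title>/<key>.json.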
def check_urls_json_exists(key):
    # Check whether <key>.json has already been saved somewhere under downloads/.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        if f"{key}.json" in files:
            json_path = os.path.basename(root)  # folder that holds the JSON file
            print(f"The JSON file already exists in {json_path}")
            return True
    return False
def check_and_load_keys():
    # Read keys from keys.txt; create the file and exit if it is missing.
    keys_file = os.path.join(os.getcwd(), "keys.txt")
    if not os.path.exists(keys_file):
        print("keys.txt does not exist.\nCreating an empty keys.txt file.")
        with open(keys_file, "w", encoding="utf-8") as f:
            f.write("")
        exit(0)
    with open(keys_file, "r", encoding="utf-8") as f:
        keys = [line.strip() for line in f if line.strip()]  # skip blank lines
    if keys:
        return list(set(keys))
    else:
        print("keys.txt is empty.\nPlease add keys to it.")
        exit(0)
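# keys.txt is expected to hold one key per line, where each key is the <key>
# part of https://www.kaizty.com/photos/<key>.html; duplicate lines are removed.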
async def fetch_page(client, url):
    # Fetch one page and return its HTML text, or None on any HTTP error.
    try:
        response = await client.get(url)
        response.raise_for_status()  # raise on non-2xx status codes
        return response.text
    except httpx.HTTPError as e:
        print(f"Request failed: {e}")
        return None
def extract_image_links(content):
    # Extract image URLs from the page HTML with a regular expression.
    # NOTE: the original pattern was lost in the source; this generic
    # <img src="..."> matcher is an assumption about the page markup.
    pattern = r'<img[^>]+src="([^"]+)"'
    image_links = re.findall(pattern, content)
    return image_links
def clean_folder_name(title):
    # Sanitize the title so it is a legal Windows folder name.
    invalid_chars = r'[<>:"/\\|?*\x00-\x1F]'
    title = re.sub(invalid_chars, '_', title)  # replace illegal characters with underscores
    title = title.replace(" ", "")  # drop spaces
    title = title.replace("_", "")  # drop the underscores again, removing illegal characters entirely
    return title.strip()
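# For example: clean_folder_name('Photo: Set/01 ') -> 'PhotoSet01'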
async def get_urls(key):
    # Skip keys that have already been scraped (their <key>.json already exists).
    is_exists = check_urls_json_exists(key)
    if is_exists:
        print(f"{key}.json already exists, skipping this key.")
        return
    base_url = f"https://www.kaizty.com/photos/{key}.html?page="
    data = {}
    folder_name = "default_folder"  # fallback folder name
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        n = 1
        retry_count = 5
        for page in range(1, 30):
            url = base_url + str(page)
            print(f"Fetching page: {url}")
            content = await fetch_page(client, url)
            if content is None:
                print(f"Could not fetch page content: {url}")
                # Note: this does not re-request the same page; it spends one of
                # the remaining attempts and moves on to the next page.
                if retry_count > 0:
                    retry_count -= 1
                    continue
                else:
                    print(f"Too many failures for {key}, giving up on this key.")
                    break
            # An "EMPTY" marker in the page means there are no more photos.
            if "EMPTY" in content:
                print("Page is empty, stopping.")
                break
            # Grab the gallery title (only on the first page).
            if page == 1:
                # NOTE: the original title pattern was garbled in the source;
                # matching the <title> tag here is an assumption about the markup.
                title_pattern = r'<title>(.*?)</title>'
                title_match = re.search(title_pattern, content)
                if title_match:
                    title = title_match.group(1)
                    folder_name = clean_folder_name(title)
                    print(f"Page title: {title}")
                    print(f"Sanitized folder name: {folder_name}")
                else:
                    print("Could not extract a page title, using the default folder name.")
            # Collect the image links from this page.
            image_links = extract_image_links(content)
            if image_links:
                print(f"Found image links on page {url}:")
                for link in image_links:
                    print(link)
                    prefix = str(n).zfill(3)      # 001, 002, ...
                    suffix = link.split('.')[-1]  # keep the original file extension
                    img_name = f'{prefix}.{suffix}'
                    data[img_name] = link
                    n += 1
            else:
                print(f"No image links found on page {url}.")
    # Save only if some data was collected; otherwise return an empty result.
    if not data:
        return {}
    # Create the folder and save the collected data.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    folder_path = os.path.join(downloads_path, folder_name)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created folder: {folder_path}")
    data_file_path = os.path.join(folder_path, f"{key}.json")
    with open(data_file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {data_file_path}")
    return [folder_name, data_file_path]
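# Each saved <key>.json maps zero-padded image names to their source URLs,
# e.g. {"001.jpg": "https://example.com/a.jpg", "002.png": "https://example.com/b.png"}
# (the URLs shown here are placeholders).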
def load_imgs_url_and_patn():
    # Walk downloads/, read every <key>.json, and return [img_path, img_url]
    # pairs for images that have not been downloaded yet.
    result = []
    downloads_path = os.path.join(os.getcwd(), "downloads")
    for root, dirs, files in os.walk(downloads_path):
        for file in files:
            if file.endswith(".json"):
                json_path = os.path.join(root, file)
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                for img_name, img_url in data.items():
                    img_path = os.path.join(root, img_name)
                    if not os.path.exists(img_path):
                        result.append([img_path, img_url])
    return result
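# Hypothetical usage sketch (not part of the original flow and never called in
# this script): one way the [img_path, img_url] pairs returned by
# load_imgs_url_and_patn() could be consumed by a downloader, assuming the same
# local proxy as get_urls(). The name download_pending_images is made up here
# for illustration.
async def download_pending_images():
    pending = load_imgs_url_and_patn()
    async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
        for img_path, img_url in pending:
            try:
                resp = await client.get(img_url)
                resp.raise_for_status()
            except httpx.HTTPError as e:
                print(f"Download failed for {img_url}: {e}")
                continue
            with open(img_path, "wb") as f:
                f.write(resp.content)
            print(f"Saved {img_path}")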
def start_get_urls():
    keys = check_and_load_keys()
    # Make sure the downloads folder exists before scraping.
    downloads_path = os.path.join(os.getcwd(), "downloads")
    if not os.path.exists(downloads_path):
        os.makedirs(downloads_path)
        print("Created the downloads folder.")
    for key in keys:
        # Run the async scraper for this key.
        result = asyncio.run(get_urls(key))
        if result:
            folder_name = result[0]
            data_file_path = result[1]
            print(f"Finished: folder name: {folder_name}, data saved to: {data_file_path}")
        else:
            print("No data collected for this key, skipping.")
    print("Finished fetching URL data for all keys.")
if __name__ == "__main__":
    start_get_urls()