"""Download images listed in per-directory JSON manifests.

Every sub-directory of ``downloads/`` (located next to this file) that holds
exactly one ``*.json`` manifest is scanned.  The manifest maps a file name to
the URL it should be fetched from; entries whose file is not yet on disk are
downloaded concurrently.
"""

import asyncio
import json
import logging
import os
import time

import httpx

logger = logging.getLogger(__name__)


def _write_file_atomically(path, data):
    """Write *data* to *path* so that *path* never holds a partial file.

    The bytes go to a sibling ``.part`` file first and are moved into place
    with ``os.replace``.  This matters because the scanner treats any existing
    file as "already downloaded": a truncated image left behind by an
    interrupted write would otherwise never be fetched again.
    """
    directory = os.path.dirname(path)
    if directory:  # a bare file name has no directory to create
        os.makedirs(directory, exist_ok=True)

    tmp_path = os.fspath(path) + ".part"
    try:
        with open(tmp_path, "wb") as f:
            f.write(data)
        os.replace(tmp_path, path)
    except BaseException:
        # Best-effort cleanup of the temporary file; the original error wins.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


async def download_image(session, img_path, img_url, retry_count=3):
    """Download one image and store it at *img_path*.

    Args:
        session: An ``httpx.AsyncClient`` (or compatible object with an
            awaitable ``get``) used to perform the request.
        img_path: Destination file path; parent directories are created.
        img_url: URL to fetch.
        retry_count: Maximum number of attempts.

    Returns:
        ``True`` if the image was saved, ``False`` otherwise.  HTTP error
        statuses other than 429 fail immediately; 429 responses back off
        exponentially (1s, 2s, 4s, ...) and any other exception is retried
        after one second.
    """
    for attempt in range(retry_count):
        try:
            response = await session.get(img_url)
            response.raise_for_status()
            _write_file_atomically(img_path, response.content)
            return True
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:
                logger.debug("Download failed: %s (%s)", img_url, e)
                return False
            delay = 2 ** attempt  # exponential back-off for rate limiting
            logger.debug("429 Too Many Requests for %s", img_url)
        except Exception as e:
            logger.debug("Download failed: %s (%s)", img_url, e)
            delay = 1  # short fixed pause before a plain retry

        # Waiting is pointless once no attempt is left.
        if attempt < retry_count - 1:
            await asyncio.sleep(delay)

    logger.debug("Giving up after %d attempts: %s", retry_count, img_url)
    return False


async def download_all_images(ready_to_download_list, max_concurrent_downloads=5):
    """Download every entry of *ready_to_download_list* concurrently.

    Args:
        ready_to_download_list: Iterable of dicts with the keys ``'img_path'``
            and ``'img_url'``.
        max_concurrent_downloads: Upper bound on simultaneous downloads.

    Returns:
        A list of booleans, one per entry and in the same order, as returned
        by :func:`download_image`.
    """
    semaphore = asyncio.Semaphore(max_concurrent_downloads)

    async with httpx.AsyncClient() as session:

        async def bounded_download(item):
            # The semaphore caps how many requests are in flight at once.
            async with semaphore:
                return await download_image(
                    session, item['img_path'], item['img_url']
                )

        tasks = [
            asyncio.create_task(bounded_download(item))
            for item in ready_to_download_list
        ]
        return await asyncio.gather(*tasks)


def load_ready_to_download_list():
    """Collect the images that still have to be downloaded.

    Walks ``downloads/`` next to this file.  Each sub-directory (at any depth)
    containing exactly one ``.json`` file is treated as an image folder; the
    JSON object maps file names to URLs.  Entries whose file already exists
    are skipped.  Manifests that cannot be read or are not a JSON object are
    skipped with a warning instead of aborting the whole scan.

    Returns:
        A list of ``{'img_path': ..., 'img_url': ...}`` dicts.
    """
    project_root = os.path.dirname(os.path.abspath(__file__))
    downloads_path = os.path.join(project_root, 'downloads')

    candidate_dirs = [
        os.path.join(root, dir_name)
        for root, dirs, _files in os.walk(downloads_path)
        for dir_name in dirs
    ]

    result = []
    for path in candidate_dirs:
        json_files = [f for f in os.listdir(path) if f.endswith('.json')]
        if len(json_files) != 1:
            continue

        json_path = os.path.join(path, json_files[0])
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                img_list = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            logger.warning("Skipping unreadable manifest %s: %s", json_path, e)
            continue
        if not isinstance(img_list, dict):
            logger.warning("Skipping manifest %s: not a JSON object", json_path)
            continue

        for file_name, url in img_list.items():
            img_path = os.path.join(path, file_name)
            if os.path.exists(img_path):
                continue
            result.append({'img_path': img_path, 'img_url': url})

    return result


async def start_download():
    """Run up to three download passes until nothing is left to fetch.

    After every pass the manifests are rescanned, so images that failed in one
    pass are retried in the next.
    """
    max_passes = 3
    for attempt in range(max_passes):
        ready_to_download_list = load_ready_to_download_list()
        print(f"准备下载图片共: {len(ready_to_download_list)} 张")

        if not ready_to_download_list:
            print("已全部下载完成或没有需要下载的图片")
            return

        await download_all_images(ready_to_download_list)

        # Pause before rescanning; asyncio.sleep keeps the event loop free,
        # unlike time.sleep.  No pause is needed after the final pass.
        if attempt < max_passes - 1:
            await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(start_download())