| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import aiofiles
- import httpx
- from typing import Optional
- import os
- class Downloader:
- def __init__(self):
- self.output_dir = "downloads"
- os.makedirs(self.output_dir, exist_ok=True)
-
- async def download(self, proxy_str: str, url: str) -> str:
- """
- 下载文件的主要逻辑
- """
- try:
- # 如果proxy_str不为空,构建代理配置
- proxy = None
- if proxy_str and ":" in proxy_str:
- ip, port = proxy_str.split(":", 1)
- proxy = f"http://{ip}:{port}"
-
- # 使用 httpx 异步下载
- async with httpx.AsyncClient(proxies=proxy, timeout=30.0) as client:
- response = await client.get(url)
- response.raise_for_status()
-
- # 获取文件名
- filename = self._get_filename(url, response)
- filepath = os.path.join(self.output_dir, filename)
-
- # 保存文件
- async with aiofiles.open(filepath, 'wb') as f:
- await f.write(response.content)
-
- return f"下载成功: {filename}\n保存路径: {filepath}\n文件大小: {len(response.content)} bytes"
-
- except Exception as e:
- raise Exception(f"下载过程中出错: {str(e)}")
-
- def _get_filename(self, url: str, response: httpx.Response) -> str:
- """从 URL 或响应头中获取文件名"""
- # 从 URL 中提取文件名
- if '/' in url:
- filename = url.split('/')[-1]
- if '?' in filename:
- filename = filename.split('?')[0]
- else:
- filename = "downloaded_file"
-
- # 如果没有扩展名,尝试从 Content-Type 推断
- if '.' not in filename:
- content_type = response.headers.get('content-type', '')
- if 'image' in content_type:
- ext = content_type.split('/')[-1]
- filename = f"{filename}.{ext}"
-
- return filename or "downloaded_file"
-
- async def download_image(self, proxy_str: str, url: str) -> str:
- """专门下载图片的方法"""
- # 这里可以添加图片下载的特殊逻辑
- return await self.download(proxy_str, url)
|