# downloader.py
import os
from typing import Optional
from urllib.parse import urlsplit

import aiofiles
import httpx
  5. class Downloader:
  6. def __init__(self):
  7. self.output_dir = "downloads"
  8. os.makedirs(self.output_dir, exist_ok=True)
  9. async def download(self, proxy_str: str, url: str) -> str:
  10. """
  11. 下载文件的主要逻辑
  12. """
  13. try:
  14. # 如果proxy_str不为空,构建代理配置
  15. proxy = None
  16. if proxy_str and ":" in proxy_str:
  17. ip, port = proxy_str.split(":", 1)
  18. proxy = f"http://{ip}:{port}"
  19. # 使用 httpx 异步下载
  20. async with httpx.AsyncClient(proxies=proxy, timeout=30.0) as client:
  21. response = await client.get(url)
  22. response.raise_for_status()
  23. # 获取文件名
  24. filename = self._get_filename(url, response)
  25. filepath = os.path.join(self.output_dir, filename)
  26. # 保存文件
  27. async with aiofiles.open(filepath, 'wb') as f:
  28. await f.write(response.content)
  29. return f"下载成功: {filename}\n保存路径: {filepath}\n文件大小: {len(response.content)} bytes"
  30. except Exception as e:
  31. raise Exception(f"下载过程中出错: {str(e)}")
  32. def _get_filename(self, url: str, response: httpx.Response) -> str:
  33. """从 URL 或响应头中获取文件名"""
  34. # 从 URL 中提取文件名
  35. if '/' in url:
  36. filename = url.split('/')[-1]
  37. if '?' in filename:
  38. filename = filename.split('?')[0]
  39. else:
  40. filename = "downloaded_file"
  41. # 如果没有扩展名,尝试从 Content-Type 推断
  42. if '.' not in filename:
  43. content_type = response.headers.get('content-type', '')
  44. if 'image' in content_type:
  45. ext = content_type.split('/')[-1]
  46. filename = f"{filename}.{ext}"
  47. return filename or "downloaded_file"
  48. async def download_image(self, proxy_str: str, url: str) -> str:
  49. """专门下载图片的方法"""
  50. # 这里可以添加图片下载的特殊逻辑
  51. return await self.download(proxy_str, url)