# -*- coding: utf-8 -*-
import logging
import os
import time
import zipfile

import httpx

from odoo import models, fields, api
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Root folder under which every target site gets its own sub-folder.
DOWNLOAD_ROOT = '/tmp/collection_downloads'

# Browser-like user agent sent with every image request.
DOWNLOAD_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/130.0.0.0 Safari/537.36"
)

# Number of attempts per image and the pause (seconds) between attempts.
DOWNLOAD_MAX_ATTEMPTS = 10
DOWNLOAD_RETRY_DELAY = 5


class CollectionImg(models.Model):
    """An image set collected from a target site, with one line per image."""

    _name = 'collection.img'
    _description = 'Collection Img'
    _order = 'create_date DESC'

    line_ids = fields.One2many('collection.img.line', 'collection_img_id', string='Img Lines')
    # The default must be a callable: a plain value would be computed once at
    # module import and then shared by every record created afterwards.
    name = fields.Char(string='Name', default=lambda self: str(int(time.time())))
    target_site_id = fields.Many2one('collection.img.target.site', string='Target Base Site')
    file_title = fields.Char(string='File Title')
    img_set_url = fields.Char(string='Image Set URL')
    show_browser = fields.Boolean(string='Show Browser', default=False)
    image_count = fields.Integer(string='Image Count')

    def btn_get_img_data(self):
        """Run the target site's stored scraping code for this image set.

        Previously saved lines are removed first. The site's ``codes`` text
        is executed with ``exec`` in this method's scope.

        Raises:
            UserError: if the target site or the image set URL is missing,
                if the site name does not occur in the image set URL, or if
                the executed code raises any exception.
        """
        self.ensure_one()
        site_name = self.target_site_id.name
        # Unset Char fields read as False; guard before the substring test,
        # which would otherwise raise TypeError.
        if not site_name or not self.img_set_url or site_name not in self.img_set_url:
            raise UserError('target web site incorrect')

        # Drop the lines saved by a previous run.
        self.btn_clear_data()
        self._check_file_exist()
        try:
            # SECURITY NOTE(review): this executes arbitrary Python stored in
            # the `codes` field of the target site record. Anyone who can
            # write that field can run code on the server; restrict write
            # access to that model accordingly.
            exec(self.target_site_id.codes)
        except Exception as e:
            raise UserError(str(e)) from e

    def btn_download_img(self):
        """Download every line's image, zip them, and return a URL action.

        Images already present on disk are reused. An image that still fails
        after all retries is logged and left out of the archive.

        Returns:
            dict: an ``ir.actions.act_url`` action.

        Raises:
            UserError: if the target site, the file title, or a line's URL is
                missing, or if a line without ``set_name`` lacks a name or an
                image suffix.
        """
        self.ensure_one()
        self._check_file_exist()
        if not self.file_title:
            raise UserError('Missing file title')

        # Validate all lines up front so nothing is downloaded for a set that
        # cannot be completed.
        for line in self.line_ids:
            if not line.url:
                raise UserError(f'{line.name}{line.serial} Missing image address')
            if not line.set_name and not (line.name and line.image_suffix):
                raise UserError(f'{line.name}{line.serial} Missing image name or suffix')

        # Folder of this image set inside the target site's folder.
        download_path = os.path.join(DOWNLOAD_ROOT, self.target_site_id.name, self.file_title)
        os.makedirs(download_path, exist_ok=True)

        # Paths of the images that go into the archive.
        img_path_list = []
        # Lines are handled one by one; threads, processes or coroutines
        # could be used here later.
        for line in self.line_ids:
            if line.set_name:
                # Images grouped into named sub-sets (comico-like sites) are
                # not handled yet.
                continue

            # Flat image set, as on a plain image resource site.
            img_file_path = os.path.join(download_path, line.name + '.' + line.image_suffix)
            if os.path.exists(img_file_path):
                # Already on disk: reuse it instead of downloading again.
                img_path_list.append(img_file_path)
                line.update({'download_state': 'downloaded'})
                continue

            if self._download_line_image(line, img_file_path):
                img_path_list.append(img_file_path)
            else:
                _logger.warning(
                    "Giving up on %s after %s attempts; it is left out of the archive",
                    line.url, DOWNLOAD_MAX_ATTEMPTS,
                )

        # Every image has been processed; pack the available ones.
        zip_file_path = os.path.join(download_path, self.file_title + '.zip')
        with zipfile.ZipFile(zip_file_path, 'w') as zip_file:
            for img_path in img_path_list:
                zip_file.write(img_path, os.path.basename(img_path))

        # NOTE(review): the archive is written to
        # <download_path>/<file_title>.zip, while the URL below is
        # <download_path>.zip, and it is a filesystem path rather than an
        # HTTP route. Kept unchanged because how the file is served is not
        # visible here - confirm against the web server configuration.
        download_url = f"{download_path}.zip"
        _logger.info("Download URL: %s", download_url)

        # Action dictionary that makes the client open the URL.
        return {
            'type': 'ir.actions.act_url',
            "url": download_url
        }

    def _download_line_image(self, line, img_file_path):
        """Fetch ``line.url`` into ``img_file_path``, retrying on failure.

        Returns:
            bool: True once the image is saved and the line is marked as
            downloaded, False when every attempt failed.
        """
        for attempt in range(1, DOWNLOAD_MAX_ATTEMPTS + 1):
            try:
                response = httpx.get(line.url, headers={"user-agent": DOWNLOAD_USER_AGENT})
                if response.status_code == 200:
                    with open(img_file_path, 'wb') as f:
                        f.write(response.content)
                    line.update({'download_state': 'downloaded'})
                    return True
                _logger.warning(
                    "Attempt %s/%s for %s returned HTTP %s",
                    attempt, DOWNLOAD_MAX_ATTEMPTS, line.url, response.status_code,
                )
            except Exception:
                # Deliberately broad: the download is best-effort and any
                # failure (network, file write) only triggers another try.
                _logger.warning(
                    "Attempt %s/%s for %s failed",
                    attempt, DOWNLOAD_MAX_ATTEMPTS, line.url, exc_info=True,
                )
            # No point in waiting once the last attempt has been used up.
            if attempt < DOWNLOAD_MAX_ATTEMPTS:
                time.sleep(DOWNLOAD_RETRY_DELAY)
        return False

    def btn_clear_data(self):
        """Delete all image lines attached to the record(s)."""
        self.line_ids.unlink()

    def _check_file_exist(self):
        """Ensure the download root and the target site's folder exist.

        Raises:
            UserError: if no target site with a name is set.
        """
        self.ensure_one()
        site_name = self.target_site_id.name
        if not site_name:
            raise UserError('target web site incorrect')
        # makedirs creates the root folder as well and tolerates an existing
        # folder, so no separate existence check is needed.
        os.makedirs(os.path.join(DOWNLOAD_ROOT, site_name), exist_ok=True)


class CollectionImgLine(models.Model):
    """A single image belonging to a collected image set."""

    _name = 'collection.img.line'
    _description = 'Collection Img Line'

    collection_img_id = fields.Many2one('collection.img', string='Collection Img', ondelete='cascade')
    name = fields.Char(string='Name')
    set_name = fields.Char(string='Set Name')
    serial = fields.Integer(string='Serial')
    url = fields.Char(string='URL')
    download_state = fields.Selection(
        [('not_download', 'not download'), ('downloaded', 'downloaded')],
        string='Download State',
        default='not_download',
    )
    image_suffix = fields.Char(string='Image Suffix')


class CollectionImgTargetSite(models.Model):
    """A site images are collected from, with the code used to scrape it."""

    _name = 'collection.img.target.site'
    _description = 'Collection Img Target Site'

    name = fields.Char(string='Name')
    target_url = fields.Char(string='Target URL')
    # Python source executed by CollectionImg.btn_get_img_data.
    codes = fields.Text(string='Codes')