| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- # -*- coding: utf-8 -*-
- import time
- import os
- import zipfile
- import httpx
- from odoo import models, fields, api
- from odoo.exceptions import UserError
- class CollectionImg(models.Model):
- _name = 'collection.img'
- _description = 'Collection Img'
- _order = 'create_date DESC'
- line_ids = fields.One2many('collection.img.line', 'collection_img_id', string='Img Lines')
- name = fields.Char(string='Name', default=str(int(time.time())))
- target_site_id = fields.Many2one('collection.img.target.site', string='Target Base Site')
- file_title = fields.Char(string='File Title')
- img_set_url = fields.Char(string='Image Set URL')
- show_browser = fields.Boolean(string='Show Browser', default=False)
- image_count = fields.Integer(string='Image Count')
- def btn_get_img_data(self):
- # 删除之前保存的 line
- self.btn_clear_data()
- if self.target_site_id.name not in self.img_set_url:
- raise UserError('target web site incorrect')
- self._check_file_exist()
- try:
- exec(self.target_site_id.codes)
- except Exception as e:
- raise UserError(str(e))
- def btn_download_img(self):
- self._check_file_exist()
- # 检查一下 line 有没有缺少 url
- for line in self.line_ids:
- if not line.url:
- raise UserError(f'{line.name}{line.serial} Missing image address')
- # 先查看下载位置的文件夹, 有没有这个图片合集的文件夹
- download_path = os.path.join('/tmp/collection_downloads', self.target_site_id.name, self.file_title)
- if not os.path.exists(download_path):
- os.mkdir(download_path)
- # 准备一个列表存放图片路径, 然后打包下载
- img_path_list = []
- # 一行一行处理数据, 以后或者成使用多线程或者对进程或协程
- for line in self.line_ids:
- if line.set_name:
- # 这里是有分图片合集的情况, 即类似 comico
- pass
- else:
- # 这里是没有分图片合集的情况, 即类似 图片资源站
- img_file_path = os.path.join(download_path, line.name + '.' + line.image_suffix)
- if os.path.exists(img_file_path): # 这个图片存在就直接加入打包列表, 否则就下载
- img_path_list.append(img_file_path)
- line.update({'download_state': 'downloaded'})
- continue
- # 这里开始下载图片
- retry_count = 10
- while retry_count:
- try:
- img = httpx.get(line.url, headers={
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
- })
- if img.status_code == 200:
- with open(img_file_path, 'wb') as f:
- f.write(img.content)
- line.update({'download_state': 'downloaded'})
- img_path_list.append(img_file_path)
- break
- else:
- time.sleep(5)
- retry_count -= 1
- except Exception as e:
- print(str(e))
- time.sleep(5)
- retry_count -= 1
- # 打这里所有的 img 都已经下载完, 并且所有图片路径已经存入 img_path_list 中
- # 这里开始打包下载好的图片
- zip_file_path = os.path.join(download_path, self.file_title + '.zip')
- with zipfile.ZipFile(zip_file_path, 'w') as zip_file:
- for img_path in img_path_list:
- zip_file.write(img_path, os.path.basename(img_path))
- # 构建下载地址
- download_url = f"{download_path}.zip"
- print(download_url)
- # 返回一个动作字典,用于下载文件
- return {
- 'type': 'ir.actions.act_url',
- "url": download_url
- }
- def btn_clear_data(self):
- for line in self.line_ids:
- line.unlink()
- def _check_file_exist(self):
- # 如果下载位置的文件夹不存在, 则创建一个先
- download_path = '/tmp/collection_downloads'
- if not os.path.exists(download_path):
- os.mkdir(download_path)
- # 然后查看对应网站的储存文件夹在不在,不在就创建一个
- target_site_name = os.path.join(download_path, self.target_site_id.name)
- if not os.path.exists(target_site_name):
- os.mkdir(target_site_name)
- class CollectionImgLine(models.Model):
- _name = 'collection.img.line'
- _description = 'Collection Img Line'
- collection_img_id = fields.Many2one('collection.img', string='Collection Img', ondelete='cascade')
- name = fields.Char(string='Name')
- set_name = fields.Char(string='Set Name')
- serial = fields.Integer(string='Serial')
- url = fields.Char(string='URL')
- download_state = fields.Selection([('not_download', 'not download'), ('downloaded', 'downloaded')],
- string='Download State', default='not_download')
- image_suffix = fields.Char(string='Image Suffix')
- class CollectionImgTargetSite(models.Model):
- _name = 'collection.img.target.site'
- _description = 'Collection Img Target Site'
- name = fields.Char(string='Name')
- target_url = fields.Char(string='Target URL')
- codes = fields.Text(string='Codes')
|