| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
# -*- coding: utf-8 -*-
# Step 2: read the database and download the images that have not been downloaded yet.
- import re
- import socket
- import sys
- import os
- import time
- import random
- from concurrent.futures import ThreadPoolExecutor
- import psycopg2
- import httpx
# Project root = this file's absolute path cut after the first occurrence of
# 'ResourceCollection'. It is put on sys.path so project-local modules can be
# imported, and it is also the base directory for the downloads folder.
project_root = os.path.abspath(__file__).partition('ResourceCollection')[0] + 'ResourceCollection'
sys.path.append(project_root)
class ImageCollectorStep2:
    """Download every image the database still lists as not downloaded.

    Rows are read from the PostgreSQL table named by ``target``. Each image is
    fetched over HTTP into
    ``<project_root>/downloads/<target>/[<category>/]<file_title>/`` and the
    row's ``download_state`` is set to true once the file is on disk.
    """

    def __init__(self):
        # Top-level download folder name; also the PostgreSQL table that is queried.
        self.target = 'flaticon'
        # Optional sub-folder inserted between the target folder and the pack folder.
        self.category = ''
        # Not read by any method of this class.
        self.local_proxy = 0
        # Number of worker threads used by download_img.
        self.thread_count = 8
        # CSS selectors; not read by any method of this class.
        # NOTE(review): presumably kept in sync with the scraping step - confirm.
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        self.psql_params = self.get_psql_params()

    def download_img(self, load_data, target_file_path):
        """Download all rows of ``load_data`` concurrently.

        Args:
            load_data: list of row dicts as returned by ``load_data()``.
            target_file_path: directory returned by ``check_local_downloads_dir()``.
        """
        print('正在下载图片')
        jobs = [(index, data, load_data, target_file_path) for index, data in enumerate(load_data)]
        with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
            futures = [executor.submit(self.single_img_download, job) for job in jobs]
        # The pool has been joined here. Surface exceptions that escaped a
        # worker (e.g. a failed DB connection) instead of dropping them.
        for data, future in zip(load_data, futures):
            error = future.exception()
            if error is not None:
                print(f"下载任务异常终止: id={data.get('id')} 错误: {error!r}")

    def single_img_download(self, args):
        """Download one image and mark its row as downloaded.

        Args:
            args: tuple ``(index, data, load_data, target_file_path)`` where
                ``index`` is the position of ``data`` inside ``load_data``
                (used only for progress output).

        The download is attempted up to 8 times. If every attempt fails the
        row is left untouched so a later pass can try again.
        """
        index, data, load_data, target_file_path = args
        row_id = data['id']
        img_url = data['img_url']
        img_name, img_file_path = self._resolve_img_path(data, target_file_path)

        conn = psycopg2.connect(**self.psql_params)
        try:
            if os.path.exists(img_file_path):
                # The file is already on disk: only the DB flag needs updating.
                self._mark_downloaded(conn, row_id)
                print(f'图片 {img_file_path} 已存在。继续!')
                return

            for retry in range(8, 0, -1):
                try:
                    resp = httpx.get(img_url, headers={
                        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
                    })
                    # Never store an HTTP error page as if it were the image.
                    resp.raise_for_status()
                    # Write to a temporary name and rename afterwards, so an
                    # interrupted write cannot leave a truncated file that the
                    # "already exists" check above would later accept.
                    partial_path = img_file_path + '.part'
                    with open(partial_path, 'wb') as f:
                        f.write(resp.content)
                    os.replace(partial_path, img_file_path)
                    self._mark_downloaded(conn, row_id)
                    rate = (index + 1) / len(load_data) * 100
                    print(f'已下载:{img_name}, 当前第 {index + 1} 个, 共 {len(load_data)} 个, 已下载 {rate:.2f}%')
                    time.sleep(random.uniform(1, 2))
                    break
                except Exception as e:
                    # Best effort: report, back off, then retry.
                    print(f'下载图片失败:{img_name}。错误:{e} 重试: {retry}')
                    time.sleep(random.uniform(3, 5))
        finally:
            # Runs on every exit path, including the early return above.
            conn.close()

    def _resolve_img_path(self, data, target_file_path):
        """Create the pack folder for ``data`` and return ``(img_name, img_file_path)``."""
        file_title = data['file_title'].replace(' ', '_')
        serial = str(data['serial']).zfill(3)
        img_name = f"{file_title}_{serial}.{data['image_suffix']}"
        if self.category:
            parent_path = os.path.join(target_file_path, self.category)
        else:
            parent_path = target_file_path
        title_file_path = os.path.join(parent_path, file_title)
        # exist_ok avoids the check-then-mkdir race between worker threads.
        os.makedirs(title_file_path, exist_ok=True)
        return img_name, os.path.join(title_file_path, img_name)

    def _mark_downloaded(self, conn, row_id):
        """Set ``download_state`` to true for ``row_id`` and commit."""
        # The table name comes from self.target (set in __init__), not from
        # external input; the values are passed as bound parameters.
        query = f"UPDATE {self.target} SET download_state = %s WHERE id = %s"
        with conn.cursor() as cursor:
            cursor.execute(query, (True, row_id))
        conn.commit()

    def load_data(self):
        """Return the rows whose ``download_state`` is false, ordered by id.

        Returns:
            A non-empty list of dicts built from the first ten columns of each
            row (by position), or ``None`` when there is nothing to download
            or the query failed.
        """
        columns = (
            'id', 'name', 'target_site', 'file_title', 'set_name',
            'serial', 'download_state', 'image_suffix', 'img_url', 'category',
        )
        query = f"SELECT * FROM {self.target} WHERE download_state = %s order by id asc"
        load_data_list = []
        conn = psycopg2.connect(**self.psql_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (False,))
                for row in cursor.fetchall():
                    # Index explicitly so that a row with fewer columns than
                    # expected still raises IndexError instead of being
                    # silently truncated.
                    load_data_list.append({key: row[pos] for pos, key in enumerate(columns)})
        except psycopg2.Error as e:
            print(f"Database error: {e}")
        finally:
            conn.close()

        if load_data_list:
            return load_data_list
        print("没有需要下载的数据。")
        return None

    def check_local_downloads_dir(self):
        """Ensure ``<project_root>/downloads/<target>`` exists and return its path."""
        target_file_path = os.path.join(str(project_root), 'downloads', self.target)
        os.makedirs(target_file_path, exist_ok=True)
        return target_file_path

    def get_psql_params(self):
        """Return psycopg2 connection keyword arguments for the current network.

        The LAN address of the database is used when this machine's outgoing
        IPv4 address lies in 192.168.100.x; otherwise the public host name and
        port are used.
        """
        try:
            # Connecting a UDP socket sends no packet; it only makes the OS
            # choose the outgoing interface, whose address is then read back.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('10.255.255.255', 1))
                local_ip = probe.getsockname()[0]
        except OSError:
            # No usable route: treat the machine as being outside the LAN.
            local_ip = '127.0.0.1'

        # Prefix match (not substring) so e.g. 10.192.168.100 is not
        # mistaken for the home network.
        if local_ip.startswith('192.168.100.'):
            host, port = "192.168.100.146", 5434
        else:
            host, port = "home.erhe.link", 55434
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to environment variables or a config file.
        return {
            "host": host,
            "port": port,
            "user": "psql",
            "password": "psql",
            "dbname": "collect",
        }
def main(collector=None, max_stalled_passes=3):
    """Run download passes until nothing is pending or no progress is made.

    Args:
        collector: object providing ``load_data()``,
            ``check_local_downloads_dir()`` and ``download_img(rows, path)``.
            A new ``ImageCollectorStep2`` is created when omitted.
        max_stalled_passes: give up once this many consecutive passes have
            left the set of pending row ids unchanged. Without this limit,
            rows that can never be downloaded would be retried forever.

    Returns:
        True when ``load_data()`` reported nothing left to download, False
        when the loop gave up because it stopped making progress.
    """
    if collector is None:
        collector = ImageCollectorStep2()

    previous_pending = None
    stalled_passes = 0
    while True:
        load_data = collector.load_data()
        if not load_data:
            print('下载完成, 程序退出')
            return True

        pending_ids = {row['id'] for row in load_data}
        if pending_ids == previous_pending:
            stalled_passes += 1
            if stalled_passes >= max_stalled_passes:
                print(f'连续 {stalled_passes} 轮没有任何进展, 仍有 {len(pending_ids)} 张图片未下载, 程序退出')
                return False
        else:
            stalled_passes = 0
        previous_pending = pending_ids

        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)


if __name__ == '__main__':
    main()
|