# -*- coding: utf-8 -*-
# step2: read the database and download the images that have not been downloaded yet
import re
import socket
import sys
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor

import psycopg2
import httpx

# Project root: the absolute path of this file cut at the first occurrence of
# 'ResourceCollection', with that component appended back. It is added to
# sys.path and used as the parent of the 'downloads' directory.
project_root = os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection'
sys.path.append(project_root)


class ImageCollectorStep2:
    """Download every image whose database row still has ``download_state = False``.

    Rows are read from the PostgreSQL table named by ``self.target``; each image
    is fetched over HTTP, written below ``<project_root>/downloads/<target>/``
    and its row is then flagged as downloaded.
    """

    def __init__(self):
        """Set the static configuration and resolve the database connection parameters."""
        self.target = 'flaticon'  # top-level target folder / psql table name
        self.category = ''  # optional sub-category folder; empty means "no sub-folder"
        # NOTE(review): local_proxy and the *_selector attributes are not read by
        # any method in this file; presumably shared with another step - verify.
        self.local_proxy = 0
        self.thread_count = 8  # number of worker threads used by download_img
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        self.psql_params = self.get_psql_params()

    def download_img(self, load_data, target_file_path):
        """Download all rows of ``load_data`` using a pool of worker threads.

        Args:
            load_data: list of row dicts as returned by :meth:`load_data`.
            target_file_path: directory below which the images are stored.
        """
        print('正在下载图片')
        jobs = [
            (index, data, load_data, target_file_path)
            for index, data in enumerate(load_data)
        ]
        with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
            futures = [executor.submit(self.single_img_download, job) for job in jobs]
            # An exception raised inside a worker is stored on its future and is
            # lost unless somebody asks for it, so report each failure here.
            for future in futures:
                error = future.exception()
                if error is not None:
                    print(f'下载任务异常:{error}')

    def single_img_download(self, args):
        """Download one image and mark its row as downloaded.

        Args:
            args: tuple ``(index, data, load_data, target_file_path)`` where
                ``index`` is the position of ``data`` in ``load_data``,
                ``data`` is one row dict, and ``target_file_path`` is the
                base output directory.

        The image is stored as ``<file_title>_<serial>.<image_suffix>`` inside a
        folder named after the file title (nested in ``self.category`` when that
        is set). If the file already exists only the database flag is updated.
        A failed attempt is retried up to 8 times.
        """
        index, data, load_data, target_file_path = args
        row_id = data['id']
        file_title = data['file_title'].replace(' ', '_')
        serial = str(data['serial']).zfill(3)
        image_suffix = data['image_suffix']
        img_url = data['img_url']

        # With a category the images live in <target>/<category>/<title>;
        # otherwise, create the image folder directly under <target>.
        if self.category:
            title_file_path = os.path.join(target_file_path, self.category, file_title)
        else:
            title_file_path = os.path.join(target_file_path, file_title)
        # exist_ok avoids the check-then-create race between worker threads
        # that share the same title folder.
        os.makedirs(title_file_path, exist_ok=True)

        img_name = f'{file_title}_{serial}.{image_suffix}'
        img_file_path = os.path.join(title_file_path, img_name)
        query = f"UPDATE {self.target} SET download_state = %s WHERE id = %s"

        conn = psycopg2.connect(**self.psql_params)
        try:
            cursor = conn.cursor()

            if os.path.exists(img_file_path):
                cursor.execute(query, (True, row_id))
                conn.commit()
                print(f'图片 {img_file_path} 已存在。继续!')
                return

            retry = 8
            while retry:
                try:
                    resp = httpx.get(img_url, headers={
                        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
                    })
                    # Do not store an error page as an image: a non-success
                    # status raises here and goes through the retry path.
                    resp.raise_for_status()
                    with open(img_file_path, 'wb') as f:
                        f.write(resp.content)
                    cursor.execute(query, (True, row_id))
                    conn.commit()
                    # index is zero-based, so the item just finished is index + 1.
                    rate = (index + 1) / len(load_data) * 100
                    print(f'已下载:{img_name}, 当前第 {index + 1} 个, 共 {len(load_data)} 个, 已下载 {rate:.2f}%')
                    time.sleep(random.uniform(1, 2))
                    break
                except Exception as e:
                    # Best-effort download: report, back off and try again.
                    print(f'下载图片失败:{img_name}。错误:{e} 重试: {retry}')
                    retry -= 1
                    time.sleep(random.uniform(3, 5))
        finally:
            # Always release the connection, including on the early return above.
            conn.close()

    def load_data(self):
        """Return the rows that still need downloading, ordered by id.

        Returns:
            A list of dicts (one per row, keyed by column position as listed in
            ``columns`` below), or ``None`` when the query returned no rows or
            raised a ``psycopg2.Error``.
        """
        query = f"SELECT * FROM {self.target} WHERE download_state = %s order by id asc"
        # Keys assigned to the first ten columns of each row, by position.
        # NOTE(review): this relies on the table's column order - confirm
        # against the schema.
        columns = (
            'id', 'name', 'target_site', 'file_title', 'set_name',
            'serial', 'download_state', 'image_suffix', 'img_url', 'category',
        )
        load_data_list = []

        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        try:
            cursor.execute(query, (False,))
            for row in cursor.fetchall():
                load_data_list.append({key: row[pos] for pos, key in enumerate(columns)})
        except psycopg2.Error as e:
            print(f"Database error: {e}")
        finally:
            cursor.close()
            conn.close()

        if load_data_list:
            return load_data_list
        print("没有需要下载的数据。")
        return None

    def check_local_downloads_dir(self):
        """Ensure ``<project_root>/downloads/<target>`` exists and return its path."""
        target_file_path = os.path.join(str(project_root), 'downloads', self.target)
        os.makedirs(target_file_path, exist_ok=True)
        return target_file_path

    def get_psql_params(self):
        """Choose the PostgreSQL connection parameters from the local IP address.

        Returns:
            The LAN endpoint when the local address is in ``192.168.100.x``,
            otherwise the public endpoint.
        """
        # Connecting a UDP socket sends no packet; it only makes the OS pick the
        # outgoing interface, whose address getsockname() then reports.
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(('10.255.255.255', 1))
                ip = s.getsockname()[0]
        except OSError:
            # No usable route: fall back to loopback, which selects the public endpoint.
            ip = '127.0.0.1'

        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        if not ip.startswith('192.168.100.'):
            return {
                "host": "home.erhe.link",
                "port": 55434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }
        return {
            "host": "192.168.100.146",
            "port": 5434,
            "user": "psql",
            "password": "psql",
            "dbname": "collect"
        }


if __name__ == '__main__':
    collector = ImageCollectorStep2()
    # Keep pulling pending rows until none are left.
    # NOTE(review): a row whose download fails on every attempt stays at
    # download_state = False, so this loop will keep picking it up again.
    while True:
        load_data = collector.load_data()
        if not load_data:
            break
        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)
    print('下载完成, 程序退出')