# -*- coding: utf-8 -*-
# Two steps: 1) pull the target image sets' info (title, every img URL) into the
# database; 2) read every not-yet-downloaded img URL for the target site from the
# database and download it locally.
# Requires PostgreSQL and `CREATE DATABASE collect;`; the table is created
# automatically on first run.
import re
import socket
import sys
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor

import psycopg2

sys.path.append(os.path.join(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection'))

import httpx
from playwright.sync_api import sync_playwright
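# One-time database setup sketch (the exact commands are assumptions based on
# the credentials in get_psql_params(); adjust to your installation):
#   $ sudo -u postgres psql
#   postgres=# CREATE USER psql WITH PASSWORD 'psql';
#   postgres=# CREATE DATABASE collect OWNER psql;
# The table itself is created by check_psql() on first run.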

class ImageCollector:
    def __init__(self):
        self.target = 'flaticon'  # top-level download folder / psql table name
        self.category = ''  # optional sub-category folder
        self.step = 2  # 1 = collect img URLs, 2 = download images, 3 = 1 + 2, 4 = debug
        self.local_proxy = 0
        self.thread_count = 8
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        self.project_root = os.path.join(
            os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection')
        self.psql_params = self.get_psql_params()

    def get_img_set_urls(self, target_urls):
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching data for {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'
            urls = []
            title = ''
            img_count = 0
            try:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
                img_sequence_num = 1
                for page_count in range(1, 999):
                    goto_url = target_url + pages.format(page_count)
                    if self.local_proxy:
                        # the 'proxies' dict form targets httpx < 0.28; newer versions use 'proxy='
                        proxies = {
                            "http://": "http://127.0.0.1:7890",
                            "https://": "http://127.0.0.1:7890",
                        }
                        with httpx.Client(proxies=proxies, headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    else:
                        with httpx.Client(headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    resp.encoding = 'utf-8'
                    page = resp.text
                    if page_count == 1:
                        # On the first page, grab the pack title.
                        # NOTE: the HTML tags inside this pattern were lost when the file
                        # was mangled; reconstructed as an <h1> capture, which the cleanup
                        # list below (span / class=title-style fragments) implies.
                        title = re.findall(r'<h1[^>]*>([\S\s]*?)</h1>', page)
                        if title:
                            title = title[0]
                            # Multi-character fragments must be stripped before single
                            # characters; removing spaces first would break the
                            # 'Icon Pack ' match.
                            invalid_chars = ['Icon Pack ', 'class=title-style', 'span',
                                             '\n', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ']
                            for char in invalid_chars:
                                title = title.replace(char, '')
                            # Probably '&nbsp;'/a non-breaking space originally; the literal
                            # character was lost in mangling.
                            title = title.replace('\xa0', ' ')
                        else:
                            print('Failed to extract title')
                            continue
                        # Find the total image count.
                        # NOTE: the tag markup of this pattern was also lost; rebuilt as a
                        # "<N> icons" capture inside a <p> (cf. self.img_count_selector).
                        img_count = re.findall(r'<p[^>]*>\s*([\d,]+)\s*icons', page)
                        if img_count:
                            img_count = int(img_count[0].replace(',', ''))
                    else:
                        # Past the first page: break once the 404 page shows up.
                        errorpage = re.findall('Oopsies... Seems like you got lost! - Flaticon', page)
                        if errorpage:
                            break
                    re_urls = re.findall('"contentUrl":"(.*?)"', page)
                    for url in re_urls:
                        src = url.replace('/128/', '/512/')  # swap the 128px thumbnail for the 512px asset
                        suffix = src.split('.')[-1]
                        sequence = str(img_sequence_num).zfill(3)
                        urls.append({
                            'url': src,
                            'file_title': title,
                            'serial': sequence,
                            'category': self.category,
                            'img': f'{title}_{sequence}',
                            'suffix': suffix
                        })
                        img_sequence_num += 1
                    time.sleep(random.uniform(1, 2))
            except Exception as e:
                print(f'Failed to fetch {target_url}: {str(e)}')
                self.save_error_urls(target_url)
                continue
            # Persist what we collected.
            self.save_data({title: urls})
            print(f'{title} has {len(urls)} images, saved')

    def open_browser(self, target_urls):
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching data for {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'
            urls = []
            title = ''
            total_page_count = 0
            img_count = 0
            with sync_playwright() as playwright:
                if self.local_proxy:
                    browser = playwright.chromium.launch(
                        headless=True,
                        proxy={"server": "http://127.0.0.1:7890"}
                    )
                else:
                    browser = playwright.chromium.launch(headless=True)
                context = browser.new_context(viewport={'width': 1280, 'height': 700})
                page = context.new_page()
                img_sequence_num = 1
                for page_count in range(1, 999):
                    try:
                        goto_url = target_url + pages.format(page_count)
                        page.goto(goto_url, timeout=8000)
                    except Exception:
                        pass  # navigation timeouts are tolerated; the selector waits below decide what happens
                    if page_count == 1:
                        page.wait_for_selector(self.title_selector, state="attached", timeout=10000)
                        title = page.query_selector(self.title_selector).inner_text()
                        img_count = page.query_selector(self.img_count_selector).inner_text()
                        img_count = int(img_count.split(' ')[0].replace(',', ''))  # guard against thousands separators
                        # Strip 'Icon Pack ' before single characters so the space
                        # removal does not break the multi-word match.
                        invalid_chars = ['Icon Pack ', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ']
                        for char in invalid_chars:
                            title = title.replace(char, '')
                    else:
                        try:
                            page.wait_for_selector(self.not_find_page_selector, state="attached", timeout=2000)
                            total_page_count = page_count - 1
                            break  # 404 page reached: no more pages
                        except Exception:
                            pass
                    for i in range(1, img_count + 1):
                        elements = page.query_selector_all(self.img_selector.format(i))
                        for element in elements:
                            src = element.get_attribute('src')
                            if src:
                                src = src.replace('/128/', '/512/')
                                suffix = src.split('.')[-1]
                                sequence = str(img_sequence_num).zfill(3)
                                urls.append({
                                    'url': src,
                                    'file_title': title,
                                    'serial': sequence,
                                    'category': self.category,
                                    'img': f'{title}_{sequence}',
                                    'suffix': suffix
                                })
                                img_sequence_num += 1
                            break  # only the first element per grid slot is needed
                print(f'All image URLs collected. {total_page_count} pages, {len(urls)} images total, writing to database...')
                page.close()
                browser.close()
            self.save_data({title: urls})
            print(f'{title} saved')

    def download_img(self, load_data, target_file_path):
        print('Downloading images')
        with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
            # list() forces the iterator so worker exceptions are not silently dropped
            list(executor.map(self.single_img_download,
                              [(index, data, load_data, target_file_path)
                               for index, data in enumerate(load_data)]))
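    # Each tuple handed to a worker is (index, data, load_data, target_file_path),
    # where `data` is one row dict as produced by load_data() below.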
    def single_img_download(self, args):
        index, data, load_data, target_file_path = args
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        row_id = data['id']
        file_title = data['file_title'].replace(' ', '_')
        serial = str(data['serial']).zfill(3)
        image_suffix = data['image_suffix']
        img_url = data['img_url']
        if self.category:
            title_file_path = os.path.join(target_file_path, self.category, file_title)
        else:
            title_file_path = os.path.join(target_file_path, file_title)
        # makedirs(exist_ok=True) is safe when several worker threads race to
        # create the same folder, unlike an exists()-then-mkdir pair.
        os.makedirs(title_file_path, exist_ok=True)
        img_name = f'{file_title}_{serial}.{image_suffix}'
        img_file_path = os.path.join(title_file_path, img_name)
        if os.path.exists(img_file_path):
            cursor.execute(f"UPDATE {self.target} SET download_state = %s WHERE id = %s", (True, row_id))
            conn.commit()
            conn.close()  # close before the early return, not only at the bottom
            print(f'Image {img_file_path} already exists. Skipping.')
            return
        retry = 8
        while retry:
            try:
                resp = httpx.get(img_url, headers={
                    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
                })
                resp.raise_for_status()  # do not write an error page to disk as if it were an image
                with open(img_file_path, 'wb') as f:
                    f.write(resp.content)
                cursor.execute(f"UPDATE {self.target} SET download_state = %s WHERE id = %s", (True, row_id))
                conn.commit()
                rate = index / len(load_data) * 100
                print(f'Downloaded {img_name}, item {index + 1} of {len(load_data)}, {rate:.2f}% done')
                time.sleep(random.uniform(1, 2))
                break
            except Exception as e:
                print(f'Failed to download {img_name}. Error: {e}, retries left: {retry}')
                retry -= 1
                time.sleep(random.uniform(3, 5))
        conn.close()

    def save_data(self, data_item):
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        for _, items in data_item.items():
            for data in items:
                # Skip URLs that are already in the table.
                cursor.execute(f"SELECT img_url FROM {self.target} WHERE img_url = %s", (data['url'],))
                if cursor.fetchone() is None:
                    cursor.execute(f"""
                        INSERT INTO {self.target}
                        (name, target_site, file_title, set_name, serial, download_state, image_suffix, img_url, category)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        None,
                        self.target,
                        data['file_title'],
                        None,
                        data['serial'],
                        False,
                        data['suffix'],
                        data['url'],
                        data['category'] or None,  # was hardcoded None, silently dropping the collected category
                    ))
        conn.commit()
        cursor.close()
        conn.close()

    def load_data(self):
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        query = f"SELECT * FROM {self.target} WHERE download_state = %s ORDER BY id ASC"
        load_data_list = []
        try:
            cursor.execute(query, (False,))
            rows = cursor.fetchall()
            for row in rows:
                load_data_list.append(
                    {
                        'id': row[0],
                        'name': row[1],
                        'target_site': row[2],
                        'file_title': row[3],
                        'set_name': row[4],
                        'serial': row[5],
                        'download_state': row[6],
                        'image_suffix': row[7],
                        'img_url': row[8],
                        'category': row[9]
                    }
                )
        except psycopg2.Error as e:
            print(f"Database error: {e}")
        finally:
            cursor.close()
            conn.close()
        if load_data_list:
            return load_data_list
        else:
            print("No data left to download.")
            exit(0)

    def check_psql(self):
        try:
            conn = psycopg2.connect(**self.psql_params)
        except Exception as e:
            print(f"Could not connect to the database: {e}")
            exit(1)
        cur = conn.cursor()
        cur.execute(
            "SELECT EXISTS(SELECT FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = %s)",
            (self.target,))
        exist = cur.fetchone()[0]
        if not exist:
            cur.execute(f"""
                CREATE TABLE {self.target} (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(255),
                    target_site VARCHAR(255),
                    file_title VARCHAR(255),
                    set_name VARCHAR(255),
                    serial INT,
                    download_state BOOLEAN,
                    image_suffix VARCHAR(50),
                    img_url TEXT,
                    category VARCHAR(255)
                );
            """)
            print(f"Table '{self.target}' created.")
        conn.commit()
        cur.close()
        conn.close()

    def check_local_downloads_dir(self):
        target_file_path = os.path.join(str(self.project_root), 'downloads', self.target)
        os.makedirs(target_file_path, exist_ok=True)  # creates 'downloads' and the target folder in one call
        return target_file_path
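    # Expected target_link.txt layout: one pack URL per line; the page-number
    # suffix ('/2', '/3', ...) is appended automatically. The URLs below are
    # illustrative, not real packs:
    #   https://www.flaticon.com/packs/some-pack
    #   https://www.flaticon.com/packs/another-pack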
    def check_target_url_txt(self):
        txt_file_name = 'target_link.txt'
        if not os.path.exists(txt_file_name):
            with open(txt_file_name, 'w') as file:
                file.write('')
            print('Fill in the target links in target_link.txt')
            exit(0)
        with open(txt_file_name, 'r') as f:
            targets = [line.strip() for line in f if line.strip()]  # skip blank lines
        if not targets:
            print('No target links found in target_link.txt')
            exit(0)
        return targets

    def get_psql_params(self):
        # Detect which network we are on via a throwaway UDP socket;
        # connect() on UDP sends no packet, it only picks a local address.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
        s.close()
        if '192.168.100' not in IP:
            return {
                "host": "home.erhe.link",
                "port": 55434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }
        else:
            return {
                "host": "192.168.100.146",
                "port": 5434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }

    def save_error_urls(self, error_url):
        error_dir = os.path.join(str(self.project_root), str(self.target))
        os.makedirs(error_dir, exist_ok=True)  # the target folder may not exist yet
        error_txt_path = os.path.join(error_dir, 'error_url.txt')
        if not os.path.exists(error_txt_path):
            open(error_txt_path, 'w').close()
        with open(error_txt_path, 'r') as f:
            existing_urls = f.read().splitlines()
        if error_url in existing_urls:
            return
        with open(error_txt_path, 'a') as f:
            f.write(error_url + '\n')


if __name__ == '__main__':
    collector = ImageCollector()
    collector.check_psql()
    if collector.step == 1:
        targets = collector.check_target_url_txt()
        # Pick one of the two collectors:
        # collector.open_browser(targets)    # browser-driven (Playwright)
        collector.get_img_set_urls(targets)  # plain HTTP (httpx)
    elif collector.step == 2:
        load_data = collector.load_data()
        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)
        print('Download finished, exiting')
    elif collector.step == 3:
        targets = collector.check_target_url_txt()
        # Pick one of the two collectors:
        # collector.open_browser(targets)    # browser-driven (Playwright)
        collector.get_img_set_urls(targets)  # plain HTTP (httpx)
        load_data = collector.load_data()
        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)
        print('Download finished, exiting')
    elif collector.step == 4:
        pass  # debug placeholder
    else:
        pass
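# Typical runs (the filename is an assumption):
#   $ python image_collector.py
# Set self.step in __init__ beforehand: 1 collects URLs into psql,
# 2 downloads the pending rows, 3 does both in sequence.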