# -*- coding: utf-8 -*-
# Collect every image URL from the target site's image packs and store them in the database
import re
import socket
import sys
import os
import time
import random

import psycopg2
import httpx
from playwright.sync_api import sync_playwright

project_root = os.path.join(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection')
sys.path.append(project_root)


class ImageCollectorStep1:

    def __init__(self):
        self.target = 'flaticon'  # top-level target folder / psql table name
        self.category = ''  # sub-category folder
        self.local_proxy = 0
        self.thread_count = 8
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        self.psql_params = self.get_psql_params()

    def get_img_set_urls(self, target_urls):
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'
            urls = []
            title = ''
            img_count = 0
            try:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                                  '(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
                img_sequence_num = 1
                for page_count in range(1, 999):
                    goto_url = target_url + pages.format(page_count)
                    if self.local_proxy:
                        proxies = {
                            "http://": "http://127.0.0.1:7890",
                            "https://": "http://127.0.0.1:7890",
                        }
                        with httpx.Client(proxies=proxies, headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    else:
                        with httpx.Client(headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    resp.encoding = 'utf-8'
                    page = resp.text
                    if page_count == 1:
                        # On the first page, extract the pack title from the <h1> tag
                        title = re.findall(r'<h1[^>]*>([\S\s]*?)</h1>', page)
                        if title:
                            title = title[0]
                            # '\xa0' is a non-breaking space; 'span' and 'class=title-style'
                            # are the leftovers of the nested <span> once '<', '>', '/', '"' are gone
                            invalid_chars = ['\n', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.',
                                             '\xa0', 'Icon Pack ', 'span', 'class=title-style']
                            for char in invalid_chars:
                                title = title.replace(char, '')
                            title = title.strip()
                        else:
                            print('Failed to extract the title')
                            continue
                        # Extract the total image count ("N icons" in the header paragraph)
                        img_count = re.findall(r'(\d[\d,]*)\s*icons', page)
                        if img_count:
                            img_count = int(img_count[0].replace(',', ''))
                    else:
                        # Past the first page: the 404 page means there is no next page, so stop
                        errorpage = re.findall('Oopsies... Seems like you got lost! - Flaticon', page)
                        if errorpage:
                            break
                    re_urls = re.findall('"contentUrl":"(.*?)"', page)
                    for url in re_urls:
                        src = url.replace('/128/', '/512/')  # swap the 128px thumbnail for the 512px version
                        suffix = src.split('.')[-1]
                        sequence = str(img_sequence_num).zfill(3)
                        urls.append({
                            'url': src,
                            'file_title': title,
                            'serial': sequence,
                            'category': self.category,
                            'img': f'{title}_{sequence}',
                            'suffix': suffix
                        })
                        img_sequence_num += 1
                    time.sleep(random.uniform(1, 2))
            except Exception as e:
                print(f'Failed to fetch {target_url}: {str(e)}')
                self.save_error_urls(target_url)
                continue
            # Persist the collected data to the database
            self.save_data({title: urls})
            print(f'{title}: {len(urls)} images saved')

    def open_browser(self, target_urls):
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'
            urls = []
            title = ''
            total_page_count = 0
            img_count = 0
            with sync_playwright() as playwright:
                if self.local_proxy:
                    browser = playwright.chromium.launch(
                        headless=True,
                        proxy={"server": "http://127.0.0.1:7890"}
                    )
                else:
                    browser = playwright.chromium.launch(headless=True)
                context = browser.new_context(viewport={'width': 1280, 'height': 700})
                page = context.new_page()
                img_sequence_num = 1
                for page_count in range(1, 999):
                    try:
                        goto_url = target_url + pages.format(page_count)
                        page.goto(goto_url, timeout=8000)
                    except Exception:
                        pass
                    if page_count == 1:
                        page.wait_for_selector(self.title_selector, state="attached", timeout=10000)
                        title = page.query_selector(self.title_selector).inner_text()
                        img_count = page.query_selector(self.img_count_selector).inner_text()
                        img_count = int(img_count.split(' ')[0].replace(',', ''))
                        invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '.',
                                         '\xa0', 'Icon Pack ']  # '\xa0' is a non-breaking space
                        for char in invalid_chars:
                            title = title.replace(char, '')
                    else:
                        try:
                            # The 404 selector appearing means we paged past the last page
                            page.wait_for_selector(self.not_find_page_selector, state="attached", timeout=2000)
                            total_page_count = page_count - 1
                            break
                        except Exception:
                            pass
                    for i in range(1, img_count + 1):
                        elements = page.query_selector_all(self.img_selector.format(i))
                        for element in elements:
                            src = element.get_attribute('src')
                            if src:
                                src = src.replace('/128/', '/512/')
                                suffix = src.split('.')[-1]
                                sequence = str(img_sequence_num).zfill(3)
                                urls.append({
                                    'url': src,
                                    'file_title': title,
                                    'serial': sequence,
                                    'category': self.category,
                                    'img': f'{title}_{sequence}',
                                    'suffix': suffix
                                })
                                img_sequence_num += 1
                            break  # only take the first element matched by each selector
                print(f'All image URLs collected. {total_page_count} pages, {len(urls)} images; writing to database...')
                page.close()
                browser.close()
            self.save_data({title: urls})
            print(f'{title} saved')

    def save_data(self, data_item):
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        for k, v in data_item.items():
            for data in v:
                # Skip URLs that are already in the table
                cursor.execute(f"SELECT img_url FROM {self.target} WHERE img_url = %s", (data['url'],))
                if cursor.fetchone() is None:
                    cursor.execute(f"""
                        INSERT INTO {self.target}
                        (name, target_site, file_title, set_name, serial, download_state,
                         image_suffix, img_url, category)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        None,
                        self.target,
                        data['file_title'],
                        None,
                        data['serial'],
                        False,
                        data['suffix'],
                        data['url'],
                        data['category'],
                    ))
        conn.commit()
        cursor.close()
        conn.close()

    def check_psql(self):
        try:
            conn = psycopg2.connect(**self.psql_params)
        except Exception as e:
            print(f'Could not connect to the database: {e}')
            exit(1)
        cur = conn.cursor()
        cur.execute(
            "SELECT EXISTS(SELECT FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = %s)",
            (self.target,))
        exist = cur.fetchone()[0]
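        # 'exist' is True when a table named after self.target already exists in the
        # public schema; otherwise it is created below. download_state is inserted as
        # FALSE, presumably so a later download step can flag fetched rows (an
        # assumption; that step is not part of this file).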
        if not exist:
            cur.execute(f"""
                CREATE TABLE {self.target} (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(255),
                    target_site VARCHAR(255),
                    file_title VARCHAR(255),
                    set_name VARCHAR(255),
                    serial INT,
                    download_state BOOLEAN,
                    image_suffix VARCHAR(50),
                    img_url TEXT,
                    category VARCHAR(255)
                );
            """)
            print(f"Table '{self.target}' created.")
        conn.commit()
        cur.close()
        conn.close()

    def save_error_urls(self, error_url):
        error_txt_path = os.path.join(str(project_root), str(self.target), 'url_file_2error_url.txt')
        os.makedirs(os.path.dirname(error_txt_path), exist_ok=True)  # make sure the folder exists
        if not os.path.exists(error_txt_path):
            open(error_txt_path, 'w').close()
        with open(error_txt_path, 'r') as f:
            existing_urls = f.read().splitlines()
        if error_url in existing_urls:
            return
        with open(error_txt_path, 'a') as f:
            f.write(error_url + '\n')

    def check_target_url_txt(self):
        txt_file_name = 'url_file_1_target_link.txt'
        if not os.path.exists(txt_file_name):
            with open(txt_file_name, 'w') as file:
                file.write('')
            print('Fill in the target links in url_file_1_target_link.txt')
            exit(0)
        with open(txt_file_name, 'r') as f:
            # Strip whitespace and drop blank lines
            targets = [target.strip() for target in f.readlines() if target.strip()]
        if not targets:
            print('No target links found in url_file_1_target_link.txt')
            exit(0)
        return targets

    def get_psql_params(self):
        # Determine the local IP by "connecting" a UDP socket (no packet is actually sent)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
        s.close()
        if '192.168.100' not in IP:
            # Outside the home LAN: reach the database through the public host
            return {
                "host": "home.erhe.link",
                "port": 55434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }
        else:
            # Inside the home LAN: connect directly
            return {
                "host": "192.168.100.146",
                "port": 5434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }


if __name__ == '__main__':
    collector = ImageCollectorStep1()
    collector.check_psql()
    targets = collector.check_target_url_txt()
    # Pick exactly one of the two collectors below
    # collector.open_browser(targets)  # browser-based (Playwright)
    collector.get_img_set_urls(targets)  # httpx-based
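
# Usage sketch (the values below are hypothetical, not from this project):
# url_file_1_target_link.txt holds one pack URL per line, e.g.
#
#   https://www.flaticon.com/packs/example-pack   <- hypothetical pack URL
#
# Pages are fetched as <pack-url>/1, <pack-url>/2, ... until the site's 404 page
# appears. Running the script populates the 'flaticon' table with one row per image:
#
#   python step1_collect_image_urls.py            <- script name is an assumption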