| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
# -*- coding: utf-8 -*-
# Collect every image of an image set from the target site and store them in the database.
import re
import socket
import sys
import os
import time
import random
import psycopg2
import httpx
from playwright.sync_api import sync_playwright

# Make the project root importable and remember it for building local file paths.
# NOTE(review): assumes this file lives somewhere under a 'ResourceCollection' directory.
sys.path.append(os.path.join(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection'))
project_root = os.path.join(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection')
class ImageCollectorStep1:
    """Step 1 collector: scrape icon-pack image URLs and persist them to PostgreSQL."""

    def __init__(self, ):
        self.target = 'flaticon'  # overall target folder name / PostgreSQL table name
        self.category = ''  # sub-category folder (stored with each record)
        self.local_proxy = 0  # truthy: route traffic through a local proxy at 127.0.0.1:7890
        self.thread_count = 8  # NOTE(review): not used anywhere in this class as shown
        # CSS selectors used by the Playwright-based collector (open_browser)
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        # Connection parameters resolved once, based on the current network
        self.psql_params = self.get_psql_params()
- def get_img_set_urls(self, target_urls):
- link_count = 1
- for target_url in target_urls:
- print(f'\n开始获取 {target_url} 数据, 当前链接是第 {link_count} 个, 共 {len(target_urls)} 个链接')
- link_count += 1
- pages = '/{}'
- urls = []
- title = ''
- total_page_count = 0
- img_count = 0
- try:
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
- img_sequence_num = 1
- for page_count in range(1, 999):
- goto_url = target_url + pages.format(page_count)
- if self.local_proxy:
- proxies = {
- "http://": "http://127.0.0.1:7890",
- "https://": "http://127.0.0.1:7890",
- }
- with httpx.Client(proxies=proxies, headers=headers) as client:
- resp = client.get(goto_url, timeout=10)
- else:
- with httpx.Client(headers=headers) as client:
- resp = client.get(goto_url, timeout=10)
- resp.encoding = 'utf-8'
- page = resp.text
- if page_count == 1:
- # 在第一页, 获取 title
- title = re.findall(r'<span class="title">([\S\s]*?)</h1>', page)
- if title:
- title = title[0]
- invalid_chars = ['\n', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ',
- 'Icon Pack ',
- 'span', 'class=title-style']
- for char in invalid_chars:
- title = title.replace(char, '')
- title = title.replace(' ', ' ')
- title = title.strip()
- else:
- print('title 获取失败')
- continue
- # 获取总共多少图片
- img_count = re.findall(
- '<p class="pack-view__header--icons new--badge">(.*?) <span class="uppercase">icons</span></p>',
- page)
- if img_count:
- img_count = int(img_count[0])
- else:
- # 不是第一页, 获取到没有下一页的情况, 就跳出
- errorpage = re.findall('<title>Oopsies... Seems like you got lost! - Flaticon</title>', page)
- if errorpage:
- break
- re_urls = re.findall('"contentUrl":"(.*?)"', page)
- for url in re_urls:
- src = url.replace('/128/', '/512/')
- suffix = src.split('.')[-1]
- sequence = str(img_sequence_num).zfill(3)
- urls.append({
- 'url': src,
- 'file_title': title,
- 'serial': sequence,
- 'category': self.category,
- 'img': f'{title}_{sequence}',
- 'suffix': suffix
- })
- img_sequence_num += 1
- time.sleep(random.uniform(1, 2))
- except Exception as e:
- print(f'{target_url} 链接获取失败: {str(e)}')
- self.save_error_urls(target_url)
- continue
- # 获取到数据之后, 存数据库
- self.save_data({title: urls})
- print(f'{title} 共有 {len(urls)} 张图片, 已保存')
    def open_browser(self, target_urls):
        """Collect image URLs with a headless Chromium browser (Playwright).

        Browser-driven alternative to ``get_img_set_urls``; results are
        written to PostgreSQL through ``save_data``.

        NOTE(review): indentation was reconstructed from a mangled source —
        as written, the unconditional ``break`` near the end stops pagination
        after the first processed page; confirm against the original intent.
        """
        link_count = 1
        for target_url in target_urls:
            print(f'\n开始获取 {target_url} 数据, 当前链接是第 {link_count} 个, 共 {len(target_urls)} 个链接')
            link_count += 1
            pages = '/{}'  # page-number suffix appended to the pack URL
            urls = []
            title = ''
            total_page_count = 0
            with sync_playwright() as playwright:
                if self.local_proxy:
                    browser = playwright.chromium.launch(
                        headless=True,
                        proxy={"server": "http://127.0.0.1:7890"}
                    )
                else:
                    browser = playwright.chromium.launch(headless=True)
                context = browser.new_context(viewport={'width': 1280, 'height': 700})
                page = context.new_page()
                img_sequence_num = 1
                for page_count in range(1, 999):
                    try:
                        goto_url = target_url + pages.format(page_count)
                        page.goto(goto_url, timeout=8000)
                    except Exception as e:
                        # Navigation timeouts are tolerated; the selector waits below decide.
                        pass
                    if page_count == 1:
                        # First page: read pack title and advertised icon count.
                        page.wait_for_selector(self.title_selector, state="attached", timeout=10000)
                        title = page.query_selector(self.title_selector).inner_text()
                        img_count = page.query_selector(self.img_count_selector).inner_text()
                        img_count = int(img_count.split(' ')[0])
                        # Drop characters that are invalid in file/folder names.
                        invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ', 'Icon Pack ']
                        for char in invalid_chars:
                            title = title.replace(char, '')
                    else:
                        try:
                            # The 404 selector appearing means there is no next page.
                            page.wait_for_selector(self.not_find_page_selector, state="attached", timeout=2000)
                            total_page_count = page_count - 1
                            break
                        except:
                            pass
                    # Walk the numbered <li> slots and record each thumbnail's URL.
                    for i in range(1, img_count + 1):
                        elements = page.query_selector_all(self.img_selector.format(i))
                        for element in elements:
                            src = element.get_attribute('src')
                            if src:
                                src = src.replace('/128/', '/512/')  # 128px thumb -> 512px asset
                                suffix = src.split('.')[-1]
                                sequence = str(img_sequence_num).zfill(3)
                                urls.append({
                                    'url': src,
                                    'file_title': title,
                                    'serial': sequence,
                                    'category': self.category,
                                    'img': f'{title}_{sequence}',
                                    'suffix': suffix
                                })
                                img_sequence_num += 1
                    # NOTE(review): unconditional — exits the pagination loop after one
                    # pass; range(1, 999) above is then effectively unused. Confirm.
                    break
                print(f'所有图片URL已获取。总页数: 共 {total_page_count} 页, 总共图片 {len(urls)}, 正在写入数据库...')
                page.close()
                browser.close()
            self.save_data({title: urls})
            print(f'{title} 已保存')
- def save_data(self, data_item):
- conn = psycopg2.connect(**self.psql_params)
- cursor = conn.cursor()
- for k, v in data_item.items():
- for data in v:
- cursor.execute(f"SELECT img_url FROM {self.target} WHERE img_url = %s", (data['url'],))
- if cursor.fetchone() is None:
- cursor.execute(f"""
- INSERT INTO {self.target} (name, target_site, file_title, set_name, serial, download_state, image_suffix, img_url, category)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
- """, (
- None,
- self.target,
- data['file_title'],
- None,
- data['serial'],
- False,
- data['suffix'],
- data['url'],
- None,
- ))
- conn.commit()
- cursor.close()
- conn.close()
- def check_psql(self):
- try:
- conn = psycopg2.connect(**self.psql_params)
- except Exception as e:
- print(f"无法连接到数据库:{e}")
- exit(1)
- cur = conn.cursor()
- cur.execute("SELECT EXISTS(SELECT FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = %s)",
- (self.target,))
- exist = cur.fetchone()[0]
- if not exist:
- cur.execute(f"""
- CREATE TABLE {self.target} (
- id SERIAL PRIMARY KEY,
- name VARCHAR(255),
- target_site VARCHAR(255),
- file_title VARCHAR(255),
- set_name VARCHAR(255),
- serial INT,
- download_state BOOLEAN,
- image_suffix VARCHAR(50),
- img_url TEXT,
- category VARCHAR(255)
- );
- """)
- print(f"表 '{self.target}' 创建成功。")
- conn.commit()
- cur.close()
- conn.close()
- def save_error_urls(self, error_url):
- error_txt_path = os.path.join(str(project_root), str(self.target), 'url_file_2error_url.txt')
- if not os.path.exists(error_txt_path):
- open(error_txt_path, 'w').close()
- with open(error_txt_path, 'r') as f:
- existing_urls = f.read().splitlines()
- if error_url in existing_urls:
- return
- with open(error_txt_path, 'a') as f:
- f.write(error_url + '\n')
- def check_target_url_txt(self):
- txt_file_name = 'url_file_1_target_link.txt'
- if not os.path.exists(txt_file_name):
- with open(txt_file_name, 'w') as file:
- file.write('')
- print('需要在 url_file_1_target_link.txt 中填写目标链接')
- exit(0)
- else:
- with open('url_file_1_target_link.txt', 'r') as f:
- targets = [target.strip() for target in f.readlines()]
- if not targets:
- print('在 url_file_1_target_link.txt 中未找到目标链接')
- exit(0)
- return targets
- def get_psql_params(self):
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(('10.255.255.255', 1))
- IP = s.getsockname()[0]
- s.close()
- if '192.168.100' not in IP:
- return {
- "host": "home.erhe.link",
- "port": 55434,
- "user": "psql",
- "password": "psql",
- "dbname": "collect"
- }
- else:
- return {
- "host": "192.168.100.146",
- "port": 5434,
- "user": "psql",
- "password": "psql",
- "dbname": "collect"
- }
if __name__ == '__main__':
    collector = ImageCollectorStep1()
    collector.check_psql()
    targets = collector.check_target_url_txt()
    # Pick exactly one of the two collectors below:
    # collector.open_browser(targets)  # browser-driven (Playwright)
    collector.get_img_set_urls(targets)  # plain HTTP (httpx)
|