# -*- coding: utf-8 -*-
# Two steps: 1) pull the target pages' info (title, every img URL) into the database;
# 2) read every not-yet-downloaded img URL for the target site from the database and download it locally.
# Requires PostgreSQL and a one-time `CREATE DATABASE collect;`; the table itself is created automatically on first run.
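# One-time setup, a sketch only (host/port/user mirror the LAN defaults in
# get_psql_params below and are assumptions; adjust to your environment):
#   psql -h 192.168.100.146 -p 5434 -U psql -c "CREATE DATABASE collect;"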
import re
import socket
import sys
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor

import psycopg2

# Make the project root importable before pulling in the project's own modules.
sys.path.append(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection')
import httpx
from playwright.sync_api import sync_playwright


class ImageCollector:
    def __init__(self):
        self.target = 'flaticon'  # top-level target folder / psql table name
        self.category = ''  # optional sub-category folder
        self.step = 2  # 1 = fetch img URLs, 2 = download images, 3 = 1 + 2, 4 = debug
        self.local_proxy = 0
        self.thread_count = 1
        self.title_selector = '#pack-view__inner > section.pack-view__header > h1'
        self.img_selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
        self.img_count_selector = '#pack-view__inner > section.pack-view__header > p'
        self.not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
        self.project_root = os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection'
        self.psql_params = self.get_psql_params()

    def get_img_set_urls(self, target_urls):
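        """Step 1 over plain HTTP: walk an icon pack's paginated pages with httpx,
        scrape the pack title and every image URL via regex, then persist them
        through save_data()."""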
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'  # page-number suffix appended to the pack URL
            urls = []
            title = ''
            img_count = 0
            try:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}
                img_sequence_num = 1
                for page_count in range(1, 999):
                    goto_url = target_url + pages.format(page_count)
                    if self.local_proxy:
                        # The dict-style 'proxies' argument needs httpx < 0.28 (it was removed in 0.28).
                        proxies = {
                            "http://": "http://127.0.0.1:7890",
                            "https://": "http://127.0.0.1:7890",
                        }
                        with httpx.Client(proxies=proxies, headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    else:
                        with httpx.Client(headers=headers) as client:
                            resp = client.get(goto_url, timeout=10)
                    resp.encoding = 'utf-8'
                    page = resp.text
                    if page_count == 1:
                        # On the first page, extract the title.
                        title = re.findall(r'<span class="title">([\S\s]*?)</h1>', page)
                        if title:
                            title = title[0].replace('\xa0', ' ')  # normalize non-breaking spaces first
                            # Strip markup leftovers and characters that are illegal in file names;
                            # single characters go first so the remaining multi-character tokens
                            # (e.g. 'span', 'class=title-style') match what is left.
                            invalid_chars = ['\n', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.',
                                             'Icon Pack ', 'span', 'class=title-style']
                            for char in invalid_chars:
                                title = title.replace(char, '')
                        else:
                            print('Failed to extract title')
                            continue
                        # Extract the total image count.
                        img_count = re.findall(
                            r'<p class="pack-view__header--icons new--badge">(.*?) <span class="uppercase">icons</span></p>',
                            page)
                        if img_count:
                            img_count = int(img_count[0])
                    else:
                        # Past the first page: break once the "no more pages" error page appears.
                        errorpage = re.findall('<title>Oopsies... Seems like you got lost! - Flaticon</title>', page)
                        if errorpage:
                            break
                    re_urls = re.findall(r'"contentUrl":"(.*?)"', page)
                    for url in re_urls:
                        src = url.replace('/128/', '/512/')
                        suffix = src.split('.')[-1]
                        sequence = str(img_sequence_num).zfill(3)
                        urls.append({
                            'url': src,
                            'file_title': title,
                            'serial': sequence,
                            'category': self.category,
                            'img': f'{title}_{sequence}',
                            'suffix': suffix
                        })
                        img_sequence_num += 1
                    time.sleep(random.uniform(1, 2))
            except Exception as e:
                print(f'Failed to fetch {target_url}: {str(e)}')
                self.save_error_urls(target_url)
                continue
            # Persist the scraped data.
            self.save_data({title: urls})
            print(f'{title} has {len(urls)} images, saved')

    def open_browser(self, target_urls):
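        """Step 1 via a real browser: the same scrape as get_img_set_urls(), but
        driven through headless Chromium with Playwright for pages that need JS."""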
        link_count = 1
        for target_url in target_urls:
            print(f'\nFetching {target_url}, link {link_count} of {len(target_urls)}')
            link_count += 1
            pages = '/{}'  # page-number suffix appended to the pack URL
            urls = []
            title = ''
            total_page_count = 0
            img_count = 0
            with sync_playwright() as playwright:
                if self.local_proxy:
                    browser = playwright.chromium.launch(
                        headless=True,
                        proxy={"server": "http://127.0.0.1:7890"}
                    )
                else:
                    browser = playwright.chromium.launch(headless=True)
                context = browser.new_context(viewport={'width': 1280, 'height': 700})
                page = context.new_page()
                img_sequence_num = 1
                for page_count in range(1, 999):
                    try:
                        goto_url = target_url + pages.format(page_count)
                        page.goto(goto_url, timeout=8000)
                    except Exception:
                        pass
                    if page_count == 1:
                        page.wait_for_selector(self.title_selector, state="attached", timeout=10000)
                        title = page.query_selector(self.title_selector).inner_text()
                        img_count = page.query_selector(self.img_count_selector).inner_text()
                        img_count = int(img_count.split(' ')[0])
                        # Strip the marketing suffix first, then characters illegal in file names.
                        invalid_chars = ['Icon Pack ', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.']
                        for char in invalid_chars:
                            title = title.replace(char, '')
                    else:
                        try:
                            # The 404 selector attaching means we paged past the last page.
                            page.wait_for_selector(self.not_find_page_selector, state="attached", timeout=2000)
                            total_page_count = page_count - 1
                            break
                        except Exception:
                            pass
                    for i in range(1, img_count + 1):
                        elements = page.query_selector_all(self.img_selector.format(i))
                        for element in elements:
                            src = element.get_attribute('src')
                            if src:
                                src = src.replace('/128/', '/512/')
                                suffix = src.split('.')[-1]
                                sequence = str(img_sequence_num).zfill(3)
                                urls.append({
                                    'url': src,
                                    'file_title': title,
                                    'serial': sequence,
                                    'category': self.category,
                                    'img': f'{title}_{sequence}',
                                    'suffix': suffix
                                })
                                img_sequence_num += 1
                                break  # keep only the first matching element per grid slot
                print(f'All image URLs collected. Pages: {total_page_count}, images: {len(urls)}; writing to database...')
                page.close()
                browser.close()
            self.save_data({title: urls})
            print(f'{title} saved')

    def download_img(self, load_data, target_file_path):
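        """Step 2: fan the pending rows out across a small thread pool; each
        worker downloads one image and flips its download_state flag."""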
        print('Downloading images')
        with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
            executor.map(self.single_img_download,
                         [(index, data, load_data, target_file_path) for index, data in enumerate(load_data)])

    def single_img_download(self, args):
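        """Download a single image into downloads/<target>[/<category>]/<file_title>/
        and mark its row as downloaded. Each worker opens its own connection and
        cursor, since psycopg2 cursors must not be shared between threads."""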
        index, data, load_data, target_file_path = args
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        row_id = data['id']
        file_title = data['file_title'].replace(' ', '_')
        serial = str(data['serial']).zfill(3)
        image_suffix = data['image_suffix']
        img_url = data['img_url']
        if self.category:
            # Nest the title folder inside the category folder.
            category_path = os.path.join(target_file_path, self.category)
            os.makedirs(category_path, exist_ok=True)
            title_file_path = os.path.join(category_path, file_title)
        else:
            title_file_path = os.path.join(target_file_path, file_title)
        # exist_ok guards against two threads creating the same folder at once.
        os.makedirs(title_file_path, exist_ok=True)
        img_name = f'{file_title}_{serial}.{image_suffix}'
        img_file_path = os.path.join(title_file_path, img_name)
        if os.path.exists(img_file_path):
            query = f"UPDATE {self.target} SET download_state = %s WHERE id = %s"
            cursor.execute(query, (True, row_id))
            conn.commit()
            print(f'Image {img_file_path} already exists, skipping')
            conn.close()
            return
        retry = 8
        while retry:
            try:
                resp = httpx.get(img_url, headers={
                    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
                })
                resp.raise_for_status()  # never save an error page as an image
                with open(img_file_path, 'wb') as f:
                    f.write(resp.content)
                query = f"UPDATE {self.target} SET download_state = %s WHERE id = %s"
                cursor.execute(query, (True, row_id))
                conn.commit()
                rate = index / len(load_data) * 100
                print(f'Downloaded {img_name}, item {index + 1} of {len(load_data)}, {rate:.2f}% done')
                time.sleep(random.uniform(1, 2))
                break
            except Exception as e:
                print(f'Failed to download {img_name}. Error: {e}. Retries left: {retry}')
                retry -= 1
                time.sleep(random.uniform(3, 5))
        conn.close()

    def save_data(self, data_item):
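        """Insert the scraped records, skipping any img_url that is already
        stored so that reruns of step 1 stay idempotent."""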
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        for url_records in data_item.values():
            for data in url_records:
                # Deduplicate on img_url before inserting.
                cursor.execute(f"SELECT img_url FROM {self.target} WHERE img_url = %s", (data['url'],))
                if cursor.fetchone() is None:
                    cursor.execute(f"""
                        INSERT INTO {self.target} (name, target_site, file_title, set_name, serial, download_state, image_suffix, img_url, category)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        None,
                        self.target,
                        data['file_title'],
                        None,
                        data['serial'],
                        False,
                        data['suffix'],
                        data['url'],
                        data['category'],
                    ))
        conn.commit()
        cursor.close()
        conn.close()

    def load_data(self):
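        """Return all rows whose download_state is still false, oldest id first;
        exit cleanly when nothing is pending."""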
        conn = psycopg2.connect(**self.psql_params)
        cursor = conn.cursor()
        query = f"SELECT * FROM {self.target} WHERE download_state = %s ORDER BY id ASC"
        load_data_list = []
        try:
            cursor.execute(query, (False,))
            rows = cursor.fetchall()
            for row in rows:
                load_data_list.append(
                    {
                        'id': row[0],
                        'name': row[1],
                        'target_site': row[2],
                        'file_title': row[3],
                        'set_name': row[4],
                        'serial': row[5],
                        'download_state': row[6],
                        'image_suffix': row[7],
                        'img_url': row[8],
                        'category': row[9]
                    }
                )
        except psycopg2.Error as e:
            print(f"Database error: {e}")
        finally:
            cursor.close()
            conn.close()
        if load_data_list:
            return load_data_list
        else:
            print('No data left to download.')
            sys.exit(0)

    def check_psql(self):
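        """Verify database connectivity and create the target table on first run."""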
        try:
            conn = psycopg2.connect(**self.psql_params)
        except Exception as e:
            print(f'Cannot connect to the database: {e}')
            sys.exit(1)
        cur = conn.cursor()
        cur.execute("SELECT EXISTS(SELECT FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = %s)",
                    (self.target,))
        exist = cur.fetchone()[0]
        if not exist:
            # self.target comes from this script's own config, so the f-string
            # table name is not exposed to external input.
            cur.execute(f"""
                CREATE TABLE {self.target} (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(255),
                    target_site VARCHAR(255),
                    file_title VARCHAR(255),
                    set_name VARCHAR(255),
                    serial INT,
                    download_state BOOLEAN,
                    image_suffix VARCHAR(50),
                    img_url TEXT,
                    category VARCHAR(255)
                );
            """)
            print(f"Table '{self.target}' created.")
        conn.commit()
        cur.close()
        conn.close()

    def check_local_downloads_dir(self):
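        """Ensure downloads/<target>/ exists under the project root and return its path."""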
        download_file_path = os.path.join(str(self.project_root), 'downloads')
        os.makedirs(download_file_path, exist_ok=True)
        target_file_path = os.path.join(download_file_path, self.target)
        os.makedirs(target_file_path, exist_ok=True)
        return target_file_path

    def check_target_url_txt(self):
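        """Read target links from target_link.txt, creating the file and prompting
        the user to fill it in when it is missing or empty."""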
        txt_file_name = 'target_link.txt'
        if not os.path.exists(txt_file_name):
            with open(txt_file_name, 'w') as file:
                file.write('')
            print('Please add target links to target_link.txt')
            sys.exit(0)
        with open(txt_file_name, 'r') as f:
            # Skip blank lines so they never become empty targets.
            targets = [target.strip() for target in f.readlines() if target.strip()]
        if not targets:
            print('No target links found in target_link.txt')
            sys.exit(0)
        return targets

    def get_psql_params(self):
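        """Choose database parameters by LAN detection: connecting a UDP socket
        sends no packets, but reveals which local interface IP would be used."""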
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
        s.close()
        if '192.168.100' not in ip:
            # Off the home LAN: go through the externally reachable host.
            return {
                "host": "home.erhe.link",
                "port": 55434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }
        else:
            return {
                "host": "192.168.100.146",
                "port": 5434,
                "user": "psql",
                "password": "psql",
                "dbname": "collect"
            }

    def save_error_urls(self, error_url):
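        """Append a failed URL to <project_root>/<target>/error_url.txt, skipping duplicates."""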
        error_dir = os.path.join(str(self.project_root), str(self.target))
        os.makedirs(error_dir, exist_ok=True)  # the target folder may not exist yet
        error_txt_path = os.path.join(error_dir, 'error_url.txt')
        if not os.path.exists(error_txt_path):
            open(error_txt_path, 'w').close()
        with open(error_txt_path, 'r') as f:
            existing_urls = f.read().splitlines()
        if error_url in existing_urls:
            return
        with open(error_txt_path, 'a') as f:
            f.write(error_url + '\n')


if __name__ == '__main__':
    collector = ImageCollector()
    collector.check_psql()
    if collector.step == 1:
        targets = collector.check_target_url_txt()
        # Pick one of the two fetchers:
        # collector.open_browser(targets)  # browser-driven
        collector.get_img_set_urls(targets)  # httpx-driven
    elif collector.step == 2:
        load_data = collector.load_data()
        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)
        print('Download finished, exiting')
    elif collector.step == 3:
        targets = collector.check_target_url_txt()
        # Pick one of the two fetchers:
        # collector.open_browser(targets)  # browser-driven
        collector.get_img_set_urls(targets)  # httpx-driven
        load_data = collector.load_data()
        target_file_path = collector.check_local_downloads_dir()
        collector.download_img(load_data, target_file_path)
        print('Download finished, exiting')
    elif collector.step == 4:
        pass  # debug hook
    else:
        pass
|