# -*- coding: utf-8 -*-
import os
import random
import sys
import time

# Make the ResourceCollection project root importable.
sys.path.append(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection')

import httpx
from playwright.sync_api import sync_playwright

# CSS selectors for the pack pages being scraped.
title_selector = '#pack-view__inner > section.pack-view__header > h1'
selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
# Detail-page icon selector (currently unused).
img_selector = '#detail > div > div.row.detail__top.mg-none > section > div > div > div.row.row--vertical-center.mg-none.full-height.detail__icon__inner > div > div > img'
img_count_selector = '#pack-view__inner > section.pack-view__header > p'
not_find_page_selector = '#viewport > div.errorpage.e404 > h1'


def main(target_urls):
    all_data = {}
    for target_url in target_urls:
        urls, file_path, title = open_browser(target_url)
        all_data[title] = (urls, file_path)

    for title, (urls, file_path) in all_data.items():
        # Retry until every image of the pack is on disk.
        while True:
            if download_img(urls, file_path):
                print(f'All images have been downloaded: {title}')
                break
            print(f'Some images have not been downloaded, continuing to download: {title}')
        print('\n\n')
    print('All Done')


def open_browser(target_url):
    pages = '/{}'
    urls = []
    file_path = ''  # folder the images are saved to
    title = ''      # title of the current page
    img_name = ''
    img_count = 0
    with sync_playwright() as playwright:
        browser = playwright.webkit.launch(
            headless=True,
            proxy={"server": "http://127.0.0.1:7890"}
        )
        context = browser.new_context(viewport={'width': 1280, 'height': 700})
        page = context.new_page()
        img_sequence_num = 1
        for page_count in range(1, 999):
            goto_url = target_url + pages.format(page_count)
            try:
                page.goto(goto_url, timeout=5000)
            except Exception as e:
                print(e)
                print(f'Page load failed: url is: {goto_url}')

            # Check whether the current page is a 404; if so, the previous
            # page was the last one.
            try:
                page.wait_for_selector(not_find_page_selector, state="attached", timeout=2000)
                print(f'Total page is {page_count - 1} in url: {goto_url}')
                break
            except Exception:
                pass

            if page_count == 1:
                # Grab the pack title and the total image count.
                page.wait_for_selector(title_selector, state="attached", timeout=10000)
                title = page.query_selector(title_selector).inner_text()
                img_count = page.query_selector(img_count_selector).inner_text()
                img_count = int(img_count.split(' ')[0])
                # Strip the 'Icon Pack' suffix before the single characters:
                # the space must be removed last, or the multi-word pattern
                # can never match.
                invalid_chars = ['Icon Pack', '<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ']
                for char in invalid_chars:
                    title = title.replace(char, '')
                img_name = title.replace(' ', '_')
                current_path = os.getcwd()
                download_file_path = os.path.join(current_path, 'download')
                if not os.path.exists(download_file_path):
                    os.mkdir(download_file_path)
                file_path = os.path.join(download_file_path, title)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)

            for i in range(1, img_count + 1):
                # Select the i-th result's <img> tag on this page.
                elements = page.query_selector_all(selector.format(i))
                # Take the first match, extract its src attribute, and swap
                # the 128px thumbnail path for the 512px version.
                for element in elements:
                    src = element.get_attribute('src')
                    if src:
                        src = src.replace('/128/', '/512/')
                        sequence = str(img_sequence_num).zfill(3)
                        urls.append({
                            'url': src,
                            'img': f'{img_name}_{sequence}.png'
                        })
                        img_sequence_num += 1
                    break
        print(f'All image URLs have been obtained. Total img {len(urls)}')
        page.close()
        browser.close()
    return urls, file_path, title


def download_img(urls, file_path):
    all_done = True
    print('Downloading pictures')
    for url in urls:
        target_img_url = url['url']
        img_png_name = url['img']
        target_img_name = os.path.join(file_path, img_png_name)
        # If the png file already exists it was downloaded earlier; skip it.
        if os.path.exists(target_img_name):
            print(f'The image {img_png_name} already exists. continue!')
            continue
        try:
            resp = httpx.get(target_img_url, headers={
                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
            }, timeout=10)
            resp.raise_for_status()  # avoid writing an error page to disk as a .png
            with open(target_img_name, 'wb') as f:
                f.write(resp.content)
            print(f'Downloaded: {img_png_name}')
            time.sleep(random.uniform(1, 2))  # throttle requests
        except Exception as e:
            print(f'\nFailed to download image: {target_img_url}. err: {e}\n')
            time.sleep(random.uniform(3, 5))  # back off longer after a failure
            all_done = False
    return all_done


if __name__ == "__main__":
    txt_file_name = 'target_link.txt'
    if not os.path.exists(txt_file_name):
        # Create an empty file for the user to fill in, then exit.
        with open(txt_file_name, 'w') as file:
            file.write('')
        print('Need to fill in the target link in target_link.txt')
        sys.exit(0)

    with open(txt_file_name, 'r') as f:
        targets = [target.strip() for target in f if target.strip()]
    if not targets:
        print('No target link found in target_link.txt')
        sys.exit(0)
    print(f'target link is: {targets}')
    main(targets)
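
# Usage sketch, assuming target_link.txt holds one pack URL per line and that
# each pack paginates by appending /1, /2, ... to its URL (the URL and script
# name below are hypothetical placeholders, not real values):
#
#   $ cat target_link.txt
#   https://example.com/packs/some-icon-pack
#   $ python download_icons.py
#
# Images are then saved under ./download/<PackTitle>/ as
# <PackTitle>_001.png, <PackTitle>_002.png, ...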