# -*- coding: utf-8 -*-
import sys
import os
import time
import random
sys.path.append(os.path.join(os.path.abspath(__file__).split('ResourceCollection')[0] + 'ResourceCollection'))
import httpx
from playwright.sync_api import sync_playwright
# CSS selectors used by open_browser() to read the pack pages.

# <h1> holding the pack title; its text becomes the download folder name.
title_selector = '#pack-view__inner > section.pack-view__header > h1'
# Template for the <img> of the n-th entry in the result list; format it with
# a 1-based position before querying.
selector = '#pack-view__inner > section.search-result > ul > li:nth-child({}) > div > a > img'
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably targets the large image on an icon detail page - confirm before removing.
img_selector = '#detail > div > div.row.detail__top.mg-none > section > div > div > div.row.row--vertical-center.mg-none.full-height.detail__icon__inner > div > div > img'
# Header paragraph whose first space-separated token is parsed as the image count.
img_count_selector = '#pack-view__inner > section.pack-view__header > p'
# Heading of the 404 error page; its presence ends the page walk.
not_find_page_selector = '#viewport > div.errorpage.e404 > h1'
def main(target_urls):
    """Scrape every target pack, then download the packs one after another.

    All packs are scraped with ``open_browser`` before any download starts.
    Results are stored per pack title, so a later pack with the same title
    replaces the earlier entry. For each pack ``download_img`` is called
    repeatedly until it reports success.

    Args:
        target_urls: Iterable of pack base URLs.
    """
    packs = {}
    for link in target_urls:
        pack_urls, pack_dir, pack_title = open_browser(link)
        packs[pack_title] = (pack_urls, pack_dir, pack_title)

    for pack_urls, pack_dir, pack_title in packs.values():
        # Keep retrying the whole pack; files already on disk are skipped
        # by download_img, so each round only fetches what is still missing.
        while not download_img(pack_urls, pack_dir):
            print(f'Some images have not been downloaded, continue downloading {pack_title}')
        print(f'All images have been downloaded: {pack_title}')
        print('\n\n')

    print('All Done')
def open_browser(target_url):
    """Walk the numbered pages of a pack and collect its image URLs.

    Pages are requested as ``target_url + '/<n>'`` for n = 1, 2, ... until the
    404 marker (``not_find_page_selector``) shows up or 998 pages were visited.
    On the first page the pack title and the image count are read and the
    folder ``<cwd>/download/<title>`` is created.

    Args:
        target_url: Base URL of the pack, without the page-number suffix.

    Returns:
        A tuple ``(urls, file_path, title)``. ``urls`` is a list of
        ``{'url': ..., 'img': ...}`` dicts (source URL and target file name),
        ``file_path`` is the folder created for the pack and ``title`` is the
        sanitized pack title. ``file_path`` and ``title`` stay ``''`` when
        the very first page already shows the 404 marker.

    Raises:
        Exception: Whatever Playwright raises when the title cannot be found
            on the first page (that wait is not guarded).
    """
    pages = '/{}'
    urls = []
    file_path = ''  # folder the images are saved to
    title = ''  # sanitized title of the current pack
    with sync_playwright() as playwright:
        browser = playwright.webkit.launch(
            headless=True,
            proxy={"server": "http://127.0.0.1:7890"}
        )
        context = browser.new_context(viewport={'width': 1280, 'height': 700})
        page = context.new_page()
        img_sequence_num = 1
        for page_count in range(1, 999):
            goto_url = target_url + pages.format(page_count)
            try:
                page.goto(goto_url, timeout=5000)
            except Exception as e:
                # Best effort: a slow load is reported but whatever is in the
                # DOM is still inspected below.
                print(e)
                print(f'Page load failed: url is : {goto_url}')

            # Check whether the current page is the 404 page.
            try:
                page.wait_for_selector(not_find_page_selector, state="attached", timeout=2000)
            except Exception:
                # No 404 marker within the timeout: treat it as a regular page.
                # (Exception, not a bare except, so Ctrl-C still interrupts.)
                pass
            else:
                print(f'Total page is {page_count - 1} in url: {goto_url}')
                break

            if page_count == 1:
                # Read the title and the image count from the pack header.
                page.wait_for_selector(title_selector, state="attached", timeout=10000)
                title = page.query_selector(title_selector).inner_text()
                img_count = page.query_selector(img_count_selector).inner_text()
                img_count = int(img_count.split(' ')[0])
                # NOTE(review): ' ' is stripped before 'Icon Pack ', so that
                # last entry can never match and the replace(' ', '_') below
                # is a no-op. Left unchanged to keep existing folder and file
                # names stable.
                invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*', '.', ' ', 'Icon Pack ']
                for char in invalid_chars:
                    title = title.replace(char, '')
                img_name = title.replace(' ', '_')
                file_path = os.path.join(os.getcwd(), 'download', title)
                os.makedirs(file_path, exist_ok=True)

            # Probe every list position up to the image count read on page 1.
            for i in range(1, img_count + 1):
                elements = page.query_selector_all(selector.format(i))
                # Take the first element that carries a src attribute.
                for element in elements:
                    src = element.get_attribute('src')
                    if src:
                        # Swap the '/128/' path segment for '/512/'
                        # (presumably the image size - confirm).
                        src = src.replace('/128/', '/512/')
                        sequence = str(img_sequence_num).zfill(3)
                        urls.append({
                            'url': src,
                            'img': f'{img_name}_{sequence}.png'
                        })
                        img_sequence_num += 1
                        break
        print(f'All image URLs have been obtained. Total img {len(urls)}')
        page.close()
        browser.close()
    return urls, file_path, title
def download_img(urls, file_path):
    """Download the images in *urls* into *file_path*, skipping existing files.

    A response is only stored when its status is 2xx, so an error page is
    never saved under a ``.png`` name. The body is written to a ``.part``
    file first and moved into place afterwards, so an interrupted write does
    not leave a truncated image that later runs would skip.

    Args:
        urls: List of dicts with the keys ``'url'`` (source URL) and
            ``'img'`` (target file name).
        file_path: Folder the images are written to.

    Returns:
        ``False`` when at least one image failed in a way worth retrying
        (request/write error, HTTP 429 or 5xx), otherwise ``True``. Images
        rejected with any other non-2xx status are logged and skipped without
        requesting a retry; no file is written for them, so a later run will
        try them again.
    """
    all_done = True
    print('Downloading pictures')
    for url in urls:
        target_img_url = url['url']
        img_png_name = url['img']
        target_img_name = os.path.join(file_path, img_png_name)
        # An existing file means the image was downloaded before: skip it.
        if os.path.exists(target_img_name):
            print(f'The image {img_png_name} already exists. continue!')
            continue
        partial_name = target_img_name + '.part'
        try:
            resp = httpx.get(target_img_url, headers={
                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
            })
            status = resp.status_code
            if not 200 <= status < 300:
                print(f'\nFailed to download image: {target_img_url}. err: HTTP {status}\n')
                if status == 429 or status >= 500:
                    # Rate limiting / server trouble: back off and retry later.
                    time.sleep(random.uniform(3, 5))
                    all_done = False
                else:
                    # The server rejected the request; asking again in the
                    # caller's retry loop would never end.
                    time.sleep(random.uniform(1, 2))
                continue
            with open(partial_name, 'wb') as f:
                f.write(resp.content)
            os.replace(partial_name, target_img_name)
            print(f'Downloaded: {img_png_name}')
            time.sleep(random.uniform(1, 2))
        except Exception as e:
            print(f'\nFailed to download image: {target_img_url}. err: {e}\n')
            # Drop a half-written temp file; it may not exist at all.
            try:
                os.remove(partial_name)
            except OSError:
                pass
            time.sleep(random.uniform(3, 5))
            all_done = False
    return all_done
# Script entry point: read the pack URLs from target_link.txt (one per line)
# and hand them to main().
if __name__ == "__main__":
    txt_file_name = 'target_link.txt'
    if not os.path.exists(txt_file_name):
        # First run: create an empty file for the user to fill in.
        with open(txt_file_name, 'w') as file:
            file.write('')
        print('Need to fill in the target link in target_link.txt')
        sys.exit(0)

    with open(txt_file_name, 'r') as f:
        # Ignore blank lines so an empty string is never used as a URL.
        targets = [line.strip() for line in f if line.strip()]
    if not targets:
        print('No target link found in target_link.txt')
        sys.exit(0)
    print(f'target link is : {targets}')
    main(targets)