# -*- coding: utf-8 -*-
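"""Scrape a comic from zhuimh.com.

Fetches the chapter list for `comico_id`, records every chapter in a local
SQLite database, then drives headless Chromium through Playwright to open
each chapter page, scroll until all lazy-loaded images have rendered, and
save every image as a numbered PNG under downloads/<title>/<chapter>/.
"""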
import os
import re
import sqlite3
import time
import asyncio

import httpx
from playwright.sync_api import sync_playwright

comico_id = '419025'
base_url = 'https://www.zhuimh.com'
target_href_url = 'https://www.zhuimh.com/comic/'
scroll_speed = 2  # step size, in percent, for the page-scroll loop

current_dir_path = os.path.dirname(os.path.abspath(__file__))
download_folder = os.path.join(current_dir_path, 'downloads')
os.makedirs(download_folder, exist_ok=True)
db_path = os.path.join(download_folder, 'zhuimh.db')
txt_path = os.path.join(download_folder, 'target_comico_name.txt')
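

# One table per comic: the comic title doubles as the SQLite table name, and
# each row records a chapter plus a `state` flag (0 = pending, 1 = done).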
def create_db(title):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Table names cannot be bound as SQL parameters, so the title is
    # interpolated directly; quoting keeps spaces and punctuation legal
    cursor.execute(
        f'CREATE TABLE IF NOT EXISTS "{title}" ('
        'id INTEGER PRIMARY KEY AUTOINCREMENT, '
        'chapter_name TEXT, url TEXT, state BOOLEAN DEFAULT 0)'
    )
    conn.commit()
    cursor.close()
    conn.close()
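

# Record a chapter, skipping the insert when a row with the same
# chapter_name already exists, so re-running the listing step is idempotent.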
def write_to_db(title, chapter_name, url):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Check whether this chapter_name is already recorded
    cursor.execute(
        f'SELECT EXISTS(SELECT 1 FROM "{title}" WHERE chapter_name = ?)',
        (chapter_name,))
    exists = cursor.fetchone()[0]
    if not exists:
        # Not there yet, so insert a new record
        cursor.execute(
            f'INSERT INTO "{title}" (chapter_name, url) VALUES (?, ?)',
            (chapter_name, url))
        conn.commit()
    cursor.close()
    conn.close()
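

# Fetch the comic's landing page and pull the title and chapter links out of
# the raw HTML with regular expressions (no HTML parser is used).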
async def async_get_chapter_list():
    async with httpx.AsyncClient() as client:
        chapters_data = {}
        response = await client.get(target_href_url + comico_id)
        if response.status_code == 200:
            text = response.text
            title = re.findall(r'<h4>(.*?)</h4>', text)
            title = title[0] if title else comico_id
            # Strip characters that are illegal in file names, since the
            # title is reused as both a folder name and a table name
            title = re.sub(r'[\\/:*?"<>|]', '_', title)
            print(title)
            # Save the title to a txt file first; the download step reads it
            # back and uses it as the table name when querying the database
            with open(txt_path, 'w', encoding='utf-8') as f:
                print('Writing current target name')
                f.write(title)
            chapters = re.findall(r'<li><a href="(.*?)">(.*?)</a></li>', text)
            for chapter in chapters:
                chapters_data[chapter[1]] = base_url + chapter[0]
            # Persist the chapter list to SQLite
            create_db(title)
            for chapter_name, url in chapters_data.items():
                write_to_db(title, chapter_name, url)
            print('Chapter data saved')
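

# Return every chapter that still needs downloading (state = 0), oldest first.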
def load_db(title):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(f'SELECT * FROM "{title}" WHERE state = 0 ORDER BY id ASC')
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows
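

# Mark a chapter as downloaded once its images are safely on disk.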
def change_db_data_state(data_id, t_name):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(f'UPDATE "{t_name}" SET state = 1 WHERE id = ?', (data_id,))
    conn.commit()
    cursor.close()
    conn.close()
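

# Scroll the page in small percentage steps so the site's lazy-loaded images
# enter the viewport and begin fetching.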
def scroll_to_percentage(page):
    for percentage in range(5, 101, scroll_speed):
        # Compute the offset for this percentage of the full page height
        height = page.evaluate("() => document.body.scrollHeight")
        scroll_position = height * (percentage / 100)
        # Jump to that offset, then pause so the images have time to load
        page.evaluate(f"window.scrollTo(0, {scroll_position})")
        time.sleep(0.5)
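

# Open one chapter in headless Chromium, force every image to load, and
# screenshot each <img> to a numbered PNG. Returns False when navigation
# fails or any image is still the lazy-load placeholder (the caller retries
# with a fresh browser); returns True once the whole chapter is saved.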
def request_chapter_data(title, data_id, chapter_name, chapter_url):
    chapter_folder = os.path.join(download_folder, title, chapter_name)
    with sync_playwright() as playwright:
        browser = None
        try:
            browser = playwright.chromium.launch(headless=True)
            page = browser.new_page()
            page.goto(chapter_url)
            page.wait_for_load_state('networkidle')
        except Exception as e:
            print(e)
            if browser:
                browser.close()
            return False
        # Scroll down the page, return to the top, then scroll again so
        # every lazy image gets a chance to load
        print('Scrolling the page')
        scroll_to_percentage(page)
        page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
        scroll_to_percentage(page)
        print('Scrolling finished')
        time.sleep(2)
        # Check that every image finished loading; if any src still points
        # at the placeholder GIF, bail out so the caller can reopen the
        # browser and retry
        html_content = page.content()
        check_list = re.findall(r'img class="lazy-read" src="(.*?)"', html_content)
        for src in check_list:
            if 'lazy-read.gif' in src:
                browser.close()
                return False
        # Create the chapter folder
        os.makedirs(chapter_folder, exist_ok=True)
        # Count the matched image elements, then screenshot them one by one
        total_images = page.locator('.lazy-read').count()
        for page_num in range(1, total_images + 1):
            img_locator = f'body > div.chpater-images > img:nth-child({page_num})'
            img_path = os.path.join(chapter_folder, f'{str(page_num).zfill(3)}.png')
            page.locator(img_locator).screenshot(path=img_path)
            print(f'Downloaded {img_path}')
        # The whole chapter is on disk, so flip its state flag to 1
        print(f'{chapter_name} download complete\n\n')
        change_db_data_state(data_id, title)
        browser.close()
        return True
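

# Top-level flow: refresh the chapter list, then loop over pending chapters.
# A failed chapter breaks the inner loop, sleeps, and retries from a fresh
# load_db() snapshot; the outer loop ends when no state = 0 rows remain.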
def main():
    asyncio.run(async_get_chapter_list())
    # Crawl each chapter page.
    # First read back the target name saved during the listing step
    with open(txt_path, 'r', encoding='utf-8') as f:
        title = f.read()
    folder_name = os.path.join(download_folder, title)
    os.makedirs(folder_name, exist_ok=True)
    for _retry in range(999):
        load_data = load_db(title)
        if not load_data:
            print('The database has no data or all done!')
            return

        for data in load_data:
            data_id, chapter_name, chapter_url = data[0], data[1], data[2]
            print(f'Fetching images: {title} {chapter_name}')
            ok = request_chapter_data(title, data_id, chapter_name, chapter_url)
            if not ok:
                print(f'Image loading failed: {title} {chapter_name}, retrying\n\n')
                time.sleep(5)
                break


if __name__ == "__main__":
    main()