| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import time
- import re
- import os
- import sqlite3
- import httpx
- from playwright.sync_api import sync_playwright
# Directory that contains this script; downloads are stored beneath it.
current_dir_path = os.path.dirname(os.path.abspath(__file__))

# Key of the comic to fetch; it is the last path segment of the comic's URL.
comico_key = 'OMzNzNS'
base_url = 'https://www.dumanwu.com'
target_url = base_url + '/' + comico_key

# Root folder for everything this script downloads. makedirs(exist_ok=True)
# replaces the exists()-then-mkdir() pair, which could fail if the folder
# appeared between the check and the creation.
download_folder = os.path.join(current_dir_path, 'downloads')
os.makedirs(download_folder, exist_ok=True)
def write_db(title, db_path, chapter_folder_name, chapter_url):
    """Record a chapter in the comic's SQLite table unless it is already there.

    The table is named after ``title`` and is created on first use. A row is
    inserted only when no existing row has the same ``chapter_name``; the
    ``state`` column is left at its default of 0.

    Args:
        title: Comic title, used as the table name.
        db_path: Path of the SQLite database file.
        chapter_folder_name: Value stored in ``chapter_name``; also the key
            used for the "already recorded" check.
        chapter_url: Value stored in ``url``.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    # Table names cannot be bound as SQL parameters, so the title is quoted as
    # an identifier (embedded double quotes doubled). This keeps titles with
    # spaces or punctuation working and stops the title from being parsed as
    # SQL. Plain titles still resolve to the same table as before.
    table = '"' + title.replace('"', '""') + '"'

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            f'CREATE TABLE IF NOT EXISTS {table} ('
            'id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'chapter_name TEXT, url TEXT, state BOOLEAN DEFAULT 0)'
        )
        conn.commit()

        # Check whether this chapter_name has already been recorded.
        cursor.execute(
            f'SELECT EXISTS(SELECT 1 FROM {table} WHERE chapter_name = ?)',
            (chapter_folder_name,),
        )
        already_recorded = cursor.fetchone()[0]
        if not already_recorded:
            # Not recorded yet: insert a new row.
            cursor.execute(
                f'INSERT INTO {table} (chapter_name, url) VALUES (?, ?)',
                (chapter_folder_name, chapter_url),
            )
            conn.commit()
        cursor.close()
    finally:
        # Always release the connection, even when a statement raised.
        conn.close()
def load_db(title, db_path):
    """Return every row of the comic's table whose ``state`` is 0.

    Args:
        title: Comic title, used as the table name.
        db_path: Path of the SQLite database file.

    Returns:
        A list of row tuples (all columns, via ``SELECT *``) ordered by
        ascending ``id``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails,
            for example because the table does not exist.
    """
    # Table names cannot be bound as SQL parameters, so the title is quoted as
    # an identifier (embedded double quotes doubled) rather than interpolated
    # raw into the statement.
    table = '"' + title.replace('"', '""') + '"'

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(f'SELECT * FROM {table} WHERE state = 0 ORDER BY id ASC')
        rows = cursor.fetchall()
        cursor.close()
    finally:
        # Always release the connection, even when the query raised.
        conn.close()
    return rows
def fetch_page_title(target_url):
    """Download the comic page and return the text of its ``name_mh`` heading.

    Args:
        target_url: URL of the comic's page.

    Returns:
        The text captured between ``<h1 class="name_mh">`` and ``</h1>``.

    Raises:
        SystemExit: With status 1, after printing a message, when the response
            status is not 200 or the heading is not found in the page.
    """
    # NOTE(security): verify=False turns off TLS certificate verification, so
    # the response is not authenticated. Kept as in the original; re-enable
    # verification if the site's certificate allows it.
    with httpx.Client(verify=False) as client:
        response = client.get(target_url)

    if response.status_code != 200:
        print(f'Error: {response.status_code}')
        # Exit with a failure status; the previous exit(0) reported success
        # and relied on the `site` module's interactive helper.
        raise SystemExit(1)

    match = re.search(r'<p></p><h1 class="name_mh">(.*?)</h1><p></p>', response.text)
    if match is None:
        print("Title not found")
        raise SystemExit(1)
    return match.group(1)
def fetch_chapter_data():
    """Render the comic page in headless Chromium and collect its chapter list.

    Loads ``target_url``, tries up to three times to click the button under
    ``div.chapterlistload``, then parses the rendered HTML.

    Returns:
        A dict mapping a zero-padded four-digit counter (``'0001'``, ...) to a
        ``(chapter_name, chapter_url)`` tuple, where ``chapter_url`` is
        ``base_url`` plus the link's ``href``. Returns ``False`` when the page
        contains no ``<ul>`` element.
    """
    # Imported here so the narrowed except clause below has the name in scope
    # without touching the file-level import block.
    from playwright.sync_api import Error as PlaywrightError

    button_selector = 'body > div > div > div.forminfo > div.chapterList > div.chapterlistload > div > button'
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=True,
            args=['--ignore-certificate-errors']
        )
        try:
            page = browser.new_page()
            page.goto(target_url)
            time.sleep(1)

            # Best effort: a failed click is reported and retried, never fatal.
            for attempt in range(1, 4):
                try:
                    page.click(button_selector)
                    break
                except PlaywrightError as exc:
                    print(f'Chapter list button click failed (attempt {attempt}/3): {exc}')

            # NOTE(review): the original indentation was lost, so it is unclear
            # whether this wait sat inside the retry loop or after it. It is
            # placed after the loop so the page can update before it is read.
            page.wait_for_timeout(1000)
            source = page.content()
        finally:
            # Close the browser on every path, including the "no <ul>" return
            # and any exception raised while loading the page.
            browser.close()
    return _parse_chapter_list(source)


def _parse_chapter_list(source):
    """Turn rendered page HTML into the numbered chapter dict, or False."""
    first_ul = re.search('<ul>(.*?)</ul>', source, re.DOTALL)
    if first_ul is None:
        return False
    ul_html = first_ul.group(1)

    chapter_urls = re.findall('<a href="(.*?)">', ul_html)
    chapter_names = re.findall('<li>(.*?)</li>', ul_html)

    # Both lists are reversed before numbering, so the last entry on the page
    # becomes '0001' (presumably the page lists newest first - TODO confirm).
    # zip() stops at the shorter list if the two regexes matched unequal counts.
    pairs = zip(reversed(chapter_names), reversed(chapter_urls))
    return {
        str(number).zfill(4): (name, base_url + url)
        for number, (name, url) in enumerate(pairs, start=1)
    }
def fetch_images(data, chapter_folder_name):
    """Open a chapter page in Chromium and visit each image URL found in it.

    The image URLs are taken from the ``src`` / ``data-src`` attributes of the
    ``<img>`` tags inside the page's ``chapter-img-box`` container.

    Args:
        data: A chapter row; index 2 is read as the chapter URL.
        chapter_folder_name: Folder intended for this chapter's images.
            Currently unused: the function only navigates to each image and
            writes nothing to disk.
    """
    chapter_url = data[2]
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=False,
            args=['--ignore-certificate-errors']
        )
        try:
            page = browser.new_page()
            page.goto(chapter_url)
            time.sleep(1)
            html_content = page.content()  # full HTML of the rendered page

            image_box = re.search(
                r'<div class="main_img"><div class="chapter-img-box">([\S\s]*?)</a></div>',
                html_content,
            )
            if image_box is None:
                # Previously an IndexError; report the chapter and move on.
                print(f'No image container found: {chapter_url}')
                return

            # The attribute-name alternative is a non-capturing group so that
            # findall() yields plain URL strings. With two capturing groups it
            # returned (attribute, url) tuples, which page.goto() cannot take.
            urls = re.findall(r'<img (?:src="|data-src=")(.*?)"', image_box.group(1))
            for url in urls:
                page.goto(url)
        finally:
            # Close the browser even when navigation or parsing fails.
            browser.close()
def main():
    """Run the two-step download flow for the comic at ``target_url``.

    Step 1 fetches the comic title, creates its folder under
    ``download_folder`` and records every chapter in ``comico.db`` inside that
    folder. Step 2 loads the rows whose ``state`` is 0, creates a folder per
    chapter and calls ``fetch_images`` for each.

    Raises:
        SystemExit: With status 1 when no chapter list could be fetched.
    """
    print(target_url)

    # ------------------------------ step1 ------------------------------
    title = fetch_page_title(target_url)
    comico_folder = os.path.join(download_folder, title)
    os.makedirs(comico_folder, exist_ok=True)

    # Chapter database that stores each chapter's name and URL.
    db_path = os.path.join(comico_folder, 'comico.db')

    # Fetch each chapter's title and URL.
    chapter_data = fetch_chapter_data()
    if chapter_data is False:
        # fetch_chapter_data() returns False when the page has no chapter
        # list; calling .items() on that used to raise AttributeError.
        print('Chapter list not found')
        raise SystemExit(1)
    for number, (chapter_name, chapter_url) in chapter_data.items():
        write_db(title, db_path, number + '_' + chapter_name, chapter_url)

    # ------------------------------ step2 ------------------------------
    for data in load_db(title, db_path):
        # data[1] is the stored chapter_name, e.g. '0001_<name>'.
        chapter_folder_name = os.path.join(comico_folder, data[1])
        os.makedirs(chapter_folder_name, exist_ok=True)
        fetch_images(data, chapter_folder_name)
        # NOTE(review): this 999-second pause looks like a debugging leftover.
        # The original indentation was lost, so its placement inside the loop
        # is an assumption - confirm, then shorten or remove it.
        time.sleep(999)


if __name__ == '__main__':
    main()
|