# Comic scraper for zcymh.com: collects chapter/image URLs with Selenium, stores them in MongoDB, and downloads the images with httpx.
# -*- coding: utf-8 -*-
import os
import platform
import random
import re
import time
from datetime import datetime
from urllib.parse import urlsplit

import httpx
from pymongo import MongoClient
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def browser_opt():
    """Build and return a headless Chrome WebDriver.

    The driver is started with the sandbox and GPU disabled and in headless
    mode. On Linux, the Chromium binary at ``/usr/bin/chromium`` is selected
    when that file exists; otherwise Selenium's default browser discovery
    is used.

    Returns:
        The ``webdriver.Chrome`` instance created from these options.
    """
    chrome_options = Options()
    for flag in ('--no-sandbox', '--disable-setuid-sandbox', '--disable-gpu', '--headless'):
        chrome_options.add_argument(flag)

    # The browser executable is chosen through ``binary_location``. Passing
    # the path to ``add_argument`` only appends it to Chrome's command line
    # and does not select which binary is launched.
    chromium_path = '/usr/bin/chromium'
    if platform.system() == 'Linux' and os.path.exists(chromium_path):
        chrome_options.binary_location = chromium_path

    return webdriver.Chrome(options=chrome_options)
def browser_open(browser, url):
    """Load *url* in *browser*, pause briefly, and return the browser.

    After the page load call, the function sleeps for a random duration
    between 1 and 2 seconds.
    """
    browser.get(url)
    pause_seconds = random.uniform(1, 2)
    time.sleep(pause_seconds)
    return browser
def browser_get_page_source(browser):
    """Return the source of the page currently loaded in *browser*."""
    source = browser.page_source
    return source
def browser_find_by_selector(browser, selector):
    """Return the text of the first element matching a CSS selector.

    Waits up to 10 seconds for a matching element to be present.

    Args:
        browser: Selenium WebDriver to search in.
        selector: CSS selector string.

    Returns:
        The element's text, or ``None`` when the wait or the text lookup
        raised; the error is printed in that case.
    """
    locator = (By.CSS_SELECTOR, selector)
    try:
        # ``until`` hands back the element found by the expected condition,
        # so it is used directly instead of searching for it a second time.
        element = WebDriverWait(browser, 10).until(
            EC.presence_of_element_located(locator)
        )
        return element.text
    except Exception as e:  # best-effort lookup: report and signal "not found"
        print(e)
        return None
def browser_screenshot(browser):
    """Save a PNG screenshot of the current page into the working directory.

    The file is named ``<title>_<YYYYmmdd_HHMMSS>.png``, where ``<title>`` is
    the page title with spaces removed and then passed through
    ``sanitize_filename``.

    Args:
        browser: Selenium WebDriver whose current page is captured.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Page titles may contain characters that are not valid in file names
    # (for example "/" or ":"), so the title is sanitized before use.
    safe_title = sanitize_filename(browser.title.replace(' ', ''))
    filename = f"{safe_title}_{timestamp}.png"
    # ``save_screenshot`` reports failure through its return value, so
    # success is only announced when the file was actually written.
    if browser.save_screenshot(filename):
        print(f"保存截图文件: {filename}")
    else:
        print(f"保存截图文件失败: {filename}")
def browser_close(browser):
    """Shut down the WebDriver session held by *browser*.

    ``quit()`` is used rather than ``close()``: ``close()`` only closes the
    current window, while ``quit()`` ends the whole session so the driver
    is not left running after the script finishes.
    """
    browser.quit()
def sanitize_filename(string):
    """Turn *string* into a name that is safe to use as a file name.

    Steps, in order:
      1. remove the characters ``< > : " / \\ | ? *``;
      2. replace spaces with underscores;
      3. replace a leading dot with an underscore;
      4. collapse every run of two or more dots into a single dot;
      5. if nothing is left, return ``noname_<unix timestamp>``.

    Args:
        string: The raw name.

    Returns:
        The sanitized, non-empty name.
    """
    # Characters Windows does not allow in file names.
    cleaned = re.sub(r'[<>:"/\\|?*]', '', string)
    cleaned = cleaned.replace(' ', '_')
    if cleaned.startswith('.'):
        cleaned = '_' + cleaned[1:]
    # A single ``replace('..', '.')`` pass leaves ".." behind for runs of
    # three or more dots; the regex collapses a run of any length.
    cleaned = re.sub(r'\.{2,}', '.', cleaned)
    if not cleaned:
        cleaned = 'noname' + '_' + str(int(time.time()))
    return cleaned
def _extract_chapters(page_source, host):
    """Return ``[serial, title, url]`` for every link in the chapter list."""
    start_tag = '<ol class="chapter-list col-4 text" id="j_chapter_list">'
    end_tag = '</ol>'
    start_index = page_source.find(start_tag)
    if start_index == -1:
        return []
    end_index = page_source.find(end_tag, start_index)
    if end_index == -1:
        return []
    chapter_html = page_source[start_index + len(start_tag):end_index]
    links = re.findall(r'<a title="(.*?)" href="(.*?)" target="_self">', chapter_html)
    # serial = zero-padded position in the list, used as the reading order
    return [
        [str(number).zfill(4), sanitize_filename(title), host + href]
        for number, (title, href) in enumerate(links, start=1)
    ]


def _collect_page_records(browser, chapters):
    """Open each chapter page and return one record per image found on it."""
    records = []
    for serial, set_name, set_url in chapters:
        browser = browser_open(browser, set_url)
        page_source = browser_get_page_source(browser)
        image_urls = re.findall('<img src="(.*?)" width', page_source)
        print(f'正在获取 {set_name}')
        for page_num, img_url in enumerate(image_urls, start=1):
            # One row of data as it is stored in the database.
            records.append({
                'comico_serial': serial,
                'set_name': set_name,
                'page_num': page_num,
                'set_url': set_url,
                'img_url': img_url,
                'is_download': 0,
            })
    return records


def _store_new_records(collection, records):
    """Insert every record whose ``img_url`` is not yet in *collection*."""
    for data in records:
        if collection.find_one({"img_url": data['img_url']}) is not None:
            print(f'数据已存在: {data}')
            continue
        try:
            collection.insert_one(data)
            print(f"数据插入成功,ObjectId: {data['comico_serial']}\t{data['set_name']}\t{data['page_num']}")
        except Exception as e:
            print(f"数据插入失败,错误信息: {e}")


def task1():
    """Scrape a comic's chapter and image URLs and store them in MongoDB.

    Reads the module-level names ``url``, ``mongodb_link`` and ``db_name``.
    The comic title found in the page is sanitized and used both as the
    MongoDB collection name and as the folder name created under
    ``<cwd>/comico``.

    Raises:
        SystemExit: with exit status 1 when the comic title cannot be found
            in the page (the original exited with status 0 here).
    """
    browser = browser_opt()
    try:
        print('正在打开浏览器')
        browser = browser_open(browser, url)
        print(f'前往 url: {url}')
        page_source = browser_get_page_source(browser)

        # The comic title doubles as collection name and folder name.
        names = re.findall('meta property="og:novel:book_name" content="(.*?)"', page_source)
        if not names:
            print("获取漫画名称失败")
            raise SystemExit(1)
        book_name = sanitize_filename(names[0])

        chapters = _extract_chapters(page_source, 'https://zcymh.com')
        records = _collect_page_records(browser, chapters)
    finally:
        # Always release the browser, including on errors and early exit.
        browser_close(browser)

    conn = MongoClient(mongodb_link)
    try:
        _store_new_records(conn[db_name][book_name], records)
    finally:
        conn.close()

    # Create <cwd>/comico/<book_name> for the later download step.
    os.makedirs(os.path.join(os.getcwd(), 'comico', book_name), exist_ok=True)
def _image_suffix(img_url):
    """Return the file extension (with leading dot) of the URL's path."""
    suffix = os.path.splitext(urlsplit(img_url).path)[1]
    # NOTE(review): ".jpg" is an assumed default for URLs whose path has no
    # extension; confirm it matches what the site serves.
    return suffix or '.jpg'


def _download_page(collection, document, book_dir):
    """Download one image, save it under *book_dir*, and mark it as done."""
    resp = httpx.get(document['img_url'], headers=headers)
    if resp.status_code != 200:
        raise RuntimeError(f'请求图片失败, 错误码: {resp.status_code}')

    set_file_name = document['comico_serial'] + '_' + sanitize_filename(document['set_name'])
    set_dir = os.path.join(book_dir, set_file_name)
    os.makedirs(set_dir, exist_ok=True)

    img_name = str(document['page_num']).zfill(4) + _image_suffix(document['img_url'])
    with open(os.path.join(set_dir, img_name), 'wb') as f:
        f.write(resp.content)

    # Only flag the document once the file has been written successfully.
    collection.update_one({'_id': document['_id']}, {'$set': {'is_download': 1}})
    print(f"已更新文档: {load_book_name}\t{document['comico_serial']}\t{document['set_name']}\t{document['page_num']}")


def task2():
    """Download every image of a comic that is not yet marked as downloaded.

    Reads the module-level names ``load_book_name``, ``mongodb_link``,
    ``db_name`` and ``headers``. Each document in the comic's collection with
    ``is_download == 0`` is fetched and written to
    ``<cwd>/comico/<load_book_name>/<serial>_<set name>/<page>.<ext>``, then
    updated to ``is_download = 1``. A failure on one document is printed and
    does not stop the remaining downloads. To download everything again,
    reset ``is_download`` to 0 on the documents first.
    """
    book_dir = os.path.join(os.getcwd(), 'comico', load_book_name)
    # makedirs also creates the parent "comico" folder when it is missing.
    os.makedirs(book_dir, exist_ok=True)

    client = MongoClient(mongodb_link)
    try:
        collection = client[db_name][load_book_name]
        for document in collection.find():
            if document['is_download'] != 0:
                print("已下载,跳过")
                continue
            try:
                _download_page(collection, document, book_dir)
            except Exception as e:  # keep going with the remaining pages
                print(f"处理文档时发生错误:{e}")
    except Exception as e:
        print(f"读取集合时发生错误:{e}")
    finally:
        # Close the database connection on every exit path.
        client.close()
if __name__ == "__main__":
    # Task selector: 1 = scrape chapter/image URLs into MongoDB (task1),
    # 2 = download the images recorded for ``load_book_name`` (task2).
    choose = 2
    # The connection string can be supplied through the MONGODB_LINK
    # environment variable. NOTE(review): the fallback below embeds real
    # credentials in source; move them out of the file and rotate them.
    mongodb_link = os.environ.get(
        'MONGODB_LINK',
        'mongodb://root:aaaAAA111!!!@192.168.31.177:38000/',
    )
    db_name = 'comico'
    if choose == 1:
        comico_id = '384'
        url = 'https://zcymh.com/manben/{}/'.format(comico_id)
        host = 'https://zcymh.com'
        task1()
    elif choose == 2:
        load_book_name = '诚如神之所说'
        headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0'}
        task2()