# -*- coding: utf-8 -*-
import os
import re
import json
import time
import asyncio

from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from api_ollama import *
from api_kimi import *
from api_deepseek import *
from send_to_email import *

key = 'A'
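
# Third-party dependencies (known package names, versions unpinned here):
#   pip install playwright beautifulsoup4
#   playwright install chromium   # fetches the browser binary Playwright drives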


class AINEWS:
    def save_to_txt(self, text):
        current_file_path = os.path.dirname(__file__)
        save_file_path = os.path.join(current_file_path, 'save_txt')
        if not os.path.exists(save_file_path):
            os.makedirs(save_file_path)
        # Name the file after the current Unix timestamp
        file_path = os.path.join(save_file_path, str(int(time.time())) + '.txt')
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(text)

    def load_config(self, key):
        config = {}
        if os.path.exists('config.json'):
            with open('config.json', 'r', encoding='utf-8') as f:
                config = json.load(f)
        if not config:
            print('config.json does not exist or is empty!')
            exit()
        k = config[key]
        return k['target_url_list'], k['prompt_words']
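
    # A minimal sketch of the config.json layout load_config expects, inferred
    # from the lookups above; the URL and prompt values are placeholders, not
    # values from this project:
    #
    # {
    #     "A": {
    #         "target_url_list": ["https://example.com/ai-news"],
    #         "prompt_words": "Summarize the following AI news text: "
    #     }
    # }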

    @staticmethod
    async def scroll_to_percentage(page):
        # Scroll down in small steps (5%, 7%, ..., 99%) so lazily loaded
        # content gets a chance to render, then jump back to the top.
        percentage_list = [i for i in range(5, 101, 2)]
        for percentage in percentage_list:
            # Compute the scroll offset for this percentage of the page height
            height = await page.evaluate("() => document.body.scrollHeight")
            scroll_position = height * (percentage / 100)
            # Smooth-scroll to that offset
            await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
            await asyncio.sleep(0.5)  # async sleep keeps the event loop free
        await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")

    async def get_htmls(self, urls):
        async with async_playwright() as p:
            # Launch the browser
            browser = await p.chromium.launch(headless=True)
            # Create one browser context shared by all pages
            context = await browser.new_context()

            async def get_html(url):
                try:
                    print(f'Opening: {url}')
                    # Open a new page in the shared context
                    page = await context.new_page()

                    # Log navigation events
                    def handle_navigation(frame):
                        print(f"Navigated to: {frame.url}")

                    page.on('framenavigated', handle_navigation)
                    # Navigate to the URL and wait until the network is idle
                    await page.goto(url, wait_until='networkidle')
                    # Scroll through the page to trigger lazy-loaded content
                    await self.scroll_to_percentage(page)
                    # Grab the rendered HTML
                    html = await page.content()
                    # Close the page
                    await page.close()
                    return html
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return ""

            # Fetch the HTML of all sites concurrently with asyncio.gather
            tasks = [get_html(url) for url in urls]
            htmls_list = await asyncio.gather(*tasks)
            # Extract plain text from each page with BeautifulSoup
            formatted_htmls = []
            for html in htmls_list:
                soup = BeautifulSoup(html, 'html.parser')
                formatted_html = soup.get_text()
                cleaned_text = re.sub(r'[\n\t\r]+', ' ', formatted_html)
                cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
                formatted_htmls.append(cleaned_text)
            # Merge the cleaned text of all pages into one string
            text = "\n".join(formatted_htmls)
            # Close the context and the browser
            await context.close()
            await browser.close()
            return text
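
    # A minimal standalone driver for get_htmls, assuming this module is
    # importable; the URL is a placeholder:
    #   text = asyncio.run(AINEWS().get_htmls(['https://example.com/ai-news']))
    #   print(text[:200])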

    def main(self, target_url_list, prompt_words):
        # text = asyncio.run(self.get_htmls(target_url_list))
        # self.save_to_txt(text)
        # prompt_words += text
        C = ChatBot('http://192.168.31.28:11434', 'Hello', 'llava:13b')
        response_context = C.start_chat()
        print(response_context)
        # K = KIMI()
        # response_context = K.call_kimi(prompt_words)
        # print(response_context)
        # D = DeepSeek()
        # response_context = D.call_deepseek(prompt_words)
        # print(response_context)


if __name__ == "__main__":
    ainews = AINEWS()
    target_url_list, prompt_words = ainews.load_config(key)
    ainews.main(target_url_list, prompt_words)
    print('done!')