# -*- coding: utf-8 -*-
import os
import re
import json
import time
import asyncio

from playwright.async_api import async_playwright
from bs4 import BeautifulSoup

from api_ollama import *
from api_kimi import *
from api_deepseek import *
from send_matrix import *

key = 'web3'


class AINEWS:
    def create_config_if_not_exists(self):
        # Build the full path to config.json next to this file
        current_dir = os.path.dirname(os.path.abspath(__file__))
        config_path = os.path.join(current_dir, 'config.json')
        # If config.json does not exist, create it with default JSON data
        if not os.path.exists(config_path):
            default_config = {
                "example": {
                    "target_url_list": [],
                    "prompt_words": "",
                    "role": ""
                }
            }
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=4)
            print(f"Created {config_path} with default configuration.")

    def save_to_txt(self, url_to_text):
        current_file_path = os.path.dirname(__file__)
        save_file_path = os.path.join(current_file_path, 'save_txt')
        if not os.path.exists(save_file_path):
            os.makedirs(save_file_path)
        # Name the file with the current Unix timestamp
        file_path = os.path.join(save_file_path, str(int(time.time())) + '.txt')
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(str(url_to_text))
        print(f'Text file saved to: {file_path}')

    def load_config(self, key):
        # Read config.json from the same directory it was created in
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
        config = {}
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        if not config:
            print('config.json does not exist!')
            exit(1)
        k = config[key]
        return k['target_url_list'], k['prompt_words'], k['role']

    @staticmethod
    async def scroll_to_percentage(page):
        percentage_list = list(range(5, 101, 2))
        for percentage in percentage_list:
            # Compute the scroll offset for the given percentage of the page height
            height = await page.evaluate("() => document.body.scrollHeight")
            scroll_position = height * (percentage / 100)
            # Smoothly scroll to that position, then yield with an async sleep
            await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
            await asyncio.sleep(0.5)
        await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")

    async def get_htmls(self, urls):
        async with async_playwright() as p:
            # Launch a headless browser and create a shared context
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()

            async def get_html(url):
                try:
                    print(f'Opening: {url}')
                    # Open a new page in the shared context
                    page = await context.new_page()
                    # Navigate and wait until the network is idle
                    await page.goto(url, wait_until='networkidle')
                    # Scroll through the page to trigger lazy-loaded content
                    await self.scroll_to_percentage(page)
                    # Grab the rendered HTML, then close the page
                    html = await page.content()
                    await page.close()
                    # Extract plain text with BeautifulSoup and collapse whitespace
                    soup = BeautifulSoup(html, 'html.parser')
                    formatted_html = soup.get_text()
                    cleaned_text = re.sub(r'[\n\t\r]+', ' ', formatted_html)
                    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            # Fetch all sites concurrently with asyncio.gather
            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            # Store the results in a dict keyed by URL
            url_to_text = {url: text for url, text in results}
            # Close the context and the browser
            await context.close()
            await browser.close()
            return url_to_text

    def main(self, target_url_list, prompt_words, role):
        url_to_text = asyncio.run(self.get_htmls(target_url_list))
        # Create the message bot instance
        bot = MatrixBot('message-bot', 'aaaAAA111!!!')
        self.save_to_txt(url_to_text)
        O = OllamaChat()
        for k, v in url_to_text.items():
            response_context = O.call_ollama('http://127.0.0.1:11434', role, v, prompt_words)
            message = f'{k}\n{response_context}\n'
            # Send the message
            bot.send_message(message)
        # K = KIMI()
        # response_context = K.call_kimi(prompt_words)
        # print(response_context)
        # D = DeepSeek()
        # for k, v in url_to_text.items():
        #     response_context = D.call_deepseek(v, prompt_words)
        #     # Collect each string, ready to be sent
        #     message = f'{k}\n{response_context}\n'
        #     print(message)


if __name__ == "__main__":
    ainews = AINEWS()
    ainews.create_config_if_not_exists()
    target_url_list, prompt_words, role = ainews.load_config(key)
    ainews.main(target_url_list, prompt_words, role)
    print('done!')
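
# A minimal sketch of the expected config.json layout, inferred from
# load_config() above. All values are hypothetical placeholders: the
# top-level key must match `key`, `target_url_list` holds the pages to
# scrape, `prompt_words` is the instruction sent to the model, and
# `role` is its system persona.
#
# {
#     "web3": {
#         "target_url_list": ["https://example.com/web3-news"],
#         "prompt_words": "Summarize the key points of this page.",
#         "role": "You are a web3 news analyst."
#     }
# }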