# -*- coding: utf-8 -*-
import os
import re
import json
import time
import asyncio

import httpx
from bs4 import BeautifulSoup
from ollama import Client as oClient
from playwright.async_api import async_playwright
from matrix_client.client import MatrixClient
from matrix_client.api import MatrixHttpApi

# Names of the config.json sections to process.
key_list = ['web3']
# When truthy, each page is summarized separately; otherwise all pages are
# concatenated into a single prompt.
text_batch = 0


class OllamaChat:
    def __init__(self, host='http://192.168.31.28:11434'):
        self.host = host

    def call_ollama(self, role, text, prompt_words, model='llava:13b', temperature=0.4):
        # Call a model served by Ollama.
        message = text + '\n\n' + prompt_words
        print(f'use model: {model}')
        try:
            response = oClient(host=self.host).chat(
                model=model,
                messages=[
                    {'role': 'system', 'content': role},
                    {'role': 'user', 'content': message}
                ],
                options={"temperature": temperature},
                stream=False
            )
            return response['message']['content']
        except Exception as e:
            print(f"\nError occurred: {e}")
            return None


class MatrixBot:
    def __init__(self, user, password):
        self.base_url = "https://matrix.erhe.top"
        self.user = user
        self.password = password
        self.client = MatrixClient(self.base_url)
        self.token = self.login()
        self.to = "!CgWvWEnLbKYvhXLvil:chat.abeginner.cn"

    def login(self):
        self.token = self.client.login(username=self.user, password=self.password)
        return self.token

    def send_message(self, message):
        if self.token:
            try:
                api = MatrixHttpApi(self.base_url, token=self.token)
                api.send_message(self.to, message)
            except Exception as e:
                print(e)
                api = MatrixHttpApi(self.base_url, token=self.token)
                api.send_message(self.to, str(e))
        else:
            print("Bot is not logged in. Please login first.")


class AINEWS:
    def create_config_if_not_exists(self):
        # If there is no config.json next to this script, create one with an example section.
        current_dir = os.path.dirname(os.path.abspath(__file__))  # directory of this file
        # Build the full path to config.json.
        config_path = os.path.join(current_dir, 'config.json')
        # Check whether config.json already exists.
        if not os.path.exists(config_path):
            # If not, create it and write a default configuration.
            default_config = {
                "example": {
                    "use_browser": 0,
                    "ai_host": "http://127.0.0.1:11434",
                    "target_url_list": ["target website"],
                    "role": "The AI's role, e.g.: you are a chat bot",
                    "prompt_words": "Prompt: summarize this for me, reply in Chinese"
                }
            }
            # Write the JSON data to config.json.
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=4)
            print(f"Created {config_path} with default configuration.")

    def save_to_txt(self, url_to_text):
        # Save the scraped news text to a timestamped txt file.
        current_file_path = os.path.dirname(__file__)
        save_file_path = os.path.join(current_file_path, 'save_txt')
        if not os.path.exists(save_file_path):
            os.makedirs(save_file_path)
        file_path = os.path.join(save_file_path, str(int(time.time())) + '.txt')
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(str(url_to_text))

    def load_config(self, key):
        # Read the config.json next to this script and return the section for `key`.
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
        config = {}
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        if not config:
            print('config.json does not exist!')
            exit(1)
        k = config[key]
        return k['target_url_list'], k['prompt_words'], k['role'], k['use_browser'], k['ai_host']

    async def get_htmls(self, urls):
        # Fetch HTML over plain HTTP requests.
        async with httpx.AsyncClient() as client:
            async def get_html(url):
                try:
                    print(f'Opening: {url}')
                    # Send a GET request for the page.
                    response = await client.get(url)
                    response.raise_for_status()  # make sure the request succeeded
                    # Parse the HTML with BeautifulSoup.
                    soup = BeautifulSoup(response.text, 'html.parser')
                    # Extract the plain text content.
                    text = soup.get_text(separator=' ', strip=True)
                    # Collapse extra whitespace.
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            # Fetch all sites concurrently with asyncio.gather.
            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            # Store the results in a dict keyed by URL.
            url_to_text = {url: text for url, text in results}
        return url_to_text

    async def get_htmls_with_browser(self, urls):
        # Fetch rendered HTML with Playwright (for JavaScript-heavy pages).
        url_to_text = {}
        async with async_playwright() as p:
            # Launch the browser.
            browser = await p.chromium.launch(headless=True)
            # Create a browser context.
            context = await browser.new_context()

            async def get_html(url):
                try:
                    print(f'Opening: {url}')
                    # Open a new page in the shared context.
                    page = await context.new_page()
                    # Navigate to the URL.
                    await page.goto(url)
                    # Scroll the page to trigger lazy-loaded content.
                    await self.scroll_to_percentage(page)
                    # Grab the rendered HTML.
                    html = await page.content()
                    # Parse the HTML with BeautifulSoup.
                    soup = BeautifulSoup(html, 'html.parser')
                    # Extract the plain text content.
                    text = soup.get_text(separator=' ', strip=True)
                    # Collapse extra whitespace.
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    # Close the page.
                    await page.close()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            # Fetch all sites concurrently with asyncio.gather.
            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            # Store the results in a dict keyed by URL.
            url_to_text = {url: text for url, text in results}
            # Close the context and the browser.
            await context.close()
            await browser.close()
        return url_to_text

    @staticmethod
    async def scroll_to_percentage(page):
        # Print the page title, then scroll down in small steps so lazy content loads.
        title = await page.title()
        print(f'Scrolling browser page: {title}')
        percentage_list = [i for i in range(5, 101, 2)]
        for percentage in percentage_list:
            # Compute the scroll position for this percentage of the page height.
            height = await page.evaluate("() => document.body.scrollHeight")
            scroll_position = height * (percentage / 100)
            # Scroll to that position.
            await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
            await asyncio.sleep(0.5)  # asynchronous sleep
        await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")

    def process_data(self, result_text, prompt_words, role, ai_host):
        # Feed the scraped text to the model and return the messages to send.
        process_send = []
        chat = OllamaChat(ai_host)
        if text_batch:
            # Summarize each page separately.
            for k, v in result_text.items():
                response_context = chat.call_ollama(role, v, prompt_words)
                if response_context:
                    message = f'{k}\n{response_context}\n'
                    process_send.append(message)
        else:
            # Concatenate all pages and summarize them in one call.
            t = ''
            for k, v in result_text.items():
                t += f'{k}\n{v}\n'
            response_context = chat.call_ollama(role, t, prompt_words)
            if response_context:
                process_send.append(response_context)
        return process_send

    def main(self, target_url_list, prompt_words, role, use_browser, ai_host):
        # Fetch the HTML of every target page.
        if use_browser:
            result_text = asyncio.run(self.get_htmls_with_browser(target_url_list))
        else:
            result_text = asyncio.run(self.get_htmls(target_url_list))
        # Save the raw text.
        self.save_to_txt(result_text)

        # # If you only need to save the scraped data without using the AI,
        # # leave the lines below commented out.
        # # Create the message bot instance.
        # bot = MatrixBot('message-bot', 'aaaAAA111!!!')
        #
        # # Build the messages to send.
        # process_send = self.process_data(result_text, prompt_words, role, ai_host)
        #
        # # Send the messages.
        # for process_text in process_send:
        #     bot.send_message(process_text)


if __name__ == "__main__":
    ainews = AINEWS()
    ainews.create_config_if_not_exists()
    for key in key_list:
        target_url_list, prompt_words, role, use_browser, ai_host = ainews.load_config(key)
        ainews.main(target_url_list, prompt_words, role, use_browser, ai_host)
    print('done!')
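
# A minimal sketch of the config.json section this script expects for the
# 'web3' entry listed in key_list above. The host, URL, role, and prompt
# values are illustrative placeholders, not taken from the original script:
#
# {
#     "web3": {
#         "use_browser": 1,
#         "ai_host": "http://127.0.0.1:11434",
#         "target_url_list": ["https://example.com/web3-news"],
#         "role": "You are a news assistant that summarizes web pages.",
#         "prompt_words": "Summarize the page text above and reply in Chinese."
#     }
# }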