# main.py
# -*- coding: utf-8 -*-
import asyncio
import json
import os
import re
import time

from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

from api_ollama import *
from api_kimi import *
from api_deepseek import *
from send_matrix import *

key = 'web3'
  13. class AINEWS:
  14. def create_config_if_not_exists(self):
  15. # 获取当前文件的目录路径
  16. current_dir = os.path.dirname(os.path.abspath(__file__))
  17. # 构建 config.json 文件的完整路径
  18. config_path = os.path.join(current_dir, 'config.json')
  19. # 检查 config.json 文件是否存在
  20. if not os.path.exists(config_path):
  21. # 如果不存在,创建并写入默认的 JSON 数据
  22. default_config = {
  23. "example": {
  24. "target_url_list": [],
  25. "prompt_words": "",
  26. "role": ""
  27. }
  28. }
  29. # 写入 JSON 数据到 config.json 文件
  30. with open(config_path, 'w', encoding='utf-8') as f:
  31. json.dump(default_config, f, indent=4)
  32. print(f"Created {config_path} with default configuration.")
  33. def save_to_txt(self, url_to_text):
  34. current_file_path = os.path.dirname(__file__)
  35. save_file_path = os.path.join(current_file_path, 'save_txt')
  36. if not os.path.exists(save_file_path):
  37. os.makedirs(save_file_path)
  38. file = os.path.join(save_file_path, str(int(time.time())) + '.txt')
  39. with open(file, 'w', encoding='utf-8') as file:
  40. file.write(str(url_to_text))
  41. print(f'txt文件已保存, 路径为: {file}')
  42. def load_config(self, key):
  43. config = {}
  44. if os.path.exists('config.json'):
  45. with open('config.json', 'r', encoding='utf-8') as f:
  46. config = json.load(f)
  47. if not config:
  48. print('config.json is not exist!')
  49. exit(0)
  50. k = config[key]
  51. return k['target_url_list'], k['prompt_words'], k['role']
  52. @staticmethod
  53. async def scroll_to_percentage(page):
  54. percentage_list = [i for i in range(5, 101, 2)]
  55. for percentage in percentage_list:
  56. # 计算页面的指定百分比高度
  57. height = await page.evaluate("() => document.body.scrollHeight")
  58. scroll_position = height * (percentage / 100)
  59. # 跳转到指定的百分比位置
  60. await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
  61. await asyncio.sleep(0.5) # 使用异步 sleep
  62. await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
  63. async def get_htmls(self, urls):
  64. async with async_playwright() as p:
  65. # 启动浏览器
  66. browser = await p.chromium.launch(headless=True)
  67. # 创建浏览器上下文
  68. context = await browser.new_context()
  69. async def get_html(url):
  70. try:
  71. print(f'正在打开: {url}')
  72. # 在上下文中打开新页面
  73. page = await context.new_page()
  74. # 导航到指定网址
  75. await page.goto(url, wait_until='networkidle') # 等待网络空闲
  76. # 滚动页面, 获取更多信息
  77. await self.scroll_to_percentage(page)
  78. # 获取渲染后的 HTML
  79. html = await page.content()
  80. # 关闭页面
  81. await page.close()
  82. # 使用 BeautifulSoup 格式化 HTML 内容
  83. soup = BeautifulSoup(html, 'html.parser')
  84. formatted_html = soup.get_text()
  85. cleaned_text = re.sub(r'[\n\t\r]+', ' ', formatted_html)
  86. cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
  87. return url, cleaned_text
  88. except Exception as e:
  89. print(f"Error fetching {url}: {e}")
  90. return url, ""
  91. # 使用 asyncio.gather 同时获取所有网站的 HTML
  92. tasks = [get_html(url) for url in urls]
  93. results = await asyncio.gather(*tasks)
  94. # 将结果存储在字典中
  95. url_to_text = {url: text for url, text in results}
  96. # 关闭上下文和浏览器
  97. await context.close()
  98. await browser.close()
  99. return url_to_text
  100. def main(self, target_url_list, prompt_words, role):
  101. url_to_text = asyncio.run(self.get_htmls(target_url_list))
  102. # 创建消息bot实例
  103. bot = MatrixBot('message-bot', 'aaaAAA111!!!')
  104. self.save_to_txt(url_to_text)
  105. O = OllamaChat()
  106. for k, v in url_to_text.items():
  107. response_context = O.call_ollama('http://127.0.0.1:11434', role, v, prompt_words)
  108. message = f'{k}\n{response_context}\n'
  109. # 发送消息
  110. bot.send_message(message)
  111. # K = KIMI()
  112. # response_context = K.call_kimi(prompt_words)
  113. # print(response_context)
  114. # D = DeepSeek()
  115. # for k, v in url_to_text.items():
  116. # response_context = D.call_deepseek(v, prompt_words)
  117. # # 保存每一个字符串准备发送信息
  118. # message = f'{k}\n{response_context}\n'
  119. # print(message)
  120. if __name__ == "__main__":
  121. ainews = AINEWS()
  122. ainews.create_config_if_not_exists()
  123. target_url_list, prompt_words, role = ainews.load_config(key)
  124. ainews.main(target_url_list, prompt_words, role)
  125. print('done!')