# message_rss_data_handel.py (6.1 KB)
import datetime
import os
import re
import smtplib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import Header
from email.mime.text import MIMEText

import psycopg2
from psycopg2 import Error
  9. class FreshRSSDatabase:
  10. def __init__(self):
  11. self.hostname = 'erhe.top'
  12. self.port = 20788
  13. self.database = 'freshrss'
  14. self.user = 'freshrss'
  15. self.password = 'freshrss'
  16. self.conn = None
  17. self.keys = [
  18. {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
  19. {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机'},
  20. {'coin新闻': 'btc|eth|sui|degen'}
  21. ]
  22. self.ellipsis = 300
  23. def connect(self):
  24. """连接到 PostgreSQL 数据库"""
  25. try:
  26. self.conn = psycopg2.connect(
  27. dbname=self.database,
  28. user=self.user,
  29. password=self.password,
  30. host=self.hostname,
  31. port=self.port
  32. )
  33. except Error as e:
  34. print(f"Error connecting to the database: {e}")
  35. raise # 重新抛出异常
  36. def execute_query(self, keywords):
  37. """执行 SQL 查询并返回结果"""
  38. if self.conn is None:
  39. self.connect()
  40. if self.conn is None:
  41. print("Database connection failed")
  42. return None
  43. try:
  44. cur = self.conn.cursor()
  45. conditions = [f"title ILIKE '%{keyword}%' AND content ILIKE '%{keyword}%'" for keyword in
  46. keywords.split('|')]
  47. sql = f"""
  48. SELECT *
  49. FROM freshrss_toor_entry
  50. WHERE {" OR ".join(conditions)}
  51. AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
  52. ORDER BY date DESC;
  53. """
  54. cur.execute(sql)
  55. records = cur.fetchall()
  56. cur.close()
  57. return records
  58. except Error as e:
  59. print(f"An error occurred: {e}")
  60. return None
  61. def close(self):
  62. """关闭数据库连接"""
  63. if self.conn:
  64. self.conn.close()
  65. def remove_all_html_tags(self, text):
  66. """
  67. 移除字符串中的所有 HTML 标签。
  68. 参数:
  69. text (str): 包含 HTML 标签的原始文本。
  70. 返回:
  71. str: 移除所有 HTML 标签后的文本。
  72. """
  73. clean_text = re.sub(r'<[^>]+>', '', text)
  74. clean_text = clean_text.replace(' ', '')
  75. clean_text = clean_text.replace('\n', '')
  76. if len(clean_text) > self.ellipsis:
  77. clean_text = clean_text[:self.ellipsis] + '...'
  78. return clean_text
  79. def send_email(self, subject='', title='', text=''):
  80. mail_host = "smtp.163.com"
  81. mail_user = "pushmessagebot@163.com"
  82. mail_pass = "WSMSRKBKXIHIQWTU"
  83. sender = "pushmessagebot@163.com"
  84. receivers = ["pushmessagebot@163.com"]
  85. message = MIMEText(text, 'plain', 'utf-8')
  86. message['From'] = Header(title, 'utf-8')
  87. message['To'] = Header("RSS data", 'utf-8')
  88. message['Subject'] = Header(subject, 'utf-8')
  89. try:
  90. smtpObj = smtplib.SMTP_SSL(mail_host)
  91. smtpObj.login(mail_user, mail_pass)
  92. smtpObj.sendmail(sender, receivers, message.as_string())
  93. print(f"{title} 邮件发送成功")
  94. except smtplib.SMTPException as e:
  95. print("Error: 无法发送邮件", e)
  96. def main(self):
  97. # 执行查询
  98. loaded_data = {}
  99. with ThreadPoolExecutor(max_workers=len(self.keys)) as executor:
  100. future_to_key = {executor.submit(self.query_and_process_key, k, v): (k, v) for sublist in self.keys for k, v
  101. in sublist.items()}
  102. for future in as_completed(future_to_key):
  103. key_name, keywords = future_to_key[future]
  104. try:
  105. data = future.result()
  106. if data:
  107. loaded_data[key_name] = {
  108. 'source_key': key_name,
  109. 'keys': keywords,
  110. 'data': data
  111. }
  112. else:
  113. print(f'key: {key_name} 数据为空')
  114. except Exception as exc:
  115. print(f'{key_name} generated an exception: {exc}')
  116. # 关闭数据库连接
  117. self.close()
  118. for source_key, data in loaded_data.items():
  119. subject = 'RSS' + data.get('source_key')
  120. title = source_key
  121. key_data_total = len(data.get('data'))
  122. text = '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n'
  123. text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n\n'
  124. text += '*' * 80 + '\n'
  125. for d in data.get('data'):
  126. text += '标题: ' + d.get('title') + '\n'
  127. text += '内容: ' + d.get('content') + '\n'
  128. text += '链接: ' + d.get('link') + '\n'
  129. text += '发布日期: ' + d.get('postdate') + '\n'
  130. text += '时间戳: ' + str(d.get('posttimestamp')) + '\n\n'
  131. text += '*' * 80
  132. text += '\n\n'
  133. self.send_email(subject=subject, title=title, text=text)
  134. def query_and_process_key(self, key_name, keywords):
  135. records = self.execute_query(keywords)
  136. if records:
  137. unique_records = {}
  138. for record in records:
  139. title = self.remove_all_html_tags(record[2]) # 获取标题
  140. if title not in unique_records:
  141. unique_records[title] = {
  142. "title": title,
  143. "content": self.remove_all_html_tags(record[4]),
  144. "link": record[5],
  145. "postdate": (datetime.datetime.utcfromtimestamp(record[7])
  146. .strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '',
  147. "posttimestamp": record[7] or 0
  148. }
  149. return list(unique_records.values())
  150. return None
  151. if __name__ == "__main__":
  152. f = FreshRSSDatabase()
  153. f.main()