# message_rss_data_handel.py

import datetime
import os
import re
import smtplib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr

import psycopg2
from psycopg2 import Error
  9. class FreshRSSDatabase:
  10. def __init__(self):
  11. self.hostname = 'erhe.top'
  12. self.port = 20788
  13. self.database = 'freshrss'
  14. self.user = 'freshrss'
  15. self.password = 'freshrss'
  16. self.conn = None
  17. self.keys = [
  18. {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
  19. {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机'},
  20. {'coin新闻': 'btc|eth|sui|degen'}
  21. ]
  22. self.ellipsis = 300
  23. def connect(self):
  24. """连接到 PostgreSQL 数据库"""
  25. try:
  26. self.conn = psycopg2.connect(
  27. dbname=self.database,
  28. user=self.user,
  29. password=self.password,
  30. host=self.hostname,
  31. port=self.port
  32. )
  33. except Error as e:
  34. print(f"Error connecting to the database: {e}")
  35. raise # 重新抛出异常
  36. def execute_query(self, keywords):
  37. """执行 SQL 查询并返回结果"""
  38. if self.conn is None:
  39. self.connect()
  40. if self.conn is None:
  41. print("Database connection failed")
  42. return None
  43. try:
  44. # 计算过去一天的时间戳
  45. one_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
  46. # 将 datetime 对象转换为时间戳
  47. one_day_ago_timestamp = one_day_ago.timestamp()
  48. cur = self.conn.cursor()
  49. conditions = [f"title ILIKE '%{keyword}%' AND content ILIKE '%{keyword}%'" for keyword in
  50. keywords.split('|')]
  51. sql = f"""
  52. SELECT *
  53. FROM freshrss_toor_entry
  54. WHERE {" OR ".join(conditions)}
  55. AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
  56. ORDER BY date DESC;
  57. """
  58. cur.execute(sql)
  59. records = cur.fetchall()
  60. cur.close()
  61. return records
  62. except Error as e:
  63. print(f"An error occurred: {e}")
  64. return None
  65. def close(self):
  66. """关闭数据库连接"""
  67. if self.conn:
  68. self.conn.close()
  69. def remove_all_html_tags(self, text):
  70. """
  71. 移除字符串中的所有 HTML 标签。
  72. 参数:
  73. text (str): 包含 HTML 标签的原始文本。
  74. 返回:
  75. str: 移除所有 HTML 标签后的文本。
  76. """
  77. clean_text = re.sub(r'<[^>]+>', '', text)
  78. clean_text = clean_text.replace(' ', '')
  79. clean_text = clean_text.replace('\n', '')
  80. if len(clean_text) > self.ellipsis:
  81. clean_text = clean_text[:self.ellipsis] + '...'
  82. return clean_text
  83. def send_email(self, subject='', title='', text=''):
  84. mail_host = "smtp.163.com"
  85. mail_user = "pushmessagebot@163.com"
  86. mail_pass = "WSMSRKBKXIHIQWTU"
  87. sender = "pushmessagebot@163.com"
  88. receivers = ["pushmessagebot@163.com"]
  89. message = MIMEText(text, 'plain', 'utf-8')
  90. message['From'] = Header(title, 'utf-8')
  91. message['To'] = Header("RSS data", 'utf-8')
  92. message['Subject'] = Header(subject, 'utf-8')
  93. try:
  94. smtpObj = smtplib.SMTP_SSL(mail_host)
  95. smtpObj.login(mail_user, mail_pass)
  96. smtpObj.sendmail(sender, receivers, message.as_string())
  97. print(f"{title} 邮件发送成功")
  98. except smtplib.SMTPException as e:
  99. print("Error: 无法发送邮件", e)
  100. def main(self):
  101. # 执行查询
  102. loaded_data = {}
  103. with ThreadPoolExecutor(max_workers=len(self.keys)) as executor:
  104. future_to_key = {executor.submit(self.query_and_process_key, k, v): (k, v) for sublist in self.keys for k, v
  105. in sublist.items()}
  106. for future in as_completed(future_to_key):
  107. key_name, keywords = future_to_key[future]
  108. try:
  109. data = future.result()
  110. if data:
  111. loaded_data[key_name] = {
  112. 'source_key': key_name,
  113. 'keys': keywords,
  114. 'data': data
  115. }
  116. else:
  117. print(f'key: {key_name} 数据为空')
  118. except Exception as exc:
  119. print(f'{key_name} generated an exception: {exc}')
  120. # 关闭数据库连接
  121. self.close()
  122. for source_key, data in loaded_data.items():
  123. subject = 'RSS' + data.get('source_key')
  124. title = source_key
  125. key_data_total = len(data.get('data'))
  126. text = '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n'
  127. text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n\n'
  128. text += '*' * 80 + '\n'
  129. for d in data.get('data'):
  130. text += '标题: ' + d.get('title') + '\n'
  131. text += '内容: ' + d.get('content') + '\n'
  132. text += '链接: ' + d.get('link') + '\n'
  133. text += '发布日期: ' + d.get('postdate') + '\n'
  134. text += '时间戳: ' + str(d.get('posttimestamp')) + '\n\n'
  135. text += '*' * 80
  136. text += '\n\n'
  137. self.send_email(subject=subject, title=title, text=text)
  138. def query_and_process_key(self, key_name, keywords):
  139. records = self.execute_query(keywords)
  140. if records:
  141. unique_records = {}
  142. for record in records:
  143. title = self.remove_all_html_tags(record[2]) # 获取标题
  144. if title not in unique_records:
  145. unique_records[title] = {
  146. "title": title,
  147. "content": self.remove_all_html_tags(record[4]),
  148. "link": record[5],
  149. "postdate": (datetime.datetime.utcfromtimestamp(record[7])
  150. .strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '',
  151. "posttimestamp": record[7] or 0
  152. }
  153. return list(unique_records.values())
  154. return None
  155. if __name__ == "__main__":
  156. f = FreshRSSDatabase()
  157. f.main()