message_rss_data_handel.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
'''
Read the FreshRSS database, filter entries by keyword, and push the matches to a mailbox.
'''
  4. import smtplib
  5. from email.mime.text import MIMEText
  6. from email.header import Header
  7. import datetime
  8. import re
  9. import psycopg2
  10. from psycopg2 import Error
  11. class FreshRSSDatabase:
  12. def __init__(self):
  13. self.hostname = 'erhe.top'
  14. self.port = 20788
  15. self.database = 'freshrss'
  16. self.user = 'freshrss'
  17. self.password = 'freshrss'
  18. self.conn = None
  19. self.keys = [
  20. {'web3新闻': 'web3|btc|eth|区块链|NFT|数字币|数字资产|Dapp|DeFi|NFT|稳定币|元宇宙|GameFi|跨链|以太坊'},
  21. {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾'}
  22. ]
  23. self.ellipsis = 300
  24. def connect(self):
  25. """连接到 PostgreSQL 数据库"""
  26. try:
  27. self.conn = psycopg2.connect(
  28. dbname=self.database,
  29. user=self.user,
  30. password=self.password,
  31. host=self.hostname,
  32. port=self.port
  33. )
  34. except Error as e:
  35. print(f"Error connecting to the database: {e}")
  36. raise # 重新抛出异常
  37. def execute_query(self, key):
  38. sql = """
  39. SELECT *
  40. FROM freshrss_toor_entry
  41. WHERE title LIKE %s
  42. OR content LIKE %s
  43. AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
  44. ORDER BY date DESC
  45. LIMIT 100;
  46. """
  47. """执行 SQL 查询并返回结果"""
  48. if self.conn is None:
  49. self.connect()
  50. if self.conn is None:
  51. print("Database connection failed")
  52. return None
  53. try:
  54. cur = self.conn.cursor()
  55. cur.execute(sql, ('%' + key + '%', '%' + key + '%'))
  56. records = cur.fetchall()
  57. cur.close()
  58. return records
  59. except Error as e:
  60. print(f"An error occurred: {e}")
  61. return None
  62. def close(self):
  63. """关闭数据库连接"""
  64. if self.conn:
  65. self.conn.close()
  66. def remove_all_html_tags(self, text):
  67. """
  68. 移除字符串中的所有 HTML 标签。
  69. 参数:
  70. text (str): 包含 HTML 标签的原始文本。
  71. 返回:
  72. str: 移除所有 HTML 标签后的文本。
  73. """
  74. clean_text = re.sub(r'<[^>]+>', '', text)
  75. clean_text = clean_text.replace(' ', '')
  76. clean_text = clean_text.replace('\n', '')
  77. if len(clean_text) > self.ellipsis:
  78. clean_text = clean_text[:self.ellipsis] + '...'
  79. return clean_text
  80. def send_email(self, subject='', title='', text=''):
  81. mail_host = "smtp.163.com"
  82. mail_user = "pushmessagebot@163.com"
  83. mail_pass = "WSMSRKBKXIHIQWTU"
  84. sender = "pushmessagebot@163.com"
  85. receivers = ["pushmessagebot@163.com"]
  86. message = MIMEText(text, 'plain', 'utf-8')
  87. message['From'] = Header(title, 'utf-8')
  88. message['To'] = Header("RSS data", 'utf-8')
  89. message['Subject'] = Header(subject, 'utf-8')
  90. try:
  91. smtpObj = smtplib.SMTP_SSL(mail_host)
  92. smtpObj.login(mail_user, mail_pass)
  93. smtpObj.sendmail(sender, receivers, message.as_string())
  94. print("邮件发送成功")
  95. except smtplib.SMTPException as e:
  96. print("Error: 无法发送邮件", e)
  97. def main(self):
  98. # 执行查询
  99. loaded_data = {}
  100. for key_items in self.keys:
  101. for k, v in key_items.items():
  102. print(f'正在搜索 key-name: {k} 数据')
  103. keys = v.split('|')
  104. for key in keys:
  105. print(f'正在搜索 key: {key} 数据')
  106. records = self.execute_query(key)
  107. if records:
  108. for record in records:
  109. title = self.remove_all_html_tags(record[2])
  110. text = self.remove_all_html_tags(record[4])
  111. link = record[5]
  112. postdate = (datetime.datetime.utcfromtimestamp(record[7]).
  113. strftime('%Y-%m-%d %H:%M:%S')) if record[7] else ''
  114. posttimestamp = record[7] or 0
  115. if k not in loaded_data:
  116. loaded_data[k] = {
  117. 'source_key': k,
  118. 'keys': v,
  119. 'data': [{
  120. "key": key,
  121. "title": title,
  122. "content": text,
  123. "link": link,
  124. "postdate": postdate
  125. }]
  126. }
  127. else:
  128. loaded_data[k]['data'].append({
  129. "title": title,
  130. "content": text,
  131. "link": link,
  132. "postdate": postdate,
  133. "posttimestamp": posttimestamp
  134. })
  135. else:
  136. print(f'key: {key} 数据为空')
  137. # 关闭数据库连接
  138. self.close()
  139. for source_key, data in loaded_data.items():
  140. subject = 'RSS' + data.get('source_key')
  141. title = 'message bot'
  142. key_data_total = len(data.get('data'))
  143. text = '关键词: ' + data.get('keys') + '\n'
  144. text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n'
  145. text += '*' * 120 + '\n'
  146. for d in data.get('data'):
  147. text += '标题: ' + d.get('title') + '\n'
  148. text += '内容: ' + d.get('content') + '\n'
  149. if d.get('key'):
  150. text += '关键词: ' + d.get('key') + '\n'
  151. text += '链接: ' + d.get('link') + '\n'
  152. text += '发布日期: ' + d.get('postdate') + '\n'
  153. text += '时间戳: ' + str(d.get('posttimestamp')) + '\n'
  154. text += '*' * 120
  155. text += '\n\n'
  156. self.send_email(subject=subject, title=title, text=text)
  157. if __name__ == "__main__":
  158. f = FreshRSSDatabase()
  159. f.main()