message_rss_data_handel.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import smtplib
  2. from email.mime.text import MIMEText
  3. from email.header import Header
  4. import datetime
  5. import re
  6. import psycopg2
  7. from psycopg2 import Error
  8. class FreshRSSDatabase:
  9. def __init__(self):
  10. self.hostname = 'erhe.top'
  11. self.port = 20788
  12. self.database = 'freshrss'
  13. self.user = 'freshrss'
  14. self.password = 'freshrss'
  15. self.conn = None
  16. self.keys = [
  17. {'web3新闻': 'web3|btc|eth|区块链|NFT|数字币|数字资产|Dapp|DeFi|NFT|稳定币|元宇宙|GameFi|跨链|以太坊'},
  18. {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾'}
  19. ]
  20. self.ellipsis = 300
  21. def connect(self):
  22. """连接到 PostgreSQL 数据库"""
  23. try:
  24. self.conn = psycopg2.connect(
  25. dbname=self.database,
  26. user=self.user,
  27. password=self.password,
  28. host=self.hostname,
  29. port=self.port
  30. )
  31. except Error as e:
  32. print(f"Error connecting to the database: {e}")
  33. raise # 重新抛出异常
  34. def execute_query(self, key):
  35. sql = """
  36. SELECT *
  37. FROM freshrss_toor_entry
  38. WHERE title LIKE %s
  39. OR content LIKE %s
  40. AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
  41. ORDER BY date DESC
  42. LIMIT 100;
  43. """
  44. """执行 SQL 查询并返回结果"""
  45. if self.conn is None:
  46. self.connect()
  47. if self.conn is None:
  48. print("Database connection failed")
  49. return None
  50. try:
  51. cur = self.conn.cursor()
  52. cur.execute(sql, ('%' + key + '%', '%' + key + '%'))
  53. records = cur.fetchall()
  54. cur.close()
  55. return records
  56. except Error as e:
  57. print(f"An error occurred: {e}")
  58. return None
  59. def close(self):
  60. """关闭数据库连接"""
  61. if self.conn:
  62. self.conn.close()
  63. def remove_all_html_tags(self, text):
  64. """
  65. 移除字符串中的所有 HTML 标签。
  66. 参数:
  67. text (str): 包含 HTML 标签的原始文本。
  68. 返回:
  69. str: 移除所有 HTML 标签后的文本。
  70. """
  71. clean_text = re.sub(r'<[^>]+>', '', text)
  72. clean_text = clean_text.replace(' ', '')
  73. clean_text = clean_text.replace('\n', '')
  74. if len(clean_text) > self.ellipsis:
  75. clean_text = clean_text[:self.ellipsis] + '...'
  76. return clean_text
  77. def send_email(self, subject='', title='', text=''):
  78. mail_host = "smtp.163.com"
  79. mail_user = "pushmessagebot@163.com"
  80. mail_pass = "WSMSRKBKXIHIQWTU"
  81. sender = "pushmessagebot@163.com"
  82. receivers = ["pushmessagebot@163.com"]
  83. message = MIMEText(text, 'plain', 'utf-8')
  84. message['From'] = Header(title, 'utf-8')
  85. message['To'] = Header("RSS data", 'utf-8')
  86. message['Subject'] = Header(subject, 'utf-8')
  87. try:
  88. smtpObj = smtplib.SMTP_SSL(mail_host)
  89. smtpObj.login(mail_user, mail_pass)
  90. smtpObj.sendmail(sender, receivers, message.as_string())
  91. print("邮件发送成功")
  92. except smtplib.SMTPException as e:
  93. print("Error: 无法发送邮件", e)
  94. def main(self):
  95. # 执行查询
  96. loaded_data = {}
  97. for key_items in self.keys:
  98. for k, v in key_items.items():
  99. print(f'正在搜索 key-name: {k} 数据')
  100. keys = v.split('|')
  101. for key in keys:
  102. print(f'正在搜索 key: {key} 数据')
  103. records = self.execute_query(key)
  104. if records:
  105. for record in records:
  106. title = self.remove_all_html_tags(record[2])
  107. text = self.remove_all_html_tags(record[4])
  108. link = record[5]
  109. postdate = (datetime.datetime.utcfromtimestamp(record[7]).
  110. strftime('%Y-%m-%d %H:%M:%S')) if record[7] else ''
  111. posttimestamp = record[7] or 0
  112. if k not in loaded_data:
  113. loaded_data[k] = {
  114. 'source_key': k,
  115. 'keys': v,
  116. 'data': [{
  117. "key": key,
  118. "title": title,
  119. "content": text,
  120. "link": link,
  121. "postdate": postdate
  122. }]
  123. }
  124. else:
  125. loaded_data[k]['data'].append({
  126. "title": title,
  127. "content": text,
  128. "link": link,
  129. "postdate": postdate,
  130. "posttimestamp": posttimestamp
  131. })
  132. else:
  133. print(f'key: {key} 数据为空')
  134. # 关闭数据库连接
  135. self.close()
  136. for source_key, data in loaded_data.items():
  137. subject = 'RSS' + data.get('source_key')
  138. title = 'message bot'
  139. key_data_total = len(data.get('data'))
  140. text = '关键词: ' + data.get('keys') + '\n'
  141. text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n'
  142. text += '*' * 120 + '\n'
  143. for d in data.get('data'):
  144. text += '标题: ' + d.get('title') + '\n'
  145. text += '内容: ' + d.get('content') + '\n'
  146. if d.get('key'):
  147. text += '关键词: ' + d.get('key') + '\n'
  148. text += '链接: ' + d.get('link') + '\n'
  149. text += '发布日期: ' + d.get('postdate') + '\n'
  150. text += '时间戳: ' + str(d.get('posttimestamp')) + '\n'
  151. text += '*' * 120
  152. text += '\n\n'
  153. self.send_email(subject=subject, title=title, text=text)
  154. if __name__ == "__main__":
  155. f = FreshRSSDatabase()
  156. f.main()