# rss_data.py (6.8 KB)
import datetime
import re
import smtplib
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr

import psycopg2
from psycopg2 import Error
  9. class FreshRSSDatabase:
  10. def __init__(self):
  11. self.hostname = 'erhe.top'
  12. self.port = 20788
  13. self.database = 'freshrss'
  14. self.user = 'freshrss'
  15. self.password = 'freshrss'
  16. self.conn = None
  17. self.keys = [
  18. {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
  19. {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机|礼物'},
  20. {'coin新闻': 'btc|eth|sui|degen'}
  21. ]
  22. self.ellipsis = 300
  23. self.days = 3
  24. def connect(self):
  25. """连接到 PostgreSQL 数据库"""
  26. try:
  27. self.conn = psycopg2.connect(
  28. dbname=self.database,
  29. user=self.user,
  30. password=self.password,
  31. host=self.hostname,
  32. port=self.port
  33. )
  34. except Error as e:
  35. print(f"Error connecting to the database: {e}")
  36. raise # 重新抛出异常
  37. def execute_query(self, keywords):
  38. """执行 SQL 查询并返回结果"""
  39. if self.conn is None:
  40. self.connect()
  41. if self.conn is None:
  42. print("Database connection failed")
  43. return None
  44. try:
  45. cur = self.conn.cursor()
  46. conditions = [f"title ILIKE '%{keyword}%' AND content ILIKE '%{keyword}%'" for keyword in
  47. keywords.split('|')]
  48. sql = f"""
  49. SELECT *
  50. FROM freshrss_toor_entry
  51. WHERE {" OR ".join(conditions)}
  52. AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
  53. ORDER BY date DESC;
  54. """
  55. cur.execute(sql)
  56. records = cur.fetchall()
  57. cur.close()
  58. return records
  59. except Error as e:
  60. print(f"An error occurred: {e}")
  61. return None
  62. def close(self):
  63. """关闭数据库连接"""
  64. if self.conn:
  65. self.conn.close()
  66. def remove_all_html_tags(self, text):
  67. """
  68. 移除字符串中的所有 HTML 标签。
  69. 参数:
  70. text (str): 包含 HTML 标签的原始文本。
  71. 返回:
  72. str: 移除所有 HTML 标签后的文本。
  73. """
  74. clean_text = re.sub(r'<[^>]+>', '', text)
  75. clean_text = clean_text.replace(' ', '')
  76. clean_text = clean_text.replace('\n', '')
  77. if len(clean_text) > self.ellipsis:
  78. clean_text = clean_text[:self.ellipsis] + '...'
  79. return clean_text
  80. def send_email(self, subject='', title='', text=''):
  81. mail_host = "smtp.163.com"
  82. mail_user = "pushmessagebot@163.com"
  83. mail_pass = "WSMSRKBKXIHIQWTU"
  84. sender = "pushmessagebot@163.com"
  85. receivers = ["pushmessagebot@163.com"]
  86. message = MIMEText(text, 'plain', 'utf-8')
  87. message['From'] = Header(title, 'utf-8')
  88. message['To'] = Header("RSS data", 'utf-8')
  89. message['Subject'] = Header(subject, 'utf-8')
  90. try:
  91. smtpObj = smtplib.SMTP_SSL(mail_host)
  92. smtpObj.login(mail_user, mail_pass)
  93. smtpObj.sendmail(sender, receivers, message.as_string())
  94. print(f"{title} 邮件发送成功")
  95. except smtplib.SMTPException as e:
  96. print("Error: 无法发送邮件", e)
  97. def query_and_process_key(self, key_name, keywords):
  98. records = self.execute_query(keywords)
  99. if records:
  100. unique_records = {}
  101. for record in records:
  102. title = self.remove_all_html_tags(record[2]) # 获取标题
  103. if title not in unique_records:
  104. unique_records[title] = {
  105. "title": title,
  106. "content": self.remove_all_html_tags(record[4]),
  107. "link": record[5],
  108. "postdate": (datetime.datetime.utcfromtimestamp(record[7])
  109. .strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '',
  110. "posttimestamp": record[7] or 0
  111. }
  112. return list(unique_records.values())
  113. return None
  114. def prepare_to_send(self, data):
  115. source_key = data.get('source_key')
  116. keys = data.get('keys')
  117. data_list = data.get('data')
  118. filter_data = []
  119. # 计算过去一天的时间戳
  120. one_day_ago = datetime.datetime.now() - datetime.timedelta(days=self.days)
  121. # 将 datetime 对象转换为时间戳
  122. one_day_ago_timestamp = one_day_ago.timestamp()
  123. for value in data_list:
  124. if value['posttimestamp'] >= one_day_ago_timestamp:
  125. filter_data.append(value)
  126. sorted_list = sorted(filter_data, key=lambda x: x['posttimestamp'], reverse=True)
  127. subject = 'RSS' + data.get('source_key')
  128. title = source_key
  129. key_data_total = len(data.get('data'))
  130. text = '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n'
  131. text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n\n'
  132. text += '*' * 80 + '\n'
  133. for d in sorted_list:
  134. text += '标题: ' + d.get('title') + '\n'
  135. text += '内容: ' + d.get('content') + '\n'
  136. text += '链接: ' + d.get('link') + '\n'
  137. text += '发布日期: ' + d.get('postdate') + '\n'
  138. text += '时间戳: ' + str(d.get('posttimestamp')) + '\n\n'
  139. text += '*' * 80
  140. text += '\n\n'
  141. self.send_email(subject=subject, title=title, text=text)
  142. def main(self):
  143. # 执行查询
  144. loaded_data = {}
  145. with ThreadPoolExecutor(max_workers=len(self.keys)) as executor:
  146. future_to_key = {executor.submit(self.query_and_process_key, k, v): (k, v) for sublist in self.keys for k, v
  147. in sublist.items()}
  148. for future in as_completed(future_to_key):
  149. key_name, keywords = future_to_key[future]
  150. try:
  151. data = future.result()
  152. if data:
  153. loaded_data[key_name] = {
  154. 'source_key': key_name,
  155. 'keys': keywords,
  156. 'data': data
  157. }
  158. else:
  159. print(f'key: {key_name} 数据为空')
  160. except Exception as exc:
  161. print(f'{key_name} generated an exception: {exc}')
  162. # 关闭数据库连接
  163. self.close()
  164. for source_key, data in loaded_data.items():
  165. self.prepare_to_send(data)
  166. print('done!')
  167. if __name__ == "__main__":
  168. f = FreshRSSDatabase()
  169. f.main()