import smtplib from email.mime.text import MIMEText from email.header import Header import datetime import re import psycopg2 from psycopg2 import Error class FreshRSSDatabase: def __init__(self): self.hostname = 'erhe.top' self.port = 20788 self.database = 'freshrss' self.user = 'freshrss' self.password = 'freshrss' self.conn = None self.keys = [ {'web3新闻': 'web3|btc|eth|区块链|NFT|数字币|数字资产|Dapp|DeFi|NFT|稳定币|元宇宙|GameFi|跨链|以太坊'}, {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾'} ] self.ellipsis = 300 def connect(self): """连接到 PostgreSQL 数据库""" try: self.conn = psycopg2.connect( dbname=self.database, user=self.user, password=self.password, host=self.hostname, port=self.port ) except Error as e: print(f"Error connecting to the database: {e}") if not self.conn: raise Exception("Database connection failed") def execute_query(self, key): sql = f""" SELECT * FROM freshrss_toor_entry WHERE title LIKE %s OR content LIKE %s AND "date" > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day') ORDER BY "date" DESC LIMIT 100; """ """执行 SQL 查询并返回结果""" if self.conn is None: self.connect() try: cur = self.conn.cursor() cur.execute(sql, ('%' + key + '%', '%' + key + '%')) records = cur.fetchall() cur.close() return records except Error as e: print(f"An error occurred: {e}") return None def close(self): """关闭数据库连接""" if self.conn: self.conn.close() def remove_all_html_tags(self, text): """ 移除字符串中的所有 HTML 标签。 参数: text (str): 包含 HTML 标签的原始文本。 返回: str: 移除所有 HTML 标签后的文本。 """ # 移除所有 HTML 标签 clean_text = re.sub(r'<[^>]+>', '', text) clean_text = clean_text.replace(' ', '') clean_text = clean_text.replace('\n', '') if len(clean_text) > self.ellipsis: clean_text = clean_text[:self.ellipsis] + '...' return clean_text def send_email(self, subject='', title='', text=''): mail_host = "smtp.163.com" mail_user = "pushmessagebot@163.com" mail_pass = "WSMSRKBKXIHIQWTU" sender = "pushmessagebot@163.com" receivers = ["pushmessagebot@163.com"] message = MIMEText(text, 'plain', 'utf-8') message['From'] = Header(title, 'utf-8') message['To'] = Header("RSS data", 'utf-8') message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP_SSL(mail_host) smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) print("邮件发送成功") except smtplib.SMTPException as e: print("Error: 无法发送邮件", e) def main(self): # 执行查询 loaded_data = {} for key_items in self.keys: for k, v in key_items.items(): print(f'正在搜索 key-name: {k} 数据') keys = v.split('|') for key in keys: print(f'正在搜索 key: {key} 数据') records = self.execute_query(key) if records: for record in records: title = self.remove_all_html_tags(record[2]) text = self.remove_all_html_tags(record[4]) link = record[5] postdate = (datetime.datetime.utcfromtimestamp(record[7]). strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '' posttimestamp = record[7] or 0 if k not in loaded_data: loaded_data[k] = { 'source_key': k, 'keys': v, 'data': [{ "key": key, "title": title, "content": text, "link": link, "postdate": postdate }] } else: loaded_data[k]['data'].append({ "title": title, "content": text, "link": link, "postdate": postdate, "posttimestamp": posttimestamp }) else: print(f'key: {key} 数据为空') # 关闭数据库连接 self.close() for source_key, data in loaded_data.items(): subject = 'RSS' + data.get('source_key') title = 'message bot' key_data_total = len(data.get('data')) text = '关键词: ' + data.get('keys') + '\n' text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n' text += '*' * 120 + '\n' for d in data.get('data'): text += '标题: ' + d.get('title') + '\n' text += '内容: ' + d.get('content') + '\n' if d.get('key'): text += '关键词: ' + d.get('key') + '\n' text += '链接: ' + d.get('link') + '\n' text += '发布日期: ' + d.get('postdate') + '\n' text += '时间戳: ' + str(d.get('posttimestamp')) + '\n' text += '*' * 120 text += '\n\n' self.send_email(subject=subject, title=title, text=text) if __name__ == "__main__": f = FreshRSSDatabase() f.main()