| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- import smtplib
- from email.mime.text import MIMEText
- from email.header import Header
- import datetime
- import re
- import psycopg2
- from psycopg2 import Error
- from concurrent.futures import ThreadPoolExecutor, as_completed
- class FreshRSSDatabase:
- def __init__(self):
- self.hostname = 'erhe.top'
- self.port = 20788
- self.database = 'freshrss'
- self.user = 'freshrss'
- self.password = 'freshrss'
- self.conn = None
- self.keys = [
- {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
- {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机|礼物'},
- {'coin新闻': 'btc|eth|sui|degen'}
- ]
- self.ellipsis = 300
- self.days = 3
- def connect(self):
- """连接到 PostgreSQL 数据库"""
- try:
- self.conn = psycopg2.connect(
- dbname=self.database,
- user=self.user,
- password=self.password,
- host=self.hostname,
- port=self.port
- )
- except Error as e:
- print(f"Error connecting to the database: {e}")
- raise # 重新抛出异常
- def execute_query(self, keywords):
- """执行 SQL 查询并返回结果"""
- if self.conn is None:
- self.connect()
- if self.conn is None:
- print("Database connection failed")
- return None
- try:
- cur = self.conn.cursor()
- conditions = [f"title ILIKE '%{keyword}%' AND content ILIKE '%{keyword}%'" for keyword in
- keywords.split('|')]
- sql = f"""
- SELECT *
- FROM freshrss_toor_entry
- WHERE {" OR ".join(conditions)}
- AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
- ORDER BY date DESC;
- """
- cur.execute(sql)
- records = cur.fetchall()
- cur.close()
- return records
- except Error as e:
- print(f"An error occurred: {e}")
- return None
- def close(self):
- """关闭数据库连接"""
- if self.conn:
- self.conn.close()
- def remove_all_html_tags(self, text):
- """
- 移除字符串中的所有 HTML 标签。
- 参数:
- text (str): 包含 HTML 标签的原始文本。
- 返回:
- str: 移除所有 HTML 标签后的文本。
- """
- clean_text = re.sub(r'<[^>]+>', '', text)
- clean_text = clean_text.replace(' ', '')
- clean_text = clean_text.replace('\n', '')
- if len(clean_text) > self.ellipsis:
- clean_text = clean_text[:self.ellipsis] + '...'
- return clean_text
- def send_email(self, subject='', title='', text=''):
- mail_host = "smtp.163.com"
- mail_user = "pushmessagebot@163.com"
- mail_pass = "WSMSRKBKXIHIQWTU"
- sender = "pushmessagebot@163.com"
- receivers = ["pushmessagebot@163.com"]
- message = MIMEText(text, 'plain', 'utf-8')
- message['From'] = Header(title, 'utf-8')
- message['To'] = Header("RSS data", 'utf-8')
- message['Subject'] = Header(subject, 'utf-8')
- try:
- smtpObj = smtplib.SMTP_SSL(mail_host)
- smtpObj.login(mail_user, mail_pass)
- smtpObj.sendmail(sender, receivers, message.as_string())
- print(f"{title} 邮件发送成功")
- except smtplib.SMTPException as e:
- print("Error: 无法发送邮件", e)
- def query_and_process_key(self, key_name, keywords):
- records = self.execute_query(keywords)
- if records:
- unique_records = {}
- for record in records:
- title = self.remove_all_html_tags(record[2]) # 获取标题
- if title not in unique_records:
- unique_records[title] = {
- "title": title,
- "content": self.remove_all_html_tags(record[4]),
- "link": record[5],
- "postdate": (datetime.datetime.utcfromtimestamp(record[7])
- .strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '',
- "posttimestamp": record[7] or 0
- }
- return list(unique_records.values())
- return None
- def prepare_to_send(self, data):
- source_key = data.get('source_key')
- keys = data.get('keys')
- data_list = data.get('data')
- filter_data = []
- # 计算过去一天的时间戳
- one_day_ago = datetime.datetime.now() - datetime.timedelta(days=self.days)
- # 将 datetime 对象转换为时间戳
- one_day_ago_timestamp = one_day_ago.timestamp()
- for value in data_list:
- if value['posttimestamp'] >= one_day_ago_timestamp:
- filter_data.append(value)
- sorted_list = sorted(filter_data, key=lambda x: x['posttimestamp'], reverse=True)
- subject = 'RSS' + data.get('source_key')
- title = source_key
- key_data_total = len(data.get('data'))
- text = '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n'
- text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n\n'
- text += '*' * 80 + '\n'
- for d in sorted_list:
- text += '标题: ' + d.get('title') + '\n'
- text += '内容: ' + d.get('content') + '\n'
- text += '链接: ' + d.get('link') + '\n'
- text += '发布日期: ' + d.get('postdate') + '\n'
- text += '时间戳: ' + str(d.get('posttimestamp')) + '\n\n'
- text += '*' * 80
- text += '\n\n'
- self.send_email(subject=subject, title=title, text=text)
- def main(self):
- # 执行查询
- loaded_data = {}
- with ThreadPoolExecutor(max_workers=len(self.keys)) as executor:
- future_to_key = {executor.submit(self.query_and_process_key, k, v): (k, v) for sublist in self.keys for k, v
- in sublist.items()}
- for future in as_completed(future_to_key):
- key_name, keywords = future_to_key[future]
- try:
- data = future.result()
- if data:
- loaded_data[key_name] = {
- 'source_key': key_name,
- 'keys': keywords,
- 'data': data
- }
- else:
- print(f'key: {key_name} 数据为空')
- except Exception as exc:
- print(f'{key_name} generated an exception: {exc}')
- # 关闭数据库连接
- self.close()
- for source_key, data in loaded_data.items():
- self.prepare_to_send(data)
- print('done!')
if __name__ == "__main__":
    # Script entry point: run the keyword search and mail the digests.
    runner = FreshRSSDatabase()
    runner.main()
|