|
|
@@ -1,6 +1,3 @@
|
|
|
-'''
|
|
|
-读取 frashrss 数据库, 通过关键词过滤数据, 推送到邮箱
|
|
|
-'''
|
|
|
import smtplib
|
|
|
from email.mime.text import MIMEText
|
|
|
from email.header import Header
|
|
|
@@ -8,6 +5,7 @@ import datetime
|
|
|
import re
|
|
|
import psycopg2
|
|
|
from psycopg2 import Error
|
|
|
+from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
|
|
|
|
|
class FreshRSSDatabase:
|
|
|
@@ -19,8 +17,9 @@ class FreshRSSDatabase:
|
|
|
self.password = 'freshrss'
|
|
|
self.conn = None
|
|
|
self.keys = [
|
|
|
- {'web3新闻': 'web3|btc|eth|区块链|NFT|数字币|数字资产|Dapp|DeFi|NFT|稳定币|元宇宙|GameFi|跨链|以太坊'},
|
|
|
- {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾'}
|
|
|
+ {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
|
|
|
+ {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机'},
|
|
|
+ {'coin新闻': 'btc|eth|sui|degen'}
|
|
|
]
|
|
|
self.ellipsis = 300
|
|
|
|
|
|
@@ -38,17 +37,7 @@ class FreshRSSDatabase:
|
|
|
print(f"Error connecting to the database: {e}")
|
|
|
raise # 重新抛出异常
|
|
|
|
|
|
- def execute_query(self, key):
|
|
|
- sql = """
|
|
|
- SELECT *
|
|
|
- FROM freshrss_toor_entry
|
|
|
- WHERE title LIKE %s
|
|
|
- OR content LIKE %s
|
|
|
- AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
|
|
|
- ORDER BY date DESC
|
|
|
- LIMIT 100;
|
|
|
- """
|
|
|
-
|
|
|
+ def execute_query(self, keywords):
|
|
|
"""执行 SQL 查询并返回结果"""
|
|
|
if self.conn is None:
|
|
|
self.connect()
|
|
|
@@ -57,7 +46,16 @@ class FreshRSSDatabase:
|
|
|
return None
|
|
|
try:
|
|
|
cur = self.conn.cursor()
|
|
|
- cur.execute(sql, ('%' + key + '%', '%' + key + '%'))
|
|
|
+ conditions = [f"title ILIKE '%{keyword}%' AND content ILIKE '%{keyword}%'" for keyword in
|
|
|
+ keywords.split('|')]
|
|
|
+ sql = f"""
|
|
|
+ SELECT *
|
|
|
+ FROM freshrss_toor_entry
|
|
|
+ WHERE {" OR ".join(conditions)}
|
|
|
+ AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
|
|
|
+ ORDER BY date DESC;
|
|
|
+ """
|
|
|
+ cur.execute(sql)
|
|
|
records = cur.fetchall()
|
|
|
cur.close()
|
|
|
return records
|
|
|
@@ -104,75 +102,71 @@ class FreshRSSDatabase:
|
|
|
smtpObj = smtplib.SMTP_SSL(mail_host)
|
|
|
smtpObj.login(mail_user, mail_pass)
|
|
|
smtpObj.sendmail(sender, receivers, message.as_string())
|
|
|
- print("邮件发送成功")
|
|
|
+ print(f"{title} 邮件发送成功")
|
|
|
except smtplib.SMTPException as e:
|
|
|
print("Error: 无法发送邮件", e)
|
|
|
|
|
|
def main(self):
    """Query every keyword group concurrently and email one digest per group.

    ``self.keys`` is a list of dicts mapping a group name to a
    ``|``-separated keyword string. Each (name, keywords) pair is handed to
    ``self.query_and_process_key`` on a worker thread. Groups that return
    data are collected; groups that return nothing or raise are reported on
    stdout and skipped. The database connection is closed once all workers
    have finished, and then one email per collected group is sent through
    ``self.send_email``.
    """
    # Flatten the list of dicts into (group name, keyword string) jobs.
    jobs = [(name, pattern) for group in self.keys for name, pattern in group.items()]

    # Run the queries.
    loaded_data = {}
    # ThreadPoolExecutor raises ValueError for max_workers=0, so an empty
    # keyword list must still request at least one worker.
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as executor:
        future_to_key = {
            executor.submit(self.query_and_process_key, name, pattern): (name, pattern)
            for name, pattern in jobs
        }
        for future in as_completed(future_to_key):
            key_name, keywords = future_to_key[future]
            try:
                data = future.result()
            except Exception as exc:
                # One failing group must not prevent the others from being mailed.
                print(f'{key_name} generated an exception: {exc}')
                continue
            if data:
                loaded_data[key_name] = {
                    'source_key': key_name,
                    'keys': keywords,
                    'data': data,
                }
            else:
                print(f'key: {key_name} 数据为空')

    # Close the database connection; every worker is done at this point.
    self.close()

    separator = '*' * 80
    for source_key, data in loaded_data.items():
        subject = 'RSS' + data.get('source_key')
        title = source_key
        entries = data.get('data')

        # Collect the pieces and join once instead of repeated string +=.
        parts = [
            '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n',
            f'一共搜索到: {len(entries)} 条数据\n\n',
            separator + '\n',
        ]
        for entry in entries:
            # NOTE(review): the separator is assumed to follow every entry;
            # confirm against the intended email layout.
            parts.extend((
                f"标题: {entry.get('title')}\n",
                f"内容: {entry.get('content')}\n",
                f"链接: {entry.get('link')}\n",
                f"发布日期: {entry.get('postdate')}\n",
                f"时间戳: {entry.get('posttimestamp')}\n\n",
                separator,
                '\n\n',
            ))

        self.send_email(subject=subject, title=title, text=''.join(parts))
|
|
|
|
|
|
def query_and_process_key(self, key_name, keywords):
    """Run the query for one keyword group and return its de-duplicated entries.

    Args:
        key_name: Name of the keyword group. It is not used by the query
            itself; it is accepted so the caller can submit
            (name, keywords) pairs directly.
        keywords: ``|``-separated keyword string passed to
            ``self.execute_query``.

    Returns:
        A list of dicts with the keys ``title``, ``content``, ``link``,
        ``postdate`` and ``posttimestamp``, one per distinct cleaned title
        (the first record with a given title wins), or ``None`` when the
        query yields no records.
    """
    records = self.execute_query(keywords)
    if not records:
        return None

    unique_records = {}
    for record in records:
        # Column positions (2=title, 4=content, 5=link, 7=date) follow the
        # SELECT * row layout — presumably the freshrss entry table; verify
        # against the schema.
        title = self.remove_all_html_tags(record[2])
        if title in unique_records:
            continue

        timestamp = record[7]
        if timestamp:
            # fromtimestamp(..., tz=utc) replaces the deprecated
            # utcfromtimestamp() and formats to the same UTC wall-clock string.
            postdate = datetime.datetime.fromtimestamp(
                timestamp, tz=datetime.timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S')
        else:
            postdate = ''

        unique_records[title] = {
            "title": title,
            "content": self.remove_all_html_tags(record[4]),
            "link": record[5],
            "postdate": postdate,
            "posttimestamp": timestamp or 0,
        }
    return list(unique_records.values())
|
|
|
+
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
f = FreshRSSDatabase()
|