| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- # -*- coding: utf-8 -*-
- from odoo import models, fields, api
- from datetime import datetime, timedelta
- from pymongo import MongoClient
- import smtplib
- from email.mime.text import MIMEText
- from email.header import Header
- from odoo.exceptions import UserError
- class News(models.Model):
- _name = 'news.info'
- _description = 'News Info'
- _order = 'create_datetime DESC'
- name = fields.Char(string='Title', required=True)
- context = fields.Text(string='Context')
- context_simple = fields.Char(string='Context Simple', compute='_compute_context')
- source_url = fields.Char(string='Source URL')
- link = fields.Char(string='Link')
- article_type = fields.Char(string='Article Type')
- article_source = fields.Char(string='Article Source')
- img_url = fields.Char(string='Image URL')
- keyword = fields.Char(string='Keyword')
- posted_date = fields.Char(string='Posted Date')
- create_time_ts = fields.Float(string='Creation Time TS')
- create_time = fields.Datetime(string='Creation Time')
- create_datetime = fields.Datetime(string='Creation Datetime')
- def _compute_context(self):
- context = ''
- for record in self:
- if record.context:
- if len(record.context) < 80:
- record.context_simple = record.context
- else:
- record.context_simple = f'{record.context[:80]}...'
- else:
- record.context_simple = context
- class SearchKeys(models.Model):
- _name = 'news.search_keys'
- _description = 'News Search Keys'
- _order = 'id DESC'
- def _default_name(self):
- search_keys_id = self.env['news.search_keys'].search([], limit=1, order='id DESC')
- if search_keys_id:
- return str(int(search_keys_id.name) + 1).zfill(5)
- else:
- return '1'.zfill(5)
- name = fields.Char(string='Name', default=_default_name)
- keys = fields.Char(string='Keys')
- text = fields.Text(string='Text')
- auto_send = fields.Boolean(string='Auto Send', default=True)
- search_days = fields.Integer(string='Search Days', default=7, required=False)
- def btn_search(self):
- try:
- client = MongoClient('mongodb://root:aaaAAA111!!!@erhe.top:38000/', connectTimeoutMS=30000,
- serverSelectionTimeoutMS=5000)
- # 指定数据库名称
- db_name = 'NEWS' # 替换为你的数据库名称
- # 选择数据库
- db = client[db_name]
- # 列出数据库中的所有集合
- collections = db.list_collection_names()
- except Exception as e:
- raise UserError(e)
- all_data = []
- search_days = 7 # 假设你想搜索最近7天的数据
- # 计算日期范围
- current_date = datetime.utcnow()
- date_threshold = (current_date - timedelta(days=self.search_days)).strftime('%Y-%m-%d %H:%M:%S')
- # 将 self.keys 字符串分割成列表,如果 self.keys 为空,则设置为空列表
- keys_list = self.keys.split('|||') if self.keys else []
- for collection_name in collections:
- # 选择集合
- collection = db[collection_name]
- # 构建查询条件
- query_conditions = {'create_time': {'$gte': date_threshold}}
- # 如果有传入关键词,添加到查询条件中
- if keys_list:
- for search_key in keys_list:
- # 构建查询
- query = {
- '$or': [
- {'title': {'$regex': search_key, '$options': 'i'}}, # 搜索title字段
- {'context': {'$regex': search_key, '$options': 'i'}} # 搜索context字段
- ],
- 'create_datetime': {'$gte': date_threshold} # create_datetime字段大于等于计算出的日期
- }
- # 执行查询
- results = collection.find(query, {'_id': 0})
- for result in results:
- all_data.append(result)
- else:
- query = {'create_datetime': {'$gte': date_threshold}}
- results = collection.find(query, {'_id': 0})
- for result in results:
- all_data.append(result)
- sorted_data = []
- if all_data:
- sorted_data = sorted(all_data, key=lambda x: x['create_time'], reverse=True)
- else:
- return
- if sorted_data:
- if self.search_days:
- t = '关键词: {}\n{} 天内数据\n{}\n\n'.format(self.keys, self.search_days, '*' * 100)
- t = '{} 天内数据\n{}\n\n'.format(self.search_days, '*' * 100)
- for d in sorted_data:
- title = d.setdefault('title')
- context = d.setdefault('context') or '/'
- link = d.setdefault('link') or '/'
- posted_date = d.setdefault('posted_date') or '/'
- create_datetime = d.setdefault('create_datetime')
- if len(context) > 300:
- context = context[:300] + '...'
- t += f"标题: {title}\n内容: {context}\n链接: {link}\n发表时间: {posted_date}\n获取时间: {create_datetime}\n"
- t += '*' * 100 + '\n\n'
- self.write({'text': t})
- else:
- self.write({'text': 'No Data'})
- if self.auto_send:
- self.btn_send_email()
- def btn_send_email(self):
- if self.text:
- mail_host = 'smtp.163.com'
- mail_user = 'pushmessagebot@163.com'
- mail_pass = 'WSMSRKBKXIHIQWTU'
- sender = 'pushmessagebot@163.com'
- receivers = ['pushmessagebot@163.com']
- message = MIMEText(self.text, 'plain', 'utf-8')
- message['From'] = Header(f'keyword: {self.keys}' if self.keys else f'{self.search_days}天内数据', 'utf-8')
- message['To'] = Header("SearchAndSend", 'utf-8')
- message['Subject'] = Header(f'keyword: {self.keys}' if self.keys else f'{self.search_days}天内数据',
- 'utf-8')
- try:
- smtp_obj = smtplib.SMTP_SSL(mail_host)
- smtp_obj.login(mail_user, mail_pass)
- smtp_obj.sendmail(sender, receivers, message.as_string())
- print("邮件发送成功")
- except smtplib.SMTPException as e:
- print("Error: 无法发送邮件", e)
- else:
- raise UserError('请先搜索数据')
|