| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- # -*-coding: utf-8 -*-
- import os
- import sys
- import threading
- from datetime import datetime
- import time
- import httpx
- sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
- from utils.utils_mongo_handle import MongoHandle
- from utils.utils_logs_handle import LogsHandle
- from utils.utils_send_email import SendEmail
- class GetData(object):
- def __init__(self, get_num=9999999):
- self.get_num = get_num
- self.url = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=85&provinceId=0&pageSize={}&isVerify=1&pageNo=1'.format(
- get_num)
- self.logs_handle = LogsHandle()
- self.email_subject = 'dlt'
- self.email_title = '超级大乐透最新一期开奖查询对比'
- self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- ('-' * 90))
- self.logs_handle = LogsHandle()
- self.now_day = time.strftime('%Y-%m-%d', time.localtime())
- db = 'dlt'
- collection = 'dlt_' + self.now_day
- self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)
- def main(self):
- data_list = self.req()
- result_data = self.data_handle(data_list)
- return result_data
- def req(self):
- resp = httpx.get(self.url)
- if resp.status_code != 200:
- print('state code: {}'.format(resp.status_code))
- log_detail = '访问失败, 状态码:{},url:{}'.format(resp.status_code, self.url)
- self.logs_handle.logs_write('auto_get_and_check_dlt', log_detail, 'error', False)
- exit(0)
- resp_json = resp.json()
- value = resp_json.setdefault('value')
- data_list = value.setdefault('list')
- if not data_list:
- self.logs_handle.logs_write('auto_get_and_check_dlt', '返回的数据为空, 获取数据失败', 'error', False)
- return
- print('已获取数据')
- return data_list
- def data_handle(self, data_list):
- result_data = []
- for d in data_list:
- numbers = d.setdefault('lotteryUnsortDrawresult')
- try:
- if len(numbers.split(' ')) < 7:
- continue
- except Exception as e:
- print('numbers: {}, err: {}'.format(numbers, e))
- continue
- red_list = numbers.split(' ')[:5]
- blue_list = numbers.split(' ')[5:]
- red_list.sort()
- blue_list.sort()
- try:
- # 切开红球,蓝球数组
- red1 = red_list[0]
- red2 = red_list[1]
- red3 = red_list[2]
- red4 = red_list[3]
- red5 = red_list[4]
- blue1 = blue_list[0]
- blue2 = blue_list[1]
- except Exception as e:
- print('红球或蓝球数据丢失')
- continue
- result_data.append({
- 'serial': d.setdefault('lotteryDrawNum'),
- 'red1': red1 or '',
- 'red2': red2 or '',
- 'red3': red3 or '',
- 'red4': red4 or '',
- 'red5': red5 or '',
- 'blue1': blue1 or '',
- 'blue2': blue2 or '',
- 'drawPdfUrl': d.setdefault('drawPdfUrl'),
- 'date': d.setdefault('lotteryDrawTime'),
- 'pool': d.setdefault('poolBalanceAfterdraw')
- })
- if result_data:
- return result_data
- else:
- self.logs_handle.logs_write('auto_get_and_check_dlt', '返回的数据为空, 获取数据失败', 'error', False)
- exit(0)
- class CheckMyDLT(object):
- def __init__(self, data):
- self.my_dlt = [
- ['10', '11', '16', '17', '18', '11', '12'],
- ['02', '03', '11', '12', '23', '05', '06'],
- ['07', '09', '15', '17', '22', '09', '11'],
- ['05', '06', '07', '34', '35', '02', '09'],
- ['09', '10', '11', '21', '22', '04', '05']
- ]
- self.data = data
- def main(self):
- print('开始数据对比')
- prepare_send_text, prepare_send_subject = self.process_text()
- self.send_data(prepare_send_subject, prepare_send_text)
- def process_text(self):
- text = ''
- serial_text = None
- subject = None
- for data in self.data:
- red_list = [data['red1'], data['red2'], data['red3'], data['red4'], data['red5']]
- blue_list = [data['blue1'], data['blue2']]
- # 只查询一期时, subject显示, 如果查询多期,则subject不显示
- if len(data) == 1:
- subject = '{}'.format(data['serial'])
- # 组成每期数据的text
- serial_text = 'serial: {}\t\tlottery draw date: {}\t\tbonus pool: {} RMB\n{}\nlottery draw num: {} + {}\n'.format(
- data['serial'], data['date'], data['pool'], '*' * 90,
- red_list, blue_list)
- for my_num in self.my_dlt:
- my_red_list = my_num[:5]
- my_blue_list = my_num[5:]
- # 使用列表推导式找出两个列表中都存在的元素
- red_common_elements = [element for element in red_list if element in my_red_list]
- blue_common_elements = [element for element in blue_list if element in my_blue_list]
- # 计算相等元素的数量
- red_equal_count = len(red_common_elements)
- blue_equal_count = len(blue_common_elements)
- serial_text += 'my nums: {} + {}\t\tred hit: {}\tblue hit: {}\n'.format(my_red_list, my_blue_list,
- red_equal_count,
- blue_equal_count)
- text += serial_text
- text += '{}\n\n\n\n'.format('*' * 90)
- return text, subject
- def send_data(self, subject, text):
- title = '超级大乐透最新一期开奖查询对比'
- SendEmail(subject, title, text).send()
- class SaveToDB(object):
- def __init__(self, data):
- self.logs_handle = LogsHandle()
- self.now_day = time.strftime('%Y-%m-%d', time.localtime())
- db = 'dlt'
- collection = 'dlt_' + self.now_day
- self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=True, auto_remove=0)
- self.data = data
- def save_data(self):
- print('开始保存数据')
- for data in self.data:
- data_to_insert = {
- "serial": data.setdefault('serial'),
- "red1": data.setdefault('red1'),
- "red2": data.setdefault('red2'),
- "red3": data.setdefault('red3'),
- "red4": data.setdefault('red4'),
- "red5": data.setdefault('red5'),
- "blue1": data.setdefault('blue1'),
- "blue2": data.setdefault('blue2'),
- "date": data.setdefault('date'),
- "pool": data.setdefault('pool'),
- "drawPdfUrl": data.setdefault('drawPdfUrl'),
- "create_time": int(time.time()),
- "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- }
- self.mongo.collection.insert_one(data_to_insert)
- print('数据已储存, 共储存数据{}条'.format(len(self.data)))
- class DLT(object):
- def start(self, n):
- # # 获取数据
- G = GetData(n)
- data = G.main()
- return data
- def check(self, data):
- # # 读取数据并发送到邮件
- Check = CheckMyDLT(data)
- Check.main()
- def mongo(self, data):
- # 存 mongodb
- Mongo = SaveToDB(data)
- Mongo.save_data()
- def main(self):
- L = LogsHandle()
- L.logs_write('auto_get_and_check_dlt', 'dlt任务开始', 'start', False)
- data = self.start(30)
- if data:
- tasks = [
- self.check,
- self.mongo
- ]
- threads = []
- for i in tasks:
- thread = threading.Thread(target=i, args=(data,))
- threads.append(thread)
- thread.start()
- for thread in threads:
- thread.join()
- L.logs_write('auto_get_and_check_dlt', 'dlt任务结束', 'start', False)
- print('done')
- else:
- L.logs_write('auto_get_and_check_dlt', '获取数据失败', 'error', False)
- class Luanch(object):
- def start(self, n):
- # # 获取数据
- G = GetData(n)
- data = G.main()
- return data
- def check(self, data):
- # # 读取数据并发送到邮件
- Check = CheckMyDLT(data)
- Check.main()
- def mongo(self, data):
- # 存 mongodb
- Mongo = SaveToDB(data)
- Mongo.save_data()
- def main(self):
- Logs = LogsHandle()
- Logs.logs_write('auto_get_and_check_dlt', 'dlt任务开始', 'start', False)
- data = self.start(30)
- if data:
- tasks = [
- self.check,
- self.mongo
- ]
- threads = []
- for i in tasks:
- thread = threading.Thread(target=i, args=(data,))
- threads.append(thread)
- thread.start()
- for thread in threads:
- thread.join()
- Logs.logs_write('auto_get_and_check_dlt', 'dlt任务结束', 'start', False)
- print('done')
- else:
- Logs.logs_write('auto_get_and_check_dlt', '获取数据失败', 'error', False)
- if __name__ == '__main__':
- Luanch().main()
- # ## 单独获取数据
- # G = GetData()
- # data = G.main()
- # re_data = data[::-1]
- # save_txt = ''
- # for item in re_data:
- # save_txt += f'[[{item["red1"]}, {item["red2"]}, {item["red3"]}, {item["red4"]}, {item["red5"]}], [{item["blue1"]}, {item["blue2"]}]],\n'
- #
- # with open('dlt.txt', 'w') as f:
- # f.write(save_txt)
|