| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- # -*- coding: utf-8 -*-
- '''
- 获取天气预报
- '''
- import os
- import sys
- import time
- from datetime import datetime
- from bs4 import BeautifulSoup
- import httpx
# Project root: the text before the first 'AutoInfo' in this file's absolute
# path, with 'AutoInfo' appended back on.
PROJECT_PATH = os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'
# Presumably added so that modules under the project root can be imported
# when this file is run directly -- confirm against the project layout.
sys.path.append(PROJECT_PATH)
SUB_PROJECT_NAME = "获取天气预报"
class Weather():
    """Scrape the upcoming forecast from tianqi.2345.com and push it to Gotify."""

    def main(self, area_code='59287'):
        """Fetch the forecast entries for the next six days and send them.

        The "today" page for ``area_code`` is downloaded, the link text of the
        tomorrow..seventh-day anchors is collected (whitespace-normalised), and
        the result is sent through ``GotifyNotifier`` using the 'weather' token.

        Args:
            area_code: Code interpolated into the tianqi.2345.com page URLs.
                Defaults to the value that used to be hard-coded.

        If fetching or parsing fails, the error is printed and the process
        exits with status 0.
        """
        print('开始获取天气预报数据')
        try:
            # hrefs of the per-day anchors expected on the "today" page.
            one_week = [
                '/%s-%s.htm' % (day, area_code)
                for day in ('tomorrow', 'third', 'fourth', 'fifth', 'sixth', 'seventh')
            ]
            url = "https://tianqi.2345.com/today-%s.htm" % area_code
            header = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'}
            response = httpx.get(url=url, headers=header)
            # Surface HTTP errors directly instead of as a confusing parse
            # failure on an error page further down.
            response.raise_for_status()
            response.encoding = "utf-8"
            bs = BeautifulSoup(response.text, 'html.parser')
            one_week_weather = []
            for week in one_week:
                link = bs.find('a', href=week)
                if link is None:
                    # Previously this showed up as a bare "list index out of range".
                    raise LookupError('forecast link not found on page: %s' % week)
                # Collapse all runs of whitespace in the link text to single spaces.
                one_week_weather.append(' '.join(link.text.split()))
        except Exception as e:
            print(e)
            print('Weather forecast')
            # sys.exit is used because the bare exit() helper is injected by the
            # site module and is not guaranteed to exist in every interpreter.
            # NOTE(review): status 0 on failure is kept from the original;
            # confirm whether the scheduler relies on it.
            sys.exit(0)
        text = "天气预报: {}获取并发送\n".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + '\n'.join(
            one_week_weather)
        # Push the assembled text as a notification.
        GotifyNotifier('天气预报数', text, 'weather').send_message()
        print('天气预报数据已获取')
class GotifyNotifier:
    """Send a titled message to the Gotify server at https://gotify.erhe.top."""

    def __init__(self, title, message, token_name=''):
        """Store the message parts and resolve the application token.

        Args:
            title: Title of the notification.
            message: Body of the notification.
            token_name: Key used to pick an application token; a name that is
                not in the table (including the default '') selects 'base'.
        """
        self.gotify_url = 'https://gotify.erhe.top'
        self.app_token = self.match_token_name(token_name)
        self.title = title
        self.message = message

    def match_token_name(self, name):
        """Return the token registered under ``name``, or the 'base' token."""
        # NOTE(review): application tokens are embedded in source; consider
        # loading them from configuration instead.
        known_tokens = {
            'base': 'A8EVb0Cmxnb2vfk',
            'coin': 'AgfOJESqDKftBTQ',
            'dlt': 'A3bqt9Dlbs.fPUb',
            'AirdropTasksNews': 'Aoe0VKt-kkZnm8d',
            'weather': 'A9KF--mx_12PjSu',
            'news': 'AT2QGp_vyCX4akW',
            'CheckAndRemind': 'Aw7XKE2Ppk7Dgwk',
            'test': 'A0Xg6ZE5946iBYg',
        }
        return known_tokens.get(name) or known_tokens['base']

    def send_message(self):
        """POST the title and message as JSON to the server's /message endpoint.

        Prints a success line when the response status is 200, otherwise
        prints the response body.
        """
        endpoint = f"{self.gotify_url}/message?token={self.app_token}"
        payload = {'title': self.title, 'message': self.message}
        with httpx.Client() as client:
            response = client.post(
                url=endpoint,
                headers={'Content-Type': 'application/json'},
                json=payload,
            )
            # Alternatively, curl can be used:
            # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5"
            if response.status_code != 200:
                print('Failed to send message:', response.text)
                return
            print('Gotify Message sent successfully!')
if __name__ == "__main__":
    # The forecast run is currently disabled; executing this file directly
    # only prints the computed project root.
    # W = Weather().main()
    print(PROJECT_PATH)
|