# -*- coding: utf-8 -*- ''' 获取天气预报 ''' import os import sys import time from datetime import datetime from bs4 import BeautifulSoup import httpx sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')) SUB_PROJECT_NAME = "获取天气预报" PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo') class Weather(): def main(self): print('开始获取天气预报数据') try: area_code = '59287' one_week = [ '/tomorrow-%s.htm' % area_code, '/third-%s.htm' % area_code, '/fourth-%s.htm' % area_code, '/fifth-%s.htm' % area_code, '/sixth-%s.htm' % area_code, '/seventh-%s.htm' % area_code, ] url = "https://tianqi.2345.com/today-%s.htm" % area_code header = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'} response = httpx.get(url=url, headers=header) response.encoding = "utf-8" bs = BeautifulSoup(response.text, 'html.parser') one_week_weather = [] for week in one_week: a = bs.find_all('a', href=week) a = ' '.join(a[0].text.split()) one_week_weather.append(a) except Exception as e: print(e) print('Weather forecast') exit(0) text = "天气预报: {}获取并发送\n".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + '\n'.join( one_week_weather) # 推送到 message GotifyNotifier('天气预报数', text, 'weather').send_message() print('天气预报数据已获取') class GotifyNotifier: def __init__(self, title, message, token_name=''): self.gotify_url = 'https://gotify.erhe.top' self.app_token = self.match_token_name(token_name) self.title = title self.message = message def match_token_name(self, name): token_name_dict = { 'base': 'A8EVb0Cmxnb2vfk', 'coin': 'AgfOJESqDKftBTQ', 'dlt': 'A3bqt9Dlbs.fPUb', 'AirdropTasksNews': 'Aoe0VKt-kkZnm8d', 'weather': 'A9KF--mx_12PjSu', 'news': 'AT2QGp_vyCX4akW', 'CheckAndRemind': 'Aw7XKE2Ppk7Dgwk', 'test': 'A0Xg6ZE5946iBYg', } token = token_name_dict.get(name) if token: return token else: return token_name_dict['base'] def send_message(self): # 构建POST请求的headers headers = { 'Content-Type': 'application/json' } # 构建POST请求的body body = { 'title': self.title, 'message': self.message } # 发送POST请求 with httpx.Client() as client: response = client.post( url=f"{self.gotify_url}/message?token={self.app_token}", headers=headers, json=body ) # 或者可以使用 curl # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5" # 检查响应状态码 if response.status_code == 200: print('Gotify Message sent successfully!') else: print('Failed to send message:', response.text) if __name__ == "__main__": # W = Weather().main() print(PROJECT_PATH)