|
@@ -0,0 +1,112 @@
|
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
|
+'''
|
|
|
|
|
+获取天气预报
|
|
|
|
|
+'''
|
|
|
|
|
+import os
|
|
|
|
|
+import sys
|
|
|
|
|
+import time
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from bs4 import BeautifulSoup
|
|
|
|
|
+import httpx
|
|
|
|
|
+
|
|
|
|
|
+sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
|
|
|
|
|
+SUB_PROJECT_NAME = "获取天气预报"
|
|
|
|
|
+PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
|
|
|
|
|
+
|
|
|
|
|
+class Weather():
|
|
|
|
|
+ def main(self):
|
|
|
|
|
+ print('开始获取天气预报数据')
|
|
|
|
|
+ try:
|
|
|
|
|
+ area_code = '59287'
|
|
|
|
|
+ one_week = [
|
|
|
|
|
+ '/tomorrow-%s.htm' % area_code,
|
|
|
|
|
+ '/third-%s.htm' % area_code,
|
|
|
|
|
+ '/fourth-%s.htm' % area_code,
|
|
|
|
|
+ '/fifth-%s.htm' % area_code,
|
|
|
|
|
+ '/sixth-%s.htm' % area_code,
|
|
|
|
|
+ '/seventh-%s.htm' % area_code,
|
|
|
|
|
+ ]
|
|
|
|
|
+ url = "https://tianqi.2345.com/today-%s.htm" % area_code
|
|
|
|
|
+ header = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'}
|
|
|
|
|
+ response = httpx.get(url=url, headers=header)
|
|
|
|
|
+ response.encoding = "utf-8"
|
|
|
|
|
+ bs = BeautifulSoup(response.text, 'html.parser')
|
|
|
|
|
+
|
|
|
|
|
+ one_week_weather = []
|
|
|
|
|
+ for week in one_week:
|
|
|
|
|
+ a = bs.find_all('a', href=week)
|
|
|
|
|
+ a = ' '.join(a[0].text.split())
|
|
|
|
|
+ one_week_weather.append(a)
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(e)
|
|
|
|
|
+ print('Weather forecast')
|
|
|
|
|
+ exit(0)
|
|
|
|
|
+
|
|
|
|
|
+ text = "天气预报: {}获取并发送\n".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + '\n'.join(
|
|
|
|
|
+ one_week_weather)
|
|
|
|
|
+
|
|
|
|
|
+ # 推送到 message
|
|
|
|
|
+ GotifyNotifier('天气预报数', text, 'weather').send_message()
|
|
|
|
|
+
|
|
|
|
|
+ print('天气预报数据已获取')
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class GotifyNotifier:
|
|
|
|
|
+ def __init__(self, title, message, token_name=''):
|
|
|
|
|
+ self.gotify_url = 'https://gotify.erhe.top'
|
|
|
|
|
+ self.app_token = self.match_token_name(token_name)
|
|
|
|
|
+ self.title = title
|
|
|
|
|
+ self.message = message
|
|
|
|
|
+
|
|
|
|
|
+ def match_token_name(self, name):
|
|
|
|
|
+ token_name_dict = {
|
|
|
|
|
+ 'base': 'A8EVb0Cmxnb2vfk',
|
|
|
|
|
+ 'coin': 'AgfOJESqDKftBTQ',
|
|
|
|
|
+ 'dlt': 'A3bqt9Dlbs.fPUb',
|
|
|
|
|
+ 'AirdropTasksNews': 'Aoe0VKt-kkZnm8d',
|
|
|
|
|
+ 'weather': 'A9KF--mx_12PjSu',
|
|
|
|
|
+ 'news': 'AT2QGp_vyCX4akW',
|
|
|
|
|
+ 'CheckAndRemind': 'Aw7XKE2Ppk7Dgwk',
|
|
|
|
|
+ 'test': 'A0Xg6ZE5946iBYg',
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ token = token_name_dict.get(name)
|
|
|
|
|
+ if token:
|
|
|
|
|
+ return token
|
|
|
|
|
+ else:
|
|
|
|
|
+ return token_name_dict['base']
|
|
|
|
|
+
|
|
|
|
|
+ def send_message(self):
|
|
|
|
|
+ # 构建POST请求的headers
|
|
|
|
|
+ headers = {
|
|
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # 构建POST请求的body
|
|
|
|
|
+ body = {
|
|
|
|
|
+ 'title': self.title,
|
|
|
|
|
+ 'message': self.message
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # 发送POST请求
|
|
|
|
|
+ with httpx.Client() as client:
|
|
|
|
|
+ response = client.post(
|
|
|
|
|
+ url=f"{self.gotify_url}/message?token={self.app_token}",
|
|
|
|
|
+ headers=headers,
|
|
|
|
|
+ json=body
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 或者可以使用 curl
|
|
|
|
|
+ # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5"
|
|
|
|
|
+
|
|
|
|
|
+ # 检查响应状态码
|
|
|
|
|
+ if response.status_code == 200:
|
|
|
|
|
+ print('Gotify Message sent successfully!')
|
|
|
|
|
+ else:
|
|
|
|
|
+ print('Failed to send message:', response.text)
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ # W = Weather().main()
|
|
|
|
|
+ print(PROJECT_PATH)
|
|
|
|
|
+
|