get_one_week_weather.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # -*- coding: utf-8 -*-
  2. '''
  3. 获取天气预报
  4. '''
  5. import os
  6. import sys
  7. import time
  8. from datetime import datetime
  9. from bs4 import BeautifulSoup
  10. import httpx
  11. sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
  12. SUB_PROJECT_NAME = "获取天气预报"
  13. PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
  14. class Weather():
  15. def main(self):
  16. print('开始获取天气预报数据')
  17. try:
  18. area_code = '59287'
  19. one_week = [
  20. '/tomorrow-%s.htm' % area_code,
  21. '/third-%s.htm' % area_code,
  22. '/fourth-%s.htm' % area_code,
  23. '/fifth-%s.htm' % area_code,
  24. '/sixth-%s.htm' % area_code,
  25. '/seventh-%s.htm' % area_code,
  26. ]
  27. url = "https://tianqi.2345.com/today-%s.htm" % area_code
  28. header = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'}
  29. response = httpx.get(url=url, headers=header)
  30. response.encoding = "utf-8"
  31. bs = BeautifulSoup(response.text, 'html.parser')
  32. one_week_weather = []
  33. for week in one_week:
  34. a = bs.find_all('a', href=week)
  35. a = ' '.join(a[0].text.split())
  36. one_week_weather.append(a)
  37. except Exception as e:
  38. print(e)
  39. print('Weather forecast')
  40. exit(0)
  41. text = "天气预报: {}获取并发送\n".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + '\n'.join(
  42. one_week_weather)
  43. # 推送到 message
  44. GotifyNotifier('天气预报数', text, 'weather').send_message()
  45. print('天气预报数据已获取')
  46. class GotifyNotifier:
  47. def __init__(self, title, message, token_name=''):
  48. self.gotify_url = 'https://gotify.erhe.top'
  49. self.app_token = self.match_token_name(token_name)
  50. self.title = title
  51. self.message = message
  52. def match_token_name(self, name):
  53. token_name_dict = {
  54. 'base': 'A8EVb0Cmxnb2vfk',
  55. 'coin': 'AgfOJESqDKftBTQ',
  56. 'dlt': 'A3bqt9Dlbs.fPUb',
  57. 'AirdropTasksNews': 'Aoe0VKt-kkZnm8d',
  58. 'weather': 'A9KF--mx_12PjSu',
  59. 'news': 'AT2QGp_vyCX4akW',
  60. 'CheckAndRemind': 'Aw7XKE2Ppk7Dgwk',
  61. 'test': 'A0Xg6ZE5946iBYg',
  62. }
  63. token = token_name_dict.get(name)
  64. if token:
  65. return token
  66. else:
  67. return token_name_dict['base']
  68. def send_message(self):
  69. # 构建POST请求的headers
  70. headers = {
  71. 'Content-Type': 'application/json'
  72. }
  73. # 构建POST请求的body
  74. body = {
  75. 'title': self.title,
  76. 'message': self.message
  77. }
  78. # 发送POST请求
  79. with httpx.Client() as client:
  80. response = client.post(
  81. url=f"{self.gotify_url}/message?token={self.app_token}",
  82. headers=headers,
  83. json=body
  84. )
  85. # 或者可以使用 curl
  86. # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5"
  87. # 检查响应状态码
  88. if response.status_code == 200:
  89. print('Gotify Message sent successfully!')
  90. else:
  91. print('Failed to send message:', response.text)
  92. if __name__ == "__main__":
  93. # W = Weather().main()
  94. print(PROJECT_PATH)