message_vix.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # -*- coding: utf-8 -*-
  2. '''
  3. 检查 当前VIX恐慌指数
  4. '''
  5. import sys
  6. import os
  7. import re
  8. from datetime import datetime
  9. sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
  10. from message_check_base import *
  11. from utils.utils_send_gotify import *
  12. from utils.utils_send_serverchan import *
  13. class CheckVIX:
  14. def __init__(self):
  15. self.url_list = [{'VIX恐慌指数': 'https://wallstreetcn.com/markets/codes/VIX.OTC'}]
  16. self.selectors = [
  17. '.price-lastpx'
  18. ]
  19. self.send_message = False
  20. self.check_value = 60
  21. def clean_string(self, input_string):
  22. """
  23. 清除字符串中的特殊字符和HTML标签
  24. :param input_string: 要处理的字符串
  25. :return: 处理后的字符串
  26. """
  27. # 替换字符
  28. clean_list = ['\n', '\t', ' ']
  29. for char in clean_list:
  30. input_string = input_string.replace(char, '')
  31. # 使用正则表达式移除HTML标签
  32. cleaned_string = re.sub(r'<.*?>', '', input_string)
  33. return cleaned_string
  34. def main(self):
  35. all_data = None
  36. crawler = CryptoCrawler(self.url_list, self.selectors)
  37. all_data = crawler.main() # 返回的数据格式: dict: [{关键词str: 数据any}]
  38. # 组成context信息
  39. context = ''
  40. # 如果有返回数据
  41. if all_data:
  42. all_data = all_data[0]
  43. for data in all_data:
  44. for key, value in data.items():
  45. context += f"{key}: {value}%\n"
  46. # 如果恐慌指数大于指定数值, 则发送消息, 小于则不发送
  47. if float(value) >= self.check_value:
  48. self.send_message = True
  49. else:
  50. print('no data!')
  51. if self.send_message and context:
  52. print(f'发送消息: {context}')
  53. context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  54. # 推送到 message
  55. GotifyNotifier('vix恐慌指数', context).send_message()
  56. # 推送到 serverchan
  57. ServerChanNotifier('vix恐慌指数', context.replace('\n', '\n\n')).send_message()
  58. else:
  59. print(f"VIX恐慌指数小于{self.check_value},不发送消息\n{context}")
  60. print('done!')
  61. if __name__ == '__main__':
  62. CheckVIX().main()