message_vix.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. # -*- coding: utf-8 -*-
  2. '''
  3. 检查 当前VIX恐慌指数
  4. '''
  5. import sys
  6. import os
  7. import re
  8. from datetime import datetime
  9. sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
  10. from message_check_base import *
  11. from utils.utils_send_gotify import *
  12. class CheckVIX:
  13. def __init__(self):
  14. self.url_list = [{'VIX恐慌指数': 'https://wallstreetcn.com/markets/codes/VIX.OTC'}]
  15. self.selectors = [
  16. '.price-lastpx'
  17. ]
  18. self.send_message = False
  19. self.check_value = 60
  20. def clean_string(self, input_string):
  21. """
  22. 清除字符串中的特殊字符和HTML标签
  23. :param input_string: 要处理的字符串
  24. :return: 处理后的字符串
  25. """
  26. # 替换字符
  27. clean_list = ['\n', '\t', ' ']
  28. for char in clean_list:
  29. input_string = input_string.replace(char, '')
  30. # 使用正则表达式移除HTML标签
  31. cleaned_string = re.sub(r'<.*?>', '', input_string)
  32. return cleaned_string
  33. def main(self):
  34. all_data = None
  35. crawler = CryptoCrawler(self.url_list, self.selectors)
  36. all_data = crawler.main() # 返回的数据格式: dict: [{关键词str: 数据any}]
  37. # 组成context信息
  38. context = ''
  39. # 如果有返回数据
  40. if all_data:
  41. all_data = all_data[0]
  42. for data in all_data:
  43. for key, value in data.items():
  44. context += f"{key}: {value}%\n"
  45. # 如果恐慌指数大于指定数值, 则发送消息, 小于则不发送
  46. if float(value) >= self.check_value:
  47. self.send_message = True
  48. else:
  49. print('no data!')
  50. if self.send_message and context:
  51. print(f'发送消息: {context}')
  52. context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  53. # 推送到 message
  54. GotifyNotifier('vix恐慌指数', context).send_message()
  55. else:
  56. print(f"VIX恐慌指数小于{self.check_value},不发送消息\n{context}")
  57. print('done!')
  58. if __name__ == '__main__':
  59. CheckVIX().main()