realtime_logger.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 实时日志输出模块
  5. """
  6. import asyncio
  7. import json
  8. import time
  9. import threading
  10. from typing import List, Dict, Any, Optional
  11. from pathlib import Path
  12. import logging
  13. logger = logging.getLogger("realtime_logger")
  14. class RealtimeLogger:
  15. """实时日志记录器"""
  16. def __init__(self):
  17. self.connections: List[Any] = []
  18. self.log_buffer: List[Dict[str, Any]] = []
  19. self.max_buffer_size = 1000
  20. self._lock = threading.Lock()
  21. self._loop: Optional[asyncio.AbstractEventLoop] = None
  22. def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
  23. """注册主事件循环,便于跨线程安全调度发送任务"""
  24. self._loop = loop
  25. def add_connection(self, websocket):
  26. """添加WebSocket连接"""
  27. with self._lock:
  28. self.connections.append(websocket)
  29. logger.info(f"新增WebSocket连接,当前连接数: {len(self.connections)}")
  30. def remove_connection(self, websocket):
  31. """移除WebSocket连接"""
  32. with self._lock:
  33. if websocket in self.connections:
  34. self.connections.remove(websocket)
  35. logger.info(f"移除WebSocket连接,当前连接数: {len(self.connections)}")
  36. async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"):
  37. """广播日志消息到所有连接的客户端"""
  38. log_entry = {
  39. "timestamp": time.time(),
  40. "time": time.strftime("%H:%M:%S"),
  41. "level": level,
  42. "source": source,
  43. "message": message
  44. }
  45. # 添加到缓冲区
  46. with self._lock:
  47. self.log_buffer.append(log_entry)
  48. if len(self.log_buffer) > self.max_buffer_size:
  49. self.log_buffer = self.log_buffer[-self.max_buffer_size:]
  50. # 广播到所有连接
  51. if self.connections:
  52. message_data = json.dumps(log_entry, ensure_ascii=False)
  53. disconnected = []
  54. for websocket in self.connections.copy(): # 使用副本避免并发修改
  55. try:
  56. await websocket.send_text(message_data)
  57. except Exception as e:
  58. logger.warning(f"发送消息失败: {e}")
  59. disconnected.append(websocket)
  60. # 清理断开的连接
  61. for ws in disconnected:
  62. self.remove_connection(ws)
  63. def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"):
  64. """同步广播日志消息(用于非异步环境)"""
  65. log_entry = {
  66. "timestamp": time.time(),
  67. "time": time.strftime("%H:%M:%S"),
  68. "level": level,
  69. "source": source,
  70. "message": message
  71. }
  72. # 添加到缓冲区
  73. with self._lock:
  74. self.log_buffer.append(log_entry)
  75. if len(self.log_buffer) > self.max_buffer_size:
  76. self.log_buffer = self.log_buffer[-self.max_buffer_size:]
  77. # 若已注册事件循环,尝试在线程安全地调度异步广播
  78. if self._loop is not None:
  79. try:
  80. asyncio.run_coroutine_threadsafe(
  81. self.broadcast_log(message=message, level=level, source=source),
  82. self._loop,
  83. )
  84. except Exception:
  85. # 忽略发送失败,缓冲区仍可用于新连接回放
  86. pass
  87. async def get_recent_logs(self, count: int = 50) -> List[Dict[str, Any]]:
  88. """获取最近的日志"""
  89. with self._lock:
  90. return self.log_buffer[-count:] if self.log_buffer else []
  91. def clear_buffer(self):
  92. """清空日志缓冲区"""
  93. with self._lock:
  94. self.log_buffer.clear()
  95. logger.info("日志缓冲区已清空")
  96. # 全局实时日志记录器
  97. realtime_logger = RealtimeLogger()