| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 实时日志输出模块
- """
- import asyncio
- import json
- import time
- import threading
- from typing import List, Dict, Any, Optional
- from pathlib import Path
- import logging
- logger = logging.getLogger("realtime_logger")
- class RealtimeLogger:
- """实时日志记录器"""
-
- def __init__(self):
- self.connections: List[Any] = []
- self.log_buffer: List[Dict[str, Any]] = []
- self.max_buffer_size = 1000
- self._lock = threading.Lock()
- self._loop: Optional[asyncio.AbstractEventLoop] = None
-
- def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
- """注册主事件循环,便于跨线程安全调度发送任务"""
- self._loop = loop
-
- def add_connection(self, websocket):
- """添加WebSocket连接"""
- with self._lock:
- self.connections.append(websocket)
- logger.info(f"新增WebSocket连接,当前连接数: {len(self.connections)}")
-
- def remove_connection(self, websocket):
- """移除WebSocket连接"""
- with self._lock:
- if websocket in self.connections:
- self.connections.remove(websocket)
- logger.info(f"移除WebSocket连接,当前连接数: {len(self.connections)}")
-
- async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"):
- """广播日志消息到所有连接的客户端"""
- log_entry = {
- "timestamp": time.time(),
- "time": time.strftime("%H:%M:%S"),
- "level": level,
- "source": source,
- "message": message
- }
-
- # 添加到缓冲区
- with self._lock:
- self.log_buffer.append(log_entry)
- if len(self.log_buffer) > self.max_buffer_size:
- self.log_buffer = self.log_buffer[-self.max_buffer_size:]
-
- # 广播到所有连接
- if self.connections:
- message_data = json.dumps(log_entry, ensure_ascii=False)
- disconnected = []
-
- for websocket in self.connections.copy(): # 使用副本避免并发修改
- try:
- await websocket.send_text(message_data)
- except Exception as e:
- logger.warning(f"发送消息失败: {e}")
- disconnected.append(websocket)
-
- # 清理断开的连接
- for ws in disconnected:
- self.remove_connection(ws)
-
- def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"):
- """同步广播日志消息(用于非异步环境)"""
- log_entry = {
- "timestamp": time.time(),
- "time": time.strftime("%H:%M:%S"),
- "level": level,
- "source": source,
- "message": message
- }
-
- # 添加到缓冲区
- with self._lock:
- self.log_buffer.append(log_entry)
- if len(self.log_buffer) > self.max_buffer_size:
- self.log_buffer = self.log_buffer[-self.max_buffer_size:]
-
- # 若已注册事件循环,尝试在线程安全地调度异步广播
- if self._loop is not None:
- try:
- asyncio.run_coroutine_threadsafe(
- self.broadcast_log(message=message, level=level, source=source),
- self._loop,
- )
- except Exception:
- # 忽略发送失败,缓冲区仍可用于新连接回放
- pass
-
- async def get_recent_logs(self, count: int = 50) -> List[Dict[str, Any]]:
- """获取最近的日志"""
- with self._lock:
- return self.log_buffer[-count:] if self.log_buffer else []
-
- def clear_buffer(self):
- """清空日志缓冲区"""
- with self._lock:
- self.log_buffer.clear()
- logger.info("日志缓冲区已清空")
- # 全局实时日志记录器
- realtime_logger = RealtimeLogger()
|