""" 消息统计器模块 提供消息处理的统计功能: - 消息计数 - 过滤率统计 - 按类型统计 """ import time from collections import defaultdict from threading import Lock from typing import Any, Dict class MessageStats: """ 消息统计器 线程安全的消息统计实现 """ def __init__(self): self._lock = Lock() self._total_count = 0 self._filtered_count = 0 self._processed_count = 0 self._duplicate_count = 0 self._error_count = 0 self._by_type: Dict[str, int] = defaultdict(int) self._start_time = time.time() def record_received(self): """记录收到消息""" with self._lock: self._total_count += 1 def record_filtered(self): """记录被过滤的消息""" with self._lock: self._filtered_count += 1 def record_processed(self, event_type: str = None): """ 记录已处理的消息 Args: event_type: 消息事件类型(可选) """ with self._lock: self._processed_count += 1 if event_type: self._by_type[event_type] += 1 def record_duplicate(self): """记录重复消息""" with self._lock: self._duplicate_count += 1 def record_error(self): """记录处理错误""" with self._lock: self._error_count += 1 def get_stats(self) -> Dict[str, Any]: """获取统计信息""" with self._lock: uptime = time.time() - self._start_time total = max(self._total_count, 1) # 避免除零 return { "total_messages": self._total_count, "filtered_messages": self._filtered_count, "processed_messages": self._processed_count, "duplicate_messages": self._duplicate_count, "error_count": self._error_count, "filter_rate": self._filtered_count / total, "process_rate": self._processed_count / total, "duplicate_rate": self._duplicate_count / total, "messages_per_minute": (self._total_count / uptime) * 60 if uptime > 0 else 0, "uptime_seconds": uptime, "by_type": dict(self._by_type), } def reset(self): """重置统计""" with self._lock: self._total_count = 0 self._filtered_count = 0 self._processed_count = 0 self._duplicate_count = 0 self._error_count = 0 self._by_type.clear() self._start_time = time.time() # 全局单例(可选使用) _global_stats: MessageStats = None _stats_lock = Lock() def get_message_stats() -> MessageStats: """获取全局消息统计器实例""" global _global_stats if _global_stats is None: with _stats_lock: if _global_stats is None: _global_stats = MessageStats() return _global_stats # ==================== 导出 ==================== __all__ = ['MessageStats', 'get_message_stats']