115 lines
3.1 KiB
Python
115 lines
3.1 KiB
Python
"""
|
|
消息统计器模块
|
|
|
|
提供消息处理的统计功能:
|
|
- 消息计数
|
|
- 过滤率统计
|
|
- 按类型统计
|
|
"""
|
|
|
|
import time
|
|
from collections import defaultdict
|
|
from threading import Lock
|
|
from typing import Any, Dict
|
|
|
|
|
|
class MessageStats:
|
|
"""
|
|
消息统计器
|
|
|
|
线程安全的消息统计实现
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._lock = Lock()
|
|
self._total_count = 0
|
|
self._filtered_count = 0
|
|
self._processed_count = 0
|
|
self._duplicate_count = 0
|
|
self._error_count = 0
|
|
self._by_type: Dict[str, int] = defaultdict(int)
|
|
self._start_time = time.time()
|
|
|
|
def record_received(self):
|
|
"""记录收到消息"""
|
|
with self._lock:
|
|
self._total_count += 1
|
|
|
|
def record_filtered(self):
|
|
"""记录被过滤的消息"""
|
|
with self._lock:
|
|
self._filtered_count += 1
|
|
|
|
def record_processed(self, event_type: str = None):
|
|
"""
|
|
记录已处理的消息
|
|
|
|
Args:
|
|
event_type: 消息事件类型(可选)
|
|
"""
|
|
with self._lock:
|
|
self._processed_count += 1
|
|
if event_type:
|
|
self._by_type[event_type] += 1
|
|
|
|
def record_duplicate(self):
|
|
"""记录重复消息"""
|
|
with self._lock:
|
|
self._duplicate_count += 1
|
|
|
|
def record_error(self):
|
|
"""记录处理错误"""
|
|
with self._lock:
|
|
self._error_count += 1
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""获取统计信息"""
|
|
with self._lock:
|
|
uptime = time.time() - self._start_time
|
|
total = max(self._total_count, 1) # 避免除零
|
|
|
|
return {
|
|
"total_messages": self._total_count,
|
|
"filtered_messages": self._filtered_count,
|
|
"processed_messages": self._processed_count,
|
|
"duplicate_messages": self._duplicate_count,
|
|
"error_count": self._error_count,
|
|
"filter_rate": self._filtered_count / total,
|
|
"process_rate": self._processed_count / total,
|
|
"duplicate_rate": self._duplicate_count / total,
|
|
"messages_per_minute": (self._total_count / uptime) * 60 if uptime > 0 else 0,
|
|
"uptime_seconds": uptime,
|
|
"by_type": dict(self._by_type),
|
|
}
|
|
|
|
def reset(self):
|
|
"""重置统计"""
|
|
with self._lock:
|
|
self._total_count = 0
|
|
self._filtered_count = 0
|
|
self._processed_count = 0
|
|
self._duplicate_count = 0
|
|
self._error_count = 0
|
|
self._by_type.clear()
|
|
self._start_time = time.time()
|
|
|
|
|
|
# 全局单例(可选使用)
|
|
_global_stats: MessageStats = None
|
|
_stats_lock = Lock()
|
|
|
|
|
|
def get_message_stats() -> MessageStats:
|
|
"""获取全局消息统计器实例"""
|
|
global _global_stats
|
|
if _global_stats is None:
|
|
with _stats_lock:
|
|
if _global_stats is None:
|
|
_global_stats = MessageStats()
|
|
return _global_stats
|
|
|
|
|
|
# ==================== 导出 ====================
|
|
|
|
__all__ = ['MessageStats', 'get_message_stats']
|