Files
2025-12-05 18:06:13 +08:00

235 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Performance monitoring plugin.

Provides system performance statistics and monitoring features.
"""
import asyncio
import time
import os
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, schedule
from loguru import logger
class PerformanceMonitor(PluginBase):
    """Plugin that reports message-processing performance statistics.

    Admins listed under ``Bot.admins`` in ``main_config.toml`` can request a
    report with a chat command; a one-line summary is also written to the log
    every 30 minutes.
    """

    # Plugin metadata
    description = "系统性能监控和统计"
    author = "System"
    version = "1.0.0"

    # Chat commands that trigger a performance report.
    _STATS_COMMANDS = frozenset({"/性能", "/stats", "/状态", "/性能报告"})

    # Configuration file (resolved against the current working directory)
    # holding the admin list and the [Performance] table.
    _CONFIG_PATH = "main_config.toml"

    # Counters reported when no HookBot is available; also used to fill in
    # any counter missing from the HookBot's own statistics.
    _EMPTY_HOOKBOT_STATS = {
        "total_messages": 0,
        "processed_messages": 0,
        "filtered_messages": 0,
        "process_rate": 0,
        "filter_rate": 0,
    }

    def __init__(self):
        super().__init__()
        self.start_time = time.time()
        # Baseline for the message-rate calculation; both values are advanced
        # on every call to _get_performance_data().
        self.last_stats_time = time.time()
        self.last_message_count = 0

    async def async_init(self):
        """Asynchronous plugin initialisation; only logs that the plugin loaded."""
        logger.info("性能监控插件已加载")

    def _load_main_config(self) -> dict:
        """Read and parse the main TOML configuration file.

        Raises:
            OSError: if the file cannot be opened.
            tomllib.TOMLDecodeError: if the file is not valid TOML.
        """
        import tomllib

        with open(self._CONFIG_PATH, "rb") as f:
            return tomllib.load(f)

    @on_text_message(priority=90)
    async def handle_stats_command(self, bot, message: dict):
        """Reply with a performance report when an admin sends a stats command.

        Args:
            bot: object used to send the reply (``bot.send_text``).
            message: incoming message dict; ``Content``, ``FromWxid`` and
                ``SenderWxid`` are read from it.

        Returns:
            ``False`` after a report was sent (stops further plugin handling),
            otherwise ``None``.

        Raises:
            OSError, tomllib.TOMLDecodeError: if the configuration file cannot
                be read while checking the admin list.
        """
        content = message.get("Content", "").strip()

        # Check the command first: this handler sees every text message, and
        # the admin lookup below has to read the configuration file.
        if content not in self._STATS_COMMANDS:
            return

        from_wxid = message.get("FromWxid", "")
        # Fall back to the chat id when the message carries no sender id.
        sender_wxid = message.get("SenderWxid", from_wxid)

        # Only admins may request a report.
        admins = self._load_main_config().get("Bot", {}).get("admins", [])
        if sender_wxid not in admins:
            return

        stats_msg = await self._get_performance_stats(bot)
        await bot.send_text(from_wxid, stats_msg)
        return False  # Stop other plugins from handling this message

    @schedule('interval', minutes=30)
    async def log_performance_stats(self, bot):
        """Periodically write a one-line performance summary to the log."""
        try:
            stats = await self._get_performance_data()
            logger.info(
                f"性能统计: 消息队列={stats['queue_size']}/{stats['queue_max']}, "
                f"处理速率={stats['message_rate']:.1f}/min, "
                f"总消息={stats['total_messages']}, "
                f"已处理={stats['processed_messages']}"
            )
        except Exception as e:
            # A failing scheduled job must not propagate; just record it.
            logger.error(f"性能统计记录失败: {e}")

    async def _get_performance_stats(self, bot) -> str:
        """Build the report text sent in reply to a stats command.

        Uses the performance monitor from ``utils.bot_utils`` when it can be
        imported and has received at least one message; otherwise falls back
        to the statistics gathered by ``_get_performance_data``.

        Returns:
            The formatted report, or an error message if gathering failed.
        """
        try:
            # Prefer the newer performance monitor when it is available.
            try:
                from utils.bot_utils import get_performance_monitor

                monitor = get_performance_monitor()
                if monitor and monitor.message_received > 0:
                    return self._format_new_stats(monitor)
            except ImportError:
                # Monitor not available: fall through to the legacy report.
                pass

            stats = await self._get_performance_data()
            uptime_hours = (time.time() - self.start_time) / 3600
            return self._format_legacy_stats(stats, uptime_hours)
        except Exception as e:
            logger.error(f"获取性能统计失败: {e}")
            return f"❌ 获取性能统计失败: {str(e)}"

    def _format_legacy_stats(self, stats: dict, uptime_hours: float) -> str:
        """Format the dict returned by ``_get_performance_data`` as a report."""
        breaker_label = '🔴 开启' if stats['circuit_breaker_open'] else '🟢 正常'
        console_label = '✅ 启用' if stats['console_log_enabled'] else '❌ 禁用'
        lines = [
            "📊 系统性能统计",
            f"🕐 运行时间: {uptime_hours:.1f} 小时",
            "📨 消息统计:",
            f"• 总消息数: {stats['total_messages']}",
            f"• 已处理: {stats['processed_messages']}",
            f"• 已过滤: {stats['filtered_messages']}",
            f"• 处理率: {stats['process_rate']:.1%}",
            f"• 过滤率: {stats['filter_rate']:.1%}",
            "⚡ 性能指标:",
            f"• 消息队列: {stats['queue_size']} / {stats['queue_max']}",
            f"• 处理速率: {stats['message_rate']:.1f} 消息/分钟",
            f"• 熔断器状态: {breaker_label}",
            "🔧 配置状态:",
            f"• 控制台日志: {console_label}",
            f"• 日志采样率: {stats['log_sampling_rate']:.0%}",
            f"• 最大并发: {stats['max_concurrency']}",
            f"• 过滤模式: {stats['ignore_mode']}",
        ]
        return "\n".join(lines)

    def _format_new_stats(self, monitor) -> str:
        """Format the statistics of the newer performance monitor as a report.

        Args:
            monitor: object whose ``get_stats()`` returns a dict with the keys
                ``uptime_formatted``, ``messages``, ``processing_time`` and
                ``queue``, and optionally ``circuit_breaker`` and ``plugins``.

        Returns:
            The formatted report text.
        """
        stats = monitor.get_stats()
        messages = stats['messages']
        timing = stats['processing_time']
        queue = stats['queue']

        # Basic information
        lines = [
            "📊 系统性能报告",
            f"🕐 运行时间: {stats['uptime_formatted']}",
            "📨 消息统计:",
            f"• 收到: {messages['received']}",
            f"• 处理: {messages['processed']}",
            f"• 失败: {messages['failed']}",
            f"• 丢弃: {messages['dropped']}",
            f"• 成功率: {messages['success_rate']}",
            f"• 处理速率: {messages['processing_rate']}",
            "⚡ 处理性能:",
            f"• 平均耗时: {timing['average_ms']}ms",
            f"• 最大耗时: {timing['max_ms']}ms",
            f"• 最小耗时: {timing['min_ms']}ms",
            "📦 队列状态:",
            f"• 当前大小: {queue['current_size']}",
            f"• 历史最大: {queue['max_size']}",
        ]

        # Circuit breaker section (only when the monitor reports one)
        cb = stats.get('circuit_breaker', {})
        if cb:
            state_icon = {'closed': '🟢', 'open': '🔴', 'half_open': '🟡'}.get(
                cb.get('state', ''), ''
            )
            lines += [
                "🔌 熔断器:",
                f"• 状态: {state_icon} {cb.get('state', 'N/A')}",
                f"• 失败计数: {cb.get('failure_count', 0)}",
                f"• 恢复时间: {cb.get('current_recovery_time', 0):.0f}s",
            ]

        msg = "\n".join(lines)

        # Plugin timing ranking: the first five entries as given by the monitor
        plugins = stats.get('plugins', [])
        if plugins:
            msg += "\n\n🔧 插件耗时排行:"
            msg += "".join(
                f"\n {i}. {p['name']}: {p['avg_time_ms']}ms ({p['calls']}次)"
                for i, p in enumerate(plugins[:5], 1)
            )
        return msg

    async def _get_performance_data(self) -> dict:
        """Collect queue, throughput and configuration figures.

        System resource usage (CPU / memory) is not collected.

        Side effect: updates ``last_stats_time`` and ``last_message_count``,
        so the reported message rate covers the interval since the previous
        call to this method.

        Returns:
            A dict with ``queue_size``, ``queue_max``, ``message_rate``,
            ``circuit_breaker_open``, ``console_log_enabled``,
            ``log_sampling_rate``, ``max_concurrency`` and ``ignore_mode``,
            plus every entry of the HookBot statistics (which take precedence
            on a key clash).
        """
        # HookBot statistics
        from utils.plugin_manager import PluginManager

        bot_service = getattr(PluginManager(), '_bot_service', None)
        hookbot = getattr(bot_service, 'hookbot', None) if bot_service else None

        # Start from zeroed counters so a missing key cannot break the report.
        hookbot_stats = dict(self._EMPTY_HOOKBOT_STATS)
        if hookbot:
            hookbot_stats.update(hookbot.get_stats())
            queue_size = bot_service.message_queue.qsize() if bot_service.message_queue else 0
            queue_max = bot_service.queue_config.get("max_size", 1000)
            circuit_breaker_open = bot_service.circuit_breaker_open
            max_concurrency = bot_service.concurrency_config.get("plugin_max_concurrency", 8)
            ignore_mode = hookbot.ignore_mode
        else:
            queue_size = 0
            queue_max = 1000
            circuit_breaker_open = False
            max_concurrency = 8
            ignore_mode = "None"

        # Message rate since the previous call
        current_time = time.time()
        time_diff = current_time - self.last_stats_time
        message_diff = hookbot_stats["total_messages"] - self.last_message_count
        if time_diff > 0:
            message_rate = (message_diff / time_diff) * 60  # messages per minute
        else:
            message_rate = 0
        self.last_stats_time = current_time
        self.last_message_count = hookbot_stats["total_messages"]

        # Logging configuration; best effort, defaults are used on any failure.
        try:
            perf_config = self._load_main_config().get("Performance", {})
            console_log_enabled = perf_config.get("log_console_enabled", True)
            log_sampling_rate = perf_config.get("log_sampling_rate", 1.0)
        except Exception as e:
            logger.debug(f"读取性能配置失败，使用默认值: {e}")
            console_log_enabled = True
            log_sampling_rate = 1.0

        return {
            "queue_size": queue_size,
            "queue_max": queue_max,
            "message_rate": message_rate,
            "circuit_breaker_open": circuit_breaker_open,
            "console_log_enabled": console_log_enabled,
            "log_sampling_rate": log_sampling_rate,
            "max_concurrency": max_concurrency,
            "ignore_mode": ignore_mode,
            **hookbot_stats,
        }