剥离无效事件系统并收口插件统计链路

- 删除未被实际消费的事件系统实现与相关发布逻辑
- 将插件调用统计改为在机器人主链路中直接埋点记录
- 重构统计收集插件初始化与记录方式,移除事件总线依赖
- 同步更新工程优化文档中的性能与链路治理描述
This commit is contained in:
liuwei
2026-04-30 14:54:22 +08:00
parent 78e4f50b7e
commit 0878f0d4ea
7 changed files with 174 additions and 284 deletions

View File

@@ -1,62 +0,0 @@
from loguru import logger
from typing import Dict, List, Type, Callable, Any
from threading import Lock
class Event:
"""事件基类"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
class EventManager:
"""事件管理器,单例模式"""
_instance = None
_lock = Lock()
@classmethod
def get_instance(cls):
"""获取事件管理器实例"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
if EventManager._instance is not None:
raise RuntimeError("EventManager 是单例类,请使用 get_instance() 方法获取实例")
self.handlers: Dict[Type[Event], List[Callable]] = {}
self.LOG = logger
def register(self, event_type: Type[Event], handler: Callable) -> None:
"""注册事件处理器"""
if event_type not in self.handlers:
self.handlers[event_type] = []
if handler not in self.handlers[event_type]:
self.handlers[event_type].append(handler)
self.LOG.debug(f"注册事件处理器: {event_type.__name__} -> {handler.__name__}")
def unregister(self, event_type: Type[Event], handler: Callable) -> None:
"""取消注册事件处理器"""
if event_type in self.handlers and handler in self.handlers[event_type]:
self.handlers[event_type].remove(handler)
self.LOG.debug(f"取消注册事件处理器: {event_type.__name__} -> {handler.__name__}")
def publish(self, event_type: Type[Event], event_data: Dict[str, Any] = None) -> None:
"""发布事件"""
if event_data is None:
event_data = {}
event = event_type(**event_data)
if event_type in self.handlers:
for handler in self.handlers[event_type]:
try:
handler(event)
except Exception as e:
self.LOG.error(f"事件处理器 {handler.__name__} 处理 {event_type.__name__} 事件出错: {e}")

View File

@@ -1,58 +0,0 @@
from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime
from base.event_system.event_manager import Event
@dataclass
class PluginCallStartEvent(Event):
"""插件调用开始事件"""
plugin_name: str # 插件名称
command: str # 触发的命令
full_command: str # 完整命令内容
user_id: str # 用户ID
group_id: Optional[str] = None # 群组ID私聊为None
is_group: bool = False # 是否群聊
message: Dict[str, Any] = None # 原始消息内容
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
@dataclass
class PluginCallEndEvent(Event):
"""插件调用结束事件"""
plugin_name: str # 插件名称
command: str # 触发的命令
user_id: str # 用户ID
group_id: Optional[str] = None # 群组ID私聊为None
is_group: bool = False # 是否群聊
process_result: bool = True # 处理结果True成功False失败
result_message: Optional[str] = None # 处理结果消息
process_time: int = 0 # 处理耗时(毫秒)
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
@dataclass
class PluginCallErrorEvent(Event):
"""插件调用错误事件"""
plugin_name: str # 插件名称
command: str # 触发的命令
user_id: str # 用户ID
group_id: Optional[str] # 群组ID私聊为None
is_group: bool # 是否群聊
error_message: str # 错误信息
stack_trace: Optional[str] = None # 堆栈跟踪
process_time: int = 0 # 处理耗时(毫秒)
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()

View File

@@ -1,85 +0,0 @@
from enum import Enum, auto
from typing import Dict, Any, Callable, List
import threading
class EventType(Enum):
"""事件类型枚举"""
SYSTEM_STARTUP = auto()
SYSTEM_SHUTDOWN = auto()
PLUGIN_LOADED = auto()
PLUGIN_UNLOADED = auto()
MESSAGE_RECEIVED = auto()
MESSAGE_PROCESSED = auto()
CUSTOM_EVENT = auto()
class EventSystem:
"""事件系统,用于插件间通信"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(EventSystem, cls).__new__(cls)
cls._instance._subscribers = {}
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self._subscribers = {}
self._initialized = True
def subscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
订阅事件
Args:
event_type: 事件类型
callback: 回调函数,接收事件数据
"""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
if callback not in self._subscribers[event_type]:
self._subscribers[event_type].append(callback)
def unsubscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
取消订阅事件
Args:
event_type: 事件类型
callback: 回调函数
"""
if event_type in self._subscribers and callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, data: Dict[str, Any]) -> None:
"""
发布事件
Args:
event_type: 事件类型
data: 事件数据
"""
if event_type in self._subscribers:
for callback in self._subscribers[event_type]:
try:
callback(data)
except Exception as e:
print(f"事件处理错误: {e}")
def get_subscribers(self, event_type: EventType) -> List[Callable]:
"""
获取事件订阅者
Args:
event_type: 事件类型
Returns:
订阅者列表
"""
return self._subscribers.get(event_type, [])

View File

@@ -12,7 +12,6 @@ from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
from base.plugin_common.plugin_registry import PluginRegistry
from base.plugin_common.event_system import EventSystem, EventType
from utils.decorator.async_job import async_job
from wechat_ipad import WechatAPIClient
@@ -431,9 +430,6 @@ class PluginManager:
self._refresh_module_file_state(module_name)
# self.LOG.info(f"PluginManager添加模块映射 {module_name} -> {display_name}")
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
else:
self.LOG.error(f"PluginManager插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例")
@@ -483,9 +479,6 @@ class PluginManager:
self._refresh_module_file_state(module_name)
# self.LOG.info(f"PluginManager添加模块映射 {module_name} -> {display_name}")
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
except Exception as e:
@@ -544,9 +537,6 @@ class PluginManager:
# 移除插件实例
del self.plugins[display_name]
# 发布插件卸载事件
EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": display_name})
return True
def reload_plugin(self, name: str) -> Optional[PluginInterface]:

View File

@@ -92,7 +92,7 @@
消息处理主链路已经做了部分异步和并发控制,但仍有几个潜在热点:
- 插件处理仍以串行判定为主,插件数量增加后会放大延迟
- 事件系统是轻量同步发布模型,慢回调会影响主流程
- 统计、观测、异常记录等横切逻辑过去分散在多处抽象中,主链路需要继续收口
- 消息归档、媒体处理、AI 调用都可能形成局部阻塞
- Redis 和数据库部分写法在高消息量场景下会出现额外开销

View File

@@ -2,10 +2,6 @@ from loguru import logger
from typing import Dict, Any, Tuple, Optional, List
from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
from base.event_system.event_manager import EventManager
# 使用正确的事件类型导入
from base.event_system.events.plugin_events import PluginCallStartEvent, PluginCallEndEvent, PluginCallErrorEvent
# 数据库导入
from db.stats_db import StatsDBOperator
from db.connection import DBConnectionManager
@@ -41,81 +37,95 @@ class StatsCollectorPlugin(PluginInterface):
super().__init__()
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 默认配置
# 默认配置
# 1. 这个插件现在不再依赖事件总线,而是由主消息分发链路直接回调;
# 2. 因此这里保留一份轻量配置,只控制“是否记录”和“排除哪些插件”;
# 3. 这样既能延续原有统计面板数据结构,也能避免事件系统带来的额外复杂度。
self.config = {
"enable": True,
"record_all_plugins": True, # 是否记录所有插件的调用
"excluded_plugins": [], # 排除的插件列表
}
self.event_manager = EventManager.get_instance()
self.db_manager = DBConnectionManager.get_instance()
self.stats_db = StatsDBOperator(self.db_manager)
# 注册功能权限
def initialize(self, config: Dict[str, Any]) -> bool:
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
if config:
self.config.update(config)
# 这里显式只读取插件自己的配置,不再把 system_context 整体 merge 进来:
# 1. 旧实现把 initialize 入参当成“插件配置”使用,但实际传入的是 system_context
# 2. 结果会把 db_manager、redis_pool 等运行时对象误写到 self.config 中;
# 3. 改成只消费 load_config 后的 self._config避免配置结构持续污染。
if isinstance(self._config, dict):
self.config.update(self._config)
# 若主系统已经初始化了 DB 管理器,则优先复用统一实例,避免插件侧再走兜底单例。
if isinstance(context, dict) and context.get("db_manager") is not None:
self.db_manager = context.get("db_manager")
self.stats_db = StatsDBOperator(self.db_manager)
if not self.config["enable"]:
self.LOG.info("统计收集插件已禁用")
return False
# 注册事件处理器
self.event_manager.register(PluginCallStartEvent, self.handle_plugin_call_start)
self.event_manager.register(PluginCallEndEvent, self.handle_plugin_call_end)
self.event_manager.register(PluginCallErrorEvent, self.handle_plugin_error)
return True
def handle_plugin_call_start(self, event: PluginCallStartEvent) -> None:
"""处理插件调用开始事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
def record_plugin_call(
self,
*,
plugin_name: str,
command: str,
user_id: str,
group_id: Optional[str],
is_group: bool,
process_result: bool,
process_time_ms: float,
) -> None:
"""由主链路直接调用,记录一次插件执行结果。"""
# 主链路可能会在高频场景下频繁回调这里,因此先做最便宜的开关判断,
# 避免对已关闭或被排除插件继续产生数据库写入成本。
if not self._should_record_plugin(plugin_name):
return
# 记录开始时间和相关信息
self.LOG.debug(f"记录插件调用开始: {event.plugin_name} - {event.command}")
def handle_plugin_call_end(self, event: PluginCallEndEvent) -> None:
"""处理插件调用结束事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
return
# 记录统计数据
try:
# 确保使用正确的属性名
self.stats_db.record_plugin_call(
plugin_name=event.plugin_name,
command=event.command,
user_id=event.user_id,
group_id=event.group_id,
success=event.process_result,
process_time_ms=event.process_time
plugin_name=plugin_name,
command=command,
user_id=user_id,
group_id=group_id if is_group else None,
success=process_result,
process_time_ms=process_time_ms,
)
self.LOG.debug(
f"记录插件调用结束: {event.plugin_name} - {event.command} - 成功: {event.process_result} - 处理时间: {event.process_time}ms")
f"记录插件调用结束: {plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms"
)
except Exception as e:
self.LOG.error(f"记录插件调用统计数据出错: {e}")
def handle_plugin_error(self, event: PluginCallErrorEvent) -> None:
"""处理插件调用错误事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
def record_plugin_error(
self,
*,
plugin_name: str,
command: str,
user_id: str,
group_id: Optional[str],
is_group: bool,
error_message: str,
stack_trace: Optional[str] = None,
) -> None:
"""由主链路直接调用,记录一次插件执行异常。"""
if not self._should_record_plugin(plugin_name):
return
# 记录错误信息
try:
self.stats_db.record_error(
plugin_name=event.plugin_name,
command=event.command,
user_id=event.user_id,
group_id=event.group_id,
error_message=event.error_message,
stack_trace=event.stack_trace
plugin_name=plugin_name,
command=command,
user_id=user_id,
group_id=group_id if is_group else None,
error_message=error_message,
stack_trace=stack_trace,
)
self.LOG.debug(f"记录插件调用错误: {event.plugin_name} - {event.command} - {event.error_message}")
self.LOG.debug(f"记录插件调用错误: {plugin_name} - {command} - {error_message}")
except Exception as e:
self.LOG.error(f"记录插件错误信息出错: {e}")
@@ -145,11 +155,6 @@ class StatsCollectorPlugin(PluginInterface):
def shutdown(self) -> None:
"""关闭插件"""
# 取消注册事件处理器
self.event_manager.unregister(PluginCallStartEvent, self.handle_plugin_call_start)
self.event_manager.unregister(PluginCallEndEvent, self.handle_plugin_call_end)
self.event_manager.unregister(PluginCallErrorEvent, self.handle_plugin_error)
self.LOG.info("统计收集插件已关闭")
def start(self) -> bool:

128
robot.py
View File

@@ -3,13 +3,13 @@ import asyncio
import threading
import time
import tomllib
import traceback
from collections import deque
import toml
from loguru import logger
import wechat_ipad
from base.plugin_common.event_system import EventType, EventSystem
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from base.plugin_common.plugin_manager import PluginManager
@@ -98,13 +98,11 @@ class Robot:
# 初始化插件系统
self.LOG.debug("开始初始化插件系统...")
self.plugin_registry = PluginRegistry()
self.event_system = EventSystem()
self.plugin_modules = {} # 存储已加载的插件模块
self.plugins = {} # 存储已加载的插件实例
# 设置插件系统上下文
self.system_context = {
"config": config,
"event_system": self.event_system,
"plugin_registry": self.plugin_registry,
"db_manager": self.db_manager,
"db_pool": self.db_pool,
@@ -545,11 +543,8 @@ class Robot:
except Exception as e:
self.LOG.error(f"获取群成员信息失败: {e}")
# 发布消息接收事件
self.event_system.publish(EventType.MESSAGE_RECEIVED, {"message": message})
# 尝试使用插件处理消息
plugin_processed = await self.process_plugin_message(message)
await self.process_plugin_message(message)
if is_group:
self.LOG.debug(f"入库和记录群消息: {message}")
@@ -609,13 +604,22 @@ class Robot:
if plugin.status != PluginStatus.RUNNING:
continue
# 这里在进入插件前统一准备统计上下文:
# 1. 事件系统删除后,插件调用统计需要直接在主链路埋点;
# 2. 提前抽出 room_id / sender / command后续无论成功还是异常都能复用
# 3. 这样可以保证观测逻辑收口在一处,避免每个插件自己重复埋点。
room_id = msg.roomid if msg.from_group() else ""
sender = msg.sender
command_name = self._extract_plugin_command(msg)
started_at = time.perf_counter()
try:
# 转换消息为插件可处理的格式
plugin_msg = {
"type": msg.msg_type,
"content": msg.content.clean_content,
"sender": msg.sender,
"roomid": msg.roomid if msg.from_group() else "",
"sender": sender,
"roomid": room_id,
"is_at": msg.is_at(self.wxid),
"timestamp": time.time(),
"all_contacts": self.allContacts,
@@ -628,18 +632,114 @@ class Robot:
# 检查插件是否可以处理该消息
if plugin.can_process(plugin_msg):
processed, _ = await plugin.process_message(plugin_msg)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=self._elapsed_ms(started_at),
)
if processed:
# 发布消息处理事件
self.event_system.publish(EventType.MESSAGE_PROCESSED, {
"message": msg,
"plugin": plugin.name
})
return True
except Exception as e:
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
self.LOG.error(f"插件 {plugin.name} 处理消息失败: {e}")
return False
@staticmethod
def _elapsed_ms(started_at: float) -> float:
"""把 monotonic 起始时间转换为毫秒耗时。"""
return round((time.perf_counter() - started_at) * 1000, 2)
@staticmethod
def _extract_plugin_command(msg: WxMessage) -> str:
"""尽力从消息内容中提取一个可读的“触发命令”。"""
# 这里不追求把所有命令解析得非常精确,只要能满足后台统计可读性即可:
# 1. 文本消息优先取第一段词,避免把整句长文本都记成 command
# 2. 非文本消息统一落到消息类型名,便于区分“文本触发”和“链接触发”等场景;
# 3. 空内容时返回通用占位,避免统计表出现 NULL / 空字符串。
raw_content = str(getattr(getattr(msg, "content", None), "clean_content", "") or "").strip()
if raw_content:
first_token = raw_content.split()[0].strip()
return first_token[:50] if first_token else "[文本消息]"
msg_type = getattr(getattr(msg, "msg_type", None), "name", "")
return f"[{msg_type or 'UNKNOWN'}]"
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,
# 因此每次埋点前都需要安全地确认插件实例是否存在且处于运行态。
plugin = self.plugin_manager.plugins.get("指令记录")
if not plugin:
return None
if getattr(plugin, "status", None) != PluginStatus.RUNNING:
return None
return plugin
def _record_plugin_call_result(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
process_result: bool,
process_time_ms: float,
) -> None:
"""将插件执行结果直接写入统计插件。"""
stats_plugin = self._get_stats_collector_plugin()
if not stats_plugin or not hasattr(stats_plugin, "record_plugin_call"):
return
try:
stats_plugin.record_plugin_call(
plugin_name=plugin.name,
command=command_name,
user_id=msg.sender,
group_id=msg.roomid if msg.from_group() else None,
is_group=msg.from_group(),
process_result=process_result,
process_time_ms=process_time_ms,
)
except Exception as stats_error:
self.LOG.error(f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}")
def _record_plugin_call_error(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
error: Exception,
) -> None:
"""将插件执行异常直接写入统计插件。"""
stats_plugin = self._get_stats_collector_plugin()
if not stats_plugin or not hasattr(stats_plugin, "record_plugin_error"):
return
try:
stats_plugin.record_plugin_error(
plugin_name=plugin.name,
command=command_name,
user_id=msg.sender,
group_id=msg.roomid if msg.from_group() else None,
is_group=msg.from_group(),
error_message=str(error),
# 这里保留完整堆栈,便于后台直接查看异常上下文,而不必只看摘要日志。
stack_trace=traceback.format_exc(),
)
except Exception as stats_error:
self.LOG.error(f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}")
@staticmethod
def _sort_message_plugins(message_plugins):
"""将兜底型插件放到最后执行,避免影响其他插件命中。"""