diff --git a/base/event_system/event_manager.py b/base/event_system/event_manager.py deleted file mode 100644 index 791c1ae..0000000 --- a/base/event_system/event_manager.py +++ /dev/null @@ -1,62 +0,0 @@ -from loguru import logger -from typing import Dict, List, Type, Callable, Any -from threading import Lock - - -class Event: - """事件基类""" - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - setattr(self, key, value) - - -class EventManager: - """事件管理器,单例模式""" - _instance = None - _lock = Lock() - - @classmethod - def get_instance(cls): - """获取事件管理器实例""" - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = cls() - return cls._instance - - def __init__(self): - if EventManager._instance is not None: - raise RuntimeError("EventManager 是单例类,请使用 get_instance() 方法获取实例") - - self.handlers: Dict[Type[Event], List[Callable]] = {} - self.LOG = logger - - def register(self, event_type: Type[Event], handler: Callable) -> None: - """注册事件处理器""" - if event_type not in self.handlers: - self.handlers[event_type] = [] - - if handler not in self.handlers[event_type]: - self.handlers[event_type].append(handler) - self.LOG.debug(f"注册事件处理器: {event_type.__name__} -> {handler.__name__}") - - def unregister(self, event_type: Type[Event], handler: Callable) -> None: - """取消注册事件处理器""" - if event_type in self.handlers and handler in self.handlers[event_type]: - self.handlers[event_type].remove(handler) - self.LOG.debug(f"取消注册事件处理器: {event_type.__name__} -> {handler.__name__}") - - def publish(self, event_type: Type[Event], event_data: Dict[str, Any] = None) -> None: - """发布事件""" - if event_data is None: - event_data = {} - - event = event_type(**event_data) - - if event_type in self.handlers: - for handler in self.handlers[event_type]: - try: - handler(event) - except Exception as e: - self.LOG.error(f"事件处理器 {handler.__name__} 处理 {event_type.__name__} 事件出错: {e}") diff --git a/base/event_system/events/plugin_events.py b/base/event_system/events/plugin_events.py deleted file mode 100644 index e0ff161..0000000 --- a/base/event_system/events/plugin_events.py +++ /dev/null @@ -1,58 +0,0 @@ -from dataclasses import dataclass -from typing import Dict, Any, Optional -from datetime import datetime - -from base.event_system.event_manager import Event - - -@dataclass -class PluginCallStartEvent(Event): - """插件调用开始事件""" - plugin_name: str # 插件名称 - command: str # 触发的命令 - full_command: str # 完整命令内容 - user_id: str # 用户ID - group_id: Optional[str] = None # 群组ID,私聊为None - is_group: bool = False # 是否群聊 - message: Dict[str, Any] = None # 原始消息内容 - timestamp: datetime = None # 事件时间戳 - - def __post_init__(self): - if self.timestamp is None: - self.timestamp = datetime.now() - - -@dataclass -class PluginCallEndEvent(Event): - """插件调用结束事件""" - plugin_name: str # 插件名称 - command: str # 触发的命令 - user_id: str # 用户ID - group_id: Optional[str] = None # 群组ID,私聊为None - is_group: bool = False # 是否群聊 - process_result: bool = True # 处理结果:True成功,False失败 - result_message: Optional[str] = None # 处理结果消息 - process_time: int = 0 # 处理耗时(毫秒) - timestamp: datetime = None # 事件时间戳 - - def __post_init__(self): - if self.timestamp is None: - self.timestamp = datetime.now() - - -@dataclass -class PluginCallErrorEvent(Event): - """插件调用错误事件""" - plugin_name: str # 插件名称 - command: str # 触发的命令 - user_id: str # 用户ID - group_id: Optional[str] # 群组ID,私聊为None - is_group: bool # 是否群聊 - error_message: str # 错误信息 - stack_trace: Optional[str] = None # 堆栈跟踪 - process_time: int = 0 # 处理耗时(毫秒) - timestamp: datetime = None # 事件时间戳 - - def __post_init__(self): - if self.timestamp is None: - self.timestamp = datetime.now() diff --git a/base/plugin_common/event_system.py b/base/plugin_common/event_system.py deleted file mode 100644 index 1197656..0000000 --- a/base/plugin_common/event_system.py +++ /dev/null @@ -1,85 +0,0 @@ -from enum import Enum, auto -from typing import Dict, Any, Callable, List -import threading - - -class EventType(Enum): - """事件类型枚举""" - SYSTEM_STARTUP = auto() - SYSTEM_SHUTDOWN = auto() - PLUGIN_LOADED = auto() - PLUGIN_UNLOADED = auto() - MESSAGE_RECEIVED = auto() - MESSAGE_PROCESSED = auto() - CUSTOM_EVENT = auto() - - -class EventSystem: - """事件系统,用于插件间通信""" - _instance = None - _lock = threading.Lock() - - def __new__(cls): - with cls._lock: - if cls._instance is None: - cls._instance = super(EventSystem, cls).__new__(cls) - cls._instance._subscribers = {} - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if not self._initialized: - self._subscribers = {} - self._initialized = True - - def subscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None: - """ - 订阅事件 - - Args: - event_type: 事件类型 - callback: 回调函数,接收事件数据 - """ - if event_type not in self._subscribers: - self._subscribers[event_type] = [] - - if callback not in self._subscribers[event_type]: - self._subscribers[event_type].append(callback) - - def unsubscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None: - """ - 取消订阅事件 - - Args: - event_type: 事件类型 - callback: 回调函数 - """ - if event_type in self._subscribers and callback in self._subscribers[event_type]: - self._subscribers[event_type].remove(callback) - - def publish(self, event_type: EventType, data: Dict[str, Any]) -> None: - """ - 发布事件 - - Args: - event_type: 事件类型 - data: 事件数据 - """ - if event_type in self._subscribers: - for callback in self._subscribers[event_type]: - try: - callback(data) - except Exception as e: - print(f"事件处理错误: {e}") - - def get_subscribers(self, event_type: EventType) -> List[Callable]: - """ - 获取事件订阅者 - - Args: - event_type: 事件类型 - - Returns: - 订阅者列表 - """ - return self._subscribers.get(event_type, []) \ No newline at end of file diff --git a/base/plugin_common/plugin_manager.py b/base/plugin_common/plugin_manager.py index aba06d7..2537c24 100644 --- a/base/plugin_common/plugin_manager.py +++ b/base/plugin_common/plugin_manager.py @@ -12,7 +12,6 @@ from base.plugin_common.plugin_interface import PluginInterface, PluginStatus from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.scheduled_plugin_interface import ScheduledPluginInterface from base.plugin_common.plugin_registry import PluginRegistry -from base.plugin_common.event_system import EventSystem, EventType from utils.decorator.async_job import async_job from wechat_ipad import WechatAPIClient @@ -431,9 +430,6 @@ class PluginManager: self._refresh_module_file_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") - # 发布插件加载事件 - EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin}) - return plugin else: self.LOG.error(f"PluginManager:插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例") @@ -483,9 +479,6 @@ class PluginManager: self._refresh_module_file_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") - # 发布插件加载事件 - EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin}) - return plugin except Exception as e: @@ -544,9 +537,6 @@ class PluginManager: # 移除插件实例 del self.plugins[display_name] - # 发布插件卸载事件 - EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": display_name}) - return True def reload_plugin(self, name: str) -> Optional[PluginInterface]: diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md index 0b4eac8..93744a2 100644 --- a/docs/工程优化与Feature清单.md +++ b/docs/工程优化与Feature清单.md @@ -92,7 +92,7 @@ 消息处理主链路已经做了部分异步和并发控制,但仍有几个潜在热点: - 插件处理仍以串行判定为主,插件数量增加后会放大延迟 -- 事件系统是轻量同步发布模型,慢回调会影响主流程 +- 统计、观测、异常记录等横切逻辑过去分散在多处抽象中,主链路需要继续收口 - 消息归档、媒体处理、AI 调用都可能形成局部阻塞 - Redis 和数据库部分写法在高消息量场景下会出现额外开销 diff --git a/plugins/stats_collector/main.py b/plugins/stats_collector/main.py index d1ba9a2..0f2b69f 100644 --- a/plugins/stats_collector/main.py +++ b/plugins/stats_collector/main.py @@ -2,10 +2,6 @@ from loguru import logger from typing import Dict, Any, Tuple, Optional, List from base.plugin_common.plugin_interface import PluginInterface, PluginStatus -from base.event_system.event_manager import EventManager -# 使用正确的事件类型导入 -from base.event_system.events.plugin_events import PluginCallStartEvent, PluginCallEndEvent, PluginCallErrorEvent -# 数据库导入 from db.stats_db import StatsDBOperator from db.connection import DBConnectionManager @@ -41,81 +37,95 @@ class StatsCollectorPlugin(PluginInterface): super().__init__() self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") - # 默认配置 + # 默认配置: + # 1. 这个插件现在不再依赖事件总线,而是由主消息分发链路直接回调; + # 2. 因此这里保留一份轻量配置,只控制“是否记录”和“排除哪些插件”; + # 3. 这样既能延续原有统计面板数据结构,也能避免事件系统带来的额外复杂度。 self.config = { "enable": True, "record_all_plugins": True, # 是否记录所有插件的调用 "excluded_plugins": [], # 排除的插件列表 } - self.event_manager = EventManager.get_instance() self.db_manager = DBConnectionManager.get_instance() self.stats_db = StatsDBOperator(self.db_manager) - # 注册功能权限 - def initialize(self, config: Dict[str, Any]) -> bool: + def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" - if config: - self.config.update(config) + # 这里显式只读取插件自己的配置,不再把 system_context 整体 merge 进来: + # 1. 旧实现把 initialize 入参当成“插件配置”使用,但实际传入的是 system_context; + # 2. 结果会把 db_manager、redis_pool 等运行时对象误写到 self.config 中; + # 3. 改成只消费 load_config 后的 self._config,避免配置结构持续污染。 + if isinstance(self._config, dict): + self.config.update(self._config) + + # 若主系统已经初始化了 DB 管理器,则优先复用统一实例,避免插件侧再走兜底单例。 + if isinstance(context, dict) and context.get("db_manager") is not None: + self.db_manager = context.get("db_manager") + self.stats_db = StatsDBOperator(self.db_manager) if not self.config["enable"]: self.LOG.info("统计收集插件已禁用") return False - # 注册事件处理器 - self.event_manager.register(PluginCallStartEvent, self.handle_plugin_call_start) - self.event_manager.register(PluginCallEndEvent, self.handle_plugin_call_end) - self.event_manager.register(PluginCallErrorEvent, self.handle_plugin_error) - return True - def handle_plugin_call_start(self, event: PluginCallStartEvent) -> None: - """处理插件调用开始事件""" - # 检查是否需要记录该插件 - if not self._should_record_plugin(event.plugin_name): + def record_plugin_call( + self, + *, + plugin_name: str, + command: str, + user_id: str, + group_id: Optional[str], + is_group: bool, + process_result: bool, + process_time_ms: float, + ) -> None: + """由主链路直接调用,记录一次插件执行结果。""" + # 主链路可能会在高频场景下频繁回调这里,因此先做最便宜的开关判断, + # 避免对已关闭或被排除插件继续产生数据库写入成本。 + if not self._should_record_plugin(plugin_name): return - # 记录开始时间和相关信息 - self.LOG.debug(f"记录插件调用开始: {event.plugin_name} - {event.command}") - - def handle_plugin_call_end(self, event: PluginCallEndEvent) -> None: - """处理插件调用结束事件""" - # 检查是否需要记录该插件 - if not self._should_record_plugin(event.plugin_name): - return - - # 记录统计数据 try: - # 确保使用正确的属性名 self.stats_db.record_plugin_call( - plugin_name=event.plugin_name, - command=event.command, - user_id=event.user_id, - group_id=event.group_id, - success=event.process_result, - process_time_ms=event.process_time + plugin_name=plugin_name, + command=command, + user_id=user_id, + group_id=group_id if is_group else None, + success=process_result, + process_time_ms=process_time_ms, ) self.LOG.debug( - f"记录插件调用结束: {event.plugin_name} - {event.command} - 成功: {event.process_result} - 处理时间: {event.process_time}ms") + f"记录插件调用结束: {plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms" + ) except Exception as e: self.LOG.error(f"记录插件调用统计数据出错: {e}") - def handle_plugin_error(self, event: PluginCallErrorEvent) -> None: - """处理插件调用错误事件""" - # 检查是否需要记录该插件 - if not self._should_record_plugin(event.plugin_name): + def record_plugin_error( + self, + *, + plugin_name: str, + command: str, + user_id: str, + group_id: Optional[str], + is_group: bool, + error_message: str, + stack_trace: Optional[str] = None, + ) -> None: + """由主链路直接调用,记录一次插件执行异常。""" + if not self._should_record_plugin(plugin_name): return - # 记录错误信息 try: self.stats_db.record_error( - plugin_name=event.plugin_name, - command=event.command, - user_id=event.user_id, - group_id=event.group_id, - error_message=event.error_message, - stack_trace=event.stack_trace + plugin_name=plugin_name, + command=command, + user_id=user_id, + group_id=group_id if is_group else None, + error_message=error_message, + stack_trace=stack_trace, ) - self.LOG.debug(f"记录插件调用错误: {event.plugin_name} - {event.command} - {event.error_message}") + self.LOG.debug(f"记录插件调用错误: {plugin_name} - {command} - {error_message}") except Exception as e: self.LOG.error(f"记录插件错误信息出错: {e}") @@ -145,11 +155,6 @@ class StatsCollectorPlugin(PluginInterface): def shutdown(self) -> None: """关闭插件""" - # 取消注册事件处理器 - self.event_manager.unregister(PluginCallStartEvent, self.handle_plugin_call_start) - self.event_manager.unregister(PluginCallEndEvent, self.handle_plugin_call_end) - self.event_manager.unregister(PluginCallErrorEvent, self.handle_plugin_error) - self.LOG.info("统计收集插件已关闭") def start(self) -> bool: diff --git a/robot.py b/robot.py index eb6d404..6d22d35 100644 --- a/robot.py +++ b/robot.py @@ -3,13 +3,13 @@ import asyncio import threading import time import tomllib +import traceback from collections import deque import toml from loguru import logger import wechat_ipad -from base.plugin_common.event_system import EventType, EventSystem from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_manager import PluginManager @@ -98,13 +98,11 @@ class Robot: # 初始化插件系统 self.LOG.debug("开始初始化插件系统...") self.plugin_registry = PluginRegistry() - self.event_system = EventSystem() self.plugin_modules = {} # 存储已加载的插件模块 self.plugins = {} # 存储已加载的插件实例 # 设置插件系统上下文 self.system_context = { "config": config, - "event_system": self.event_system, "plugin_registry": self.plugin_registry, "db_manager": self.db_manager, "db_pool": self.db_pool, @@ -545,11 +543,8 @@ class Robot: except Exception as e: self.LOG.error(f"获取群成员信息失败: {e}") - # 发布消息接收事件 - self.event_system.publish(EventType.MESSAGE_RECEIVED, {"message": message}) - # 尝试使用插件处理消息 - plugin_processed = await self.process_plugin_message(message) + await self.process_plugin_message(message) if is_group: self.LOG.debug(f"入库和记录群消息: {message}") @@ -609,13 +604,22 @@ class Robot: if plugin.status != PluginStatus.RUNNING: continue + # 这里在进入插件前统一准备统计上下文: + # 1. 事件系统删除后,插件调用统计需要直接在主链路埋点; + # 2. 提前抽出 room_id / sender / command,后续无论成功还是异常都能复用; + # 3. 这样可以保证观测逻辑收口在一处,避免每个插件自己重复埋点。 + room_id = msg.roomid if msg.from_group() else "" + sender = msg.sender + command_name = self._extract_plugin_command(msg) + started_at = time.perf_counter() + try: # 转换消息为插件可处理的格式 plugin_msg = { "type": msg.msg_type, "content": msg.content.clean_content, - "sender": msg.sender, - "roomid": msg.roomid if msg.from_group() else "", + "sender": sender, + "roomid": room_id, "is_at": msg.is_at(self.wxid), "timestamp": time.time(), "all_contacts": self.allContacts, @@ -628,18 +632,114 @@ class Robot: # 检查插件是否可以处理该消息 if plugin.can_process(plugin_msg): processed, _ = await plugin.process_message(plugin_msg) + self._record_plugin_call_result( + plugin=plugin, + msg=msg, + command_name=command_name, + # 这里把“无异常执行完成”视为统计意义上的成功: + # 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”; + # 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低; + # 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。 + process_result=True, + process_time_ms=self._elapsed_ms(started_at), + ) if processed: - # 发布消息处理事件 - self.event_system.publish(EventType.MESSAGE_PROCESSED, { - "message": msg, - "plugin": plugin.name - }) return True except Exception as e: + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=e, + ) self.LOG.error(f"插件 {plugin.name} 处理消息失败: {e}") return False + @staticmethod + def _elapsed_ms(started_at: float) -> float: + """把 monotonic 起始时间转换为毫秒耗时。""" + return round((time.perf_counter() - started_at) * 1000, 2) + + @staticmethod + def _extract_plugin_command(msg: WxMessage) -> str: + """尽力从消息内容中提取一个可读的“触发命令”。""" + # 这里不追求把所有命令解析得非常精确,只要能满足后台统计可读性即可: + # 1. 文本消息优先取第一段词,避免把整句长文本都记成 command; + # 2. 非文本消息统一落到消息类型名,便于区分“文本触发”和“链接触发”等场景; + # 3. 空内容时返回通用占位,避免统计表出现 NULL / 空字符串。 + raw_content = str(getattr(getattr(msg, "content", None), "clean_content", "") or "").strip() + if raw_content: + first_token = raw_content.split()[0].strip() + return first_token[:50] if first_token else "[文本消息]" + msg_type = getattr(getattr(msg, "msg_type", None), "name", "") + return f"[{msg_type or 'UNKNOWN'}]" + + def _get_stats_collector_plugin(self): + """获取运行中的统计收集插件实例。""" + # 统计插件已经从“事件订阅”切到“主链路直接回调”, + # 因此每次埋点前都需要安全地确认插件实例是否存在且处于运行态。 + plugin = self.plugin_manager.plugins.get("指令记录") + if not plugin: + return None + if getattr(plugin, "status", None) != PluginStatus.RUNNING: + return None + return plugin + + def _record_plugin_call_result( + self, + *, + plugin, + msg: WxMessage, + command_name: str, + process_result: bool, + process_time_ms: float, + ) -> None: + """将插件执行结果直接写入统计插件。""" + stats_plugin = self._get_stats_collector_plugin() + if not stats_plugin or not hasattr(stats_plugin, "record_plugin_call"): + return + + try: + stats_plugin.record_plugin_call( + plugin_name=plugin.name, + command=command_name, + user_id=msg.sender, + group_id=msg.roomid if msg.from_group() else None, + is_group=msg.from_group(), + process_result=process_result, + process_time_ms=process_time_ms, + ) + except Exception as stats_error: + self.LOG.error(f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}") + + def _record_plugin_call_error( + self, + *, + plugin, + msg: WxMessage, + command_name: str, + error: Exception, + ) -> None: + """将插件执行异常直接写入统计插件。""" + stats_plugin = self._get_stats_collector_plugin() + if not stats_plugin or not hasattr(stats_plugin, "record_plugin_error"): + return + + try: + stats_plugin.record_plugin_error( + plugin_name=plugin.name, + command=command_name, + user_id=msg.sender, + group_id=msg.roomid if msg.from_group() else None, + is_group=msg.from_group(), + error_message=str(error), + # 这里保留完整堆栈,便于后台直接查看异常上下文,而不必只看摘要日志。 + stack_trace=traceback.format_exc(), + ) + except Exception as stats_error: + self.LOG.error(f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}") + @staticmethod def _sort_message_plugins(message_plugins): """将兜底型插件放到最后执行,避免影响其他插件命中。"""