调整指令信息

This commit is contained in:
liuwei
2025-03-18 17:53:25 +08:00
parent 1d7ee9f953
commit 345439be90
4 changed files with 121 additions and 154 deletions

View File

@@ -2,17 +2,20 @@ import logging
from typing import Dict, List, Type, Callable, Any
from threading import Lock
class Event:
"""事件基类"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
class EventManager:
"""事件管理器,单例模式"""
_instance = None
_lock = Lock()
@classmethod
def get_instance(cls):
"""获取事件管理器实例"""
@@ -21,39 +24,39 @@ class EventManager:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
if EventManager._instance is not None:
raise RuntimeError("EventManager 是单例类,请使用 get_instance() 方法获取实例")
self.handlers: Dict[Type[Event], List[Callable]] = {}
self.logger = logging.getLogger("EventManager")
def register(self, event_type: Type[Event], handler: Callable) -> None:
"""注册事件处理器"""
if event_type not in self.handlers:
self.handlers[event_type] = []
if handler not in self.handlers[event_type]:
self.handlers[event_type].append(handler)
self.logger.debug(f"注册事件处理器: {event_type.__name__} -> {handler.__name__}")
def unregister(self, event_type: Type[Event], handler: Callable) -> None:
"""取消注册事件处理器"""
if event_type in self.handlers and handler in self.handlers[event_type]:
self.handlers[event_type].remove(handler)
self.logger.debug(f"取消注册事件处理器: {event_type.__name__} -> {handler.__name__}")
def publish(self, event_type: Type[Event], event_data: Dict[str, Any] = None) -> None:
"""发布事件"""
if event_data is None:
event_data = {}
event = event_type(**event_data)
if event_type in self.handlers:
for handler in self.handlers[event_type]:
try:
handler(event)
except Exception as e:
self.logger.error(f"事件处理器 {handler.__name__} 处理 {event_type.__name__} 事件出错: {e}")
self.logger.error(f"事件处理器 {handler.__name__} 处理 {event_type.__name__} 事件出错: {e}")

View File

@@ -1,74 +1,58 @@
from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime
from event_system.event_manager import Event
from typing import Optional, Dict, Any
@dataclass
class PluginCallStartEvent(Event):
"""插件调用开始事件"""
def __init__(self, plugin_name: str, command: str, user_id: str,
group_id: Optional[str] = None, **kwargs):
"""
Args:
plugin_name: 插件名称
command: 触发的命令
user_id: 用户ID
group_id: 群组ID私聊为None
"""
super().__init__(
plugin_name=plugin_name,
command=command,
user_id=user_id,
group_id=group_id,
**kwargs
)
plugin_name: str # 插件名称
command: str # 触发的命令
full_command: str # 完整命令内容
user_id: str # 用户ID
group_id: Optional[str] = None # 群组ID私聊为None
is_group: bool = False # 是否群聊
message: Dict[str, Any] = None # 原始消息内容
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
@dataclass
class PluginCallEndEvent(Event):
"""插件调用结束事件"""
def __init__(self, plugin_name: str, command: str, user_id: str,
group_id: Optional[str], success: bool,
process_time_ms: float, result: Any = None, **kwargs):
"""
Args:
plugin_name: 插件名称
command: 触发的命令
user_id: 用户ID
group_id: 群组ID私聊为None
success: 是否调用成功
process_time_ms: 处理时间(毫秒)
result: 处理结果
"""
super().__init__(
plugin_name=plugin_name,
command=command,
user_id=user_id,
group_id=group_id,
success=success,
process_time_ms=process_time_ms,
result=result,
**kwargs
)
plugin_name: str # 插件名称
command: str # 触发的命令
user_id: str # 用户ID
group_id: Optional[str] = None # 群组ID私聊为None
is_group: bool = False # 是否群聊
process_result: bool = True # 处理结果True成功False失败
result_message: Optional[str] = None # 处理结果消息
process_time: int = 0 # 处理耗时(毫秒)
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class PluginErrorEvent(Event):
"""插件错误事件"""
def __init__(self, plugin_name: str, command: str, user_id: str,
group_id: Optional[str], error_message: str,
stack_trace: Optional[str] = None, **kwargs):
"""
Args:
plugin_name: 插件名称
command: 触发的命令
user_id: 用户ID
group_id: 群组ID私聊为None
error_message: 错误信息
stack_trace: 堆栈跟踪
"""
super().__init__(
plugin_name=plugin_name,
command=command,
user_id=user_id,
group_id=group_id,
error_message=error_message,
stack_trace=stack_trace,
**kwargs
)
@dataclass
class PluginCallErrorEvent(Event):
"""插件调用错误事件"""
plugin_name: str # 插件名称
command: str # 触发的命令
user_id: str # 用户ID
group_id: Optional[str] = None # 群组ID私聊为None
is_group: bool = False # 是否群聊
error_message: str # 错误信息
stack_trace: Optional[str] = None # 堆栈跟踪
process_time: int = 0 # 处理耗时(毫秒)
timestamp: datetime = None # 事件时间戳
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()

View File

@@ -1,40 +0,0 @@
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
from event_system.event import Event
@dataclass
class PluginCallStartEvent(Event):
"""插件调用开始事件"""
plugin_name: str
command: str
user_id: str
group_id: Optional[str]
start_time: datetime
@dataclass
class PluginCallEndEvent(Event):
"""插件调用结束事件"""
plugin_name: str
command: str
user_id: str
group_id: Optional[str]
start_time: datetime
end_time: datetime
success: bool
response: Optional[str]
@dataclass
class PluginCallErrorEvent(Event):
"""插件调用错误事件"""
plugin_name: str
command: str
user_id: str
group_id: Optional[str]
start_time: datetime
error_message: str
stack_trace: Optional[str]

View File

@@ -1,97 +1,113 @@
import logging
import time
from typing import Dict, Any, Tuple, Optional
from typing import Dict, Any, Tuple, Optional, List
from datetime import datetime
from plugin_common.plugin_interface import PluginInterface
from event_system.event_manager import EventManager
# 修正导入,使用与装饰器相同的事件类型
# 使用正确的事件类型导入
from event_system.events.plugin_events import PluginCallStartEvent, PluginCallEndEvent, PluginCallErrorEvent
# 数据库导入
from db.stats_db import StatsDBOperator
from db.db_manager import DBConnectionManager
from job_decorators import register_job_decorator
from .decorators import plugin_stats_decorator
from db.connection import DBConnectionManager
class StatsCollectorPlugin(PluginInterface):
"""统计收集插件"""
@property
def name(self) -> str:
return "command collector"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "群聊指令数据记录"
@property
def author(self) -> str:
return "Liuwei"
@property
def command_prefix(self) -> Optional[str]:
return "#"
@property
def commands(self) -> List[str]:
return []
def __init__(self):
self.name = "统计收集器"
self.version = "1.0.0"
self.description = "收集插件调用统计数据"
self.author = "Trae AI"
self.logger = logging.getLogger("StatsCollector")
# 默认配置
self.config = {
"enable": True,
"record_all_plugins": True, # 是否记录所有插件的调用
"excluded_plugins": [], # 排除的插件列表
"excluded_plugins": [], # 排除的插件列表
}
self.event_manager = EventManager.get_instance()
# 修正获取数据库连接管理器的方式
self.db_manager = DBConnectionManager.get_instance()
self.db_manager = DBConnectionManager()
self.stats_db = StatsDBOperator(self.db_manager)
# 用于临时存储插件调用开始时间的字典
self.plugin_call_start_times = {}
def initialize(self, config: Dict[str, Any]) -> bool:
"""初始化插件"""
if config:
self.config.update(config)
if not self.config["enable"]:
self.logger.info("统计收集插件已禁用")
return False
# 注册事件处理器
self.event_manager.register(PluginCallStartEvent, self.handle_plugin_call_start)
self.event_manager.register(PluginCallEndEvent, self.handle_plugin_call_end)
self.event_manager.register(PluginCallErrorEvent, self.handle_plugin_error)
self.logger.info("统计收集插件已初始化")
return True
def handle_plugin_call_start(self, event: PluginCallStartEvent) -> None:
"""处理插件调用开始事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
return
# 记录开始时间和相关信息
# 注意plugin_events.py 中的事件结构与 stats_events.py 不同
self.logger.debug(f"记录插件调用开始: {event.plugin_name} - {event.command}")
def handle_plugin_call_end(self, event: PluginCallEndEvent) -> None:
"""处理插件调用结束事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
return
# 记录统计数据
try:
# 确保使用正确的属性名
self.stats_db.record_plugin_call(
plugin_name=event.plugin_name,
command=event.command,
user_id=event.user_id,
group_id=event.group_id,
success=event.process_result, # 注意字段名不同
process_time_ms=event.process_time # 注意字段名不同
success=event.process_result,
process_time_ms=event.process_time
)
self.logger.debug(f"记录插件调用结束: {event.plugin_name} - {event.command} - 成功: {event.process_result} - 处理时间: {event.process_time}ms")
self.logger.debug(
f"记录插件调用结束: {event.plugin_name} - {event.command} - 成功: {event.process_result} - 处理时间: {event.process_time}ms")
except Exception as e:
self.logger.error(f"记录插件调用统计数据出错: {e}")
def handle_plugin_error(self, event: PluginCallErrorEvent) -> None:
"""处理插件调用错误事件"""
# 检查是否需要记录该插件
if not self._should_record_plugin(event.plugin_name):
return
# 记录错误信息
try:
self.stats_db.record_error(
@@ -105,32 +121,36 @@ class StatsCollectorPlugin(PluginInterface):
self.logger.debug(f"记录插件调用错误: {event.plugin_name} - {event.command} - {event.error_message}")
except Exception as e:
self.logger.error(f"记录插件错误信息出错: {e}")
def _should_record_plugin(self, plugin_name: str) -> bool:
"""检查是否应该记录该插件的调用"""
if not self.config["record_all_plugins"]:
return False
if plugin_name in self.config["excluded_plugins"]:
return False
# 不记录自身的调用
if plugin_name == self.name:
return False
return True
def match_command(self, content: str) -> bool:
"""匹配命令"""
# 该插件不处理用户消息
return False
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, str]:
"""处理消息"""
# 该插件不处理用户消息
return False, ""
def shutdown(self) -> None:
"""关闭插件"""
# 取消注册事件处理器
self.event_manager.unregister(PluginCallStartEvent, self.handle_plugin_call_start)
self.event_manager.unregister(PluginCallEndEvent, self.handle_plugin_call_end)
self.event_manager.unregister(PluginCallErrorEvent, self.handle_plugin_error)
self.logger.info("统计收集插件已关闭")
self.logger.info("统计收集插件已关闭")