from typing import Dict, Any, Tuple, Optional, List

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from plugins.member_context.service import MemberContextService
from utils.decorator.async_job import async_job


class MemberContextPlugin(MessagePluginInterface):
    """Background plugin that keeps group-member interaction summaries fresh.

    The plugin exposes no commands and never handles incoming messages
    (``can_process`` always returns ``False``). Its only work is a scheduled
    job, registered in ``initialize``, that calls
    ``MemberContextService.refresh_all_chatrooms()`` at the times listed under
    ``schedule.refresh_times`` in the plugin configuration.

    ``self.LOG``, ``self._config``, ``self.status`` and ``register_feature``
    are not defined here; presumably they are provided by
    ``MessagePluginInterface`` -- confirm against the base class.
    """

    # Feature-toggle identifier returned by ``feature_key``.
    FEATURE_KEY = "MEMBER_CONTEXT_CAPABILITY"
    # Human-readable label returned by ``feature_description``.
    FEATURE_DESCRIPTION = "🧠 成员交互摘要 [后台AI提取,仅对启用群生效]"

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "成员交互摘要"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of what the plugin does."""
        return "为群成员生成后台交互摘要,并按群功能开关控制"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "ABOT Team"

    @property
    def commands(self) -> List[str]:
        """Chat commands handled by the plugin; always empty."""
        return []

    @property
    def feature_key(self) -> Optional[str]:
        """Feature-toggle key for this plugin (``FEATURE_KEY``)."""
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        """Feature-toggle description (``FEATURE_DESCRIPTION``)."""
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        """Register the feature and set up empty service/job state."""
        super().__init__()
        self.feature = self.register_feature()
        # Created in initialize(); the scheduled job is a no-op while unset.
        self.service: Optional[MemberContextService] = None
        # Guards against registering the scheduled job more than once when
        # initialize() is called repeatedly on the same instance.
        self._job_registered = False

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Create the service and register the scheduled refresh job.

        Args:
            context: Plugin context; must contain a ``"db_manager"`` entry,
                which is passed to ``MemberContextService``.

        Returns:
            Always ``True``.

        Raises:
            KeyError: If ``context`` has no ``"db_manager"`` key.
        """
        self.LOG.debug(f"正在初始化 {self.name} 插件...")
        self.service = MemberContextService(context["db_manager"], self._config)

        # ``or`` fallbacks instead of ``.get(key, default)``: a config file
        # with ``schedule:`` or ``refresh_times:`` explicitly set to null
        # yields None, which the default argument of ``.get`` does not cover.
        schedule_cfg = self._config.get("schedule") or {}
        refresh_times = schedule_cfg.get("refresh_times") or []

        if refresh_times and not self._job_registered:
            self._register_refresh_job(refresh_times)

        self.LOG.debug(f"{self.name} 插件初始化完成")
        return True

    def _register_refresh_job(self, refresh_times: List[Any]) -> None:
        """Register the periodic refresh job via ``async_job.at_times``."""

        @async_job.at_times(refresh_times)
        async def refresh_member_context_job():
            # self.service is read at run time, so a later initialize() call
            # that replaces the service is picked up by the existing job.
            if not self.service:
                return
            self.LOG.info("开始刷新成员交互摘要")
            # NOTE(review): called without ``await``. If
            # refresh_all_chatrooms() is a coroutine function it never runs;
            # if it is synchronous it blocks the event loop for its whole
            # duration -- confirm against MemberContextService.
            self.service.refresh_all_chatrooms()
            self.LOG.info("成员交互摘要刷新完成")

        self._job_registered = True

    def start(self) -> bool:
        """Mark the plugin as running. Always returns ``True``."""
        self.LOG.debug(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped. Always returns ``True``.

        The scheduled refresh job is not unregistered here.
        """
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return ``False``: this plugin never handles chat messages."""
        return False

    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Return ``(False, None)`` without inspecting ``message``."""
        return False, None