from typing import Dict, Any, Tuple, Optional, List from base.plugin_common.plugin_interface import PluginInterface from wechat_ipad import WechatAPIClient from utils.robot_cmd.robot_command import Feature class MessagePluginInterface(PluginInterface): """消息处理插件接口""" @staticmethod def normalize_message_dispatch_mode(raw_mode: Any) -> str: """把插件声明的消息分发模式标准化为 `sync` 或 `background`。 设计说明: 1. `sync` 表示沿用当前主链路同步执行,插件会占用当前消息处理协程直到完成; 2. `background` 表示命中后立即转入后台任务池,主消息链路尽快释放,不再占用前台并发槽位; 3. 这里集中做别名兼容,后续插件只需要写 `background/async/queue` 这类语义值即可。 """ mode = str(raw_mode or "").strip().lower() if mode in {"background", "async", "queued", "queue", "detached"}: return "background" return "sync" @property def command_prefix(self) -> Optional[str]: """命令前缀,如 '/'""" return None @property def commands(self) -> List[str]: """支持的命令列表""" return [] @property def feature_key(self) -> Optional[str]: """插件对应的功能权限键名""" return None @property def feature_description(self) -> Optional[str]: """插件对应的功能权限描述""" return None def register_feature(self) -> Optional[Feature]: """注册插件功能权限 Returns: Feature: 注册的功能权限枚举,如果不需要权限则返回None """ if self.feature_key and self.feature_description: return Feature.register_feature(self.feature_key, self.feature_description) return None # 需要完成jobs 的业务,所以完成bot注入 def set_bot(self, bot: WechatAPIClient) -> None: self.bot: WechatAPIClient = bot def can_process(self, message: Dict[str, Any]) -> bool: """ 检查插件是否可以处理该消息 Args: message: 消息字典,包含消息的各种属性 Returns: 是否可以处理 """ # 默认实现:检查是否是命令 if self.command_prefix and self.commands: content = message.get("content", "") if content.startswith(self.command_prefix): command = content[len(self.command_prefix):].split()[0] return command in self.commands return False def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """ 处理消息 Args: message: 消息字典,包含消息的各种属性,以及发送消息所需的对象 - wcf: WcfAPI对象,可用于发送消息 - message_util: 消息工具类,提供更高级的消息处理功能 Returns: (是否已处理, 处理结果) """ raise NotImplementedError("子类必须实现此方法") def get_message_dispatch_mode(self, message: Dict[str, Any]) -> str: """返回当前消息应采用的执行模式。 默认行为: 1. 优先读取插件自身 `config.toml` 里的 `[runtime] message_dispatch_mode`; 2. 若未配置,则回退为 `sync`,保持历史行为不变; 3. 长任务插件如果需要“按命令动态切换前后台”,可以在子类中覆盖本方法。 """ plugin_config = getattr(self, "_config", {}) or {} runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {} runtime_config = runtime_config if isinstance(runtime_config, dict) else {} raw_mode = runtime_config.get("message_dispatch_mode") or runtime_config.get("dispatch_mode") or "sync" return self.normalize_message_dispatch_mode(raw_mode) def get_message_process_timeout_seconds(self, message: Dict[str, Any]) -> Optional[int]: """返回当前消息建议使用的插件总超时秒数。 默认行为: 1. 返回 `None`,表示继续沿用插件配置或机器人侧的自动推断逻辑; 2. 适合“同一个插件里既有轻命令,也有重命令”的场景,避免所有命令共用同一个超时; 3. 子类若需要按命令动态放宽超时,可覆盖本方法并返回正整数秒数。 """ return None # ---------------- 插件定时调度能力(可选实现) ---------------- def get_schedule_actions(self) -> List[Dict[str, Any]]: """返回插件支持的可调度动作定义列表。 每项示例: { "action_key": "daily_push", "name": "每日推送", "description": "给目标群发送每日内容", "trigger_type": "at_times", "trigger_config": {"time_list": ["09:00"]}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, "default_enabled": False } """ return [] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行调度动作(插件可覆盖)。 Returns: dict: {"success": bool, "summary": str, "detail": dict} """ return { "success": False, "summary": f"插件未实现调度动作: {action_key}", "detail": {"action_key": action_key}, }