75 lines
2.4 KiB
Python
75 lines
2.4 KiB
Python
from typing import Dict, Any, Tuple, Optional, List
|
||
from base.plugin_common.plugin_interface import PluginInterface
|
||
from wechat_ipad import WechatAPIClient
|
||
from utils.robot_cmd.robot_command import Feature
|
||
|
||
|
||
class MessagePluginInterface(PluginInterface):
|
||
"""消息处理插件接口"""
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
"""命令前缀,如 '/'"""
|
||
return None
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
"""支持的命令列表"""
|
||
return []
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
"""插件对应的功能权限键名"""
|
||
return None
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
"""插件对应的功能权限描述"""
|
||
return None
|
||
|
||
def register_feature(self) -> Optional[Feature]:
|
||
"""注册插件功能权限
|
||
|
||
Returns:
|
||
Feature: 注册的功能权限枚举,如果不需要权限则返回None
|
||
"""
|
||
if self.feature_key and self.feature_description:
|
||
return Feature.register_feature(self.feature_key, self.feature_description)
|
||
return None
|
||
|
||
# 需要完成jobs 的业务,所以完成bot注入
|
||
def set_bot(self, bot: WechatAPIClient) -> None:
|
||
self.bot: WechatAPIClient = bot
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""
|
||
检查插件是否可以处理该消息
|
||
|
||
Args:
|
||
message: 消息字典,包含消息的各种属性
|
||
|
||
Returns:
|
||
是否可以处理
|
||
"""
|
||
# 默认实现:检查是否是命令
|
||
if self.command_prefix and self.commands:
|
||
content = message.get("content", "")
|
||
if content.startswith(self.command_prefix):
|
||
command = content[len(self.command_prefix):].split()[0]
|
||
return command in self.commands
|
||
return False
|
||
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""
|
||
处理消息
|
||
|
||
Args:
|
||
message: 消息字典,包含消息的各种属性,以及发送消息所需的对象
|
||
- wcf: WcfAPI对象,可用于发送消息
|
||
- message_util: 消息工具类,提供更高级的消息处理功能
|
||
|
||
Returns:
|
||
(是否已处理, 处理结果)
|
||
"""
|
||
raise NotImplementedError("子类必须实现此方法")
|