Files
abot/plugins/message_recall/main.py
2025-04-30 13:22:33 +08:00

143 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
from typing import Dict, Any, List, Tuple, Optional
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import GroupBotManager
class MessageRecallPlugin(MessagePluginInterface):
"""消息撤回功能插件"""
@property
def name(self) -> str:
return "消息撤回"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "允许管理员撤回机器人在群聊中发送的消息"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
self.event_system = context.get("event_system")
self.message_util: MessageUtil = context.get("message_util")
# 从配置中获取命令和启用状态
self._commands = self._config.get("MessageRecall", {}).get("command", ["撤回"])
# 希望加入撤回最近3条的功能,将最近的3条发言撤回,加上群信息
self.command_format = self._config.get("MessageRecall", {}).get("command-format", "撤回 3")
self.enable = self._config.get("MessageRecall", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
# 检查是否是撤回命令
for cmd in self._commands:
if content.startswith(cmd):
return True
return False
@plugin_stats_decorator(plugin_name="消息撤回")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
# 检查是否是管理员
admin_list = GroupBotManager.get_admin_list()
self.LOG.info(f"admin_list={admin_list}")
# if sender not in admin_list:
# self.message_util.send_text("⚠️ 权限不足,只有管理员才能撤回消息",
# (roomid if roomid else sender), sender)
# return True, "权限不足"
# 解析命令获取消息ID
parts = content.split(" ", 1)
if len(parts) < 2:
self.message_util.send_text(f"❌命令格式错误!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
#
# try:
# # 从数据库里面提取可以处理的消息and StrTalker ={roomid}
# sql = (f"SELECT * FROM MSG where IsSender=1 and CreateTime > (strftime('%s', 'now') - 120) "
# f"limit {parts[1]}")
# data = self.message_util.query_sql('MSG0.db', sql)
# self.LOG.info(f"SQL:{sql}\n 查询到可撤回数据: {data}")
# if not data:
# self.message_util.send_text("❌ 没有可撤回的消息", (roomid if roomid else sender), sender)
# return True, "没有可撤回的消息"
# for item in data:
# if item["MsgSvrID"]:
# # 调用撤回消息API
# result = self.message_util.revoke_msg(item["MsgSvrID"])
# if result:
# self.message_util.send_text("✅ 消息撤回成功", (roomid if roomid else sender), sender)
# return True, "撤回成功"
# else:
# self.message_util.send_text("❌ 消息撤回失败可能是消息ID无效或已超过撤回时间限制2分钟",
# (roomid if roomid else sender), sender)
# return True, "撤回失败"
#
# except Exception as e:
# self.LOG.error(f"撤回消息出错: {e}")
# self.message_util.send_text(f"❌ 撤回消息出错: {str(e)}", (roomid if roomid else sender), sender)
# return True, f"处理出错: {e}"
def get_help(self) -> str:
"""获取插件帮助信息"""
return """消息撤回插件:
功能:允许管理员撤回机器人在群聊中发送的消息
用法:#撤回 3
说明:
1. 只有管理员可以使用此功能
2. 消息ID需要从日志或其他方式获取
3. 微信限制只能撤回2分钟内的消息"""