From f3fe625ceb1473e85d7f9b819ef90f08bd358a44 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 21 Jan 2026 17:32:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E7=BE=A4=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=91=98=E7=BB=B4=E6=8A=A4=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/robot_menu/main.py | 85 +++++++++++++++++++++++++++++++- utils/robot_cmd/robot_command.py | 49 +++++++++++++++++- 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/plugins/robot_menu/main.py b/plugins/robot_menu/main.py index c35ceff..5c6753a 100644 --- a/plugins/robot_menu/main.py +++ b/plugins/robot_menu/main.py @@ -1,6 +1,8 @@ from typing import Dict, Any, List, Optional, Tuple from loguru import logger +import re +import xml.etree.ElementTree as ET from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus @@ -8,6 +10,7 @@ from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager +from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient @@ -148,6 +151,19 @@ class RobotMenuPlugin(MessagePluginInterface): return "当前没有启用机器人的群组" return "\n".join(group_list) + def at_list(self, xml): + try: + root = ET.fromstring(xml) + atuserlist_element = root.find('.//atuserlist') + atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() + atuserlist_content_no_commas = atuserlist_content.strip(',') + atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) + atuserlist_set = set(atuserlist_content_no_commas.split(',')) + atuserlist_set.discard('') + return atuserlist_set + except ET.ParseError: + return set() + @plugin_stats_decorator(plugin_name="机器人菜单") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" @@ -159,6 +175,8 @@ class RobotMenuPlugin(MessagePluginInterface): gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") revoke: MessageAutoRevoke = message.get("revoke") + wx_msg = message.get("full_wx_msg") + xml = wx_msg.msg_source if wx_msg else "" if not bot: self.LOG.error("WechatAPIClient 未初始化") @@ -203,7 +221,8 @@ class RobotMenuPlugin(MessagePluginInterface): # 格式:菜单 启用 功能名 或 菜单 关闭 功能名 if len(parts) >= 3 and parts[1] in ["启用", "关闭"]: # 检查管理员权限 - if not GroupBotManager.is_admin(sender): + group_id_for_perm = roomid if roomid else sender + if not GroupBotManager.is_admin_for_group(sender, group_id_for_perm): await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender]) return True, "权限不足" @@ -241,6 +260,70 @@ class RobotMenuPlugin(MessagePluginInterface): revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "处理功能命令" + if len(parts) >= 2 and parts[1] == "管理员": + if not GroupBotManager.is_admin(sender): + await bot.send_at_message(target, "❌权限不足,只有全局管理员可以管理群管理员", [sender]) + return True, "权限不足" + group_id = roomid if roomid else sender + if len(parts) < 3: + help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "管理员命令帮助" + admin_action = parts[2] + if admin_action == "添加" and len(parts) >= 4: + at_users = self.at_list(xml) if xml else set() + if len(at_users) == 1: + new_admin_id = next(iter(at_users)) + GroupBotManager.add_group_admin(group_id, new_admin_id) + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "添加群管理员" + candidate = parts[3] + resolved_id = candidate + if roomid: + members = ContactManager.get_instance().get_group_members(roomid) or {} + for wxid, nick in members.items(): + if nick == candidate: + resolved_id = wxid + break + GroupBotManager.add_group_admin(group_id, resolved_id) + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "添加群管理员" + if admin_action == "删除" and len(parts) >= 4: + at_users = self.at_list(xml) if xml else set() + if len(at_users) == 1: + del_admin_id = next(iter(at_users)) + ok = GroupBotManager.remove_group_admin(group_id, del_admin_id) + msg = "已删除群管理员" if ok else "该管理员不在清单中" + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "删除群管理员" + candidate = parts[3] + resolved_id = candidate + if roomid: + members = ContactManager.get_instance().get_group_members(roomid) or {} + for wxid, nick in members.items(): + if nick == candidate: + resolved_id = wxid + break + ok = GroupBotManager.remove_group_admin(group_id, resolved_id) + msg = "已删除群管理员" if ok else "该管理员不在清单中" + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "删除群管理员" + if admin_action in ["列表", "清单"]: + admins = GroupBotManager.get_group_admins(group_id) + list_text = "群管理员列表为空" if not admins else "\n".join(admins) + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, list_text, sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "列出群管理员" + help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" + client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) + revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) + return True, "管理员命令帮助" + # 如果是其他未知命令,显示帮助 help_text = f"❌命令格式错误!\n{self.command_format}" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) diff --git a/utils/robot_cmd/robot_command.py b/utils/robot_cmd/robot_command.py index dc9da9c..c9092ab 100644 --- a/utils/robot_cmd/robot_command.py +++ b/utils/robot_cmd/robot_command.py @@ -142,7 +142,8 @@ class GroupBotManager: # 本地缓存作为类级别静态属性 local_cache = { "group_permissions": {}, # 用于缓存群组功能权限 - "group_list": set() # 用于缓存 group:list + "group_list": set(), # 用于缓存 group:list + "group_admins": {} # 用于缓存群级管理员 } # 管理员列表缓存 @@ -179,6 +180,9 @@ class GroupBotManager: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus(status_value) else: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus.DISABLED + admins_key = f'group:{group_id}:admins' + admins = r.smembers(admins_key) or set() + GroupBotManager.local_cache["group_admins"][group_id] = set(admins) @staticmethod def save_to_redis(): @@ -192,6 +196,10 @@ class GroupBotManager: key = f'group:{group_id}:permissions' for feature, status in permissions.items(): r.hset(key, feature.name, status.value) + for group_id, admins in GroupBotManager.local_cache["group_admins"].items(): + admins_key = f'group:{group_id}:admins' + if admins: + r.sadd(admins_key, *list(admins)) @staticmethod def set_group_permission(group_id, feature: Feature, status: PermissionStatus): @@ -209,6 +217,42 @@ class GroupBotManager: # 输出保存到redis成功 logger.info(f"set_group_permission({group_id}, {feature.name}, {status.value})") + @staticmethod + def get_group_admins(group_id) -> List[str]: + if group_id in GroupBotManager.local_cache["group_admins"]: + return list(GroupBotManager.local_cache["group_admins"][group_id]) + return [] + + @staticmethod + def is_group_admin(group_id, user_id: str) -> bool: + admins = GroupBotManager.local_cache["group_admins"].get(group_id, set()) + return user_id in admins + + @staticmethod + def add_group_admin(group_id, user_id: str) -> bool: + r = get_redis_connection() + if group_id not in GroupBotManager.local_cache["group_admins"]: + GroupBotManager.local_cache["group_admins"][group_id] = set() + GroupBotManager.local_cache["group_admins"][group_id].add(user_id) + r.sadd(f'group:{group_id}:admins', user_id) + return True + + @staticmethod + def remove_group_admin(group_id, user_id: str) -> bool: + r = get_redis_connection() + admins = GroupBotManager.local_cache["group_admins"].get(group_id, set()) + if user_id in admins: + admins.remove(user_id) + r.srem(f'group:{group_id}:admins', user_id) + return True + return False + + @staticmethod + def is_admin_for_group(user_id: str, group_id: str) -> bool: + if GroupBotManager.is_admin(user_id): + return True + return GroupBotManager.is_group_admin(group_id, user_id) + @staticmethod def get_group_permission(group_id, feature: Feature): """获取群组某个功能的权限状态""" @@ -333,10 +377,13 @@ class GroupBotManager: GroupBotManager.local_cache["group_list"].remove(group_id) if group_id in GroupBotManager.local_cache["group_permissions"]: del GroupBotManager.local_cache["group_permissions"][group_id] + if group_id in GroupBotManager.local_cache["group_admins"]: + del GroupBotManager.local_cache["group_admins"][group_id] # 从Redis中移除群组 r.srem("group:list", group_id) r.delete(f'group:{group_id}:permissions') + r.delete(f'group:{group_id}:admins') return f"已成功清除群 {group_id} 的所有设置"