加入群管理员维护能力

This commit is contained in:
liuwei
2026-01-21 17:32:37 +08:00
parent 8e00c0078c
commit f3fe625ceb
2 changed files with 132 additions and 2 deletions

View File

@@ -1,6 +1,8 @@
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from loguru import logger from loguru import logger
import re
import xml.etree.ElementTree as ET
from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_interface import PluginStatus
@@ -8,6 +10,7 @@ from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.points_decorator import plugin_points_cost
from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient from wechat_ipad import WechatAPIClient
@@ -148,6 +151,19 @@ class RobotMenuPlugin(MessagePluginInterface):
return "当前没有启用机器人的群组" return "当前没有启用机器人的群组"
return "\n".join(group_list) return "\n".join(group_list)
def at_list(self, xml):
try:
root = ET.fromstring(xml)
atuserlist_element = root.find('.//atuserlist')
atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip()
atuserlist_content_no_commas = atuserlist_content.strip(',')
atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas)
atuserlist_set = set(atuserlist_content_no_commas.split(','))
atuserlist_set.discard('')
return atuserlist_set
except ET.ParseError:
return set()
@plugin_stats_decorator(plugin_name="机器人菜单") @plugin_stats_decorator(plugin_name="机器人菜单")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息""" """处理消息"""
@@ -159,6 +175,8 @@ class RobotMenuPlugin(MessagePluginInterface):
gbm: GroupBotManager = message.get("gbm") gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot") bot: WechatAPIClient = message.get("bot")
revoke: MessageAutoRevoke = message.get("revoke") revoke: MessageAutoRevoke = message.get("revoke")
wx_msg = message.get("full_wx_msg")
xml = wx_msg.msg_source if wx_msg else ""
if not bot: if not bot:
self.LOG.error("WechatAPIClient 未初始化") self.LOG.error("WechatAPIClient 未初始化")
@@ -203,7 +221,8 @@ class RobotMenuPlugin(MessagePluginInterface):
# 格式:菜单 启用 功能名 或 菜单 关闭 功能名 # 格式:菜单 启用 功能名 或 菜单 关闭 功能名
if len(parts) >= 3 and parts[1] in ["启用", "关闭"]: if len(parts) >= 3 and parts[1] in ["启用", "关闭"]:
# 检查管理员权限 # 检查管理员权限
if not GroupBotManager.is_admin(sender): group_id_for_perm = roomid if roomid else sender
if not GroupBotManager.is_admin_for_group(sender, group_id_for_perm):
await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender]) await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender])
return True, "权限不足" return True, "权限不足"
@@ -241,6 +260,70 @@ class RobotMenuPlugin(MessagePluginInterface):
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "处理功能命令" return True, "处理功能命令"
if len(parts) >= 2 and parts[1] == "管理员":
if not GroupBotManager.is_admin(sender):
await bot.send_at_message(target, "❌权限不足,只有全局管理员可以管理群管理员", [sender])
return True, "权限不足"
group_id = roomid if roomid else sender
if len(parts) < 3:
help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "管理员命令帮助"
admin_action = parts[2]
if admin_action == "添加" and len(parts) >= 4:
at_users = self.at_list(xml) if xml else set()
if len(at_users) == 1:
new_admin_id = next(iter(at_users))
GroupBotManager.add_group_admin(group_id, new_admin_id)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "添加群管理员"
candidate = parts[3]
resolved_id = candidate
if roomid:
members = ContactManager.get_instance().get_group_members(roomid) or {}
for wxid, nick in members.items():
if nick == candidate:
resolved_id = wxid
break
GroupBotManager.add_group_admin(group_id, resolved_id)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "添加群管理员"
if admin_action == "删除" and len(parts) >= 4:
at_users = self.at_list(xml) if xml else set()
if len(at_users) == 1:
del_admin_id = next(iter(at_users))
ok = GroupBotManager.remove_group_admin(group_id, del_admin_id)
msg = "已删除群管理员" if ok else "该管理员不在清单中"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "删除群管理员"
candidate = parts[3]
resolved_id = candidate
if roomid:
members = ContactManager.get_instance().get_group_members(roomid) or {}
for wxid, nick in members.items():
if nick == candidate:
resolved_id = wxid
break
ok = GroupBotManager.remove_group_admin(group_id, resolved_id)
msg = "已删除群管理员" if ok else "该管理员不在清单中"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "删除群管理员"
if admin_action in ["列表", "清单"]:
admins = GroupBotManager.get_group_admins(group_id)
list_text = "群管理员列表为空" if not admins else "\n".join(admins)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, list_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "列出群管理员"
help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "管理员命令帮助"
# 如果是其他未知命令,显示帮助 # 如果是其他未知命令,显示帮助
help_text = f"❌命令格式错误!\n{self.command_format}" help_text = f"❌命令格式错误!\n{self.command_format}"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)

View File

@@ -142,7 +142,8 @@ class GroupBotManager:
# 本地缓存作为类级别静态属性 # 本地缓存作为类级别静态属性
local_cache = { local_cache = {
"group_permissions": {}, # 用于缓存群组功能权限 "group_permissions": {}, # 用于缓存群组功能权限
"group_list": set() # 用于缓存 group:list "group_list": set(), # 用于缓存 group:list
"group_admins": {} # 用于缓存群级管理员
} }
# 管理员列表缓存 # 管理员列表缓存
@@ -179,6 +180,9 @@ class GroupBotManager:
GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus(status_value) GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus(status_value)
else: else:
GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus.DISABLED GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus.DISABLED
admins_key = f'group:{group_id}:admins'
admins = r.smembers(admins_key) or set()
GroupBotManager.local_cache["group_admins"][group_id] = set(admins)
@staticmethod @staticmethod
def save_to_redis(): def save_to_redis():
@@ -192,6 +196,10 @@ class GroupBotManager:
key = f'group:{group_id}:permissions' key = f'group:{group_id}:permissions'
for feature, status in permissions.items(): for feature, status in permissions.items():
r.hset(key, feature.name, status.value) r.hset(key, feature.name, status.value)
for group_id, admins in GroupBotManager.local_cache["group_admins"].items():
admins_key = f'group:{group_id}:admins'
if admins:
r.sadd(admins_key, *list(admins))
@staticmethod @staticmethod
def set_group_permission(group_id, feature: Feature, status: PermissionStatus): def set_group_permission(group_id, feature: Feature, status: PermissionStatus):
@@ -209,6 +217,42 @@ class GroupBotManager:
# 输出保存到redis成功 # 输出保存到redis成功
logger.info(f"set_group_permission({group_id}, {feature.name}, {status.value})") logger.info(f"set_group_permission({group_id}, {feature.name}, {status.value})")
@staticmethod
def get_group_admins(group_id) -> List[str]:
if group_id in GroupBotManager.local_cache["group_admins"]:
return list(GroupBotManager.local_cache["group_admins"][group_id])
return []
@staticmethod
def is_group_admin(group_id, user_id: str) -> bool:
admins = GroupBotManager.local_cache["group_admins"].get(group_id, set())
return user_id in admins
@staticmethod
def add_group_admin(group_id, user_id: str) -> bool:
r = get_redis_connection()
if group_id not in GroupBotManager.local_cache["group_admins"]:
GroupBotManager.local_cache["group_admins"][group_id] = set()
GroupBotManager.local_cache["group_admins"][group_id].add(user_id)
r.sadd(f'group:{group_id}:admins', user_id)
return True
@staticmethod
def remove_group_admin(group_id, user_id: str) -> bool:
r = get_redis_connection()
admins = GroupBotManager.local_cache["group_admins"].get(group_id, set())
if user_id in admins:
admins.remove(user_id)
r.srem(f'group:{group_id}:admins', user_id)
return True
return False
@staticmethod
def is_admin_for_group(user_id: str, group_id: str) -> bool:
if GroupBotManager.is_admin(user_id):
return True
return GroupBotManager.is_group_admin(group_id, user_id)
@staticmethod @staticmethod
def get_group_permission(group_id, feature: Feature): def get_group_permission(group_id, feature: Feature):
"""获取群组某个功能的权限状态""" """获取群组某个功能的权限状态"""
@@ -333,10 +377,13 @@ class GroupBotManager:
GroupBotManager.local_cache["group_list"].remove(group_id) GroupBotManager.local_cache["group_list"].remove(group_id)
if group_id in GroupBotManager.local_cache["group_permissions"]: if group_id in GroupBotManager.local_cache["group_permissions"]:
del GroupBotManager.local_cache["group_permissions"][group_id] del GroupBotManager.local_cache["group_permissions"][group_id]
if group_id in GroupBotManager.local_cache["group_admins"]:
del GroupBotManager.local_cache["group_admins"][group_id]
# 从Redis中移除群组 # 从Redis中移除群组
r.srem("group:list", group_id) r.srem("group:list", group_id)
r.delete(f'group:{group_id}:permissions') r.delete(f'group:{group_id}:permissions')
r.delete(f'group:{group_id}:admins')
return f"已成功清除群 {group_id} 的所有设置" return f"已成功清除群 {group_id} 的所有设置"