# 群清单管理 # 群功能管理 # 0.加入或者关闭群机器人 #启用群机器人 #关闭群机器人 # 1.每日新闻自动播报 #启用每日新闻播报 #关闭每日新闻播报 # 2.每日群发言总结 #启用群发言 #关闭群发言 # 3.群AI能力 #启用群AI #关闭群AI # 4.群总结能力 #启用群总结 #关闭群总结 # 5.sehuatang PDF能力 #启用pdf #关闭pdf import redis import json from enum import Enum # 连接到本地 Redis 服务 r = redis.StrictRedis(host='192.168.2.32', port=6379, db=0, decode_responses=True) class PermissionStatus(Enum): """权限状态枚举""" ENABLED = "enabled" DISABLED = "disabled" class Feature(Enum): """功能权限枚举,带序号""" ROBOT = 1, "群机器人" DAILY_NEWS = 2, "每日新闻自动播报" DAILY_SUMMARY = 3, "每日群发言总结" AI_CAPABILITY = 4, "群AI能力" SUMMARY_CAPABILITY = 5, "群总结能力" PDF_CAPABILITY = 6, "sehuatang PDF能力" EPIC = 7, "EPIC自动播报" # 新增的功能 def __new__(cls, value, description): obj = object.__new__(cls) obj._value_ = value obj.description = description # 添加描述 return obj def __str__(self): return self.description class GroupBotManager: """群机器人管理,支持本地缓存""" # 本地缓存作为类级别静态属性 local_cache = { "group_permissions": {}, # 用于缓存群组功能权限 "group_list": set() # 用于缓存 group:list } @staticmethod def load_local_cache(): """从 Redis 加载数据到本地缓存""" group_list = r.smembers("group:list") GroupBotManager.local_cache["group_list"] = set(group_list) # 加载群组权限 for group_id in GroupBotManager.local_cache["group_list"]: key = f'group:{group_id}:permissions' GroupBotManager.local_cache["group_permissions"][group_id] = {} for feature in Feature: status_value = r.hget(key, feature.name) if status_value: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus(status_value) else: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus.DISABLED @staticmethod def save_to_redis(): """将本地缓存保存回 Redis""" # 保存 group:list 到 Redis r.sadd("group:list", *GroupBotManager.local_cache["group_list"]) # 保存每个群组的权限到 Redis for group_id, permissions in GroupBotManager.local_cache["group_permissions"].items(): key = f'group:{group_id}:permissions' for feature, status in permissions.items(): r.hset(key, feature.name, status.value) @staticmethod def set_group_permission(group_id, feature: Feature, status: PermissionStatus): """设置群组功能权限并更新本地缓存""" # 更新本地缓存 if group_id not in GroupBotManager.local_cache["group_permissions"]: GroupBotManager.local_cache["group_permissions"][group_id] = {} GroupBotManager.local_cache["group_permissions"][group_id][feature] = status # 同步到 Redis key = f'group:{group_id}:permissions' r.hset(key, feature.name, status.value) @staticmethod def get_group_permission(group_id, feature: Feature): """获取群组某个功能的权限状态""" # 先从本地缓存获取 if group_id in GroupBotManager.local_cache["group_permissions"]: return GroupBotManager.local_cache["group_permissions"][group_id].get(feature, PermissionStatus.DISABLED) else: return PermissionStatus.DISABLED @staticmethod def handle_command(group_id, command_str): """统一处理群功能指令""" print(f"PermissionStatus handle_command command_str: {command_str}") # 命令解析 command_parts = command_str.strip().split("-") # 如果是MENU指令,返回功能列表 if command_str.strip().upper() == "MENU": return GroupBotManager.display_menu() # 如果是GROUP_LIST指令,返回 group:list 清单 if command_str.strip().upper() == "GROUP_LIST": return GroupBotManager.get_group_list() if len(command_parts) < 2: return "无效命令,格式应为:功能-操作 或 序号-操作" feature_str = command_parts[0] action = command_parts[1] # 如果第一个参数是序号,则转化为对应的功能 if feature_str.isdigit(): feature_num = int(feature_str) try: feature = Feature(feature_num) # 使用枚举序号查找功能 except ValueError: return "无效的功能序号" else: try: feature = Feature[feature_str] # 通过枚举名称获取功能枚举 except KeyError: return "无效功能名称" # 处理群机器人的启用和关闭(特别操作:更新 group:list) if feature == Feature.ROBOT: if action == "启用": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.ENABLED) # 启用群机器人时,将 group_id 加入 group:list GroupBotManager.local_cache["group_list"].add(group_id) # 同步到 Redis r.sadd("group:list", group_id) return f"群机器人已启用,群组 {group_id} 已加入 group:list" elif action == "关闭": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.DISABLED) # 关闭群机器人时,从 group:list 中删除 group_id GroupBotManager.local_cache["group_list"].remove(group_id) # 同步到 Redis r.srem("group:list", group_id) return f"群机器人已关闭,群组 {group_id} 已从 group:list 移除" else: return "无效操作,仅支持启用或关闭" # 先检查群机器人权限 robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT) if robot_status != PermissionStatus.ENABLED and feature != Feature.ROBOT: return "群机器人未启用,无法执行其他功能操作" # 根据不同的操作启用或禁用功能 if action == "启用": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.ENABLED) return f"{feature.description} 已启用" elif action == "关闭": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.DISABLED) return f"{feature.description} 已关闭" else: return "无效操作,仅支持启用或关闭" @staticmethod def list_group_permissions(group_id): """列出群组所有功能及其状态""" permissions = {} for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) permissions[feature] = status if status else PermissionStatus.DISABLED # 默认为禁用状态 return permissions @staticmethod def display_menu(): """显示所有功能列表及其当前状态""" menu = [] for feature in Feature: menu.append(f"{feature.value}. {feature.description}") return "\n".join(menu) @staticmethod def get_group_list(): """返回所有启用了群机器人的群组清单,格式为集合""" return list(GroupBotManager.local_cache["group_list"]) # 示例命令 def simulate_commands(): # 加载本地缓存 GroupBotManager.load_local_cache() group_id = "12345" # 启用群机器人 print(GroupBotManager.handle_command(group_id, "ROBOT-启用")) # 启用每日新闻自动播报 print(GroupBotManager.handle_command(group_id, "2-启用")) # 使用序号启用每日新闻自动播报 # 关闭群AI能力 print(GroupBotManager.handle_command(group_id, "4-关闭")) # 使用序号关闭群AI能力 # 启用群总结能力 print(GroupBotManager.handle_command(group_id, "5-启用")) # 使用序号启用群总结能力 # 关闭Sehuatang PDF能力 print(GroupBotManager.handle_command(group_id, "6-关闭")) # 使用序号关闭Sehuatang PDF能力 # 启用EPIC自动播报 print(GroupBotManager.handle_command(group_id, "7-启用")) # 使用序号启用EPIC自动播报 # 查看当前群组的功能权限 print(GroupBotManager.get_group_permission(group_id, Feature.ROBOT)) print(GroupBotManager.get_group_permission(group_id, Feature.DAILY_NEWS)) print(GroupBotManager.get_group_permission(group_id, Feature.AI_CAPABILITY)) print(GroupBotManager.get_group_permission(group_id, Feature.SUMMARY_CAPABILITY)) print(GroupBotManager.get_group_permission(group_id, Feature.PDF_CAPABILITY)) print(GroupBotManager.get_group_permission(group_id, Feature.EPIC)) # 查看群组所有功能和状态 permissions = GroupBotManager.list_group_permissions(group_id) for feature, status in permissions.items(): print(f"{feature.description} (序号: {feature.value}): {status.value}") # 查看 group:list 中的群组 print("当前启用群机器人的群组:", GroupBotManager.get_group_list()) # 查看菜单功能列表 print("功能列表:") print(GroupBotManager.handle_command(group_id, "MENU")) # 查看 group:list 清单 print("群组清单:") print(GroupBotManager.handle_command(group_id, "GROUP_LIST")) # 更新缓存 print(GroupBotManager.handle_command(group_id, "UPDATE")) # 保存到 Redis GroupBotManager.save_to_redis() # 执行模拟命令 # simulate_commands()