# 群清单管理 # 群功能管理 # 0.加入或者关闭群机器人 #启用群机器人 #关闭群机器人 # 1.每日新闻自动播报 #启用每日新闻播报 #关闭每日新闻播报 # 2.每日群发言总结 #启用群发言 #关闭群发言 # 3.群AI能力 #启用群AI #关闭群AI # 4.群总结能力 #启用群总结 #关闭群总结 # 5.sehuatang PDF能力 #启用pdf #关闭pdf import redis import json from enum import Enum # 连接到本地 Redis 服务 r = redis.StrictRedis(host='192.168.2.32', port=6379, db=0, decode_responses=True) class PermissionStatus(Enum): """权限状态枚举""" ENABLED = "enabled" DISABLED = "disabled" class Feature(Enum): """功能权限枚举,带序号""" ROBOT = 1, "群机器人" DAILY_NEWS = 2, "每日新闻自动播报" DAILY_SUMMARY = 3, "每日群发言总结" AI_CAPABILITY = 4, "群AI能力" SUMMARY_CAPABILITY = 5, "群总结能力" PDF_CAPABILITY = 6, "sehuatang PDF能力" EPIC = 7, "EPIC自动播报" # 新增的功能 PIC = 8, "图来能力" TASK_GAME = 9, "百科答题游戏" MUSIC = 10, "点歌功能" SIGNIN = 11, "签到功能" POINT_TRADE = 12, "积分赠送功能" BEAUTY_LEG = 13, "腿来能力" VIDEO = 14, "视频点播功能" def __new__(cls, value, description): obj = object.__new__(cls) obj._value_ = value obj.description = description # 添加描述 return obj def __str__(self): return self.description class GroupBotManager: """群机器人管理,支持本地缓存""" # 本地缓存作为类级别静态属性 local_cache = { "group_permissions": {}, # 用于缓存群组功能权限 "group_list": set() # 用于缓存 group:list } @staticmethod def display_menu_status(group_id): """显示所有功能列表及其在指定群组中的当前状态,带emoji""" menu = [] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) status_emoji = "✅" if status == PermissionStatus.ENABLED else "❌" status_str = "启用" if status == PermissionStatus.ENABLED else "关闭" menu.append(f"{status_emoji} {status_str}-{feature.value}-{feature.description}") return "\n".join(menu) @staticmethod def load_local_cache(): """从 Redis 加载数据到本地缓存""" group_list = r.smembers("group:list") GroupBotManager.local_cache["group_list"] = set(group_list) # 加载群组权限 for group_id in GroupBotManager.local_cache["group_list"]: key = f'group:{group_id}:permissions' GroupBotManager.local_cache["group_permissions"][group_id] = {} for feature in Feature: status_value = r.hget(key, feature.name) if status_value: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus(status_value) else: GroupBotManager.local_cache["group_permissions"][group_id][feature] = PermissionStatus.DISABLED @staticmethod def save_to_redis(): """将本地缓存保存回 Redis""" # 保存 group:list 到 Redis r.sadd("group:list", *GroupBotManager.local_cache["group_list"]) # 保存每个群组的权限到 Redis for group_id, permissions in GroupBotManager.local_cache["group_permissions"].items(): key = f'group:{group_id}:permissions' for feature, status in permissions.items(): r.hset(key, feature.name, status.value) @staticmethod def set_group_permission(group_id, feature: Feature, status: PermissionStatus): """设置群组功能权限并更新本地缓存""" # 更新本地缓存 if group_id not in GroupBotManager.local_cache["group_permissions"]: GroupBotManager.local_cache["group_permissions"][group_id] = {} GroupBotManager.local_cache["group_permissions"][group_id][feature] = status # 同步到 Redis key = f'group:{group_id}:permissions' r.hset(key, feature.name, status.value) @staticmethod def get_group_permission(group_id, feature: Feature): """获取群组某个功能的权限状态""" # 先从本地缓存获取 if group_id in GroupBotManager.local_cache["group_permissions"]: return GroupBotManager.local_cache["group_permissions"][group_id].get(feature, PermissionStatus.DISABLED) else: return PermissionStatus.DISABLED @staticmethod def check_permission(group_id, feature: Feature): """检查某个功能是否启用,若未启用则返回提示信息""" status = GroupBotManager.get_group_permission(group_id, feature) if status == PermissionStatus.DISABLED: return f"该功能未启用,请开启 {feature.description}。" return None # 如果已启用,则返回 None,不做处理 @staticmethod def handle_command(group_id, command_str): """统一处理群功能指令""" print(f"PermissionStatus handle_command command_str: {command_str}") # 命令解析 command_parts = command_str.strip().split("-") # 如果是MENU指令,返回功能列表 if command_str.strip().upper() == "MENU": return GroupBotManager.display_menu() # 如果是MENU-STATUS指令,返回功能列表及其状态 if command_str.strip().upper() == "MENU_STATUS": return GroupBotManager.display_menu_status(group_id) # 如果是GROUP_LIST指令,返回 group:list 清单 if command_str.strip().upper() == "GROUP_LIST": return GroupBotManager.get_group_list() if len(command_parts) < 2: return "无效命令,格式应为:功能-操作 或 序号-操作" feature_str = command_parts[0] action = command_parts[1] # 如果第一个参数是序号,则转化为对应的功能 if feature_str.isdigit(): feature_num = int(feature_str) try: feature = Feature(feature_num) # 使用枚举序号查找功能 except ValueError: return "无效的功能序号" else: try: feature = Feature[feature_str] # 通过枚举名称获取功能枚举 except KeyError: return "无效功能名称" # 处理群机器人的启用和关闭(特别操作:更新 group:list) if feature == Feature.ROBOT: if action == "启用": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.ENABLED) # 启用群机器人时,将 group_id 加入 group:list GroupBotManager.local_cache["group_list"].add(group_id) # 同步到 Redis r.sadd("group:list", group_id) return f"BOT已启用,群组 {group_id} 已加入" elif action == "关闭": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.DISABLED) # 关闭群机器人时,从 group:list 中删除 group_id GroupBotManager.local_cache["group_list"].remove(group_id) # 同步到 Redis r.srem("group:list", group_id) return f"BOT已关闭,群组 {group_id} 已移除" else: return "无效操作,仅支持启用或关闭" # 先检查群机器人权限 robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT) if robot_status != PermissionStatus.ENABLED and feature != Feature.ROBOT: return "群机器人未启用,无法执行其他功能操作" # 根据不同的操作启用或禁用功能 if action == "启用": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.ENABLED) return f"{feature.description} 已启用" elif action == "关闭": GroupBotManager.set_group_permission(group_id, feature, PermissionStatus.DISABLED) return f"{feature.description} 已关闭" else: return "无效操作,仅支持启用或关闭" @staticmethod def list_group_permissions(group_id): """列出群组所有功能及其状态""" permissions = {} for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) permissions[feature] = status if status else PermissionStatus.DISABLED # 默认为禁用状态 return permissions @staticmethod def display_menu(): """显示所有功能列表及其当前状态""" menu = [] for feature in Feature: menu.append(f"{feature.value}. {feature.description}") return "\n".join(menu) @staticmethod def get_group_list(): """返回所有启用了群机器人的群组清单,格式为集合""" return list(GroupBotManager.local_cache["group_list"]) # 示例命令 def simulate_commands(): # 加载本地缓存 GroupBotManager.load_local_cache() group_id = "49571962306@chatroom" print(GroupBotManager.get_group_permission(group_id, Feature.AI_CAPABILITY) == PermissionStatus.DISABLED) print(GroupBotManager.get_group_permission(group_id, Feature.SUMMARY_CAPABILITY) == PermissionStatus.ENABLED) # # 启用群机器人 # print(GroupBoManager.handle_command(group_id, "ROBOT-启用")) # # # 启用每日新闻自动播报 # print(GroupBotManager.handle_command(group_id, "2-启用")) # 使用序号启用每日新闻自动播报 # # # 关闭群AI能力 # print(GroupBotManager.handle_command(group_id, "4-关闭")) # 使用序号关闭群AI能力 # # # 启用群总结能力 # print(GroupBotManager.handle_command(group_id, "5-启用")) # 使用序号启用群总结能力 # # # 关闭Sehuatang PDF能力 # print(GroupBotManager.handle_command(group_id, "6-关闭")) # 使用序号关闭Sehuatang PDF能力 # # # 启用EPIC自动播报 # print(GroupBotManager.handle_command(group_id, "7-启用")) # 使用序号启用EPIC自动播报 # # # 查看当前群组的功能权限 # print(GroupBotManager.get_group_permission(group_id, Feature.ROBOT)) # print(GroupBotManager.get_group_permission(group_id, Feature.DAILY_NEWS)) # print(GroupBotManager.get_group_permission(group_id, Feature.AI_CAPABILITY)) # print(GroupBotManager.get_group_permission(group_id, Feature.SUMMARY_CAPABILITY)) # print(GroupBotManager.get_group_permission(group_id, Feature.PDF_CAPABILITY)) # print(GroupBotManager.get_group_permission(group_id, Feature.EPIC)) # # # 查看群组所有功能和状态 # permissions = GroupBotManager.list_group_permissions(group_id) # for feature, status in permissions.items(): # print(f"{feature.description} (序号: {feature.value}): {status.value}") # # # 查看 group:list 中的群组 # print("当前启用群机器人的群组:", GroupBotManager.get_group_list()) # # # 查看菜单功能列表 # print("功能列表:") # print(GroupBotManager.handle_command(group_id, "MENU")) # # # 查看 group:list 清单 # print("群组清单:") # print(GroupBotManager.handle_command(group_id, "GROUP_LIST")) # # # 更新缓存 # print(GroupBotManager.handle_command(group_id, "UPDATE")) # # # 保存到 Redis # GroupBotManager.save_to_redis() if __name__ == '__main__': # 执行模拟命令 simulate_commands()