临时调整权限模块,备份

This commit is contained in:
liuwei
2025-06-09 14:12:31 +08:00
parent cedab1cefd
commit 9d15bf965b
30 changed files with 882 additions and 138 deletions

118
base/auth/permission.py Normal file
View File

@@ -0,0 +1,118 @@
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import json
import os
from utils.logger import logger
@dataclass
class Permission:
"""权限类"""
key: str
name: str
description: str
value: int
plugin_id: Optional[str] = None
class PermissionStatus(Enum):
"""权限状态枚举"""
ENABLED = 1
DISABLED = 0
class PermissionManager:
"""权限管理器(单例模式)"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self._permissions: Dict[str, Permission] = {}
self._config_file = "config/permissions.json"
self._load_permissions()
self._initialized = True
def _load_permissions(self):
"""从配置文件加载权限"""
try:
if os.path.exists(self._config_file):
with open(self._config_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for key, perm_data in data.items():
self._permissions[key] = Permission(**perm_data)
except Exception as e:
logger.error(f"加载权限配置失败: {e}")
def _save_permissions(self):
"""保存权限到配置文件"""
try:
os.makedirs(os.path.dirname(self._config_file), exist_ok=True)
with open(self._config_file, 'w', encoding='utf-8') as f:
json.dump(
{k: v.__dict__ for k, v in self._permissions.items()},
f,
ensure_ascii=False,
indent=2
)
except Exception as e:
logger.error(f"保存权限配置失败: {e}")
def register_permission(
self,
key: str,
name: str,
description: str,
plugin_id: Optional[str] = None
) -> Permission:
"""注册新权限"""
if key in self._permissions:
return self._permissions[key]
value = len(self._permissions) + 1
permission = Permission(
key=key,
name=name,
description=description,
value=value,
plugin_id=plugin_id
)
self._permissions[key] = permission
self._save_permissions()
return permission
def get_permission(self, key: str) -> Optional[Permission]:
"""获取权限"""
return self._permissions.get(key)
def get_permission_by_value(self, value: int) -> Optional[Permission]:
"""通过值获取权限"""
for perm in self._permissions.values():
if perm.value == value:
return perm
return None
def get_all_permissions(self) -> List[Permission]:
"""获取所有权限"""
return list(self._permissions.values())
def get_plugin_permissions(self, plugin_id: str) -> List[Permission]:
"""获取插件的所有权限"""
return [
perm for perm in self._permissions.values()
if perm.plugin_id == plugin_id
]
def remove_permission(self, key: str) -> bool:
"""移除权限"""
if key in self._permissions:
del self._permissions[key]
self._save_permissions()
return True
return False

View File

@@ -0,0 +1,98 @@
from .permission import PermissionManager, PermissionStatus
from .group_permission import GroupPermissionManager
def test_permission_system():
# 获取权限管理器实例
perm_manager = PermissionManager()
group_manager = GroupPermissionManager()
# 测试注册权限
print("测试注册权限...")
robot_perm = perm_manager.register_permission(
"ROBOT",
"群机器人",
"🔧 群机器人 [总开关]"
)
print(f"注册权限: {robot_perm.key} = {robot_perm.value} - {robot_perm.description}")
# 测试重复注册
print("\n测试重复注册...")
same_perm = perm_manager.register_permission(
"ROBOT",
"群机器人",
"🔧 群机器人 [总开关]"
)
print(f"重复注册结果: {same_perm.key} = {same_perm.value} - {same_perm.description}")
# 测试注册新权限
print("\n测试注册新权限...")
news_perm = perm_manager.register_permission(
"NEWS",
"新闻播报",
"📰 每日新闻播报",
plugin_id="news_plugin"
)
print(f"新权限: {news_perm.key} = {news_perm.value} - {news_perm.description}")
# 测试获取权限
print("\n测试获取权限...")
retrieved_perm = perm_manager.get_permission("ROBOT")
print(f"获取权限: {retrieved_perm.key} = {retrieved_perm.value} - {retrieved_perm.description}")
# 测试通过值获取权限
print("\n测试通过值获取权限...")
perm_by_value = perm_manager.get_permission_by_value(1)
print(f"通过值获取: {perm_by_value.key} = {perm_by_value.value} - {perm_by_value.description}")
# 测试获取所有权限
print("\n测试获取所有权限...")
all_perms = perm_manager.get_all_permissions()
print("所有权限:")
for perm in all_perms:
print(f" {perm.key} = {perm.value} - {perm.description}")
# 测试获取插件权限
print("\n测试获取插件权限...")
plugin_perms = perm_manager.get_plugin_permissions("news_plugin")
print("插件权限:")
for perm in plugin_perms:
print(f" {perm.key} = {perm.value} - {perm.description}")
# 测试群组权限
print("\n测试群组权限...")
group_id = "test_group"
# 添加群组
group_manager.add_group(group_id)
print(f"添加群组: {group_id}")
# 设置群组权限
group_manager.set_group_permission(group_id, robot_perm, PermissionStatus.ENABLED)
print(f"设置权限: {group_id} - {robot_perm.key} = {PermissionStatus.ENABLED.value}")
# 获取群组权限
status = group_manager.get_group_permission(group_id, robot_perm)
print(f"获取权限状态: {group_id} - {robot_perm.key} = {status.value}")
# 检查权限
result = group_manager.check_permission(group_id, robot_perm)
print(f"检查权限: {group_id} - {robot_perm.key} = {result}")
# 列出群组权限
print("\n列出群组权限...")
perms = group_manager.list_group_permissions(group_id)
print(f"群组 {group_id} 的权限:")
for perm, status in perms.items():
print(f" {perm.key} = {status.value}")
# 移除群组
print("\n测试移除群组...")
if group_manager.remove_group(group_id):
print(f"成功移除群组: {group_id}")
else:
print(f"移除群组失败: {group_id}")
if __name__ == '__main__':
test_permission_system()

View File

@@ -1,6 +1,7 @@
from typing import Dict, Any, Tuple, Optional, List
from base.plugin_common.plugin_interface import PluginInterface
from wechat_ipad import WechatAPIClient
from utils.robot_cmd.robot_command import Feature
class MessagePluginInterface(PluginInterface):
@@ -16,6 +17,26 @@ class MessagePluginInterface(PluginInterface):
"""支持的命令列表"""
return []
@property
def feature_key(self) -> Optional[str]:
"""插件对应的功能权限键名"""
return None
@property
def feature_description(self) -> Optional[str]:
"""插件对应的功能权限描述"""
return None
def register_feature(self) -> Optional[Feature]:
"""注册插件功能权限
Returns:
Feature: 注册的功能权限枚举如果不需要权限则返回None
"""
if self.feature_key and self.feature_description:
return Feature.register_feature(self.feature_key, self.feature_description)
return None
# 需要完成jobs 的业务所以完成bot注入
def set_bot(self, bot: WechatAPIClient) -> None:
self.bot: WechatAPIClient = bot

View File

@@ -331,7 +331,7 @@ class PluginManager:
return plugin
except Exception as e:
self.LOG.error(f"PluginManager加载插件模块 {module_name} 失败: {e}", exc_info=True)
self.LOG.exception(f"PluginManager加载插件模块 {module_name} 失败: {e}", exc_info=True)
return None
def unload_plugin(self, name: str) -> bool: