Files
abot/db/group_plugin_config_db.py
liuwei d4b7cb32f6 feat(群级配置): 新增MySQL+Redis持久缓存并接入进群欢迎差异化配置
新增群级插件配置表与服务层,采用MySQL持久化+Redis长期缓存(TTL=-1);后台新增群级插件配置管理页面与API,支持按群按插件维护JSON配置并在修改后同步回填MySQL和刷新Redis;已将群成员变更监控插件接入该配置,支持欢迎文案与卡片URL等按群差异化。
2026-04-20 10:42:46 +08:00

133 lines
4.6 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class GroupPluginConfigDBOperator(BaseDBOperator):
    """Database operator for per-group plugin configuration rows.

    Every row of ``t_group_plugin_config`` is identified by the triple
    ``(group_id, plugin_name, config_key)`` and carries its payload in the
    ``config_json`` column. All SQL is executed through the ``execute_query``
    and ``execute_update`` helpers inherited from the base class.
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Hand ``db_manager`` to the base-class initializer."""
        super().__init__(db_manager)

    def init_tables(self) -> bool:
        """Create ``t_group_plugin_config`` if it does not exist yet.

        Returns:
            The result of ``execute_update`` for the DDL statement, or
            ``False`` when that call raises (the error is logged instead of
            being propagated).
        """
        # NOTE(review): idx_group_plugin (group_id, plugin_name) is a leftmost
        # prefix of uk_group_plugin_key and therefore looks redundant. The DDL
        # is left untouched so freshly created tables match existing ones.
        try:
            return self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_group_plugin_config (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    group_id VARCHAR(100) NOT NULL,
                    plugin_name VARCHAR(128) NOT NULL,
                    config_key VARCHAR(128) NOT NULL DEFAULT 'default',
                    config_json JSON NOT NULL,
                    enabled TINYINT(1) NOT NULL DEFAULT 1,
                    version INT NOT NULL DEFAULT 1,
                    updated_by VARCHAR(100) NOT NULL DEFAULT 'system',
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_group_plugin_key (group_id, plugin_name, config_key),
                    INDEX idx_group_plugin (group_id, plugin_name),
                    INDEX idx_plugin_name (plugin_name)
                )
                """
            )
        except Exception as e:
            logger.error(f"初始化群级插件配置表失败: {e}")
            return False

    @staticmethod
    def _parse_json_field(row: Dict[str, Any], key: str) -> None:
        """Decode ``row[key]`` from JSON text, modifying ``row`` in place.

        Args:
            row: Mapping for one result row; it is mutated.
            key: Name of the field that holds the JSON payload.

        Behaviour:
            * ``None`` or a missing key becomes ``{}``.
            * ``str``, ``bytes`` and ``bytearray`` values are passed to
              ``json.loads``; if decoding fails a warning is logged and the
              field falls back to ``{}``.
            * Any other value (for example an already decoded dict) is left
              untouched.
        """
        value = row.get(key)
        if value is None:
            row[key] = {}
            return
        # Some drivers hand JSON columns back as bytes rather than str;
        # json.loads accepts both, so decode either form.
        if not isinstance(value, (str, bytes, bytearray)):
            return
        try:
            row[key] = json.loads(value)
        except (ValueError, RecursionError) as e:
            # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            # Keep the best-effort fallback, but leave a trace of the bad
            # payload (the raw value is not logged; it may be sensitive).
            logger.warning(f"群级插件配置字段 {key} 不是合法JSON, 已回退为空配置: {e}")
            row[key] = {}

    def get_config(
        self, group_id: str, plugin_name: str, config_key: str = "default"
    ) -> Optional[Dict[str, Any]]:
        """Fetch the single configuration row matching the given triple.

        Args:
            group_id: Value matched against the ``group_id`` column.
            plugin_name: Value matched against the ``plugin_name`` column.
            config_key: Value matched against the ``config_key`` column.

        Returns:
            The row with ``config_json`` already decoded, or ``None`` when the
            query yields nothing.
        """
        row = self.execute_query(
            """
            SELECT * FROM t_group_plugin_config
            WHERE group_id = %s AND plugin_name = %s AND config_key = %s
            LIMIT 1
            """,
            (group_id, plugin_name, config_key),
            fetch_one=True,
        )
        if not row:
            return None
        self._parse_json_field(row, "config_json")
        return row

    def list_configs(self, group_id: str = "", plugin_name: str = "") -> List[Dict[str, Any]]:
        """List configuration rows, optionally filtered by group and/or plugin.

        Args:
            group_id: When non-empty, only rows of this group are returned.
            plugin_name: When non-empty, only rows of this plugin are returned.

        Returns:
            Rows ordered by ``updated_at`` then ``id``, both descending, each
            with ``config_json`` decoded. An empty list when the query yields
            nothing.
        """
        where_sql = []
        params = []
        if group_id:
            where_sql.append("group_id = %s")
            params.append(group_id)
        if plugin_name:
            where_sql.append("plugin_name = %s")
            params.append(plugin_name)
        # Only fixed column predicates are interpolated into the statement;
        # the caller-supplied values always travel as bound parameters.
        where_clause = f"WHERE {' AND '.join(where_sql)}" if where_sql else ""
        rows = self.execute_query(
            f"""
            SELECT * FROM t_group_plugin_config
            {where_clause}
            ORDER BY updated_at DESC, id DESC
            """,
            tuple(params) if params else None,
        ) or []
        for row in rows:
            self._parse_json_field(row, "config_json")
        return rows

    def upsert_config(
        self,
        group_id: str,
        plugin_name: str,
        config_key: str,
        config_data: Dict[str, Any],
        enabled: bool = True,
        updated_by: str = "system",
    ) -> bool:
        """Insert a configuration row or update the existing one.

        A new row starts at ``version`` 1. When the unique key
        ``(group_id, plugin_name, config_key)`` already exists, the payload,
        ``enabled`` flag and ``updated_by`` are overwritten and ``version`` is
        incremented by one.

        Args:
            group_id: Group the configuration belongs to.
            plugin_name: Plugin the configuration belongs to.
            config_key: Key distinguishing several configs of one plugin.
            config_data: Payload to store; a falsy value is stored as ``{}``.
            enabled: Stored as ``1`` when truthy, otherwise ``0``.
            updated_by: Written to the ``updated_by`` column.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            INSERT INTO t_group_plugin_config (
                group_id, plugin_name, config_key, config_json, enabled, version, updated_by
            ) VALUES (%s, %s, %s, %s, %s, 1, %s)
            ON DUPLICATE KEY UPDATE
                config_json = VALUES(config_json),
                enabled = VALUES(enabled),
                updated_by = VALUES(updated_by),
                version = version + 1
            """,
            (
                group_id,
                plugin_name,
                config_key,
                # ensure_ascii=False keeps non-ASCII text readable in storage.
                json.dumps(config_data or {}, ensure_ascii=False),
                1 if enabled else 0,
                updated_by,
            ),
        )

    def delete_config(self, group_id: str, plugin_name: str, config_key: str = "default") -> bool:
        """Delete the configuration row matching the given triple.

        Args:
            group_id: Value matched against the ``group_id`` column.
            plugin_name: Value matched against the ``plugin_name`` column.
            config_key: Value matched against the ``config_key`` column.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            DELETE FROM t_group_plugin_config
            WHERE group_id = %s AND plugin_name = %s AND config_key = %s
            """,
            (group_id, plugin_name, config_key),
        )