新增群级插件配置表与服务层,采用MySQL持久化+Redis长期缓存(TTL=-1);后台新增群级插件配置管理页面与API,支持按群按插件维护JSON配置并在修改后同步回填MySQL和刷新Redis;已将群成员变更监控插件接入该配置,支持欢迎文案与卡片URL等按群差异化。
100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from loguru import logger
|
||
|
||
from db.group_plugin_config_db import GroupPluginConfigDBOperator
|
||
|
||
|
||
class GroupPluginConfigService:
|
||
"""群级插件配置服务(MySQL 持久化 + Redis 永久缓存)。"""
|
||
|
||
REDIS_KEY_PREFIX = "group:plugin:cfg"
|
||
|
||
def __init__(self, db_operator: GroupPluginConfigDBOperator, redis_client):
|
||
self.db = db_operator
|
||
self.redis = redis_client
|
||
|
||
@classmethod
|
||
def _build_cache_key(cls, group_id: str, plugin_name: str, config_key: str = "default") -> str:
|
||
return f"{cls.REDIS_KEY_PREFIX}:{group_id}:{plugin_name}:{config_key}"
|
||
|
||
def _write_cache(self, key: str, payload: Dict[str, Any]) -> None:
|
||
"""写入 Redis 缓存(TTL=-1,永久有效)。"""
|
||
# 需求明确要求长期缓存,这里不设置过期时间,保持 TTL=-1。
|
||
self.redis.set(key, json.dumps(payload, ensure_ascii=False))
|
||
|
||
def _delete_cache(self, key: str) -> None:
|
||
"""删除 Redis 缓存。"""
|
||
self.redis.delete(key)
|
||
|
||
def get_config(
|
||
self,
|
||
group_id: str,
|
||
plugin_name: str,
|
||
config_key: str = "default",
|
||
default: Optional[Dict[str, Any]] = None,
|
||
) -> Dict[str, Any]:
|
||
"""读取配置:先 Redis,未命中再查 MySQL 并回填 Redis。"""
|
||
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
|
||
cached = self.redis.get(cache_key)
|
||
if cached:
|
||
try:
|
||
payload = json.loads(cached)
|
||
if isinstance(payload, dict):
|
||
return payload
|
||
except Exception as e:
|
||
logger.warning(f"群插件配置缓存解析失败,将回源数据库: key={cache_key}, error={e}")
|
||
|
||
row = self.db.get_config(group_id, plugin_name, config_key)
|
||
if not row:
|
||
result = dict(default or {})
|
||
# 缓存空结果可减少热点穿透;后续后台更新会主动刷新。
|
||
self._write_cache(cache_key, result)
|
||
return result
|
||
|
||
result = row.get("config_json") or {}
|
||
if not isinstance(result, dict):
|
||
result = {}
|
||
self._write_cache(cache_key, result)
|
||
return result
|
||
|
||
def list_configs(self, group_id: str = "", plugin_name: str = "") -> List[Dict[str, Any]]:
|
||
"""列出配置(后台展示用)。"""
|
||
return self.db.list_configs(group_id=group_id, plugin_name=plugin_name)
|
||
|
||
def upsert_config(
|
||
self,
|
||
group_id: str,
|
||
plugin_name: str,
|
||
config_data: Dict[str, Any],
|
||
config_key: str = "default",
|
||
enabled: bool = True,
|
||
updated_by: str = "system",
|
||
) -> bool:
|
||
"""写配置:先落 MySQL,再刷新 Redis。"""
|
||
ok = self.db.upsert_config(
|
||
group_id=group_id,
|
||
plugin_name=plugin_name,
|
||
config_key=config_key,
|
||
config_data=config_data or {},
|
||
enabled=enabled,
|
||
updated_by=updated_by,
|
||
)
|
||
if not ok:
|
||
return False
|
||
|
||
# 需求要求“修改后刷新 redis 内容”,这里直接回填最新值。
|
||
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
|
||
self._write_cache(cache_key, config_data or {})
|
||
return True
|
||
|
||
def delete_config(self, group_id: str, plugin_name: str, config_key: str = "default") -> bool:
|
||
"""删除配置:删 MySQL 后同步清理 Redis。"""
|
||
ok = self.db.delete_config(group_id, plugin_name, config_key)
|
||
if ok:
|
||
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
|
||
self._delete_cache(cache_key)
|
||
return ok
|