Files
abot/utils/group_plugin_config_service.py
liuwei d4b7cb32f6 feat(群级配置): 新增MySQL+Redis持久缓存并接入进群欢迎差异化配置
新增群级插件配置表与服务层,采用MySQL持久化+Redis长期缓存(TTL=-1);后台新增群级插件配置管理页面与API,支持按群按插件维护JSON配置并在修改后同步回填MySQL和刷新Redis;已将群成员变更监控插件接入该配置,支持欢迎文案与卡片URL等按群差异化。
2026-04-20 10:42:46 +08:00

100 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from loguru import logger
from db.group_plugin_config_db import GroupPluginConfigDBOperator
class GroupPluginConfigService:
"""群级插件配置服务MySQL 持久化 + Redis 永久缓存)。"""
REDIS_KEY_PREFIX = "group:plugin:cfg"
def __init__(self, db_operator: GroupPluginConfigDBOperator, redis_client):
self.db = db_operator
self.redis = redis_client
@classmethod
def _build_cache_key(cls, group_id: str, plugin_name: str, config_key: str = "default") -> str:
return f"{cls.REDIS_KEY_PREFIX}:{group_id}:{plugin_name}:{config_key}"
def _write_cache(self, key: str, payload: Dict[str, Any]) -> None:
"""写入 Redis 缓存TTL=-1永久有效"""
# 需求明确要求长期缓存,这里不设置过期时间,保持 TTL=-1。
self.redis.set(key, json.dumps(payload, ensure_ascii=False))
def _delete_cache(self, key: str) -> None:
"""删除 Redis 缓存。"""
self.redis.delete(key)
def get_config(
self,
group_id: str,
plugin_name: str,
config_key: str = "default",
default: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""读取配置:先 Redis未命中再查 MySQL 并回填 Redis。"""
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
cached = self.redis.get(cache_key)
if cached:
try:
payload = json.loads(cached)
if isinstance(payload, dict):
return payload
except Exception as e:
logger.warning(f"群插件配置缓存解析失败,将回源数据库: key={cache_key}, error={e}")
row = self.db.get_config(group_id, plugin_name, config_key)
if not row:
result = dict(default or {})
# 缓存空结果可减少热点穿透;后续后台更新会主动刷新。
self._write_cache(cache_key, result)
return result
result = row.get("config_json") or {}
if not isinstance(result, dict):
result = {}
self._write_cache(cache_key, result)
return result
def list_configs(self, group_id: str = "", plugin_name: str = "") -> List[Dict[str, Any]]:
"""列出配置(后台展示用)。"""
return self.db.list_configs(group_id=group_id, plugin_name=plugin_name)
def upsert_config(
self,
group_id: str,
plugin_name: str,
config_data: Dict[str, Any],
config_key: str = "default",
enabled: bool = True,
updated_by: str = "system",
) -> bool:
"""写配置:先落 MySQL再刷新 Redis。"""
ok = self.db.upsert_config(
group_id=group_id,
plugin_name=plugin_name,
config_key=config_key,
config_data=config_data or {},
enabled=enabled,
updated_by=updated_by,
)
if not ok:
return False
# 需求要求“修改后刷新 redis 内容”,这里直接回填最新值。
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
self._write_cache(cache_key, config_data or {})
return True
def delete_config(self, group_id: str, plugin_name: str, config_key: str = "default") -> bool:
"""删除配置:删 MySQL 后同步清理 Redis。"""
ok = self.db.delete_config(group_id, plugin_name, config_key)
if ok:
cache_key = self._build_cache_key(group_id, plugin_name, config_key)
self._delete_cache(cache_key)
return ok