为积分通胀策略增加Redis日缓存

1. 新增群积分通胀统计的Redis缓存键与按天过期策略,减少重复聚合查询。

2. 新增带缓存的群插件积分消耗统计方法,Redis异常时自动回退实时查询。

3. 调整积分消耗注解优先读取缓存版统计,降低高频群聊场景下的数据库压力。
This commit is contained in:
liuwei
2026-04-27 13:40:27 +08:00
parent 66ac0a7e89
commit 955c2f2797
2 changed files with 87 additions and 1 deletions

View File

@@ -2,6 +2,7 @@
""" """
积分系统数据库操作类 积分系统数据库操作类
""" """
import json
from loguru import logger from loguru import logger
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
@@ -98,6 +99,35 @@ class PointsDBOperator(BaseDBOperator):
self.LOG.error(f"创建积分系统数据库表失败: {e}") self.LOG.error(f"创建积分系统数据库表失败: {e}")
raise raise
def _build_group_plugin_consumption_cache_key(self, group_id: str, lookback_hours: int) -> str:
"""
构造群聊插件积分消耗统计的 Redis 缓存键。
这里按“天维度”缓存,原因有两点:
1. 用户明确希望不要每次都实时计算;
2. 通胀策略的目标是压缩群积分长期沉淀,对秒级实时性要求不高,
因此按天缓存可以显著减少数据库聚合压力,同时保持策略方向稳定。
Key 里额外带上 `lookback_hours`,是为了避免后续不同窗口参数互相污染缓存。
"""
stat_day = datetime.now().strftime("%Y%m%d")
normalized_group_id = str(group_id or "private")
normalized_hours = max(1, int(lookback_hours or 72))
return f"bot:points:inflation:group_stats:{normalized_group_id}:{stat_day}:{normalized_hours}"
@staticmethod
def _seconds_until_next_day() -> int:
"""
计算距离次日零点还有多少秒。
缓存采用“自然日切换”:
- 当天第一次命中后,后续同一天直接复用;
- 到了第二天自动失效,重新基于新一天的数据重算。
"""
now = datetime.now()
tomorrow = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
return max(60, int((tomorrow - now).total_seconds()))
def get_user_points(self, user_id: str, group_id: str) -> Dict: def get_user_points(self, user_id: str, group_id: str) -> Dict:
""" """
获取用户积分信息 获取用户积分信息
@@ -694,6 +724,58 @@ class PointsDBOperator(BaseDBOperator):
self.LOG.error(f"获取群聊插件积分消耗统计失败: {e}") self.LOG.error(f"获取群聊插件积分消耗统计失败: {e}")
return stats return stats
def get_group_plugin_consumption_stats_cached(self, group_id: str, lookback_hours: int = 72) -> Dict[str, Any]:
"""
获取带 Redis 日缓存的群聊插件积分消耗统计。
设计目标:
1. 避免每次插件扣费前都扫描积分表和流水表;
2. 使用“群 + 日期 + 回看窗口”的 Redis 键,把同一天的同群统计复用起来;
3. Redis 不可用时自动降级为实时查询,保证功能可用性优先。
Args:
group_id: 群ID
lookback_hours: 统计最近多少小时的插件消耗
Returns:
群积分通胀计算所需的统计字典
"""
normalized_hours = max(1, int(lookback_hours or 72))
if not group_id:
return self.get_group_plugin_consumption_stats(group_id, normalized_hours)
cache_key = self._build_group_plugin_consumption_cache_key(group_id, normalized_hours)
try:
# 先读 Redis 日缓存。
# 同一天同一群通常会多次触发同类功能,这一步能把热点 SQL 压掉。
with self.db_manager.get_redis_connection() as redis_client:
cached_payload = redis_client.get(cache_key)
if cached_payload:
if isinstance(cached_payload, bytes):
cached_payload = cached_payload.decode("utf-8")
cached_stats = json.loads(cached_payload)
if isinstance(cached_stats, dict):
return cached_stats
except Exception as e:
# 缓存失败不影响主流程,直接回退实时统计。
self.LOG.warning(f"读取群积分通胀缓存失败,回退实时查询: {e}")
stats = self.get_group_plugin_consumption_stats(group_id, normalized_hours)
try:
with self.db_manager.get_redis_connection() as redis_client:
# 额外多保留 5 分钟缓冲,避免 00:00 刚过时多个请求同时击穿。
redis_client.set(
cache_key,
json.dumps(stats, ensure_ascii=False),
ex=self._seconds_until_next_day() + 300,
)
except Exception as e:
self.LOG.warning(f"写入群积分通胀缓存失败: {e}")
return stats
def imprison_user(self, user_id: str, group_id: str, hours: int = 24, reason: str = None) -> bool: def imprison_user(self, user_id: str, group_id: str, hours: int = 24, reason: str = None) -> bool:
"""关押用户 """关押用户
Args: Args:

View File

@@ -76,7 +76,11 @@ def _resolve_points_cost_profile(plugin_instance: Any, message: Dict[str, Any],
try: try:
db_manager = DBConnectionManager.get_instance() db_manager = DBConnectionManager.get_instance()
points_db = PointsDBOperator(db_manager) points_db = PointsDBOperator(db_manager)
stats = points_db.get_group_plugin_consumption_stats(roomid, lookback_hours) # 通胀画像优先走 Redis 日缓存:
# 1. 同一个群在一天内会重复触发多次插件扣费;
# 2. 若每次都实时聚合,会对积分表和流水表造成额外压力;
# 3. 因此这里统一走“按天缓存”的统计入口,把数据库开销压到每天首次访问。
stats = points_db.get_group_plugin_consumption_stats_cached(roomid, lookback_hours)
profile["lookback_hours"] = lookback_hours profile["lookback_hours"] = lookback_hours
profile["spend_ratio"] = float(stats.get("plugin_spend_ratio") or 0.0) profile["spend_ratio"] = float(stats.get("plugin_spend_ratio") or 0.0)