diff --git a/db/points_db.py b/db/points_db.py index 4d643e2..e027d72 100644 --- a/db/points_db.py +++ b/db/points_db.py @@ -610,6 +610,90 @@ class PointsDBOperator(BaseDBOperator): self.LOG.error(f"获取群组积分统计信息失败: {e}") return stats + def get_group_plugin_consumption_stats(self, group_id: str, lookback_hours: int = 72) -> Dict[str, Any]: + """ + 获取群聊插件积分消耗统计信息。 + + 这里专门为“积分通货膨胀”策略提供数据支撑,只统计真正会销毁积分总量的插件扣费: + 1. 用户之间转账、打劫、保释这类行为,本质上只是积分在用户之间流转,不算真实消耗; + 2. `plugin` 来源的 `spend` 流水,才代表群积分池被真正消耗掉了; + 3. 因此通胀策略应基于“当前群积分存量”与“最近一段时间插件真实消耗量”的关系来判断。 + + Args: + group_id: 群ID + lookback_hours: 统计最近多少小时的插件消耗 + + Returns: + 包含群积分存量、系统沉淀积分、最近插件消耗量等字段的统计字典 + """ + normalized_hours = max(1, int(lookback_hours or 72)) + stats = { + "group_id": group_id, + "lookback_hours": normalized_hours, + "total_users": 0, + "active_points_total": 0, + "system_points_total": 0, + "plugin_spend_points": 0, + "plugin_spend_count": 0, + "plugin_spend_ratio": 0.0, + } + + if not group_id: + return stats + + try: + # 用户积分存量统计中要把 SYSTEM 单独拆出来: + # SYSTEM 账户通常用于承接抽水、保释金等沉淀积分, + # 这些积分已经不在普通成员手里流通,所以需要单独记录,避免干扰真实“群成员持有积分”。 + points_result = self.execute_query( + """ + SELECT + SUM(CASE WHEN user_id <> 'SYSTEM' THEN 1 ELSE 0 END) AS total_users, + SUM(CASE WHEN user_id <> 'SYSTEM' THEN total_points ELSE 0 END) AS active_points_total, + SUM(CASE WHEN user_id = 'SYSTEM' THEN total_points ELSE 0 END) AS system_points_total + FROM t_user_points + WHERE group_id = %s + """, + (group_id,), + fetch_one=True, + ) or {} + + # 最近插件消耗采用时间窗口统计。 + # 只看 plugin 来源的 spend,能更准确反映“带注解功能”对积分池的真实消耗效率。 + time_boundary = datetime.now() - timedelta(hours=normalized_hours) + spend_result = self.execute_query( + """ + SELECT + SUM(CASE WHEN transaction_type = 'spend' AND source = %s THEN ABS(points) ELSE 0 END) + AS plugin_spend_points, + SUM(CASE WHEN transaction_type = 'spend' AND source = %s THEN 1 ELSE 0 END) + AS plugin_spend_count + FROM t_point_transactions + WHERE group_id = %s + AND created_at >= %s + """, + (PointSource.PLUGIN.value, PointSource.PLUGIN.value, group_id, time_boundary), + fetch_one=True, + ) or {} + + stats["total_users"] = int(points_result.get("total_users") or 0) + stats["active_points_total"] = int(points_result.get("active_points_total") or 0) + stats["system_points_total"] = int(points_result.get("system_points_total") or 0) + stats["plugin_spend_points"] = int(spend_result.get("plugin_spend_points") or 0) + stats["plugin_spend_count"] = int(spend_result.get("plugin_spend_count") or 0) + + active_points_total = stats["active_points_total"] + if active_points_total > 0: + stats["plugin_spend_ratio"] = round( + stats["plugin_spend_points"] / active_points_total, + 6, + ) + + return stats + except Exception as e: + self.LOG.error(f"获取群聊插件积分消耗统计失败: {e}") + return stats + def imprison_user(self, user_id: str, group_id: str, hours: int = 24, reason: str = None) -> bool: """关押用户 Args: diff --git a/utils/decorator/points_decorator.py b/utils/decorator/points_decorator.py index 9a86501..6d0907a 100644 --- a/utils/decorator/points_decorator.py +++ b/utils/decorator/points_decorator.py @@ -1,4 +1,5 @@ import functools +import math import time import traceback import asyncio @@ -12,6 +13,138 @@ from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotMan from wechat_ipad import WechatAPIClient +def _read_points_inflation_config(config: Dict[str, Any], key: str, default: Any) -> Any: + """ + 兼容读取通胀配置。 + + 之所以做这一层兼容,是因为插件 TOML 里常见两种写法: + 1. `lookback_hours = 72` + 2. `lookback-hours = 72` + 统一兼容后,后续新增配置就不用担心命名风格差异。 + """ + if not isinstance(config, dict): + return default + return config.get(key, config.get(key.replace("_", "-"), default)) + + +def _resolve_points_cost_profile(plugin_instance: Any, message: Dict[str, Any], base_points: int) -> Dict[str, Any]: + """ + 解析本次积分消费的实际扣费画像。 + + 默认情况下,注解的扣费是固定值;这里加入群聊级“通货膨胀”策略后, + 会根据群里当前积分存量和最近插件消耗效率,动态算出本次实际扣费。 + + 返回值说明: + - `base_points`: 注解原始价格 + - `actual_points`: 本次实际扣费 + - `multiplier`: 通胀倍率 + - `inflated`: 是否触发了通胀涨价 + """ + roomid = message.get("roomid", "") + profile = { + "base_points": int(base_points), + "actual_points": int(base_points), + "multiplier": 1.0, + "inflated": False, + "lookback_hours": 0, + "spend_ratio": 0.0, + "active_points_total": 0, + "plugin_spend_points": 0, + "plugin_spend_count": 0, + } + + # 用户需求明确指向“群里积分消耗不掉了”,因此这里只对群聊启用通胀。 + if not roomid: + return profile + + plugin_config = getattr(plugin_instance, "_config", {}) or {} + inflation_config = plugin_config.get("PointsCostInflation", {}) or {} + if not _read_points_inflation_config(inflation_config, "enabled", True): + return profile + + # 下面这些默认值尽量保守: + # 1. 只有群积分存量和群成员数达到一定规模,才说明积分开始“堆积”; + # 2. 目标消耗比、步进倍率、最大倍率都做了封顶,避免涨价过于激进。 + lookback_hours = max(1, int(_read_points_inflation_config(inflation_config, "lookback_hours", 72) or 72)) + min_group_users = max(1, int(_read_points_inflation_config(inflation_config, "min_group_users", 5) or 5)) + min_group_points = max(0, int(_read_points_inflation_config(inflation_config, "min_group_points", 1000) or 1000)) + target_spend_ratio = float(_read_points_inflation_config(inflation_config, "target_spend_ratio", 0.08) or 0.08) + ratio_per_step = float(_read_points_inflation_config(inflation_config, "ratio_per_step", 0.02) or 0.02) + multiplier_step = float(_read_points_inflation_config(inflation_config, "multiplier_step", 0.5) or 0.5) + max_multiplier = float(_read_points_inflation_config(inflation_config, "max_multiplier", 3.0) or 3.0) + + try: + db_manager = DBConnectionManager.get_instance() + points_db = PointsDBOperator(db_manager) + stats = points_db.get_group_plugin_consumption_stats(roomid, lookback_hours) + + profile["lookback_hours"] = lookback_hours + profile["spend_ratio"] = float(stats.get("plugin_spend_ratio") or 0.0) + profile["active_points_total"] = int(stats.get("active_points_total") or 0) + profile["plugin_spend_points"] = int(stats.get("plugin_spend_points") or 0) + profile["plugin_spend_count"] = int(stats.get("plugin_spend_count") or 0) + + # 群体规模太小,或者群里本身没什么积分时,没必要触发通胀。 + if int(stats.get("total_users") or 0) < min_group_users: + return profile + if profile["active_points_total"] < min_group_points: + return profile + if target_spend_ratio <= 0: + return profile + + spend_ratio = profile["spend_ratio"] + if spend_ratio >= target_spend_ratio: + return profile + + # 根据“离目标消耗比差多少”来提升倍率。 + # 例子: + # - 目标 8%,当前只有 1%,说明群积分沉淀较严重; + # - 每差 2% 提高 0.5 倍,直到达到封顶倍率。 + safe_ratio_per_step = max(ratio_per_step, 0.0001) + gap_ratio = max(0.0, target_spend_ratio - spend_ratio) + step_count = max(1, math.ceil(gap_ratio / safe_ratio_per_step)) + multiplier = min(max_multiplier, 1.0 + step_count * multiplier_step) + actual_points = max(int(base_points), math.ceil(int(base_points) * multiplier)) + + profile["actual_points"] = actual_points + profile["multiplier"] = round(multiplier, 2) + profile["inflated"] = actual_points > int(base_points) + + if profile["inflated"]: + plugin_name = getattr(plugin_instance, "name", "未知插件") + logger.info( + f"群 {roomid} 触发积分通胀: plugin={plugin_name}, " + f"base={base_points}, actual={actual_points}, " + f"ratio={spend_ratio:.4f}, multiplier={profile['multiplier']:.2f}" + ) + return profile + except Exception as e: + # 通胀策略属于增强能力,不能因为统计异常影响原有功能可用性。 + logger.warning(f"解析积分通胀倍率失败,回退固定扣费: {e}") + logger.warning(traceback.format_exc()) + return profile + + +def _build_points_cost_text(profile: Dict[str, Any]) -> str: + """ + 生成扣费说明文案。 + + 统一文案可以让“积分不足提示”和“扣费成功提示”保持一致, + 用户更容易理解为什么这次扣费不是固定值。 + """ + if not profile.get("inflated"): + return f"💰消费 {profile['actual_points']} 积分" + + multiplier = float(profile.get("multiplier") or 1.0) + ratio = float(profile.get("spend_ratio") or 0.0) * 100 + return ( + f"💰消费 {profile['actual_points']} 积分\n" + f"📈 群通胀倍率: x{multiplier:.2f}\n" + f"🧾 基础价: {profile['base_points']} | 实际价: {profile['actual_points']}\n" + f"📉 近{profile['lookback_hours']}小时插件消耗占比: {ratio:.2f}%" + ) + + def points_reward_decorator(points_calculator: Union[int, Callable], source_type: str = "other", description: str = None, feature_key: str = None): """积分奖励装饰器 @@ -183,7 +316,7 @@ def plugin_points_cost(points: int, description: str = None, feature_key: str = """插件积分消费装饰器 Args: - points: 消费积分数量 + points: 基础消费积分数量 description: 积分消费描述 feature_key: 功能权限键名 @@ -224,14 +357,24 @@ def plugin_points_cost(points: int, description: str = None, feature_key: str = plugin_name = self.name if hasattr(self, 'name') else "未知插件" logger.info(f"PointsCost.{plugin_name}") + cost_profile = _resolve_points_cost_profile(self, message, points) + actual_cost_points = int(cost_profile["actual_points"]) user_points = points_db.get_user_points(sender, roomid) - if user_points["total_points"] < points: + if user_points["total_points"] < actual_cost_points: # 积分不足 + shortage = actual_cost_points - user_points["total_points"] + inflation_tip = "" + if cost_profile["inflated"]: + inflation_tip = ( + f"\n📈 当前群通胀倍率: x{cost_profile['multiplier']:.2f}" + f"\n🧾 基础价: {cost_profile['base_points']} | 实际价: {actual_cost_points}" + ) await bot.send_at_message((roomid if roomid else sender), f"❌ 积分不足\n" f"🪙 先参与积分活动[签到]赚取吧!\n" - f"💰 有: {user_points['total_points']} | 需: {points} |差: {points - user_points['total_points']} ", + f"💰 有: {user_points['total_points']} | 需: {actual_cost_points} | 差: {shortage}" + f"{inflation_tip}", [sender] ) logger.info(f"用户 {sender} 积分不足,无法使用功能") @@ -243,20 +386,20 @@ def plugin_points_cost(points: int, description: str = None, feature_key: str = # 如果原始方法执行成功,扣除积分 if success: deduct_success, deduct_result = points_db.deduct_points( - sender, roomid, points, PointSource.PLUGIN, + sender, roomid, actual_cost_points, PointSource.PLUGIN, description or f"使用 {plugin_name} 功能" ) if deduct_success: - logger.info(f"用户 {sender} 使用 {plugin_name} 功能扣除 {points} 积分") + logger.info(f"用户 {sender} 使用 {plugin_name} 功能扣除 {actual_cost_points} 积分") # 添加对 response 的类型检查 if isinstance(response, str) and "积分" not in response: - response += f"\n\n💰 已消费 {points} 积分" + response += f"\n\n{_build_points_cost_text(cost_profile)}" client_msg_id, create_time, new_msg_id = await bot.send_at_message( (roomid if roomid else sender), - f"💰消费 {points} 积分", [sender] + _build_points_cost_text(cost_profile), [sender] ) revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5) @@ -300,14 +443,23 @@ def plugin_points_cost(points: int, description: str = None, feature_key: str = plugin_name = self.name if hasattr(self, 'name') else "未知插件" logger.info(f"PointsCost.{plugin_name}") + cost_profile = _resolve_points_cost_profile(self, message, points) + actual_cost_points = int(cost_profile["actual_points"]) user_points = points_db.get_user_points(sender, roomid) - if user_points["total_points"] < points: + if user_points["total_points"] < actual_cost_points: # 积分不足 + inflation_tip = "" + if cost_profile["inflated"]: + inflation_tip = ( + f"\n📈 当前群通胀倍率: x{cost_profile['multiplier']:.2f}" + f"\n🧾 基础价: {cost_profile['base_points']} | 实际价: {actual_cost_points}" + ) self.message_util.send_text( f"❌ 积分不足\n无法使用 {plugin_name} 功能\n" f"🪙 先参与积分活动[签到,答题/t]赚取吧!\n" - f"💰 有: {user_points['total_points']} | 需: {points} |差: {points - user_points['total_points']} ", + f"💰 有: {user_points['total_points']} | 需: {actual_cost_points} " + f"| 差: {actual_cost_points - user_points['total_points']}{inflation_tip}", (roomid if roomid else sender), sender ) logger.info(f"用户 {sender} 积分不足,无法使用功能") @@ -319,18 +471,18 @@ def plugin_points_cost(points: int, description: str = None, feature_key: str = # 如果原始方法执行成功,扣除积分 if success: deduct_success, deduct_result = points_db.deduct_points( - sender, roomid, points, PointSource.PLUGIN, + sender, roomid, actual_cost_points, PointSource.PLUGIN, description or f"使用 {plugin_name} 功能" ) if deduct_success: - logger.info(f"用户 {sender} 使用 {plugin_name} 功能扣除 {points} 积分") + logger.info(f"用户 {sender} 使用 {plugin_name} 功能扣除 {actual_cost_points} 积分") # 添加对 response 的类型检查 if isinstance(response, str) and "积分" not in response: - response += f"\n\n💰 已消费 {points} 积分" + response += f"\n\n{_build_points_cost_text(cost_profile)}" self.message_util.send_text( - f"💰消费 {points} 积分", + _build_points_cost_text(cost_profile), (roomid if roomid else sender), sender ) else: