From d4d290fec8d416ab3382e7fc70a6fbbbc58e7ef6 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:42:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20value=5Frank=20=E9=A6=96?= =?UTF-8?q?=E7=89=88=E6=8F=92=E4=BB=B6=E5=B9=B6=E6=8E=A5=E5=85=A5=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E9=87=8D=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ValueRank 插件入口、配置与主逻辑(我的身价/身价排行/身价说明/重算身价) - 新增每日 04:00 调度动作,支持按群批量重算并写入快照 - 实现积分/发言/活跃/社交四维打分与潜水惩罚,采用95分位截断与归一化 - 新增 t_value_rank_snapshot 建表迁移脚本,并同步更新 init.sql - 代码中补充详细中文注释,说明算法意图、边界处理与稳定性策略 --- db/scripts/init.sql | 25 + ...20260421_add_value_rank_snapshot_table.sql | 19 + plugins/value_rank/__init__.py | 7 + plugins/value_rank/config.toml | 27 + plugins/value_rank/main.py | 717 ++++++++++++++++++ 5 files changed, 795 insertions(+) create mode 100644 db/scripts/migrations/20260421_add_value_rank_snapshot_table.sql create mode 100644 plugins/value_rank/__init__.py create mode 100644 plugins/value_rank/config.toml create mode 100644 plugins/value_rank/main.py diff --git a/db/scripts/init.sql b/db/scripts/init.sql index b0a8894..b803c94 100644 --- a/db/scripts/init.sql +++ b/db/scripts/init.sql @@ -520,6 +520,31 @@ create or replace table message_archive.t_user_stats ) comment '用户使用统计表'; +create or replace table message_archive.t_value_rank_snapshot +( + id bigint auto_increment + primary key, + stat_date date not null comment '统计日期', + group_id varchar(100) not null comment '群ID', + user_id varchar(100) not null comment '用户ID', + score decimal(10, 2) default 0.00 not null comment '身价分', + rank_no int default 0 not null comment '排名', + title varchar(50) default '' not null comment '称号', + points_total int default 0 not null comment '积分存量', + msg_count_7d int default 0 not null comment '7日发言数', + active_days_30 int default 0 not null comment '30日活跃天数', + inactive_days int default 0 not null comment '距今未发言天数', + score_detail_json json null comment '分项得分明细', + created_at datetime default current_timestamp() not null comment '创建时间', + updated_at datetime default current_timestamp() not null on update current_timestamp() comment '更新时间', + constraint uniq_day_group_user + unique (stat_date, group_id, user_id) +) + comment '身价日快照表'; + +create or replace index idx_group_day_rank + on message_archive.t_value_rank_snapshot (group_id, stat_date, rank_no); + create or replace index idx_last_used_at on message_archive.t_user_stats (last_used_at); diff --git a/db/scripts/migrations/20260421_add_value_rank_snapshot_table.sql b/db/scripts/migrations/20260421_add_value_rank_snapshot_table.sql new file mode 100644 index 0000000..6817427 --- /dev/null +++ b/db/scripts/migrations/20260421_add_value_rank_snapshot_table.sql @@ -0,0 +1,19 @@ +-- Value Rank 主快照表:用于每日身价结果持久化与趋势对比 +CREATE TABLE IF NOT EXISTS message_archive.t_value_rank_snapshot ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + stat_date DATE NOT NULL COMMENT '统计日期', + group_id VARCHAR(100) NOT NULL COMMENT '群ID', + user_id VARCHAR(100) NOT NULL COMMENT '用户ID', + score DECIMAL(10,2) NOT NULL DEFAULT 0 COMMENT '身价分', + rank_no INT NOT NULL DEFAULT 0 COMMENT '排名', + title VARCHAR(50) NOT NULL DEFAULT '' COMMENT '称号', + points_total INT NOT NULL DEFAULT 0 COMMENT '积分存量', + msg_count_7d INT NOT NULL DEFAULT 0 COMMENT '7日发言数', + active_days_30 INT NOT NULL DEFAULT 0 COMMENT '30日活跃天数', + inactive_days INT NOT NULL DEFAULT 0 COMMENT '距今未发言天数', + score_detail_json JSON NULL COMMENT '分项得分明细', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_day_group_user (stat_date, group_id, user_id), + KEY idx_group_day_rank (group_id, stat_date, rank_no) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='身价日快照表'; diff --git a/plugins/value_rank/__init__.py b/plugins/value_rank/__init__.py new file mode 100644 index 0000000..cc068cb --- /dev/null +++ b/plugins/value_rank/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入ValueRankPlugin类 +from .main import ValueRankPlugin + + +def get_plugin(): + """获取插件实例""" + return ValueRankPlugin() diff --git a/plugins/value_rank/config.toml b/plugins/value_rank/config.toml new file mode 100644 index 0000000..8330fe8 --- /dev/null +++ b/plugins/value_rank/config.toml @@ -0,0 +1,27 @@ +[ValueRank] +enable = true +command = ["我的身价", "身价排行", "身价说明", "重算身价"] +command-format = """ +📊 身价系统命令: +1. 我的身价 +2. 身价排行 [名次] +3. 身价说明 +4. 重算身价(管理员) +""" + +# 统计窗口(天) +message_window_days = 7 +active_window_days = 30 +social_window_days = 7 + +# 排分参数 +points_weight = 0.30 +message_weight = 0.35 +active_days_weight = 0.20 +social_weight = 0.15 +inactivity_penalty_max = 150 +base_score_scale = 1000 + +# 排行默认展示数量 +default_rank_limit = 10 +max_rank_limit = 50 diff --git a/plugins/value_rank/main.py b/plugins/value_rank/main.py new file mode 100644 index 0000000..7af4c06 --- /dev/null +++ b/plugins/value_rank/main.py @@ -0,0 +1,717 @@ +# -*- coding: utf-8 -*- +import json +import math +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +from loguru import logger + +from base.plugin_common.message_plugin_interface import MessagePluginInterface +from base.plugin_common.plugin_interface import PluginStatus +from db.base import BaseDBOperator +from db.connection import DBConnectionManager +from utils.decorator.plugin_decorators import plugin_stats_decorator +from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager +from utils.wechat.contact_manager import ContactManager + + +class ValueRankDB(BaseDBOperator): + """Value Rank 数据访问层。 + + 说明: + 1. 这里集中封装 SQL,避免插件主流程里混杂大量查询语句; + 2. 所有方法都只做“数据读写”,不做业务打分,便于后续独立测试; + 3. 读取失败时统一返回空结构,保证上层逻辑可降级继续执行。 + """ + + def get_candidate_users(self, group_id: str, active_window_days: int, social_window_days: int) -> List[str]: + """获取需要参与打分的候选成员列表。 + + 候选来源采用并集策略,避免遗漏: + 1. 积分表中出现过的成员; + 2. 近期发过言的成员; + 3. 近期社交表出现过的成员(被@或主动@)。 + """ + sql = """ + SELECT user_id FROM t_user_points + WHERE group_id = %s AND user_id <> 'SYSTEM' + + UNION + + SELECT DISTINCT sender AS user_id + FROM messages + WHERE group_id = %s + AND sender IS NOT NULL + AND sender <> '' + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) + + UNION + + SELECT DISTINCT user_id + FROM t_value_rank_social_daily + WHERE group_id = %s + AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) + """ + rows = self.execute_query(sql, (group_id, group_id, active_window_days, group_id, social_window_days)) or [] + return [str(r.get("user_id") or "").strip() for r in rows if str(r.get("user_id") or "").strip()] + + def get_points_map(self, group_id: str) -> Dict[str, int]: + """读取群内用户积分映射:{user_id: total_points}。""" + sql = """ + SELECT user_id, total_points + FROM t_user_points + WHERE group_id = %s + AND user_id <> 'SYSTEM' + """ + rows = self.execute_query(sql, (group_id,)) or [] + result: Dict[str, int] = {} + for row in rows: + user_id = str(row.get("user_id") or "").strip() + if not user_id: + continue + result[user_id] = int(row.get("total_points") or 0) + return result + + def get_message_stats_map(self, group_id: str, message_window_days: int, active_window_days: int) -> Dict[str, Dict[str, Any]]: + """按用户读取消息指标。 + + 返回结构示例: + { + "wxid_xxx": { + "msg_count_window": 123, + "active_days_window": 20, + "last_active_date": "2026-04-21" + } + } + """ + sql = """ + SELECT + sender AS user_id, + SUM(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN 1 ELSE 0 END) AS msg_count_window, + COUNT(DISTINCT DATE(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN timestamp END)) AS active_days_window, + MAX(DATE(timestamp)) AS last_active_date + FROM messages + WHERE group_id = %s + AND sender IS NOT NULL + AND sender <> '' + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) + GROUP BY sender + """ + rows = self.execute_query(sql, (message_window_days, active_window_days, group_id, active_window_days)) or [] + result: Dict[str, Dict[str, Any]] = {} + for row in rows: + user_id = str(row.get("user_id") or "").strip() + if not user_id: + continue + result[user_id] = { + "msg_count_window": int(row.get("msg_count_window") or 0), + "active_days_window": int(row.get("active_days_window") or 0), + "last_active_date": row.get("last_active_date"), + } + return result + + def get_last_active_date_map(self, group_id: str) -> Dict[str, Any]: + """读取群内用户最后发言日期,用于计算潜水惩罚。""" + sql = """ + SELECT sender AS user_id, MAX(DATE(timestamp)) AS last_active_date + FROM messages + WHERE group_id = %s + AND sender IS NOT NULL + AND sender <> '' + GROUP BY sender + """ + rows = self.execute_query(sql, (group_id,)) or [] + return { + str(row.get("user_id") or "").strip(): row.get("last_active_date") + for row in rows + if str(row.get("user_id") or "").strip() + } + + def get_social_stats_map(self, group_id: str, social_window_days: int) -> Dict[str, Dict[str, Any]]: + """按用户读取社交指标(日汇总窗口)。""" + sql = """ + SELECT + user_id, + SUM(mentioned_count) AS mentioned_count_window, + SUM(mention_others_count) AS mention_others_count_window, + SUM(unique_interactors) AS unique_interactors_window, + SUM(interaction_score) AS interaction_score_window + FROM t_value_rank_social_daily + WHERE group_id = %s + AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) + GROUP BY user_id + """ + rows = self.execute_query(sql, (group_id, social_window_days)) or [] + result: Dict[str, Dict[str, Any]] = {} + for row in rows: + user_id = str(row.get("user_id") or "").strip() + if not user_id: + continue + result[user_id] = { + "mentioned_count_window": int(row.get("mentioned_count_window") or 0), + "mention_others_count_window": int(row.get("mention_others_count_window") or 0), + "unique_interactors_window": int(row.get("unique_interactors_window") or 0), + "interaction_score_window": float(row.get("interaction_score_window") or 0.0), + } + return result + + def upsert_snapshots(self, rows: List[Tuple[Any, ...]]) -> bool: + """批量写入身价快照。""" + if not rows: + return True + + sql = """ + INSERT INTO t_value_rank_snapshot ( + stat_date, group_id, user_id, score, rank_no, title, + points_total, msg_count_7d, active_days_30, inactive_days, + score_detail_json + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + score = VALUES(score), + rank_no = VALUES(rank_no), + title = VALUES(title), + points_total = VALUES(points_total), + msg_count_7d = VALUES(msg_count_7d), + active_days_30 = VALUES(active_days_30), + inactive_days = VALUES(inactive_days), + score_detail_json = VALUES(score_detail_json), + updated_at = CURRENT_TIMESTAMP + """ + return self.execute_batch(sql, rows) + + def get_user_snapshot(self, stat_date: str, group_id: str, user_id: str) -> Optional[Dict[str, Any]]: + """读取某用户在某天的身价快照。""" + sql = """ + SELECT * FROM t_value_rank_snapshot + WHERE stat_date = %s AND group_id = %s AND user_id = %s + LIMIT 1 + """ + return self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True) + + def get_yesterday_score(self, stat_date: str, group_id: str, user_id: str) -> Optional[float]: + """读取昨日分数用于计算涨跌幅。""" + sql = """ + SELECT score + FROM t_value_rank_snapshot + WHERE stat_date = DATE_SUB(%s, INTERVAL 1 DAY) + AND group_id = %s + AND user_id = %s + LIMIT 1 + """ + row = self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True) + if not row: + return None + return float(row.get("score") or 0) + + def get_today_rankings(self, stat_date: str, group_id: str, limit: int) -> List[Dict[str, Any]]: + """读取今日排行榜。""" + sql = """ + SELECT user_id, score, rank_no, title + FROM t_value_rank_snapshot + WHERE stat_date = %s + AND group_id = %s + ORDER BY rank_no ASC + LIMIT %s + """ + return self.execute_query(sql, (stat_date, group_id, limit)) or [] + + +class ValueRankPlugin(MessagePluginInterface): + """群成员身价排行插件。 + + 设计目标: + 1. 将“积分、发言、活跃、社交影响力”统一折算为可解释的身价分; + 2. 支持手动查询与后台定时重算; + 3. 通过快照持久化,支持趋势分析和涨跌说明。 + """ + + FEATURE_KEY = "VALUE_RANK" + FEATURE_DESCRIPTION = "📊 身价排行 [我的身价, 身价排行, 身价说明, 重算身价]" + + @property + def name(self) -> str: + return "身价排行" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "根据积分、发言、活跃与社交影响力计算群成员身价。" + + @property + def author(self) -> str: + return "ABOT Team" + + @property + def command_prefix(self) -> Optional[str]: + return "" + + @property + def commands(self) -> List[str]: + return self._commands + + @property + def feature_key(self) -> Optional[str]: + return self.FEATURE_KEY + + @property + def feature_description(self) -> Optional[str]: + return self.FEATURE_DESCRIPTION + + def __init__(self): + super().__init__() + self.feature = self.register_feature() + self.db: Optional[ValueRankDB] = None + + # 配置默认值:即使未配置 config.toml,也能以保守参数运行。 + self.enable = True + self._commands = ["我的身价", "身价排行", "身价说明", "重算身价"] + self.command_format = "我的身价 | 身价排行 [名次] | 身价说明 | 重算身价" + + self.message_window_days = 7 + self.active_window_days = 30 + self.social_window_days = 7 + + self.points_weight = 0.30 + self.message_weight = 0.35 + self.active_days_weight = 0.20 + self.social_weight = 0.15 + + self.inactivity_penalty_max = 150 + self.base_score_scale = 1000 + + self.default_rank_limit = 10 + self.max_rank_limit = 50 + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件与配置。""" + self.LOG = logger + cfg = self._config.get("ValueRank", {}) + + self.enable = bool(cfg.get("enable", True)) + self._commands = cfg.get("command", self._commands) + self.command_format = cfg.get("command-format", self.command_format) + + self.message_window_days = int(cfg.get("message_window_days", self.message_window_days)) + self.active_window_days = int(cfg.get("active_window_days", self.active_window_days)) + self.social_window_days = int(cfg.get("social_window_days", self.social_window_days)) + + self.points_weight = float(cfg.get("points_weight", self.points_weight)) + self.message_weight = float(cfg.get("message_weight", self.message_weight)) + self.active_days_weight = float(cfg.get("active_days_weight", self.active_days_weight)) + self.social_weight = float(cfg.get("social_weight", self.social_weight)) + + self.inactivity_penalty_max = float(cfg.get("inactivity_penalty_max", self.inactivity_penalty_max)) + self.base_score_scale = float(cfg.get("base_score_scale", self.base_score_scale)) + + self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit)) + self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit)) + + # 权重归一化:避免配置误差导致总权重不为 1。 + weight_sum = self.points_weight + self.message_weight + self.active_days_weight + self.social_weight + if weight_sum <= 0: + self.points_weight, self.message_weight, self.active_days_weight, self.social_weight = 0.30, 0.35, 0.20, 0.15 + else: + self.points_weight /= weight_sum + self.message_weight /= weight_sum + self.active_days_weight /= weight_sum + self.social_weight /= weight_sum + + self.db = ValueRankDB(DBConnectionManager.get_instance()) + self.LOG.info(f"[{self.name}] 初始化完成,命令: {self._commands}") + return True + + def start(self) -> bool: + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + if not self.enable: + return False + content = str(message.get("content", "")).strip() + if not content: + return False + command = content.split(" ")[0] + return command in self._commands + + @plugin_stats_decorator(plugin_name="身价排行") + async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理用户命令入口。""" + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + sender = message.get("sender") + roomid = message.get("roomid", "") + gbm: GroupBotManager = message.get("gbm") + bot = message.get("bot") + + if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: + return False, "没有权限" + + if not roomid: + await bot.send_text_message(sender, "该功能仅支持群聊使用。", sender) + return True, "非群聊" + + if command == "我的身价": + text = await self._build_my_value_text(roomid, sender) + await bot.send_text_message(roomid, text, sender) + return True, "查询成功" + + if command == "身价排行": + limit = self._parse_rank_limit(content) + text = await self._build_ranking_text(roomid, limit) + await bot.send_text_message(roomid, text, sender) + return True, "查询成功" + + if command == "身价说明": + await bot.send_text_message(roomid, self._build_explain_text(), sender) + return True, "查询成功" + + if command == "重算身价": + # 重算属于管理动作,只允许机器人管理员或群管理员执行,避免被滥用。 + if not GroupBotManager.is_admin_for_group(sender, roomid): + await bot.send_text_message(roomid, "仅管理员可执行重算身价。", sender) + return True, "权限不足" + + stat_date = datetime.now().strftime("%Y-%m-%d") + user_count = self._recompute_group_snapshot(roomid, stat_date) + await bot.send_text_message(roomid, f"✅ 重算完成,已更新 {user_count} 名成员。", sender) + return True, "重算成功" + + await bot.send_text_message(roomid, f"❌命令格式错误\n{self.command_format}", sender) + return True, "命令错误" + + def get_schedule_actions(self) -> List[Dict[str, Any]]: + """声明可调度动作:每日凌晨全量重算。""" + return [ + { + "action_key": "value_rank_daily_recompute", + "name": "身价每日重算", + "description": "每天凌晨重算群成员身价快照", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["04:00"]}, + "target_scope": "all_enabled_groups", + "target_config": {}, + "payload": {}, + "default_enabled": True, + } + ] + + async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: + """执行调度动作。""" + if action_key != "value_rank_daily_recompute": + return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}} + + stat_date = datetime.now().strftime("%Y-%m-%d") + target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()] + if not target_groups: + target_groups = [ + gid for gid in GroupBotManager.get_group_list() + if GroupBotManager.get_group_permission(gid, self.feature).value == "enabled" + ] + + success_groups: List[str] = [] + failed_groups: Dict[str, str] = {} + updated_users = 0 + + for gid in target_groups: + try: + updated_users += self._recompute_group_snapshot(gid, stat_date) + success_groups.append(gid) + except Exception as e: + failed_groups[gid] = str(e) + + return { + "success": len(failed_groups) == 0, + "summary": f"身价重算完成:成功{len(success_groups)}群,失败{len(failed_groups)}群", + "detail": { + "stat_date": stat_date, + "updated_users": updated_users, + "success_groups": success_groups, + "failed_groups": failed_groups, + }, + } + + def _recompute_group_snapshot(self, group_id: str, stat_date: str) -> int: + """重算单群指定日期快照。 + + 这是插件核心逻辑: + 1. 聚合各维度原始指标; + 2. 归一化并计算分数; + 3. 生成排名与称号; + 4. 持久化到快照表。 + """ + if not self.db: + return 0 + + candidates = self.db.get_candidate_users(group_id, self.active_window_days, self.social_window_days) + if not candidates: + return 0 + + points_map = self.db.get_points_map(group_id) + msg_map = self.db.get_message_stats_map(group_id, self.message_window_days, self.active_window_days) + last_active_map = self.db.get_last_active_date_map(group_id) + social_map = self.db.get_social_stats_map(group_id, self.social_window_days) + + # 组装原始指标,后续统一归一化处理。 + metrics: List[Dict[str, Any]] = [] + for user_id in candidates: + points_total = int(points_map.get(user_id, 0)) + msg_stats = msg_map.get(user_id, {}) + social_stats = social_map.get(user_id, {}) + + msg_count_7d = int(msg_stats.get("msg_count_window", 0)) + active_days_30 = int(msg_stats.get("active_days_window", 0)) + interaction_score_7d = float(social_stats.get("interaction_score_window", 0.0)) + + # inactive_days 优先使用全量最后发言日期,若无记录视为长期未活跃。 + inactive_days = 365 + last_active_date = last_active_map.get(user_id) + if last_active_date: + try: + if isinstance(last_active_date, datetime): + dt = last_active_date + else: + dt = datetime.strptime(str(last_active_date), "%Y-%m-%d") + inactive_days = max((datetime.now().date() - dt.date()).days, 0) + except Exception: + inactive_days = 365 + + metrics.append( + { + "user_id": user_id, + "points_total": points_total, + "msg_count_7d": msg_count_7d, + "active_days_30": active_days_30, + "interaction_score_7d": interaction_score_7d, + "inactive_days": inactive_days, + } + ) + + if not metrics: + return 0 + + # 计算分位阈值,避免极端值主导。 + p95_points = self._percentile95([m["points_total"] for m in metrics]) + p95_msg = self._percentile95([m["msg_count_7d"] for m in metrics]) + p95_social = self._percentile95([m["interaction_score_7d"] for m in metrics]) + + scored_rows: List[Dict[str, Any]] = [] + for m in metrics: + p_norm = self._normalize_log(m["points_total"], p95_points) + m_norm = self._normalize_linear(m["msg_count_7d"], p95_msg) + a_norm = self._normalize_linear(m["active_days_30"], self.active_window_days) + s_norm = self._normalize_linear(m["interaction_score_7d"], p95_social) + i_penalty = self._normalize_linear(m["inactive_days"], 30) + + base_score = self.base_score_scale * ( + self.points_weight * p_norm + + self.message_weight * m_norm + + self.active_days_weight * a_norm + + self.social_weight * s_norm + ) + penalty_score = self.inactivity_penalty_max * i_penalty + final_score = max(round(base_score - penalty_score, 2), 0.0) + + scored_rows.append( + { + **m, + "score": final_score, + "p_norm": p_norm, + "m_norm": m_norm, + "a_norm": a_norm, + "s_norm": s_norm, + "penalty": penalty_score, + } + ) + + # 分数高到低排序,同分时按活跃和积分补充排序,提升稳定性。 + scored_rows.sort( + key=lambda x: ( + -float(x["score"]), + -int(x["msg_count_7d"]), + -float(x["interaction_score_7d"]), + -int(x["points_total"]), + ) + ) + + total = len(scored_rows) + batch_rows: List[Tuple[Any, ...]] = [] + for idx, row in enumerate(scored_rows, start=1): + title = self._build_title(idx, total, int(row["inactive_days"])) + score_detail = { + "points_norm": round(float(row["p_norm"]), 6), + "message_norm": round(float(row["m_norm"]), 6), + "active_norm": round(float(row["a_norm"]), 6), + "social_norm": round(float(row["s_norm"]), 6), + "penalty_score": round(float(row["penalty"]), 2), + "weights": { + "points": self.points_weight, + "message": self.message_weight, + "active": self.active_days_weight, + "social": self.social_weight, + }, + } + batch_rows.append( + ( + stat_date, + group_id, + row["user_id"], + row["score"], + idx, + title, + row["points_total"], + row["msg_count_7d"], + row["active_days_30"], + row["inactive_days"], + json.dumps(score_detail, ensure_ascii=False), + ) + ) + + self.db.upsert_snapshots(batch_rows) + return len(batch_rows) + + async def _build_my_value_text(self, group_id: str, user_id: str) -> str: + """构建“我的身价”输出文本。""" + if not self.db: + return "❌ 身价模块未初始化" + + stat_date = datetime.now().strftime("%Y-%m-%d") + row = self.db.get_user_snapshot(stat_date, group_id, user_id) + + # 若当天还没有快照,按需触发一次当前群重算,保证命令可用性。 + if not row: + self._recompute_group_snapshot(group_id, stat_date) + row = self.db.get_user_snapshot(stat_date, group_id, user_id) + + if not row: + return "📊 暂无你的身价数据,请先在群里发言后再试。" + + yesterday_score = self.db.get_yesterday_score(stat_date, group_id, user_id) + current_score = float(row.get("score") or 0) + + if yesterday_score is None: + change_text = "新上榜(暂无昨日对比)" + else: + delta = current_score - float(yesterday_score) + if abs(yesterday_score) < 1e-9: + change_text = f"较昨日变化:{delta:+.2f}" + else: + pct = delta / float(yesterday_score) * 100 + change_text = f"较昨日:{pct:+.2f}%" + + nick = ContactManager.get_instance().get_group_name(group_id, user_id) or user_id + + lines = [ + f"📊 {nick} 的身价报告({stat_date})", + f"总身价:{current_score:.2f}", + f"群内排名:第 {int(row.get('rank_no') or 0)} 名({row.get('title') or '未定级'})", + "", + f"💰 积分资产:{int(row.get('points_total') or 0)}", + f"🗣️ 7日发言:{int(row.get('msg_count_7d') or 0)}", + f"📅 30日活跃:{int(row.get('active_days_30') or 0)} 天", + f"🌊 潜水天数:{int(row.get('inactive_days') or 0)} 天", + "", + change_text, + ] + return "\n".join(lines) + + async def _build_ranking_text(self, group_id: str, limit: int) -> str: + """构建“身价排行”输出文本。""" + if not self.db: + return "❌ 身价模块未初始化" + + stat_date = datetime.now().strftime("%Y-%m-%d") + rows = self.db.get_today_rankings(stat_date, group_id, limit) + if not rows: + self._recompute_group_snapshot(group_id, stat_date) + rows = self.db.get_today_rankings(stat_date, group_id, limit) + + if not rows: + return "📊 今日暂无排行数据。" + + cm = ContactManager.get_instance() + lines = [f"🏆 身价排行榜(Top{len(rows)} | {stat_date})"] + for idx, row in enumerate(rows, start=1): + user_id = str(row.get("user_id") or "") + score = float(row.get("score") or 0) + title = str(row.get("title") or "") + nick = cm.get_group_name(group_id, user_id) or user_id + medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}." + lines.append(f"{medal} {nick} | {score:.2f} | {title}") + + return "\n".join(lines) + + def _build_explain_text(self) -> str: + """输出算法说明文本。""" + return ( + "📘 身价算法说明\n" + f"- 积分权重:{self.points_weight:.2f}\n" + f"- 发言权重:{self.message_weight:.2f}({self.message_window_days}天)\n" + f"- 活跃权重:{self.active_days_weight:.2f}({self.active_window_days}天)\n" + f"- 社交权重:{self.social_weight:.2f}({self.social_window_days}天)\n" + f"- 潜水惩罚上限:{self.inactivity_penalty_max:.0f}\n" + "- 所有维度先归一化再加权,极端值按95分位截断,减少刷屏与极值干扰。" + ) + + def _parse_rank_limit(self, content: str) -> int: + """解析排行条数参数。""" + parts = content.split() + if len(parts) < 2: + return self.default_rank_limit + try: + limit = int(parts[1]) + except Exception: + return self.default_rank_limit + limit = max(1, limit) + return min(limit, self.max_rank_limit) + + @staticmethod + def _normalize_linear(value: float, ceiling: float) -> float: + """线性归一化并截断到 [0, 1]。""" + if ceiling <= 0: + return 0.0 + return max(0.0, min(float(value) / float(ceiling), 1.0)) + + @staticmethod + def _normalize_log(value: float, ceiling: float) -> float: + """对高离散指标做对数归一化,避免大户统治。""" + if ceiling <= 0: + return 0.0 + safe_value = max(float(value), 0.0) + return max(0.0, min(math.log1p(safe_value) / math.log1p(float(ceiling)), 1.0)) + + @staticmethod + def _percentile95(values: List[float]) -> float: + """计算95分位数(最小返回1,避免除零)。""" + if not values: + return 1.0 + sorted_vals = sorted([max(float(v), 0.0) for v in values]) + n = len(sorted_vals) + idx = max(0, min(n - 1, math.ceil(0.95 * n) - 1)) + return max(sorted_vals[idx], 1.0) + + @staticmethod + def _build_title(rank_no: int, total: int, inactive_days: int) -> str: + """根据分位区间生成称号。""" + if total <= 0: + return "未定级" + + ratio = rank_no / total + if ratio <= 0.01: + return "群之巨鳄" + if ratio <= 0.05: + return "社交名流" + if ratio <= 0.20: + return "活跃中产" + if ratio <= 0.80: + return "稳定居民" + if ratio >= 0.90 and inactive_days >= 30: + return "潜水观察员" + return "潜力新人"