Files
abot/plugins/value_rank/main.py
liuwei d60d496bc3 扩展 value_rank 周报能力并新增周报命令
- 新增命令 身价周报,并接入配置与帮助文案

- 新增每周定时动作 value_rank_weekly_report_push(周一09:35)自动推送周报

- 周报内容包含:综合排行Top5、上升榜Top5、下行榜Top5(对比7天前)

- 扩展 ValueRankDB:新增按日期读取快照分数字典能力,支持周报对比计算

- 调度执行中支持周报推送并补充重算保障,确保周报数据为当天最新
2026-04-21 14:00:57 +08:00

934 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import math
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
class ValueRankDB(BaseDBOperator):
"""Value Rank 数据访问层。
说明:
1. 这里集中封装 SQL避免插件主流程里混杂大量查询语句
2. 所有方法都只做“数据读写”,不做业务打分,便于后续独立测试;
3. 读取失败时统一返回空结构,保证上层逻辑可降级继续执行。
"""
def get_candidate_users(self, group_id: str, active_window_days: int, social_window_days: int) -> List[str]:
"""获取需要参与打分的候选成员列表。
候选来源采用并集策略,避免遗漏:
1. 积分表中出现过的成员;
2. 近期发过言的成员;
3. 近期社交表出现过的成员(被@或主动@)。
"""
sql = """
SELECT user_id FROM t_user_points
WHERE group_id = %s AND user_id <> 'SYSTEM'
UNION
SELECT DISTINCT sender AS user_id
FROM messages
WHERE group_id = %s
AND sender IS NOT NULL
AND sender <> ''
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
UNION
SELECT DISTINCT user_id
FROM t_value_rank_social_daily
WHERE group_id = %s
AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
"""
rows = self.execute_query(sql, (group_id, group_id, active_window_days, group_id, social_window_days)) or []
return [str(r.get("user_id") or "").strip() for r in rows if str(r.get("user_id") or "").strip()]
def get_points_map(self, group_id: str) -> Dict[str, int]:
"""读取群内用户积分映射:{user_id: total_points}。"""
sql = """
SELECT user_id, total_points
FROM t_user_points
WHERE group_id = %s
AND user_id <> 'SYSTEM'
"""
rows = self.execute_query(sql, (group_id,)) or []
result: Dict[str, int] = {}
for row in rows:
user_id = str(row.get("user_id") or "").strip()
if not user_id:
continue
result[user_id] = int(row.get("total_points") or 0)
return result
def get_message_stats_map(self, group_id: str, message_window_days: int, active_window_days: int) -> Dict[str, Dict[str, Any]]:
"""按用户读取消息指标。
返回结构示例:
{
"wxid_xxx": {
"msg_count_window": 123,
"active_days_window": 20,
"last_active_date": "2026-04-21"
}
}
"""
sql = """
SELECT
sender AS user_id,
SUM(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN 1 ELSE 0 END) AS msg_count_window,
COUNT(DISTINCT DATE(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN timestamp END)) AS active_days_window,
MAX(DATE(timestamp)) AS last_active_date
FROM messages
WHERE group_id = %s
AND sender IS NOT NULL
AND sender <> ''
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
GROUP BY sender
"""
rows = self.execute_query(sql, (message_window_days, active_window_days, group_id, active_window_days)) or []
result: Dict[str, Dict[str, Any]] = {}
for row in rows:
user_id = str(row.get("user_id") or "").strip()
if not user_id:
continue
result[user_id] = {
"msg_count_window": int(row.get("msg_count_window") or 0),
"active_days_window": int(row.get("active_days_window") or 0),
"last_active_date": row.get("last_active_date"),
}
return result
def get_last_active_date_map(self, group_id: str) -> Dict[str, Any]:
"""读取群内用户最后发言日期,用于计算潜水惩罚。"""
sql = """
SELECT sender AS user_id, MAX(DATE(timestamp)) AS last_active_date
FROM messages
WHERE group_id = %s
AND sender IS NOT NULL
AND sender <> ''
GROUP BY sender
"""
rows = self.execute_query(sql, (group_id,)) or []
return {
str(row.get("user_id") or "").strip(): row.get("last_active_date")
for row in rows
if str(row.get("user_id") or "").strip()
}
def get_social_stats_map(self, group_id: str, social_window_days: int) -> Dict[str, Dict[str, Any]]:
"""按用户读取社交指标(日汇总窗口)。"""
sql = """
SELECT
user_id,
SUM(mentioned_count) AS mentioned_count_window,
SUM(mention_others_count) AS mention_others_count_window,
SUM(unique_interactors) AS unique_interactors_window,
SUM(interaction_score) AS interaction_score_window
FROM t_value_rank_social_daily
WHERE group_id = %s
AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY user_id
"""
rows = self.execute_query(sql, (group_id, social_window_days)) or []
result: Dict[str, Dict[str, Any]] = {}
for row in rows:
user_id = str(row.get("user_id") or "").strip()
if not user_id:
continue
result[user_id] = {
"mentioned_count_window": int(row.get("mentioned_count_window") or 0),
"mention_others_count_window": int(row.get("mention_others_count_window") or 0),
"unique_interactors_window": int(row.get("unique_interactors_window") or 0),
"interaction_score_window": float(row.get("interaction_score_window") or 0.0),
}
return result
def upsert_snapshots(self, rows: List[Tuple[Any, ...]]) -> bool:
"""批量写入身价快照。"""
if not rows:
return True
sql = """
INSERT INTO t_value_rank_snapshot (
stat_date, group_id, user_id, score, rank_no, title,
points_total, msg_count_7d, active_days_30, inactive_days,
score_detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
score = VALUES(score),
rank_no = VALUES(rank_no),
title = VALUES(title),
points_total = VALUES(points_total),
msg_count_7d = VALUES(msg_count_7d),
active_days_30 = VALUES(active_days_30),
inactive_days = VALUES(inactive_days),
score_detail_json = VALUES(score_detail_json),
updated_at = CURRENT_TIMESTAMP
"""
return self.execute_batch(sql, rows)
def get_user_snapshot(self, stat_date: str, group_id: str, user_id: str) -> Optional[Dict[str, Any]]:
"""读取某用户在某天的身价快照。"""
sql = """
SELECT * FROM t_value_rank_snapshot
WHERE stat_date = %s AND group_id = %s AND user_id = %s
LIMIT 1
"""
return self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True)
def get_yesterday_score(self, stat_date: str, group_id: str, user_id: str) -> Optional[float]:
"""读取昨日分数用于计算涨跌幅。"""
sql = """
SELECT score
FROM t_value_rank_snapshot
WHERE stat_date = DATE_SUB(%s, INTERVAL 1 DAY)
AND group_id = %s
AND user_id = %s
LIMIT 1
"""
row = self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True)
if not row:
return None
return float(row.get("score") or 0)
def get_today_rankings(self, stat_date: str, group_id: str, limit: int) -> List[Dict[str, Any]]:
"""读取今日排行榜。"""
sql = """
SELECT user_id, score, rank_no, title
FROM t_value_rank_snapshot
WHERE stat_date = %s
AND group_id = %s
ORDER BY rank_no ASC
LIMIT %s
"""
return self.execute_query(sql, (stat_date, group_id, limit)) or []
def get_social_hot_rankings(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
"""读取社交热度榜(窗口聚合)。"""
sql = """
SELECT
user_id,
SUM(mentioned_count) AS mentioned_count_window,
SUM(mention_others_count) AS mention_others_count_window,
SUM(unique_interactors) AS unique_interactors_window,
SUM(interaction_score) AS interaction_score_window
FROM t_value_rank_social_daily
WHERE group_id = %s
AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY user_id
ORDER BY interaction_score_window DESC, mentioned_count_window DESC
LIMIT %s
"""
return self.execute_query(sql, (group_id, social_window_days, limit)) or []
def get_top_partner_pairs(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
"""读取搭子榜(无向边聚合)。"""
sql = """
SELECT
LEAST(from_user_id, to_user_id) AS user_a,
GREATEST(from_user_id, to_user_id) AS user_b,
SUM(mention_count) AS mention_count_window,
SUM(interaction_score) AS interaction_score_window
FROM t_social_edges_daily
WHERE group_id = %s
AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY LEAST(from_user_id, to_user_id), GREATEST(from_user_id, to_user_id)
ORDER BY interaction_score_window DESC, mention_count_window DESC
LIMIT %s
"""
return self.execute_query(sql, (group_id, social_window_days, limit)) or []
def get_snapshot_score_map(self, stat_date: str, group_id: str) -> Dict[str, float]:
"""读取某天群内所有用户分数字典:{user_id: score}。"""
sql = """
SELECT user_id, score
FROM t_value_rank_snapshot
WHERE stat_date = %s
AND group_id = %s
"""
rows = self.execute_query(sql, (stat_date, group_id)) or []
result: Dict[str, float] = {}
for row in rows:
user_id = str(row.get("user_id") or "").strip()
if not user_id:
continue
result[user_id] = float(row.get("score") or 0.0)
return result
class ValueRankPlugin(MessagePluginInterface):
"""群成员身价排行插件。
设计目标:
1. 将“积分、发言、活跃、社交影响力”统一折算为可解释的身价分;
2. 支持手动查询与后台定时重算;
3. 通过快照持久化,支持趋势分析和涨跌说明。
"""
FEATURE_KEY = "VALUE_RANK"
FEATURE_DESCRIPTION = "📊 身价排行 [我的身价, 身价排行, 身价说明, 重算身价]"
@property
def name(self) -> str:
return "身价排行"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "根据积分、发言、活跃与社交影响力计算群成员身价。"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.db: Optional[ValueRankDB] = None
# 配置默认值:即使未配置 config.toml也能以保守参数运行。
self.enable = True
self._commands = ["我的身价", "身价排行", "社交热度榜", "搭子榜", "身价周报", "身价说明", "重算身价"]
self.command_format = "我的身价 | 身价排行 [名次] | 身价说明 | 重算身价"
self.message_window_days = 7
self.active_window_days = 30
self.social_window_days = 7
self.points_weight = 0.30
self.message_weight = 0.35
self.active_days_weight = 0.20
self.social_weight = 0.15
self.inactivity_penalty_max = 150
self.base_score_scale = 1000
self.default_rank_limit = 10
self.max_rank_limit = 50
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件与配置。"""
self.LOG = logger
cfg = self._config.get("ValueRank", {})
self.enable = bool(cfg.get("enable", True))
self._commands = cfg.get("command", self._commands)
self.command_format = cfg.get("command-format", self.command_format)
self.message_window_days = int(cfg.get("message_window_days", self.message_window_days))
self.active_window_days = int(cfg.get("active_window_days", self.active_window_days))
self.social_window_days = int(cfg.get("social_window_days", self.social_window_days))
self.points_weight = float(cfg.get("points_weight", self.points_weight))
self.message_weight = float(cfg.get("message_weight", self.message_weight))
self.active_days_weight = float(cfg.get("active_days_weight", self.active_days_weight))
self.social_weight = float(cfg.get("social_weight", self.social_weight))
self.inactivity_penalty_max = float(cfg.get("inactivity_penalty_max", self.inactivity_penalty_max))
self.base_score_scale = float(cfg.get("base_score_scale", self.base_score_scale))
self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit))
self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit))
# 权重归一化:避免配置误差导致总权重不为 1。
weight_sum = self.points_weight + self.message_weight + self.active_days_weight + self.social_weight
if weight_sum <= 0:
self.points_weight, self.message_weight, self.active_days_weight, self.social_weight = 0.30, 0.35, 0.20, 0.15
else:
self.points_weight /= weight_sum
self.message_weight /= weight_sum
self.active_days_weight /= weight_sum
self.social_weight /= weight_sum
self.db = ValueRankDB(DBConnectionManager.get_instance())
self.LOG.info(f"[{self.name}] 初始化完成,命令: {self._commands}")
return True
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
if not self.enable:
return False
content = str(message.get("content", "")).strip()
if not content:
return False
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="身价排行")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理用户命令入口。"""
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot = message.get("bot")
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
if not roomid:
await bot.send_text_message(sender, "该功能仅支持群聊使用。", sender)
return True, "非群聊"
if command == "我的身价":
text = await self._build_my_value_text(roomid, sender)
await bot.send_text_message(roomid, text, sender)
return True, "查询成功"
if command == "身价排行":
limit = self._parse_rank_limit(content)
text = await self._build_ranking_text(roomid, limit)
await bot.send_text_message(roomid, text, sender)
return True, "查询成功"
if command == "社交热度榜":
limit = self._parse_rank_limit(content)
text = await self._build_social_hot_text(roomid, limit)
await bot.send_text_message(roomid, text, sender)
return True, "查询成功"
if command == "搭子榜":
limit = self._parse_rank_limit(content)
text = await self._build_partner_pairs_text(roomid, limit)
await bot.send_text_message(roomid, text, sender)
return True, "查询成功"
if command == "身价周报":
text = await self._build_weekly_report_text(roomid)
await bot.send_text_message(roomid, text, sender)
return True, "查询成功"
if command == "身价说明":
await bot.send_text_message(roomid, self._build_explain_text(), sender)
return True, "查询成功"
if command == "重算身价":
# 重算属于管理动作,只允许机器人管理员或群管理员执行,避免被滥用。
if not GroupBotManager.is_admin_for_group(sender, roomid):
await bot.send_text_message(roomid, "仅管理员可执行重算身价。", sender)
return True, "权限不足"
stat_date = datetime.now().strftime("%Y-%m-%d")
user_count = self._recompute_group_snapshot(roomid, stat_date)
await bot.send_text_message(roomid, f"✅ 重算完成,已更新 {user_count} 名成员。", sender)
return True, "重算成功"
await bot.send_text_message(roomid, f"❌命令格式错误\n{self.command_format}", sender)
return True, "命令错误"
def get_schedule_actions(self) -> List[Dict[str, Any]]:
"""声明可调度动作:每日重算 + 每周周报。"""
return [
{
"action_key": "value_rank_daily_recompute",
"name": "身价每日重算",
"description": "每天凌晨重算群成员身价快照",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["04:00"]},
"target_scope": "all_enabled_groups",
"target_config": {},
"payload": {},
"default_enabled": True,
},
{
"action_key": "value_rank_weekly_report_push",
"name": "身价周报推送",
"description": "每周推送身价周报(上升榜/下降榜/综合排行)",
"trigger_type": "every_week_time",
# weekday 取值与 datetime.weekday() 一致:周一=0 ... 周日=6。
"trigger_config": {"weekday": 0, "time_str": "09:35"},
"target_scope": "all_enabled_groups",
"target_config": {},
"payload": {},
"default_enabled": True,
},
]
async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行调度动作。"""
if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push"}:
return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}}
target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()]
if not target_groups:
target_groups = [
gid for gid in GroupBotManager.get_group_list()
if GroupBotManager.get_group_permission(gid, self.feature).value == "enabled"
]
success_groups: List[str] = []
failed_groups: Dict[str, str] = {}
updated_users = 0
pushed_groups: List[str] = []
stat_date = datetime.now().strftime("%Y-%m-%d")
bot = context.get("bot") or getattr(self, "bot", None)
# 周报任务先确保当日快照存在,再执行推送,避免“有报表命令但无数据”。
if action_key == "value_rank_weekly_report_push" and not bot:
return {"success": False, "summary": "周报推送失败bot 未注入", "detail": {}}
for gid in target_groups:
try:
# 每个群都先重算一次,确保报告与排行数据是“当天最新口径”。
updated_users += self._recompute_group_snapshot(gid, stat_date)
if action_key == "value_rank_weekly_report_push":
weekly_text = await self._build_weekly_report_text(gid)
await bot.send_text_message(gid, weekly_text, "")
pushed_groups.append(gid)
success_groups.append(gid)
except Exception as e:
failed_groups[gid] = str(e)
if action_key == "value_rank_weekly_report_push":
return {
"success": len(failed_groups) == 0,
"summary": f"身价周报完成:推送{len(pushed_groups)}群,失败{len(failed_groups)}",
"detail": {
"stat_date": stat_date,
"updated_users": updated_users,
"pushed_groups": pushed_groups,
"failed_groups": failed_groups,
},
}
return {
"success": len(failed_groups) == 0,
"summary": f"身价重算完成:成功{len(success_groups)}群,失败{len(failed_groups)}",
"detail": {
"stat_date": stat_date,
"updated_users": updated_users,
"success_groups": success_groups,
"failed_groups": failed_groups,
},
}
def _recompute_group_snapshot(self, group_id: str, stat_date: str) -> int:
"""重算单群指定日期快照。
这是插件核心逻辑:
1. 聚合各维度原始指标;
2. 归一化并计算分数;
3. 生成排名与称号;
4. 持久化到快照表。
"""
if not self.db:
return 0
candidates = self.db.get_candidate_users(group_id, self.active_window_days, self.social_window_days)
if not candidates:
return 0
points_map = self.db.get_points_map(group_id)
msg_map = self.db.get_message_stats_map(group_id, self.message_window_days, self.active_window_days)
last_active_map = self.db.get_last_active_date_map(group_id)
social_map = self.db.get_social_stats_map(group_id, self.social_window_days)
# 组装原始指标,后续统一归一化处理。
metrics: List[Dict[str, Any]] = []
for user_id in candidates:
points_total = int(points_map.get(user_id, 0))
msg_stats = msg_map.get(user_id, {})
social_stats = social_map.get(user_id, {})
msg_count_7d = int(msg_stats.get("msg_count_window", 0))
active_days_30 = int(msg_stats.get("active_days_window", 0))
interaction_score_7d = float(social_stats.get("interaction_score_window", 0.0))
# inactive_days 优先使用全量最后发言日期,若无记录视为长期未活跃。
inactive_days = 365
last_active_date = last_active_map.get(user_id)
if last_active_date:
try:
if isinstance(last_active_date, datetime):
dt = last_active_date
else:
dt = datetime.strptime(str(last_active_date), "%Y-%m-%d")
inactive_days = max((datetime.now().date() - dt.date()).days, 0)
except Exception:
inactive_days = 365
metrics.append(
{
"user_id": user_id,
"points_total": points_total,
"msg_count_7d": msg_count_7d,
"active_days_30": active_days_30,
"interaction_score_7d": interaction_score_7d,
"inactive_days": inactive_days,
}
)
if not metrics:
return 0
# 计算分位阈值,避免极端值主导。
p95_points = self._percentile95([m["points_total"] for m in metrics])
p95_msg = self._percentile95([m["msg_count_7d"] for m in metrics])
p95_social = self._percentile95([m["interaction_score_7d"] for m in metrics])
scored_rows: List[Dict[str, Any]] = []
for m in metrics:
p_norm = self._normalize_log(m["points_total"], p95_points)
m_norm = self._normalize_linear(m["msg_count_7d"], p95_msg)
a_norm = self._normalize_linear(m["active_days_30"], self.active_window_days)
s_norm = self._normalize_linear(m["interaction_score_7d"], p95_social)
i_penalty = self._normalize_linear(m["inactive_days"], 30)
base_score = self.base_score_scale * (
self.points_weight * p_norm
+ self.message_weight * m_norm
+ self.active_days_weight * a_norm
+ self.social_weight * s_norm
)
penalty_score = self.inactivity_penalty_max * i_penalty
final_score = max(round(base_score - penalty_score, 2), 0.0)
scored_rows.append(
{
**m,
"score": final_score,
"p_norm": p_norm,
"m_norm": m_norm,
"a_norm": a_norm,
"s_norm": s_norm,
"penalty": penalty_score,
}
)
# 分数高到低排序,同分时按活跃和积分补充排序,提升稳定性。
scored_rows.sort(
key=lambda x: (
-float(x["score"]),
-int(x["msg_count_7d"]),
-float(x["interaction_score_7d"]),
-int(x["points_total"]),
)
)
total = len(scored_rows)
batch_rows: List[Tuple[Any, ...]] = []
for idx, row in enumerate(scored_rows, start=1):
title = self._build_title(idx, total, int(row["inactive_days"]))
score_detail = {
"points_norm": round(float(row["p_norm"]), 6),
"message_norm": round(float(row["m_norm"]), 6),
"active_norm": round(float(row["a_norm"]), 6),
"social_norm": round(float(row["s_norm"]), 6),
"penalty_score": round(float(row["penalty"]), 2),
"weights": {
"points": self.points_weight,
"message": self.message_weight,
"active": self.active_days_weight,
"social": self.social_weight,
},
}
batch_rows.append(
(
stat_date,
group_id,
row["user_id"],
row["score"],
idx,
title,
row["points_total"],
row["msg_count_7d"],
row["active_days_30"],
row["inactive_days"],
json.dumps(score_detail, ensure_ascii=False),
)
)
self.db.upsert_snapshots(batch_rows)
return len(batch_rows)
async def _build_my_value_text(self, group_id: str, user_id: str) -> str:
"""构建“我的身价”输出文本。"""
if not self.db:
return "❌ 身价模块未初始化"
stat_date = datetime.now().strftime("%Y-%m-%d")
row = self.db.get_user_snapshot(stat_date, group_id, user_id)
# 若当天还没有快照,按需触发一次当前群重算,保证命令可用性。
if not row:
self._recompute_group_snapshot(group_id, stat_date)
row = self.db.get_user_snapshot(stat_date, group_id, user_id)
if not row:
return "📊 暂无你的身价数据,请先在群里发言后再试。"
yesterday_score = self.db.get_yesterday_score(stat_date, group_id, user_id)
current_score = float(row.get("score") or 0)
if yesterday_score is None:
change_text = "新上榜(暂无昨日对比)"
else:
delta = current_score - float(yesterday_score)
if abs(yesterday_score) < 1e-9:
change_text = f"较昨日变化:{delta:+.2f}"
else:
pct = delta / float(yesterday_score) * 100
change_text = f"较昨日:{pct:+.2f}%"
nick = ContactManager.get_instance().get_group_name(group_id, user_id) or user_id
lines = [
f"📊 {nick} 的身价报告({stat_date}",
f"总身价:{current_score:.2f}",
f"群内排名:第 {int(row.get('rank_no') or 0)} 名({row.get('title') or '未定级'}",
"",
f"💰 积分资产:{int(row.get('points_total') or 0)}",
f"🗣️ 7日发言{int(row.get('msg_count_7d') or 0)}",
f"📅 30日活跃{int(row.get('active_days_30') or 0)}",
f"🌊 潜水天数:{int(row.get('inactive_days') or 0)}",
"",
change_text,
]
return "\n".join(lines)
async def _build_ranking_text(self, group_id: str, limit: int) -> str:
"""构建“身价排行”输出文本。"""
if not self.db:
return "❌ 身价模块未初始化"
stat_date = datetime.now().strftime("%Y-%m-%d")
rows = self.db.get_today_rankings(stat_date, group_id, limit)
if not rows:
self._recompute_group_snapshot(group_id, stat_date)
rows = self.db.get_today_rankings(stat_date, group_id, limit)
if not rows:
return "📊 今日暂无排行数据。"
cm = ContactManager.get_instance()
lines = [f"🏆 身价排行榜Top{len(rows)} | {stat_date}"]
for idx, row in enumerate(rows, start=1):
user_id = str(row.get("user_id") or "")
score = float(row.get("score") or 0)
title = str(row.get("title") or "")
nick = cm.get_group_name(group_id, user_id) or user_id
medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
lines.append(f"{medal} {nick} | {score:.2f} | {title}")
return "\n".join(lines)
async def _build_social_hot_text(self, group_id: str, limit: int) -> str:
"""构建“社交热度榜”输出文本。"""
if not self.db:
return "❌ 身价模块未初始化"
rows = self.db.get_social_hot_rankings(group_id, self.social_window_days, limit)
if not rows:
return "📊 近期暂无社交热度数据。"
cm = ContactManager.get_instance()
lines = [f"🔥 社交热度榜(近{self.social_window_days}天 Top{len(rows)}"]
for idx, row in enumerate(rows, start=1):
user_id = str(row.get("user_id") or "")
score = float(row.get("interaction_score_window") or 0.0)
mentioned_count = int(row.get("mentioned_count_window") or 0)
mention_others_count = int(row.get("mention_others_count_window") or 0)
nick = cm.get_group_name(group_id, user_id) or user_id
medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
lines.append(
f"{medal} {nick} | 热度{score:.1f} | 被@{mentioned_count} | 主动@{mention_others_count}"
)
return "\n".join(lines)
async def _build_partner_pairs_text(self, group_id: str, limit: int) -> str:
"""构建“搭子榜”输出文本。"""
if not self.db:
return "❌ 身价模块未初始化"
rows = self.db.get_top_partner_pairs(group_id, self.social_window_days, limit)
if not rows:
return "📊 近期暂无搭子关系数据。"
cm = ContactManager.get_instance()
lines = [f"🤝 搭子榜(近{self.social_window_days}天 Top{len(rows)}"]
for idx, row in enumerate(rows, start=1):
user_a = str(row.get("user_a") or "")
user_b = str(row.get("user_b") or "")
nick_a = cm.get_group_name(group_id, user_a) or user_a
nick_b = cm.get_group_name(group_id, user_b) or user_b
mention_count = int(row.get("mention_count_window") or 0)
score = float(row.get("interaction_score_window") or 0.0)
medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
lines.append(f"{medal} {nick_a} × {nick_b} | 互动{score:.1f} | @次数{mention_count}")
return "\n".join(lines)
async def _build_weekly_report_text(self, group_id: str) -> str:
"""构建“身价周报”文本。
周报口径:
1. 综合排行:取今日 Top5
2. 上升榜/下降榜:对比“今日分数 - 7天前分数”
3. 若7天前无数据则按 0 处理,保证新群也可输出。
"""
if not self.db:
return "❌ 身价模块未初始化"
today = datetime.now().strftime("%Y-%m-%d")
# 使用 SQL DATE_SUB 也可计算,但这里使用 Python 日期便于阅读与调试。
base_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
# 确保今天有快照,避免周报命令调用时出现空数据。
today_rank_rows = self.db.get_today_rankings(today, group_id, 50)
if not today_rank_rows:
self._recompute_group_snapshot(group_id, today)
today_rank_rows = self.db.get_today_rankings(today, group_id, 50)
if not today_rank_rows:
return "📊 本周暂无可用身价数据。"
today_score_map = self.db.get_snapshot_score_map(today, group_id)
base_score_map = self.db.get_snapshot_score_map(base_date, group_id)
# 计算用户分数变化,并做升降序榜单。
delta_rows: List[Tuple[str, float, float, float]] = []
for user_id, today_score in today_score_map.items():
old_score = float(base_score_map.get(user_id, 0.0))
delta = round(float(today_score) - old_score, 2)
delta_rows.append((user_id, float(today_score), old_score, delta))
delta_rows_sorted_up = sorted(delta_rows, key=lambda x: x[3], reverse=True)
delta_rows_sorted_down = sorted(delta_rows, key=lambda x: x[3])
cm = ContactManager.get_instance()
lines: List[str] = [
f"📈 身价周报({today}",
f"对比基线:{base_date}",
"",
"🏆 本周综合排行 Top5",
]
for idx, row in enumerate(today_rank_rows[:5], start=1):
uid = str(row.get("user_id") or "")
nick = cm.get_group_name(group_id, uid) or uid
score = float(row.get("score") or 0.0)
title = str(row.get("title") or "")
lines.append(f"{idx}. {nick} | {score:.2f} | {title}")
lines.append("")
lines.append("🚀 本周上升榜 Top5")
for idx, (uid, today_score, old_score, delta) in enumerate(delta_rows_sorted_up[:5], start=1):
nick = cm.get_group_name(group_id, uid) or uid
lines.append(f"{idx}. {nick} | {old_score:.2f} -> {today_score:.2f} | {delta:+.2f}")
lines.append("")
lines.append("🧊 本周波动下行 Top5")
for idx, (uid, today_score, old_score, delta) in enumerate(delta_rows_sorted_down[:5], start=1):
nick = cm.get_group_name(group_id, uid) or uid
lines.append(f"{idx}. {nick} | {old_score:.2f} -> {today_score:.2f} | {delta:+.2f}")
lines.append("")
lines.append("提示:分数由积分/发言/活跃/社交影响力综合计算。")
return "\n".join(lines)
def _build_explain_text(self) -> str:
"""输出算法说明文本。"""
return (
"📘 身价算法说明\n"
f"- 积分权重:{self.points_weight:.2f}\n"
f"- 发言权重:{self.message_weight:.2f}{self.message_window_days}天)\n"
f"- 活跃权重:{self.active_days_weight:.2f}{self.active_window_days}天)\n"
f"- 社交权重:{self.social_weight:.2f}{self.social_window_days}天)\n"
f"- 潜水惩罚上限:{self.inactivity_penalty_max:.0f}\n"
"- 所有维度先归一化再加权极端值按95分位截断减少刷屏与极值干扰。"
)
def _parse_rank_limit(self, content: str) -> int:
"""解析排行条数参数。"""
parts = content.split()
if len(parts) < 2:
return self.default_rank_limit
try:
limit = int(parts[1])
except Exception:
return self.default_rank_limit
limit = max(1, limit)
return min(limit, self.max_rank_limit)
@staticmethod
def _normalize_linear(value: float, ceiling: float) -> float:
"""线性归一化并截断到 [0, 1]。"""
if ceiling <= 0:
return 0.0
return max(0.0, min(float(value) / float(ceiling), 1.0))
@staticmethod
def _normalize_log(value: float, ceiling: float) -> float:
"""对高离散指标做对数归一化,避免大户统治。"""
if ceiling <= 0:
return 0.0
safe_value = max(float(value), 0.0)
return max(0.0, min(math.log1p(safe_value) / math.log1p(float(ceiling)), 1.0))
@staticmethod
def _percentile95(values: List[float]) -> float:
"""计算95分位数最小返回1避免除零"""
if not values:
return 1.0
sorted_vals = sorted([max(float(v), 0.0) for v in values])
n = len(sorted_vals)
idx = max(0, min(n - 1, math.ceil(0.95 * n) - 1))
return max(sorted_vals[idx], 1.0)
@staticmethod
def _build_title(rank_no: int, total: int, inactive_days: int) -> str:
"""根据分位区间生成称号。"""
if total <= 0:
return "未定级"
ratio = rank_no / total
if ratio <= 0.01:
return "群之巨鳄"
if ratio <= 0.05:
return "社交名流"
if ratio <= 0.20:
return "活跃中产"
if ratio <= 0.80:
return "稳定居民"
if ratio >= 0.90 and inactive_days >= 30:
return "潜水观察员"
return "潜力新人"