Files
abot/plugins/value_rank/main.py

2086 lines
92 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import base64
import json
import math
import mimetypes
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.markdown_to_image import html_to_image
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
class ValueRankDB(BaseDBOperator):
    """Data-access layer for the Value Rank plugin.

    Design notes:
    1. All SQL lives here so the plugin's main flow is not mixed with queries.
    2. Methods only read/write data and never compute business scores, which
       keeps them testable on their own.
    3. Query helpers turn a falsy ``execute_query`` result into an empty
       list/dict (``or []``) so callers can degrade and keep going.
    """

    @staticmethod
    def _clean_id(value: Any) -> str:
        """Normalize an id column value: ``None``/empty -> ``""``, else trimmed text."""
        return str(value or "").strip()

    def get_candidate_users(self, group_id: str, active_window_days: int, social_window_days: int) -> List[str]:
        """Return the users that should take part in scoring for ``group_id``.

        Candidates are the UNION of three sources so nobody is missed:
        1. users present in ``t_user_points`` (excluding ``'SYSTEM'``);
        2. users who sent a message in the last ``active_window_days`` days;
        3. users with a ``t_value_rank_social_daily`` row in the last
           ``social_window_days`` days.
        Blank ids are dropped.
        """
        sql = """
            SELECT user_id FROM t_user_points
            WHERE group_id = %s AND user_id <> 'SYSTEM'
            UNION
            SELECT DISTINCT sender AS user_id
            FROM messages
            WHERE group_id = %s
              AND sender IS NOT NULL
              AND sender <> ''
              AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
            UNION
            SELECT DISTINCT user_id
            FROM t_value_rank_social_daily
            WHERE group_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        """
        rows = self.execute_query(sql, (group_id, group_id, active_window_days, group_id, social_window_days)) or []
        user_ids = (self._clean_id(r.get("user_id")) for r in rows)
        return [uid for uid in user_ids if uid]

    def get_points_map(self, group_id: str) -> Dict[str, int]:
        """Return ``{user_id: total_points}`` for the group (``'SYSTEM'`` excluded)."""
        sql = """
            SELECT user_id, total_points
            FROM t_user_points
            WHERE group_id = %s
              AND user_id <> 'SYSTEM'
        """
        rows = self.execute_query(sql, (group_id,)) or []
        result: Dict[str, int] = {}
        for row in rows:
            user_id = self._clean_id(row.get("user_id"))
            if user_id:
                result[user_id] = int(row.get("total_points") or 0)
        return result

    def get_message_stats_map(self, group_id: str, message_window_days: int, active_window_days: int) -> Dict[str, Dict[str, Any]]:
        """Return per-user message metrics.

        Example::

            {
                "wxid_xxx": {
                    "msg_count_window": 123,        # messages in the last message_window_days
                    "active_days_window": 20,       # distinct message days in the last active_window_days
                    "last_active_date": "2026-04-21",  # latest message date in the scanned range (driver type)
                }
            }

        Each metric applies its own window via ``CASE``. The scan covers the
        longer of the two windows, so neither metric is truncated by the other.
        """
        # GREATEST(...) instead of just active_window_days: otherwise a longer
        # message window would silently be cut down to the active window.
        sql = """
            SELECT
                sender AS user_id,
                SUM(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN 1 ELSE 0 END) AS msg_count_window,
                COUNT(DISTINCT DATE(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN timestamp END)) AS active_days_window,
                MAX(DATE(timestamp)) AS last_active_date
            FROM messages
            WHERE group_id = %s
              AND sender IS NOT NULL
              AND sender <> ''
              AND timestamp >= DATE_SUB(NOW(), INTERVAL GREATEST(%s, %s) DAY)
            GROUP BY sender
        """
        params = (message_window_days, active_window_days, group_id, message_window_days, active_window_days)
        rows = self.execute_query(sql, params) or []
        result: Dict[str, Dict[str, Any]] = {}
        for row in rows:
            user_id = self._clean_id(row.get("user_id"))
            if not user_id:
                continue
            result[user_id] = {
                "msg_count_window": int(row.get("msg_count_window") or 0),
                "active_days_window": int(row.get("active_days_window") or 0),
                "last_active_date": row.get("last_active_date"),
            }
        return result

    def get_last_active_date_map(self, group_id: str) -> Dict[str, Any]:
        """Return ``{user_id: last message date}`` over the group's full history.

        Used by the plugin to compute inactive days (the inactivity penalty).
        """
        sql = """
            SELECT sender AS user_id, MAX(DATE(timestamp)) AS last_active_date
            FROM messages
            WHERE group_id = %s
              AND sender IS NOT NULL
              AND sender <> ''
            GROUP BY sender
        """
        rows = self.execute_query(sql, (group_id,)) or []
        result: Dict[str, Any] = {}
        for row in rows:
            user_id = self._clean_id(row.get("user_id"))
            if user_id:
                result[user_id] = row.get("last_active_date")
        return result

    def get_social_stats_map(self, group_id: str, social_window_days: int) -> Dict[str, Dict[str, Any]]:
        """Return per-user social metrics summed over the daily table window.

        Note: ``unique_interactors_window`` is the SUM of daily distinct
        counts, not a distinct count across the whole window.
        """
        sql = """
            SELECT
                user_id,
                SUM(mentioned_count) AS mentioned_count_window,
                SUM(mention_others_count) AS mention_others_count_window,
                SUM(unique_interactors) AS unique_interactors_window,
                SUM(interaction_score) AS interaction_score_window
            FROM t_value_rank_social_daily
            WHERE group_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY user_id
        """
        rows = self.execute_query(sql, (group_id, social_window_days)) or []
        result: Dict[str, Dict[str, Any]] = {}
        for row in rows:
            user_id = self._clean_id(row.get("user_id"))
            if not user_id:
                continue
            result[user_id] = {
                "mentioned_count_window": int(row.get("mentioned_count_window") or 0),
                "mention_others_count_window": int(row.get("mention_others_count_window") or 0),
                "unique_interactors_window": int(row.get("unique_interactors_window") or 0),
                "interaction_score_window": float(row.get("interaction_score_window") or 0.0),
            }
        return result

    def upsert_snapshots(self, rows: List[Tuple[Any, ...]]) -> bool:
        """Batch-upsert value snapshots; an empty batch is a successful no-op.

        Each row is ``(stat_date, group_id, user_id, score, rank_no, title,
        points_total, msg_count_7d, active_days_30, inactive_days,
        score_detail_json)``. On duplicate key every value column is overwritten.
        """
        if not rows:
            return True
        sql = """
            INSERT INTO t_value_rank_snapshot (
                stat_date, group_id, user_id, score, rank_no, title,
                points_total, msg_count_7d, active_days_30, inactive_days,
                score_detail_json
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                score = VALUES(score),
                rank_no = VALUES(rank_no),
                title = VALUES(title),
                points_total = VALUES(points_total),
                msg_count_7d = VALUES(msg_count_7d),
                active_days_30 = VALUES(active_days_30),
                inactive_days = VALUES(inactive_days),
                score_detail_json = VALUES(score_detail_json),
                updated_at = CURRENT_TIMESTAMP
        """
        return self.execute_batch(sql, rows)

    def get_user_snapshot(self, stat_date: str, group_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return one user's snapshot row for ``stat_date`` (all columns), if any."""
        sql = """
            SELECT * FROM t_value_rank_snapshot
            WHERE stat_date = %s AND group_id = %s AND user_id = %s
            LIMIT 1
        """
        return self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True)

    def get_yesterday_score(self, stat_date: str, group_id: str, user_id: str) -> Optional[float]:
        """Return the user's score on the day before ``stat_date``, or ``None`` if absent."""
        sql = """
            SELECT score
            FROM t_value_rank_snapshot
            WHERE stat_date = DATE_SUB(%s, INTERVAL 1 DAY)
              AND group_id = %s
              AND user_id = %s
            LIMIT 1
        """
        row = self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True)
        if not row:
            return None
        return float(row.get("score") or 0)

    def get_today_rankings(self, stat_date: str, group_id: str, limit: int) -> List[Dict[str, Any]]:
        """Return the top ``limit`` snapshot rows of ``stat_date`` ordered by ``rank_no``."""
        sql = """
            SELECT user_id, score, rank_no, title
            FROM t_value_rank_snapshot
            WHERE stat_date = %s
              AND group_id = %s
            ORDER BY rank_no ASC
            LIMIT %s
        """
        return self.execute_query(sql, (stat_date, group_id, limit)) or []

    def get_social_hot_rankings(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
        """Return the social-heat leaderboard aggregated over the window."""
        sql = """
            SELECT
                user_id,
                SUM(mentioned_count) AS mentioned_count_window,
                SUM(mention_others_count) AS mention_others_count_window,
                SUM(unique_interactors) AS unique_interactors_window,
                SUM(interaction_score) AS interaction_score_window
            FROM t_value_rank_social_daily
            WHERE group_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY user_id
            ORDER BY interaction_score_window DESC, mentioned_count_window DESC
            LIMIT %s
        """
        return self.execute_query(sql, (group_id, social_window_days, limit)) or []

    def get_top_partner_pairs(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
        """Return the "partner" leaderboard: directed edges folded into unordered pairs."""
        sql = """
            SELECT
                LEAST(from_user_id, to_user_id) AS user_a,
                GREATEST(from_user_id, to_user_id) AS user_b,
                SUM(mention_count) AS mention_count_window,
                SUM(interaction_score) AS interaction_score_window
            FROM t_social_edges_daily
            WHERE group_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY LEAST(from_user_id, to_user_id), GREATEST(from_user_id, to_user_id)
            ORDER BY interaction_score_window DESC, mention_count_window DESC
            LIMIT %s
        """
        return self.execute_query(sql, (group_id, social_window_days, limit)) or []

    def get_social_bridge_rankings(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
        """Return the "social bridge" leaderboard.

        1. Each directed edge counts for both endpoints (user -> partner and
           partner -> user).
        2. Users are ranked by distinct partner count (breadth of reach), then
           by interaction score and mention count.
        3. The list rewards connecting many people, not heavy interaction
           with a single person.
        """
        sql = """
            SELECT
                user_id,
                COUNT(DISTINCT partner_id) AS partner_count,
                SUM(mention_count) AS mention_count_window,
                SUM(interaction_score) AS interaction_score_window
            FROM (
                SELECT
                    from_user_id AS user_id,
                    to_user_id AS partner_id,
                    mention_count,
                    interaction_score
                FROM t_social_edges_daily
                WHERE group_id = %s
                  AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                UNION ALL
                SELECT
                    to_user_id AS user_id,
                    from_user_id AS partner_id,
                    mention_count,
                    interaction_score
                FROM t_social_edges_daily
                WHERE group_id = %s
                  AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            ) t
            GROUP BY user_id
            ORDER BY partner_count DESC, interaction_score_window DESC, mention_count_window DESC
            LIMIT %s
        """
        return self.execute_query(
            sql,
            (group_id, social_window_days, group_id, social_window_days, limit),
        ) or []

    def get_user_score_trend(self, group_id: str, user_id: str, days: int) -> List[Dict[str, Any]]:
        """Return the user's snapshots of the last ``days`` days, oldest first."""
        sql = """
            SELECT stat_date, score, rank_no
            FROM t_value_rank_snapshot
            WHERE group_id = %s
              AND user_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            ORDER BY stat_date ASC
        """
        return self.execute_query(sql, (group_id, user_id, days)) or []

    def get_social_edges_for_graph(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]:
        """Return the strongest directed edges of the window for the relationship graph."""
        sql = """
            SELECT
                from_user_id,
                to_user_id,
                SUM(mention_count) AS mention_count_window,
                SUM(interaction_score) AS interaction_score_window
            FROM t_social_edges_daily
            WHERE group_id = %s
              AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY from_user_id, to_user_id
            ORDER BY interaction_score_window DESC, mention_count_window DESC
            LIMIT %s
        """
        return self.execute_query(sql, (group_id, social_window_days, limit)) or []

    def get_snapshot_score_map(self, stat_date: str, group_id: str) -> Dict[str, float]:
        """Return ``{user_id: score}`` for every snapshot row of ``stat_date``."""
        sql = """
            SELECT user_id, score
            FROM t_value_rank_snapshot
            WHERE stat_date = %s
              AND group_id = %s
        """
        rows = self.execute_query(sql, (stat_date, group_id)) or []
        result: Dict[str, float] = {}
        for row in rows:
            user_id = self._clean_id(row.get("user_id"))
            if user_id:
                result[user_id] = float(row.get("score") or 0.0)
        return result

    def get_pending_mention_extract_messages_for_group(
        self,
        group_id: str,
        limit: int,
        window_start_minutes: int,
        window_end_minutes: int,
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` messages still awaiting @-mention extraction.

        Selects messages whose ``mentioned_user_ids`` is NULL/empty and that
        have non-empty XML, with timestamps in
        ``[NOW - window_start_minutes, NOW - window_end_minutes)``, oldest first.
        """
        sql = """
            SELECT message_id, group_id, sender, message_xml, timestamp
            FROM messages
            WHERE group_id = %s
              AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
              AND message_xml IS NOT NULL
              AND message_xml <> ''
              AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
              AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE)
            ORDER BY timestamp ASC
            LIMIT %s
        """
        return self.execute_query(sql, (group_id, window_start_minutes, window_end_minutes, limit)) or []

    def update_message_mentioned_user_ids(
        self,
        message_id: str,
        group_id: str,
        sender_id: str,
        mentioned_user_ids_json: str,
    ) -> bool:
        """Write back ``messages.mentioned_user_ids`` for one message."""
        return self.execute_update(
            """
            UPDATE messages
            SET mentioned_user_ids = %s
            WHERE message_id = %s
              AND group_id = %s
              AND sender = %s
            """,
            (mentioned_user_ids_json, message_id, group_id, sender_id),
        )

    def get_existing_mentions(self, message_id: str, group_id: str, sender_id: str) -> List[str]:
        """Return mention targets already stored for a message (lets callers avoid double counting)."""
        rows = self.execute_query(
            """
            SELECT mentioned_user_id
            FROM t_message_mentions
            WHERE message_id = %s
              AND group_id = %s
              AND sender_id = %s
            """,
            (message_id, group_id, sender_id),
        ) or []
        mentioned = (self._clean_id(r.get("mentioned_user_id")) for r in rows)
        return [uid for uid in mentioned if uid]

    def insert_message_mentions(self, rows: List[Tuple[Any, ...]]) -> bool:
        """Batch-insert @-mention detail rows.

        Uses INSERT IGNORE, so rows that conflict with an existing unique key
        are skipped.
        """
        if not rows:
            return True
        return self.execute_batch(
            """
            INSERT IGNORE INTO t_message_mentions
                (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            rows,
        )

    def upsert_social_edges_daily(self, rows: List[Tuple[Any, ...]]) -> bool:
        """Batch-accumulate daily social edges (counts are added on duplicate key)."""
        if not rows:
            return True
        return self.execute_batch(
            """
            INSERT INTO t_social_edges_daily
                (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                mention_count = mention_count + VALUES(mention_count),
                interaction_score = interaction_score + VALUES(interaction_score),
                update_time = CURRENT_TIMESTAMP
            """,
            rows,
        )

    def upsert_social_daily_row(self, row: Tuple[Any, ...]) -> bool:
        """Insert or accumulate one user's daily social summary.

        On duplicate key the counters and score are added to. ``unique_interactors``
        is left unchanged on update; ``refresh_unique_interactors`` recomputes it.
        """
        return self.execute_update(
            """
            INSERT INTO t_value_rank_social_daily
                (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                mentioned_count = mentioned_count + VALUES(mentioned_count),
                mention_others_count = mention_others_count + VALUES(mention_others_count),
                interaction_score = interaction_score + VALUES(interaction_score),
                update_time = CURRENT_TIMESTAMP
            """,
            row,
        )

    def refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None:
        """Recompute ``unique_interactors`` for each distinct user on ``stat_date``.

        A partner is anyone the user mentioned or was mentioned by in
        ``t_message_mentions`` that day. Issues one SELECT and one UPDATE per
        user (blank ids skipped, duplicates processed once in first-seen order).
        """
        # dict.fromkeys keeps first-seen order while dropping duplicates.
        deduped = [uid for uid in dict.fromkeys(self._clean_id(u) for u in user_ids) if uid]
        for uid in deduped:
            row = self.execute_query(
                """
                SELECT COUNT(DISTINCT partner_id) AS partner_count
                FROM (
                    SELECT mentioned_user_id AS partner_id
                    FROM t_message_mentions
                    WHERE stat_date = %s AND group_id = %s AND sender_id = %s
                    UNION
                    SELECT sender_id AS partner_id
                    FROM t_message_mentions
                    WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s
                ) t
                """,
                (stat_date, group_id, uid, stat_date, group_id, uid),
                fetch_one=True,
            ) or {}
            partner_count = int(row.get("partner_count") or 0)
            self.execute_update(
                """
                UPDATE t_value_rank_social_daily
                SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP
                WHERE stat_date = %s AND group_id = %s AND user_id = %s
                """,
                (partner_count, stat_date, group_id, uid),
            )
class ValueRankPlugin(MessagePluginInterface):
"""群成员身价排行插件。
设计目标:
1. 将“积分、发言、活跃、社交影响力”统一折算为可解释的身价分;
2. 支持手动查询与后台定时重算;
3. 通过快照持久化,支持趋势分析和涨跌说明。
"""
FEATURE_KEY = "VALUE_RANK"
FEATURE_DESCRIPTION = "📊 身价排行 [我的身价, 身价排行, 社交热度榜, 搭子榜, 社交桥梁榜, 社交关系图, 我的趋势, 身价周报, 身价说明, 重算身价]"
@property
def name(self) -> str:
    """Display name of the plugin; also used as the ``[...]`` log prefix."""
    return "身价排行"

@property
def version(self) -> str:
    """Plugin version string."""
    return "1.0.0"

@property
def description(self) -> str:
    """One-line summary of what the plugin computes."""
    return "根据积分、发言、活跃与社交影响力计算群成员身价。"

@property
def author(self) -> str:
    """Plugin author."""
    return "ABOT Team"

@property
def command_prefix(self) -> Optional[str]:
    """Command prefix: empty, i.e. commands are typed without a prefix."""
    return ""

@property
def commands(self) -> List[str]:
    """Trigger words, backed by ``self._commands`` (config may override them)."""
    return self._commands

@property
def feature_key(self) -> Optional[str]:
    """Permission feature key, taken from ``FEATURE_KEY``."""
    return self.FEATURE_KEY

@property
def feature_description(self) -> Optional[str]:
    """Human-readable feature description, taken from ``FEATURE_DESCRIPTION``."""
    return self.FEATURE_DESCRIPTION
def __init__(self):
    """Create the plugin with built-in defaults.

    The defaults are conservative so the plugin still runs when config.toml
    has no ``ValueRank`` section; ``initialize`` overrides them from config.
    """
    super().__init__()
    self.feature = self.register_feature()
    self.db: Optional[ValueRankDB] = None

    # --- switch and command surface ---
    self.enable = True
    self._commands = [
        "我的身价", "身价排行", "社交热度榜", "搭子榜", "社交桥梁榜",
        "社交关系图", "我的趋势", "身价周报", "身价说明", "重算身价",
    ]
    self.command_format = (
        "我的身价 | 身价排行 [名次] | 社交热度榜 [名次] | 搭子榜 [名次] | "
        "社交桥梁榜 [名次] | 社交关系图 [人数] | 我的趋势 [天数] | 身价周报 | 身价说明 | 重算身价"
    )

    # --- statistic windows, in days ---
    self.message_window_days, self.active_window_days, self.social_window_days = 7, 30, 7

    # --- scoring weights (normalized in initialize) and score scales ---
    self.points_weight, self.message_weight = 0.30, 0.35
    self.active_days_weight, self.social_weight = 0.20, 0.15
    self.inactivity_penalty_max = 150
    self.base_score_scale = 1000

    # --- leaderboard sizes ---
    self.default_rank_limit, self.max_rank_limit = 10, 50

    # --- social graph ---
    # 0 is intended as "do not truncate" (show everyone); the graph-size
    # parsing defined elsewhere interprets it. An explicit node count from
    # the user still selects a sub-graph.
    self.default_graph_nodes = 0
    self.max_graph_nodes = 0
    self.graph_edge_pool_limit = 300
    self.social_graph_template_path = "plugins/value_rank/templates/social_graph.html"

    # --- text replies and trend queries ---
    self.text_auto_revoke_seconds = 80
    self.default_trend_days, self.max_trend_days = 7, 30

    # --- @-mention batch extraction ---
    self.mention_batch_size = 200
    # Messages are picked from [now - start, now - end) minutes.
    self.mention_window_start_minutes = 20
    self.mention_window_end_minutes = 10
def initialize(self, context: Dict[str, Any]) -> bool:
    """Load the ``ValueRank`` config section and create the DB accessor.

    Every setting falls back to the default assigned in ``__init__``. The
    four scoring weights are normalized to sum to 1:
    - negative weights are clamped to 0 first (with a warning), because a
      negative weight would invert that dimension;
    - if no positive weight remains, the built-in defaults are restored.

    A warning is also logged when the @-mention extraction window is empty
    (start <= end minutes), since that query can then never match.

    Args:
        context: Plugin context supplied by the framework (not used here).

    Returns:
        Always ``True``. Errors from config parsing or DB setup propagate.
    """
    self.LOG = logger
    cfg = self._config.get("ValueRank", {})

    self.enable = bool(cfg.get("enable", True))
    self._commands = cfg.get("command", self._commands)
    self.command_format = cfg.get("command-format", self.command_format)

    self.message_window_days = int(cfg.get("message_window_days", self.message_window_days))
    self.active_window_days = int(cfg.get("active_window_days", self.active_window_days))
    self.social_window_days = int(cfg.get("social_window_days", self.social_window_days))

    self.points_weight = float(cfg.get("points_weight", self.points_weight))
    self.message_weight = float(cfg.get("message_weight", self.message_weight))
    self.active_days_weight = float(cfg.get("active_days_weight", self.active_days_weight))
    self.social_weight = float(cfg.get("social_weight", self.social_weight))
    self.inactivity_penalty_max = float(cfg.get("inactivity_penalty_max", self.inactivity_penalty_max))
    self.base_score_scale = float(cfg.get("base_score_scale", self.base_score_scale))

    self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit))
    self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit))
    self.default_graph_nodes = int(cfg.get("default_graph_nodes", self.default_graph_nodes))
    self.max_graph_nodes = int(cfg.get("max_graph_nodes", self.max_graph_nodes))
    self.graph_edge_pool_limit = int(cfg.get("graph_edge_pool_limit", self.graph_edge_pool_limit))
    self.social_graph_template_path = str(
        cfg.get("social_graph_template_path", self.social_graph_template_path)
    ).strip()
    self.text_auto_revoke_seconds = int(cfg.get("text_auto_revoke_seconds", self.text_auto_revoke_seconds))
    self.default_trend_days = int(cfg.get("default_trend_days", self.default_trend_days))
    self.max_trend_days = int(cfg.get("max_trend_days", self.max_trend_days))
    self.mention_batch_size = int(cfg.get("mention_batch_size", self.mention_batch_size))
    self.mention_window_start_minutes = int(cfg.get("mention_window_start_minutes", self.mention_window_start_minutes))
    self.mention_window_end_minutes = int(cfg.get("mention_window_end_minutes", self.mention_window_end_minutes))

    # Weight normalization: keep the total at 1 whatever scale the config uses.
    raw_weights = (self.points_weight, self.message_weight, self.active_days_weight, self.social_weight)
    if any(w < 0 for w in raw_weights):
        # A negative weight would turn "more activity" into "lower score".
        self.LOG.warning(f"[{self.name}] 身价权重存在负值,已按 0 处理: {raw_weights}")
    weights = [max(w, 0.0) for w in raw_weights]
    weight_sum = sum(weights)
    if weight_sum <= 0:
        self.points_weight, self.message_weight, self.active_days_weight, self.social_weight = 0.30, 0.35, 0.20, 0.15
    else:
        (
            self.points_weight,
            self.message_weight,
            self.active_days_weight,
            self.social_weight,
        ) = (w / weight_sum for w in weights)

    # The extraction query selects [NOW - start, NOW - end); start <= end is empty.
    if self.mention_window_start_minutes <= self.mention_window_end_minutes:
        self.LOG.warning(
            f"[{self.name}] @关系抽取窗口配置无效: "
            f"mention_window_start_minutes={self.mention_window_start_minutes} <= "
            f"mention_window_end_minutes={self.mention_window_end_minutes},将匹配不到任何消息"
        )

    self.db = ValueRankDB(DBConnectionManager.get_instance())
    self.LOG.info(f"[{self.name}] 初始化完成,命令: {self._commands}")
    return True
def start(self) -> bool:
    """Switch the plugin into the RUNNING state; always succeeds."""
    running = PluginStatus.RUNNING
    self.status = running
    return True
def stop(self) -> bool:
    """Switch the plugin into the STOPPED state; always succeeds."""
    stopped = PluginStatus.STOPPED
    self.status = stopped
    return True
def can_process(self, message: Dict[str, Any]) -> bool:
    """Return True when the plugin is enabled and the message starts with a command.

    The command token is everything before the first ASCII space of the
    stripped ``content``; it must be one of ``self._commands``.
    """
    if not self.enable:
        return False
    text = str(message.get("content", "")).strip()
    head = text.partition(" ")[0]
    return bool(text) and head in self._commands
@plugin_stats_decorator(plugin_name="身价排行")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Handle one user command.

    Flow:
    1. in a group whose permission for this feature is DISABLED, return
       ``(False, "没有权限")`` without replying;
    2. outside a group (empty ``roomid``), reply privately that the feature
       is group-only;
    3. otherwise dispatch on the first space-separated token of the content.

    Returns:
        ``(handled, status_text)`` for the plugin framework.
    """
    content = str(message.get("content", "")).strip()
    command = content.split(" ")[0]
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    gbm: GroupBotManager = message.get("gbm")
    bot = message.get("bot")
    revoke: MessageAutoRevoke = message.get("revoke")

    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"
    if not roomid:
        await self._send_text_with_auto_revoke(bot, sender, "该功能仅支持群聊使用。", sender, revoke)
        return True, "非群聊"

    async def reply(text: str) -> None:
        # All text replies in the group go through the auto-revoke helper.
        await self._send_text_with_auto_revoke(bot, roomid, text, sender, revoke)

    # "Build text, then send it" queries. Each entry returns the coroutine of
    # its text builder; optional numeric arguments are parsed on demand.
    text_queries = {
        "我的身价": lambda: self._build_my_value_text(roomid, sender),
        "身价排行": lambda: self._build_ranking_text(roomid, self._parse_rank_limit(content)),
        "社交热度榜": lambda: self._build_social_hot_text(roomid, self._parse_rank_limit(content)),
        "搭子榜": lambda: self._build_partner_pairs_text(roomid, self._parse_rank_limit(content)),
        "社交桥梁榜": lambda: self._build_social_bridge_text(roomid, self._parse_rank_limit(content)),
        "我的趋势": lambda: self._build_my_trend_text(roomid, sender, self._parse_trend_days(content)),
        "身价周报": lambda: self._build_weekly_report_text(roomid),
    }
    build_text = text_queries.get(command)
    if build_text is not None:
        await reply(await build_text())
        return True, "查询成功"

    if command == "社交关系图":
        graph_nodes = self._parse_graph_nodes(content)
        image_path = await self._render_social_graph_image(roomid, graph_nodes)
        if not image_path:
            await reply("📊 近期社交关系数据不足,暂时无法绘制关系图。")
            return True, "数据不足"
        # Images are sent directly and are not registered for auto-revoke.
        await bot.send_image_message(roomid, Path(image_path))
        return True, "查询成功"

    if command == "身价说明":
        await reply(self._build_explain_text())
        return True, "查询成功"

    if command == "重算身价":
        # Recompute is an admin action (bot admin or group admin) to prevent abuse.
        if not GroupBotManager.is_admin_for_group(sender, roomid):
            await reply("仅管理员可执行重算身价。")
            return True, "权限不足"
        user_count = self._recompute_group_snapshot(roomid, datetime.now().strftime("%Y-%m-%d"))
        await reply(f"✅ 重算完成,已更新 {user_count} 名成员。")
        return True, "重算成功"

    await reply(f"❌命令格式错误\n{self.command_format}")
    return True, "命令错误"
async def _send_text_with_auto_revoke(
self,
bot: Any,
target: str,
text: str,
sender: str,
revoke: Optional[MessageAutoRevoke],
) -> None:
"""发送文本并自动撤回(仅文本生效,图片不走该逻辑)。
说明:
1. 业务要求统一文本自动撤回,默认 80 秒;
2. 若当前上下文无 revoke 能力,则仅发送不撤回,避免影响主流程;
3. 撤回异常不抛出,防止影响消息主链路。
"""
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text, sender)
if not revoke:
return
try:
revoke.add_message_to_revoke(
target,
client_msg_id,
create_time,
new_msg_id,
max(1, int(self.text_auto_revoke_seconds)),
)
except Exception as e:
self.LOG.warning(f"[{self.name}] 文本自动撤回登记失败: target={target}, error={e}")
def get_schedule_actions(self) -> List[Dict[str, Any]]:
    """Declare the schedulable actions of this plugin.

    Three actions, all targeting every enabled group and enabled by default:
    daily snapshot recompute at 04:00, @-mention batch extraction every
    600 seconds, and the weekly report push on Monday at 09:35.
    """
    def make_action(action_key, name, description, trigger_type, trigger_config):
        # Builds a fresh dict each call, so no nested dict is shared between actions.
        return {
            "action_key": action_key,
            "name": name,
            "description": description,
            "trigger_type": trigger_type,
            "trigger_config": trigger_config,
            "target_scope": "all_enabled_groups",
            "target_config": {},
            "payload": {},
            "default_enabled": True,
        }

    daily = make_action(
        "value_rank_daily_recompute",
        "身价每日重算",
        "每天凌晨重算群成员身价快照",
        "at_times",
        {"time_list": ["04:00"]},
    )
    mentions = make_action(
        "value_rank_mentions_extract",
        "@关系批处理",
        "每10分钟批量抽取10-20分钟前的@关系并更新社交图",
        "every_seconds",
        {"seconds": 600},
    )
    weekly = make_action(
        "value_rank_weekly_report_push",
        "身价周报推送",
        "每周推送身价周报(上升榜/下降榜/综合排行)",
        "every_week_time",
        # weekday follows datetime.weekday(): Monday=0 ... Sunday=6.
        {"weekday": 0, "time_str": "09:35"},
    )
    return [daily, mentions, weekly]
async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Run one of the actions declared by ``get_schedule_actions``.

    Args:
        action_key: ``value_rank_daily_recompute``, ``value_rank_mentions_extract``
            or ``value_rank_weekly_report_push``; anything else is rejected.
        context: Scheduler context. ``target_groups`` (optional) limits the run
            to those group ids; otherwise every group whose feature permission
            value is ``"enabled"`` is used. ``bot`` (optional, falls back to
            ``self.bot``) is needed only for the weekly push.

    Returns:
        ``{"success": bool, "summary": str, "detail": dict}``. A failing group
        does not abort the run: its error text is collected in
        ``detail["failed_groups"]`` and the exception is logged with traceback.
        Group recomputation runs synchronously inside this coroutine.
    """
    if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push", "value_rank_mentions_extract"}:
        return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}}

    target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()]
    if not target_groups:
        target_groups = [
            gid for gid in GroupBotManager.get_group_list()
            if GroupBotManager.get_group_permission(gid, self.feature).value == "enabled"
        ]

    success_groups: List[str] = []
    failed_groups: Dict[str, str] = {}
    updated_users = 0
    pushed_groups: List[str] = []
    stat_date = datetime.now().strftime("%Y-%m-%d")
    bot = context.get("bot") or getattr(self, "bot", None)

    # @-mention extraction does not need the bot.
    if action_key == "value_rank_mentions_extract":
        total_stats = {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
        for gid in target_groups:
            try:
                stats = self._process_pending_mentions_for_group(gid)
                for key in total_stats:
                    total_stats[key] += int(stats.get(key, 0))
                success_groups.append(gid)
            except Exception as e:
                # Isolate the failing group, but keep the traceback for diagnosis.
                self.LOG.exception(f"[{self.name}] @关系批处理异常: group={gid}, error={e}")
                failed_groups[gid] = str(e)
        return {
            "success": len(failed_groups) == 0,
            "summary": (
                f"@关系批处理完成: 读取{total_stats['total']}条, "
                f"处理{total_stats['processed']}条, 含@{total_stats['with_mentions']}条, "
                f"失败{total_stats['failed']}条, 异常群{len(failed_groups)}"
            ),
            "detail": {
                "window": f"[NOW-{self.mention_window_start_minutes}m, NOW-{self.mention_window_end_minutes}m)",
                "batch_size": self.mention_batch_size,
                "stats": total_stats,
                "failed_groups": failed_groups,
            },
        }

    # The weekly push needs a bot to send through.
    if action_key == "value_rank_weekly_report_push" and not bot:
        return {"success": False, "summary": "周报推送失败bot 未注入", "detail": {}}

    for gid in target_groups:
        try:
            # Recompute first so reports and rankings reflect today's data.
            updated_users += self._recompute_group_snapshot(gid, stat_date)
            if action_key == "value_rank_weekly_report_push":
                weekly_text = await self._build_weekly_report_text(gid)
                await bot.send_text_message(gid, weekly_text, "")
                pushed_groups.append(gid)
            success_groups.append(gid)
        except Exception as e:
            self.LOG.exception(f"[{self.name}] 定时任务执行异常: action={action_key}, group={gid}, error={e}")
            failed_groups[gid] = str(e)

    if action_key == "value_rank_weekly_report_push":
        return {
            "success": len(failed_groups) == 0,
            "summary": f"身价周报完成:推送{len(pushed_groups)}群,失败{len(failed_groups)}",
            "detail": {
                "stat_date": stat_date,
                "updated_users": updated_users,
                "pushed_groups": pushed_groups,
                "failed_groups": failed_groups,
            },
        }
    return {
        "success": len(failed_groups) == 0,
        "summary": f"身价重算完成:成功{len(success_groups)}群,失败{len(failed_groups)}",
        "detail": {
            "stat_date": stat_date,
            "updated_users": updated_users,
            "success_groups": success_groups,
            "failed_groups": failed_groups,
        },
    }
def _recompute_group_snapshot(self, group_id: str, stat_date: str) -> int:
"""重算单群指定日期快照。
这是插件核心逻辑:
1. 聚合各维度原始指标;
2. 归一化并计算分数;
3. 生成排名与称号;
4. 持久化到快照表。
"""
if not self.db:
return 0
candidates = self.db.get_candidate_users(group_id, self.active_window_days, self.social_window_days)
if not candidates:
return 0
points_map = self.db.get_points_map(group_id)
msg_map = self.db.get_message_stats_map(group_id, self.message_window_days, self.active_window_days)
last_active_map = self.db.get_last_active_date_map(group_id)
social_map = self.db.get_social_stats_map(group_id, self.social_window_days)
# 组装原始指标,后续统一归一化处理。
metrics: List[Dict[str, Any]] = []
for user_id in candidates:
points_total = int(points_map.get(user_id, 0))
msg_stats = msg_map.get(user_id, {})
social_stats = social_map.get(user_id, {})
msg_count_7d = int(msg_stats.get("msg_count_window", 0))
active_days_30 = int(msg_stats.get("active_days_window", 0))
interaction_score_7d = float(social_stats.get("interaction_score_window", 0.0))
# inactive_days 优先使用全量最后发言日期,若无记录视为长期未活跃。
inactive_days = 365
last_active_date = last_active_map.get(user_id)
if last_active_date:
try:
if isinstance(last_active_date, datetime):
dt = last_active_date
else:
dt = datetime.strptime(str(last_active_date), "%Y-%m-%d")
inactive_days = max((datetime.now().date() - dt.date()).days, 0)
except Exception:
inactive_days = 365
metrics.append(
{
"user_id": user_id,
"points_total": points_total,
"msg_count_7d": msg_count_7d,
"active_days_30": active_days_30,
"interaction_score_7d": interaction_score_7d,
"inactive_days": inactive_days,
}
)
if not metrics:
return 0
# 计算分位阈值,避免极端值主导。
p95_points = self._percentile95([m["points_total"] for m in metrics])
p95_msg = self._percentile95([m["msg_count_7d"] for m in metrics])
p95_social = self._percentile95([m["interaction_score_7d"] for m in metrics])
scored_rows: List[Dict[str, Any]] = []
for m in metrics:
p_norm = self._normalize_log(m["points_total"], p95_points)
m_norm = self._normalize_linear(m["msg_count_7d"], p95_msg)
a_norm = self._normalize_linear(m["active_days_30"], self.active_window_days)
s_norm = self._normalize_linear(m["interaction_score_7d"], p95_social)
i_penalty = self._normalize_linear(m["inactive_days"], 30)
base_score = self.base_score_scale * (
self.points_weight * p_norm
+ self.message_weight * m_norm
+ self.active_days_weight * a_norm
+ self.social_weight * s_norm
)
penalty_score = self.inactivity_penalty_max * i_penalty
final_score = max(round(base_score - penalty_score, 2), 0.0)
scored_rows.append(
{
**m,
"score": final_score,
"p_norm": p_norm,
"m_norm": m_norm,
"a_norm": a_norm,
"s_norm": s_norm,
"penalty": penalty_score,
}
)
# 分数高到低排序,同分时按活跃和积分补充排序,提升稳定性。
scored_rows.sort(
key=lambda x: (
-float(x["score"]),
-int(x["msg_count_7d"]),
-float(x["interaction_score_7d"]),
-int(x["points_total"]),
)
)
total = len(scored_rows)
batch_rows: List[Tuple[Any, ...]] = []
for idx, row in enumerate(scored_rows, start=1):
title = self._build_title(idx, total, int(row["inactive_days"]))
score_detail = {
"points_norm": round(float(row["p_norm"]), 6),
"message_norm": round(float(row["m_norm"]), 6),
"active_norm": round(float(row["a_norm"]), 6),
"social_norm": round(float(row["s_norm"]), 6),
"penalty_score": round(float(row["penalty"]), 2),
"weights": {
"points": self.points_weight,
"message": self.message_weight,
"active": self.active_days_weight,
"social": self.social_weight,
},
}
batch_rows.append(
(
stat_date,
group_id,
row["user_id"],
row["score"],
idx,
title,
row["points_total"],
row["msg_count_7d"],
row["active_days_30"],
row["inactive_days"],
json.dumps(score_detail, ensure_ascii=False),
)
)
self.db.upsert_snapshots(batch_rows)
return len(batch_rows)
async def _build_my_value_text(self, group_id: str, user_id: str) -> str:
"""构建“我的身价”输出文本。"""
if not self.db:
return "❌ 身价模块未初始化"
stat_date = datetime.now().strftime("%Y-%m-%d")
row = self.db.get_user_snapshot(stat_date, group_id, user_id)
# 若当天还没有快照,按需触发一次当前群重算,保证命令可用性。
if not row:
self._recompute_group_snapshot(group_id, stat_date)
row = self.db.get_user_snapshot(stat_date, group_id, user_id)
if not row:
return "📊 暂无你的身价数据,请先在群里发言后再试。"
yesterday_score = self.db.get_yesterday_score(stat_date, group_id, user_id)
current_score = float(row.get("score") or 0)
if yesterday_score is None:
change_text = "新上榜(暂无昨日对比)"
else:
delta = current_score - float(yesterday_score)
if abs(yesterday_score) < 1e-9:
change_text = f"较昨日变化:{delta:+.2f}"
else:
pct = delta / float(yesterday_score) * 100
change_text = f"较昨日:{pct:+.2f}%"
nick = ContactManager.get_instance().get_group_name(group_id, user_id) or user_id
lines = [
f"📊 {nick} 的身价报告({stat_date}",
f"总身价:{current_score:.2f}",
f"群内排名:第 {int(row.get('rank_no') or 0)} 名({row.get('title') or '未定级'}",
"",
f"💰 积分资产:{int(row.get('points_total') or 0)}",
f"🗣️ 7日发言{int(row.get('msg_count_7d') or 0)}",
f"📅 30日活跃{int(row.get('active_days_30') or 0)}",
f"🌊 潜水天数:{int(row.get('inactive_days') or 0)}",
"",
change_text,
]
return "\n".join(lines)
async def _build_ranking_text(self, group_id: str, limit: int) -> str:
    """Render the value leaderboard (身价排行) of a group for today.

    When today's snapshot is empty, the group is recomputed once and the
    query is retried.

    Args:
        group_id: Group id.
        limit: Maximum number of entries to list.

    Returns:
        One line per member ("medal nick | score | title"), or a status
        message when the module is not initialized or no data exists.
    """
    if not self.db:
        return "❌ 身价模块未初始化"

    today = datetime.now().strftime("%Y-%m-%d")
    ranking = self.db.get_today_rankings(today, group_id, limit)
    if not ranking:
        self._recompute_group_snapshot(group_id, today)
        ranking = self.db.get_today_rankings(today, group_id, limit)
    if not ranking:
        return "📊 今日暂无排行数据。"

    contacts = ContactManager.get_instance()
    podium = {1: "🥇", 2: "🥈", 3: "🥉"}
    output = [f"🏆 身价排行榜Top{len(ranking)} | {today}"]
    for position, entry in enumerate(ranking, start=1):
        member = str(entry.get("user_id") or "")
        value = float(entry.get("score") or 0)
        label = str(entry.get("title") or "")
        display_name = contacts.get_group_name(group_id, member) or member
        badge = podium.get(position, f"{position}.")
        output.append(f"{badge} {display_name} | {value:.2f} | {label}")
    return "\n".join(output)
async def _build_social_hot_text(self, group_id: str, limit: int) -> str:
    """Build the social-heat leaderboard (社交热度榜) text.

    Args:
        group_id: Group id.
        limit: Maximum number of members listed.

    Returns:
        A header plus one line per member with the heat score, the times the
        member was @-mentioned and the times they @-mentioned others, over the
        last ``self.social_window_days`` days. Returns a status message when
        the module is not initialized or there is no data.
    """
    if not self.db:
        return "❌ 身价模块未初始化"
    rows = self.db.get_social_hot_rankings(group_id, self.social_window_days, limit)
    if not rows:
        return "📊 近期暂无社交热度数据。"
    cm = ContactManager.get_instance()
    lines = [f"🔥 社交热度榜(近{self.social_window_days}天 Top{len(rows)})"]
    for idx, row in enumerate(rows, start=1):
        user_id = str(row.get("user_id") or "")
        score = float(row.get("interaction_score_window") or 0.0)
        mentioned_count = int(row.get("mentioned_count_window") or 0)
        mention_others_count = int(row.get("mention_others_count_window") or 0)
        nick = cm.get_group_name(group_id, user_id) or user_id
        medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
        lines.append(
            f"{medal} {nick} | 热度{score:.1f} | 被@{mentioned_count} | 主动@{mention_others_count}"
        )
    return "\n".join(lines)
async def _build_partner_pairs_text(self, group_id: str, limit: int) -> str:
    """Build the "partner pairs" leaderboard (搭子榜) text.

    Args:
        group_id: Group id.
        limit: Maximum number of pairs listed.

    Returns:
        A header plus one line per pair with the interaction score and the
        @-count over the last ``self.social_window_days`` days, or a status
        message when the module is not initialized or there is no data.
    """
    if not self.db:
        return "❌ 身价模块未初始化"
    rows = self.db.get_top_partner_pairs(group_id, self.social_window_days, limit)
    if not rows:
        return "📊 近期暂无搭子关系数据。"
    cm = ContactManager.get_instance()
    lines = [f"🤝 搭子榜(近{self.social_window_days}天 Top{len(rows)})"]
    for idx, row in enumerate(rows, start=1):
        user_a = str(row.get("user_a") or "")
        user_b = str(row.get("user_b") or "")
        nick_a = cm.get_group_name(group_id, user_a) or user_a
        nick_b = cm.get_group_name(group_id, user_b) or user_b
        mention_count = int(row.get("mention_count_window") or 0)
        score = float(row.get("interaction_score_window") or 0.0)
        medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
        lines.append(f"{medal} {nick_a} × {nick_b} | 互动{score:.1f} | @次数{mention_count}")
    return "\n".join(lines)
async def _build_social_bridge_text(self, group_id: str, limit: int) -> str:
    """Build the "social bridge" leaderboard (社交桥梁榜) text.

    Notes:
    1. A higher partner count means the member connects more people;
    2. The ordering itself comes from ``db.get_social_bridge_rankings``. The
       intent is that equal partner counts are ordered by interaction score,
       so shallow connections do not win (not enforced here; verify against
       the query).

    Args:
        group_id: Group id.
        limit: Maximum number of members listed.

    Returns:
        A header plus one line per member (partners, interaction score,
        mention reach), or a status message when unavailable.
    """
    if not self.db:
        return "❌ 身价模块未初始化"
    rows = self.db.get_social_bridge_rankings(group_id, self.social_window_days, limit)
    if not rows:
        return "📊 近期暂无社交桥梁数据。"
    cm = ContactManager.get_instance()
    lines = [f"🌉 社交桥梁榜(近{self.social_window_days}天 Top{len(rows)})"]
    for idx, row in enumerate(rows, start=1):
        user_id = str(row.get("user_id") or "")
        partner_count = int(row.get("partner_count") or 0)
        mention_count = int(row.get("mention_count_window") or 0)
        score = float(row.get("interaction_score_window") or 0.0)
        nick = cm.get_group_name(group_id, user_id) or user_id
        medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}."
        lines.append(f"{medal} {nick} | 连接{partner_count}人 | 互动{score:.1f} | 触达{mention_count}")
    return "\n".join(lines)
async def _build_my_trend_text(self, group_id: str, user_id: str, days: int) -> str:
    """Render a member's score history (我的趋势) for the last ``days`` days.

    If today's snapshot is missing, the group is recomputed first, so the
    trend does not end with a confusing missing "today" point.

    Args:
        group_id: Group id.
        user_id: Member whose trend is shown.
        days: Number of days requested from ``db.get_user_score_trend``.

    Returns:
        One line per returned snapshot, followed by an overall up/down/flat
        verdict comparing the last point with the first. Returns a status
        message when the module is not initialized or there is no history.
    """
    if not self.db:
        return "❌ 身价模块未初始化"

    today = datetime.now().strftime("%Y-%m-%d")
    if not self.db.get_user_snapshot(today, group_id, user_id):
        self._recompute_group_snapshot(group_id, today)

    history = self.db.get_user_score_trend(group_id, user_id, days)
    if not history:
        return "📊 暂无你的趋势数据,请先在群内产生消息后再试。"

    display_name = ContactManager.get_instance().get_group_name(group_id, user_id) or user_id
    start_value = float(history[0].get("score") or 0.0)
    end_value = float(history[-1].get("score") or 0.0)
    change = round(end_value - start_value, 2)
    if change > 0:
        verdict = "上涨"
    elif change < 0:
        verdict = "下滑"
    else:
        verdict = "持平"

    points = [
        f"- {str(point.get('stat_date') or '')}: "
        f"{float(point.get('score') or 0.0):.2f}(第{int(point.get('rank_no') or 0)}名)"
        for point in history
    ]
    report = [f"📉 {display_name} 的身价趋势(近{days}天)", *points, "", f"趋势结论:{verdict} {change:+.2f}"]
    return "\n".join(report)
async def _render_social_graph_image(self, group_id: str, max_nodes: int) -> Optional[str]:
    """Render the group's social relationship graph and return the image path.

    Design notes:
    1. Uses the windowed, aggregated social edges from the DB instead of
       scanning raw messages;
    2. Directed rows are merged into undirected edges, then nodes are ordered
       "most connected first" so core members are laid out near the centre;
    3. The HTML produced by ``_build_social_graph_html`` is screenshotted by
       ``html_to_image``.

    Args:
        group_id: Group id.
        max_nodes: Requested node count. ``<= 0`` shows every member; a
            positive value keeps only the top ``max_nodes`` nodes.

    Returns:
        Absolute path of the generated PNG, or ``None`` when there is no data,
        fewer than two nodes or no edges remain, or HTML building/rendering
        fails.
    """
    if not self.db:
        return None
    edge_rows = self.db.get_social_edges_for_graph(
        group_id=group_id,
        social_window_days=self.social_window_days,
        limit=max(20, int(self.graph_edge_pool_limit)),
    )
    if not edge_rows:
        return None

    # Fold the directed rows into undirected edges in ONE pass, so A->B and
    # B->A become a single line instead of two overlapping ones. For every
    # undirected pair (a, b) with a < b, the per-direction @-counts are kept
    # too ({"a_to_b": x, "b_to_a": y}), so the graph can label direction.
    edge_map: Dict[Tuple[str, str], Dict[str, float]] = {}
    pair_dir_count: Dict[Tuple[str, str], Dict[str, int]] = {}
    partner_map: Dict[str, set] = {}
    node_score_map: Dict[str, float] = {}
    for row in edge_rows:
        uid_a = str(row.get("from_user_id") or "").strip()
        uid_b = str(row.get("to_user_id") or "").strip()
        if not uid_a or not uid_b or uid_a == uid_b:
            continue
        a, b = (uid_a, uid_b) if uid_a < uid_b else (uid_b, uid_a)
        pair_key = (a, b)
        mention_value = float(row.get("mention_count_window") or 0.0)
        score_value = float(row.get("interaction_score_window") or 0.0)
        edge_data = edge_map.setdefault(pair_key, {"mention_count": 0.0, "score": 0.0})
        edge_data["mention_count"] += mention_value
        edge_data["score"] += score_value
        dir_bucket = pair_dir_count.setdefault(pair_key, {"a_to_b": 0, "b_to_a": 0})
        # Direction is relative to the lexicographic order of the pair.
        if uid_a == a:
            dir_bucket["a_to_b"] += int(mention_value)
        else:
            dir_bucket["b_to_a"] += int(mention_value)
        partner_map.setdefault(a, set()).add(b)
        partner_map.setdefault(b, set()).add(a)
        node_score_map[a] = node_score_map.get(a, 0.0) + score_value
        node_score_map[b] = node_score_map.get(b, 0.0) + score_value
    if not edge_map:
        return None

    # Order nodes by "distinct partners, then interaction score":
    # 1. More partners -> more of a bridge node in the group;
    # 2. Ties are broken by interaction score so shallow links do not win;
    # 3. The layout reuses this order, so well-connected nodes sit nearer the centre.
    sorted_nodes = sorted(
        partner_map.keys(),
        key=lambda uid: (
            -len(partner_map.get(uid, set())),
            -float(node_score_map.get(uid, 0.0)),
            uid,
        ),
    )
    # No hard cap on the number of people: max_nodes <= 0 shows everyone,
    # a positive value is an explicit request to view only the top part.
    selected_nodes = sorted_nodes[:max_nodes] if max_nodes > 0 else sorted_nodes
    selected_set = set(selected_nodes)
    if len(selected_nodes) < 2:
        return None

    selected_edges: List[Tuple[str, str, float, float, int, int]] = []
    for (a, b), data in edge_map.items():
        if a in selected_set and b in selected_set:
            dir_bucket = pair_dir_count.get((a, b), {"a_to_b": 0, "b_to_a": 0})
            selected_edges.append(
                (
                    a,
                    b,
                    float(data.get("mention_count", 0.0)),
                    float(data.get("score", 0.0)),
                    int(dir_bucket.get("a_to_b", 0)),
                    int(dir_bucket.get("b_to_a", 0)),
                )
            )
    if not selected_edges:
        return None

    # "All members + main edges" strategy:
    # 1. Nodes are kept in full (no cap on people);
    # 2. Readability suffers from too many edges/labels, not too many people;
    # 3. So the displayed edges are thinned: each member keeps their main
    #    relations plus the strongest edges of the whole graph.
    full_selected_edges = selected_edges
    display_edge_cap = max(18, int(len(selected_nodes) * 1.18))
    display_edge_cap = min(len(full_selected_edges), display_edge_cap)
    if len(full_selected_edges) > display_edge_cap:
        core_node_count = max(3, min(6, len(selected_nodes) // 8 + 2))
        core_nodes = set(selected_nodes[:core_node_count])
        # Core nodes may show more edges than peripheral ones.
        node_degree_cap: Dict[str, int] = {
            uid: (6 if uid in core_nodes else 3)
            for uid in selected_nodes
        }
        edge_degree_map: Dict[str, int] = {uid: 0 for uid in selected_nodes}
        strongest_edge_by_node: Dict[str, Tuple[str, str, float, float, int, int]] = {}
        sorted_edges = sorted(
            full_selected_edges,
            key=lambda item: (
                -float(item[3]),
                -float(item[2]),
                -min(len(partner_map.get(item[0], set())), len(partner_map.get(item[1], set()))),
                item[0],
                item[1],
            ),
        )
        for edge in sorted_edges:
            # sorted_edges is strongest-first, so the first hit per node is its strongest edge.
            strongest_edge_by_node.setdefault(edge[0], edge)
            strongest_edge_by_node.setdefault(edge[1], edge)
        kept_edge_keys = set()
        pruned_edges: List[Tuple[str, str, float, float, int, int]] = []

        def try_append_edge(edge_item: Tuple[str, str, float, float, int, int], ignore_degree_cap: bool = False) -> bool:
            """Keep ``edge_item`` unless already kept or (optionally) over a node's degree cap."""
            edge_a, edge_b = edge_item[0], edge_item[1]
            edge_key = (edge_a, edge_b)
            if edge_key in kept_edge_keys:
                return False
            if not ignore_degree_cap:
                if edge_degree_map.get(edge_a, 0) >= node_degree_cap.get(edge_a, 3):
                    return False
                if edge_degree_map.get(edge_b, 0) >= node_degree_cap.get(edge_b, 3):
                    return False
            kept_edge_keys.add(edge_key)
            pruned_edges.append(edge_item)
            edge_degree_map[edge_a] = edge_degree_map.get(edge_a, 0) + 1
            edge_degree_map[edge_b] = edge_degree_map.get(edge_b, 0) + 1
            return True

        # First give every member at least their strongest edge, so peripheral
        # nodes are not left as isolated dots (this may exceed display_edge_cap).
        for uid in selected_nodes:
            edge = strongest_edge_by_node.get(uid)
            if edge:
                try_append_edge(edge, ignore_degree_cap=True)
        # Then fill up by global strength while respecting per-node degree caps.
        for edge in sorted_edges:
            if len(pruned_edges) >= display_edge_cap:
                break
            try_append_edge(edge, ignore_degree_cap=False)
        selected_edges = pruned_edges

    # HTML building touches contacts, avatars and the template; any failure
    # there is reported as "no image" like a rendering failure.
    try:
        html_content = self._build_social_graph_html(group_id, selected_nodes, selected_edges, partner_map, node_score_map)
    except Exception as e:
        self.LOG.error(f"[{self.name}] 社交关系图HTML构建失败: group={group_id}, error={e}")
        return None
    if not html_content:
        return None
    output_dir = Path("temp") / "value_rank"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"social_graph_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.png"
    try:
        await html_to_image(html_content, str(output_path))
    except Exception as e:
        self.LOG.error(f"[{self.name}] 社交关系图渲染失败: group={group_id}, error={e}")
        return None
    return str(output_path.resolve()) if output_path.exists() else None
def _build_social_graph_html(
    self,
    group_id: str,
    selected_nodes: List[str],
    selected_edges: List[Tuple[str, str, float, float, int, int]],
    partner_map: Dict[str, set],
    node_score_map: Dict[str, float],
) -> str:
    """Build the social-graph HTML (SVG nodes and edges) from the template file.

    Args:
        group_id: Group id, used for the title and nickname lookups.
        selected_nodes: Node ids in importance order. Index 0 is treated as the
            strongest node (crown marker, warm edge colour) and the first nodes
            fill the inner ring.
        selected_edges: ``(a, b, mention_count, score, a_to_b, b_to_a)`` tuples.
            Both endpoints must be in ``selected_nodes``.
        partner_map: Node id -> set of distinct partners (drives node size and
            the badge number).
        node_score_map: Node id -> accumulated interaction score.

    Returns:
        The rendered HTML, or ``""`` when there are no nodes or the template
        cannot be found or read (callers treat ``""`` as "generation failed").
    """
    if not selected_nodes:
        return ""
    import html

    # Template strategy:
    # 1. Only the external template file is used, so designers can restyle it;
    # 2. A missing or unreadable template returns "" and the caller reports failure;
    # 3. It is loaded before any contact/avatar lookups, so a broken deployment
    #    fails fast instead of caching avatars for nothing.
    template_path = Path(self.social_graph_template_path)
    if not template_path.is_absolute():
        template_path = Path.cwd() / template_path
    if not template_path.exists():
        self.LOG.error(f"[{self.name}] 社交图模板不存在: {template_path}")
        return ""
    try:
        template_html = template_path.read_text(encoding="utf-8")
    except Exception as e:
        self.LOG.error(f"[{self.name}] 社交图模板读取失败: {template_path}, error={e}")
        return ""

    width = 1540
    height = 1120
    cx, cy = width // 2, height // 2 + 24
    node_count = len(selected_nodes)
    max_edge_score = max([edge[3] for edge in selected_edges] + [1.0])
    max_partner_count = max([len(partner_map.get(uid, set())) for uid in selected_nodes] + [1])
    max_node_score = max([float(node_score_map.get(uid, 0.0)) for uid in selected_nodes] + [1.0])

    # Multi-ring layered layout:
    # 1. Core nodes (more partners) go on the inner ring, the rest spread outward;
    # 2. Ring capacity grows outward so not everyone is squeezed onto one circle;
    # 3. Outer rings get a slight wobble and an elliptical stretch, so large
    #    graphs spread out instead of looking like evenly spaced clock ticks.
    inner_margin = 188.0
    outer_margin_x = 108.0
    outer_margin_y = 102.0
    max_rx = width * 0.5 - outer_margin_x
    max_ry = height * 0.5 - outer_margin_y
    ring_capacities: List[int] = []
    placed = 0
    ring_index = 0
    while placed < node_count:
        if ring_index == 0:
            capacity = min(3, node_count)
        elif ring_index == 1:
            capacity = 7
        else:
            capacity = 12 + ring_index * 6
        ring_capacities.append(capacity)
        placed += capacity
        ring_index += 1
    ring_count = len(ring_capacities)
    if ring_count <= 1:
        ring_radii = [(inner_margin, inner_margin * 0.82)]
    else:
        ring_radii = []
        for idx in range(ring_count):
            progress = idx / max(ring_count - 1, 1)
            rx = inner_margin + (max_rx - inner_margin) * progress
            ry = inner_margin * 0.82 + (max_ry - inner_margin * 0.82) * progress
            ring_radii.append((rx, ry))
    pos_map: Dict[str, Tuple[float, float]] = {}
    node_meta_map: Dict[str, Dict[str, float]] = {}
    cursor = 0
    for ring_idx, capacity in enumerate(ring_capacities):
        ring_nodes = selected_nodes[cursor: cursor + capacity]
        cursor += len(ring_nodes)
        if not ring_nodes:
            continue
        rx, ry = ring_radii[min(ring_idx, len(ring_radii) - 1)]
        angle_offset = (ring_idx % 2) * (math.pi / max(len(ring_nodes), 3))
        for idx, uid in enumerate(ring_nodes):
            angle = angle_offset + (2.0 * math.pi * idx) / max(len(ring_nodes), 1)
            # Slight wobble on outer rings so avatars and labels on the same
            # ring are not perfectly aligned on top of each other.
            wobble = 1.0 + (0.03 * ((idx % 3) - 1) if ring_idx >= 1 else 0.0)
            x = cx + rx * wobble * math.cos(angle)
            y = cy + ry * wobble * math.sin(angle)
            pos_map[uid] = (x, y)
            node_meta_map[uid] = {
                "ring_idx": float(ring_idx),
                "angle": angle,
                "rx": rx,
                "ry": ry,
            }
    cm = ContactManager.get_instance()
    strongest_uid = selected_nodes[0] if selected_nodes else ""
    edge_defs_parts: List[str] = []
    edge_svg_parts: List[str] = []
    # Numeric labels only on the most important few edges:
    # 1. Labelling every edge drowns the picture in "0/1", "1/0" noise;
    # 2. Only the strongest edges get labels, the rest are read by thickness;
    # 3. This keeps direction info without sacrificing overall readability.
    edge_label_parts: List[str] = []
    max_labeled_edges = max(6, min(12, len(selected_nodes) // 4 + 3))
    labeled_edge_keys = {
        (edge[0], edge[1])
        for edge in sorted(
            selected_edges,
            key=lambda item: (-float(item[3]), -float(item[2]), item[0], item[1]),
        )[:max_labeled_edges]
    }

    def _shorten_graph_nick(raw_name: str, max_len: int = 8) -> str:
        """Trim a nickname to ``max_len`` characters, marking truncation with an ellipsis."""
        text = str(raw_name or "").strip()
        return text if len(text) <= max_len else f"{text[:max_len]}…"

    def _estimate_label_units(raw_text: str) -> float:
        """Rough text width: ASCII counts 1 unit, anything else (e.g. CJK) 1.75."""
        units = 0.0
        for ch in str(raw_text or ""):
            units += 1.0 if ord(ch) < 128 else 1.75
        return units

    def _build_avatar_data_url(raw_avatar_url: str, wxid: str) -> str:
        """Return the locally cached avatar as a data URL, else the original URL.

        The graph is captured by ``html_to_image``; remote avatar URLs can
        expire, be hot-link protected or time out, leaving blank circles.
        Inlining the local cache is the most reliable. Any failure, including
        in the cache lookup itself, falls back to the original URL so the graph
        is still produced.
        """
        try:
            local_avatar_path = str(cm.ensure_head_image_cached(wxid) or "").strip()
            if local_avatar_path:
                avatar_path = Path(local_avatar_path)
                image_bytes = avatar_path.read_bytes()
                mime_type = mimetypes.guess_type(str(avatar_path))[0] or "image/jpeg"
                base64_str = base64.b64encode(image_bytes).decode("utf-8")
                return f"data:{mime_type};base64,{base64_str}"
        except Exception as exc:
            self.LOG.debug(f"[{self.name}] 头像转 data url 失败: wxid={wxid}, error={exc}")
        return str(raw_avatar_url or "").strip()

    for edge_idx, (a, b, _mention_count, score, a_to_b_count, b_to_a_count) in enumerate(selected_edges, start=1):
        ax, ay = pos_map[a]
        bx, by = pos_map[b]
        normalized = max(0.12, min(score / max_edge_score, 1.0))
        stroke_width = 1.35 + 6.5 * normalized
        opacity = 0.20 + 0.48 * normalized
        edge_gradient_id = f"edge_gradient_{edge_idx}"
        start_color = "#50D6FF"
        # Edges touching the strongest node end in a warm colour.
        end_color = "#FFBB5C" if (a == strongest_uid or b == strongest_uid) else "#6C92FF"
        edge_defs_parts.append(
            f'<linearGradient id="{edge_gradient_id}" gradientUnits="userSpaceOnUse" '
            f'x1="{ax:.1f}" y1="{ay:.1f}" x2="{bx:.1f}" y2="{by:.1f}">'
            f'<stop offset="0%" stop-color="{start_color}" stop-opacity="0.96"></stop>'
            f'<stop offset="100%" stop-color="{end_color}" stop-opacity="0.90"></stop>'
            f'</linearGradient>'
        )
        edge_svg_parts.append(
            f'<line x1="{ax:.1f}" y1="{ay:.1f}" x2="{bx:.1f}" y2="{by:.1f}" '
            f'stroke="url(#{edge_gradient_id})" stroke-opacity="{opacity:.3f}" '
            f'stroke-width="{stroke_width:.2f}" stroke-linecap="round" />'
        )
        if (a, b) not in labeled_edge_keys:
            continue
        # (a, b) is in lexicographic order, so a label reads the same way on every render.
        # The label shows only the two directional counts and is offset along the
        # edge normal so the digits do not sit on top of the line.
        mx, my = (ax + bx) / 2.0, (ay + by) / 2.0
        dx, dy = (bx - ax), (by - ay)
        edge_len = max(math.hypot(dx, dy), 1.0)
        # Unit normal of the segment (direction rotated 90 degrees counter-clockwise).
        nx, ny = (-dy / edge_len), (dx / edge_len)
        # Labels run parallel to the edge:
        # 1. Rotated with the line, they read more naturally than floating text;
        # 2. A small offset along the normal keeps them off the stroke;
        # 3. Alternating sides reduces overlap between parallel edges.
        side_sign = 1.0 if (edge_idx % 2 == 0) else -1.0
        offset = max(18.0, 10.0 + stroke_width * 2.0)
        label_x = mx + nx * offset * side_sign
        label_y = my + ny * offset * side_sign
        angle_deg = math.degrees(math.atan2(dy, dx))
        # Keep text upright (never upside down).
        if angle_deg > 90 or angle_deg < -90:
            angle_deg += 180
        label_text = f"{int(a_to_b_count)}/{int(b_to_a_count)}"
        safe_label = html.escape(label_text)
        label_width = max(50.0, min(88.0, 10.0 * len(label_text) + 20.0))
        edge_label_parts.append(
            f'<g transform="translate({label_x:.1f},{label_y:.1f}) rotate({angle_deg:.1f})">'
            f'<rect x="-{label_width / 2:.1f}" y="-11" width="{label_width:.1f}" height="22" '
            f'rx="7" ry="7" fill="rgba(16,38,74,0.68)" stroke="rgba(84,214,255,0.34)" stroke-width="1.0"></rect>'
            f'<text x="0" y="4" text-anchor="middle" font-size="11" fill="#EAF7FF">{safe_label}</text>'
            f'</g>'
        )

    # The node layer is split into defs + body:
    # 1. defs hold one clip path per node so avatars never overflow their circle;
    # 2. body holds image/rings/text, matching the template's separate slots.
    node_defs_parts: List[str] = []
    node_svg_parts: List[str] = []
    for idx, uid in enumerate(selected_nodes, start=1):
        x, y = pos_map[uid]
        partner_count = len(partner_map.get(uid, set()))
        score = float(node_score_map.get(uid, 0.0))
        # Node size reflects partner count, with a modest cap so large nodes
        # do not crowd out the labels.
        size_norm = max(0.15, min(partner_count / max_partner_count, 1.0))
        score_norm = max(0.10, min(score / max(max_node_score, 1.0), 1.0))
        node_radius = 18.0 + 18.0 * size_norm + 6.0 * score_norm
        nick = cm.get_group_name(group_id, uid) or uid
        display_nick = _shorten_graph_nick(str(nick), 8)
        safe_nick = html.escape(display_nick)
        avatar_url = _build_avatar_data_url(str(cm.get_head_image(uid) or "").strip(), uid)
        ring_idx = int(node_meta_map.get(uid, {}).get("ring_idx", 0))
        # Border colour depends on the ring instead of one uniform colour:
        # 1. Core ring: warm gold, emphasising the centre;
        # 2. Second ring: cyan, the main accent colour;
        # 3. Outer rings: blue-violet/light blue, so layers are easy to tell apart.
        if ring_idx == 0:
            ring_color = "rgba(255, 176, 58, 0.98)"
            orbit_color = "rgba(255, 211, 130, 0.80)"
        elif ring_idx == 1:
            ring_color = "rgba(69, 212, 255, 0.96)"
            orbit_color = "rgba(128, 238, 255, 0.72)"
        elif ring_idx == 2:
            ring_color = "rgba(93, 142, 255, 0.94)"
            orbit_color = "rgba(148, 181, 255, 0.66)"
        else:
            ring_color = "rgba(145, 150, 255, 0.90)"
            orbit_color = "rgba(188, 192, 255, 0.60)"
        node_svg_parts.append(
            f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{node_radius + 2.8:.1f}" fill="rgba(255,255,255,0.88)" '
            f'stroke="{ring_color}" stroke-width="3.4"></circle>'
        )
        node_svg_parts.append(
            f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{node_radius + 5.0:.1f}" fill="none" '
            f'stroke="{orbit_color}" stroke-width="1.2" stroke-dasharray="7 5" stroke-linecap="round" opacity="0.78"></circle>'
        )
        if avatar_url:
            # With an avatar: clip it to a circle via an SVG clipPath so node size stays variable.
            safe_avatar_url = html.escape(avatar_url, quote=True)
            clip_id = f"avatar_clip_{idx}"
            avatar_r = max(node_radius - 1.8, 8.0)
            node_defs_parts.append(
                f'<clipPath id="{clip_id}"><circle cx="{x:.1f}" cy="{y:.1f}" r="{avatar_r:.1f}" /></clipPath>'
            )
            node_svg_parts.append(
                f'<image href="{safe_avatar_url}" x="{x - avatar_r:.1f}" y="{y - avatar_r:.1f}" '
                f'width="{avatar_r * 2:.1f}" height="{avatar_r * 2:.1f}" clip-path="url(#{clip_id})" '
                f'preserveAspectRatio="xMidYMid slice"></image>'
            )
        else:
            # Without an avatar: fall back to a coloured disc with the first character.
            node_svg_parts.append(
                f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{node_radius - 1.8:.1f}" fill="rgba(255, 193, 7, 0.90)"></circle>'
            )
            node_svg_parts.append(
                f'<text x="{x:.1f}" y="{y + 6:.1f}" text-anchor="middle" '
                f'font-size="{max(12, int(node_radius * 0.55))}" fill="#2F3B52" font-weight="700">'
                f'{html.escape(str(nick)[:1] or "?")}</text>'
            )
        # Crown marker on the strongest node (first in the given order), drawn
        # with plain SVG shapes so no emoji/font support is needed.
        if uid == strongest_uid:
            crown_y = y - node_radius - 16.0
            crown_points = [
                (x - 15.0, crown_y + 18.0),
                (x - 10.0, crown_y + 6.0),
                (x - 2.5, crown_y + 14.0),
                (x + 0.0, crown_y + 2.0),
                (x + 2.5, crown_y + 14.0),
                (x + 10.0, crown_y + 6.0),
                (x + 15.0, crown_y + 18.0),
            ]
            crown_points_text = " ".join([f"{px:.1f},{py:.1f}" for px, py in crown_points])
            node_svg_parts.append(
                f'<g>'
                f'<rect x="{x - 15.0:.1f}" y="{crown_y + 18.0:.1f}" width="30.0" height="5.5" rx="2.8" ry="2.8" '
                f'fill="rgba(255, 183, 3, 0.98)" stroke="rgba(168, 104, 0, 0.85)" stroke-width="1.0"></rect>'
                f'<polygon points="{crown_points_text}" fill="rgba(255, 202, 40, 0.98)" '
                f'stroke="rgba(168, 104, 0, 0.85)" stroke-width="1.2"></polygon>'
                f'<circle cx="{x - 10.0:.1f}" cy="{crown_y + 6.0:.1f}" r="2.2" fill="rgba(255,255,255,0.96)"></circle>'
                f'<circle cx="{x + 0.0:.1f}" cy="{crown_y + 2.0:.1f}" r="2.4" fill="rgba(255,255,255,0.96)"></circle>'
                f'<circle cx="{x + 10.0:.1f}" cy="{crown_y + 6.0:.1f}" r="2.2" fill="rgba(255,255,255,0.96)"></circle>'
                f'</g>'
            )
        # Partner count as a numeric badge on the node's upper-right edge,
        # pushed slightly outward so it does not cover the avatar.
        badge_text = str(int(partner_count))
        badge_font_size = 11 if len(badge_text) <= 2 else 10
        badge_width = 16.0 + len(badge_text) * 7.5
        badge_height = 20.0
        badge_x = x + node_radius * 0.98 - badge_width / 2.0
        badge_y = y - node_radius * 0.92 - badge_height / 2.0
        node_svg_parts.append(
            f'<rect x="{badge_x:.1f}" y="{badge_y:.1f}" width="{badge_width:.1f}" height="{badge_height:.1f}" '
            f'rx="10" ry="10" fill="rgba(17, 41, 79, 0.84)" stroke="{ring_color}" stroke-width="1.6"></rect>'
        )
        node_svg_parts.append(
            f'<text x="{badge_x + badge_width / 2.0:.1f}" y="{badge_y + 13.2:.1f}" text-anchor="middle" '
            f'font-size="{badge_font_size}" fill="#F4FBFF" font-weight="700">{badge_text}</text>'
        )
        # Name sits directly below the avatar (stable reading position) on a
        # rounded backing plate for contrast against the edge lines.
        title_x = x
        title_y = y + node_radius + 24.0
        anchor = "middle"
        label_units = _estimate_label_units(display_nick)
        label_width = max(44.0, min(156.0, 10.0 * label_units + 18.0))
        label_height = 24.0
        label_box_x = title_x - label_width / 2.0
        label_box_y = title_y - 18.0
        node_svg_parts.append(
            f'<rect x="{label_box_x:.1f}" y="{label_box_y:.1f}" width="{label_width:.1f}" height="{label_height:.1f}" '
            f'rx="12" ry="12" fill="rgba(14, 33, 66, 0.66)" stroke="rgba(84,214,255,0.24)" stroke-width="1.0"></rect>'
        )
        node_svg_parts.append(
            f'<text x="{title_x:.1f}" y="{title_y:.1f}" text-anchor="{anchor}" '
            f'font-size="14.5" fill="#F2FAFF" font-weight="700">{safe_nick}</text>'
        )
    group_title = html.escape(cm.get_nickname(group_id) or group_id)
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M")
    summary_text = (
        f"统计窗口:近{self.social_window_days}天 节点数:{len(selected_nodes)} "
        f"关系边:{len(selected_edges)} 生成时间:{now_text}"
    )
    # Explicit placeholders avoid clashing with CSS braces in the template.
    replace_map = {
        "__WIDTH__": str(width),
        "__HEIGHT__": str(height),
        "__GROUP_TITLE__": group_title,
        "__SUMMARY_TEXT__": summary_text,
        "__EDGE_DEFS__": "".join(edge_defs_parts),
        "__EDGE_SVG__": "".join(edge_svg_parts),
        "__EDGE_LABELS__": "".join(edge_label_parts),
        "__NODE_DEFS__": "".join(node_defs_parts),
        "__NODE_SVG__": "".join(node_svg_parts),
    }
    # Substitute all placeholders in a single pass, so text injected by one
    # placeholder (e.g. a group name or nickname containing "__NODE_SVG__")
    # is never re-expanded by a later replacement.
    placeholder_pattern = re.compile("|".join(re.escape(key) for key in replace_map))
    return placeholder_pattern.sub(lambda match: replace_map[match.group(0)], template_html)
async def _build_weekly_report_text(self, group_id: str) -> str:
    """Build the weekly value report (身价周报).

    Report rules:
    1. Overall ranking: today's Top 5;
    2. Risers / fallers: ``today's score - score 7 days ago``. Only members
       whose score actually went up are listed as risers, and only members
       whose score went down are listed as fallers;
    3. A member without a snapshot 7 days ago is compared against 0, so new
       groups still get a report.

    Args:
        group_id: Group id.

    Returns:
        The report text, or a status message when the module is not
        initialized or no snapshot exists.
    """
    if not self.db:
        return "❌ 身价模块未初始化"
    today = datetime.now().strftime("%Y-%m-%d")
    # Computed in Python rather than with SQL DATE_SUB, for readability and debugging.
    base_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
    # Make sure today's snapshot exists so the command does not report empty data.
    today_rank_rows = self.db.get_today_rankings(today, group_id, 50)
    if not today_rank_rows:
        self._recompute_group_snapshot(group_id, today)
        today_rank_rows = self.db.get_today_rankings(today, group_id, 50)
    if not today_rank_rows:
        return "📊 本周暂无可用身价数据。"
    today_score_map = self.db.get_snapshot_score_map(today, group_id)
    base_score_map = self.db.get_snapshot_score_map(base_date, group_id)

    # Per-member change: (user_id, today_score, old_score, delta).
    delta_rows: List[Tuple[str, float, float, float]] = []
    for user_id, today_score in today_score_map.items():
        old_score = float(base_score_map.get(user_id, 0.0))
        delta = round(float(today_score) - old_score, 2)
        delta_rows.append((user_id, float(today_score), old_score, delta))
    # Filter by sign before ranking; otherwise small groups would list rising
    # members under "下行" (and falling ones under "上升").
    risers = sorted((item for item in delta_rows if item[3] > 0), key=lambda x: x[3], reverse=True)
    fallers = sorted((item for item in delta_rows if item[3] < 0), key=lambda x: x[3])

    cm = ContactManager.get_instance()
    lines: List[str] = [
        f"📈 身价周报({today})",
        f"对比基线:{base_date}",
        "",
        "🏆 本周综合排行 Top5",
    ]
    for idx, row in enumerate(today_rank_rows[:5], start=1):
        uid = str(row.get("user_id") or "")
        nick = cm.get_group_name(group_id, uid) or uid
        score = float(row.get("score") or 0.0)
        title = str(row.get("title") or "")
        lines.append(f"{idx}. {nick} | {score:.2f} | {title}")

    def _append_movers(heading: str, movers: List[Tuple[str, float, float, float]]) -> None:
        """Append one movers section (Top 5) or a placeholder when empty."""
        lines.append("")
        lines.append(heading)
        if not movers:
            lines.append("暂无")
            return
        for rank, (uid, today_score, old_score, delta) in enumerate(movers[:5], start=1):
            nick = cm.get_group_name(group_id, uid) or uid
            lines.append(f"{rank}. {nick} | {old_score:.2f} -> {today_score:.2f} | {delta:+.2f}")

    _append_movers("🚀 本周上升榜 Top5", risers)
    _append_movers("🧊 本周波动下行 Top5", fallers)
    lines.append("")
    lines.append("提示:分数由积分/发言/活跃/社交影响力综合计算。")
    # Also list the top social "bridge" members, so the report shows
    # connection contribution besides score movement.
    bridge_rows = self.db.get_social_bridge_rankings(group_id, self.social_window_days, 3)
    if bridge_rows:
        lines.append("")
        lines.append("🌉 本周社交桥梁人物")
        for idx, row in enumerate(bridge_rows, start=1):
            uid = str(row.get("user_id") or "")
            nick = cm.get_group_name(group_id, uid) or uid
            partner_count = int(row.get("partner_count") or 0)
            lines.append(f"{idx}. {nick} | 连接{partner_count}人")
    return "\n".join(lines)
def _process_pending_mentions_for_group(self, group_id: str) -> Dict[str, int]:
    """Extract @-mentions for one group's pending messages (scheduled job).

    For each pending message, the job:
    1. parses the timestamp;
    2. extracts the mentioned ids from the message XML;
    3. writes them back to the message as JSON;
    4. persists the mention-graph increments.

    A failing message is counted and logged and does not stop the batch.

    Args:
        group_id: Group id.

    Returns:
        Counters ``{"total", "processed", "with_mentions", "failed"}``. All
        are zero when the DB is not initialized or nothing is pending.
    """
    if not self.db:
        return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}

    begin = datetime.now()
    start_minutes = max(int(self.mention_window_start_minutes), 1)
    end_minutes = max(int(self.mention_window_end_minutes), 0)
    # The start offset must be strictly larger than the end offset; otherwise
    # widen it to end + 10 and warn about the misconfiguration.
    if not start_minutes > end_minutes:
        start_minutes = end_minutes + 10
        self.LOG.warning(
            f"[{self.name}] @窗口参数异常已修正: group={group_id}, "
            f"window_start={self.mention_window_start_minutes}, "
            f"window_end={self.mention_window_end_minutes}, fixed=[{start_minutes},{end_minutes}]"
        )

    batch = self.db.get_pending_mention_extract_messages_for_group(
        group_id=group_id,
        limit=self.mention_batch_size,
        window_start_minutes=start_minutes,
        window_end_minutes=end_minutes,
    )
    if not batch:
        return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}

    ok_count = 0
    hit_count = 0
    err_count = 0
    failed_ids: List[str] = []
    for seq, item in enumerate(batch, start=1):
        try:
            msg_id = str(item.get("message_id") or "").strip()
            author = str(item.get("sender") or "").strip()
            xml_payload = str(item.get("message_xml") or "")
            sent_at = self._safe_parse_message_time(item.get("timestamp"))
            targets = self._extract_mentioned_user_ids(xml_payload)
            targets_json = json.dumps(targets, ensure_ascii=False)
            self.db.update_message_mentioned_user_ids(
                message_id=msg_id,
                group_id=group_id,
                sender_id=author,
                mentioned_user_ids_json=targets_json,
            )
            self._persist_mention_graph_data(
                group_id=group_id,
                sender_id=author,
                message_id=msg_id,
                mentioned_user_ids=targets,
                msg_time=sent_at,
            )
            ok_count += 1
            if targets:
                hit_count += 1
            # Debug-log the first two rows of the batch as samples.
            if seq <= 2:
                self.LOG.debug(
                    f"[{self.name}] @抽取样本: group={group_id}, msg={msg_id}, "
                    f"sender={author}, mentioned_count={len(targets)}"
                )
        except Exception as err:
            err_count += 1
            if len(failed_ids) < 5:
                failed_ids.append(str(item.get("message_id") or ""))
            self.LOG.error(f"[{self.name}] @抽取失败: group={group_id}, message_id={item.get('message_id')}, error={err}")

    cost_ms = int((datetime.now() - begin).total_seconds() * 1000)
    summary = {"total": len(batch), "processed": ok_count, "with_mentions": hit_count, "failed": err_count}
    self.LOG.info(
        f"[{self.name}] @批处理完成: group={group_id}, total={summary['total']}, processed={ok_count}, "
        f"with_mentions={hit_count}, failed={err_count}, cost={cost_ms}ms, fail_samples={failed_ids}"
    )
    return summary
@staticmethod
def _safe_parse_message_time(value: Any) -> datetime:
    """Parse a message timestamp, falling back to the current time.

    Accepted inputs:
    - a ``datetime`` (returned unchanged);
    - ``"YYYY-MM-DD HH:MM:SS"`` strings;
    - other ISO-8601 forms understood by ``datetime.fromisoformat``, such as a
      ``"T"`` separator or fractional seconds.

    Empty or unparsable values yield ``datetime.now()``, so one bad row never
    breaks mention processing.
    """
    if isinstance(value, datetime):
        return value
    text = str(value or "").strip()
    if not text:
        return datetime.now()
    try:
        return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        pass
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return datetime.now()
@staticmethod
def _extract_mentioned_user_ids(raw_xml: str) -> List[str]:
    """Extract the list of @-mentioned user ids from a message's XML.

    The ``<atuserlist>`` element is read via ElementTree. If the payload is not
    well-formed XML (for example, it carries a ``"sender:\\n"`` prefix), a regex
    fallback reads the element's content in either CDATA or plain-text form.
    Ids may be separated by commas, semicolons or whitespace. Empty ids are
    dropped and duplicates removed, keeping first-seen order.

    Returns:
        The de-duplicated ids, or ``[]`` when nothing is found.
    """
    raw_xml = str(raw_xml or "")
    if not raw_xml:
        return []
    at_user_list_text = ""
    try:
        # NOTE(review): ElementTree is not hardened against malicious XML
        # (e.g. entity-expansion payloads); message XML is external input.
        root = ET.fromstring(raw_xml)
        node = root.find(".//atuserlist")
        if node is not None and node.text:
            at_user_list_text = str(node.text).strip()
    except Exception:
        match = re.search(
            r"<atuserlist>\s*(?:<!\[CDATA\[(.*?)\]\]>|([^<]*?))\s*</atuserlist>",
            raw_xml,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if match:
            at_user_list_text = str(match.group(1) or match.group(2) or "").strip()
    if not at_user_list_text:
        return []
    raw_ids = re.split(r"[,\s;]+", at_user_list_text)
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(uid.strip() for uid in raw_ids if uid and uid.strip()))
def _persist_mention_graph_data(
    self,
    group_id: str,
    sender_id: str,
    message_id: str,
    mentioned_user_ids: List[str],
    msg_time: datetime,
) -> None:
    """Persist social-graph increments for one message.

    Three kinds of data are written: mention detail rows, directed daily edges,
    and per-user daily totals.

    Mentions are normalised first:
    - ids are trimmed;
    - empty ids, "@all"-style ids and self-mentions are dropped;
    - duplicates are removed, keeping first-seen order.

    Ids already recorded for this message (``db.get_existing_mentions``) are
    skipped, so reprocessing the same message does not double-count. Nothing
    is written when the DB is missing or any identifier is empty.
    """
    if not (self.db and group_id and sender_id and message_id):
        return

    ignored = {"notify@all", "all", "@all"}
    normalised = (str(candidate or "").strip() for candidate in mentioned_user_ids)
    targets = list(
        dict.fromkeys(
            uid for uid in normalised
            if uid and uid not in ignored and uid != sender_id
        )
    )
    if not targets:
        return

    already = set(self.db.get_existing_mentions(message_id, group_id, sender_id))
    fresh = [uid for uid in targets if uid not in already]
    if not fresh:
        return

    day = msg_time.strftime("%Y-%m-%d")
    moment = msg_time.strftime("%Y-%m-%d %H:%M:%S")
    self.db.insert_message_mentions(
        [(message_id, group_id, sender_id, uid, day, moment) for uid in fresh]
    )
    self.db.upsert_social_edges_daily(
        [(day, group_id, sender_id, uid, 1, 1.0) for uid in fresh]
    )
    fresh_count = len(fresh)
    # Sender side: mentions made + interaction score.
    self.db.upsert_social_daily_row((day, group_id, sender_id, 0, fresh_count, 0, float(fresh_count)))
    # Receiver side: one mention received + interaction score each.
    for uid in fresh:
        self.db.upsert_social_daily_row((day, group_id, uid, 1, 0, 0, 1.0))
    self.db.refresh_unique_interactors(day, group_id, [sender_id, *fresh])
def _build_explain_text(self) -> str:
    """Return the scoring-algorithm explanation (身价算法说明).

    Lists each dimension's weight, with its statistics window in parentheses,
    and the inactivity penalty cap. Each window is wrapped in parentheses so
    the weight and the day count do not run together (e.g. "0.30(7天)").
    """
    return (
        "📘 身价算法说明\n"
        f"- 积分权重:{self.points_weight:.2f}\n"
        f"- 发言权重:{self.message_weight:.2f}({self.message_window_days}天)\n"
        f"- 活跃权重:{self.active_days_weight:.2f}({self.active_window_days}天)\n"
        f"- 社交权重:{self.social_weight:.2f}({self.social_window_days}天)\n"
        f"- 潜水惩罚上限:{self.inactivity_penalty_max:.0f}\n"
        "- 所有维度先归一化再加权,极端值按95分位截断,减少刷屏与极值干扰。"
    )
def _parse_rank_limit(self, content: str) -> int:
    """Parse the optional row-count argument of a ranking command.

    The second whitespace-separated token of ``content`` is read as an int.
    When it is missing or not an integer, ``self.default_rank_limit`` is
    returned. Otherwise the value is clamped to ``[1, self.max_rank_limit]``.
    """
    tokens = content.split()
    if len(tokens) >= 2:
        try:
            requested = int(tokens[1])
        except Exception:
            requested = None
        if requested is not None:
            return min(max(requested, 1), self.max_rank_limit)
    return self.default_rank_limit
def _parse_trend_days(self, content: str) -> int:
    """Parse the day-count argument of the trend command.

    Constraints:
    1. Missing or non-integer input falls back to ``self.default_trend_days``,
       keeping the output short by default;
    2. Otherwise the value is clamped to ``[1, self.max_trend_days]``, so a
       single command cannot pull an unbounded history.
    """
    arguments = content.split()[1:]
    if not arguments:
        return self.default_trend_days
    try:
        wanted = int(arguments[0])
    except Exception:
        return self.default_trend_days
    return min(max(wanted, 1), self.max_trend_days)
def _parse_graph_nodes(self, content: str) -> int:
    """Parse the requested node count for the social-graph command.

    Rules:
    1. With no argument, or a non-integer one, ``self.default_graph_nodes`` is
       returned;
    2. An argument <= 0 returns 0, which the renderer treats as "do not
       truncate, show everyone";
    3. A positive argument is raised to at least 6, because tiny graphs are
       unreadable;
    4. When ``self.max_graph_nodes`` is positive, the value is capped at it. A
       ``max_graph_nodes`` of 0 or less disables the cap.
    """
    pieces = content.split()
    raw = pieces[1] if len(pieces) >= 2 else None
    if raw is None:
        return self.default_graph_nodes
    try:
        wanted = int(raw)
    except Exception:
        return self.default_graph_nodes
    if wanted <= 0:
        return 0
    wanted = wanted if wanted > 6 else 6
    ceiling = self.max_graph_nodes
    if not ceiling > 0:
        return wanted
    return wanted if wanted < ceiling else ceiling
@staticmethod
def _normalize_linear(value: float, ceiling: float) -> float:
    """Scale ``value`` linearly by ``ceiling`` and clamp the result to [0, 1].

    A non-positive ``ceiling`` yields 0.0.
    """
    if ceiling <= 0:
        return 0.0
    ratio = float(value) / float(ceiling)
    upper_clamped = min(ratio, 1.0)
    return max(0.0, upper_clamped)
@staticmethod
def _normalize_log(value: float, ceiling: float) -> float:
    """Log-scale normalisation into [0, 1]: ``log1p(value) / log1p(ceiling)``.

    Compresses heavy-tailed metrics so a few very large values do not
    dominate. Negative values are treated as 0, and a non-positive
    ``ceiling`` yields 0.0.
    """
    if ceiling <= 0:
        return 0.0
    numerator = math.log1p(max(float(value), 0.0))
    denominator = math.log1p(float(ceiling))
    return max(0.0, min(numerator / denominator, 1.0))
@staticmethod
def _percentile95(values: List[float]) -> float:
    """Return the nearest-rank 95th percentile of ``values``, floored at 1.0.

    Negative inputs are treated as 0. An empty input yields 1.0. The 1.0 floor
    keeps the result safe to use as a divisor.
    """
    if not values:
        return 1.0
    ordered = sorted(max(float(v), 0.0) for v in values)
    position = math.ceil(0.95 * len(ordered)) - 1
    position = min(max(position, 0), len(ordered) - 1)
    return max(ordered[position], 1.0)
@staticmethod
def _build_title(rank_no: int, total: int, inactive_days: int) -> str:
    """Map a rank to a title based on its position ratio ``rank_no / total``.

    Thresholds (inclusive): top 1%, top 5%, top 20%, top 80%. Below that,
    members at or past the 90% mark who have been inactive for 30+ days are
    "潜水观察员"; everyone else is "潜力新人". A non-positive ``total`` yields
    "未定级".
    """
    if total <= 0:
        return "未定级"
    ratio = rank_no / total
    tiers = (
        (0.01, "群之巨鳄"),
        (0.05, "社交名流"),
        (0.20, "活跃中产"),
        (0.80, "稳定居民"),
    )
    for upper_bound, label in tiers:
        if ratio <= upper_bound:
            return label
    is_dormant_tail = ratio >= 0.90 and inactive_days >= 30
    return "潜水观察员" if is_dormant_tail else "潜力新人"