# -*- coding: utf-8 -*- import json import math import re import xml.etree.ElementTree as ET from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.base import BaseDBOperator from db.connection import DBConnectionManager from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager from utils.wechat.contact_manager import ContactManager class ValueRankDB(BaseDBOperator): """Value Rank 数据访问层。 说明: 1. 这里集中封装 SQL,避免插件主流程里混杂大量查询语句; 2. 所有方法都只做“数据读写”,不做业务打分,便于后续独立测试; 3. 读取失败时统一返回空结构,保证上层逻辑可降级继续执行。 """ def get_candidate_users(self, group_id: str, active_window_days: int, social_window_days: int) -> List[str]: """获取需要参与打分的候选成员列表。 候选来源采用并集策略,避免遗漏: 1. 积分表中出现过的成员; 2. 近期发过言的成员; 3. 近期社交表出现过的成员(被@或主动@)。 """ sql = """ SELECT user_id FROM t_user_points WHERE group_id = %s AND user_id <> 'SYSTEM' UNION SELECT DISTINCT sender AS user_id FROM messages WHERE group_id = %s AND sender IS NOT NULL AND sender <> '' AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) UNION SELECT DISTINCT user_id FROM t_value_rank_social_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) """ rows = self.execute_query(sql, (group_id, group_id, active_window_days, group_id, social_window_days)) or [] return [str(r.get("user_id") or "").strip() for r in rows if str(r.get("user_id") or "").strip()] def get_points_map(self, group_id: str) -> Dict[str, int]: """读取群内用户积分映射:{user_id: total_points}。""" sql = """ SELECT user_id, total_points FROM t_user_points WHERE group_id = %s AND user_id <> 'SYSTEM' """ rows = self.execute_query(sql, (group_id,)) or [] result: Dict[str, int] = {} for row in rows: user_id = str(row.get("user_id") or "").strip() if not user_id: continue result[user_id] = int(row.get("total_points") or 0) return result def get_message_stats_map(self, group_id: str, message_window_days: int, active_window_days: int) -> Dict[str, Dict[str, Any]]: """按用户读取消息指标。 返回结构示例: { "wxid_xxx": { "msg_count_window": 123, "active_days_window": 20, "last_active_date": "2026-04-21" } } """ sql = """ SELECT sender AS user_id, SUM(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN 1 ELSE 0 END) AS msg_count_window, COUNT(DISTINCT DATE(CASE WHEN timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) THEN timestamp END)) AS active_days_window, MAX(DATE(timestamp)) AS last_active_date FROM messages WHERE group_id = %s AND sender IS NOT NULL AND sender <> '' AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY sender """ rows = self.execute_query(sql, (message_window_days, active_window_days, group_id, active_window_days)) or [] result: Dict[str, Dict[str, Any]] = {} for row in rows: user_id = str(row.get("user_id") or "").strip() if not user_id: continue result[user_id] = { "msg_count_window": int(row.get("msg_count_window") or 0), "active_days_window": int(row.get("active_days_window") or 0), "last_active_date": row.get("last_active_date"), } return result def get_last_active_date_map(self, group_id: str) -> Dict[str, Any]: """读取群内用户最后发言日期,用于计算潜水惩罚。""" sql = """ SELECT sender AS user_id, MAX(DATE(timestamp)) AS last_active_date FROM messages WHERE group_id = %s AND sender IS NOT NULL AND sender <> '' GROUP BY sender """ rows = self.execute_query(sql, (group_id,)) or [] return { str(row.get("user_id") or "").strip(): row.get("last_active_date") for row in rows if str(row.get("user_id") or "").strip() } def get_social_stats_map(self, group_id: str, social_window_days: int) -> Dict[str, Dict[str, Any]]: """按用户读取社交指标(日汇总窗口)。""" sql = """ SELECT user_id, SUM(mentioned_count) AS mentioned_count_window, SUM(mention_others_count) AS mention_others_count_window, SUM(unique_interactors) AS unique_interactors_window, SUM(interaction_score) AS interaction_score_window FROM t_value_rank_social_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY user_id """ rows = self.execute_query(sql, (group_id, social_window_days)) or [] result: Dict[str, Dict[str, Any]] = {} for row in rows: user_id = str(row.get("user_id") or "").strip() if not user_id: continue result[user_id] = { "mentioned_count_window": int(row.get("mentioned_count_window") or 0), "mention_others_count_window": int(row.get("mention_others_count_window") or 0), "unique_interactors_window": int(row.get("unique_interactors_window") or 0), "interaction_score_window": float(row.get("interaction_score_window") or 0.0), } return result def upsert_snapshots(self, rows: List[Tuple[Any, ...]]) -> bool: """批量写入身价快照。""" if not rows: return True sql = """ INSERT INTO t_value_rank_snapshot ( stat_date, group_id, user_id, score, rank_no, title, points_total, msg_count_7d, active_days_30, inactive_days, score_detail_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE score = VALUES(score), rank_no = VALUES(rank_no), title = VALUES(title), points_total = VALUES(points_total), msg_count_7d = VALUES(msg_count_7d), active_days_30 = VALUES(active_days_30), inactive_days = VALUES(inactive_days), score_detail_json = VALUES(score_detail_json), updated_at = CURRENT_TIMESTAMP """ return self.execute_batch(sql, rows) def get_user_snapshot(self, stat_date: str, group_id: str, user_id: str) -> Optional[Dict[str, Any]]: """读取某用户在某天的身价快照。""" sql = """ SELECT * FROM t_value_rank_snapshot WHERE stat_date = %s AND group_id = %s AND user_id = %s LIMIT 1 """ return self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True) def get_yesterday_score(self, stat_date: str, group_id: str, user_id: str) -> Optional[float]: """读取昨日分数用于计算涨跌幅。""" sql = """ SELECT score FROM t_value_rank_snapshot WHERE stat_date = DATE_SUB(%s, INTERVAL 1 DAY) AND group_id = %s AND user_id = %s LIMIT 1 """ row = self.execute_query(sql, (stat_date, group_id, user_id), fetch_one=True) if not row: return None return float(row.get("score") or 0) def get_today_rankings(self, stat_date: str, group_id: str, limit: int) -> List[Dict[str, Any]]: """读取今日排行榜。""" sql = """ SELECT user_id, score, rank_no, title FROM t_value_rank_snapshot WHERE stat_date = %s AND group_id = %s ORDER BY rank_no ASC LIMIT %s """ return self.execute_query(sql, (stat_date, group_id, limit)) or [] def get_social_hot_rankings(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]: """读取社交热度榜(窗口聚合)。""" sql = """ SELECT user_id, SUM(mentioned_count) AS mentioned_count_window, SUM(mention_others_count) AS mention_others_count_window, SUM(unique_interactors) AS unique_interactors_window, SUM(interaction_score) AS interaction_score_window FROM t_value_rank_social_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY user_id ORDER BY interaction_score_window DESC, mentioned_count_window DESC LIMIT %s """ return self.execute_query(sql, (group_id, social_window_days, limit)) or [] def get_top_partner_pairs(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]: """读取搭子榜(无向边聚合)。""" sql = """ SELECT LEAST(from_user_id, to_user_id) AS user_a, GREATEST(from_user_id, to_user_id) AS user_b, SUM(mention_count) AS mention_count_window, SUM(interaction_score) AS interaction_score_window FROM t_social_edges_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY LEAST(from_user_id, to_user_id), GREATEST(from_user_id, to_user_id) ORDER BY interaction_score_window DESC, mention_count_window DESC LIMIT %s """ return self.execute_query(sql, (group_id, social_window_days, limit)) or [] def get_social_bridge_rankings(self, group_id: str, social_window_days: int, limit: int) -> List[Dict[str, Any]]: """读取社交桥梁榜。 口径说明: 1. 将有向边转换为“用户-伙伴”的无向视角(from 与 to 都算该用户触达); 2. 用去重伙伴数衡量“桥梁覆盖面”,再用互动分做同分排序; 3. 该榜更强调“连接不同人”的能力,而不是单点高频互动。 """ sql = """ SELECT user_id, COUNT(DISTINCT partner_id) AS partner_count, SUM(mention_count) AS mention_count_window, SUM(interaction_score) AS interaction_score_window FROM ( SELECT from_user_id AS user_id, to_user_id AS partner_id, mention_count, interaction_score FROM t_social_edges_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) UNION ALL SELECT to_user_id AS user_id, from_user_id AS partner_id, mention_count, interaction_score FROM t_social_edges_daily WHERE group_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) ) t GROUP BY user_id ORDER BY partner_count DESC, interaction_score_window DESC, mention_count_window DESC LIMIT %s """ return self.execute_query( sql, (group_id, social_window_days, group_id, social_window_days, limit), ) or [] def get_user_score_trend(self, group_id: str, user_id: str, days: int) -> List[Dict[str, Any]]: """读取用户近 N 天身价趋势(按日期升序)。""" sql = """ SELECT stat_date, score, rank_no FROM t_value_rank_snapshot WHERE group_id = %s AND user_id = %s AND stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) ORDER BY stat_date ASC """ return self.execute_query(sql, (group_id, user_id, days)) or [] def get_snapshot_score_map(self, stat_date: str, group_id: str) -> Dict[str, float]: """读取某天群内所有用户分数字典:{user_id: score}。""" sql = """ SELECT user_id, score FROM t_value_rank_snapshot WHERE stat_date = %s AND group_id = %s """ rows = self.execute_query(sql, (stat_date, group_id)) or [] result: Dict[str, float] = {} for row in rows: user_id = str(row.get("user_id") or "").strip() if not user_id: continue result[user_id] = float(row.get("score") or 0.0) return result def get_pending_mention_extract_messages_for_group( self, group_id: str, limit: int, window_start_minutes: int, window_end_minutes: int, ) -> List[Dict[str, Any]]: """按群读取待处理@抽取消息。""" sql = """ SELECT message_id, group_id, sender, message_xml, timestamp FROM messages WHERE group_id = %s AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '') AND message_xml IS NOT NULL AND message_xml <> '' AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE) ORDER BY timestamp ASC LIMIT %s """ return self.execute_query(sql, (group_id, window_start_minutes, window_end_minutes, limit)) or [] def update_message_mentioned_user_ids( self, message_id: str, group_id: str, sender_id: str, mentioned_user_ids_json: str, ) -> bool: """回填消息表的 mentioned_user_ids 字段。""" return self.execute_update( """ UPDATE messages SET mentioned_user_ids = %s WHERE message_id = %s AND group_id = %s AND sender = %s """, (mentioned_user_ids_json, message_id, group_id, sender_id), ) def get_existing_mentions(self, message_id: str, group_id: str, sender_id: str) -> List[str]: """查询某条消息已经入库的@关系,避免重复累加。""" rows = self.execute_query( """ SELECT mentioned_user_id FROM t_message_mentions WHERE message_id = %s AND group_id = %s AND sender_id = %s """, (message_id, group_id, sender_id), ) or [] return [str(r.get("mentioned_user_id") or "").strip() for r in rows if str(r.get("mentioned_user_id") or "").strip()] def insert_message_mentions(self, rows: List[Tuple[Any, ...]]) -> bool: """批量写入@明细。""" if not rows: return True return self.execute_batch( """ INSERT IGNORE INTO t_message_mentions (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time) VALUES (%s, %s, %s, %s, %s, %s) """, rows, ) def upsert_social_edges_daily(self, rows: List[Tuple[Any, ...]]) -> bool: """批量累加社交边。""" if not rows: return True return self.execute_batch( """ INSERT INTO t_social_edges_daily (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score) VALUES (%s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE mention_count = mention_count + VALUES(mention_count), interaction_score = interaction_score + VALUES(interaction_score), update_time = CURRENT_TIMESTAMP """, rows, ) def upsert_social_daily_row(self, row: Tuple[Any, ...]) -> bool: """写入或更新单个用户的社交日汇总。""" return self.execute_update( """ INSERT INTO t_value_rank_social_daily (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE mentioned_count = mentioned_count + VALUES(mentioned_count), mention_others_count = mention_others_count + VALUES(mention_others_count), interaction_score = interaction_score + VALUES(interaction_score), update_time = CURRENT_TIMESTAMP """, row, ) def refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None: """回填去重互动人数。""" deduped = [] seen = set() for uid in user_ids: normalized = str(uid or "").strip() if not normalized or normalized in seen: continue seen.add(normalized) deduped.append(normalized) for uid in deduped: row = self.execute_query( """ SELECT COUNT(DISTINCT partner_id) AS partner_count FROM ( SELECT mentioned_user_id AS partner_id FROM t_message_mentions WHERE stat_date = %s AND group_id = %s AND sender_id = %s UNION SELECT sender_id AS partner_id FROM t_message_mentions WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s ) t """, (stat_date, group_id, uid, stat_date, group_id, uid), fetch_one=True, ) or {} partner_count = int(row.get("partner_count") or 0) self.execute_update( """ UPDATE t_value_rank_social_daily SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP WHERE stat_date = %s AND group_id = %s AND user_id = %s """, (partner_count, stat_date, group_id, uid), ) class ValueRankPlugin(MessagePluginInterface): """群成员身价排行插件。 设计目标: 1. 将“积分、发言、活跃、社交影响力”统一折算为可解释的身价分; 2. 支持手动查询与后台定时重算; 3. 通过快照持久化,支持趋势分析和涨跌说明。 """ FEATURE_KEY = "VALUE_RANK" FEATURE_DESCRIPTION = "📊 身价排行 [我的身价, 身价排行, 社交热度榜, 搭子榜, 社交桥梁榜, 我的趋势, 身价周报, 身价说明, 重算身价]" @property def name(self) -> str: return "身价排行" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "根据积分、发言、活跃与社交影响力计算群成员身价。" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.feature = self.register_feature() self.db: Optional[ValueRankDB] = None # 配置默认值:即使未配置 config.toml,也能以保守参数运行。 self.enable = True self._commands = ["我的身价", "身价排行", "社交热度榜", "搭子榜", "社交桥梁榜", "我的趋势", "身价周报", "身价说明", "重算身价"] self.command_format = ( "我的身价 | 身价排行 [名次] | 社交热度榜 [名次] | 搭子榜 [名次] | " "社交桥梁榜 [名次] | 我的趋势 [天数] | 身价周报 | 身价说明 | 重算身价" ) self.message_window_days = 7 self.active_window_days = 30 self.social_window_days = 7 self.points_weight = 0.30 self.message_weight = 0.35 self.active_days_weight = 0.20 self.social_weight = 0.15 self.inactivity_penalty_max = 150 self.base_score_scale = 1000 self.default_rank_limit = 10 self.max_rank_limit = 50 self.default_trend_days = 7 self.max_trend_days = 30 self.mention_batch_size = 200 self.mention_window_start_minutes = 20 self.mention_window_end_minutes = 10 def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件与配置。""" self.LOG = logger cfg = self._config.get("ValueRank", {}) self.enable = bool(cfg.get("enable", True)) self._commands = cfg.get("command", self._commands) self.command_format = cfg.get("command-format", self.command_format) self.message_window_days = int(cfg.get("message_window_days", self.message_window_days)) self.active_window_days = int(cfg.get("active_window_days", self.active_window_days)) self.social_window_days = int(cfg.get("social_window_days", self.social_window_days)) self.points_weight = float(cfg.get("points_weight", self.points_weight)) self.message_weight = float(cfg.get("message_weight", self.message_weight)) self.active_days_weight = float(cfg.get("active_days_weight", self.active_days_weight)) self.social_weight = float(cfg.get("social_weight", self.social_weight)) self.inactivity_penalty_max = float(cfg.get("inactivity_penalty_max", self.inactivity_penalty_max)) self.base_score_scale = float(cfg.get("base_score_scale", self.base_score_scale)) self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit)) self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit)) self.default_trend_days = int(cfg.get("default_trend_days", self.default_trend_days)) self.max_trend_days = int(cfg.get("max_trend_days", self.max_trend_days)) self.mention_batch_size = int(cfg.get("mention_batch_size", self.mention_batch_size)) self.mention_window_start_minutes = int(cfg.get("mention_window_start_minutes", self.mention_window_start_minutes)) self.mention_window_end_minutes = int(cfg.get("mention_window_end_minutes", self.mention_window_end_minutes)) # 权重归一化:避免配置误差导致总权重不为 1。 weight_sum = self.points_weight + self.message_weight + self.active_days_weight + self.social_weight if weight_sum <= 0: self.points_weight, self.message_weight, self.active_days_weight, self.social_weight = 0.30, 0.35, 0.20, 0.15 else: self.points_weight /= weight_sum self.message_weight /= weight_sum self.active_days_weight /= weight_sum self.social_weight /= weight_sum self.db = ValueRankDB(DBConnectionManager.get_instance()) self.LOG.info(f"[{self.name}] 初始化完成,命令: {self._commands}") return True def start(self) -> bool: self.status = PluginStatus.RUNNING return True def stop(self) -> bool: self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: if not self.enable: return False content = str(message.get("content", "")).strip() if not content: return False command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="身价排行") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理用户命令入口。""" content = str(message.get("content", "")).strip() command = content.split(" ")[0] sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot = message.get("bot") if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" if not roomid: await bot.send_text_message(sender, "该功能仅支持群聊使用。", sender) return True, "非群聊" if command == "我的身价": text = await self._build_my_value_text(roomid, sender) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "身价排行": limit = self._parse_rank_limit(content) text = await self._build_ranking_text(roomid, limit) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "社交热度榜": limit = self._parse_rank_limit(content) text = await self._build_social_hot_text(roomid, limit) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "搭子榜": limit = self._parse_rank_limit(content) text = await self._build_partner_pairs_text(roomid, limit) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "社交桥梁榜": limit = self._parse_rank_limit(content) text = await self._build_social_bridge_text(roomid, limit) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "我的趋势": days = self._parse_trend_days(content) text = await self._build_my_trend_text(roomid, sender, days) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "身价周报": text = await self._build_weekly_report_text(roomid) await bot.send_text_message(roomid, text, sender) return True, "查询成功" if command == "身价说明": await bot.send_text_message(roomid, self._build_explain_text(), sender) return True, "查询成功" if command == "重算身价": # 重算属于管理动作,只允许机器人管理员或群管理员执行,避免被滥用。 if not GroupBotManager.is_admin_for_group(sender, roomid): await bot.send_text_message(roomid, "仅管理员可执行重算身价。", sender) return True, "权限不足" stat_date = datetime.now().strftime("%Y-%m-%d") user_count = self._recompute_group_snapshot(roomid, stat_date) await bot.send_text_message(roomid, f"✅ 重算完成,已更新 {user_count} 名成员。", sender) return True, "重算成功" await bot.send_text_message(roomid, f"❌命令格式错误\n{self.command_format}", sender) return True, "命令错误" def get_schedule_actions(self) -> List[Dict[str, Any]]: """声明可调度动作:每日重算 + 每周周报。""" return [ { "action_key": "value_rank_daily_recompute", "name": "身价每日重算", "description": "每天凌晨重算群成员身价快照", "trigger_type": "at_times", "trigger_config": {"time_list": ["04:00"]}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, "default_enabled": True, }, { "action_key": "value_rank_mentions_extract", "name": "@关系批处理", "description": "每10分钟批量抽取10-20分钟前的@关系并更新社交图", "trigger_type": "every_seconds", "trigger_config": {"seconds": 600}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, "default_enabled": True, }, { "action_key": "value_rank_weekly_report_push", "name": "身价周报推送", "description": "每周推送身价周报(上升榜/下降榜/综合排行)", "trigger_type": "every_week_time", # weekday 取值与 datetime.weekday() 一致:周一=0 ... 周日=6。 "trigger_config": {"weekday": 0, "time_str": "09:35"}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, "default_enabled": True, }, ] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行调度动作。""" if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push", "value_rank_mentions_extract"}: return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}} target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()] if not target_groups: target_groups = [ gid for gid in GroupBotManager.get_group_list() if GroupBotManager.get_group_permission(gid, self.feature).value == "enabled" ] success_groups: List[str] = [] failed_groups: Dict[str, str] = {} updated_users = 0 pushed_groups: List[str] = [] stat_date = datetime.now().strftime("%Y-%m-%d") bot = context.get("bot") or getattr(self, "bot", None) # @抽取任务不依赖 bot。 if action_key == "value_rank_mentions_extract": total_stats = {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} for gid in target_groups: try: stats = self._process_pending_mentions_for_group(gid) total_stats["total"] += int(stats.get("total", 0)) total_stats["processed"] += int(stats.get("processed", 0)) total_stats["with_mentions"] += int(stats.get("with_mentions", 0)) total_stats["failed"] += int(stats.get("failed", 0)) success_groups.append(gid) except Exception as e: failed_groups[gid] = str(e) return { "success": len(failed_groups) == 0, "summary": ( f"@关系批处理完成: 读取{total_stats['total']}条, " f"处理{total_stats['processed']}条, 含@{total_stats['with_mentions']}条, " f"失败{total_stats['failed']}条, 异常群{len(failed_groups)}个" ), "detail": { "window": f"[NOW-{self.mention_window_start_minutes}m, NOW-{self.mention_window_end_minutes}m)", "batch_size": self.mention_batch_size, "stats": total_stats, "failed_groups": failed_groups, }, } # 周报任务先确保当日快照存在,再执行推送,避免“有报表命令但无数据”。 if action_key == "value_rank_weekly_report_push" and not bot: return {"success": False, "summary": "周报推送失败:bot 未注入", "detail": {}} for gid in target_groups: try: # 每个群都先重算一次,确保报告与排行数据是“当天最新口径”。 updated_users += self._recompute_group_snapshot(gid, stat_date) if action_key == "value_rank_weekly_report_push": weekly_text = await self._build_weekly_report_text(gid) await bot.send_text_message(gid, weekly_text, "") pushed_groups.append(gid) success_groups.append(gid) except Exception as e: failed_groups[gid] = str(e) if action_key == "value_rank_weekly_report_push": return { "success": len(failed_groups) == 0, "summary": f"身价周报完成:推送{len(pushed_groups)}群,失败{len(failed_groups)}群", "detail": { "stat_date": stat_date, "updated_users": updated_users, "pushed_groups": pushed_groups, "failed_groups": failed_groups, }, } return { "success": len(failed_groups) == 0, "summary": f"身价重算完成:成功{len(success_groups)}群,失败{len(failed_groups)}群", "detail": { "stat_date": stat_date, "updated_users": updated_users, "success_groups": success_groups, "failed_groups": failed_groups, }, } def _recompute_group_snapshot(self, group_id: str, stat_date: str) -> int: """重算单群指定日期快照。 这是插件核心逻辑: 1. 聚合各维度原始指标; 2. 归一化并计算分数; 3. 生成排名与称号; 4. 持久化到快照表。 """ if not self.db: return 0 candidates = self.db.get_candidate_users(group_id, self.active_window_days, self.social_window_days) if not candidates: return 0 points_map = self.db.get_points_map(group_id) msg_map = self.db.get_message_stats_map(group_id, self.message_window_days, self.active_window_days) last_active_map = self.db.get_last_active_date_map(group_id) social_map = self.db.get_social_stats_map(group_id, self.social_window_days) # 组装原始指标,后续统一归一化处理。 metrics: List[Dict[str, Any]] = [] for user_id in candidates: points_total = int(points_map.get(user_id, 0)) msg_stats = msg_map.get(user_id, {}) social_stats = social_map.get(user_id, {}) msg_count_7d = int(msg_stats.get("msg_count_window", 0)) active_days_30 = int(msg_stats.get("active_days_window", 0)) interaction_score_7d = float(social_stats.get("interaction_score_window", 0.0)) # inactive_days 优先使用全量最后发言日期,若无记录视为长期未活跃。 inactive_days = 365 last_active_date = last_active_map.get(user_id) if last_active_date: try: if isinstance(last_active_date, datetime): dt = last_active_date else: dt = datetime.strptime(str(last_active_date), "%Y-%m-%d") inactive_days = max((datetime.now().date() - dt.date()).days, 0) except Exception: inactive_days = 365 metrics.append( { "user_id": user_id, "points_total": points_total, "msg_count_7d": msg_count_7d, "active_days_30": active_days_30, "interaction_score_7d": interaction_score_7d, "inactive_days": inactive_days, } ) if not metrics: return 0 # 计算分位阈值,避免极端值主导。 p95_points = self._percentile95([m["points_total"] for m in metrics]) p95_msg = self._percentile95([m["msg_count_7d"] for m in metrics]) p95_social = self._percentile95([m["interaction_score_7d"] for m in metrics]) scored_rows: List[Dict[str, Any]] = [] for m in metrics: p_norm = self._normalize_log(m["points_total"], p95_points) m_norm = self._normalize_linear(m["msg_count_7d"], p95_msg) a_norm = self._normalize_linear(m["active_days_30"], self.active_window_days) s_norm = self._normalize_linear(m["interaction_score_7d"], p95_social) i_penalty = self._normalize_linear(m["inactive_days"], 30) base_score = self.base_score_scale * ( self.points_weight * p_norm + self.message_weight * m_norm + self.active_days_weight * a_norm + self.social_weight * s_norm ) penalty_score = self.inactivity_penalty_max * i_penalty final_score = max(round(base_score - penalty_score, 2), 0.0) scored_rows.append( { **m, "score": final_score, "p_norm": p_norm, "m_norm": m_norm, "a_norm": a_norm, "s_norm": s_norm, "penalty": penalty_score, } ) # 分数高到低排序,同分时按活跃和积分补充排序,提升稳定性。 scored_rows.sort( key=lambda x: ( -float(x["score"]), -int(x["msg_count_7d"]), -float(x["interaction_score_7d"]), -int(x["points_total"]), ) ) total = len(scored_rows) batch_rows: List[Tuple[Any, ...]] = [] for idx, row in enumerate(scored_rows, start=1): title = self._build_title(idx, total, int(row["inactive_days"])) score_detail = { "points_norm": round(float(row["p_norm"]), 6), "message_norm": round(float(row["m_norm"]), 6), "active_norm": round(float(row["a_norm"]), 6), "social_norm": round(float(row["s_norm"]), 6), "penalty_score": round(float(row["penalty"]), 2), "weights": { "points": self.points_weight, "message": self.message_weight, "active": self.active_days_weight, "social": self.social_weight, }, } batch_rows.append( ( stat_date, group_id, row["user_id"], row["score"], idx, title, row["points_total"], row["msg_count_7d"], row["active_days_30"], row["inactive_days"], json.dumps(score_detail, ensure_ascii=False), ) ) self.db.upsert_snapshots(batch_rows) return len(batch_rows) async def _build_my_value_text(self, group_id: str, user_id: str) -> str: """构建“我的身价”输出文本。""" if not self.db: return "❌ 身价模块未初始化" stat_date = datetime.now().strftime("%Y-%m-%d") row = self.db.get_user_snapshot(stat_date, group_id, user_id) # 若当天还没有快照,按需触发一次当前群重算,保证命令可用性。 if not row: self._recompute_group_snapshot(group_id, stat_date) row = self.db.get_user_snapshot(stat_date, group_id, user_id) if not row: return "📊 暂无你的身价数据,请先在群里发言后再试。" yesterday_score = self.db.get_yesterday_score(stat_date, group_id, user_id) current_score = float(row.get("score") or 0) if yesterday_score is None: change_text = "新上榜(暂无昨日对比)" else: delta = current_score - float(yesterday_score) if abs(yesterday_score) < 1e-9: change_text = f"较昨日变化:{delta:+.2f}" else: pct = delta / float(yesterday_score) * 100 change_text = f"较昨日:{pct:+.2f}%" nick = ContactManager.get_instance().get_group_name(group_id, user_id) or user_id lines = [ f"📊 {nick} 的身价报告({stat_date})", f"总身价:{current_score:.2f}", f"群内排名:第 {int(row.get('rank_no') or 0)} 名({row.get('title') or '未定级'})", "", f"💰 积分资产:{int(row.get('points_total') or 0)}", f"🗣️ 7日发言:{int(row.get('msg_count_7d') or 0)}", f"📅 30日活跃:{int(row.get('active_days_30') or 0)} 天", f"🌊 潜水天数:{int(row.get('inactive_days') or 0)} 天", "", change_text, ] return "\n".join(lines) async def _build_ranking_text(self, group_id: str, limit: int) -> str: """构建“身价排行”输出文本。""" if not self.db: return "❌ 身价模块未初始化" stat_date = datetime.now().strftime("%Y-%m-%d") rows = self.db.get_today_rankings(stat_date, group_id, limit) if not rows: self._recompute_group_snapshot(group_id, stat_date) rows = self.db.get_today_rankings(stat_date, group_id, limit) if not rows: return "📊 今日暂无排行数据。" cm = ContactManager.get_instance() lines = [f"🏆 身价排行榜(Top{len(rows)} | {stat_date})"] for idx, row in enumerate(rows, start=1): user_id = str(row.get("user_id") or "") score = float(row.get("score") or 0) title = str(row.get("title") or "") nick = cm.get_group_name(group_id, user_id) or user_id medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}." lines.append(f"{medal} {nick} | {score:.2f} | {title}") return "\n".join(lines) async def _build_social_hot_text(self, group_id: str, limit: int) -> str: """构建“社交热度榜”输出文本。""" if not self.db: return "❌ 身价模块未初始化" rows = self.db.get_social_hot_rankings(group_id, self.social_window_days, limit) if not rows: return "📊 近期暂无社交热度数据。" cm = ContactManager.get_instance() lines = [f"🔥 社交热度榜(近{self.social_window_days}天 Top{len(rows)})"] for idx, row in enumerate(rows, start=1): user_id = str(row.get("user_id") or "") score = float(row.get("interaction_score_window") or 0.0) mentioned_count = int(row.get("mentioned_count_window") or 0) mention_others_count = int(row.get("mention_others_count_window") or 0) nick = cm.get_group_name(group_id, user_id) or user_id medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}." lines.append( f"{medal} {nick} | 热度{score:.1f} | 被@{mentioned_count} | 主动@{mention_others_count}" ) return "\n".join(lines) async def _build_partner_pairs_text(self, group_id: str, limit: int) -> str: """构建“搭子榜”输出文本。""" if not self.db: return "❌ 身价模块未初始化" rows = self.db.get_top_partner_pairs(group_id, self.social_window_days, limit) if not rows: return "📊 近期暂无搭子关系数据。" cm = ContactManager.get_instance() lines = [f"🤝 搭子榜(近{self.social_window_days}天 Top{len(rows)})"] for idx, row in enumerate(rows, start=1): user_a = str(row.get("user_a") or "") user_b = str(row.get("user_b") or "") nick_a = cm.get_group_name(group_id, user_a) or user_a nick_b = cm.get_group_name(group_id, user_b) or user_b mention_count = int(row.get("mention_count_window") or 0) score = float(row.get("interaction_score_window") or 0.0) medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}." lines.append(f"{medal} {nick_a} × {nick_b} | 互动{score:.1f} | @次数{mention_count}") return "\n".join(lines) async def _build_social_bridge_text(self, group_id: str, limit: int) -> str: """构建“社交桥梁榜”输出文本。 说明: 1. 伙伴数越高,说明该成员连接的人越广; 2. 同伙伴数时按互动分排序,避免“浅连接”占优。 """ if not self.db: return "❌ 身价模块未初始化" rows = self.db.get_social_bridge_rankings(group_id, self.social_window_days, limit) if not rows: return "📊 近期暂无社交桥梁数据。" cm = ContactManager.get_instance() lines = [f"🌉 社交桥梁榜(近{self.social_window_days}天 Top{len(rows)})"] for idx, row in enumerate(rows, start=1): user_id = str(row.get("user_id") or "") partner_count = int(row.get("partner_count") or 0) mention_count = int(row.get("mention_count_window") or 0) score = float(row.get("interaction_score_window") or 0.0) nick = cm.get_group_name(group_id, user_id) or user_id medal = "🥇" if idx == 1 else "🥈" if idx == 2 else "🥉" if idx == 3 else f"{idx}." lines.append(f"{medal} {nick} | 连接{partner_count}人 | 互动{score:.1f} | 触达{mention_count}") return "\n".join(lines) async def _build_my_trend_text(self, group_id: str, user_id: str, days: int) -> str: """构建“我的趋势”输出文本。""" if not self.db: return "❌ 身价模块未初始化" # 先保障今日快照存在,避免趋势末尾为空导致用户误解“今天没分数”。 today = datetime.now().strftime("%Y-%m-%d") today_row = self.db.get_user_snapshot(today, group_id, user_id) if not today_row: self._recompute_group_snapshot(group_id, today) trend_rows = self.db.get_user_score_trend(group_id, user_id, days) if not trend_rows: return "📊 暂无你的趋势数据,请先在群内产生消息后再试。" cm = ContactManager.get_instance() nick = cm.get_group_name(group_id, user_id) or user_id first_score = float(trend_rows[0].get("score") or 0.0) last_score = float(trend_rows[-1].get("score") or 0.0) delta = round(last_score - first_score, 2) trend_flag = "上涨" if delta > 0 else "下滑" if delta < 0 else "持平" lines = [f"📉 {nick} 的身价趋势(近{days}天)"] for row in trend_rows: stat_date = str(row.get("stat_date") or "") score = float(row.get("score") or 0.0) rank_no = int(row.get("rank_no") or 0) lines.append(f"- {stat_date}: {score:.2f}(第{rank_no}名)") lines.append("") lines.append(f"趋势结论:{trend_flag} {delta:+.2f}") return "\n".join(lines) async def _build_weekly_report_text(self, group_id: str) -> str: """构建“身价周报”文本。 周报口径: 1. 综合排行:取今日 Top5; 2. 上升榜/下降榜:对比“今日分数 - 7天前分数”; 3. 若7天前无数据则按 0 处理,保证新群也可输出。 """ if not self.db: return "❌ 身价模块未初始化" today = datetime.now().strftime("%Y-%m-%d") # 使用 SQL DATE_SUB 也可计算,但这里使用 Python 日期便于阅读与调试。 base_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") # 确保今天有快照,避免周报命令调用时出现空数据。 today_rank_rows = self.db.get_today_rankings(today, group_id, 50) if not today_rank_rows: self._recompute_group_snapshot(group_id, today) today_rank_rows = self.db.get_today_rankings(today, group_id, 50) if not today_rank_rows: return "📊 本周暂无可用身价数据。" today_score_map = self.db.get_snapshot_score_map(today, group_id) base_score_map = self.db.get_snapshot_score_map(base_date, group_id) # 计算用户分数变化,并做升降序榜单。 delta_rows: List[Tuple[str, float, float, float]] = [] for user_id, today_score in today_score_map.items(): old_score = float(base_score_map.get(user_id, 0.0)) delta = round(float(today_score) - old_score, 2) delta_rows.append((user_id, float(today_score), old_score, delta)) delta_rows_sorted_up = sorted(delta_rows, key=lambda x: x[3], reverse=True) delta_rows_sorted_down = sorted(delta_rows, key=lambda x: x[3]) cm = ContactManager.get_instance() lines: List[str] = [ f"📈 身价周报({today})", f"对比基线:{base_date}", "", "🏆 本周综合排行 Top5", ] for idx, row in enumerate(today_rank_rows[:5], start=1): uid = str(row.get("user_id") or "") nick = cm.get_group_name(group_id, uid) or uid score = float(row.get("score") or 0.0) title = str(row.get("title") or "") lines.append(f"{idx}. {nick} | {score:.2f} | {title}") lines.append("") lines.append("🚀 本周上升榜 Top5") for idx, (uid, today_score, old_score, delta) in enumerate(delta_rows_sorted_up[:5], start=1): nick = cm.get_group_name(group_id, uid) or uid lines.append(f"{idx}. {nick} | {old_score:.2f} -> {today_score:.2f} | {delta:+.2f}") lines.append("") lines.append("🧊 本周波动下行 Top5") for idx, (uid, today_score, old_score, delta) in enumerate(delta_rows_sorted_down[:5], start=1): nick = cm.get_group_name(group_id, uid) or uid lines.append(f"{idx}. {nick} | {old_score:.2f} -> {today_score:.2f} | {delta:+.2f}") lines.append("") lines.append("提示:分数由积分/发言/活跃/社交影响力综合计算。") # 加一段“社交桥梁人物”,让周报除了涨跌之外还能看到连接贡献。 bridge_rows = self.db.get_social_bridge_rankings(group_id, self.social_window_days, 3) if bridge_rows: lines.append("") lines.append("🌉 本周社交桥梁人物") for idx, row in enumerate(bridge_rows, start=1): uid = str(row.get("user_id") or "") nick = cm.get_group_name(group_id, uid) or uid partner_count = int(row.get("partner_count") or 0) lines.append(f"{idx}. {nick} | 连接{partner_count}人") return "\n".join(lines) def _process_pending_mentions_for_group(self, group_id: str) -> Dict[str, int]: """处理单群待抽取@消息(插件内定时业务)。""" if not self.db: return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} started_at = datetime.now() window_start = max(int(self.mention_window_start_minutes), 1) window_end = max(int(self.mention_window_end_minutes), 0) if window_start <= window_end: window_start = window_end + 10 self.LOG.warning( f"[{self.name}] @窗口参数异常已修正: group={group_id}, " f"window_start={self.mention_window_start_minutes}, " f"window_end={self.mention_window_end_minutes}, fixed=[{window_start},{window_end}]" ) rows = self.db.get_pending_mention_extract_messages_for_group( group_id=group_id, limit=self.mention_batch_size, window_start_minutes=window_start, window_end_minutes=window_end, ) if not rows: return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} processed, with_mentions, failed = 0, 0, 0 fail_samples: List[str] = [] for idx, row in enumerate(rows, start=1): try: message_id = str(row.get("message_id") or "").strip() sender_id = str(row.get("sender") or "").strip() raw_xml = str(row.get("message_xml") or "") msg_time = self._safe_parse_message_time(row.get("timestamp")) mentioned_ids = self._extract_mentioned_user_ids(raw_xml) mentioned_ids_json = json.dumps(mentioned_ids, ensure_ascii=False) self.db.update_message_mentioned_user_ids( message_id=message_id, group_id=group_id, sender_id=sender_id, mentioned_user_ids_json=mentioned_ids_json, ) self._persist_mention_graph_data( group_id=group_id, sender_id=sender_id, message_id=message_id, mentioned_user_ids=mentioned_ids, msg_time=msg_time, ) processed += 1 if mentioned_ids: with_mentions += 1 if idx <= 2: self.LOG.debug( f"[{self.name}] @抽取样本: group={group_id}, msg={message_id}, " f"sender={sender_id}, mentioned_count={len(mentioned_ids)}" ) except Exception as e: failed += 1 if len(fail_samples) < 5: fail_samples.append(str(row.get("message_id") or "")) self.LOG.error(f"[{self.name}] @抽取失败: group={group_id}, message_id={row.get('message_id')}, error={e}") elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) stats = {"total": len(rows), "processed": processed, "with_mentions": with_mentions, "failed": failed} self.LOG.info( f"[{self.name}] @批处理完成: group={group_id}, total={stats['total']}, processed={processed}, " f"with_mentions={with_mentions}, failed={failed}, cost={elapsed_ms}ms, fail_samples={fail_samples}" ) return stats @staticmethod def _safe_parse_message_time(value: Any) -> datetime: """安全解析消息时间,失败时回退到当前时间。""" if isinstance(value, datetime): return value text = str(value or "").strip() if not text: return datetime.now() try: return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") except Exception: return datetime.now() @staticmethod def _extract_mentioned_user_ids(raw_xml: str) -> List[str]: """从消息 XML 提取@用户ID清单。""" raw_xml = str(raw_xml or "") if not raw_xml: return [] at_user_list_text = "" try: root = ET.fromstring(raw_xml) node = root.find(".//atuserlist") if node is not None and node.text: at_user_list_text = str(node.text).strip() except Exception: match = re.search(r"", raw_xml, flags=re.IGNORECASE | re.DOTALL) if match: at_user_list_text = str(match.group(1) or "").strip() if not at_user_list_text: return [] raw_ids = re.split(r"[,\s;]+", at_user_list_text) seen = set() result: List[str] = [] for uid in raw_ids: normalized = str(uid or "").strip() if not normalized or normalized in seen: continue seen.add(normalized) result.append(normalized) return result def _persist_mention_graph_data( self, group_id: str, sender_id: str, message_id: str, mentioned_user_ids: List[str], msg_time: datetime, ) -> None: """落盘社交图增量数据(明细 + 边 + 个人日汇总)。""" if not self.db or not group_id or not sender_id or not message_id: return invalid_mentions = {"notify@all", "all", "@all"} clean_ids: List[str] = [] seen = set() for uid in mentioned_user_ids: normalized = str(uid or "").strip() if (not normalized or normalized in invalid_mentions or normalized == sender_id or normalized in seen): continue seen.add(normalized) clean_ids.append(normalized) if not clean_ids: return existing = set(self.db.get_existing_mentions(message_id, group_id, sender_id)) new_ids = [uid for uid in clean_ids if uid not in existing] if not new_ids: return stat_date = msg_time.strftime("%Y-%m-%d") msg_time_text = msg_time.strftime("%Y-%m-%d %H:%M:%S") mention_rows = [(message_id, group_id, sender_id, uid, stat_date, msg_time_text) for uid in new_ids] self.db.insert_message_mentions(mention_rows) edge_rows = [(stat_date, group_id, sender_id, uid, 1, 1.0) for uid in new_ids] self.db.upsert_social_edges_daily(edge_rows) # 发起方:主动@次数 + 互动分 self.db.upsert_social_daily_row((stat_date, group_id, sender_id, 0, len(new_ids), 0, float(len(new_ids)))) # 接收方:被@次数 + 互动分 for uid in new_ids: self.db.upsert_social_daily_row((stat_date, group_id, uid, 1, 0, 0, 1.0)) self.db.refresh_unique_interactors(stat_date, group_id, [sender_id, *new_ids]) def _build_explain_text(self) -> str: """输出算法说明文本。""" return ( "📘 身价算法说明\n" f"- 积分权重:{self.points_weight:.2f}\n" f"- 发言权重:{self.message_weight:.2f}({self.message_window_days}天)\n" f"- 活跃权重:{self.active_days_weight:.2f}({self.active_window_days}天)\n" f"- 社交权重:{self.social_weight:.2f}({self.social_window_days}天)\n" f"- 潜水惩罚上限:{self.inactivity_penalty_max:.0f}\n" "- 所有维度先归一化再加权,极端值按95分位截断,减少刷屏与极值干扰。" ) def _parse_rank_limit(self, content: str) -> int: """解析排行条数参数。""" parts = content.split() if len(parts) < 2: return self.default_rank_limit try: limit = int(parts[1]) except Exception: return self.default_rank_limit limit = max(1, limit) return min(limit, self.max_rank_limit) def _parse_trend_days(self, content: str) -> int: """解析趋势天数参数。 约束策略: 1. 默认展示近7天,避免输出过长; 2. 上限限制到配置值,防止单次命令读取过多历史。 """ parts = content.split() if len(parts) < 2: return self.default_trend_days try: days = int(parts[1]) except Exception: return self.default_trend_days days = max(1, days) return min(days, self.max_trend_days) @staticmethod def _normalize_linear(value: float, ceiling: float) -> float: """线性归一化并截断到 [0, 1]。""" if ceiling <= 0: return 0.0 return max(0.0, min(float(value) / float(ceiling), 1.0)) @staticmethod def _normalize_log(value: float, ceiling: float) -> float: """对高离散指标做对数归一化,避免大户统治。""" if ceiling <= 0: return 0.0 safe_value = max(float(value), 0.0) return max(0.0, min(math.log1p(safe_value) / math.log1p(float(ceiling)), 1.0)) @staticmethod def _percentile95(values: List[float]) -> float: """计算95分位数(最小返回1,避免除零)。""" if not values: return 1.0 sorted_vals = sorted([max(float(v), 0.0) for v in values]) n = len(sorted_vals) idx = max(0, min(n - 1, math.ceil(0.95 * n) - 1)) return max(sorted_vals[idx], 1.0) @staticmethod def _build_title(rank_no: int, total: int, inactive_days: int) -> str: """根据分位区间生成称号。""" if total <= 0: return "未定级" ratio = rank_no / total if ratio <= 0.01: return "群之巨鳄" if ratio <= 0.05: return "社交名流" if ratio <= 0.20: return "活跃中产" if ratio <= 0.80: return "稳定居民" if ratio >= 0.90 and inactive_days >= 30: return "潜水观察员" return "潜力新人"