From 2730595a8867ab16fbc500852a478949f1c02817 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:34:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=20value=5Frank=20=E7=A4=BE?= =?UTF-8?q?=E4=BA=A4=E5=9B=BE=E8=AE=BE=E8=AE=A1=E5=B9=B6=E8=90=BD=E5=9C=B0?= =?UTF-8?q?=20@=20=E7=BB=93=E6=9E=84=E5=8C=96=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - messages 表新增 mentioned_user_ids 字段设计,使用 JSON 数组字符串存储被@用户清单 - 新增社交图相关表设计:t_message_mentions、t_social_edges_daily、t_value_rank_social_daily - 增加迁移脚本 20260421_add_mentions_and_social_graph_tables.sql,支持现网平滑升级 - 改造 MessageStorageDB 入库流程:解析 msg_source.atuserlist 并写入 mentioned_user_ids - 更新 value_rank README:补充社交图数据链路、可产出图表及实现说明 --- db/message_storage.py | 52 ++++++++++++- db/scripts/init.sql | 44 +++++++++++ ...1_add_mentions_and_social_graph_tables.sql | 50 ++++++++++++ plugins/value_rank/README.md | 77 +++++++++++++++++-- 4 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 db/scripts/migrations/20260421_add_mentions_and_social_graph_tables.sql diff --git a/db/message_storage.py b/db/message_storage.py index 234114c..9db150a 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -2,6 +2,8 @@ from datetime import datetime import json +import re +import xml.etree.ElementTree as ET from typing import Dict, List, Optional from db.base import BaseDBOperator @@ -39,15 +41,17 @@ class MessageStorageDB(BaseDBOperator): # 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。 raw_payload = self._serialize_raw_payload(msg) + # 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。 + mentioned_user_ids_json = self._extract_mentioned_user_ids(msg) sql_with_raw_payload = """ INSERT INTO messages ( group_id, timestamp, sender, content, message_type, - attachment_url, message_id, message_xml, raw_payload, message_thumb + attachment_url, message_id, message_xml, raw_payload, mentioned_user_ids, message_thumb ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES 
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ - params_with_raw_payload = (*base_params[:8], raw_payload, base_params[8]) + params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8]) if self.execute_update(sql_with_raw_payload, params_with_raw_payload): return True @@ -87,6 +91,48 @@ class MessageStorageDB(BaseDBOperator): # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 return str(msg) + def _extract_mentioned_user_ids(self, msg: WxMessage) -> str: + """Extract the IDs of the users @-mentioned in a message and return them as a JSON array string. + + Parsing strategy: + 1. Prefer reading the `atuserlist` node from the XML held in `msg.msg_source`; + 2. If XML parsing fails, fall back to a regex that extracts the `atuserlist` text; + 3. De-duplicate and drop empty values so the output is stable. + + Example return value: `["wxid_a","wxid_b"]` + """ + raw_xml = str(getattr(msg, "msg_source", "") or "") + if not raw_xml: + return "[]" + + at_user_list_text = "" + try: + root = ET.fromstring(raw_xml) + node = root.find(".//atuserlist") + if node is not None and node.text: + at_user_list_text = str(node.text).strip() + except Exception: + # Malformed XML: fall back to a regex with one capture group (an optional CDATA wrapper is stripped) so mentions are not lost. + match = re.search(r"<atuserlist>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</atuserlist>", raw_xml, flags=re.IGNORECASE | re.DOTALL) + if match: + at_user_list_text = str(match.group(1) or "").strip() + + if not at_user_list_text: + return "[]" + + # atuserlist is usually separated by ',', but ';' or whitespace may show up in practice, so all of them are accepted. + raw_ids = re.split(r"[,\s;]+", at_user_list_text) + seen = set() + result = [] + for uid in raw_ids: + normalized_uid = str(uid or "").strip() + if not normalized_uid or normalized_uid in seen: + continue + seen.add(normalized_uid) + result.append(normalized_uid) + + return json.dumps(result, ensure_ascii=False) + def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: """获取最近的消息""" sql = """ diff --git a/db/scripts/init.sql b/db/scripts/init.sql index dfe804b..b0a8894 100644 --- a/db/scripts/init.sql +++ b/db/scripts/init.sql @@ -40,6 +40,7 @@ create or replace table message_archive.messages message_id varchar(32) null comment '消息 id', message_xml text null comment '消息 xml 部分', 
raw_payload longtext null comment 'API 原始消息完整负载(完整序列化数据)', + mentioned_user_ids longtext null comment '消息中被@用户ID清单(JSON数组字符串)', message_thumb longtext null comment '视频或图片消息的缩略图路径', image_path varchar(255) null comment '图片URL路径' ) @@ -60,6 +61,49 @@ create or replace index idx_message_type create or replace index messages_message_id_index on message_archive.messages (message_id); +create or replace table message_archive.t_message_mentions +( + id bigint auto_increment + primary key, + message_id varchar(32) not null comment '原始消息ID', + group_id varchar(100) not null comment '群ID', + sender_id varchar(100) not null comment '发送者ID(@发起人)', + mentioned_user_id varchar(100) not null comment '被@用户ID', + stat_date date not null comment '统计日期', + msg_time datetime not null comment '消息时间', + create_time datetime default current_timestamp() null comment '创建时间', + constraint uk_message_sender_mentioned + unique (message_id, sender_id, mentioned_user_id) +) + comment '消息@关系明细表'; + +create or replace index idx_group_date + on message_archive.t_message_mentions (group_id, stat_date); + +create or replace index idx_mentioned_group_date + on message_archive.t_message_mentions (mentioned_user_id, group_id, stat_date); + +create or replace table message_archive.t_social_edges_daily +( + id bigint auto_increment + primary key, + stat_date date not null comment '统计日期', + group_id varchar(100) not null comment '群ID', + from_user_id varchar(100) not null comment '互动发起方', + to_user_id varchar(100) not null comment '互动接收方', + mention_count int default 0 not null comment '@次数', + reply_count int default 0 not null comment '回复次数(预留)', + interaction_score decimal(10, 2) default 0.00 not null comment '互动强度分(可用于关系网权重)', + create_time datetime default current_timestamp() null comment '创建时间', + update_time datetime default current_timestamp() null on update current_timestamp() comment '更新时间', + constraint uk_day_group_edge + unique (stat_date, group_id, from_user_id, to_user_id) +) + comment 
'社交关系日边表(用于关系网和搭子榜)'; + +create or replace index idx_group_day_score + on message_archive.t_social_edges_daily (group_id, stat_date, interaction_score); + create or replace table message_archive.speech_counts ( id int auto_increment comment '自增主键ID' diff --git a/db/scripts/migrations/20260421_add_mentions_and_social_graph_tables.sql b/db/scripts/migrations/20260421_add_mentions_and_social_graph_tables.sql new file mode 100644 index 0000000..c4faba0 --- /dev/null +++ b/db/scripts/migrations/20260421_add_mentions_and_social_graph_tables.sql @@ -0,0 +1,50 @@ +-- 消息表增加被@清单字段(JSON 数组字符串) +ALTER TABLE message_archive.messages + ADD COLUMN IF NOT EXISTS mentioned_user_ids LONGTEXT NULL COMMENT '消息中被@用户ID清单(JSON数组字符串)' AFTER raw_payload; + +-- 消息@关系明细表 +CREATE TABLE IF NOT EXISTS message_archive.t_message_mentions ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + message_id VARCHAR(32) NOT NULL COMMENT '原始消息ID', + group_id VARCHAR(100) NOT NULL COMMENT '群ID', + sender_id VARCHAR(100) NOT NULL COMMENT '发送者ID(@发起人)', + mentioned_user_id VARCHAR(100) NOT NULL COMMENT '被@用户ID', + stat_date DATE NOT NULL COMMENT '统计日期', + msg_time DATETIME NOT NULL COMMENT '消息时间', + create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + UNIQUE KEY uk_message_sender_mentioned (message_id, sender_id, mentioned_user_id), + KEY idx_group_date (group_id, stat_date), + KEY idx_mentioned_group_date (mentioned_user_id, group_id, stat_date) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息@关系明细表'; + +-- 社交关系日边表(用于关系网和搭子榜) +CREATE TABLE IF NOT EXISTS message_archive.t_social_edges_daily ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + stat_date DATE NOT NULL COMMENT '统计日期', + group_id VARCHAR(100) NOT NULL COMMENT '群ID', + from_user_id VARCHAR(100) NOT NULL COMMENT '互动发起方', + to_user_id VARCHAR(100) NOT NULL COMMENT '互动接收方', + mention_count INT NOT NULL DEFAULT 0 COMMENT '@次数', + reply_count INT NOT NULL DEFAULT 0 COMMENT '回复次数(预留)', + interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT 
'互动强度分(可用于关系网权重)', + create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY uk_day_group_edge (stat_date, group_id, from_user_id, to_user_id), + KEY idx_group_day_score (group_id, stat_date, interaction_score) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='社交关系日边表(用于关系网和搭子榜)'; + +-- Value Rank 社交日汇总表(个人维度) +CREATE TABLE IF NOT EXISTS message_archive.t_value_rank_social_daily ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + stat_date DATE NOT NULL COMMENT '统计日期', + group_id VARCHAR(100) NOT NULL COMMENT '群ID', + user_id VARCHAR(100) NOT NULL COMMENT '用户ID', + mentioned_count INT NOT NULL DEFAULT 0 COMMENT '被@次数(入度)', + mention_others_count INT NOT NULL DEFAULT 0 COMMENT '@他人次数(出度)', + unique_interactors INT NOT NULL DEFAULT 0 COMMENT '与其发生互动的去重人数', + interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '社交影响力分', + create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY uk_day_group_user (stat_date, group_id, user_id), + KEY idx_group_day_score (group_id, stat_date, interaction_score) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Value Rank 社交日汇总表'; diff --git a/plugins/value_rank/README.md b/plugins/value_rank/README.md index 8c3e639..7e28683 100644 --- a/plugins/value_rank/README.md +++ b/plugins/value_rank/README.md @@ -147,7 +147,7 @@ CREATE TABLE IF NOT EXISTS t_value_rank_snapshot ( > 这个表是关键:支持“涨跌解释”“历史回溯”“趋势展示”。 -## 5.2(可选)社交指标表(V2) +## 5.2 社交图数据层(建议直接上) ```sql CREATE TABLE IF NOT EXISTS t_value_rank_social_daily ( @@ -155,13 +155,77 @@ CREATE TABLE IF NOT EXISTS t_value_rank_social_daily ( stat_date DATE NOT NULL, group_id VARCHAR(100) NOT NULL, user_id VARCHAR(100) NOT NULL, - mentioned_count INT NOT NULL DEFAULT 0, - mention_others_count INT NOT NULL DEFAULT 0, + mentioned_count INT NOT NULL DEFAULT 0 COMMENT '被@次数(入度)', + 
mention_others_count INT NOT NULL DEFAULT 0 COMMENT '@他人次数(出度)', + unique_interactors INT NOT NULL DEFAULT 0 COMMENT '互动去重人数', + interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0 COMMENT '社交影响力分', created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - UNIQUE KEY uniq_day_group_user (stat_date, group_id, user_id) + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_day_group_user (stat_date, group_id, user_id), + KEY idx_group_day_score (group_id, stat_date, interaction_score) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` +## 5.3 消息表结构化字段(提升提取效率) + +在 `messages` 表新增字段: + +- `raw_payload LONGTEXT`:完整原始消息(已支持) +- `mentioned_user_ids LONGTEXT`:该消息里被 @ 的用户 ID 清单(JSON 数组字符串) + +示例值: + +```json +["wxid_abc", "wxid_xyz"] +``` + +> 设计目的:避免每次统计都扫 `raw_payload`,在入库阶段就把最常用的社交特征结构化。 + +## 5.4 社交关系明细与边表(用于关系网图) + +```sql +CREATE TABLE IF NOT EXISTS t_message_mentions ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + message_id VARCHAR(32) NOT NULL, + group_id VARCHAR(100) NOT NULL, + sender_id VARCHAR(100) NOT NULL, + mentioned_user_id VARCHAR(100) NOT NULL, + stat_date DATE NOT NULL, + msg_time DATETIME NOT NULL, + create_time DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_message_sender_mentioned (message_id, sender_id, mentioned_user_id), + KEY idx_group_date (group_id, stat_date), + KEY idx_mentioned_group_date (mentioned_user_id, group_id, stat_date) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS t_social_edges_daily ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + stat_date DATE NOT NULL, + group_id VARCHAR(100) NOT NULL, + from_user_id VARCHAR(100) NOT NULL, + to_user_id VARCHAR(100) NOT NULL, + mention_count INT NOT NULL DEFAULT 0, + reply_count INT NOT NULL DEFAULT 0, + interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0, + create_time DATETIME DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uk_day_group_edge (stat_date, group_id, 
from_user_id, to_user_id), + KEY idx_group_day_score (group_id, stat_date, interaction_score) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +--- + +## 5.5 社交图可产出图表(周报/日报) + +1. 群社交关系网图(节点=成员,边=互动强度) +2. 被@热度榜(Top10) +3. 最强搭子榜(双向互动最强的成员对) +4. 社交桥梁榜(连接不同圈层的关键成员) +5. 个人影响力趋势图(7天/30天) + +> 这些图表都基于 `t_social_edges_daily` + `t_value_rank_social_daily` 即可生成,不需要回扫全量原始消息。 + --- ## 6. 插件交互设计(命令与输出) @@ -262,8 +326,9 @@ plugins/value_rank/ 当你准备上线“被@”指标时,建议: -1. 在消息入库或消息处理总入口解析 `@` 数据 -2. 日聚合写入 `t_value_rank_social_daily` +1. 在消息入库时同步解析 `@`,并直接写 `messages.mentioned_user_ids` +2. 同步写 `t_message_mentions` 明细,方便追溯和反查 +3. 日聚合写入 `t_social_edges_daily` 与 `t_value_rank_social_daily` 3. 新增权重项: ```text