完善 value_rank 社交图设计并落地 @ 结构化存储

- messages 表新增 mentioned_user_ids 字段设计,使用 JSON 数组字符串存储被@用户清单

- 新增社交图相关表设计:t_message_mentions、t_social_edges_daily、t_value_rank_social_daily

- 增加迁移脚本 20260421_add_mentions_and_social_graph_tables.sql,支持现网平滑升级

- 改造 MessageStorageDB 入库流程:解析 msg_source.atuserlist 并写入 mentioned_user_ids

- 更新 value_rank README:补充社交图数据链路、可产出图表及实现说明
This commit is contained in:
liuwei
2026-04-21 13:34:19 +08:00
parent dfa17c5f95
commit 2730595a88
4 changed files with 214 additions and 9 deletions

View File

@@ -2,6 +2,8 @@
from datetime import datetime from datetime import datetime
import json import json
import re
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional from typing import Dict, List, Optional
from db.base import BaseDBOperator from db.base import BaseDBOperator
@@ -39,15 +41,17 @@ class MessageStorageDB(BaseDBOperator):
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。 # 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
raw_payload = self._serialize_raw_payload(msg) raw_payload = self._serialize_raw_payload(msg)
# 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。
mentioned_user_ids_json = self._extract_mentioned_user_ids(msg)
sql_with_raw_payload = """ sql_with_raw_payload = """
INSERT INTO messages ( INSERT INTO messages (
group_id, timestamp, sender, content, message_type, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, raw_payload, message_thumb attachment_url, message_id, message_xml, raw_payload, mentioned_user_ids, message_thumb
) )
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""" """
params_with_raw_payload = (*base_params[:8], raw_payload, base_params[8]) params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8])
if self.execute_update(sql_with_raw_payload, params_with_raw_payload): if self.execute_update(sql_with_raw_payload, params_with_raw_payload):
return True return True
@@ -87,6 +91,48 @@ class MessageStorageDB(BaseDBOperator):
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
return str(msg) return str(msg)
def _extract_mentioned_user_ids(self, msg: "WxMessage") -> str:
    """Extract the @-mentioned user IDs of a message as a JSON array string.

    Strategy:
        1. Parse ``msg.msg_source`` as XML and read the text of the first
           ``atuserlist`` descendant element.
        2. If the XML cannot be parsed, fall back to a regex that pulls the
           ``atuserlist`` content out of the raw text, whether or not it is
           wrapped in a CDATA section.
        3. Split the list on commas, semicolons and whitespace, drop empty
           entries and de-duplicate while keeping first-seen order.

    Args:
        msg: Message object; only its optional ``msg_source`` attribute is
            read.

    Returns:
        A JSON array string such as ``["wxid_a", "wxid_b"]``, or ``"[]"``
        when ``msg_source`` is missing/empty or no user is mentioned.
    """
    raw_xml = str(getattr(msg, "msg_source", "") or "")
    if not raw_xml:
        return "[]"

    at_user_list_text = ""
    try:
        node = ET.fromstring(raw_xml).find(".//atuserlist")
    except Exception:
        # Deliberately broad: extraction is best-effort at ingest time and must
        # never abort message storage. Malformed XML falls back to a regex
        # that accepts both `<![CDATA[...]]>` and plain-text element content.
        match = re.search(
            r"<atuserlist>\s*(?:<!\[CDATA\[(.*?)\]\]>|([^<]*))\s*</atuserlist>",
            raw_xml,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if match:
            at_user_list_text = (match.group(1) or match.group(2) or "").strip()
    else:
        if node is not None and node.text:
            at_user_list_text = str(node.text).strip()

    if not at_user_list_text:
        return "[]"

    # The usual separator is ',', but ';' and whitespace are tolerated too.
    tokens = re.split(r"[,\s;]+", at_user_list_text)
    # dict.fromkeys de-duplicates while preserving first-seen order; a leading
    # or trailing separator yields an empty token, which is filtered out.
    unique_ids = list(dict.fromkeys(token for token in tokens if token))
    return json.dumps(unique_ids, ensure_ascii=False)
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
"""获取最近的消息""" """获取最近的消息"""
sql = """ sql = """

View File

@@ -40,6 +40,7 @@ create or replace table message_archive.messages
message_id varchar(32) null comment '消息 id', message_id varchar(32) null comment '消息 id',
message_xml text null comment '消息 xml 部分', message_xml text null comment '消息 xml 部分',
raw_payload longtext null comment 'API 原始消息完整负载(完整序列化数据)', raw_payload longtext null comment 'API 原始消息完整负载(完整序列化数据)',
mentioned_user_ids longtext null comment '消息中被@用户ID清单JSON数组字符串',
message_thumb longtext null comment '视频或图片消息的缩略图路径', message_thumb longtext null comment '视频或图片消息的缩略图路径',
image_path varchar(255) null comment '图片URL路径' image_path varchar(255) null comment '图片URL路径'
) )
@@ -60,6 +61,49 @@ create or replace index idx_message_type
create or replace index messages_message_id_index create or replace index messages_message_id_index
on message_archive.messages (message_id); on message_archive.messages (message_id);
-- Mention detail table: one row per (message_id, sender_id, mentioned_user_id),
-- enforced by the unique constraint uk_message_sender_mentioned.
create or replace table message_archive.t_message_mentions
(
id bigint auto_increment
primary key,
message_id varchar(32) not null comment '原始消息ID',
group_id varchar(100) not null comment '群ID',
sender_id varchar(100) not null comment '发送者ID@发起人)',
mentioned_user_id varchar(100) not null comment '被@用户ID',
stat_date date not null comment '统计日期',
msg_time datetime not null comment '消息时间',
create_time datetime default current_timestamp() null comment '创建时间',
constraint uk_message_sender_mentioned
unique (message_id, sender_id, mentioned_user_id)
)
comment '消息@关系明细表';
-- Secondary index keyed by group and statistics date.
create or replace index idx_group_date
on message_archive.t_message_mentions (group_id, stat_date);
-- Secondary index keyed by the mentioned user first, then group and date.
create or replace index idx_mentioned_group_date
on message_archive.t_message_mentions (mentioned_user_id, group_id, stat_date);
-- Daily social edge table: one directed edge (from_user_id -> to_user_id) per
-- group and statistics date, enforced by the unique constraint uk_day_group_edge.
-- reply_count is declared but marked as reserved in its column comment.
create or replace table message_archive.t_social_edges_daily
(
id bigint auto_increment
primary key,
stat_date date not null comment '统计日期',
group_id varchar(100) not null comment '群ID',
from_user_id varchar(100) not null comment '互动发起方',
to_user_id varchar(100) not null comment '互动接收方',
mention_count int default 0 not null comment '@次数',
reply_count int default 0 not null comment '回复次数(预留)',
interaction_score decimal(10, 2) default 0.00 not null comment '互动强度分(可用于关系网权重)',
create_time datetime default current_timestamp() null comment '创建时间',
update_time datetime default current_timestamp() null on update current_timestamp() comment '更新时间',
constraint uk_day_group_edge
unique (stat_date, group_id, from_user_id, to_user_id)
)
comment '社交关系日边表(用于关系网和搭子榜)';
-- Secondary index keyed by group and date, with interaction_score as the last key part.
create or replace index idx_group_day_score
on message_archive.t_social_edges_daily (group_id, stat_date, interaction_score);
create or replace table message_archive.speech_counts create or replace table message_archive.speech_counts
( (
id int auto_increment comment '自增主键ID' id int auto_increment comment '自增主键ID'

View File

@@ -0,0 +1,50 @@
-- Add the @-mention list column to the messages table (stored as a JSON array string).
-- IF NOT EXISTS lets the statement be re-run without failing once the column exists;
-- the column is placed directly after raw_payload.
ALTER TABLE message_archive.messages
ADD COLUMN IF NOT EXISTS mentioned_user_ids LONGTEXT NULL COMMENT '消息中被@用户ID清单JSON数组字符串' AFTER raw_payload;
-- Message @-mention detail table.
-- One row per (message_id, sender_id, mentioned_user_id), enforced by
-- uk_message_sender_mentioned; the two secondary keys are led by group_id
-- and by mentioned_user_id respectively.
CREATE TABLE IF NOT EXISTS message_archive.t_message_mentions (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(32) NOT NULL COMMENT '原始消息ID',
group_id VARCHAR(100) NOT NULL COMMENT '群ID',
sender_id VARCHAR(100) NOT NULL COMMENT '发送者ID@发起人)',
mentioned_user_id VARCHAR(100) NOT NULL COMMENT '被@用户ID',
stat_date DATE NOT NULL COMMENT '统计日期',
msg_time DATETIME NOT NULL COMMENT '消息时间',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
UNIQUE KEY uk_message_sender_mentioned (message_id, sender_id, mentioned_user_id),
KEY idx_group_date (group_id, stat_date),
KEY idx_mentioned_group_date (mentioned_user_id, group_id, stat_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息@关系明细表';
-- Daily social edge table (used for the relationship graph and the "best partners" ranking).
-- One directed edge (from_user_id -> to_user_id) per group and statistics date,
-- enforced by uk_day_group_edge. reply_count is marked as reserved in its column comment.
CREATE TABLE IF NOT EXISTS message_archive.t_social_edges_daily (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
stat_date DATE NOT NULL COMMENT '统计日期',
group_id VARCHAR(100) NOT NULL COMMENT '群ID',
from_user_id VARCHAR(100) NOT NULL COMMENT '互动发起方',
to_user_id VARCHAR(100) NOT NULL COMMENT '互动接收方',
mention_count INT NOT NULL DEFAULT 0 COMMENT '@次数',
reply_count INT NOT NULL DEFAULT 0 COMMENT '回复次数(预留)',
interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '互动强度分(可用于关系网权重)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_day_group_edge (stat_date, group_id, from_user_id, to_user_id),
KEY idx_group_day_score (group_id, stat_date, interaction_score)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='社交关系日边表(用于关系网和搭子榜)';
-- Value Rank daily social summary table (per-user dimension).
-- One row per (stat_date, group_id, user_id), enforced by uk_day_group_user.
-- Per the column comments, mentioned_count is the in-degree (times mentioned)
-- and mention_others_count is the out-degree (times mentioning others).
CREATE TABLE IF NOT EXISTS message_archive.t_value_rank_social_daily (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
stat_date DATE NOT NULL COMMENT '统计日期',
group_id VARCHAR(100) NOT NULL COMMENT '群ID',
user_id VARCHAR(100) NOT NULL COMMENT '用户ID',
mentioned_count INT NOT NULL DEFAULT 0 COMMENT '被@次数(入度)',
mention_others_count INT NOT NULL DEFAULT 0 COMMENT '@他人次数(出度)',
unique_interactors INT NOT NULL DEFAULT 0 COMMENT '与其发生互动的去重人数',
interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '社交影响力分',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_day_group_user (stat_date, group_id, user_id),
KEY idx_group_day_score (group_id, stat_date, interaction_score)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Value Rank 社交日汇总表';

View File

@@ -147,7 +147,7 @@ CREATE TABLE IF NOT EXISTS t_value_rank_snapshot (
> 这个表是关键:支持“涨跌解释”“历史回溯”“趋势展示”。 > 这个表是关键:支持“涨跌解释”“历史回溯”“趋势展示”。
## 5.2可选社交指标表V2 ## 5.2 社交图数据层(建议直接上
```sql ```sql
CREATE TABLE IF NOT EXISTS t_value_rank_social_daily ( CREATE TABLE IF NOT EXISTS t_value_rank_social_daily (
@@ -155,13 +155,77 @@ CREATE TABLE IF NOT EXISTS t_value_rank_social_daily (
stat_date DATE NOT NULL, stat_date DATE NOT NULL,
group_id VARCHAR(100) NOT NULL, group_id VARCHAR(100) NOT NULL,
user_id VARCHAR(100) NOT NULL, user_id VARCHAR(100) NOT NULL,
mentioned_count INT NOT NULL DEFAULT 0, mentioned_count INT NOT NULL DEFAULT 0 COMMENT '被@次数(入度)',
mention_others_count INT NOT NULL DEFAULT 0, mention_others_count INT NOT NULL DEFAULT 0 COMMENT '@他人次数(出度)',
unique_interactors INT NOT NULL DEFAULT 0 COMMENT '互动去重人数',
interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0 COMMENT '社交影响力分',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uniq_day_group_user (stat_date, group_id, user_id) updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uniq_day_group_user (stat_date, group_id, user_id),
KEY idx_group_day_score (group_id, stat_date, interaction_score)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
``` ```
## 5.3 消息表结构化字段(提升提取效率)
`messages` 表新增字段:
- `raw_payload LONGTEXT`:完整原始消息(已支持)
- `mentioned_user_ids LONGTEXT`:该消息里被 @ 的用户 ID 清单(JSON 数组字符串)
示例值:
```json
["wxid_abc", "wxid_xyz"]
```
> 设计目的:避免每次统计都扫 `raw_payload`,在入库阶段就把最常用的社交特征结构化。
## 5.4 社交关系明细与边表(用于关系网图)
```sql
-- Mention detail: one row per (message_id, sender_id, mentioned_user_id),
-- enforced by uk_message_sender_mentioned.
CREATE TABLE IF NOT EXISTS t_message_mentions (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(32) NOT NULL,
group_id VARCHAR(100) NOT NULL,
sender_id VARCHAR(100) NOT NULL,
mentioned_user_id VARCHAR(100) NOT NULL,
stat_date DATE NOT NULL,
msg_time DATETIME NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_message_sender_mentioned (message_id, sender_id, mentioned_user_id),
KEY idx_group_date (group_id, stat_date),
KEY idx_mentioned_group_date (mentioned_user_id, group_id, stat_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Daily directed interaction edge: one row per
-- (stat_date, group_id, from_user_id, to_user_id), enforced by uk_day_group_edge.
-- The timestamp columns are named create_time / update_time, the same names the
-- migration script uses for this table.
CREATE TABLE IF NOT EXISTS t_social_edges_daily (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
stat_date DATE NOT NULL,
group_id VARCHAR(100) NOT NULL,
from_user_id VARCHAR(100) NOT NULL,
to_user_id VARCHAR(100) NOT NULL,
mention_count INT NOT NULL DEFAULT 0,
reply_count INT NOT NULL DEFAULT 0,
interaction_score DECIMAL(10,2) NOT NULL DEFAULT 0,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_day_group_edge (stat_date, group_id, from_user_id, to_user_id),
KEY idx_group_day_score (group_id, stat_date, interaction_score)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
---
## 5.5 社交图可产出图表(周报/日报)
1. 群社交关系网图(节点=成员,边=互动强度)
2. 被@热度榜(Top10)
3. 最强搭子榜(双向互动最强的成员对)
4. 社交桥梁榜(连接不同圈层的关键成员)
5. 个人影响力趋势图(7天/30天)
> 这些图表都基于 `t_social_edges_daily` + `t_value_rank_social_daily` 即可生成,不需要回扫全量原始消息。
--- ---
## 6. 插件交互设计(命令与输出) ## 6. 插件交互设计(命令与输出)
@@ -262,8 +326,9 @@ plugins/value_rank/
当你准备上线“被@”指标时,建议: 当你准备上线“被@”指标时,建议:
1. 在消息入库或消息处理总入口解析 `@` 数据 1. 在消息入库时同步解析 `@`,并直接写 `messages.mentioned_user_ids`
2. 日聚合写入 `t_value_rank_social_daily` 2. 同步写 `t_message_mentions` 明细,方便追溯和反查
3. 日聚合写入 `t_social_edges_daily` 和 `t_value_rank_social_daily`
3. 新增权重项: 3. 新增权重项:
```text ```text