接通 @ 社交图增量写入链路
- 在消息归档后自动写入 t_message_mentions 明细表(INSERT IGNORE 防重复) - 在消息归档后自动写入 t_social_edges_daily 日边表(ON DUPLICATE KEY 累加) - 同步更新 t_value_rank_social_daily 的被@次数与主动@次数,供 value_rank 直接消费 - 优化 @ 提取函数:返回去重列表,并统一过滤 @所有人 与自己@自己 - 保持主链路稳定:社交图写入失败不影响消息归档成功
This commit is contained in:
@@ -42,7 +42,8 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
|
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
|
||||||
raw_payload = self._serialize_raw_payload(msg)
|
raw_payload = self._serialize_raw_payload(msg)
|
||||||
# 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。
|
# 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。
|
||||||
mentioned_user_ids_json = self._extract_mentioned_user_ids(msg)
|
mentioned_user_ids = self._extract_mentioned_user_ids(msg)
|
||||||
|
mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False)
|
||||||
|
|
||||||
sql_with_raw_payload = """
|
sql_with_raw_payload = """
|
||||||
INSERT INTO messages (
|
INSERT INTO messages (
|
||||||
@@ -52,7 +53,16 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
"""
|
"""
|
||||||
params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8])
|
params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8])
|
||||||
if self.execute_update(sql_with_raw_payload, params_with_raw_payload):
|
archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload)
|
||||||
|
if archived:
|
||||||
|
# 归档成功后增量写社交关系数据。该步骤为统计增强逻辑,失败不应影响主流程。
|
||||||
|
self._persist_mention_graph_data(
|
||||||
|
group_id=str(getattr(msg, "roomid", "") or ""),
|
||||||
|
sender_id=str(getattr(msg, "sender", "") or ""),
|
||||||
|
message_id=str(getattr(msg, "msg_id", "") or ""),
|
||||||
|
mentioned_user_ids=mentioned_user_ids,
|
||||||
|
msg_time=datetime.now(),
|
||||||
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。
|
# 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。
|
||||||
@@ -63,7 +73,17 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
)
|
)
|
||||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
"""
|
"""
|
||||||
return self.execute_update(sql_legacy, base_params)
|
archived = self.execute_update(sql_legacy, base_params)
|
||||||
|
if archived:
|
||||||
|
# 老表结构下(未加 mentioned_user_ids 字段)也继续补写社交图数据,避免统计断层。
|
||||||
|
self._persist_mention_graph_data(
|
||||||
|
group_id=str(getattr(msg, "roomid", "") or ""),
|
||||||
|
sender_id=str(getattr(msg, "sender", "") or ""),
|
||||||
|
message_id=str(getattr(msg, "msg_id", "") or ""),
|
||||||
|
mentioned_user_ids=mentioned_user_ids,
|
||||||
|
msg_time=datetime.now(),
|
||||||
|
)
|
||||||
|
return archived
|
||||||
|
|
||||||
def _serialize_raw_payload(self, msg: WxMessage) -> str:
|
def _serialize_raw_payload(self, msg: WxMessage) -> str:
|
||||||
"""将消息对象尽量完整地序列化为 JSON 字符串。
|
"""将消息对象尽量完整地序列化为 JSON 字符串。
|
||||||
@@ -91,19 +111,19 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
|
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
|
||||||
return str(msg)
|
return str(msg)
|
||||||
|
|
||||||
def _extract_mentioned_user_ids(self, msg: WxMessage) -> str:
|
def _extract_mentioned_user_ids(self, msg: WxMessage) -> List[str]:
|
||||||
"""从消息中提取被@用户ID列表,并返回 JSON 数组字符串。
|
"""从消息中提取被@用户ID列表,并返回去重后的列表。
|
||||||
|
|
||||||
解析策略:
|
解析策略:
|
||||||
1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
|
1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
|
||||||
2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本;
|
2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本;
|
||||||
3. 去重并过滤空值,保证输出稳定。
|
3. 去重并过滤空值,保证输出稳定。
|
||||||
|
|
||||||
返回值示例:`["wxid_a","wxid_b"]`
|
返回值示例:`["wxid_a", "wxid_b"]`
|
||||||
"""
|
"""
|
||||||
raw_xml = str(getattr(msg, "msg_source", "") or "")
|
raw_xml = str(getattr(msg, "msg_source", "") or "")
|
||||||
if not raw_xml:
|
if not raw_xml:
|
||||||
return "[]"
|
return []
|
||||||
|
|
||||||
at_user_list_text = ""
|
at_user_list_text = ""
|
||||||
try:
|
try:
|
||||||
@@ -118,7 +138,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
at_user_list_text = str(match.group(1) or "").strip()
|
at_user_list_text = str(match.group(1) or "").strip()
|
||||||
|
|
||||||
if not at_user_list_text:
|
if not at_user_list_text:
|
||||||
return "[]"
|
return []
|
||||||
|
|
||||||
# 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。
|
# 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。
|
||||||
raw_ids = re.split(r"[,\s;]+", at_user_list_text)
|
raw_ids = re.split(r"[,\s;]+", at_user_list_text)
|
||||||
@@ -131,7 +151,116 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
seen.add(normalized_uid)
|
seen.add(normalized_uid)
|
||||||
result.append(normalized_uid)
|
result.append(normalized_uid)
|
||||||
|
|
||||||
return json.dumps(result, ensure_ascii=False)
|
return result
|
||||||
|
|
||||||
|
def _persist_mention_graph_data(
|
||||||
|
self,
|
||||||
|
group_id: str,
|
||||||
|
sender_id: str,
|
||||||
|
message_id: str,
|
||||||
|
mentioned_user_ids: List[str],
|
||||||
|
msg_time: datetime,
|
||||||
|
) -> None:
|
||||||
|
"""落盘社交图增量数据(明细 + 边 + 个人日汇总)。
|
||||||
|
|
||||||
|
设计原则:
|
||||||
|
1. 只在群消息中处理(group_id 为空直接忽略);
|
||||||
|
2. 过滤无效 @ 目标(空值、@所有人、自己@自己);
|
||||||
|
3. 统计写入失败不抛异常,不影响主消息归档。
|
||||||
|
"""
|
||||||
|
# 非群消息或缺少关键字段时直接跳过,避免写入脏数据。
|
||||||
|
if not group_id or not sender_id or not message_id or not mentioned_user_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 统一清洗被@列表,避免重复统计。
|
||||||
|
invalid_mentions = {"notify@all", "all", "@all"}
|
||||||
|
clean_mentioned_ids: List[str] = []
|
||||||
|
seen = set()
|
||||||
|
for uid in mentioned_user_ids:
|
||||||
|
normalized_uid = str(uid or "").strip()
|
||||||
|
if (not normalized_uid or normalized_uid in invalid_mentions or
|
||||||
|
normalized_uid == sender_id or normalized_uid in seen):
|
||||||
|
continue
|
||||||
|
seen.add(normalized_uid)
|
||||||
|
clean_mentioned_ids.append(normalized_uid)
|
||||||
|
|
||||||
|
if not clean_mentioned_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
stat_date = msg_time.strftime("%Y-%m-%d")
|
||||||
|
msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
# 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
|
||||||
|
mention_rows = [
|
||||||
|
(message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str)
|
||||||
|
for mentioned_uid in clean_mentioned_ids
|
||||||
|
]
|
||||||
|
self.execute_batch(
|
||||||
|
"""
|
||||||
|
INSERT IGNORE INTO t_message_mentions
|
||||||
|
(message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s)
|
||||||
|
""",
|
||||||
|
mention_rows,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。
|
||||||
|
edge_rows = [
|
||||||
|
(stat_date, group_id, sender_id, mentioned_uid, 1, 1.0)
|
||||||
|
for mentioned_uid in clean_mentioned_ids
|
||||||
|
]
|
||||||
|
self.execute_batch(
|
||||||
|
"""
|
||||||
|
INSERT INTO t_social_edges_daily
|
||||||
|
(stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s)
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
mention_count = mention_count + VALUES(mention_count),
|
||||||
|
interaction_score = interaction_score + VALUES(interaction_score),
|
||||||
|
update_time = CURRENT_TIMESTAMP
|
||||||
|
""",
|
||||||
|
edge_rows,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。
|
||||||
|
sender_social_row = (
|
||||||
|
stat_date, group_id, sender_id,
|
||||||
|
0, len(clean_mentioned_ids), 0,
|
||||||
|
float(len(clean_mentioned_ids)),
|
||||||
|
)
|
||||||
|
self.execute_update(
|
||||||
|
"""
|
||||||
|
INSERT INTO t_value_rank_social_daily
|
||||||
|
(stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
mention_others_count = mention_others_count + VALUES(mention_others_count),
|
||||||
|
interaction_score = interaction_score + VALUES(interaction_score),
|
||||||
|
update_time = CURRENT_TIMESTAMP
|
||||||
|
""",
|
||||||
|
sender_social_row,
|
||||||
|
)
|
||||||
|
|
||||||
|
receiver_social_rows = [
|
||||||
|
(stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0)
|
||||||
|
for mentioned_uid in clean_mentioned_ids
|
||||||
|
]
|
||||||
|
self.execute_batch(
|
||||||
|
"""
|
||||||
|
INSERT INTO t_value_rank_social_daily
|
||||||
|
(stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
mentioned_count = mentioned_count + VALUES(mentioned_count),
|
||||||
|
interaction_score = interaction_score + VALUES(interaction_score),
|
||||||
|
update_time = CURRENT_TIMESTAMP
|
||||||
|
""",
|
||||||
|
receiver_social_rows,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
# 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
|
||||||
|
self.LOG.error(f"写入社交图增量数据失败: {e}")
|
||||||
|
|
||||||
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
|
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
|
||||||
"""获取最近的消息"""
|
"""获取最近的消息"""
|
||||||
|
|||||||
Reference in New Issue
Block a user