diff --git a/db/message_storage.py b/db/message_storage.py index 9db150a..4cd990e 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -42,7 +42,8 @@ class MessageStorageDB(BaseDBOperator): # 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。 raw_payload = self._serialize_raw_payload(msg) # 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。 - mentioned_user_ids_json = self._extract_mentioned_user_ids(msg) + mentioned_user_ids = self._extract_mentioned_user_ids(msg) + mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False) sql_with_raw_payload = """ INSERT INTO messages ( @@ -52,7 +53,16 @@ class MessageStorageDB(BaseDBOperator): VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8]) - if self.execute_update(sql_with_raw_payload, params_with_raw_payload): + archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload) + if archived: + # 归档成功后增量写社交关系数据。该步骤为统计增强逻辑,失败不应影响主流程。 + self._persist_mention_graph_data( + group_id=str(getattr(msg, "roomid", "") or ""), + sender_id=str(getattr(msg, "sender", "") or ""), + message_id=str(getattr(msg, "msg_id", "") or ""), + mentioned_user_ids=mentioned_user_ids, + msg_time=datetime.now(), + ) return True # 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。 @@ -63,7 +73,17 @@ class MessageStorageDB(BaseDBOperator): ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ - return self.execute_update(sql_legacy, base_params) + archived = self.execute_update(sql_legacy, base_params) + if archived: + # 老表结构下(未加 mentioned_user_ids 字段)也继续补写社交图数据,避免统计断层。 + self._persist_mention_graph_data( + group_id=str(getattr(msg, "roomid", "") or ""), + sender_id=str(getattr(msg, "sender", "") or ""), + message_id=str(getattr(msg, "msg_id", "") or ""), + mentioned_user_ids=mentioned_user_ids, + msg_time=datetime.now(), + ) + return archived def _serialize_raw_payload(self, msg: WxMessage) -> str: """将消息对象尽量完整地序列化为 JSON 字符串。 @@ -91,19 +111,19 @@ class MessageStorageDB(BaseDBOperator): # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 return str(msg) - def _extract_mentioned_user_ids(self, msg: WxMessage) -> str: - """从消息中提取被@用户ID列表,并返回 JSON 数组字符串。 + def _extract_mentioned_user_ids(self, msg: WxMessage) -> List[str]: + """从消息中提取被@用户ID列表,并返回去重后的列表。 解析策略: 1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点; 2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本; 3. 去重并过滤空值,保证输出稳定。 - 返回值示例:`["wxid_a","wxid_b"]` + 返回值示例:`["wxid_a", "wxid_b"]` """ raw_xml = str(getattr(msg, "msg_source", "") or "") if not raw_xml: - return "[]" + return [] at_user_list_text = "" try: @@ -118,7 +138,7 @@ class MessageStorageDB(BaseDBOperator): at_user_list_text = str(match.group(1) or "").strip() if not at_user_list_text: - return "[]" + return [] # 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。 raw_ids = re.split(r"[,\s;]+", at_user_list_text) @@ -131,7 +151,116 @@ class MessageStorageDB(BaseDBOperator): seen.add(normalized_uid) result.append(normalized_uid) - return json.dumps(result, ensure_ascii=False) + return result + + def _persist_mention_graph_data( + self, + group_id: str, + sender_id: str, + message_id: str, + mentioned_user_ids: List[str], + msg_time: datetime, + ) -> None: + """落盘社交图增量数据(明细 + 边 + 个人日汇总)。 + + 设计原则: + 1. 只在群消息中处理(group_id 为空直接忽略); + 2. 过滤无效 @ 目标(空值、@所有人、自己@自己); + 3. 统计写入失败不抛异常,不影响主消息归档。 + """ + # 非群消息或缺少关键字段时直接跳过,避免写入脏数据。 + if not group_id or not sender_id or not message_id or not mentioned_user_ids: + return + + # 统一清洗被@列表,避免重复统计。 + invalid_mentions = {"notify@all", "all", "@all"} + clean_mentioned_ids: List[str] = [] + seen = set() + for uid in mentioned_user_ids: + normalized_uid = str(uid or "").strip() + if (not normalized_uid or normalized_uid in invalid_mentions or + normalized_uid == sender_id or normalized_uid in seen): + continue + seen.add(normalized_uid) + clean_mentioned_ids.append(normalized_uid) + + if not clean_mentioned_ids: + return + + try: + stat_date = msg_time.strftime("%Y-%m-%d") + msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S") + + # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。 + mention_rows = [ + (message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str) + for mentioned_uid in clean_mentioned_ids + ] + self.execute_batch( + """ + INSERT IGNORE INTO t_message_mentions + (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time) + VALUES (%s, %s, %s, %s, %s, %s) + """, + mention_rows, + ) + + # 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。 + edge_rows = [ + (stat_date, group_id, sender_id, mentioned_uid, 1, 1.0) + for mentioned_uid in clean_mentioned_ids + ] + self.execute_batch( + """ + INSERT INTO t_social_edges_daily + (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score) + VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + mention_count = mention_count + VALUES(mention_count), + interaction_score = interaction_score + VALUES(interaction_score), + update_time = CURRENT_TIMESTAMP + """, + edge_rows, + ) + + # 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。 + sender_social_row = ( + stat_date, group_id, sender_id, + 0, len(clean_mentioned_ids), 0, + float(len(clean_mentioned_ids)), + ) + self.execute_update( + """ + INSERT INTO t_value_rank_social_daily + (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + mention_others_count = mention_others_count + VALUES(mention_others_count), + interaction_score = interaction_score + VALUES(interaction_score), + update_time = CURRENT_TIMESTAMP + """, + sender_social_row, + ) + + receiver_social_rows = [ + (stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0) + for mentioned_uid in clean_mentioned_ids + ] + self.execute_batch( + """ + INSERT INTO t_value_rank_social_daily + (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + mentioned_count = mentioned_count + VALUES(mentioned_count), + interaction_score = interaction_score + VALUES(interaction_score), + update_time = CURRENT_TIMESTAMP + """, + receiver_social_rows, + ) + except Exception as e: + # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。 + self.LOG.error(f"写入社交图增量数据失败: {e}") def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: """获取最近的消息"""