From 906f0905df4e9cb15ef92fbb3d6e727dd0f53d36 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:51:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86@=E6=8A=BD=E5=8F=96=E4=B8=8E=E7=A4=BE?= =?UTF-8?q?=E4=BA=A4=E5=9B=BE=E5=86=99=E5=85=A5=E6=94=B9=E4=B8=BA=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 精简 archive_message 主链路:仅做消息归档,不再同步执行@解析与社交统计 - 新增 MessageStorageDB.process_pending_mentions 批处理能力,按批次回填 mentioned_user_ids 并写入社交图 - 新增系统任务 process_pending_mentions,每10分钟执行一次(every_seconds=600) - 增加幂等保护:基于 t_message_mentions 已有记录过滤新增@关系,避免重复累加社交边和热度 - 保留详细中文注释,说明性能优化目标与批处理策略 --- db/message_storage.py | 172 +++++++++++++++++++++++++++------- utils/system_jobs.py | 16 ++++ utils/wechat/message_to_db.py | 28 ++++++ 3 files changed, 184 insertions(+), 32 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index bf3c12b..7d173f7 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -41,9 +41,6 @@ class MessageStorageDB(BaseDBOperator): # 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。 raw_payload = self._serialize_raw_payload(msg) - # 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。 - mentioned_user_ids = self._extract_mentioned_user_ids(msg) - mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False) sql_with_raw_payload = """ INSERT INTO messages ( @@ -52,17 +49,10 @@ class MessageStorageDB(BaseDBOperator): ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ - params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8]) + # 为了降低主链路延迟,这里不做@解析,mentioned_user_ids 先置空,后续由定时任务批处理回填。 + params_with_raw_payload = (*base_params[:8], raw_payload, None, base_params[8]) archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload) if archived: - # 归档成功后增量写社交关系数据。该步骤为统计增强逻辑,失败不应影响主流程。 - self._persist_mention_graph_data( - group_id=str(getattr(msg, "roomid", "") or ""), - sender_id=str(getattr(msg, "sender", "") or ""), - message_id=str(getattr(msg, "msg_id", "") or ""), - mentioned_user_ids=mentioned_user_ids, - msg_time=datetime.now(), - ) return True # 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。 @@ -73,17 +63,7 @@ class MessageStorageDB(BaseDBOperator): ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ - archived = self.execute_update(sql_legacy, base_params) - if archived: - # 老表结构下(未加 mentioned_user_ids 字段)也继续补写社交图数据,避免统计断层。 - self._persist_mention_graph_data( - group_id=str(getattr(msg, "roomid", "") or ""), - sender_id=str(getattr(msg, "sender", "") or ""), - message_id=str(getattr(msg, "msg_id", "") or ""), - mentioned_user_ids=mentioned_user_ids, - msg_time=datetime.now(), - ) - return archived + return self.execute_update(sql_legacy, base_params) def _serialize_raw_payload(self, msg: WxMessage) -> str: """将消息对象尽量完整地序列化为 JSON 字符串。 @@ -111,8 +91,8 @@ class MessageStorageDB(BaseDBOperator): # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 return str(msg) - def _extract_mentioned_user_ids(self, msg: WxMessage) -> List[str]: - """从消息中提取被@用户ID列表,并返回去重后的列表。 + def _extract_mentioned_user_ids(self, raw_xml: str) -> List[str]: + """从消息 XML 中提取被@用户ID列表,并返回去重后的列表。 解析策略: 1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点; @@ -121,7 +101,7 @@ class MessageStorageDB(BaseDBOperator): 返回值示例:`["wxid_a", "wxid_b"]` """ - raw_xml = str(getattr(msg, "msg_source", "") or "") + raw_xml = str(raw_xml or "") if not raw_xml: return [] @@ -153,6 +133,118 @@ class MessageStorageDB(BaseDBOperator): return result + def get_pending_mention_extract_messages(self, limit: int = 200, max_age_days: int = 7) -> List[Dict]: + """获取待处理 @ 抽取的消息批次。 + + 筛选规则: + 1. 群消息; + 2. mentioned_user_ids 为空(表示还未处理); + 3. message_xml 非空; + 4. 时间在 max_age_days 窗口内,避免扫描无限历史。 + """ + sql = """ + SELECT message_id, group_id, sender, message_xml, timestamp + FROM messages + WHERE group_id IS NOT NULL + AND group_id <> '' + AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '') + AND message_xml IS NOT NULL + AND message_xml <> '' + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) + ORDER BY timestamp ASC + LIMIT %s + """ + return self.execute_query(sql, (max_age_days, limit)) or [] + + def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7) -> Dict[str, int]: + """批量处理待抽取 @ 的消息,并同步社交图数据。 + + 返回统计: + - total: 本批读取条数 + - processed: 成功处理条数 + - with_mentions: 提取到 @ 的消息条数 + - failed: 失败条数 + """ + rows = self.get_pending_mention_extract_messages(limit=batch_size, max_age_days=max_age_days) + if not rows: + return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} + + processed = 0 + with_mentions = 0 + failed = 0 + + for row in rows: + try: + message_id = str(row.get("message_id") or "").strip() + group_id = str(row.get("group_id") or "").strip() + sender_id = str(row.get("sender") or "").strip() + raw_xml = str(row.get("message_xml") or "") + ts_raw = row.get("timestamp") + msg_time = self._safe_parse_message_time(ts_raw) + + mentioned_user_ids = self._extract_mentioned_user_ids(raw_xml) + mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False) + + self._update_message_mentioned_user_ids( + message_id=message_id, + group_id=group_id, + sender_id=sender_id, + mentioned_user_ids_json=mentioned_user_ids_json, + ) + self._persist_mention_graph_data( + group_id=group_id, + sender_id=sender_id, + message_id=message_id, + mentioned_user_ids=mentioned_user_ids, + msg_time=msg_time, + ) + + processed += 1 + if mentioned_user_ids: + with_mentions += 1 + except Exception as e: + failed += 1 + self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}") + + return { + "total": len(rows), + "processed": processed, + "with_mentions": with_mentions, + "failed": failed, + } + + @staticmethod + def _safe_parse_message_time(value) -> datetime: + """安全解析消息时间,失败时回退到当前时间。""" + if isinstance(value, datetime): + return value + text = str(value or "").strip() + if not text: + return datetime.now() + try: + return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + except Exception: + return datetime.now() + + def _update_message_mentioned_user_ids( + self, + message_id: str, + group_id: str, + sender_id: str, + mentioned_user_ids_json: str, + ) -> None: + """回填消息表的 mentioned_user_ids 字段。""" + self.execute_update( + """ + UPDATE messages + SET mentioned_user_ids = %s + WHERE message_id = %s + AND group_id = %s + AND sender = %s + """, + (mentioned_user_ids_json, message_id, group_id, sender_id), + ) + def _persist_mention_graph_data( self, group_id: str, @@ -191,10 +283,26 @@ class MessageStorageDB(BaseDBOperator): stat_date = msg_time.strftime("%Y-%m-%d") msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S") + # 幂等控制:只处理“该消息中尚未写入明细表”的新增 @ 目标。 + existed_rows = self.execute_query( + """ + SELECT mentioned_user_id + FROM t_message_mentions + WHERE message_id = %s + AND group_id = %s + AND sender_id = %s + """, + (message_id, group_id, sender_id), + ) or [] + existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows} + newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids] + if not newly_mentioned_ids: + return + # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。 mention_rows = [ (message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str) - for mentioned_uid in clean_mentioned_ids + for mentioned_uid in newly_mentioned_ids ] self.execute_batch( """ @@ -208,7 +316,7 @@ class MessageStorageDB(BaseDBOperator): # 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。 edge_rows = [ (stat_date, group_id, sender_id, mentioned_uid, 1, 1.0) - for mentioned_uid in clean_mentioned_ids + for mentioned_uid in newly_mentioned_ids ] self.execute_batch( """ @@ -226,8 +334,8 @@ class MessageStorageDB(BaseDBOperator): # 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。 sender_social_row = ( stat_date, group_id, sender_id, - 0, len(clean_mentioned_ids), 0, - float(len(clean_mentioned_ids)), + 0, len(newly_mentioned_ids), 0, + float(len(newly_mentioned_ids)), ) self.execute_update( """ @@ -244,7 +352,7 @@ class MessageStorageDB(BaseDBOperator): receiver_social_rows = [ (stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0) - for mentioned_uid in clean_mentioned_ids + for mentioned_uid in newly_mentioned_ids ] self.execute_batch( """ @@ -260,7 +368,7 @@ class MessageStorageDB(BaseDBOperator): ) # 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。 - affected_user_ids = [sender_id, *clean_mentioned_ids] + affected_user_ids = [sender_id, *newly_mentioned_ids] self._refresh_unique_interactors(stat_date, group_id, affected_user_ids) except Exception as e: # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。 diff --git a/utils/system_jobs.py b/utils/system_jobs.py index a6141cd..fe34b6c 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -43,6 +43,14 @@ def get_system_job_definitions(robot) -> List[Dict[str, Any]]: "trigger_config": {"seconds": 300}, "handler": _build_process_pending_images_handler(robot), }, + { + "job_key": "process_pending_mentions", + "name": "待抽取@关系处理", + "description": "每 10 分钟处理一次待抽取@消息并更新社交图", + "trigger_type": "every_seconds", + "trigger_config": {"seconds": 600}, + "handler": _build_process_pending_mentions_handler(robot), + }, ] def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]: @@ -53,6 +61,14 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None] return _handler +def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]: + async def _handler(): + if hasattr(robot, "message_storage") and robot.message_storage: + await robot.message_storage.process_pending_mentions(batch_size=200, max_age_days=7) + + return _handler + + class SystemJobLoader: """系统任务加载器:从数据库读取调度配置并注册到 async_job。""" diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 9f8029d..92abfdd 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -358,6 +358,34 @@ class MessageStorage: except Exception as e: logger.exception(f"定时处理媒体任务出错: {e}") + async def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7): + """定时任务:批量处理待抽取 @ 的消息并写入社交图。 + + 说明: + 1. 该任务与主消息归档链路解耦,不阻塞实时收发; + 2. 每次只处理有限批次,避免长事务和数据库抖动; + 3. 重复执行安全:底层按 message_id + sender + mentioned_user_id 做幂等控制。 + """ + try: + stats = self.message_db.process_pending_mentions( + batch_size=batch_size, + max_age_days=max_age_days, + ) + total = int(stats.get("total", 0)) + if total == 0: + logger.debug("待处理@抽取队列为空,本轮跳过") + return + + logger.info( + "批量@抽取完成: " + f"读取={stats.get('total', 0)}, " + f"处理={stats.get('processed', 0)}, " + f"含@={stats.get('with_mentions', 0)}, " + f"失败={stats.get('failed', 0)}" + ) + except Exception as e: + logger.exception(f"定时处理@抽取任务出错: {e}") + def _process_image_done(self, future): """任务完成统一回调(极轻量)""" try: