From 1339040527e89d5a458ff73468e89b5c82ddef03 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 6 May 2026 11:11:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E5=90=AF=E5=8A=A8=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=94=B9=E4=B8=BA=E4=BB=85=E8=90=BD=E5=BA=93?= =?UTF-8?q?=E5=BD=92=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robot.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/robot.py b/robot.py index 83daf44..120ecf1 100644 --- a/robot.py +++ b/robot.py @@ -326,7 +326,9 @@ class Robot: count += 1 continue - self.LOG.debug(f"接受到 {len(data)} 条消息") + self.LOG.debug(f"接受到 {len(data)} 条历史消息,开始仅落库归档") + for raw_message in data: + await self._archive_startup_history_message(raw_message) await asyncio.sleep(1) self.LOG.info("处理堆积消息完毕") @@ -400,6 +402,52 @@ class Robot: # 在类里直接写一个内联 async 方法(不额外抽取新的对外方法) + async def _archive_startup_history_message(self, raw_message: dict) -> None: + """启动阶段只归档历史消息,不触发实时业务处理。 + + 目标: + 1. 保留历史消息记录,方便后台查询、总结和审计; + 2. 不触发插件、副作用指令、自动回复、积分统计等实时逻辑; + 3. 与实时拉流阶段共享最近消息去重队列,避免边界消息被重复处理。 + """ + try: + wxmsg: WxMessage = WxMessage.from_json(raw_message) + except Exception as e: + self.LOG.error(f"启动阶段历史消息解析失败,消息内容: {raw_message},错误: {e}") + return + + try: + self._attach_trace_id(wxmsg) + msg_id = wxmsg.msg_id + if msg_id in self.recent_msg_ids: + self.LOG.debug(self._trace_message(wxmsg, f"历史消息重复,跳过归档: {msg_id}")) + return + + # 先放入近期去重队列: + # 1. 启动阶段拉到的最后几条消息,可能和实时阶段收到的第一批消息重叠; + # 2. 这里提前记下 msg_id,可以避免后续被当成“新消息”再次触发业务逻辑; + # 3. 该队列长度虽然有限,但足够覆盖启动切换期的边界重复问题。 + self.recent_msg_ids.append(msg_id) + + if not self.message_storage: + self.LOG.warning(self._trace_message(wxmsg, "历史消息归档跳过:message_storage 尚未初始化")) + return + + # 历史消息只落库,不做实时业务: + # 1. 不调用 process_plugin_message,避免历史消息触发插件副作用; + # 2. 不调用 process_message,避免历史发言被重复计入实时统计; + # 3. 不走 _process_ipad_message,避免自动加群、成员变更、媒体业务等被整段回放。 + self.message_storage.archive_message(wxmsg) + self.LOG.debug( + self._trace_message( + wxmsg, + f"历史消息已归档 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} " + f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}" + ) + ) + except Exception as e: + self.LOG.error(self._trace_message(wxmsg, f"历史消息归档失败 msg_id={wxmsg.msg_id}, 错误: {e}")) + async def _process_with_semaphore(self, wxmsg): async with sem: # 进入单条消息处理前,把 trace_id 放入当前异步上下文: