将启动历史消息改为仅落库归档

This commit is contained in:
liuwei
2026-05-06 11:11:46 +08:00
parent b4c046a594
commit 1339040527

View File

@@ -326,7 +326,9 @@ class Robot:
count += 1
continue
self.LOG.debug(f"接受到 {len(data)}消息")
self.LOG.debug(f"接受到 {len(data)}历史消息,开始仅落库归档")
for raw_message in data:
await self._archive_startup_history_message(raw_message)
await asyncio.sleep(1)
self.LOG.info("处理堆积消息完毕")
@@ -400,6 +402,52 @@ class Robot:
# 在类里直接写一个内联 async 方法(不额外抽取新的对外方法)
async def _archive_startup_history_message(self, raw_message: dict) -> None:
"""启动阶段只归档历史消息,不触发实时业务处理。
目标:
1. 保留历史消息记录,方便后台查询、总结和审计;
2. 不触发插件、副作用指令、自动回复、积分统计等实时逻辑;
3. 与实时拉流阶段共享最近消息去重队列,避免边界消息被重复处理。
"""
try:
wxmsg: WxMessage = WxMessage.from_json(raw_message)
except Exception as e:
self.LOG.error(f"启动阶段历史消息解析失败,消息内容: {raw_message},错误: {e}")
return
try:
self._attach_trace_id(wxmsg)
msg_id = wxmsg.msg_id
if msg_id in self.recent_msg_ids:
self.LOG.debug(self._trace_message(wxmsg, f"历史消息重复,跳过归档: {msg_id}"))
return
# 先放入近期去重队列:
# 1. 启动阶段拉到的最后几条消息,可能和实时阶段收到的第一批消息重叠;
# 2. 这里提前记下 msg_id可以避免后续被当成“新消息”再次触发业务逻辑
# 3. 该队列长度虽然有限,但足够覆盖启动切换期的边界重复问题。
self.recent_msg_ids.append(msg_id)
if not self.message_storage:
self.LOG.warning(self._trace_message(wxmsg, "历史消息归档跳过message_storage 尚未初始化"))
return
# 历史消息只落库,不做实时业务:
# 1. 不调用 process_plugin_message避免历史消息触发插件副作用
# 2. 不调用 process_message避免历史发言被重复计入实时统计
# 3. 不走 _process_ipad_message避免自动加群、成员变更、媒体业务等被整段回放。
self.message_storage.archive_message(wxmsg)
self.LOG.debug(
self._trace_message(
wxmsg,
f"历史消息已归档 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
)
)
except Exception as e:
self.LOG.error(self._trace_message(wxmsg, f"历史消息归档失败 msg_id={wxmsg.msg_id}, 错误: {e}"))
async def _process_with_semaphore(self, wxmsg):
async with sem:
# 进入单条消息处理前,把 trace_id 放入当前异步上下文: