From ce38f66b7b8488cb62fe1432bd74ac0e0aaacf38 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 30 Apr 2026 15:00:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=BA=E4=B8=BB=E6=B6=88=E6=81=AF=E9=93=BE?= =?UTF-8?q?=E8=B7=AF=E6=8E=A5=E5=85=A5trace=5Fid=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 为接收消息生成并透传trace_id到插件处理上下文 - 统一关键日志输出格式,支持按trace_id串联排障 - 将统计插件错误记录与执行日志补充trace_id关联信息 - 在工程优化文档中补充近期已完成治理项 --- docs/工程优化与Feature清单.md | 6 ++++ plugins/stats_collector/main.py | 15 +++++--- robot.py | 63 ++++++++++++++++++++++++++++----- 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md index 93744a2..17bcc70 100644 --- a/docs/工程优化与Feature清单.md +++ b/docs/工程优化与Feature清单.md @@ -12,6 +12,12 @@ 本文重点不放在“再增加多少新功能”,而放在“如何让现有系统更稳、更快、更安全、更好维护、更好用”。 +## 1.1 最近已完成的治理项 + +- 已剥离未实际使用的事件系统实现,减少主链路无效抽象 +- 已将插件调用统计改为主链路直接埋点,降低维护复杂度 +- 已在消息主链路接入 `trace_id`,用于串联消息处理、插件统计与异常日志 + ## 2. 项目现状判断 从当前仓库结构、主链路代码、插件体系、后台管理端、存储层与 AI 能力来看,ABOT 已经属于一个具备完整产品雏形的机器人系统,而不是简单的脚本集合。 diff --git a/plugins/stats_collector/main.py b/plugins/stats_collector/main.py index 0f2b69f..ff9fd16 100644 --- a/plugins/stats_collector/main.py +++ b/plugins/stats_collector/main.py @@ -79,6 +79,7 @@ class StatsCollectorPlugin(PluginInterface): is_group: bool, process_result: bool, process_time_ms: float, + trace_id: str = "", ) -> None: """由主链路直接调用,记录一次插件执行结果。""" # 主链路可能会在高频场景下频繁回调这里,因此先做最便宜的开关判断, @@ -96,7 +97,8 @@ class StatsCollectorPlugin(PluginInterface): process_time_ms=process_time_ms, ) self.LOG.debug( - f"记录插件调用结束: {plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms" + f"记录插件调用结束: trace_id={trace_id or '-'} " + f"{plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms" ) except Exception as e: self.LOG.error(f"记录插件调用统计数据出错: {e}") @@ -110,6 +112,7 @@ class StatsCollectorPlugin(PluginInterface): group_id: Optional[str], is_group: bool, error_message: str, + trace_id: str = "", stack_trace: Optional[str] = None, ) -> None: """由主链路直接调用,记录一次插件执行异常。""" @@ -122,10 +125,14 @@ class StatsCollectorPlugin(PluginInterface): command=command, user_id=user_id, group_id=group_id if is_group else None, - error_message=error_message, - stack_trace=stack_trace, + # 错误表当前没有独立 trace_id 字段,因此先把 trace_id 前缀写入文案, + # 这样后台查看错误列表时,仍然可以把数据库错误记录与运行日志串起来。 + error_message=f"[trace_id={trace_id}] {error_message}" if trace_id else error_message, + stack_trace=(f"[trace_id={trace_id}]\n{stack_trace}" if trace_id and stack_trace else stack_trace), + ) + self.LOG.debug( + f"记录插件调用错误: trace_id={trace_id or '-'} {plugin_name} - {command} - {error_message}" ) - self.LOG.debug(f"记录插件调用错误: {plugin_name} - {command} - {error_message}") except Exception as e: self.LOG.error(f"记录插件错误信息出错: {e}") diff --git a/robot.py b/robot.py index 6d22d35..8df06da 100644 --- a/robot.py +++ b/robot.py @@ -4,6 +4,7 @@ import threading import time import tomllib import traceback +import uuid from collections import deque import toml @@ -342,12 +343,20 @@ class Robot: # 处理消息 try: wxmsg: WxMessage = WxMessage.from_json(message) + self._attach_trace_id(wxmsg) # 判断是否已经收到过。处理。存储最近20个msg_id,处理之前判断是否在清单里面,如果在,这不重新处理了。 msg_id = wxmsg.msg_id if msg_id in self.recent_msg_ids: - self.LOG.info(f"出现重复ID消息:{msg_id}") + self.LOG.info(self._trace_message(wxmsg, f"出现重复ID消息: {msg_id}")) continue # 已处理,跳过 self.recent_msg_ids.append(msg_id) + self.LOG.debug( + self._trace_message( + wxmsg, + f"收到消息 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} " + f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}" + ) + ) except Exception as e: self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}") continue # 跳过本条消息,继续处理下一条 @@ -381,7 +390,7 @@ class Robot: try: await self._process_ipad_message(wxmsg) except Exception as e: - self.LOG.error(f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}") + self.LOG.error(self._trace_message(wxmsg, f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}")) async def _handle_ipad_login(self, wxid, device_name, device_id): """处理wechat_ipad登录""" @@ -553,7 +562,7 @@ class Robot: if message.sender != self.wxid: self.message_storage.process_message(message) except Exception as e: - self.LOG.error(f"process_message error: {e}") + self.LOG.error(self._trace_message(message, f"process_message error: {e}")) # # 聊天记录入库动作: try: @@ -562,10 +571,10 @@ class Robot: if message.msg_type == MessageType.IMAGE: # 图片消息类型 self.message_storage.process_image(message) except Exception as e: - self.LOG.error(f"archive_message error: {e}") + self.LOG.error(self._trace_message(message, f"archive_message error: {e}")) except Exception as e: - self.LOG.error(f"处理wechat_ipad消息出错: {e}") + self.LOG.error(self._trace_message(message, f"处理wechat_ipad消息出错: {e}")) def stop_wechat_ipad(self): """停止wechat_ipad客户端""" @@ -592,7 +601,7 @@ class Robot: if is_sleep_time: # 只处理特定消息,如管理员消息或紧急消息 - self.LOG.info(f"夜间休眠时间(00:30-05:00),忽略消息: {msg}") + self.LOG.info(self._trace_message(msg, f"夜间休眠时间(00:30-05:00),忽略消息: {msg}")) return False message_plugins = self.plugin_registry.get_plugins_by_type(MessagePluginInterface) message_plugins = self._sort_message_plugins(message_plugins) @@ -622,6 +631,7 @@ class Robot: "roomid": room_id, "is_at": msg.is_at(self.wxid), "timestamp": time.time(), + "trace_id": self._get_trace_id(msg), "all_contacts": self.allContacts, "full_wx_msg": msg, "gbm": self.gbm, @@ -644,6 +654,13 @@ class Robot: process_time_ms=self._elapsed_ms(started_at), ) if processed: + self.LOG.info( + self._trace_message( + msg, + f"插件命中 plugin={plugin.name} command={command_name} " + f"cost_ms={self._elapsed_ms(started_at)}" + ) + ) return True except Exception as e: self._record_plugin_call_error( @@ -652,10 +669,36 @@ class Robot: command_name=command_name, error=e, ) - self.LOG.error(f"插件 {plugin.name} 处理消息失败: {e}") + self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}")) return False + def _attach_trace_id(self, msg: WxMessage) -> str: + """为消息对象附加稳定 trace_id,便于后续全链路关联。""" + trace_id = self._get_trace_id(msg) + if trace_id: + return trace_id + + msg_id = str(getattr(msg, "msg_id", "") or "0") + create_time = str(getattr(msg, "create_time", "") or "0") + sender_tail = str(getattr(msg, "sender", "") or "")[-6:] or "unknown" + random_tail = uuid.uuid4().hex[:6] + trace_id = f"wx-{msg_id}-{create_time}-{sender_tail}-{random_tail}" + setattr(msg, "trace_id", trace_id) + return trace_id + + @staticmethod + def _get_trace_id(msg: WxMessage) -> str: + """读取消息对象上的 trace_id;若不存在则返回空字符串。""" + return str(getattr(msg, "trace_id", "") or "").strip() + + def _trace_message(self, msg: WxMessage, message: str) -> str: + """为日志消息统一追加 trace_id 前缀。""" + trace_id = self._get_trace_id(msg) + if not trace_id: + return message + return f"[trace_id={trace_id}] {message}" + @staticmethod def _elapsed_ms(started_at: float) -> float: """把 monotonic 起始时间转换为毫秒耗时。""" @@ -709,9 +752,10 @@ class Robot: is_group=msg.from_group(), process_result=process_result, process_time_ms=process_time_ms, + trace_id=self._get_trace_id(msg), ) except Exception as stats_error: - self.LOG.error(f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}") + self.LOG.error(self._trace_message(msg, f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}")) def _record_plugin_call_error( self, @@ -734,11 +778,12 @@ class Robot: group_id=msg.roomid if msg.from_group() else None, is_group=msg.from_group(), error_message=str(error), + trace_id=self._get_trace_id(msg), # 这里保留完整堆栈,便于后台直接查看异常上下文,而不必只看摘要日志。 stack_trace=traceback.format_exc(), ) except Exception as stats_error: - self.LOG.error(f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}") + self.LOG.error(self._trace_message(msg, f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}")) @staticmethod def _sort_message_plugins(message_plugins):