为主消息链路接入trace_id追踪

- 为接收消息生成并透传trace_id到插件处理上下文
- 统一关键日志输出格式,支持按trace_id串联排障
- 将统计插件错误记录与执行日志补充trace_id关联信息
- 在工程优化文档中补充近期已完成治理项
This commit is contained in:
liuwei
2026-04-30 15:00:29 +08:00
parent a580bd12e5
commit ce38f66b7b
3 changed files with 71 additions and 13 deletions

View File

@@ -12,6 +12,12 @@
本文重点不放在“再增加多少新功能”,而放在“如何让现有系统更稳、更快、更安全、更好维护、更好用”。
## 1.1 最近已完成的治理项
- 已剥离未实际使用的事件系统实现,减少主链路无效抽象
- 已将插件调用统计改为主链路直接埋点,降低维护复杂度
- 已在消息主链路接入 `trace_id`,用于串联消息处理、插件统计与异常日志
## 2. 项目现状判断
从当前仓库结构、主链路代码、插件体系、后台管理端、存储层与 AI 能力来看ABOT 已经属于一个具备完整产品雏形的机器人系统,而不是简单的脚本集合。

View File

@@ -79,6 +79,7 @@ class StatsCollectorPlugin(PluginInterface):
is_group: bool,
process_result: bool,
process_time_ms: float,
trace_id: str = "",
) -> None:
"""由主链路直接调用,记录一次插件执行结果。"""
# 主链路可能会在高频场景下频繁回调这里,因此先做最便宜的开关判断,
@@ -96,7 +97,8 @@ class StatsCollectorPlugin(PluginInterface):
process_time_ms=process_time_ms,
)
self.LOG.debug(
f"记录插件调用结束: {plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms"
f"记录插件调用结束: trace_id={trace_id or '-'} "
f"{plugin_name} - {command} - 成功: {process_result} - 处理时间: {process_time_ms}ms"
)
except Exception as e:
self.LOG.error(f"记录插件调用统计数据出错: {e}")
@@ -110,6 +112,7 @@ class StatsCollectorPlugin(PluginInterface):
group_id: Optional[str],
is_group: bool,
error_message: str,
trace_id: str = "",
stack_trace: Optional[str] = None,
) -> None:
"""由主链路直接调用,记录一次插件执行异常。"""
@@ -122,10 +125,14 @@ class StatsCollectorPlugin(PluginInterface):
command=command,
user_id=user_id,
group_id=group_id if is_group else None,
error_message=error_message,
stack_trace=stack_trace,
# 错误表当前没有独立 trace_id 字段,因此先把 trace_id 前缀写入文案,
# 这样后台查看错误列表时,仍然可以把数据库错误记录与运行日志串起来。
error_message=f"[trace_id={trace_id}] {error_message}" if trace_id else error_message,
stack_trace=(f"[trace_id={trace_id}]\n{stack_trace}" if trace_id and stack_trace else stack_trace),
)
self.LOG.debug(
f"记录插件调用错误: trace_id={trace_id or '-'} {plugin_name} - {command} - {error_message}"
)
self.LOG.debug(f"记录插件调用错误: {plugin_name} - {command} - {error_message}")
except Exception as e:
self.LOG.error(f"记录插件错误信息出错: {e}")

View File

@@ -4,6 +4,7 @@ import threading
import time
import tomllib
import traceback
import uuid
from collections import deque
import toml
@@ -342,12 +343,20 @@ class Robot:
# 处理消息
try:
wxmsg: WxMessage = WxMessage.from_json(message)
self._attach_trace_id(wxmsg)
# 判断是否已经收到过。处理。存储最近20个msg_id处理之前判断是否在清单里面如果在这不重新处理了。
msg_id = wxmsg.msg_id
if msg_id in self.recent_msg_ids:
self.LOG.info(f"出现重复ID消息:{msg_id}")
self.LOG.info(self._trace_message(wxmsg, f"出现重复ID消息: {msg_id}"))
continue # 已处理,跳过
self.recent_msg_ids.append(msg_id)
self.LOG.debug(
self._trace_message(
wxmsg,
f"收到消息 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
)
)
except Exception as e:
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}")
continue # 跳过本条消息,继续处理下一条
@@ -381,7 +390,7 @@ class Robot:
try:
await self._process_ipad_message(wxmsg)
except Exception as e:
self.LOG.error(f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}")
self.LOG.error(self._trace_message(wxmsg, f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}"))
async def _handle_ipad_login(self, wxid, device_name, device_id):
"""处理wechat_ipad登录"""
@@ -553,7 +562,7 @@ class Robot:
if message.sender != self.wxid:
self.message_storage.process_message(message)
except Exception as e:
self.LOG.error(f"process_message error: {e}")
self.LOG.error(self._trace_message(message, f"process_message error: {e}"))
# # 聊天记录入库动作:
try:
@@ -562,10 +571,10 @@ class Robot:
if message.msg_type == MessageType.IMAGE: # 图片消息类型
self.message_storage.process_image(message)
except Exception as e:
self.LOG.error(f"archive_message error: {e}")
self.LOG.error(self._trace_message(message, f"archive_message error: {e}"))
except Exception as e:
self.LOG.error(f"处理wechat_ipad消息出错: {e}")
self.LOG.error(self._trace_message(message, f"处理wechat_ipad消息出错: {e}"))
def stop_wechat_ipad(self):
"""停止wechat_ipad客户端"""
@@ -592,7 +601,7 @@ class Robot:
if is_sleep_time:
# 只处理特定消息,如管理员消息或紧急消息
self.LOG.info(f"夜间休眠时间(00:30-05:00),忽略消息: {msg}")
self.LOG.info(self._trace_message(msg, f"夜间休眠时间(00:30-05:00),忽略消息: {msg}"))
return False
message_plugins = self.plugin_registry.get_plugins_by_type(MessagePluginInterface)
message_plugins = self._sort_message_plugins(message_plugins)
@@ -622,6 +631,7 @@ class Robot:
"roomid": room_id,
"is_at": msg.is_at(self.wxid),
"timestamp": time.time(),
"trace_id": self._get_trace_id(msg),
"all_contacts": self.allContacts,
"full_wx_msg": msg,
"gbm": self.gbm,
@@ -644,6 +654,13 @@ class Robot:
process_time_ms=self._elapsed_ms(started_at),
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={self._elapsed_ms(started_at)}"
)
)
return True
except Exception as e:
self._record_plugin_call_error(
@@ -652,10 +669,36 @@ class Robot:
command_name=command_name,
error=e,
)
self.LOG.error(f"插件 {plugin.name} 处理消息失败: {e}")
self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}"))
return False
def _attach_trace_id(self, msg: WxMessage) -> str:
"""为消息对象附加稳定 trace_id便于后续全链路关联。"""
trace_id = self._get_trace_id(msg)
if trace_id:
return trace_id
msg_id = str(getattr(msg, "msg_id", "") or "0")
create_time = str(getattr(msg, "create_time", "") or "0")
sender_tail = str(getattr(msg, "sender", "") or "")[-6:] or "unknown"
random_tail = uuid.uuid4().hex[:6]
trace_id = f"wx-{msg_id}-{create_time}-{sender_tail}-{random_tail}"
setattr(msg, "trace_id", trace_id)
return trace_id
@staticmethod
def _get_trace_id(msg: WxMessage) -> str:
"""读取消息对象上的 trace_id若不存在则返回空字符串。"""
return str(getattr(msg, "trace_id", "") or "").strip()
def _trace_message(self, msg: WxMessage, message: str) -> str:
"""为日志消息统一追加 trace_id 前缀。"""
trace_id = self._get_trace_id(msg)
if not trace_id:
return message
return f"[trace_id={trace_id}] {message}"
@staticmethod
def _elapsed_ms(started_at: float) -> float:
"""把 monotonic 起始时间转换为毫秒耗时。"""
@@ -709,9 +752,10 @@ class Robot:
is_group=msg.from_group(),
process_result=process_result,
process_time_ms=process_time_ms,
trace_id=self._get_trace_id(msg),
)
except Exception as stats_error:
self.LOG.error(f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}")
self.LOG.error(self._trace_message(msg, f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}"))
def _record_plugin_call_error(
self,
@@ -734,11 +778,12 @@ class Robot:
group_id=msg.roomid if msg.from_group() else None,
is_group=msg.from_group(),
error_message=str(error),
trace_id=self._get_trace_id(msg),
# 这里保留完整堆栈,便于后台直接查看异常上下文,而不必只看摘要日志。
stack_trace=traceback.format_exc(),
)
except Exception as stats_error:
self.LOG.error(f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}")
self.LOG.error(self._trace_message(msg, f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}"))
@staticmethod
def _sort_message_plugins(message_plugins):