From 2d5a5547de5ff038e6e7e5934fe2515a1edbc234 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 30 Apr 2026 15:22:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B6=88=E6=81=AF=E9=93=BE?= =?UTF-8?q?=E8=B7=AFtrace=E5=B9=B6=E8=B4=AF=E9=80=9AAI=E4=B8=8E=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=8A=A8=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/工程优化与Feature清单.md | 7 +++++++ robot.py | 8 ++++++++ utils/ai/unified_llm.py | 13 +++++++++++++ utils/trace_context.py | 32 ++++++++++++++++++++++++++++++++ wechat_ipad/client/message.py | 9 +++++++-- 5 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 utils/trace_context.py diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md index ba5f105..7667e75 100644 --- a/docs/工程优化与Feature清单.md +++ b/docs/工程优化与Feature清单.md @@ -19,6 +19,7 @@ - 已在消息主链路接入 `trace_id`,用于串联消息处理、插件统计与异常日志 - 已在后台首页补充“系统健康快照”,可集中查看机器人连接、插件运行、近 24 小时异常与 md2img 运行状态 - 已补充 MySQL / Redis 连接探测与统一 LLM 最近调用快照,基础设施与 AI 运行态可直接在首页查看 +- 已将 `trace_id` 通过异步上下文继续贯穿到统一 LLM 调用与微信发送动作,链路追踪粒度进一步提升 ## 2. 项目现状判断 @@ -345,6 +346,12 @@ - 对单条消息实现“从接收到发送”的全链路追踪 +当前进展: + +- 第一阶段已完成:主消息链路、插件统计与异常日志已接入 `trace_id` +- 第二阶段已完成:统一 LLM 调用与微信发送日志已可自动继承同一 `trace_id` +- 后续可继续补充后台按 `trace_id` 检索错误、消息与 AI 调用详情的入口 + 建议内容: - 为每条消息生成统一 trace_id diff --git a/robot.py b/robot.py index 8df06da..912a3c7 100644 --- a/robot.py +++ b/robot.py @@ -32,6 +32,7 @@ from utils.wechat.contact_manager import ContactManager from utils.wechat.member_monitor import ChatroomMemberMonitor from utils.wechat.message_to_db import MessageStorage from utils.ai.llm_registry import LLMRegistry +from utils.trace_context import set_current_trace_id, reset_current_trace_id from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType @@ -387,10 +388,17 @@ class Robot: async def _process_with_semaphore(self, wxmsg): async with sem: + # 进入单条消息处理前,把 trace_id 放入当前异步上下文: + # 1. 后续插件中的 AI 调用、消息发送、子协程都可以自动继承这个 trace_id; + # 2. 这样不需要给大量现有方法额外加 trace_id 参数,侵入性更小; + # 3. finally 中会回滚 token,避免把当前消息的 trace_id 泄漏到下一条消息。 + trace_token = set_current_trace_id(self._get_trace_id(wxmsg)) try: await self._process_ipad_message(wxmsg) except Exception as e: self.LOG.error(self._trace_message(wxmsg, f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}")) + finally: + reset_current_trace_id(trace_token) async def _handle_ipad_login(self, wxid, device_name, device_id): """处理wechat_ipad登录""" diff --git a/utils/ai/unified_llm.py b/utils/ai/unified_llm.py index 1505672..246aed1 100644 --- a/utils/ai/unified_llm.py +++ b/utils/ai/unified_llm.py @@ -15,6 +15,7 @@ from requests import HTTPError from loguru import logger from utils.ai.llm_registry import LLMRegistry +from utils.trace_context import get_current_trace_id, format_trace_prefix class UnifiedLLMClient: @@ -58,6 +59,7 @@ class UnifiedLLMClient: backend: str, scene: str, model: str, + trace_id: str, success: bool, latency_ms: float, error: str = "", @@ -70,6 +72,7 @@ class UnifiedLLMClient: "backend": str(backend or "").strip(), "scene": str(scene or "").strip(), "model": str(model or "").strip(), + "trace_id": str(trace_id or "").strip(), "success": bool(success), "latency_ms": round(float(latency_ms or 0.0), 2), "error": str(error or "").strip()[:300], @@ -236,6 +239,7 @@ class UnifiedLLMClient: started_at = time.monotonic() self.last_error = "" result: Optional[Dict[str, Any]] = None + current_trace_id = get_current_trace_id() if not self.is_available(): self.last_error = "client_unavailable" elif self.provider == "dify": @@ -272,10 +276,19 @@ class UnifiedLLMClient: backend=str(self.config.get("backend", "") or ""), scene=str(self.config.get("scene", "") or ""), model=self.model or str(self.mode or ""), + trace_id=current_trace_id, success=bool(result and result.get("text")), latency_ms=latency_ms, error=self.last_error, ) + # 在统一出口补一条轻量 trace 日志,方便把“消息 -> AI 调用”快速串起来。 + self.LOG.debug( + f"{format_trace_prefix(current_trace_id)}LLM调用结束 " + f"provider={self.provider} backend={self.config.get('backend', '') or '-'} " + f"scene={self.config.get('scene', '') or '-'} " + f"success={bool(result and result.get('text'))} latency_ms={round(latency_ms, 2)} " + f"error={self.last_error or '-'}" + ) return result def _generate_openai( diff --git a/utils/trace_context.py b/utils/trace_context.py new file mode 100644 index 0000000..79d1e87 --- /dev/null +++ b/utils/trace_context.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from contextvars import ContextVar, Token + +# 当前消息链路的 trace_id: +# 1. 使用 ContextVar 而不是全局变量,避免并发消息之间互相覆盖; +# 2. asyncio Task 会自动继承上下文,因此插件里再起的协程也能拿到同一个 trace_id; +# 3. 对不在消息主链路里的后台任务,该值默认为空字符串,不会影响原有逻辑。 +_CURRENT_TRACE_ID: ContextVar[str] = ContextVar("current_trace_id", default="") + + +def set_current_trace_id(trace_id: str) -> Token: + """写入当前上下文的 trace_id,并返回可回滚 token。""" + return _CURRENT_TRACE_ID.set(str(trace_id or "").strip()) + + +def reset_current_trace_id(token: Token) -> None: + """根据 token 回滚 trace_id,避免上下文泄漏到后续无关任务。""" + _CURRENT_TRACE_ID.reset(token) + + +def get_current_trace_id() -> str: + """读取当前上下文中的 trace_id。""" + return str(_CURRENT_TRACE_ID.get("") or "").strip() + + +def format_trace_prefix(trace_id: str = "") -> str: + """统一生成日志前缀,避免各模块手写格式不一致。""" + resolved_trace_id = str(trace_id or get_current_trace_id() or "").strip() + if not resolved_trace_id: + return "" + return f"[trace_id={resolved_trace_id}] " diff --git a/wechat_ipad/client/message.py b/wechat_ipad/client/message.py index 6fd981c..14cde47 100644 --- a/wechat_ipad/client/message.py +++ b/wechat_ipad/client/message.py @@ -16,6 +16,7 @@ from pymediainfo import MediaInfo import aiofiles from utils.video_utils import get_first_frame, get_first_frame_bytes +from utils.trace_context import format_trace_prefix from wechat_ipad import UserLoggedOut from wechat_ipad.client.base import WechatAPIClientBase @@ -144,7 +145,9 @@ class MessageMixin(WechatAPIClientBase): response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendTxt', json=json_param) json_resp = await response.json() if json_resp.get("Success"): - self.logging.info("发送文字消息: 对方wxid:{} at:{} 内容:{}", wxid, at, content) + # 发送动作也带上 trace_id,便于把“某条入站消息最终发了什么”直接串起来。 + self.logging.info("{}发送文字消息: 对方wxid:{} at:{} 内容:{}", + format_trace_prefix(), wxid, at, content) data = json_resp.get("Data") return data.get("List")[0].get("ClientMsgId"), data.get("List")[0].get("CreateTime"), data.get("List")[ 0].get("NewMsgId") @@ -191,7 +194,9 @@ class MessageMixin(WechatAPIClientBase): if json_resp.get("Success"): json_param.pop('Base64') - self.logging.info("发送图片消息: 对方wxid:{} 图片base64略", wxid) + # 图片日志不打印 base64 内容,但保留 trace_id,便于关联具体发送动作。 + self.logging.info("{}发送图片消息: 对方wxid:{} 图片base64略", + format_trace_prefix(), wxid) data = json_resp.get("Data") self.logging.debug("发送图片消息成功,返回:{}", data) return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("NewMsgId")