完善消息链路trace并贯通AI与发送动作

This commit is contained in:
liuwei
2026-04-30 15:22:07 +08:00
parent 4ddab01b8d
commit 2d5a5547de
5 changed files with 67 additions and 2 deletions

View File

@@ -19,6 +19,7 @@
- 已在消息主链路接入 `trace_id`,用于串联消息处理、插件统计与异常日志
- 已在后台首页补充“系统健康快照”,可集中查看机器人连接、插件运行、近 24 小时异常与 md2img 运行状态
- 已补充 MySQL / Redis 连接探测与统一 LLM 最近调用快照,基础设施与 AI 运行态可直接在首页查看
- 已将 `trace_id` 通过异步上下文继续贯穿到统一 LLM 调用与微信发送动作,链路追踪粒度进一步提升
## 2. 项目现状判断
@@ -345,6 +346,12 @@
- 对单条消息实现“从接收到发送”的全链路追踪
当前进展:
- 第一阶段已完成:主消息链路、插件统计与异常日志已接入 `trace_id`
- 第二阶段已完成:统一 LLM 调用与微信发送日志已可自动继承同一 `trace_id`
- 后续可继续补充后台按 `trace_id` 检索错误、消息与 AI 调用详情的入口
建议内容:
- 为每条消息生成统一 trace_id

View File

@@ -32,6 +32,7 @@ from utils.wechat.contact_manager import ContactManager
from utils.wechat.member_monitor import ChatroomMemberMonitor
from utils.wechat.message_to_db import MessageStorage
from utils.ai.llm_registry import LLMRegistry
from utils.trace_context import set_current_trace_id, reset_current_trace_id
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import WxMessage, MessageType
@@ -387,10 +388,17 @@ class Robot:
async def _process_with_semaphore(self, wxmsg):
async with sem:
# 进入单条消息处理前,把 trace_id 放入当前异步上下文:
# 1. 后续插件中的 AI 调用、消息发送、子协程都可以自动继承这个 trace_id
# 2. 这样不需要给大量现有方法额外加 trace_id 参数,侵入性更小;
# 3. finally 中会回滚 token避免把当前消息的 trace_id 泄漏到下一条消息。
trace_token = set_current_trace_id(self._get_trace_id(wxmsg))
try:
await self._process_ipad_message(wxmsg)
except Exception as e:
self.LOG.error(self._trace_message(wxmsg, f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}"))
finally:
reset_current_trace_id(trace_token)
async def _handle_ipad_login(self, wxid, device_name, device_id):
"""处理wechat_ipad登录"""

View File

@@ -15,6 +15,7 @@ from requests import HTTPError
from loguru import logger
from utils.ai.llm_registry import LLMRegistry
from utils.trace_context import get_current_trace_id, format_trace_prefix
class UnifiedLLMClient:
@@ -58,6 +59,7 @@ class UnifiedLLMClient:
backend: str,
scene: str,
model: str,
trace_id: str,
success: bool,
latency_ms: float,
error: str = "",
@@ -70,6 +72,7 @@ class UnifiedLLMClient:
"backend": str(backend or "").strip(),
"scene": str(scene or "").strip(),
"model": str(model or "").strip(),
"trace_id": str(trace_id or "").strip(),
"success": bool(success),
"latency_ms": round(float(latency_ms or 0.0), 2),
"error": str(error or "").strip()[:300],
@@ -236,6 +239,7 @@ class UnifiedLLMClient:
started_at = time.monotonic()
self.last_error = ""
result: Optional[Dict[str, Any]] = None
current_trace_id = get_current_trace_id()
if not self.is_available():
self.last_error = "client_unavailable"
elif self.provider == "dify":
@@ -272,10 +276,19 @@ class UnifiedLLMClient:
backend=str(self.config.get("backend", "") or ""),
scene=str(self.config.get("scene", "") or ""),
model=self.model or str(self.mode or ""),
trace_id=current_trace_id,
success=bool(result and result.get("text")),
latency_ms=latency_ms,
error=self.last_error,
)
# 在统一出口补一条轻量 trace 日志,方便把“消息 -> AI 调用”快速串起来。
self.LOG.debug(
f"{format_trace_prefix(current_trace_id)}LLM调用结束 "
f"provider={self.provider} backend={self.config.get('backend', '') or '-'} "
f"scene={self.config.get('scene', '') or '-'} "
f"success={bool(result and result.get('text'))} latency_ms={round(latency_ms, 2)} "
f"error={self.last_error or '-'}"
)
return result
def _generate_openai(

32
utils/trace_context.py Normal file
View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from contextvars import ContextVar, Token
# 当前消息链路的 trace_id
# 1. 使用 ContextVar 而不是全局变量,避免并发消息之间互相覆盖;
# 2. asyncio Task 会自动继承上下文,因此插件里再起的协程也能拿到同一个 trace_id
# 3. 对不在消息主链路里的后台任务,该值默认为空字符串,不会影响原有逻辑。
_CURRENT_TRACE_ID: ContextVar[str] = ContextVar("current_trace_id", default="")
def set_current_trace_id(trace_id: str) -> Token:
"""写入当前上下文的 trace_id并返回可回滚 token。"""
return _CURRENT_TRACE_ID.set(str(trace_id or "").strip())
def reset_current_trace_id(token: Token) -> None:
"""根据 token 回滚 trace_id避免上下文泄漏到后续无关任务。"""
_CURRENT_TRACE_ID.reset(token)
def get_current_trace_id() -> str:
"""读取当前上下文中的 trace_id。"""
return str(_CURRENT_TRACE_ID.get("") or "").strip()
def format_trace_prefix(trace_id: str = "") -> str:
"""统一生成日志前缀,避免各模块手写格式不一致。"""
resolved_trace_id = str(trace_id or get_current_trace_id() or "").strip()
if not resolved_trace_id:
return ""
return f"[trace_id={resolved_trace_id}] "

View File

@@ -16,6 +16,7 @@ from pymediainfo import MediaInfo
import aiofiles
from utils.video_utils import get_first_frame, get_first_frame_bytes
from utils.trace_context import format_trace_prefix
from wechat_ipad import UserLoggedOut
from wechat_ipad.client.base import WechatAPIClientBase
@@ -144,7 +145,9 @@ class MessageMixin(WechatAPIClientBase):
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendTxt', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
self.logging.info("发送文字消息: 对方wxid:{} at:{} 内容:{}", wxid, at, content)
# 发送动作也带上 trace_id便于把“某条入站消息最终发了什么”直接串起来。
self.logging.info("{}发送文字消息: 对方wxid:{} at:{} 内容:{}",
format_trace_prefix(), wxid, at, content)
data = json_resp.get("Data")
return data.get("List")[0].get("ClientMsgId"), data.get("List")[0].get("CreateTime"), data.get("List")[
0].get("NewMsgId")
@@ -191,7 +194,9 @@ class MessageMixin(WechatAPIClientBase):
if json_resp.get("Success"):
json_param.pop('Base64')
self.logging.info("发送图片消息: 对方wxid:{} 图片base64略", wxid)
# 图片日志不打印 base64 内容,但保留 trace_id便于关联具体发送动作。
self.logging.info("{}发送图片消息: 对方wxid:{} 图片base64略",
format_trace_prefix(), wxid)
data = json_resp.get("Data")
self.logging.debug("发送图片消息成功,返回:{}", data)
return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("NewMsgId")