from __future__ import annotations from contextvars import ContextVar, Token # 当前消息链路的 trace_id: # 1. 使用 ContextVar 而不是全局变量,避免并发消息之间互相覆盖; # 2. asyncio Task 会自动继承上下文,因此插件里再起的协程也能拿到同一个 trace_id; # 3. 对不在消息主链路里的后台任务,该值默认为空字符串,不会影响原有逻辑。 _CURRENT_TRACE_ID: ContextVar[str] = ContextVar("current_trace_id", default="") def set_current_trace_id(trace_id: str) -> Token: """写入当前上下文的 trace_id,并返回可回滚 token。""" return _CURRENT_TRACE_ID.set(str(trace_id or "").strip()) def reset_current_trace_id(token: Token) -> None: """根据 token 回滚 trace_id,避免上下文泄漏到后续无关任务。""" _CURRENT_TRACE_ID.reset(token) def get_current_trace_id() -> str: """读取当前上下文中的 trace_id。""" return str(_CURRENT_TRACE_ID.get("") or "").strip() def format_trace_prefix(trace_id: str = "") -> str: """统一生成日志前缀,避免各模块手写格式不一致。""" resolved_trace_id = str(trace_id or get_current_trace_id() or "").strip() if not resolved_trace_id: return "" return f"[trace_id={resolved_trace_id}] "