feat:优化AI

This commit is contained in:
2025-12-23 13:54:35 +08:00
parent c1983172af
commit cc250e1f1e
2 changed files with 620 additions and 180 deletions

View File

@@ -9,6 +9,7 @@ import asyncio
import tomllib
import aiohttp
import json
import re
from pathlib import Path
from datetime import datetime
from loguru import logger
@@ -650,6 +651,397 @@ class AIChat(PluginBase):
return ""
return str(content)
def _sanitize_llm_output(self, text) -> str:
    """Post-process raw LLM output before it reaches users or the context.

    Strips chain-of-thought markers and Markdown formatting at every outgoing
    boundary, since prompt instructions alone are not a hard guarantee.
    Returns "" rather than leaking anything that still looks like reasoning
    after cleanup.
    """
    if text is None:
        return ""
    raw = str(text)
    output_cfg = (self.config or {}).get("output", {})
    strip_thinking = output_cfg.get("strip_thinking", True)
    strip_markdown = output_cfg.get("strip_markdown", True)
    result = raw
    # Two passes: stripping Markdown can expose thinking markers that were
    # wrapped (e.g. "**Thoughts:**"), and stripping tags can expose Markdown.
    for _ in range(2):
        if strip_markdown:
            result = self._strip_markdown_syntax(result)
        if strip_thinking:
            result = self._strip_thinking_content(result)
    result = result.strip()
    # Residual reasoning/outline markers: try once more to pull out just the
    # final answer segment.
    if strip_thinking and result and self._contains_thinking_markers(result):
        extracted = self._extract_after_last_answer_marker(result)
        if not extracted:
            extracted = self._extract_final_answer_from_outline(result)
        if extracted:
            result = extracted.strip()
        # Still tainted: keep the last paragraph that carries no markers.
        if result and self._contains_thinking_markers(result):
            paragraphs = [p.strip() for p in re.split(r"\n{2,}", result) if p.strip()]
            for paragraph in reversed(paragraphs):
                if not self._contains_thinking_markers(paragraph):
                    result = paragraph
                    break
            result = result.strip()
    # Last resort: better to send nothing than to send chain-of-thought.
    if strip_thinking and result and self._contains_thinking_markers(result):
        return ""
    if result:
        return result
    raw_stripped = raw.strip()
    # Cleanup emptied the text; never fall back to an original that still
    # contains reasoning markers (e.g. a bare "<think>..." payload).
    if strip_thinking and self._contains_thinking_markers(raw_stripped):
        return ""
    return raw_stripped
def _contains_thinking_markers(self, text: str) -> bool:
"""粗略判断文本是否包含明显的“思考/推理”外显标记,用于决定是否允许回退原文。"""
if not text:
return False
lowered = text.lower()
tag_tokens = (
"<think", "</think",
"<analysis", "</analysis",
"<reasoning", "</reasoning",
"<thought", "</thought",
"<thinking", "</thinking",
"<thoughts", "</thoughts",
"<scratchpad", "</scratchpad",
"&lt;think", "&lt;/think",
"&lt;analysis", "&lt;/analysis",
"&lt;reasoning", "&lt;/reasoning",
"&lt;thought", "&lt;/thought",
"&lt;thinking", "&lt;/thinking",
"&lt;thoughts", "&lt;/thoughts",
"&lt;scratchpad", "&lt;/scratchpad",
)
if any(tok in lowered for tok in tag_tokens):
return True
stripped = text.strip()
if stripped.startswith("{") and stripped.endswith("}"):
# JSON 结构化输出常见于“analysis/final”
json_keys = (
"\"analysis\"",
"\"reasoning\"",
"\"thought\"",
"\"thoughts\"",
"\"scratchpad\"",
"\"final\"",
"\"answer\"",
"\"response\"",
"\"output\"",
"\"text\"",
)
if any(k in lowered for k in json_keys):
return True
# YAML/KV 风格
if re.search(r"(?im)^\s*(analysis|reasoning|thoughts?|scratchpad|final|answer|response|output|text|思考|分析|推理|最终|输出)\s*[:]", text):
return True
marker_re = re.compile(
r"(?mi)^\s*(?:\d+\s*[\.\、:)\-–—]\s*)?(?:[-*•]+\s*)?"
r"(?:【\s*(?:思考过程|推理过程|分析过程|思考|分析|推理|内心独白|内心os|思维链|思路|"
r"chain\s*of\s*thought|reasoning|analysis|thinking|thoughts|thought\s*process|scratchpad)\s*】"
r"|(?:思考过程|推理过程|分析过程|思考|分析|推理|内心独白|内心os|思维链|思路|"
r"chain\s*of\s*thought|reasoning|analysis|analyze|thinking|thoughts|thought\s*process|scratchpad|internal\s*monologue|mind\s*space|final\s*polish|output\s*generation)"
r"(?:\s*】)?\s*(?:[:]|$|\s+))"
)
return marker_re.search(text) is not None
def _extract_after_last_answer_marker(self, text: str) -> str | None:
"""从文本中抽取最后一个“最终/输出/答案”标记后的内容(不要求必须是编号大纲)。"""
if not text:
return None
# 1) 明确的行首标记Text:/Final Answer:/输出: ...
marker_re = re.compile(
r"(?im)^\s*(?:\d+\s*[\.\、:\)、)\-–—]\s*)?"
r"(?:text|final\s*answer|final\s*response|final\s*output|final|output|answer|response|输出|最终回复|最终答案|最终)\s*[:]\s*"
)
matches = list(marker_re.finditer(text))
if matches:
candidate = text[matches[-1].end():].strip()
if candidate:
return candidate
# 2) JSON/YAML 风格final: ... / \"final\": \"...\"
kv_re = re.compile(
r"(?im)^\s*\"?(?:final|answer|response|output|text|最终|最终回复|最终答案|输出)\"?\s*[:]\s*"
)
kv_matches = list(kv_re.finditer(text))
if kv_matches:
candidate = text[kv_matches[-1].end():].strip()
if candidate:
return candidate
# 3) 纯 JSON 对象(尝试解析)
stripped = text.strip()
if stripped.startswith("{") and stripped.endswith("}"):
try:
obj = json.loads(stripped)
if isinstance(obj, dict):
for key in ("final", "answer", "response", "output", "text"):
v = obj.get(key)
if isinstance(v, str) and v.strip():
return v.strip()
except Exception:
pass
return None
def _extract_final_answer_from_outline(self, text: str) -> str | None:
"""从“分析/草稿/输出”这类结构化大纲中提取最终回复正文(用于拦截思维链)。"""
if not text:
return None
# 至少包含多个“1./2./3.”段落,才认为可能是大纲/思维链输出
heading_re = re.compile(r"(?m)^\s*\d+\s*[\.\、:\)、)\-–—]\s*\S+")
if len(heading_re.findall(text)) < 2:
return None
# 优先:提取最后一个 “Text:/Final Answer:/Output:” 之后的内容
marker_re = re.compile(
r"(?im)^\s*(?:\d+\s*[\.\、:\)、)\-–—]\s*)?"
r"(?:text|final\s*answer|final\s*response|final\s*output|output|answer|response|输出|最终回复|最终答案)\s*[:]\s*"
)
matches = list(marker_re.finditer(text))
if matches:
candidate = text[matches[-1].end():].strip()
if candidate:
return candidate
# 没有明确的最终标记时,仅在包含“分析/思考/草稿/输出”等元信息关键词的情况下兜底抽取
lowered = text.lower()
outline_keywords = (
"analyze",
"analysis",
"reasoning",
"internal monologue",
"mind space",
"draft",
"drafting",
"outline",
"plan",
"steps",
"formulating response",
"final polish",
"final answer",
"output generation",
"system prompt",
"chat log",
"previous turn",
"current situation",
)
cn_keywords = ("思考", "分析", "推理", "思维链", "草稿", "计划", "步骤", "输出", "最终")
if not any(k in lowered for k in outline_keywords) and not any(k in text for k in cn_keywords):
return None
# 次选:取最后一个非空段落(避免返回整段大纲)
parts = [p.strip() for p in re.split(r"\n{2,}", text) if p.strip()]
if not parts:
return None
last = parts[-1]
if len(heading_re.findall(last)) == 0:
return last
return None
def _strip_thinking_content(self, text: str) -> str:
    """Remove commonly exposed "thinking/reasoning" content from model output.

    Handles explicit tag blocks such as ``<think>...</think>`` as well as
    sectioned text like "思考:... / 最终:...". Best-effort: when no safe
    extraction applies, the (mostly) original text is returned.
    """
    if not text:
        return ""
    # Normalize line endings so the line-oriented logic below is stable.
    t = text.replace("\r\n", "\n").replace("\r", "\n")
    # 1) Remove explicit tag blocks first (common with reasoning models).
    thinking_tags = ("think", "analysis", "reasoning", "thought", "thinking", "thoughts", "scratchpad", "reflection")
    for tag in thinking_tags:
        t = re.sub(rf"<{tag}\b[^>]*>.*?</{tag}>", "", t, flags=re.IGNORECASE | re.DOTALL)
        # Also handle HTML-escaped tags (&lt;think&gt;...&lt;/think&gt;).
        t = re.sub(rf"&lt;{tag}\b[^&]*&gt;.*?&lt;/{tag}&gt;", "", t, flags=re.IGNORECASE | re.DOTALL)
    # 1.1) Fallback: streaming/truncation can leave a tag unclosed. If a
    #      thinking tag opens near the start, cut everything from there on.
    m = re.search(r"<(think|analysis|reasoning|thought|thinking|thoughts|scratchpad|reflection)\b[^>]*>", t, flags=re.IGNORECASE)
    if m and m.start() < 200:
        t = t[: m.start()].rstrip()
    m2 = re.search(r"&lt;(think|analysis|reasoning|thought|thinking|thoughts|scratchpad|reflection)\b[^&]*&gt;", t, flags=re.IGNORECASE)
    if m2 and m2.start() < 200:
        t = t[: m2.start()].rstrip()
    # 2) Sectioned formats like "思考:... / 最终:..." — strip only the
    #    leading reasoning part where possible.
    lines = t.split("\n")
    if not lines:
        return t
    # If the text carries clear "final/output/answer" markers (numbered or
    # not), extract the last answer segment directly instead of risking
    # sending the whole outline.
    if self._contains_thinking_markers(t):
        extracted_anywhere = self._extract_after_last_answer_marker(t)
        if extracted_anywhere:
            return extracted_anywhere
    reasoning_kw = (
        r"思考过程|推理过程|分析过程|思考|分析|推理|思路|内心独白|内心os|思维链|"
        r"chain\s*of\s*thought|reasoning|analysis|analyze|thinking|thoughts|thought\s*process|scratchpad|plan|steps|draft|outline"
    )
    answer_kw = r"最终答案|最终回复|最终|回答|回复|答复|结论|输出|final(?:\s*answer)?|final\s*response|final\s*output|answer|response|output|text"
    # Accepted shapes:
    # - 思考:... / 最终回复:...
    # - 【思考】... / 【最终】...
    # - **思考过程:** (Markdown markers are stripped by the caller first)
    reasoning_start = re.compile(
        rf"^\s*(?:\d+\s*[\.\、:\)、)\-–—]\s*)?(?:[-*•]+\s*)?"
        rf"(?:【\s*(?:{reasoning_kw})\s*】\s*[:]?\s*|(?:{reasoning_kw})(?:\s*】)?\s*(?:[:]|$|\s+))",
        re.IGNORECASE,
    )
    answer_start = re.compile(
        rf"^\s*(?:\d+\s*[\.\、:\)、)\-–—]\s*)?(?:[-*•]+\s*)?"
        rf"(?:【\s*(?:{answer_kw})\s*】\s*[:]?\s*|(?:{answer_kw})(?:\s*】)?\s*(?:[:]|$)\s*)",
        re.IGNORECASE,
    )
    # 2.0) If the text opens with "最终回复:/Final answer:" etc., drop just
    #      that marker (does not require a reasoning block to be present).
    for idx, line in enumerate(lines):
        if line.strip() == "":
            continue
        m0 = answer_start.match(line)
        if m0:
            lines[idx] = line[m0.end():].lstrip()
        break
    has_reasoning = any(reasoning_start.match(line) for line in lines[:10])
    has_answer_marker = any(answer_start.match(line) for line in lines)
    # 2.1) Reasoning block plus an answer marker: skip lines until the
    #      answer marker, then keep everything after it.
    if has_reasoning and has_answer_marker:
        out_lines: list[str] = []
        skipping = False
        answer_started = False
        for line in lines:
            if answer_started:
                out_lines.append(line)
                continue
            if not skipping and reasoning_start.match(line):
                skipping = True
                continue
            if skipping:
                m = answer_start.match(line)
                if m:
                    answer_started = True
                    skipping = False
                    out_lines.append(line[m.end():].lstrip())
                continue
            m = answer_start.match(line)
            if m:
                answer_started = True
                out_lines.append(line[m.end():].lstrip())
            else:
                out_lines.append(line)
        t2 = "\n".join(out_lines).strip()
        return t2 if t2 else t
    # 2.2) Fallback: the text opens with "思考:" — drop the first paragraph
    #      (everything up to the first blank line).
    if has_reasoning:
        first_blank_idx = None
        for idx, line in enumerate(lines):
            if line.strip() == "":
                first_blank_idx = idx
                break
        if first_blank_idx is not None and first_blank_idx + 1 < len(lines):
            candidate = "\n".join(lines[first_blank_idx + 1 :]).strip()
            if candidate:
                return candidate
    # 2.3) Fallback: recognize "1. Analyze... 6. Output ... Text: ..." style
    #      chain-of-thought outlines and extract just the final body.
    outline_extracted = self._extract_final_answer_from_outline("\n".join(lines).strip())
    if outline_extracted:
        return outline_extracted
    # Fold line-level edits back into the text (e.g. a stripped leading
    # "最终回复:" marker).
    t = "\n".join(lines).strip()
    # 3) <final>...</final> style wrappers: keep the body, drop the tags.
    t = re.sub(r"</?\s*(final|answer)\s*>", "", t, flags=re.IGNORECASE).strip()
    return t
def _strip_markdown_syntax(self, text: str) -> str:
"""将常见 Markdown 标记转换为更像纯文本的形式(保留内容,移除格式符)。"""
if not text:
return ""
t = text.replace("\r\n", "\n").replace("\r", "\n")
# 去掉代码块围栏(保留内容)
t = re.sub(r"```[^\n]*\n", "", t)
t = t.replace("```", "")
# 图片/链接:![alt](url) / [text](url)
def _md_image_repl(m: re.Match) -> str:
alt = (m.group(1) or "").strip()
url = (m.group(2) or "").strip()
if alt and url:
return f"{alt}{url}"
return url or alt or ""
def _md_link_repl(m: re.Match) -> str:
label = (m.group(1) or "").strip()
url = (m.group(2) or "").strip()
if label and url:
return f"{label}{url}"
return url or label or ""
t = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", _md_image_repl, t)
t = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", _md_link_repl, t)
# 行级标记:标题、引用、分割线
cleaned_lines: list[str] = []
for line in t.split("\n"):
line = re.sub(r"^\s{0,3}#{1,6}\s+", "", line) # 标题
line = re.sub(r"^\s{0,3}>\s?", "", line) # 引用
if re.match(r"^\s*(?:-{3,}|\*{3,}|_{3,})\s*$", line):
continue # 分割线整行移除
cleaned_lines.append(line)
t = "\n".join(cleaned_lines)
# 行内代码:`code`
t = re.sub(r"`([^`]+)`", r"\1", t)
# 粗体/删除线(保留文本)
t = t.replace("**", "")
t = t.replace("__", "")
t = t.replace("~~", "")
# 斜体(保留文本,避免误伤乘法/通配符,仅处理成对包裹)
t = re.sub(r"(?<!\*)\*([^*\n]+)\*(?!\*)", r"\1", t)
t = re.sub(r"(?<!_)_([^_\n]+)_(?!_)", r"\1", t)
# 压缩过多空行
t = re.sub(r"\n{3,}", "\n\n", t)
return t.strip()
def _append_group_history_messages(self, messages: list, recent_history: list):
"""将群聊历史按 role 追加到 LLM messages"""
for msg in recent_history:
@@ -661,6 +1053,8 @@ class AIChat(PluginBase):
if role == "assistant":
if isinstance(msg_content, list):
msg_content = self._extract_text_from_multimodal(msg_content)
# 避免旧历史中的 Markdown/思维链污染上下文
msg_content = self._sanitize_llm_output(msg_content)
messages.append({
"role": "assistant",
"content": msg_content
@@ -971,10 +1365,20 @@ class AIChat(PluginBase):
# 获取用户昵称(用于历史记录)- 使用缓存优化
nickname = await self._get_user_nickname(bot, from_wxid, user_wxid, is_group)
# 提取实际消息内容(去除@),仅在需要回复时使用
actual_content = ""
if should_reply:
actual_content = self._extract_content(message, content)
# 保存到群组历史记录(所有消息都保存,不管是否回复)
# 但如果是 AutoReply 触发的,跳过保存(消息已经在正常流程中保存过了)
if is_group and not message.get('_auto_reply_triggered'):
await self._add_to_history(from_wxid, nickname, content, sender_wxid=user_wxid)
# mention 模式下,群聊里@机器人仅作为触发条件,不进入上下文,避免同一句话在上下文中出现两种形式(含@/不含@
trigger_mode = self.config.get("behavior", {}).get("trigger_mode", "mention")
history_content = content
if trigger_mode == "mention" and should_reply and actual_content:
history_content = actual_content
await self._add_to_history(from_wxid, nickname, history_content, sender_wxid=user_wxid)
# 如果不需要回复,直接返回
if not should_reply:
@@ -990,8 +1394,6 @@ class AIChat(PluginBase):
logger.warning(f"用户 {user_wxid} 触发限流,{reset_time}秒后重置")
return False
# 提取实际消息内容(去除@
actual_content = self._extract_content(message, content)
if not actual_content:
return
@@ -1004,6 +1406,10 @@ class AIChat(PluginBase):
if not message.get('_auto_reply_triggered'):
self._add_to_memory(chat_id, "user", actual_content)
# 群聊:消息已写入 history则不再重复附加到 LLM messages避免“同一句话发给AI两次”
history_enabled = bool(self.store) and self.config.get("history", {}).get("enabled", True)
append_user_message = not (is_group and history_enabled and not message.get('_auto_reply_triggered'))
# 调用 AI API带重试机制
max_retries = self.config.get("api", {}).get("max_retries", 2)
response = None
@@ -1011,7 +1417,16 @@ class AIChat(PluginBase):
for attempt in range(max_retries + 1):
try:
response = await self._call_ai_api(actual_content, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
response = await self._call_ai_api(
actual_content,
bot,
from_wxid,
chat_id,
nickname,
user_wxid,
is_group,
append_user_message=append_user_message,
)
# 检查返回值:
# - None: 工具调用已异步处理,不需要重试
@@ -1040,15 +1455,19 @@ class AIChat(PluginBase):
# 发送回复并添加到记忆
# 注意:如果返回 None 或空字符串,说明已经以其他形式处理了,不需要再发送文本
if response:
await bot.send_text(from_wxid, response)
self._add_to_memory(chat_id, "assistant", response)
# 保存机器人回复到历史记录
if is_group:
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response, role="assistant")
logger.success(f"AI 回复成功: {response[:50]}...")
cleaned_response = self._sanitize_llm_output(response)
if cleaned_response:
await bot.send_text(from_wxid, cleaned_response)
self._add_to_memory(chat_id, "assistant", cleaned_response)
# 保存机器人回复到历史记录
if is_group:
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, cleaned_response, role="assistant")
logger.success(f"AI 回复成功: {cleaned_response[:50]}...")
else:
logger.warning("AI 回复清洗后为空(可能只包含思维链/格式标记),已跳过发送")
else:
logger.info("AI 回复为空或已通过其他方式发送(如聊天记录)")
@@ -1134,7 +1553,18 @@ class AIChat(PluginBase):
return content.strip()
async def _call_ai_api(self, user_message: str, bot=None, from_wxid: str = None, chat_id: str = None, nickname: str = "", user_wxid: str = None, is_group: bool = False) -> str:
async def _call_ai_api(
self,
user_message: str,
bot=None,
from_wxid: str = None,
chat_id: str = None,
nickname: str = "",
user_wxid: str = None,
is_group: bool = False,
*,
append_user_message: bool = True,
) -> str:
"""调用 AI API"""
api_config = self.config["api"]
@@ -1191,7 +1621,8 @@ class AIChat(PluginBase):
messages.extend(memory_messages[:-1])
# 添加当前用户消息
messages.append({"role": "user", "content": f"[{nickname}] {user_message}" if is_group and nickname else user_message})
if append_user_message:
messages.append({"role": "user", "content": f"[{nickname}] {user_message}" if is_group and nickname else user_message})
payload = {
"model": api_config["model"],
@@ -1285,8 +1716,12 @@ class AIChat(PluginBase):
tool_call_hint_sent = True
# 只有当 AI 有文本输出时才发送
if full_content and full_content.strip():
logger.info(f"[流式] 检测到工具调用,先发送已有文本: {full_content[:30]}...")
await bot.send_text(from_wxid, full_content.strip())
preview = self._sanitize_llm_output(full_content)
if preview:
logger.info(f"[流式] 检测到工具调用,先发送已有文本: {preview[:30]}...")
await bot.send_text(from_wxid, preview)
else:
logger.info("[流式] 检测到工具调用,但文本清洗后为空(可能为思维链/无有效正文),跳过发送")
else:
# AI 没有输出文本,不发送默认提示
logger.info("[流式] 检测到工具调用AI 未输出文本")
@@ -1347,7 +1782,7 @@ class AIChat(PluginBase):
logger.warning("检测到模型输出了错误的工具调用格式,拦截并返回提示")
return "抱歉,我遇到了一些技术问题,请重新描述一下你的需求~"
return full_content.strip()
return self._sanitize_llm_output(full_content)
except aiohttp.ClientError as e:
logger.error(f"网络请求失败: {type(e).__name__}: {str(e)}")
raise Exception(f"网络请求失败: {str(e)}")
@@ -1578,6 +2013,11 @@ class AIChat(PluginBase):
if not tool_result:
continue
tool_message = self._sanitize_llm_output(tool_result.message) if tool_result.message is not None else ""
# 工具文本统一做一次输出清洗,避免工具内部/下游LLM把“思维链”发出来
tool_message = self._sanitize_llm_output(tool_result.message) if tool_result.message is not None else ""
if tool_result.success:
logger.success(f"[异步] 工具 {function_name} 执行成功")
else:
@@ -1588,25 +2028,32 @@ class AIChat(PluginBase):
need_ai_reply_results.append({
"tool_call_id": tool_call_id,
"function_name": function_name,
"result": tool_result.message
"result": tool_message
})
continue
# 工具成功且需要回文本时发送
if tool_result.success and not tool_result.already_sent and tool_result.message and not tool_result.no_reply:
if tool_result.send_result_text:
await bot.send_text(from_wxid, tool_result.message)
if tool_message:
await bot.send_text(from_wxid, tool_message)
else:
logger.warning(f"[异步] 工具 {function_name} 输出清洗后为空,已跳过发送")
# 工具失败默认回一条错误提示
if not tool_result.success and tool_result.message and not tool_result.no_reply:
try:
await bot.send_text(from_wxid, f"{tool_result.message}")
if tool_message:
await bot.send_text(from_wxid, f"{tool_message}")
else:
await bot.send_text(from_wxid, f"{function_name} 执行失败")
except Exception:
pass
# 保存工具结果到记忆(可选)
if tool_result.save_to_memory and chat_id:
self._add_to_memory(chat_id, "assistant", f"[工具 {function_name} 结果]: {tool_result.message}")
if tool_message:
self._add_to_memory(chat_id, "assistant", f"[工具 {function_name} 结果]: {tool_message}")
# 如果有需要 AI 回复的工具结果,调用 AI 继续对话
if need_ai_reply_results:
@@ -1733,12 +2180,16 @@ class AIChat(PluginBase):
# 发送 AI 的回复
if full_content.strip():
await bot.send_text(from_wxid, full_content.strip())
logger.success(f"[工具回传] AI 回复完成,长度: {len(full_content)}")
cleaned_content = self._sanitize_llm_output(full_content)
if cleaned_content:
await bot.send_text(from_wxid, cleaned_content)
logger.success(f"[工具回传] AI 回复完成,长度: {len(cleaned_content)}")
else:
logger.warning("[工具回传] AI 回复清洗后为空,已跳过发送")
# 保存到历史记录
if chat_id:
self._add_to_memory(chat_id, "assistant", full_content.strip())
if chat_id and cleaned_content:
self._add_to_memory(chat_id, "assistant", cleaned_content)
else:
logger.warning("[工具回传] AI 返回空内容")
@@ -1841,16 +2292,23 @@ class AIChat(PluginBase):
if tool_result.success and not tool_result.already_sent and tool_result.message and not tool_result.no_reply:
if tool_result.send_result_text:
await bot.send_text(from_wxid, tool_result.message)
if tool_message:
await bot.send_text(from_wxid, tool_message)
else:
logger.warning(f"[异步-图片] 工具 {function_name} 输出清洗后为空,已跳过发送")
if not tool_result.success and tool_result.message and not tool_result.no_reply:
try:
await bot.send_text(from_wxid, f"{tool_result.message}")
if tool_message:
await bot.send_text(from_wxid, f"{tool_message}")
else:
await bot.send_text(from_wxid, f"{function_name} 执行失败")
except Exception:
pass
if tool_result.save_to_memory and chat_id:
self._add_to_memory(chat_id, "assistant", f"[工具 {function_name} 结果]: {tool_result.message}")
if tool_message:
self._add_to_memory(chat_id, "assistant", f"[工具 {function_name} 结果]: {tool_message}")
logger.info(f"[异步-图片] 所有工具执行完成")
@@ -2061,20 +2519,36 @@ class AIChat(PluginBase):
await self._add_to_history(from_wxid, nickname, title_text, image_base64=image_base64)
# 调用AI API带图片
response = await self._call_ai_api_with_image(title_text, image_base64, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
history_enabled = bool(self.store) and self.config.get("history", {}).get("enabled", True)
append_user_message = not (is_group and history_enabled)
response = await self._call_ai_api_with_image(
title_text,
image_base64,
bot,
from_wxid,
chat_id,
nickname,
user_wxid,
is_group,
append_user_message=append_user_message,
)
if response:
await bot.send_text(from_wxid, response)
self._add_to_memory(chat_id, "assistant", response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response, role="assistant")
logger.success(f"AI回复成功: {response[:50]}...")
cleaned_response = self._sanitize_llm_output(response)
if cleaned_response:
await bot.send_text(from_wxid, cleaned_response)
self._add_to_memory(chat_id, "assistant", cleaned_response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, cleaned_response, role="assistant")
logger.success(f"AI回复成功: {cleaned_response[:50]}...")
else:
logger.warning("AI 回复清洗后为空,已跳过发送")
return False
except Exception as e:
@@ -2182,16 +2656,20 @@ class AIChat(PluginBase):
response = await self._call_ai_api(combined_message, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
if response:
await bot.send_text(from_wxid, response)
self._add_to_memory(chat_id, "assistant", response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response, role="assistant")
logger.success(f"[聊天记录] AI 回复成功: {response[:50]}...")
cleaned_response = self._sanitize_llm_output(response)
if cleaned_response:
await bot.send_text(from_wxid, cleaned_response)
self._add_to_memory(chat_id, "assistant", cleaned_response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, cleaned_response, role="assistant")
logger.success(f"[聊天记录] AI 回复成功: {cleaned_response[:50]}...")
else:
logger.warning("[聊天记录] AI 回复清洗后为空,已跳过发送")
else:
await bot.send_text(from_wxid, "❌ AI 回复生成失败")
@@ -2268,16 +2746,20 @@ class AIChat(PluginBase):
response = await self._call_ai_api(combined_message, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
if response:
await bot.send_text(from_wxid, response)
self._add_to_memory(chat_id, "assistant", response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response, role="assistant")
logger.success(f"[视频识别] 主AI回复成功: {response[:50]}...")
cleaned_response = self._sanitize_llm_output(response)
if cleaned_response:
await bot.send_text(from_wxid, cleaned_response)
self._add_to_memory(chat_id, "assistant", cleaned_response)
# 保存机器人回复到历史记录
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, cleaned_response, role="assistant")
logger.success(f"[视频识别] 主AI回复成功: {cleaned_response[:50]}...")
else:
logger.warning("[视频识别] 主AI回复清洗后为空已跳过发送")
else:
await bot.send_text(from_wxid, "❌ AI 回复生成失败")
@@ -2388,7 +2870,7 @@ class AIChat(PluginBase):
if "text" in part:
text = part["text"]
logger.info(f"[视频AI] 分析完成,长度: {len(text)}")
return text
return self._sanitize_llm_output(text)
# 记录失败原因
if "usageMetadata" in result:
@@ -2697,7 +3179,7 @@ class AIChat(PluginBase):
video_config
)
return full_content.strip()
return self._sanitize_llm_output(full_content)
except Exception as e:
logger.error(f"[视频识别] API 调用失败: {e}")
@@ -2761,7 +3243,7 @@ class AIChat(PluginBase):
if "text" in part:
text = part["text"]
logger.info(f"[视频识别-简化] 成功,长度: {len(text)}")
return text
return self._sanitize_llm_output(text)
logger.error(f"[视频识别-简化] 仍然没有 candidates: {str(result)[:300]}")
return ""
@@ -2809,7 +3291,19 @@ class AIChat(PluginBase):
return False
async def _call_ai_api_with_image(self, user_message: str, image_base64: str, bot=None, from_wxid: str = None, chat_id: str = None, nickname: str = "", user_wxid: str = None, is_group: bool = False) -> str:
async def _call_ai_api_with_image(
self,
user_message: str,
image_base64: str,
bot=None,
from_wxid: str = None,
chat_id: str = None,
nickname: str = "",
user_wxid: str = None,
is_group: bool = False,
*,
append_user_message: bool = True,
) -> str:
"""调用AI API带图片"""
api_config = self.config["api"]
tools = self._collect_tools()
@@ -2854,14 +3348,15 @@ class AIChat(PluginBase):
messages.extend(memory_messages[:-1])
# 添加当前用户消息(带图片)
text_value = f"[{nickname}] {user_message}" if is_group and nickname else user_message
messages.append({
"role": "user",
"content": [
{"type": "text", "text": text_value},
{"type": "image_url", "image_url": {"url": image_base64}}
]
})
if append_user_message:
text_value = f"[{nickname}] {user_message}" if is_group and nickname else user_message
messages.append({
"role": "user",
"content": [
{"type": "text", "text": text_value},
{"type": "image_url", "image_url": {"url": image_base64}}
]
})
payload = {
"model": api_config["model"],
@@ -2943,8 +3438,12 @@ class AIChat(PluginBase):
if not tool_call_hint_sent and bot and from_wxid:
tool_call_hint_sent = True
if full_content and full_content.strip():
logger.info(f"[流式-图片] 检测到工具调用,先发送已有文本")
await bot.send_text(from_wxid, full_content.strip())
preview = self._sanitize_llm_output(full_content)
if preview:
logger.info("[流式-图片] 检测到工具调用,先发送已有文本")
await bot.send_text(from_wxid, preview)
else:
logger.info("[流式-图片] 检测到工具调用,但文本清洗后为空(可能为思维链/无有效正文),跳过发送")
else:
logger.info("[流式-图片] 检测到工具调用AI 未输出文本")
@@ -3001,7 +3500,7 @@ class AIChat(PluginBase):
logger.warning("检测到模型输出了错误的工具调用格式,拦截并返回提示")
return "抱歉,我遇到了一些技术问题,请重新描述一下你的需求~"
return full_content.strip()
return self._sanitize_llm_output(full_content)
except Exception as e:
logger.error(f"调用AI API失败: {e}")
@@ -3212,8 +3711,9 @@ class AIChat(PluginBase):
description = await self._generate_image_description(image_base64, description_prompt, image_desc_config)
if description:
await self._update_history_by_id(from_wxid, placeholder_id, f"[图片: {description}]")
logger.success(f"已更新图片描述: {nickname} - {description[:30]}...")
cleaned_description = self._sanitize_llm_output(description)
await self._update_history_by_id(from_wxid, placeholder_id, f"[图片: {cleaned_description}]")
logger.success(f"已更新图片描述: {nickname} - {cleaned_description[:30]}...")
else:
await self._update_history_by_id(from_wxid, placeholder_id, "[图片]")
logger.warning(f"图片描述生成失败")