diff --git a/plugins/AIChat/data/persistent_memory.db b/plugins/AIChat/data/persistent_memory.db new file mode 100644 index 0000000..8a31622 Binary files /dev/null and b/plugins/AIChat/data/persistent_memory.db differ diff --git a/plugins/AIChat/main.py b/plugins/AIChat/main.py index 4acb797..5986cdd 100644 --- a/plugins/AIChat/main.py +++ b/plugins/AIChat/main.py @@ -488,7 +488,7 @@ class AIChat(PluginBase): async def _generate_image_description(self, image_base64: str, prompt: str, config: dict) -> str: """ - 使用 Gemini API 生成图片描述 + 使用 AI 生成图片描述 Args: image_base64: 图片的 base64 数据 @@ -498,39 +498,26 @@ class AIChat(PluginBase): Returns: 图片描述文本,失败返回空字符串 """ - import json try: api_config = self.config["api"] description_model = config.get("model", api_config["model"]) - api_url = api_config.get("gemini_url", "https://api.functen.cn/v1beta/models") - # 处理 base64 数据 - image_data = image_base64 - mime_type = "image/jpeg" - if image_data.startswith("data:"): - mime_type = image_data.split(";")[0].split(":")[1] - image_data = image_data.split(",", 1)[1] - - # 构建 Gemini 格式请求 - full_url = f"{api_url}/{description_model}:streamGenerateContent?alt=sse" + # 构建消息 + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + {"type": "image_url", "image_url": {"url": image_base64}} + ] + } + ] payload = { - "contents": [ - { - "parts": [ - {"text": prompt}, - { - "inline_data": { - "mime_type": mime_type, - "data": image_data - } - } - ] - } - ], - "generationConfig": { - "maxOutputTokens": config.get("max_tokens", 1000) - } + "model": description_model, + "messages": messages, + "max_tokens": config.get("max_tokens", 1000), + "stream": True } headers = { @@ -538,12 +525,12 @@ class AIChat(PluginBase): "Authorization": f"Bearer {api_config['api_key']}" } - timeout = aiohttp.ClientTimeout(total=api_config.get("timeout", 120)) + timeout = aiohttp.ClientTimeout(total=api_config["timeout"]) # 配置代理 connector = None proxy_config = self.config.get("proxy", {}) - if proxy_config.get("enabled", False) and PROXY_SUPPORT: + if proxy_config.get("enabled", False): proxy_type = proxy_config.get("type", "socks5").upper() proxy_host = proxy_config.get("host", "127.0.0.1") proxy_port = proxy_config.get("port", 7890) @@ -555,37 +542,43 @@ class AIChat(PluginBase): else: proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}" - try: - connector = ProxyConnector.from_url(proxy_url) - except Exception as e: - logger.warning(f"代理配置失败,将直连: {e}") + if PROXY_SUPPORT: + try: + connector = ProxyConnector.from_url(proxy_url) + except Exception as e: + logger.warning(f"代理配置失败,将直连: {e}") + connector = None async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: - async with session.post(full_url, json=payload, headers=headers) as resp: + async with session.post( + api_config["url"], + json=payload, + headers=headers + ) as resp: if resp.status != 200: error_text = await resp.text() logger.error(f"图片描述 API 返回错误: {resp.status}, {error_text[:200]}") return "" - # 流式接收 Gemini 响应 + # 流式接收响应 + import json description = "" async for line in resp.content: line = line.decode('utf-8').strip() - if not line or not line.startswith("data: "): + if not line or line == "data: [DONE]": continue - try: - data = json.loads(line[6:]) - candidates = data.get("candidates", []) - if candidates: - parts = candidates[0].get("content", {}).get("parts", []) - for part in parts: - if "text" in part: - description += part["text"] - except: - pass + if line.startswith("data: "): + try: + data = 
json.loads(line[6:]) + delta = data.get("choices", [{}])[0].get("delta", {}) + content = delta.get("content", "") + if content: + description += content + except: + pass - logger.debug(f"图片描述生成成功: {description[:100]}...") + logger.debug(f"图片描述生成成功: {description}") return description.strip() except Exception as e: @@ -627,617 +620,6 @@ class AIChat(PluginBase): return tools - # ==================== Gemini API 格式转换方法 ==================== - - def _convert_tools_to_gemini(self, openai_tools: list) -> list: - """ - 将 OpenAI 格式的工具定义转换为 Gemini 格式 - - OpenAI: [{"type": "function", "function": {"name": ..., "parameters": ...}}] - Gemini: [{"function_declarations": [{"name": ..., "parameters": ...}]}] - """ - if not openai_tools: - return [] - - function_declarations = [] - for tool in openai_tools: - if tool.get("type") == "function": - func = tool.get("function", {}) - function_declarations.append({ - "name": func.get("name", ""), - "description": func.get("description", ""), - "parameters": func.get("parameters", {}) - }) - - if function_declarations: - return [{"function_declarations": function_declarations}] - return [] - - def _build_gemini_contents(self, system_content: str, history_messages: list, - current_message: dict, is_group: bool = False) -> list: - """ - 构建 Gemini API 的 contents 格式 - - Args: - system_content: 系统提示词(包含人设、时间、持久记忆等) - history_messages: 历史消息列表 - current_message: 当前用户消息 {"text": str, "media": optional} - is_group: 是否群聊 - - Returns: - Gemini contents 格式的列表 - """ - contents = [] - - # Gemini 没有 system role,将系统提示放在第一条 user 消息中 - # 然后用一条简短的 model 回复来"确认" - system_parts = [{"text": f"[系统指令]\n{system_content}\n\n请按照以上指令进行对话。"}] - contents.append({"role": "user", "parts": system_parts}) - contents.append({"role": "model", "parts": [{"text": "好的,我会按照指令进行对话。"}]}) - - # 添加历史消息 - for msg in history_messages: - gemini_msg = self._convert_message_to_gemini(msg, is_group) - if gemini_msg: - contents.append(gemini_msg) - - # 添加当前用户消息 - current_parts = [] - if current_message.get("text"): - current_parts.append({"text": current_message["text"]}) - - # 添加媒体内容(图片/视频) - if current_message.get("image_base64"): - image_data = current_message["image_base64"] - # 去除 data:image/xxx;base64, 前缀 - if image_data.startswith("data:"): - mime_type = image_data.split(";")[0].split(":")[1] - image_data = image_data.split(",", 1)[1] - else: - mime_type = "image/jpeg" - current_parts.append({ - "inline_data": { - "mime_type": mime_type, - "data": image_data - } - }) - - if current_message.get("video_base64"): - video_data = current_message["video_base64"] - # 去除 data:video/xxx;base64, 前缀 - if video_data.startswith("data:"): - video_data = video_data.split(",", 1)[1] - current_parts.append({ - "inline_data": { - "mime_type": "video/mp4", - "data": video_data - } - }) - - if current_parts: - contents.append({"role": "user", "parts": current_parts}) - - return contents - - def _convert_message_to_gemini(self, msg: dict, is_group: bool = False) -> dict: - """ - 将单条历史消息转换为 Gemini 格式 - - 支持的输入格式: - 1. 群聊历史: {"nickname": str, "content": str|list} - 2. 
私聊记忆: {"role": "user"|"assistant", "content": str|list} - """ - parts = [] - - # 群聊历史格式 - if "nickname" in msg: - nickname = msg.get("nickname", "") - content = msg.get("content", "") - - if isinstance(content, list): - # 多模态内容 - for item in content: - if item.get("type") == "text": - text = item.get("text", "") - parts.append({"text": f"[{nickname}] {text}" if nickname else text}) - elif item.get("type") == "image_url": - image_url = item.get("image_url", {}).get("url", "") - if image_url.startswith("data:"): - mime_type = image_url.split(";")[0].split(":")[1] - image_data = image_url.split(",", 1)[1] - parts.append({ - "inline_data": { - "mime_type": mime_type, - "data": image_data - } - }) - else: - # 纯文本 - parts.append({"text": f"[{nickname}] {content}" if nickname else content}) - - # 群聊历史都作为 user 消息(因为是多人对话记录) - return {"role": "user", "parts": parts} if parts else None - - # 私聊记忆格式 - elif "role" in msg: - role = msg.get("role", "user") - content = msg.get("content", "") - - # 转换角色名 - gemini_role = "model" if role == "assistant" else "user" - - if isinstance(content, list): - for item in content: - if item.get("type") == "text": - parts.append({"text": item.get("text", "")}) - elif item.get("type") == "image_url": - image_url = item.get("image_url", {}).get("url", "") - if image_url.startswith("data:"): - mime_type = image_url.split(";")[0].split(":")[1] - image_data = image_url.split(",", 1)[1] - parts.append({ - "inline_data": { - "mime_type": mime_type, - "data": image_data - } - }) - else: - parts.append({"text": content}) - - return {"role": gemini_role, "parts": parts} if parts else None - - return None - - def _parse_gemini_tool_calls(self, response_parts: list) -> list: - """ - 从 Gemini 响应中解析工具调用 - - Gemini 格式: {"functionCall": {"name": "...", "args": {...}}} - 转换为内部格式: {"id": "...", "function": {"name": "...", "arguments": "..."}} - """ - import json - tool_calls = [] - for i, part in enumerate(response_parts): - if "functionCall" in part: - func_call = part["functionCall"] - tool_calls.append({ - "id": f"call_{uuid.uuid4().hex[:8]}", - "type": "function", - "function": { - "name": func_call.get("name", ""), - "arguments": json.dumps(func_call.get("args", {}), ensure_ascii=False) - } - }) - return tool_calls - - def _build_tool_response_contents(self, contents: list, tool_calls: list, - tool_results: list) -> list: - """ - 构建包含工具调用结果的 contents,用于继续对话 - - Args: - contents: 原始 contents - tool_calls: 工具调用列表 - tool_results: 工具执行结果列表 - """ - import json - new_contents = contents.copy() - - # 添加 model 的工具调用响应 - function_call_parts = [] - for tc in tool_calls: - function_call_parts.append({ - "functionCall": { - "name": tc["function"]["name"], - "args": json.loads(tc["function"]["arguments"]) - } - }) - if function_call_parts: - new_contents.append({"role": "model", "parts": function_call_parts}) - - # 添加工具执行结果 - function_response_parts = [] - for i, result in enumerate(tool_results): - tool_name = tool_calls[i]["function"]["name"] if i < len(tool_calls) else "unknown" - function_response_parts.append({ - "functionResponse": { - "name": tool_name, - "response": {"result": result.get("message", str(result))} - } - }) - if function_response_parts: - new_contents.append({"role": "user", "parts": function_response_parts}) - - return new_contents - - # ==================== 统一的 Gemini API 调用 ==================== - - async def _call_gemini_api(self, contents: list, tools: list = None, - bot=None, from_wxid: str = None, - chat_id: str = None, nickname: str = "", - user_wxid: str = None, 
is_group: bool = False) -> tuple: - """ - 统一的 Gemini API 调用方法 - - Args: - contents: Gemini 格式的对话内容 - tools: Gemini 格式的工具定义(可选) - bot: WechatHookClient 实例 - from_wxid: 消息来源 - chat_id: 会话ID - nickname: 用户昵称 - user_wxid: 用户wxid - is_group: 是否群聊 - - Returns: - (response_text, tool_calls) - 响应文本和工具调用列表 - """ - import json - - api_config = self.config["api"] - model = api_config["model"] - api_url = api_config.get("gemini_url", api_config.get("url", "").replace("/v1/chat/completions", "/v1beta/models")) - api_key = api_config["api_key"] - - # 构建完整 URL - full_url = f"{api_url}/{model}:streamGenerateContent?alt=sse" - - # 构建请求体 - payload = { - "contents": contents, - "generationConfig": { - "maxOutputTokens": api_config.get("max_tokens", 8192) - } - } - - if tools: - payload["tools"] = tools - - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}" - } - - timeout = aiohttp.ClientTimeout(total=api_config.get("timeout", 120)) - - # 配置代理 - connector = None - proxy_config = self.config.get("proxy", {}) - if proxy_config.get("enabled", False) and PROXY_SUPPORT: - proxy_type = proxy_config.get("type", "socks5").upper() - proxy_host = proxy_config.get("host", "127.0.0.1") - proxy_port = proxy_config.get("port", 7890) - proxy_username = proxy_config.get("username") - proxy_password = proxy_config.get("password") - - if proxy_username and proxy_password: - proxy_url = f"{proxy_type}://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}" - else: - proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}" - - try: - connector = ProxyConnector.from_url(proxy_url) - logger.debug(f"[Gemini] 使用代理: {proxy_type}://{proxy_host}:{proxy_port}") - except Exception as e: - logger.warning(f"[Gemini] 代理配置失败: {e}") - - # 保存用户信息供工具调用使用 - self._current_user_wxid = user_wxid - self._current_is_group = is_group - - try: - async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: - logger.debug(f"[Gemini] 发送流式请求: {full_url}") - async with session.post(full_url, json=payload, headers=headers) as resp: - if resp.status != 200: - error_text = await resp.text() - logger.error(f"[Gemini] API 错误: {resp.status}, {error_text[:500]}") - raise Exception(f"Gemini API 错误 {resp.status}: {error_text[:200]}") - - # 流式接收响应 - full_text = "" - all_parts = [] - tool_call_hint_sent = False - - async for line in resp.content: - line = line.decode('utf-8').strip() - if not line or not line.startswith("data: "): - continue - - try: - data = json.loads(line[6:]) - candidates = data.get("candidates", []) - if not candidates: - continue - - content = candidates[0].get("content", ) - parts = content.get("parts", []) - - for part in parts: - all_parts.append(part) - - # 收集文本 - if "text" in part: - full_text += part["text"] - - # 检测到工具调用时,先发送已有文本 - if "functionCall" in part: - if not tool_call_hint_sent and bot and from_wxid: - tool_call_hint_sent = True - if full_text.strip(): - logger.info(f"[Gemini] 检测到工具调用,先发送文本: {full_text[:30]}...") - await bot.send_text(from_wxid, full_text.strip()) - - except json.JSONDecodeError: - continue - - # 解析工具调用 - tool_calls = self._parse_gemini_tool_calls(all_parts) - - logger.info(f"[Gemini] 响应完成, 文本长度: {len(full_text)}, 工具调用: {len(tool_calls)}") - - return full_text.strip(), tool_calls - - except aiohttp.ClientError as e: - logger.error(f"[Gemini] 网络请求失败: {e}") - raise - except asyncio.TimeoutError: - logger.error(f"[Gemini] 请求超时") - raise - - async def _handle_gemini_response(self, response_text: str, tool_calls: list, - contents: list, tools: list, - 
bot, from_wxid: str, chat_id: str, - nickname: str, user_wxid: str, is_group: bool): - """ - 处理 Gemini API 响应,包括工具调用 - - Args: - response_text: AI 响应文本 - tool_calls: 工具调用列表 - contents: 原始 contents(用于工具调用后继续对话) - tools: 工具定义 - bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息 - """ - if tool_calls: - # 有工具调用,异步执行 - logger.info(f"[Gemini] 启动异步工具执行,共 {len(tool_calls)} 个工具") - asyncio.create_task( - self._execute_gemini_tools_async( - tool_calls, contents, tools, - bot, from_wxid, chat_id, nickname, user_wxid, is_group - ) - ) - return None # 工具调用异步处理 - - return response_text - - async def _execute_gemini_tools_async(self, tool_calls: list, contents: list, tools: list, - bot, from_wxid: str, chat_id: str, - nickname: str, user_wxid: str, is_group: bool): - """ - 异步执行 Gemini 工具调用 - """ - import json - - try: - logger.info(f"[Gemini] 开始执行 {len(tool_calls)} 个工具") - - # 收集需要 AI 回复的工具结果 - need_ai_reply_results = [] - tool_results = [] - - for tool_call in tool_calls: - function_name = tool_call["function"]["name"] - try: - arguments = json.loads(tool_call["function"]["arguments"]) - except: - arguments = {} - - logger.info(f"[Gemini] 执行工具: {function_name}, 参数: {arguments}") - - result = await self._execute_tool_and_get_result(function_name, arguments, bot, from_wxid) - tool_results.append(result) - - if result and result.get("success"): - logger.success(f"[Gemini] 工具 {function_name} 执行成功") - - # 检查是否需要 AI 继续回复 - if result.get("need_ai_reply"): - need_ai_reply_results.append({ - "tool_call": tool_call, - "result": result - }) - elif not result.get("already_sent") and result.get("message"): - if result.get("send_result_text"): - await bot.send_text(from_wxid, result["message"]) - else: - logger.warning(f"[Gemini] 工具 {function_name} 执行失败: {result}") - if result and result.get("message"): - await bot.send_text(from_wxid, f"❌ {result['message']}") - - # 如果有需要 AI 回复的工具结果,继续对话 - if need_ai_reply_results: - await self._continue_gemini_with_tool_results( - contents, tools, tool_calls, tool_results, - bot, from_wxid, chat_id, nickname, user_wxid, is_group - ) - - logger.info("[Gemini] 所有工具执行完成") - - except Exception as e: - logger.error(f"[Gemini] 工具执行异常: {e}") - import traceback - logger.error(traceback.format_exc()) - try: - await bot.send_text(from_wxid, "❌ 工具执行出错") - except: - pass - - async def _continue_gemini_with_tool_results(self, contents: list, tools: list, - tool_calls: list, tool_results: list, - bot, from_wxid: str, chat_id: str, - nickname: str, user_wxid: str, is_group: bool): - """ - 基于工具结果继续 Gemini 对话 - """ - try: - # 构建包含工具结果的新 contents - new_contents = self._build_tool_response_contents(contents, tool_calls, tool_results) - - # 继续调用 API(不带工具,避免循环调用) - response_text, new_tool_calls = await self._call_gemini_api( - new_contents, tools=None, - bot=bot, from_wxid=from_wxid, chat_id=chat_id, - nickname=nickname, user_wxid=user_wxid, is_group=is_group - ) - - if response_text: - await bot.send_text(from_wxid, response_text) - logger.success(f"[Gemini] 工具回传后 AI 回复: {response_text[:50]}...") - - # 保存到记忆 - if chat_id: - self._add_to_memory(chat_id, "assistant", response_text) - if is_group: - import tomllib - with open("main_config.toml", "rb") as f: - main_config = tomllib.load(f) - bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") - await self._add_to_history(from_wxid, bot_nickname, response_text) - - except Exception as e: - logger.error(f"[Gemini] 工具回传后继续对话失败: {e}") - import traceback - logger.error(traceback.format_exc()) - - async def _process_with_gemini(self, 
text: str = "", image_base64: str = None, - video_base64: str = None, bot=None, - from_wxid: str = None, chat_id: str = None, - nickname: str = "", user_wxid: str = None, - is_group: bool = False) -> str: - """ - 统一的 Gemini 消息处理入口 - - 支持:纯文本、图片+文本、视频+文本 - - Args: - text: 用户消息文本 - image_base64: 图片 base64(可选) - video_base64: 视频 base64(可选) - bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息 - - Returns: - AI 响应文本,如果是工具调用则返回 None - """ - import json - - # 1. 构建系统提示词 - system_content = self._build_system_content(nickname, from_wxid, user_wxid, is_group) - - # 2. 加载历史消息 - history_messages = [] - if is_group and from_wxid: - history = await self._load_history(from_wxid) - max_context = self.config.get("history", {}).get("max_context", 50) - history_messages = history[-max_context:] if len(history) > max_context else history - elif chat_id: - memory_messages = self._get_memory_messages(chat_id) - if memory_messages and len(memory_messages) > 1: - history_messages = memory_messages[:-1] # 排除刚添加的当前消息 - - # 3. 构建当前消息 - current_message = {"text": f"[{nickname}] {text}" if is_group and nickname else text} - if image_base64: - current_message["image_base64"] = image_base64 - if video_base64: - current_message["video_base64"] = video_base64 - - # 4. 构建 Gemini contents - contents = self._build_gemini_contents(system_content, history_messages, current_message, is_group) - - # 5. 收集并转换工具 - openai_tools = self._collect_tools() - gemini_tools = self._convert_tools_to_gemini(openai_tools) - - if gemini_tools: - logger.info(f"[Gemini] 已加载 {len(openai_tools)} 个工具") - - # 6. 调用 Gemini API(带重试) - max_retries = self.config.get("api", {}).get("max_retries", 2) - last_error = None - - for attempt in range(max_retries + 1): - try: - response_text, tool_calls = await self._call_gemini_api( - contents=contents, - tools=gemini_tools if gemini_tools else None, - bot=bot, - from_wxid=from_wxid, - chat_id=chat_id, - nickname=nickname, - user_wxid=user_wxid, - is_group=is_group - ) - - # 处理工具调用 - if tool_calls: - result = await self._handle_gemini_response( - response_text, tool_calls, contents, gemini_tools, - bot, from_wxid, chat_id, nickname, user_wxid, is_group - ) - return result # None 表示工具调用已异步处理 - - # 检查空响应 - if not response_text and attempt < max_retries: - logger.warning(f"[Gemini] 返回空内容,重试 {attempt + 1}/{max_retries}") - await asyncio.sleep(1) - continue - - return response_text - - except Exception as e: - last_error = e - if attempt < max_retries: - logger.warning(f"[Gemini] API 调用失败,重试 {attempt + 1}/{max_retries}: {e}") - await asyncio.sleep(1) - else: - raise - - return "" - - def _build_system_content(self, nickname: str, from_wxid: str, - user_wxid: str, is_group: bool) -> str: - """构建系统提示词(包含人设、时间、持久记忆等)""" - system_content = self.system_prompt - - # 添加当前时间 - current_time = datetime.now() - weekday_map = { - 0: "星期一", 1: "星期二", 2: "星期三", 3: "星期四", - 4: "星期五", 5: "星期六", 6: "星期日" - } - weekday = weekday_map[current_time.weekday()] - time_str = current_time.strftime(f"%Y年%m月%d日 %H:%M:%S {weekday}") - system_content += f"\n\n当前时间:{time_str}" - - if nickname: - system_content += f"\n当前对话用户的昵称是:{nickname}" - - # 加载持久记忆 - memory_chat_id = from_wxid if is_group else user_wxid - if memory_chat_id: - persistent_memories = self._get_persistent_memories(memory_chat_id) - if persistent_memories: - system_content += "\n\n【持久记忆】以下是用户要求你记住的重要信息:\n" - for m in persistent_memories: - mem_time = m['time'][:10] if m['time'] else "" - system_content += f"- [{mem_time}] {m['nickname']}: {m['content']}\n" - - return 
system_content - - # ==================== 结束 Gemini API 方法 ==================== - async def _handle_list_prompts(self, bot, from_wxid: str): """处理人设列表指令""" try: @@ -1600,19 +982,41 @@ class AIChat(PluginBase): chat_id = self._get_chat_id(from_wxid, user_wxid, is_group) self._add_to_memory(chat_id, "user", actual_content) - # 使用统一的 Gemini API 处理消息 - response = await self._process_with_gemini( - text=actual_content, - bot=bot, - from_wxid=from_wxid, - chat_id=chat_id, - nickname=nickname, - user_wxid=user_wxid, - is_group=is_group - ) + # 调用 AI API(带重试机制) + max_retries = self.config.get("api", {}).get("max_retries", 2) + response = None + last_error = None + + for attempt in range(max_retries + 1): + try: + response = await self._call_ai_api(actual_content, bot, from_wxid, chat_id, nickname, user_wxid, is_group) + + # 检查返回值: + # - None: 工具调用已异步处理,不需要重试 + # - "": 真正的空响应,需要重试 + # - 有内容: 正常响应 + if response is None: + # 工具调用,不重试 + logger.info("AI 触发工具调用,已异步处理") + break + + if response == "" and attempt < max_retries: + logger.warning(f"AI 返回空内容,重试 {attempt + 1}/{max_retries}") + await asyncio.sleep(1) # 等待1秒后重试 + continue + + break # 成功或已达到最大重试次数 + + except Exception as e: + last_error = e + if attempt < max_retries: + logger.warning(f"AI API 调用失败,重试 {attempt + 1}/{max_retries}: {e}") + await asyncio.sleep(1) + else: + raise # 发送回复并添加到记忆 - # 注意:如果返回 None 或空字符串,说明已经以其他形式处理了(如工具调用) + # 注意:如果返回 None 或空字符串,说明已经以其他形式处理了,不需要再发送文本 if response: await bot.send_text(from_wxid, response) self._add_to_memory(chat_id, "assistant", response) @@ -1624,7 +1028,7 @@ class AIChat(PluginBase): await self._add_to_history(from_wxid, bot_nickname, response) logger.success(f"AI 回复成功: {response[:50]}...") else: - logger.info("AI 回复为空或已通过其他方式发送(如工具调用)") + logger.info("AI 回复为空或已通过其他方式发送(如聊天记录)") except Exception as e: import traceback @@ -2523,8 +1927,19 @@ class AIChat(PluginBase): user_wxid = sender_wxid if is_group else from_wxid try: + # 群聊引用消息可能带有 "wxid:\n" 前缀,需要去除 + xml_content = content + if is_group and ":\n" in content: + # 查找 XML 声明或 <msg> 标签的位置 + xml_start = content.find("<?xml") + if xml_start == -1: + xml_start = content.find("<msg") + if xml_start > 0: + xml_content = content[xml_start:] + logger.debug(f"去除引用消息前缀,原长度: {len(content)}, 新长度: {len(xml_content)}") + # 解析XML获取标题和引用消息 - root = ET.fromstring(content) + root = ET.fromstring(xml_content) title = root.find(".//title") if title is None or not title.text: logger.debug("引用消息没有标题,跳过") @@ -2590,18 +2005,51 @@ class AIChat(PluginBase): logger.debug("引用消息中没有 content") return True + # 检查被引用消息的类型 + # type=1: 纯文本,type=3: 图片,type=43: 视频,type=49: 应用消息(含聊天记录) + refer_type_elem = refermsg.find("type") + refer_type = int(refer_type_elem.text) if refer_type_elem is not None and refer_type_elem.text else 0 + logger.debug(f"被引用消息类型: {refer_type}") + + # 纯文本消息不需要处理(type=1) + if refer_type == 1: + logger.debug("引用的是纯文本消息,跳过") + return True + + # 只处理图片(3)、视频(43)、应用消息(49,含聊天记录) + if refer_type not in [3, 43, 49]: + logger.debug(f"引用的消息类型 {refer_type} 不支持处理") + return True + # 解码HTML实体 import html refer_xml = html.unescape(refer_content.text) - refer_root = ET.fromstring(refer_xml) + # 被引用消息的内容也可能带有 "wxid:\n" 前缀,需要去除 + if ":\n" in refer_xml: + xml_start = refer_xml.find("<?xml") + if xml_start == -1: + xml_start = refer_xml.find("<msg") + if xml_start > 0: + refer_xml = refer_xml[xml_start:] + logger.debug(f"去除被引用消息前缀") + + # 尝试解析 XML + try: + refer_root = ET.fromstring(refer_xml) + except ET.ParseError as e: + logger.debug(f"被引用消息内容不是有效的 XML: {e}") + return True + + # 尝试提取聊天记录信息(type=19) + recorditem = refer_root.find(".//recorditem") # 尝试提取图片信息 img = refer_root.find(".//img") # 尝试提取视频信息 video = refer_root.find(".//videomsg") - if img is 
None and video is None: - logger.debug("引用的消息不是图片或视频") + if img is None and video is None and recorditem is None: + logger.debug("引用的消息不是图片、视频或聊天记录") return True # 检查是否应该回复(提前检查,避免下载后才发现不需要回复) @@ -2623,6 +2071,13 @@ class AIChat(PluginBase): nickname = await self._get_user_nickname(bot, from_wxid, user_wxid, is_group) chat_id = self._get_chat_id(from_wxid, user_wxid, is_group) + # 处理聊天记录消息(type=19) + if recorditem is not None: + return await self._handle_quote_chat_record( + bot, recorditem, title_text, from_wxid, user_wxid, + is_group, nickname, chat_id + ) + # 处理视频消息 if video is not None: return await self._handle_quote_video( @@ -2631,18 +2086,20 @@ class AIChat(PluginBase): ) # 处理图片消息 - cdnbigimgurl = img.get("cdnbigimgurl", "") - aeskey = img.get("aeskey", "") + # 按优先级尝试获取图片 URL:大图 > 中图 > 缩略图 + cdnurl = img.get("cdnbigimgurl", "") or img.get("cdnmidimgurl", "") or img.get("cdnthumburl", "") + # aeskey 也有多种可能的属性名 + aeskey = img.get("aeskey", "") or img.get("cdnthumbaeskey", "") - if not cdnbigimgurl or not aeskey: - logger.warning(f"图片信息不完整: cdnurl={bool(cdnbigimgurl)}, aeskey={bool(aeskey)}") + if not cdnurl or not aeskey: + logger.warning(f"图片信息不完整: cdnurl={bool(cdnurl)}, aeskey={bool(aeskey)}") return True logger.info(f"AI处理引用图片消息: {title_text[:50]}...") # 下载并编码图片 - logger.info(f"开始下载图片: {cdnbigimgurl[:50]}...") - image_base64 = await self._download_and_encode_image(bot, cdnbigimgurl, aeskey) + logger.info(f"开始下载图片: {cdnurl[:50]}...") + image_base64 = await self._download_and_encode_image(bot, cdnurl, aeskey) if not image_base64: logger.error("图片下载失败") await bot.send_text(from_wxid, "❌ 无法处理图片") @@ -2656,17 +2113,8 @@ class AIChat(PluginBase): if is_group: await self._add_to_history(from_wxid, nickname, title_text, image_base64=image_base64) - # 使用统一的 Gemini API 处理图片消息 - response = await self._process_with_gemini( - text=title_text, - image_base64=image_base64, - bot=bot, - from_wxid=from_wxid, - chat_id=chat_id, - nickname=nickname, - user_wxid=user_wxid, - is_group=is_group - ) + # 调用AI API(带图片) + response = await self._call_ai_api_with_image(title_text, image_base64, bot, from_wxid, chat_id, nickname, user_wxid, is_group) if response: await bot.send_text(from_wxid, response) @@ -2679,66 +2127,112 @@ class AIChat(PluginBase): bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") await self._add_to_history(from_wxid, bot_nickname, response) logger.success(f"AI回复成功: {response[:50]}...") - + return False except Exception as e: logger.error(f"处理引用消息失败: {e}") return True - async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str, - user_wxid: str, is_group: bool, nickname: str, chat_id: str): - """处理引用的视频消息 - 统一 Gemini API(直接处理视频)""" + async def _handle_quote_chat_record(self, bot, recorditem_elem, title_text: str, from_wxid: str, + user_wxid: str, is_group: bool, nickname: str, chat_id: str): + """处理引用的聊天记录消息(type=19)""" try: - # 提取视频 CDN 信息 - cdnvideourl = video_elem.get("cdnvideourl", "") - aeskey = video_elem.get("aeskey", "") + logger.info(f"[聊天记录] 处理引用的聊天记录: {title_text[:50]}...") - # 如果主要的CDN信息为空,尝试获取原始视频信息 - if not cdnvideourl or not aeskey: - cdnvideourl = video_elem.get("cdnrawvideourl", "") - aeskey = video_elem.get("cdnrawvideoaeskey", "") - - if not cdnvideourl or not aeskey: - logger.warning(f"[视频] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}") - await bot.send_text(from_wxid, "❌ 无法获取视频信息") + # recorditem 的内容是 CDATA,需要提取并解析 + record_text = recorditem_elem.text + if not record_text: + logger.warning("[聊天记录] recorditem 
内容为空") + await bot.send_text(from_wxid, "❌ 无法读取聊天记录内容") return False - logger.info(f"[视频] 处理引用视频: {title_text[:50]}...") - - # 提示用户正在处理 - await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...") - - # 下载并编码视频 - video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey) - if not video_base64: - logger.error("[视频] 视频下载失败") - await bot.send_text(from_wxid, "❌ 视频下载失败") + # 解析 recordinfo XML + try: + record_root = ET.fromstring(record_text) + except ET.ParseError as e: + logger.error(f"[聊天记录] 解析 recordinfo 失败: {e}") + await bot.send_text(from_wxid, "❌ 聊天记录格式解析失败") return False - logger.info("[视频] 视频下载和编码成功") + # 提取聊天记录内容 + datalist = record_root.find(".//datalist") + chat_records = [] - # 用户问题 - user_question = title_text.strip() if title_text.strip() else "这个视频讲了什么?" + # 尝试从 datalist 解析完整消息 + if datalist is not None: + for dataitem in datalist.findall("dataitem"): + source_name = dataitem.find("sourcename") + source_time = dataitem.find("sourcetime") + data_desc = dataitem.find("datadesc") + + sender = source_name.text if source_name is not None and source_name.text else "未知" + time_str = source_time.text if source_time is not None and source_time.text else "" + content = data_desc.text if data_desc is not None and data_desc.text else "" + + if content: + chat_records.append({ + "sender": sender, + "time": time_str, + "content": content + }) + + # 如果 datalist 为空(引用消息的简化版本),尝试从 desc 获取摘要 + if not chat_records: + desc_elem = record_root.find(".//desc") + if desc_elem is not None and desc_elem.text: + # desc 格式通常是 "发送者: 内容\n发送者: 内容" + desc_text = desc_elem.text.strip() + logger.info(f"[聊天记录] 从 desc 获取摘要内容: {desc_text[:100]}...") + chat_records.append({ + "sender": "聊天记录摘要", + "time": "", + "content": desc_text + }) + + if not chat_records: + logger.warning("[聊天记录] 没有解析到任何消息") + await bot.send_text(from_wxid, "❌ 聊天记录中没有消息内容") + return False + + logger.info(f"[聊天记录] 解析到 {len(chat_records)} 条消息") + + # 构建聊天记录文本 + record_title = record_root.find(".//title") + title = record_title.text if record_title is not None and record_title.text else "聊天记录" + + chat_text = f"【{title}】\n\n" + for i, record in enumerate(chat_records, 1): + time_part = f" ({record['time']})" if record['time'] else "" + if record['sender'] == "聊天记录摘要": + # 摘要模式,直接显示内容 + chat_text += f"{record['content']}\n\n" + else: + chat_text += f"[{record['sender']}{time_part}]:\n{record['content']}\n\n" + + # 构造发送给 AI 的消息 + user_question = title_text.strip() if title_text.strip() else "请分析这段聊天记录" + # 去除 @ 部分 + if user_question.startswith("@"): + parts = user_question.split(maxsplit=1) + if len(parts) > 1: + user_question = parts[1].strip() + else: + user_question = "请分析这段聊天记录" + + combined_message = f"[用户发送了一段聊天记录,请阅读并回答问题]\n\n{chat_text}\n[用户的问题]: {user_question}" + + logger.info(f"[聊天记录] 发送给 AI,消息长度: {len(combined_message)}") # 添加到记忆 - self._add_to_memory(chat_id, "user", f"[发送了一个视频] {user_question}") + self._add_to_memory(chat_id, "user", combined_message) # 如果是群聊,添加到历史记录 if is_group: - await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}") + await self._add_to_history(from_wxid, nickname, f"[发送了聊天记录] {user_question}") - # 使用统一的 Gemini API 直接处理视频(不再需要两步架构) - response = await self._process_with_gemini( - text=user_question, - video_base64=video_base64, - bot=bot, - from_wxid=from_wxid, - chat_id=chat_id, - nickname=nickname, - user_wxid=user_wxid, - is_group=is_group - ) + # 调用 AI API + response = await self._call_ai_api(combined_message, bot, from_wxid, chat_id, nickname, user_wxid, is_group) if response: 
await bot.send_text(from_wxid, response) @@ -2750,7 +2244,93 @@ class AIChat(PluginBase): main_config = tomllib.load(f) bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") await self._add_to_history(from_wxid, bot_nickname, response) - logger.success(f"[视频] AI回复成功: {response[:50]}...") + logger.success(f"[聊天记录] AI 回复成功: {response[:50]}...") + else: + await bot.send_text(from_wxid, "❌ AI 回复生成失败") + + return False + + except Exception as e: + logger.error(f"[聊天记录] 处理失败: {e}") + import traceback + logger.error(traceback.format_exc()) + await bot.send_text(from_wxid, "❌ 聊天记录处理出错") + return False + + async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str, + user_wxid: str, is_group: bool, nickname: str, chat_id: str): + """处理引用的视频消息 - 双AI架构""" + try: + # 检查视频识别功能是否启用 + video_config = self.config.get("video_recognition", {}) + if not video_config.get("enabled", True): + logger.info("[视频识别] 功能未启用") + await bot.send_text(from_wxid, "❌ 视频识别功能未启用") + return False + + # 提取视频 CDN 信息 + cdnvideourl = video_elem.get("cdnvideourl", "") + aeskey = video_elem.get("aeskey", "") + + # 如果主要的CDN信息为空,尝试获取原始视频信息 + if not cdnvideourl or not aeskey: + cdnvideourl = video_elem.get("cdnrawvideourl", "") + aeskey = video_elem.get("cdnrawvideoaeskey", "") + + if not cdnvideourl or not aeskey: + logger.warning(f"[视频识别] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}") + await bot.send_text(from_wxid, "❌ 无法获取视频信息") + return False + + logger.info(f"[视频识别] 处理引用视频: {title_text[:50]}...") + + # 提示用户正在处理 + await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...") + + # 下载并编码视频 + video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey) + if not video_base64: + logger.error("[视频识别] 视频下载失败") + await bot.send_text(from_wxid, "❌ 视频下载失败") + return False + + logger.info("[视频识别] 视频下载和编码成功") + + # ========== 第一步:视频AI 分析视频内容 ========== + video_description = await self._analyze_video_content(video_base64, video_config) + if not video_description: + logger.error("[视频识别] 视频AI分析失败") + await bot.send_text(from_wxid, "❌ 视频分析失败") + return False + + logger.info(f"[视频识别] 视频AI分析完成: {video_description[:100]}...") + + # ========== 第二步:主AI 基于视频描述生成回复 ========== + # 构造包含视频描述的用户消息 + user_question = title_text.strip() if title_text.strip() else "这个视频讲了什么?" 
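A note on the two-step flow being restored in this hunk: a vision-capable model first turns the downloaded video into a plain-text description, and the regular chat model (`_call_ai_api`) then answers the user's question with that description prepended, so persona and memory context are preserved. Below is a minimal sketch of that pattern only; the function name and the injected callables are illustrative stand-ins for `_analyze_video_content` and `_call_ai_api`, not the plugin's actual interfaces.

```python
# Hedged sketch of the "dual-AI" video flow: describe first, then answer.
import asyncio
from typing import Awaitable, Callable

async def reply_to_quoted_video(
    video_base64: str,
    user_question: str,
    analyze_video: Callable[[str], Awaitable[str]],  # step 1: video -> text description
    chat_reply: Callable[[str], Awaitable[str]],     # step 2: combined prompt -> answer
) -> str:
    """Run the two-step flow and return the final reply text ('' on failure)."""
    description = await analyze_video(video_base64)
    if not description:
        return ""  # caller reports the failure ("视频分析失败")
    question = user_question.strip() or "这个视频讲了什么?"
    combined = (
        "[用户发送了一个视频,以下是视频内容描述]\n"
        f"{description}\n\n"
        f"[用户的问题]\n{question}"
    )
    return await chat_reply(combined)

if __name__ == "__main__":
    # Dummy models, just to show the call shape.
    async def fake_vision(_: str) -> str:
        return "一段猫跳上桌子的短视频"

    async def fake_chat(prompt: str) -> str:
        return f"(回答基于 {len(prompt)} 字符的组合上下文)"

    print(asyncio.run(reply_to_quoted_video("<base64 data>", "", fake_vision, fake_chat)))
```

The real handler in the diff additionally downloads and decrypts the video via the bot API and, as the later hunk shows, retries transient 502/503/504 responses from the vision endpoint.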
+ combined_message = f"[用户发送了一个视频,以下是视频内容描述]\n{video_description}\n\n[用户的问题]\n{user_question}" + + # 添加到记忆(让主AI知道用户发了视频) + self._add_to_memory(chat_id, "user", combined_message) + + # 如果是群聊,添加到历史记录 + if is_group: + await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}") + + # 调用主AI生成回复(使用现有的 _call_ai_api 方法,继承完整上下文) + response = await self._call_ai_api(combined_message, bot, from_wxid, chat_id, nickname, user_wxid, is_group) + + if response: + await bot.send_text(from_wxid, response) + self._add_to_memory(chat_id, "assistant", response) + # 保存机器人回复到历史记录 + if is_group: + import tomllib + with open("main_config.toml", "rb") as f: + main_config = tomllib.load(f) + bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") + await self._add_to_history(from_wxid, bot_nickname, response) + logger.success(f"[视频识别] 主AI回复成功: {response[:50]}...") else: await bot.send_text(from_wxid, "❌ AI 回复生成失败") @@ -2813,51 +2393,79 @@ class AIChat(PluginBase): timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360)) - logger.info(f"[视频AI] 开始分析视频...") + # 重试机制:对于 502/503/504 等临时性错误自动重试 + max_retries = 2 + retry_delay = 5 # 重试间隔(秒) - async with aiohttp.ClientSession(timeout=timeout) as session: - async with session.post(full_url, json=payload, headers=headers) as resp: - if resp.status != 200: - error_text = await resp.text() - logger.error(f"[视频AI] API 错误: {resp.status}, {error_text[:300]}") - return "" + for attempt in range(max_retries + 1): + try: + logger.info(f"[视频AI] 开始分析视频...{f' (重试 {attempt}/{max_retries})' if attempt > 0 else ''}") - result = await resp.json() - logger.info(f"[视频AI] API 响应 keys: {list(result.keys())}") + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(full_url, json=payload, headers=headers) as resp: + if resp.status in [502, 503, 504]: + error_text = await resp.text() + logger.warning(f"[视频AI] API 临时错误: {resp.status}, 将重试...") + if attempt < max_retries: + await asyncio.sleep(retry_delay) + continue + else: + logger.error(f"[视频AI] API 错误: {resp.status}, 已达最大重试次数") + return "" - # 检查安全过滤 - if "promptFeedback" in result: - feedback = result["promptFeedback"] - if feedback.get("blockReason"): - logger.warning(f"[视频AI] 内容被过滤: {feedback.get('blockReason')}") - return "" - - # 提取文本 - if "candidates" in result and result["candidates"]: - for candidate in result["candidates"]: - # 检查是否被安全过滤 - if candidate.get("finishReason") == "SAFETY": - logger.warning("[视频AI] 响应被安全过滤") + if resp.status != 200: + error_text = await resp.text() + logger.error(f"[视频AI] API 错误: {resp.status}, {error_text[:300]}") return "" - content = candidate.get("content", {}) - for part in content.get("parts", []): - if "text" in part: - text = part["text"] - logger.info(f"[视频AI] 分析完成,长度: {len(text)}") - return text + result = await resp.json() + logger.info(f"[视频AI] API 响应 keys: {list(result.keys())}") - # 记录失败原因 - if "usageMetadata" in result: - usage = result["usageMetadata"] - logger.warning(f"[视频AI] 无响应,Token: prompt={usage.get('promptTokenCount', 0)}") + # 检查安全过滤 + if "promptFeedback" in result: + feedback = result["promptFeedback"] + if feedback.get("blockReason"): + logger.warning(f"[视频AI] 内容被过滤: {feedback.get('blockReason')}") + return "" - logger.error(f"[视频AI] 没有有效响应: {str(result)[:300]}") + # 提取文本 + if "candidates" in result and result["candidates"]: + for candidate in result["candidates"]: + # 检查是否被安全过滤 + if candidate.get("finishReason") == "SAFETY": + logger.warning("[视频AI] 响应被安全过滤") + return "" + + content = candidate.get("content", {}) + for 
part in content.get("parts", []): + if "text" in part: + text = part["text"] + logger.info(f"[视频AI] 分析完成,长度: {len(text)}") + return text + + # 记录失败原因 + if "usageMetadata" in result: + usage = result["usageMetadata"] + logger.warning(f"[视频AI] 无响应,Token: prompt={usage.get('promptTokenCount', 0)}") + + logger.error(f"[视频AI] 没有有效响应: {str(result)[:300]}") + return "" + + except asyncio.TimeoutError: + logger.warning(f"[视频AI] 请求超时{f', 将重试...' if attempt < max_retries else ''}") + if attempt < max_retries: + await asyncio.sleep(retry_delay) + continue + return "" + except Exception as e: + logger.error(f"[视频AI] 分析失败: {e}") + import traceback + logger.error(traceback.format_exc()) return "" - except asyncio.TimeoutError: - logger.error(f"[视频AI] 请求超时") + # 循环结束仍未成功 return "" + except Exception as e: logger.error(f"[视频AI] 分析失败: {e}") import traceback diff --git a/plugins/AIChat_Gemini.zip b/plugins/AIChat_Gemini.zip new file mode 100644 index 0000000..5ddb892 Binary files /dev/null and b/plugins/AIChat_Gemini.zip differ diff --git a/plugins/NanoImage/main.py b/plugins/NanoImage/main.py index f3f93d6..5a45c43 100644 --- a/plugins/NanoImage/main.py +++ b/plugins/NanoImage/main.py @@ -95,6 +95,7 @@ class NanoImage(PluginBase): if response.status_code == 200: # 处理流式响应 image_url = None + image_base64 = None full_content = "" async for line in response.aiter_lines(): if line.startswith("data: "): @@ -106,33 +107,57 @@ class NanoImage(PluginBase): data = json.loads(data_str) if "choices" in data and data["choices"]: delta = data["choices"][0].get("delta", {}) + + # 方式1: 从 delta.images 中提取(新格式) + images = delta.get("images", []) + if images and len(images) > 0: + img_data = images[0].get("image_url", {}).get("url", "") + if img_data: + if img_data.startswith("data:image"): + # base64 格式 + image_base64 = img_data + logger.info(f"从 delta.images 提取到 base64 图片") + elif img_data.startswith("http"): + image_url = img_data + logger.info(f"从 delta.images 提取到图片URL: {image_url}") + + # 方式2: 从 content 中提取(旧格式) content = delta.get("content", "") if content: full_content += content if "http" in content: - # 提取图片URL import re urls = re.findall(r'https?://[^\s\)\]"\']+', content) if urls: image_url = urls[0].rstrip("'\"") - logger.info(f"提取到图片URL: {image_url}") + logger.info(f"从 content 提取到图片URL: {image_url}") except Exception as e: logger.warning(f"解析响应数据失败: {e}") continue # 如果没有从流中提取到URL,尝试从完整内容中提取 - if not image_url and full_content: + if not image_url and not image_base64 and full_content: import re urls = re.findall(r'https?://[^\s\)\]"\']+', full_content) if urls: image_url = urls[0].rstrip("'\"") logger.info(f"从完整内容提取到图片URL: {image_url}") - if not image_url: - logger.error(f"未能提取到图片URL,完整响应: {full_content[:500]}") + if not image_url and not image_base64: + logger.error(f"未能提取到图片,完整响应: {full_content[:500]}") + # 处理 base64 图片 + if image_base64: + image_path = await self._save_base64_image(image_base64) + if image_path: + logger.success("成功生成图像 (base64)") + return [image_path] + else: + logger.warning(f"base64图片保存失败,将重试 ({attempt + 1}/{max_retry})") + continue + + # 处理 URL 图片 if image_url: - # 下载图片 image_path = await self._download_image(image_url) if image_path: logger.success("成功生成图像") @@ -184,6 +209,48 @@ class NanoImage(PluginBase): logger.warning(f"读取代理配置失败: {e}") return None + async def _save_base64_image(self, base64_data: str) -> Optional[str]: + """保存 base64 图片到本地""" + try: + # 去除 data:image/xxx;base64, 前缀 + if base64_data.startswith("data:image"): + # 提取格式和数据 + header, data = base64_data.split(",", 1) + # 从 
header 中提取格式,如 data:image/jpeg;base64 + if "jpeg" in header or "jpg" in header: + ext = "jpg" + elif "png" in header: + ext = "png" + elif "gif" in header: + ext = "gif" + elif "webp" in header: + ext = "webp" + else: + ext = "jpg" + else: + data = base64_data + ext = "jpg" + + # 解码 base64 + image_bytes = base64.b64decode(data) + + # 生成文件名 + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + uid = uuid.uuid4().hex[:8] + file_path = self.images_dir / f"nano_{ts}_{uid}.{ext}" + + # 保存文件 + with open(file_path, "wb") as f: + f.write(image_bytes) + + logger.info(f"base64图片保存成功: {file_path}") + return str(file_path) + except Exception as e: + logger.error(f"保存base64图片失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + async def _download_image(self, url: str) -> Optional[str]: """下载图片到本地""" try:
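For the NanoImage change, a streamed chunk may now carry the generated image either inline as `delta.images[0].image_url.url` (a `data:image/...;base64,` URL or an http URL) or, as before, as a plain URL embedded in `delta.content`. The sketch below restates that extraction plus the splitting of a data URL into raw bytes and an extension; it assumes exactly the chunk layout the diff parses, and the function names are illustrative rather than part of the plugin.

```python
# Hedged sketch of the image-extraction logic added to NanoImage.
import base64
import json
import re
from typing import Optional, Tuple

def extract_image(sse_json: str) -> Tuple[Optional[str], Optional[str]]:
    """Inspect one streamed chunk (the JSON after the 'data: ' prefix).

    Returns (image_url, data_url): a hosted URL, an inline base64 data URL, or neither.
    """
    data = json.loads(sse_json)
    delta = data.get("choices", [{}])[0].get("delta", {})

    images = delta.get("images") or []
    if images:
        url = images[0].get("image_url", {}).get("url", "")
        if url.startswith("data:image"):
            return None, url          # inline base64 payload (new format)
        if url.startswith("http"):
            return url, None          # hosted image URL (new format)

    content = delta.get("content", "")
    match = re.search(r'https?://[^\s\)\]"\']+', content)
    if match:
        return match.group(0).rstrip("'\""), None  # URL embedded in text (old format)
    return None, None

def decode_data_url(data_url: str) -> Tuple[bytes, str]:
    """Split data:image/...;base64,... into raw bytes and a file extension."""
    header, payload = data_url.split(",", 1)
    ext = "png" if "png" in header else "webp" if "webp" in header else "gif" if "gif" in header else "jpg"
    return base64.b64decode(payload), ext
```

In the diff, `_save_base64_image` then writes those decoded bytes under `self.images_dir` with a timestamped `nano_*` filename, mirroring the existing `_download_image` path for URL results.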