From e13be17a37a812e749d6273d7cd86314670ace67 Mon Sep 17 00:00:00 2001 From: shihao <3127647737@qq.com> Date: Thu, 11 Dec 2025 13:52:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/AIChat/main.py | 580 +++++++++++++++++++++++++++++++++++++- plugins/AutoReply/main.py | 209 ++++++++++++-- test_gemini_video.py | 83 ++++++ 3 files changed, 835 insertions(+), 37 deletions(-) create mode 100644 test_gemini_video.py diff --git a/plugins/AIChat/main.py b/plugins/AIChat/main.py index 43b5b38..9c47f75 100644 --- a/plugins/AIChat/main.py +++ b/plugins/AIChat/main.py @@ -1999,20 +1999,19 @@ class AIChat(PluginBase): refer_xml = html.unescape(refer_content.text) refer_root = ET.fromstring(refer_xml) - # 提取图片信息 + # 尝试提取图片信息 img = refer_root.find(".//img") - if img is None: - logger.debug("引用的消息不是图片") + # 尝试提取视频信息 + video = refer_root.find(".//videomsg") + + if img is None and video is None: + logger.debug("引用的消息不是图片或视频") return True - cdnbigimgurl = img.get("cdnbigimgurl", "") - aeskey = img.get("aeskey", "") - - if not cdnbigimgurl or not aeskey: - logger.warning(f"图片信息不完整: cdnurl={bool(cdnbigimgurl)}, aeskey={bool(aeskey)}") + # 检查是否应该回复(提前检查,避免下载后才发现不需要回复) + if not self._should_reply_quote(message, title_text): + logger.debug("引用消息不满足回复条件") return True - - logger.info(f"AI处理引用图片消息: {title_text[:50]}...") # 限流检查 allowed, remaining, reset_time = self._check_rate_limit(user_wxid) @@ -2026,6 +2025,24 @@ class AIChat(PluginBase): # 获取用户昵称 - 使用缓存优化 nickname = await self._get_user_nickname(bot, from_wxid, user_wxid, is_group) + chat_id = self._get_chat_id(from_wxid, user_wxid, is_group) + + # 处理视频消息 + if video is not None: + return await self._handle_quote_video( + bot, video, title_text, from_wxid, user_wxid, + is_group, nickname, chat_id + ) + + # 处理图片消息 + cdnbigimgurl = img.get("cdnbigimgurl", "") + aeskey = img.get("aeskey", "") + + if not cdnbigimgurl or not aeskey: + logger.warning(f"图片信息不完整: cdnurl={bool(cdnbigimgurl)}, aeskey={bool(aeskey)}") + return True + + logger.info(f"AI处理引用图片消息: {title_text[:50]}...") # 下载并编码图片 logger.info(f"开始下载图片: {cdnbigimgurl[:50]}...") @@ -2035,9 +2052,8 @@ class AIChat(PluginBase): await bot.send_text(from_wxid, "❌ 无法处理图片") return False logger.info("图片下载和编码成功") - - # 获取会话ID并添加消息到记忆(包含图片base64) - chat_id = self._get_chat_id(from_wxid, user_wxid, is_group) + + # 添加消息到记忆(包含图片base64) self._add_to_memory(chat_id, "user", title_text, image_base64=image_base64) # 保存用户引用图片消息到群组历史记录 @@ -2065,6 +2081,544 @@ class AIChat(PluginBase): logger.error(f"处理引用消息失败: {e}") return True + async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str, + user_wxid: str, is_group: bool, nickname: str, chat_id: str): + """处理引用的视频消息 - 双AI架构""" + try: + # 检查视频识别功能是否启用 + video_config = self.config.get("video_recognition", {}) + if not video_config.get("enabled", True): + logger.info("[视频识别] 功能未启用") + await bot.send_text(from_wxid, "❌ 视频识别功能未启用") + return False + + # 提取视频 CDN 信息 + cdnvideourl = video_elem.get("cdnvideourl", "") + aeskey = video_elem.get("aeskey", "") + + # 如果主要的CDN信息为空,尝试获取原始视频信息 + if not cdnvideourl or not aeskey: + cdnvideourl = video_elem.get("cdnrawvideourl", "") + aeskey = video_elem.get("cdnrawvideoaeskey", "") + + if not cdnvideourl or not aeskey: + logger.warning(f"[视频识别] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}") + await bot.send_text(from_wxid, "❌ 无法获取视频信息") + return 
False + + logger.info(f"[视频识别] 处理引用视频: {title_text[:50]}...") + + # 提示用户正在处理 + await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...") + + # 下载并编码视频 + video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey) + if not video_base64: + logger.error("[视频识别] 视频下载失败") + await bot.send_text(from_wxid, "❌ 视频下载失败") + return False + + logger.info("[视频识别] 视频下载和编码成功") + + # ========== 第一步:视频AI 分析视频内容 ========== + video_description = await self._analyze_video_content(video_base64, video_config) + if not video_description: + logger.error("[视频识别] 视频AI分析失败") + await bot.send_text(from_wxid, "❌ 视频分析失败") + return False + + logger.info(f"[视频识别] 视频AI分析完成: {video_description[:100]}...") + + # ========== 第二步:主AI 基于视频描述生成回复 ========== + # 构造包含视频描述的用户消息 + user_question = title_text.strip() if title_text.strip() else "这个视频讲了什么?" + combined_message = f"[用户发送了一个视频,以下是视频内容描述]\n{video_description}\n\n[用户的问题]\n{user_question}" + + # 添加到记忆(让主AI知道用户发了视频) + self._add_to_memory(chat_id, "user", combined_message) + + # 如果是群聊,添加到历史记录 + if is_group: + await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}") + + # 调用主AI生成回复(使用现有的 _call_ai_api 方法,继承完整上下文) + response = await self._call_ai_api(combined_message, chat_id, from_wxid, is_group, nickname) + + if response: + await bot.send_text(from_wxid, response) + self._add_to_memory(chat_id, "assistant", response) + # 保存机器人回复到历史记录 + if is_group: + import tomllib + with open("main_config.toml", "rb") as f: + main_config = tomllib.load(f) + bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") + await self._add_to_history(from_wxid, bot_nickname, response) + logger.success(f"[视频识别] 主AI回复成功: {response[:50]}...") + else: + await bot.send_text(from_wxid, "❌ AI 回复生成失败") + + return False + + except Exception as e: + logger.error(f"[视频识别] 处理视频失败: {e}") + import traceback + logger.error(traceback.format_exc()) + await bot.send_text(from_wxid, "❌ 视频处理出错") + return False + + async def _analyze_video_content(self, video_base64: str, video_config: dict) -> str: + """视频AI:专门分析视频内容,生成客观描述""" + try: + api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models") + api_key = video_config.get("api_key", self.config["api"]["api_key"]) + model = video_config.get("model", "gemini-3-pro-preview") + + full_url = f"{api_url}/{model}:generateContent" + + # 去除 data:video/mp4;base64, 前缀(如果有) + if video_base64.startswith("data:"): + video_base64 = video_base64.split(",", 1)[1] + logger.debug("[视频AI] 已去除 base64 前缀") + + # 视频分析专用提示词 + analyze_prompt = """请详细分析这个视频的内容,包括: +1. 视频的主要场景和环境 +2. 出现的人物/物体及其动作 +3. 视频中的文字、对话或声音(如果有) +4. 视频的整体主题或要表达的内容 +5. 
任何值得注意的细节 + +请用客观、详细的方式描述,不要加入主观评价。""" + + payload = { + "contents": [ + { + "parts": [ + {"text": analyze_prompt}, + { + "inline_data": { + "mime_type": "video/mp4", + "data": video_base64 + } + } + ] + } + ], + "generationConfig": { + "maxOutputTokens": video_config.get("max_tokens", 8192) + } + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360)) + + logger.info(f"[视频AI] 开始分析视频...") + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(full_url, json=payload, headers=headers) as resp: + if resp.status != 200: + error_text = await resp.text() + logger.error(f"[视频AI] API 错误: {resp.status}, {error_text[:300]}") + return "" + + result = await resp.json() + logger.info(f"[视频AI] API 响应 keys: {list(result.keys())}") + + # 检查安全过滤 + if "promptFeedback" in result: + feedback = result["promptFeedback"] + if feedback.get("blockReason"): + logger.warning(f"[视频AI] 内容被过滤: {feedback.get('blockReason')}") + return "" + + # 提取文本 + if "candidates" in result and result["candidates"]: + for candidate in result["candidates"]: + # 检查是否被安全过滤 + if candidate.get("finishReason") == "SAFETY": + logger.warning("[视频AI] 响应被安全过滤") + return "" + + content = candidate.get("content", {}) + for part in content.get("parts", []): + if "text" in part: + text = part["text"] + logger.info(f"[视频AI] 分析完成,长度: {len(text)}") + return text + + # 记录失败原因 + if "usageMetadata" in result: + usage = result["usageMetadata"] + logger.warning(f"[视频AI] 无响应,Token: prompt={usage.get('promptTokenCount', 0)}") + + logger.error(f"[视频AI] 没有有效响应: {str(result)[:300]}") + return "" + + except asyncio.TimeoutError: + logger.error(f"[视频AI] 请求超时") + return "" + except Exception as e: + logger.error(f"[视频AI] 分析失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return "" + + async def _download_and_encode_video(self, bot, cdnurl: str, aeskey: str) -> str: + """下载视频并转换为 base64""" + try: + # 从缓存获取 + from utils.redis_cache import RedisCache + redis_cache = get_cache() + if redis_cache and redis_cache.enabled: + media_key = RedisCache.generate_media_key(cdnurl, aeskey) + if media_key: + cached_data = redis_cache.get_cached_media(media_key, "video") + if cached_data: + logger.debug(f"[视频识别] 从缓存获取视频: {media_key[:20]}...") + return cached_data + + # 下载视频 + logger.info(f"[视频识别] 开始下载视频...") + temp_dir = Path(__file__).parent / "temp" + temp_dir.mkdir(exist_ok=True) + + filename = f"video_{uuid.uuid4().hex[:8]}.mp4" + save_path = str((temp_dir / filename).resolve()) + + # file_type=4 表示视频 + success = await bot.cdn_download(cdnurl, aeskey, save_path, file_type=4) + if not success: + logger.error("[视频识别] CDN 下载失败") + return "" + + # 等待文件写入完成 + import os + for _ in range(30): # 最多等待15秒 + if os.path.exists(save_path) and os.path.getsize(save_path) > 0: + break + await asyncio.sleep(0.5) + + if not os.path.exists(save_path): + logger.error("[视频识别] 视频文件未生成") + return "" + + file_size = os.path.getsize(save_path) + logger.info(f"[视频识别] 视频下载完成,大小: {file_size / 1024 / 1024:.2f} MB") + + # 检查文件大小限制 + video_config = self.config.get("video_recognition", {}) + max_size_mb = video_config.get("max_size_mb", 20) + if file_size > max_size_mb * 1024 * 1024: + logger.warning(f"[视频识别] 视频文件过大: {file_size / 1024 / 1024:.2f} MB > {max_size_mb} MB") + try: + Path(save_path).unlink() + except: + pass + return "" + + # 读取并编码为 base64 + with open(save_path, "rb") as f: + video_data = 
base64.b64encode(f.read()).decode() + + video_base64 = f"data:video/mp4;base64,{video_data}" + + # 缓存到 Redis + if redis_cache and redis_cache.enabled and media_key: + redis_cache.cache_media(media_key, video_base64, "video", ttl=600) + logger.debug(f"[视频识别] 视频已缓存: {media_key[:20]}...") + + # 清理临时文件 + try: + Path(save_path).unlink() + except: + pass + + return video_base64 + + except Exception as e: + logger.error(f"[视频识别] 下载视频失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return "" + + async def _call_ai_api_with_video(self, user_message: str, video_base64: str, bot=None, + from_wxid: str = None, chat_id: str = None, + nickname: str = "", user_wxid: str = None, + is_group: bool = False) -> str: + """调用 Gemini 原生 API(带视频)- 继承完整上下文""" + try: + video_config = self.config.get("video_recognition", {}) + + # 使用视频识别专用配置 + video_model = video_config.get("model", "gemini-3-pro-preview") + api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models") + api_key = video_config.get("api_key", self.config["api"]["api_key"]) + + # 构建完整的 API URL + full_url = f"{api_url}/{video_model}:generateContent" + + # 构建系统提示(与 _call_ai_api 保持一致) + system_content = self.system_prompt + current_time = datetime.now() + weekday_map = { + 0: "星期一", 1: "星期二", 2: "星期三", 3: "星期四", + 4: "星期五", 5: "星期六", 6: "星期日" + } + weekday = weekday_map[current_time.weekday()] + time_str = current_time.strftime(f"%Y年%m月%d日 %H:%M:%S {weekday}") + system_content += f"\n\n当前时间:{time_str}" + + if nickname: + system_content += f"\n当前对话用户的昵称是:{nickname}" + + # 加载持久记忆 + memory_chat_id = from_wxid if is_group else user_wxid + if memory_chat_id: + persistent_memories = self._get_persistent_memories(memory_chat_id) + if persistent_memories: + system_content += "\n\n【持久记忆】以下是用户要求你记住的重要信息:\n" + for m in persistent_memories: + mem_time = m['time'][:10] if m['time'] else "" + system_content += f"- [{mem_time}] {m['nickname']}: {m['content']}\n" + + # 构建历史上下文 + history_context = "" + if is_group and from_wxid: + # 群聊:从 Redis/文件加载历史 + history = await self._load_history(from_wxid) + max_context = self.config.get("history", {}).get("max_context", 50) + recent_history = history[-max_context:] if len(history) > max_context else history + + if recent_history: + history_context = "\n\n【最近的群聊记录】\n" + for msg in recent_history: + msg_nickname = msg.get("nickname", "") + msg_content = msg.get("content", "") + if isinstance(msg_content, list): + # 多模态内容,提取文本 + for item in msg_content: + if item.get("type") == "text": + msg_content = item.get("text", "") + break + else: + msg_content = "[图片]" + # 限制单条消息长度 + if len(str(msg_content)) > 200: + msg_content = str(msg_content)[:200] + "..." + history_context += f"[{msg_nickname}] {msg_content}\n" + else: + # 私聊:从 memory 加载 + if chat_id: + memory_messages = self._get_memory_messages(chat_id) + if memory_messages: + history_context = "\n\n【最近的对话记录】\n" + for msg in memory_messages[-20:]: # 最近20条 + role = msg.get("role", "") + content = msg.get("content", "") + if isinstance(content, list): + for item in content: + if item.get("type") == "text": + content = item.get("text", "") + break + else: + content = "[图片]" + role_name = "用户" if role == "user" else "你" + if len(str(content)) > 200: + content = str(content)[:200] + "..." 
+ history_context += f"[{role_name}] {content}\n" + + # 从 data:video/mp4;base64,xxx 中提取纯 base64 数据 + if video_base64.startswith("data:"): + video_base64 = video_base64.split(",", 1)[1] + + # 构建完整提示(人设 + 历史 + 当前问题) + full_prompt = system_content + history_context + f"\n\n【当前】用户发送了一个视频并问:{user_message or '请描述这个视频的内容'}" + + # 构建 Gemini 原生格式请求 + payload = { + "contents": [ + { + "parts": [ + {"text": full_prompt}, + { + "inline_data": { + "mime_type": "video/mp4", + "data": video_base64 + } + } + ] + } + ], + "generationConfig": { + "maxOutputTokens": video_config.get("max_tokens", 8192) + } + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360)) + + # 配置代理 + connector = None + proxy_config = self.config.get("proxy", {}) + if proxy_config.get("enabled", False) and PROXY_SUPPORT: + proxy_type = proxy_config.get("type", "socks5").upper() + proxy_host = proxy_config.get("host", "127.0.0.1") + proxy_port = proxy_config.get("port", 7890) + proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}" + try: + connector = ProxyConnector.from_url(proxy_url) + except Exception as e: + logger.warning(f"[视频识别] 代理配置失败: {e}") + + logger.info(f"[视频识别] 调用 Gemini API: {full_url}") + logger.debug(f"[视频识别] 提示词长度: {len(full_prompt)} 字符") + + async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: + async with session.post(full_url, json=payload, headers=headers) as resp: + if resp.status != 200: + error_text = await resp.text() + logger.error(f"[视频识别] API 错误: {resp.status}, {error_text[:500]}") + return "" + + # 解析 Gemini 响应格式 + result = await resp.json() + # 详细记录响应(用于调试) + logger.info(f"[视频识别] API 响应 keys: {list(result.keys()) if isinstance(result, dict) else type(result)}") + + # 检查是否有错误 + if "error" in result: + logger.error(f"[视频识别] API 返回错误: {result['error']}") + return "" + + # 检查 promptFeedback(安全过滤信息) + if "promptFeedback" in result: + feedback = result["promptFeedback"] + block_reason = feedback.get("blockReason", "") + if block_reason: + logger.warning(f"[视频识别] 请求被阻止,原因: {block_reason}") + logger.warning(f"[视频识别] 安全评级: {feedback.get('safetyRatings', [])}") + return "抱歉,视频内容无法分析(内容策略限制)。" + + # 提取文本内容 + full_content = "" + if "candidates" in result and result["candidates"]: + logger.info(f"[视频识别] candidates 数量: {len(result['candidates'])}") + for i, candidate in enumerate(result["candidates"]): + # 检查 finishReason + finish_reason = candidate.get("finishReason", "") + if finish_reason: + logger.info(f"[视频识别] candidate[{i}] finishReason: {finish_reason}") + if finish_reason == "SAFETY": + logger.warning(f"[视频识别] 内容被安全过滤: {candidate.get('safetyRatings', [])}") + return "抱歉,视频内容无法分析。" + + content = candidate.get("content", {}) + parts = content.get("parts", []) + logger.info(f"[视频识别] candidate[{i}] parts 数量: {len(parts)}") + for part in parts: + if "text" in part: + full_content += part["text"] + else: + # 没有 candidates,记录完整响应 + logger.error(f"[视频识别] 响应中没有 candidates: {str(result)[:500]}") + # 可能是上下文太长导致,记录 token 使用情况 + if "usageMetadata" in result: + usage = result["usageMetadata"] + logger.warning(f"[视频识别] Token 使用: prompt={usage.get('promptTokenCount', 0)}, total={usage.get('totalTokenCount', 0)}") + + logger.info(f"[视频识别] AI 响应完成,长度: {len(full_content)}") + + # 如果没有内容,尝试简化重试 + if not full_content: + logger.info("[视频识别] 尝试简化请求重试...") + return await self._call_ai_api_with_video_simple( + user_message or "请描述这个视频的内容", + video_base64, + video_config + ) + + return 
full_content.strip() + + except Exception as e: + logger.error(f"[视频识别] API 调用失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return "" + + async def _call_ai_api_with_video_simple(self, user_message: str, video_base64: str, video_config: dict) -> str: + """简化版视频识别 API 调用(不带上下文,用于降级重试)""" + try: + api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models") + api_key = video_config.get("api_key", self.config["api"]["api_key"]) + model = video_config.get("model", "gemini-3-pro-preview") + + full_url = f"{api_url}/{model}:generateContent" + + # 简化请求:只发送用户问题和视频 + payload = { + "contents": [ + { + "parts": [ + {"text": user_message}, + { + "inline_data": { + "mime_type": "video/mp4", + "data": video_base64 + } + } + ] + } + ], + "generationConfig": { + "maxOutputTokens": video_config.get("max_tokens", 8192) + } + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360)) + + logger.info(f"[视频识别-简化] 调用 API: {full_url}") + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(full_url, json=payload, headers=headers) as resp: + if resp.status != 200: + error_text = await resp.text() + logger.error(f"[视频识别-简化] API 错误: {resp.status}, {error_text[:300]}") + return "" + + result = await resp.json() + logger.info(f"[视频识别-简化] API 响应 keys: {list(result.keys())}") + + # 提取文本 + if "candidates" in result and result["candidates"]: + for candidate in result["candidates"]: + content = candidate.get("content", {}) + for part in content.get("parts", []): + if "text" in part: + text = part["text"] + logger.info(f"[视频识别-简化] 成功,长度: {len(text)}") + return text + + logger.error(f"[视频识别-简化] 仍然没有 candidates: {str(result)[:300]}") + return "" + + except Exception as e: + logger.error(f"[视频识别-简化] 失败: {e}") + return "" + def _should_reply_quote(self, message: dict, title_text: str) -> bool: """判断是否应该回复引用消息""" is_group = message.get("IsGroup", False) diff --git a/plugins/AutoReply/main.py b/plugins/AutoReply/main.py index fb42c2b..e5a9de9 100644 --- a/plugins/AutoReply/main.py +++ b/plugins/AutoReply/main.py @@ -2,6 +2,11 @@ AutoReply 插件 - 基于双LLM架构的智能自动回复 使用小模型判断是否需要回复,通过后触发AIChat插件生成回复 + +v2.0 改进: +- 后台异步判断,不阻塞消息处理 +- 消息过时检测,避免回复已被刷走的消息 +- 更智能的冷却机制 """ import json @@ -12,7 +17,7 @@ import aiohttp from pathlib import Path from datetime import datetime, date from dataclasses import dataclass, field -from typing import Dict, Optional +from typing import Dict, Optional, List from loguru import logger from utils.plugin_base import PluginBase from utils.decorators import on_text_message @@ -45,6 +50,17 @@ class ChatState: last_reset_date: str = "" total_messages: int = 0 total_replies: int = 0 + message_count_at_trigger: int = 0 # 触发判断时的消息计数 + + +@dataclass +class PendingJudge: + """待处理的判断任务""" + chat_id: str + from_wxid: str + content: str + trigger_time: float + message_count: int # 触发时的消息计数 class AutoReply(PluginBase): @@ -52,7 +68,7 @@ class AutoReply(PluginBase): description = "基于双LLM架构的智能自动回复插件" author = "ShiHao" - version = "1.1.0" + version = "2.0.0" def __init__(self): super().__init__() @@ -60,9 +76,12 @@ class AutoReply(PluginBase): self.chat_states: Dict[str, ChatState] = {} self.weights = {} self.last_judge_time: Dict[str, float] = {} + self.last_reply_time: Dict[str, float] = {} # 上次回复时间(用于冷却) self.judging: Dict[str, bool] = {} self.bot_wxid: str = "" self.bot_nickname: str = "" + self.pending_tasks: Dict[str, asyncio.Task] = 
{} # 后台判断任务 + self.message_counters: Dict[str, int] = {} # 每个群的消息计数器 async def async_init(self): """异步初始化""" @@ -145,7 +164,7 @@ class AutoReply(PluginBase): @on_text_message(priority=90) async def handle_message(self, bot, message: dict): - """处理消息""" + """处理消息 - 后台异步判断模式""" try: # 检查是否启用 if not self.config or not self.config["basic"]["enabled"]: @@ -179,42 +198,50 @@ class AutoReply(PluginBase): chat_id = self._normalize_chat_id(from_wxid) current_time = time.time() + # 更新消息计数器(每条消息都计数) + self.message_counters[chat_id] = self.message_counters.get(chat_id, 0) + 1 + current_msg_count = self.message_counters[chat_id] + # 频率限制:检查是否正在判断中 if self.judging.get(chat_id, False): logger.debug(f"[AutoReply] 群聊 {from_wxid[:15]}... 正在判断中,跳过") return True - # 频率限制:检查时间间隔 + # 频率限制:检查判断间隔 min_interval = self.config.get("rate_limit", {}).get("min_interval", 10) - last_time = self.last_judge_time.get(chat_id, 0) - if current_time - last_time < min_interval: - logger.debug(f"[AutoReply] 距离上次判断仅 {current_time - last_time:.1f}秒,跳过") + last_judge = self.last_judge_time.get(chat_id, 0) + if current_time - last_judge < min_interval: + logger.debug(f"[AutoReply] 距离上次判断仅 {current_time - last_judge:.1f}秒,跳过") + return True + + # 冷却检查:上次回复后的冷却时间 + reply_cooldown = self.config.get("rate_limit", {}).get("reply_cooldown", 60) + last_reply = self.last_reply_time.get(chat_id, 0) + if current_time - last_reply < reply_cooldown: + logger.debug(f"[AutoReply] 回复冷却中,剩余 {reply_cooldown - (current_time - last_reply):.0f}秒") return True # 标记正在判断 self.judging[chat_id] = True self.last_judge_time[chat_id] = current_time - try: - # 使用小模型判断 - judge_result = await self._judge_with_small_model(from_wxid, content) + # 启动后台判断任务(fire-and-forget) + pending = PendingJudge( + chat_id=chat_id, + from_wxid=from_wxid, + content=content, + trigger_time=current_time, + message_count=current_msg_count + ) - if judge_result.should_reply: - logger.info(f"[AutoReply] 触发回复 | 群:{from_wxid[:15]}... | 评分:{judge_result.overall_score:.2f} | {judge_result.reasoning[:30]}") + task = asyncio.create_task( + self._background_judge(bot, pending) + ) + self.pending_tasks[chat_id] = task - # 更新状态 - self._update_state(chat_id, replied=True) - - # 设置触发标记,让AIChat处理 - message['_auto_reply_triggered'] = True - else: - logger.debug(f"[AutoReply] 不触发 | 群:{from_wxid[:15]}... | 评分:{judge_result.overall_score:.2f}") - self._update_state(chat_id, replied=False) - - finally: - # 清除判断中标记 - self.judging[chat_id] = False + logger.debug(f"[AutoReply] 启动后台判断 | 群:{from_wxid[:15]}... | 消息#{current_msg_count}") + # 立即返回,不阻塞消息处理 return True except Exception as e: @@ -226,6 +253,140 @@ class AutoReply(PluginBase): self.judging[chat_id] = False return True + async def _background_judge(self, bot, pending: PendingJudge): + """后台判断任务 - 判断完成后直接触发回复""" + chat_id = pending.chat_id + try: + # 使用小模型判断 + judge_result = await self._judge_with_small_model(pending.from_wxid, pending.content) + + if not judge_result.should_reply: + logger.debug(f"[AutoReply] 不触发 | 群:{pending.from_wxid[:15]}... 
| 评分:{judge_result.overall_score:.2f}") + self._update_state(chat_id, replied=False) + return + + # 检查消息是否过时 + current_time = time.time() + current_msg_count = self.message_counters.get(chat_id, 0) + + elapsed_time = current_time - pending.trigger_time + new_messages = current_msg_count - pending.message_count + + # 获取过时阈值配置 + stale_config = self.config.get("stale_detection", {}) + max_elapsed = stale_config.get("max_elapsed_seconds", 60) + max_new_messages = stale_config.get("max_new_messages", 15) + + is_stale = elapsed_time > max_elapsed or new_messages > max_new_messages + + if is_stale: + logger.info(f"[AutoReply] 消息过时,放弃回复 | 耗时:{elapsed_time:.1f}s | 新消息:{new_messages}条") + self._update_state(chat_id, replied=False) + return + + # 触发回复 + logger.info(f"[AutoReply] 触发回复 | 群:{pending.from_wxid[:15]}... | 评分:{judge_result.overall_score:.2f} | 耗时:{elapsed_time:.1f}s | {judge_result.reasoning[:30]}") + + # 更新状态 + self._update_state(chat_id, replied=True) + self.last_reply_time[chat_id] = current_time + + # 直接调用 AIChat 生成回复(基于最新上下文) + await self._trigger_ai_reply(bot, pending.from_wxid) + + except Exception as e: + logger.error(f"[AutoReply] 后台判断异常: {e}") + import traceback + logger.error(traceback.format_exc()) + finally: + # 清除判断中标记 + self.judging[chat_id] = False + # 清理任务引用 + if chat_id in self.pending_tasks: + del self.pending_tasks[chat_id] + + async def _trigger_ai_reply(self, bot, from_wxid: str): + """触发 AIChat 生成回复(基于最新历史上下文)""" + try: + from utils.plugin_manager import PluginManager + aichat_plugin = PluginManager().plugins.get("AIChat") + + if not aichat_plugin: + logger.warning("[AutoReply] AIChat 插件未加载") + return + + # 获取最新的历史记录作为上下文 + chat_id = self._normalize_chat_id(from_wxid) + recent_context = await self._get_recent_context_for_reply(chat_id) + + if not recent_context: + logger.warning("[AutoReply] 无法获取上下文") + return + + # 构造一个虚拟消息,触发 AIChat 回复 + # 使用特殊标记让 AIChat 知道这是自动回复触发 + virtual_message = { + 'FromWxid': from_wxid, + 'SenderWxid': '', # 空,表示不是特定用户 + 'Content': recent_context, + 'IsGroup': True, + '_auto_reply_triggered': True, + '_auto_reply_context': True, # 标记这是上下文触发 + } + + # 调用 AIChat 的处理方法 + await aichat_plugin.handle_message(bot, virtual_message) + + except Exception as e: + logger.error(f"[AutoReply] 触发AI回复失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + async def _get_recent_context_for_reply(self, chat_id: str) -> str: + """获取最近的上下文用于生成回复""" + try: + history = await self._get_history(chat_id) + if not history: + return "" + + # 取最近几条消息作为上下文提示 + count = self.config.get('context', {}).get('messages_count', 5) + recent = history[-count:] if len(history) > count else history + + # 构建上下文摘要 + context_lines = [] + for record in recent: + nickname = record.get('nickname', '未知') + content = record.get('content', '') + if isinstance(content, list): + # 多模态内容,提取文本 + for item in content: + if item.get('type') == 'text': + content = item.get('text', '') + break + else: + content = '[图片]' + if len(content) > 50: + content = content[:50] + "..." 
+                context_lines.append(f"{nickname}: {content}")
+
+            # 返回最后一条消息作为触发内容(AIChat 会读取完整历史)
+            if recent:
+                last = recent[-1]
+                last_content = last.get('content', '')
+                if isinstance(last_content, list):
+                    for item in last_content:
+                        if item.get('type') == 'text':
+                            return item.get('text', '')
+                    return '[图片]'
+                return last_content
+
+            return ""
+
+        except Exception as e:
+            logger.error(f"[AutoReply] 获取上下文失败: {e}")
+            return ""
+
     async def _judge_with_small_model(self, from_wxid: str, content: str) -> JudgeResult:
         """使用小模型判断是否需要回复"""
         chat_id = self._normalize_chat_id(from_wxid)
diff --git a/test_gemini_video.py b/test_gemini_video.py
new file mode 100644
index 0000000..a87b0bd
--- /dev/null
+++ b/test_gemini_video.py
@@ -0,0 +1,83 @@
+"""
+测试 Gemini API 视频识别
+"""
+import base64
+import requests
+import json
+
+# 配置
+API_URL = "https://api.functen.cn/v1beta/models/gemini-3-pro-preview:generateContent"
+API_KEY = "YOUR_API_KEY"  # 注意:请填入自己的密钥,不要把真实密钥提交到仓库
+VIDEO_PATH = r"D:\project\shrobot\WechatHookBot\plugins\AIChat\265d0df9ea89578bcb86f824c5255a42.mp4"
+
+def test_video():
+    # 读取视频并编码为 base64
+    print(f"读取视频: {VIDEO_PATH}")
+    with open(VIDEO_PATH, "rb") as f:
+        video_data = f.read()
+
+    video_base64 = base64.b64encode(video_data).decode()
+    print(f"视频大小: {len(video_data) / 1024 / 1024:.2f} MB")
+    print(f"Base64 长度: {len(video_base64)}")
+
+    # 构建 Gemini 原生格式请求
+    payload = {
+        "contents": [
+            {
+                "parts": [
+                    {"text": "请描述这个视频的内容"},
+                    {
+                        "inline_data": {
+                            "mime_type": "video/mp4",
+                            "data": video_base64
+                        }
+                    }
+                ]
+            }
+        ],
+        "generationConfig": {
+            "maxOutputTokens": 4096
+        }
+    }
+
+    headers = {
+        "Content-Type": "application/json",
+        "Authorization": f"Bearer {API_KEY}"
+    }
+
+    print(f"\n发送请求到: {API_URL}")
+    print("请求中...")
+
+    try:
+        response = requests.post(
+            API_URL,
+            headers=headers,
+            json=payload,
+            timeout=180
+        )
+
+        print(f"\n状态码: {response.status_code}")
+
+        if response.status_code == 200:
+            result = response.json()
+            print("\n=== 响应内容 ===")
+            print(json.dumps(result, ensure_ascii=False, indent=2))
+
+            # 提取文本
+            if "candidates" in result:
+                for candidate in result["candidates"]:
+                    content = candidate.get("content", {})
+                    for part in content.get("parts", []):
+                        if "text" in part:
+                            print("\n=== AI 回复 ===")
+                            print(part["text"])
+        else:
+            print(f"\n错误响应: {response.text}")
+
+    except Exception as e:
+        print(f"\n请求失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    test_video()
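
Configuration note: the new code paths read all of their settings through .get() calls with
fallback defaults, so the expected config layout is not part of this patch. Assuming the plugin
configs are TOML (main_config.toml already is), a minimal sketch of the sections referenced by
the diff could look like the following; key names and default values are taken from the code
above, while the table placement is an assumption.

    # AIChat plugin config -- read by _handle_quote_video / _analyze_video_content
    [video_recognition]
    enabled = true
    api_url = "https://api.functen.cn/v1beta/models"
    api_key = ""                      # falls back to [api].api_key when omitted
    model = "gemini-3-pro-preview"
    max_tokens = 8192
    timeout = 360                     # seconds
    max_size_mb = 20                  # larger videos are rejected before upload

    # AutoReply plugin config -- read by handle_message / _background_judge
    [rate_limit]
    min_interval = 10                 # seconds between judge calls per chat
    reply_cooldown = 60               # seconds after a reply before judging again

    [stale_detection]
    max_elapsed_seconds = 60          # drop the reply if judging took longer than this
    max_new_messages = 15             # drop the reply if this many newer messages arrived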