feat:添加视频识别

This commit is contained in:
2025-12-11 13:52:19 +08:00
parent debb67d71c
commit e13be17a37
3 changed files with 835 additions and 37 deletions

View File

@@ -1999,20 +1999,19 @@ class AIChat(PluginBase):
refer_xml = html.unescape(refer_content.text)
refer_root = ET.fromstring(refer_xml)
# 提取图片信息
# 尝试提取图片信息
img = refer_root.find(".//img")
if img is None:
logger.debug("引用的消息不是图片")
# 尝试提取视频信息
video = refer_root.find(".//videomsg")
if img is None and video is None:
logger.debug("引用的消息不是图片或视频")
return True
cdnbigimgurl = img.get("cdnbigimgurl", "")
aeskey = img.get("aeskey", "")
if not cdnbigimgurl or not aeskey:
logger.warning(f"图片信息不完整: cdnurl={bool(cdnbigimgurl)}, aeskey={bool(aeskey)}")
# 检查是否应该回复(提前检查,避免下载后才发现不需要回复)
if not self._should_reply_quote(message, title_text):
logger.debug("引用消息不满足回复条件")
return True
logger.info(f"AI处理引用图片消息: {title_text[:50]}...")
# 限流检查
allowed, remaining, reset_time = self._check_rate_limit(user_wxid)
@@ -2026,6 +2025,24 @@ class AIChat(PluginBase):
# 获取用户昵称 - 使用缓存优化
nickname = await self._get_user_nickname(bot, from_wxid, user_wxid, is_group)
chat_id = self._get_chat_id(from_wxid, user_wxid, is_group)
# 处理视频消息
if video is not None:
return await self._handle_quote_video(
bot, video, title_text, from_wxid, user_wxid,
is_group, nickname, chat_id
)
# 处理图片消息
cdnbigimgurl = img.get("cdnbigimgurl", "")
aeskey = img.get("aeskey", "")
if not cdnbigimgurl or not aeskey:
logger.warning(f"图片信息不完整: cdnurl={bool(cdnbigimgurl)}, aeskey={bool(aeskey)}")
return True
logger.info(f"AI处理引用图片消息: {title_text[:50]}...")
# 下载并编码图片
logger.info(f"开始下载图片: {cdnbigimgurl[:50]}...")
@@ -2035,9 +2052,8 @@ class AIChat(PluginBase):
await bot.send_text(from_wxid, "❌ 无法处理图片")
return False
logger.info("图片下载和编码成功")
# 获取会话ID并添加消息到记忆包含图片base64
chat_id = self._get_chat_id(from_wxid, user_wxid, is_group)
# 添加消息到记忆包含图片base64
self._add_to_memory(chat_id, "user", title_text, image_base64=image_base64)
# 保存用户引用图片消息到群组历史记录
@@ -2065,6 +2081,544 @@ class AIChat(PluginBase):
logger.error(f"处理引用消息失败: {e}")
return True
async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str,
                              user_wxid: str, is_group: bool, nickname: str, chat_id: str):
    """Handle a quoted video message using the two-stage ("dual AI") pipeline.

    Stage one asks the video model for a description of the clip
    (``_analyze_video_content``); stage two passes that description together
    with the user's question to the main chat model (``_call_ai_api``).

    Args:
        bot: Client object; ``send_text`` is used for all user feedback and it
            is handed to ``_download_and_encode_video`` for the CDN download.
        video_elem: The ``videomsg`` XML element of the quoted message.
        title_text: Text the user typed alongside the quote (may be empty).
        from_wxid: Conversation that receives the replies.
        user_wxid: Sender id; not read by this method.
        is_group: Whether the conversation is a group chat.
        nickname: Display name of the sender, used for group history.
        chat_id: Key of the conversation memory.

    Returns:
        ``False`` on every path; progress and failures are reported to the
        chat through ``bot.send_text``.
    """
    try:
        # Tolerate a missing caption so the slicing/stripping below cannot raise.
        title_text = title_text or ""

        # Feature switch (enabled unless the config says otherwise).
        video_config = self.config.get("video_recognition", {})
        if not video_config.get("enabled", True):
            logger.info("[视频识别] 功能未启用")
            await bot.send_text(from_wxid, "❌ 视频识别功能未启用")
            return False

        # Prefer the regular CDN url/key pair; fall back to the raw-video pair.
        cdnvideourl = video_elem.get("cdnvideourl", "")
        aeskey = video_elem.get("aeskey", "")
        if not cdnvideourl or not aeskey:
            cdnvideourl = video_elem.get("cdnrawvideourl", "")
            aeskey = video_elem.get("cdnrawvideoaeskey", "")
        if not cdnvideourl or not aeskey:
            logger.warning(f"[视频识别] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}")
            await bot.send_text(from_wxid, "❌ 无法获取视频信息")
            return False

        logger.info(f"[视频识别] 处理引用视频: {title_text[:50]}...")

        # Let the user know work has started; analysis can take a while.
        await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...")

        video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey)
        if not video_base64:
            logger.error("[视频识别] 视频下载失败")
            await bot.send_text(from_wxid, "❌ 视频下载失败")
            return False
        logger.info("[视频识别] 视频下载和编码成功")

        # ---------- Stage 1: the video model describes the clip ----------
        video_description = await self._analyze_video_content(video_base64, video_config)
        if not video_description:
            logger.error("[视频识别] 视频AI分析失败")
            await bot.send_text(from_wxid, "❌ 视频分析失败")
            return False
        logger.info(f"[视频识别] 视频AI分析完成: {video_description[:100]}...")

        # ---------- Stage 2: the main model answers from the description ----------
        user_question = title_text.strip() or "这个视频讲了什么?"
        combined_message = f"[用户发送了一个视频,以下是视频内容描述]\n{video_description}\n\n[用户的问题]\n{user_question}"

        # Record the turn so the main model knows a video was sent.
        self._add_to_memory(chat_id, "user", combined_message)
        if is_group:
            await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}")

        response = await self._call_ai_api(combined_message, chat_id, from_wxid, is_group, nickname)
        if response:
            await bot.send_text(from_wxid, response)
            self._add_to_memory(chat_id, "assistant", response)

            if is_group:
                # The reply is already delivered at this point. A missing or
                # malformed main_config.toml must not surface as a
                # "processing failed" message, so fall back to the default name.
                bot_nickname = "机器人"
                try:
                    import tomllib
                    with open("main_config.toml", "rb") as f:
                        bot_nickname = tomllib.load(f).get("Bot", {}).get("nickname", "机器人")
                except (OSError, ValueError) as e:  # TOMLDecodeError is a ValueError
                    logger.warning(f"[视频识别] 读取机器人昵称失败,使用默认值: {e}")
                await self._add_to_history(from_wxid, bot_nickname, response)

            logger.success(f"[视频识别] 主AI回复成功: {response[:50]}...")
        else:
            await bot.send_text(from_wxid, "❌ AI 回复生成失败")

        return False

    except Exception as e:
        logger.error(f"[视频识别] 处理视频失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        await bot.send_text(from_wxid, "❌ 视频处理出错")
        return False
async def _analyze_video_content(self, video_base64: str, video_config: dict) -> str:
    """Ask the video model for an objective description of a clip.

    Sends one Gemini-style ``generateContent`` request containing the analysis
    prompt and the video as inline base64 data.

    Args:
        video_base64: Base64 video data, with or without a ``data:...;base64,``
            prefix (the prefix is stripped before sending).
        video_config: The ``video_recognition`` config section. Keys read:
            ``api_url``, ``api_key``, ``model``, ``max_tokens``, ``timeout``.

    Returns:
        The description text, or ``""`` on HTTP error, safety block, timeout,
        empty response or any other failure (failures are logged, not raised).
    """
    try:
        api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models")
        # Resolve the fallback lazily: a missing [api] section must not break a
        # setup that has a dedicated video key, and an empty video key falls
        # back to the main one instead of being sent as an empty bearer token.
        api_key = video_config.get("api_key") or self.config.get("api", {}).get("api_key", "")
        model = video_config.get("model", "gemini-3-pro-preview")

        full_url = f"{api_url}/{model}:generateContent"

        # The API expects bare base64, so drop a data-URI prefix if present.
        if video_base64.startswith("data:"):
            video_base64 = video_base64.split(",", 1)[1]
            logger.debug("[视频AI] 已去除 base64 前缀")

        # Prompt dedicated to video analysis.
        analyze_prompt = """请详细分析这个视频的内容,包括:
1. 视频的主要场景和环境
2. 出现的人物/物体及其动作
3. 视频中的文字、对话或声音(如果有)
4. 视频的整体主题或要表达的内容
5. 任何值得注意的细节
请用客观、详细的方式描述,不要加入主观评价。"""

        payload = {
            "contents": [
                {
                    "parts": [
                        {"text": analyze_prompt},
                        {
                            "inline_data": {
                                "mime_type": "video/mp4",
                                "data": video_base64
                            }
                        }
                    ]
                }
            ],
            "generationConfig": {
                "maxOutputTokens": video_config.get("max_tokens", 8192)
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360))

        logger.info(f"[视频AI] 开始分析视频...")

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(full_url, json=payload, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.error(f"[视频AI] API 错误: {resp.status}, {error_text[:300]}")
                    return ""

                result = await resp.json()
                logger.info(f"[视频AI] API 响应 keys: {list(result.keys())}")

                # The whole prompt may have been rejected by the safety filter.
                if "promptFeedback" in result:
                    feedback = result["promptFeedback"]
                    if feedback.get("blockReason"):
                        logger.warning(f"[视频AI] 内容被过滤: {feedback.get('blockReason')}")
                        return ""

                for candidate in result.get("candidates") or []:
                    if candidate.get("finishReason") == "SAFETY":
                        logger.warning("[视频AI] 响应被安全过滤")
                        return ""

                    # An answer can be split over several parts; join them so
                    # the description is not cut off after the first one.
                    # Parts flagged `thought` are skipped (per the Gemini API
                    # these carry reasoning summaries, not the answer).
                    parts = candidate.get("content", {}).get("parts", [])
                    text = "".join(
                        part["text"]
                        for part in parts
                        if "text" in part and not part.get("thought")
                    )
                    if text:
                        logger.info(f"[视频AI] 分析完成,长度: {len(text)}")
                        return text

                # Nothing usable came back: log what we can for diagnosis.
                if "usageMetadata" in result:
                    usage = result["usageMetadata"]
                    logger.warning(f"[视频AI] 无响应Token: prompt={usage.get('promptTokenCount', 0)}")

                logger.error(f"[视频AI] 没有有效响应: {str(result)[:300]}")
                return ""

    except asyncio.TimeoutError:
        logger.error(f"[视频AI] 请求超时")
        return ""
    except Exception as e:
        logger.error(f"[视频AI] 分析失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return ""
async def _download_and_encode_video(self, bot, cdnurl: str, aeskey: str) -> str:
    """Download a video from the CDN and return it as a base64 data URI.

    The result is looked up in, and stored to, the Redis media cache when that
    cache is enabled. The download goes to a temporary file under ``temp/``
    next to this module; the file is removed on every exit path.

    Args:
        bot: Client object providing ``cdn_download``.
        cdnurl: CDN url of the video.
        aeskey: AES key belonging to ``cdnurl``.

    Returns:
        ``"data:video/mp4;base64,<data>"`` on success, or ``""`` when the
        download fails, the file never receives data, the file exceeds
        ``video_recognition.max_size_mb`` (default 20), or any error occurs.
    """
    video_path = None
    try:
        # Try the cache first.
        from utils.redis_cache import RedisCache
        redis_cache = get_cache()
        media_key = None
        if redis_cache and redis_cache.enabled:
            media_key = RedisCache.generate_media_key(cdnurl, aeskey)
            if media_key:
                cached_data = redis_cache.get_cached_media(media_key, "video")
                if cached_data:
                    logger.debug(f"[视频识别] 从缓存获取视频: {media_key[:20]}...")
                    return cached_data

        logger.info(f"[视频识别] 开始下载视频...")
        temp_dir = Path(__file__).parent / "temp"
        temp_dir.mkdir(exist_ok=True)

        filename = f"video_{uuid.uuid4().hex[:8]}.mp4"
        video_path = (temp_dir / filename).resolve()
        save_path = str(video_path)

        # file_type=4 selects video on the CDN download call.
        success = await bot.cdn_download(cdnurl, aeskey, save_path, file_type=4)
        if not success:
            logger.error("[视频识别] CDN 下载失败")
            return ""

        # The file may appear shortly after the call returns: poll up to
        # 30 x 0.5s = 15s for a non-empty file.
        for _ in range(30):
            if video_path.exists() and video_path.stat().st_size > 0:
                break
            await asyncio.sleep(0.5)

        # An existing but still-empty file counts as "not generated";
        # otherwise an empty data URI (truthy) would be handed to the model.
        if not video_path.exists() or video_path.stat().st_size == 0:
            logger.error("[视频识别] 视频文件未生成")
            return ""

        file_size = video_path.stat().st_size
        logger.info(f"[视频识别] 视频下载完成,大小: {file_size / 1024 / 1024:.2f} MB")

        # Enforce the configured size limit before reading the file into memory.
        video_config = self.config.get("video_recognition", {})
        max_size_mb = video_config.get("max_size_mb", 20)
        if file_size > max_size_mb * 1024 * 1024:
            logger.warning(f"[视频识别] 视频文件过大: {file_size / 1024 / 1024:.2f} MB > {max_size_mb} MB")
            return ""

        video_data = base64.b64encode(video_path.read_bytes()).decode()
        video_base64 = f"data:video/mp4;base64,{video_data}"

        # Cache for 10 minutes so repeated quotes of the same video are cheap.
        if redis_cache and redis_cache.enabled and media_key:
            redis_cache.cache_media(media_key, video_base64, "video", ttl=600)
            logger.debug(f"[视频识别] 视频已缓存: {media_key[:20]}...")

        return video_base64

    except Exception as e:
        logger.error(f"[视频识别] 下载视频失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return ""
    finally:
        # Best-effort cleanup on every exit path, including exceptions.
        if video_path is not None:
            try:
                video_path.unlink(missing_ok=True)
            except OSError as e:
                logger.debug(f"[视频识别] 清理临时文件失败: {e}")
async def _call_ai_api_with_video(self, user_message: str, video_base64: str, bot=None,
                                  from_wxid: str = None, chat_id: str = None,
                                  nickname: str = "", user_wxid: str = None,
                                  is_group: bool = False) -> str:
    """Call the Gemini-native API with a video plus the full chat context.

    The prompt is assembled from the system prompt, the current time, the
    user's nickname, persistent memories and recent history (group history
    for group chats, conversation memory otherwise), followed by the user's
    question. If the response carries no text, the request is retried once
    through ``_call_ai_api_with_video_simple`` without any context.

    Args:
        user_message: The user's question; a default question is used if empty.
        video_base64: Base64 video data, with or without a data-URI prefix.
        bot: Not read by this method.
        from_wxid: Conversation id; selects group history and, for groups,
            the persistent-memory key.
        chat_id: Key of the conversation memory (used for non-group context).
        nickname: Display name of the current user.
        user_wxid: Sender id; the persistent-memory key for non-group chats.
        is_group: Whether the conversation is a group chat.

    Returns:
        The model's text (stripped), a user-facing apology string when the
        request or response is blocked by the safety filter, or ``""`` on
        any failure.
    """

    def _snippet(raw) -> str:
        """Flatten a stored message body to text, capped at 200 characters."""
        if isinstance(raw, list):
            # Multimodal content: use the first text item, else a placeholder.
            raw = next(
                (item.get("text", "") for item in raw if item.get("type") == "text"),
                "[图片]",
            )
        as_text = str(raw)
        return as_text[:200] + "..." if len(as_text) > 200 else as_text

    try:
        video_config = self.config.get("video_recognition", {})

        # Dedicated video-recognition settings.
        video_model = video_config.get("model", "gemini-3-pro-preview")
        api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models")
        # Resolve the fallback lazily: a missing [api] section must not break a
        # setup that has a dedicated video key, and an empty video key falls
        # back to the main one.
        api_key = video_config.get("api_key") or self.config.get("api", {}).get("api_key", "")

        full_url = f"{api_url}/{video_model}:generateContent"

        # ---- System prompt: persona + current time + nickname ----
        system_content = self.system_prompt

        current_time = datetime.now()
        weekday_map = {
            0: "星期一", 1: "星期二", 2: "星期三", 3: "星期四",
            4: "星期五", 5: "星期六", 6: "星期日"
        }
        weekday = weekday_map[current_time.weekday()]
        time_str = current_time.strftime(f"%Y年%m月%d日 %H:%M:%S {weekday}")
        system_content += f"\n\n当前时间:{time_str}"

        if nickname:
            system_content += f"\n当前对话用户的昵称是:{nickname}"

        # ---- Persistent memories (keyed by group id or by user id) ----
        memory_chat_id = from_wxid if is_group else user_wxid
        if memory_chat_id:
            persistent_memories = self._get_persistent_memories(memory_chat_id)
            if persistent_memories:
                system_content += "\n\n【持久记忆】以下是用户要求你记住的重要信息:\n"
                for m in persistent_memories:
                    mem_time = m['time'][:10] if m['time'] else ""
                    system_content += f"- [{mem_time}] {m['nickname']}: {m['content']}\n"

        # ---- Recent history rendered as plain text ----
        history_context = ""
        if is_group and from_wxid:
            # Group chat: history comes from _load_history.
            history = await self._load_history(from_wxid)
            max_context = self.config.get("history", {}).get("max_context", 50)
            recent_history = history[-max_context:] if len(history) > max_context else history

            if recent_history:
                history_context = "\n\n【最近的群聊记录】\n"
                for msg in recent_history:
                    history_context += f"[{msg.get('nickname', '')}] {_snippet(msg.get('content', ''))}\n"
        elif chat_id:
            # Otherwise: last 20 entries of the conversation memory.
            memory_messages = self._get_memory_messages(chat_id)
            if memory_messages:
                history_context = "\n\n【最近的对话记录】\n"
                for msg in memory_messages[-20:]:
                    # NOTE: non-user roles are rendered with an empty label.
                    role_name = "用户" if msg.get("role", "") == "user" else ""
                    history_context += f"[{role_name}] {_snippet(msg.get('content', ''))}\n"

        # The API expects bare base64, so drop a data-URI prefix if present.
        if video_base64.startswith("data:"):
            video_base64 = video_base64.split(",", 1)[1]

        # Full prompt: persona + history + current question.
        full_prompt = system_content + history_context + f"\n\n【当前】用户发送了一个视频并问:{user_message or '请描述这个视频的内容'}"

        payload = {
            "contents": [
                {
                    "parts": [
                        {"text": full_prompt},
                        {
                            "inline_data": {
                                "mime_type": "video/mp4",
                                "data": video_base64
                            }
                        }
                    ]
                }
            ],
            "generationConfig": {
                "maxOutputTokens": video_config.get("max_tokens", 8192)
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360))

        # Optional proxy; a bad proxy config degrades to a direct connection.
        connector = None
        proxy_config = self.config.get("proxy", {})
        if proxy_config.get("enabled", False) and PROXY_SUPPORT:
            proxy_type = proxy_config.get("type", "socks5").upper()
            proxy_host = proxy_config.get("host", "127.0.0.1")
            proxy_port = proxy_config.get("port", 7890)
            proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
            try:
                connector = ProxyConnector.from_url(proxy_url)
            except Exception as e:
                logger.warning(f"[视频识别] 代理配置失败: {e}")

        logger.info(f"[视频识别] 调用 Gemini API: {full_url}")
        logger.debug(f"[视频识别] 提示词长度: {len(full_prompt)} 字符")

        async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
            async with session.post(full_url, json=payload, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.error(f"[视频识别] API 错误: {resp.status}, {error_text[:500]}")
                    return ""

                result = await resp.json()

                # Log the response shape to help debugging.
                logger.info(f"[视频识别] API 响应 keys: {list(result.keys()) if isinstance(result, dict) else type(result)}")

                if "error" in result:
                    logger.error(f"[视频识别] API 返回错误: {result['error']}")
                    return ""

                # promptFeedback carries safety-filter information for the request.
                if "promptFeedback" in result:
                    feedback = result["promptFeedback"]
                    block_reason = feedback.get("blockReason", "")
                    if block_reason:
                        logger.warning(f"[视频识别] 请求被阻止,原因: {block_reason}")
                        logger.warning(f"[视频识别] 安全评级: {feedback.get('safetyRatings', [])}")
                        return "抱歉,视频内容无法分析(内容策略限制)。"

                # Concatenate every text part of every candidate.
                full_content = ""
                if "candidates" in result and result["candidates"]:
                    logger.info(f"[视频识别] candidates 数量: {len(result['candidates'])}")
                    for i, candidate in enumerate(result["candidates"]):
                        finish_reason = candidate.get("finishReason", "")
                        if finish_reason:
                            logger.info(f"[视频识别] candidate[{i}] finishReason: {finish_reason}")
                            if finish_reason == "SAFETY":
                                logger.warning(f"[视频识别] 内容被安全过滤: {candidate.get('safetyRatings', [])}")
                                return "抱歉,视频内容无法分析。"

                        content = candidate.get("content", {})
                        parts = content.get("parts", [])
                        logger.info(f"[视频识别] candidate[{i}] parts 数量: {len(parts)}")

                        for part in parts:
                            if "text" in part:
                                full_content += part["text"]
                else:
                    # No candidates at all: log the response and token usage
                    # (an over-long context is one possible cause).
                    logger.error(f"[视频识别] 响应中没有 candidates: {str(result)[:500]}")
                    if "usageMetadata" in result:
                        usage = result["usageMetadata"]
                        logger.warning(f"[视频识别] Token 使用: prompt={usage.get('promptTokenCount', 0)}, total={usage.get('totalTokenCount', 0)}")

                logger.info(f"[视频识别] AI 响应完成,长度: {len(full_content)}")

                # Empty answer: retry once with the context-free request.
                if not full_content:
                    logger.info("[视频识别] 尝试简化请求重试...")
                    return await self._call_ai_api_with_video_simple(
                        user_message or "请描述这个视频的内容",
                        video_base64,
                        video_config
                    )

                return full_content.strip()

    except Exception as e:
        logger.error(f"[视频识别] API 调用失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return ""
async def _call_ai_api_with_video_simple(self, user_message: str, video_base64: str, video_config: dict) -> str:
    """Context-free video request used as the fallback retry.

    Sends only the user's question and the video, with no persona, memory or
    history, to the Gemini-style ``generateContent`` endpoint.

    Args:
        user_message: The question to ask about the video.
        video_base64: Bare base64 video data (sent as-is; no prefix stripping
            is done here).
        video_config: The ``video_recognition`` config section. Keys read:
            ``api_url``, ``api_key``, ``model``, ``max_tokens``, ``timeout``.

    Returns:
        The text of the first candidate that contains any, or ``""`` on HTTP
        error, empty response or any exception (failures are logged).
    """
    try:
        api_url = video_config.get("api_url", "https://api.functen.cn/v1beta/models")
        # Resolve the fallback lazily: a missing [api] section must not break a
        # setup that has a dedicated video key, and an empty video key falls
        # back to the main one.
        api_key = video_config.get("api_key") or self.config.get("api", {}).get("api_key", "")
        model = video_config.get("model", "gemini-3-pro-preview")

        full_url = f"{api_url}/{model}:generateContent"

        # Minimal request: just the question and the video.
        payload = {
            "contents": [
                {
                    "parts": [
                        {"text": user_message},
                        {
                            "inline_data": {
                                "mime_type": "video/mp4",
                                "data": video_base64
                            }
                        }
                    ]
                }
            ],
            "generationConfig": {
                "maxOutputTokens": video_config.get("max_tokens", 8192)
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        timeout = aiohttp.ClientTimeout(total=video_config.get("timeout", 360))

        logger.info(f"[视频识别-简化] 调用 API: {full_url}")

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(full_url, json=payload, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.error(f"[视频识别-简化] API 错误: {resp.status}, {error_text[:300]}")
                    return ""

                result = await resp.json()
                logger.info(f"[视频识别-简化] API 响应 keys: {list(result.keys())}")

                for candidate in result.get("candidates") or []:
                    # Join all text parts so a multi-part answer is not
                    # truncated to its first part.
                    parts = candidate.get("content", {}).get("parts", [])
                    text = "".join(part["text"] for part in parts if "text" in part)
                    if text:
                        logger.info(f"[视频识别-简化] 成功,长度: {len(text)}")
                        return text

                logger.error(f"[视频识别-简化] 仍然没有 candidates: {str(result)[:300]}")
                return ""

    except Exception as e:
        logger.error(f"[视频识别-简化] 失败: {e}")
        return ""
def _should_reply_quote(self, message: dict, title_text: str) -> bool:
"""判断是否应该回复引用消息"""
is_group = message.get("IsGroup", False)