diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index 15d0024..e6e5595 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -4,7 +4,8 @@ import time from pathlib import Path from typing import Dict, Any, Tuple, Optional, List -import requests +import aiohttp +from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus @@ -205,46 +206,45 @@ class MessageSummaryPlugin(MessagePluginInterface): } try: - # 发送POST请求 - response = requests.post(self._api_url, headers=headers, json=data) - response.raise_for_status() # 检查请求是否成功 + async with aiohttp.ClientSession() as session: + async with session.post(self._api_url, headers=headers, json=data) as response: + response.raise_for_status() # 检查请求是否成功 + response_data = await response.json() + + self.LOG.info(f"Dify API响应状态码: {response.status}") + self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") - # 解析响应 - response_data = response.json() - self.LOG.info(f"Dify API响应状态码: {response.status_code}") - self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") + # 提取回答内容 + answer = response_data.get("answer", "") + # 去除广告内容pollinations.ai 的广告 + answer = remove_trailing_content(answer) + spath = "" + # 提取token使用情况 + metadata = response_data.get("metadata", {}) + usage = metadata.get("usage", {}) - # 提取回答内容 - answer = response_data.get("answer", "") - # 去除广告内容pollinations.ai 的广告 - answer = remove_trailing_content(answer) - spath = "" - # 提取token使用情况 - metadata = response_data.get("metadata", {}) - usage = metadata.get("usage", {}) + if usage: + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) + total_tokens = usage.get("total_tokens", 0) - if usage: - prompt_tokens = usage.get("prompt_tokens", 0) - completion_tokens = usage.get("completion_tokens", 0) - total_tokens = usage.get("total_tokens", 0) + # 添加token信息 + tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" + answer += tokens_info + try: + # 使用唯一文件名并指定完整路径 + timestamp = int(time.time()) + output_path = f"summary_{timestamp}.png" + # 构建完整的输出路径 + spath = await convert_md_str_to_image(answer, output_path) + self.LOG.info(f"成功生成图片: {spath}") + except Exception as e: + self.LOG.error(f"生成image失败:{e}", exc_info=True) + spath = None + # 返回文本内容和图片路径 + return answer, spath - # 添加token信息 - tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" - answer += tokens_info - try: - # 使用唯一文件名并指定完整路径 - timestamp = int(time.time()) - output_path = f"summary_{timestamp}.png" - # 构建完整的输出路径 - spath = await convert_md_str_to_image(answer, output_path) - self.LOG.info(f"成功生成图片: {spath}") - except Exception as e: - self.LOG.error(f"生成image失败:{e}", exc_info=True) - spath = None - # 返回文本内容和图片路径 - return answer, spath - - except requests.exceptions.RequestException as e: + except aiohttp.ClientError as e: self.LOG.error(f"请求Dify API时出错: {e}") return f"生成总结时出错: {str(e)}", None diff --git a/plugins/video/main.py b/plugins/video/main.py index 932470f..a898199 100644 --- a/plugins/video/main.py +++ b/plugins/video/main.py @@ -1,7 +1,9 @@ import time - -from loguru import logger import os +import aiohttp +import aiofiles +import asyncio +from loguru import logger import requests from typing import Dict, Any, List, Optional, Tuple from pathlib import Path @@ -133,7 +135,7 @@ class VideoPlugin(MessagePluginInterface): save_path = os.path.join(self.download_dir, video_filename) self.LOG.info(f"开始下载视频到: {save_path}") - file_abspath, first_frame = self._download_stream(" http://api.yujn.cn/api/heisis.php?type=video", save_path) + file_abspath, first_frame = await self._download_stream(" http://api.yujn.cn/api/heisis.php?type=video", save_path) if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"): self.LOG.error(f"视频下载失败,文件路径: {file_abspath}") @@ -154,9 +156,9 @@ class VideoPlugin(MessagePluginInterface): sender) return False, f"处理出错: {e}" - def _download_stream(self, url, save_path): + async def _download_stream(self, url, save_path): """ - 从指定URL读取视频流并保存到本地 + 从指定URL读取视频流并保存到本地(异步版本) :param url: 视频流的URL :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") """ @@ -170,76 +172,78 @@ class VideoPlugin(MessagePluginInterface): "Connection": "keep-alive", "Referer": "https://api.guiguiya.com/" } - response = requests.get(url, stream=True, timeout=30, headers=headers, allow_redirects=True) - # 检查请求是否成功 - response.raise_for_status() # 如果状态码不是200,将抛出异常 + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers, allow_redirects=True) as response: + # 检查请求是否成功 + response.raise_for_status() - # 确保保存路径的目录存在 - save_dir = os.path.dirname(save_path) - if save_dir and not os.path.exists(save_dir): - os.makedirs(save_dir, exist_ok=True) - self.LOG.info(f"创建目录: {save_dir}") + # 确保保存路径的目录存在 + save_dir = os.path.dirname(save_path) + if save_dir and not os.path.exists(save_dir): + os.makedirs(save_dir, exist_ok=True) + self.LOG.info(f"创建目录: {save_dir}") - # 检查是否是视频流(可选,根据Content-Type判断) - content_type = response.headers.get("Content-Type", "").lower() - self.LOG.info(f"响应Content-Type: {content_type}") + # 检查是否是视频流(可选,根据Content-Type判断) + content_type = response.headers.get("Content-Type", "").lower() + self.LOG.info(f"响应Content-Type: {content_type}") - if "video" not in content_type and "application/octet-stream" not in content_type: - self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") - self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 - return None, None + if "video" not in content_type and "application/octet-stream" not in content_type: + self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") + text = await response.text() + self.LOG.warning(f"响应内容预览: {text[:100]}") # 打印前100字符查看 + return None, None - # 以二进制写入模式保存流数据 - try: - total_size = int(response.headers.get('Content-Length', 0)) - self.LOG.info(f"预期下载大小: {total_size} 字节") - downloaded_size = 0 + # 以二进制写入模式保存流数据 + try: + total_size = int(response.headers.get('Content-Length', 0)) + self.LOG.info(f"预期下载大小: {total_size} 字节") + downloaded_size = 0 - with open(save_path, "wb") as file: - for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB - if chunk: # 过滤空块 - file.write(chunk) - downloaded_size += len(chunk) + async with aiofiles.open(save_path, "wb") as file: + async for chunk in response.content.iter_chunked(1024): # 分块读取,每块1KB + if chunk: # 过滤空块 + await file.write(chunk) + downloaded_size += len(chunk) - self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节") + self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节") - # 验证下载是否完整 - if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90% - self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节") - return None, None - except IOError as e: - self.LOG.error(f"文件写入失败: {e}") - return None, None + # 验证下载是否完整 + if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90% + self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节") + return None, None + except IOError as e: + self.LOG.error(f"文件写入失败: {e}") + return None, None - # 检查文件是否存在且大小合理 - if not os.path.exists(save_path): - self.LOG.error(f"下载的文件不存在: {save_path}") - return None, None + # 检查文件是否存在且大小合理 + if not os.path.exists(save_path): + self.LOG.error(f"下载的文件不存在: {save_path}") + return None, None - file_size = os.path.getsize(save_path) - if file_size < 10000: # 小于10KB的文件可能不是有效视频 - self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节") - # 尝试读取文件内容以诊断问题 - try: - with open(save_path, 'rb') as f: - content_preview = f.read(200) - self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}") - except Exception as e: - self.LOG.error(f"读取文件内容失败: {e}") - return None, None + file_size = os.path.getsize(save_path) + if file_size < 10000: # 小于10KB的文件可能不是有效视频 + self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节") + # 尝试读取文件内容以诊断问题 + try: + async with aiofiles.open(save_path, 'rb') as f: + content_preview = await f.read(200) + self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}") + except Exception as e: + self.LOG.error(f"读取文件内容失败: {e}") + return None, None - # 加入首帧下载 - first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") - first_frame = self._get_first_frame(save_path, first_frame_path) + # 加入首帧下载 + first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") + first_frame = await self._get_first_frame(save_path, first_frame_path) - if not first_frame or not os.path.exists(first_frame): - self.LOG.warning(f"无法提取首帧,使用默认图片") - # 可以在这里设置一个默认图片路径 + if not first_frame or not os.path.exists(first_frame): + self.LOG.warning(f"无法提取首帧,使用默认图片") + # 可以在这里设置一个默认图片路径 - return os.path.abspath(save_path), first_frame + return os.path.abspath(save_path), first_frame - except requests.RequestException as e: + except aiohttp.ClientError as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") @@ -247,40 +251,47 @@ class VideoPlugin(MessagePluginInterface): self.LOG.error(f"下载视频时发生未知错误: {e}") return None, None - def _get_first_frame(self, video_path, output_path): + async def _get_first_frame(self, video_path, output_path): """ - 提取视频的第一帧并保存为图片 + 提取视频的第一帧并保存为图片(异步版本) :param video_path: 视频文件路径 :param output_path: 输出图片路径 :return: 输出图片的绝对路径,如果失败则返回None """ try: self.LOG.info(f"开始提取视频首帧: {video_path}") - # 打开视频文件 - cap = cv2.VideoCapture(video_path) - if not cap.isOpened(): - self.LOG.error(f"无法打开视频: {video_path}") - return None + + # 使用 asyncio.to_thread 包装 OpenCV 操作 + def extract_frame(): + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + self.LOG.error(f"无法打开视频: {video_path}") + return None - # 读取首帧 - ret, frame = cap.read() - if not ret: - self.LOG.error("无法读取视频帧") + # 读取首帧 + ret, frame = cap.read() + if not ret: + self.LOG.error("无法读取视频帧") + cap.release() + return None + + # 保存首帧为图片 + try: + cv2.imwrite(output_path, frame) + self.LOG.info(f"首帧已保存为: {output_path}") + except Exception as e: + self.LOG.error(f"保存首帧图片失败: {e}") + cap.release() + return None + + # 释放资源 cap.release() - return None + return os.path.abspath(output_path) - # 保存首帧为图片 - try: - cv2.imwrite(output_path, frame) - self.LOG.info(f"首帧已保存为: {output_path}") - except Exception as e: - self.LOG.error(f"保存首帧图片失败: {e}") - cap.release() - return None - - # 释放资源 - cap.release() - return os.path.abspath(output_path) + # 在线程池中执行 OpenCV 操作 + result = await asyncio.to_thread(extract_frame) + return result except Exception as e: self.LOG.error(f"提取视频首帧时出错: {e}") diff --git a/plugins/video_man/main.py b/plugins/video_man/main.py index 33c0061..e99741f 100644 --- a/plugins/video_man/main.py +++ b/plugins/video_man/main.py @@ -1,5 +1,7 @@ import os import time +import asyncio +import aiofiles from typing import Dict, Any, List, Optional, Tuple import aiohttp @@ -170,17 +172,17 @@ class VideoManPlugin(MessagePluginInterface): self.LOG.error(f"无法下载视频,HTTP状态码: {video_response.status}") return None - # 保存视频 - with open(save_path, "wb") as file: + # 使用 aiofiles 异步保存视频 + async with aiofiles.open(save_path, "wb") as file: async for chunk in video_response.content.iter_chunked(1024): if chunk: # 过滤空块 - file.write(chunk) + await file.write(chunk) abs_path = os.path.abspath(save_path) self.LOG.info(f"视频已下载至: {abs_path}") first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") - first_frame = self._get_first_frame(save_path, first_frame_path) + first_frame = await self._get_first_frame(save_path, first_frame_path) return abs_path, first_frame @@ -193,40 +195,47 @@ class VideoManPlugin(MessagePluginInterface): return None - def _get_first_frame(self, video_path, output_path): + async def _get_first_frame(self, video_path, output_path): """ - 提取视频的第一帧并保存为图片 + 提取视频的第一帧并保存为图片(异步版本) :param video_path: 视频文件路径 :param output_path: 输出图片路径 :return: 输出图片的绝对路径,如果失败则返回None """ try: self.LOG.info(f"开始提取视频首帧: {video_path}") - # 打开视频文件 - cap = cv2.VideoCapture(video_path) - if not cap.isOpened(): - self.LOG.error(f"无法打开视频: {video_path}") - return None + + # 使用 asyncio.to_thread 包装 OpenCV 操作 + def extract_frame(): + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + self.LOG.error(f"无法打开视频: {video_path}") + return None - # 读取首帧 - ret, frame = cap.read() - if not ret: - self.LOG.error("无法读取视频帧") + # 读取首帧 + ret, frame = cap.read() + if not ret: + self.LOG.error("无法读取视频帧") + cap.release() + return None + + # 保存首帧为图片 + try: + cv2.imwrite(output_path, frame) + self.LOG.info(f"首帧已保存为: {output_path}") + except Exception as e: + self.LOG.error(f"保存首帧图片失败: {e}") + cap.release() + return None + + # 释放资源 cap.release() - return None + return os.path.abspath(output_path) - # 保存首帧为图片 - try: - cv2.imwrite(output_path, frame) - self.LOG.info(f"首帧已保存为: {output_path}") - except Exception as e: - self.LOG.error(f"保存首帧图片失败: {e}") - cap.release() - return None - - # 释放资源 - cap.release() - return os.path.abspath(output_path) + # 在线程池中执行 OpenCV 操作 + result = await asyncio.to_thread(extract_frame) + return result except Exception as e: self.LOG.error(f"提取视频首帧时出错: {e}")