优化IO问题,使用异步方案进行视频下载等操作。

This commit is contained in:
liuwei
2025-06-16 10:33:26 +08:00
parent 02a387628c
commit ed324eaa24
3 changed files with 170 additions and 150 deletions

View File

@@ -4,7 +4,8 @@ import time
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Tuple, Optional, List from typing import Dict, Any, Tuple, Optional, List
import requests import aiohttp
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_interface import PluginStatus
@@ -205,46 +206,45 @@ class MessageSummaryPlugin(MessagePluginInterface):
} }
try: try:
# 发送POST请求 async with aiohttp.ClientSession() as session:
response = requests.post(self._api_url, headers=headers, json=data) async with session.post(self._api_url, headers=headers, json=data) as response:
response.raise_for_status() # 检查请求是否成功 response.raise_for_status() # 检查请求是否成功
response_data = await response.json()
self.LOG.info(f"Dify API响应状态码: {response.status}")
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")
# 解析响应 # 提取回答内容
response_data = response.json() answer = response_data.get("answer", "")
self.LOG.info(f"Dify API响应状态码: {response.status_code}") # 去除广告内容pollinations.ai 的广告
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") answer = remove_trailing_content(answer)
spath = ""
# 提取token使用情况
metadata = response_data.get("metadata", {})
usage = metadata.get("usage", {})
# 提取回答内容 if usage:
answer = response_data.get("answer", "") prompt_tokens = usage.get("prompt_tokens", 0)
# 去除广告内容pollinations.ai 的广告 completion_tokens = usage.get("completion_tokens", 0)
answer = remove_trailing_content(answer) total_tokens = usage.get("total_tokens", 0)
spath = ""
# 提取token使用情况
metadata = response_data.get("metadata", {})
usage = metadata.get("usage", {})
if usage: # 添加token信息
prompt_tokens = usage.get("prompt_tokens", 0) tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
completion_tokens = usage.get("completion_tokens", 0) answer += tokens_info
total_tokens = usage.get("total_tokens", 0) try:
# 使用唯一文件名并指定完整路径
timestamp = int(time.time())
output_path = f"summary_{timestamp}.png"
# 构建完整的输出路径
spath = await convert_md_str_to_image(answer, output_path)
self.LOG.info(f"成功生成图片: {spath}")
except Exception as e:
self.LOG.error(f"生成image失败:{e}", exc_info=True)
spath = None
# 返回文本内容和图片路径
return answer, spath
# 添加token信息 except aiohttp.ClientError as e:
tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
answer += tokens_info
try:
# 使用唯一文件名并指定完整路径
timestamp = int(time.time())
output_path = f"summary_{timestamp}.png"
# 构建完整的输出路径
spath = await convert_md_str_to_image(answer, output_path)
self.LOG.info(f"成功生成图片: {spath}")
except Exception as e:
self.LOG.error(f"生成image失败:{e}", exc_info=True)
spath = None
# 返回文本内容和图片路径
return answer, spath
except requests.exceptions.RequestException as e:
self.LOG.error(f"请求Dify API时出错: {e}") self.LOG.error(f"请求Dify API时出错: {e}")
return f"生成总结时出错: {str(e)}", None return f"生成总结时出错: {str(e)}", None

View File

@@ -1,7 +1,9 @@
import time import time
from loguru import logger
import os import os
import aiohttp
import aiofiles
import asyncio
from loguru import logger
import requests import requests
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path from pathlib import Path
@@ -133,7 +135,7 @@ class VideoPlugin(MessagePluginInterface):
save_path = os.path.join(self.download_dir, video_filename) save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}") self.LOG.info(f"开始下载视频到: {save_path}")
file_abspath, first_frame = self._download_stream(" http://api.yujn.cn/api/heisis.php?type=video", save_path) file_abspath, first_frame = await self._download_stream(" http://api.yujn.cn/api/heisis.php?type=video", save_path)
if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"): if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"):
self.LOG.error(f"视频下载失败,文件路径: {file_abspath}") self.LOG.error(f"视频下载失败,文件路径: {file_abspath}")
@@ -154,9 +156,9 @@ class VideoPlugin(MessagePluginInterface):
sender) sender)
return False, f"处理出错: {e}" return False, f"处理出错: {e}"
def _download_stream(self, url, save_path): async def _download_stream(self, url, save_path):
""" """
从指定URL读取视频流并保存到本地 从指定URL读取视频流并保存到本地(异步版本)
:param url: 视频流的URL :param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4" :param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
""" """
@@ -170,76 +172,78 @@ class VideoPlugin(MessagePluginInterface):
"Connection": "keep-alive", "Connection": "keep-alive",
"Referer": "https://api.guiguiya.com/" "Referer": "https://api.guiguiya.com/"
} }
response = requests.get(url, stream=True, timeout=30, headers=headers, allow_redirects=True)
# 检查请求是否成功 async with aiohttp.ClientSession() as session:
response.raise_for_status() # 如果状态码不是200将抛出异常 async with session.get(url, headers=headers, allow_redirects=True) as response:
# 检查请求是否成功
response.raise_for_status()
# 确保保存路径的目录存在 # 确保保存路径的目录存在
save_dir = os.path.dirname(save_path) save_dir = os.path.dirname(save_path)
if save_dir and not os.path.exists(save_dir): if save_dir and not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True) os.makedirs(save_dir, exist_ok=True)
self.LOG.info(f"创建目录: {save_dir}") self.LOG.info(f"创建目录: {save_dir}")
# 检查是否是视频流可选根据Content-Type判断 # 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower() content_type = response.headers.get("Content-Type", "").lower()
self.LOG.info(f"响应Content-Type: {content_type}") self.LOG.info(f"响应Content-Type: {content_type}")
if "video" not in content_type and "application/octet-stream" not in content_type: if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}") self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 text = await response.text()
return None, None self.LOG.warning(f"响应内容预览: {text[:100]}") # 打印前100字符查看
return None, None
# 以二进制写入模式保存流数据 # 以二进制写入模式保存流数据
try: try:
total_size = int(response.headers.get('Content-Length', 0)) total_size = int(response.headers.get('Content-Length', 0))
self.LOG.info(f"预期下载大小: {total_size} 字节") self.LOG.info(f"预期下载大小: {total_size} 字节")
downloaded_size = 0 downloaded_size = 0
with open(save_path, "wb") as file: async with aiofiles.open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB async for chunk in response.content.iter_chunked(1024): # 分块读取每块1KB
if chunk: # 过滤空块 if chunk: # 过滤空块
file.write(chunk) await file.write(chunk)
downloaded_size += len(chunk) downloaded_size += len(chunk)
self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节") self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节")
# 验证下载是否完整 # 验证下载是否完整
if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90% if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90%
self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节") self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节")
return None, None return None, None
except IOError as e: except IOError as e:
self.LOG.error(f"文件写入失败: {e}") self.LOG.error(f"文件写入失败: {e}")
return None, None return None, None
# 检查文件是否存在且大小合理 # 检查文件是否存在且大小合理
if not os.path.exists(save_path): if not os.path.exists(save_path):
self.LOG.error(f"下载的文件不存在: {save_path}") self.LOG.error(f"下载的文件不存在: {save_path}")
return None, None return None, None
file_size = os.path.getsize(save_path) file_size = os.path.getsize(save_path)
if file_size < 10000: # 小于10KB的文件可能不是有效视频 if file_size < 10000: # 小于10KB的文件可能不是有效视频
self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节") self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节")
# 尝试读取文件内容以诊断问题 # 尝试读取文件内容以诊断问题
try: try:
with open(save_path, 'rb') as f: async with aiofiles.open(save_path, 'rb') as f:
content_preview = f.read(200) content_preview = await f.read(200)
self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}") self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}")
except Exception as e: except Exception as e:
self.LOG.error(f"读取文件内容失败: {e}") self.LOG.error(f"读取文件内容失败: {e}")
return None, None return None, None
# 加入首帧下载 # 加入首帧下载
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = self._get_first_frame(save_path, first_frame_path) first_frame = await self._get_first_frame(save_path, first_frame_path)
if not first_frame or not os.path.exists(first_frame): if not first_frame or not os.path.exists(first_frame):
self.LOG.warning(f"无法提取首帧,使用默认图片") self.LOG.warning(f"无法提取首帧,使用默认图片")
# 可以在这里设置一个默认图片路径 # 可以在这里设置一个默认图片路径
return os.path.abspath(save_path), first_frame return os.path.abspath(save_path), first_frame
except requests.RequestException as e: except aiohttp.ClientError as e:
self.LOG.error(f"请求失败: {e}") self.LOG.error(f"请求失败: {e}")
except IOError as e: except IOError as e:
self.LOG.error(f"文件写入失败: {e}") self.LOG.error(f"文件写入失败: {e}")
@@ -247,40 +251,47 @@ class VideoPlugin(MessagePluginInterface):
self.LOG.error(f"下载视频时发生未知错误: {e}") self.LOG.error(f"下载视频时发生未知错误: {e}")
return None, None return None, None
def _get_first_frame(self, video_path, output_path): async def _get_first_frame(self, video_path, output_path):
""" """
提取视频的第一帧并保存为图片 提取视频的第一帧并保存为图片(异步版本)
:param video_path: 视频文件路径 :param video_path: 视频文件路径
:param output_path: 输出图片路径 :param output_path: 输出图片路径
:return: 输出图片的绝对路径如果失败则返回None :return: 输出图片的绝对路径如果失败则返回None
""" """
try: try:
self.LOG.info(f"开始提取视频首帧: {video_path}") self.LOG.info(f"开始提取视频首帧: {video_path}")
# 打开视频文件
cap = cv2.VideoCapture(video_path) # 使用 asyncio.to_thread 包装 OpenCV 操作
if not cap.isOpened(): def extract_frame():
self.LOG.error(f"无法打开视频: {video_path}") # 打开视频文件
return None cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧 # 读取首帧
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
self.LOG.error("无法读取视频帧") self.LOG.error("无法读取视频帧")
cap.release()
return None
# 保存首帧为图片
try:
cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release() cap.release()
return None return os.path.abspath(output_path)
# 保存首帧为图片 # 在线程池中执行 OpenCV 操作
try: result = await asyncio.to_thread(extract_frame)
cv2.imwrite(output_path, frame) return result
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release()
return os.path.abspath(output_path)
except Exception as e: except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}") self.LOG.error(f"提取视频首帧时出错: {e}")

View File

@@ -1,5 +1,7 @@
import os import os
import time import time
import asyncio
import aiofiles
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
import aiohttp import aiohttp
@@ -170,17 +172,17 @@ class VideoManPlugin(MessagePluginInterface):
self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status}") self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status}")
return None return None
# 保存视频 # 使用 aiofiles 异步保存视频
with open(save_path, "wb") as file: async with aiofiles.open(save_path, "wb") as file:
async for chunk in video_response.content.iter_chunked(1024): async for chunk in video_response.content.iter_chunked(1024):
if chunk: # 过滤空块 if chunk: # 过滤空块
file.write(chunk) await file.write(chunk)
abs_path = os.path.abspath(save_path) abs_path = os.path.abspath(save_path)
self.LOG.info(f"视频已下载至: {abs_path}") self.LOG.info(f"视频已下载至: {abs_path}")
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = self._get_first_frame(save_path, first_frame_path) first_frame = await self._get_first_frame(save_path, first_frame_path)
return abs_path, first_frame return abs_path, first_frame
@@ -193,40 +195,47 @@ class VideoManPlugin(MessagePluginInterface):
return None return None
def _get_first_frame(self, video_path, output_path): async def _get_first_frame(self, video_path, output_path):
""" """
提取视频的第一帧并保存为图片 提取视频的第一帧并保存为图片(异步版本)
:param video_path: 视频文件路径 :param video_path: 视频文件路径
:param output_path: 输出图片路径 :param output_path: 输出图片路径
:return: 输出图片的绝对路径如果失败则返回None :return: 输出图片的绝对路径如果失败则返回None
""" """
try: try:
self.LOG.info(f"开始提取视频首帧: {video_path}") self.LOG.info(f"开始提取视频首帧: {video_path}")
# 打开视频文件
cap = cv2.VideoCapture(video_path) # 使用 asyncio.to_thread 包装 OpenCV 操作
if not cap.isOpened(): def extract_frame():
self.LOG.error(f"无法打开视频: {video_path}") # 打开视频文件
return None cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧 # 读取首帧
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
self.LOG.error("无法读取视频帧") self.LOG.error("无法读取视频帧")
cap.release()
return None
# 保存首帧为图片
try:
cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release() cap.release()
return None return os.path.abspath(output_path)
# 保存首帧为图片 # 在线程池中执行 OpenCV 操作
try: result = await asyncio.to_thread(extract_frame)
cv2.imwrite(output_path, frame) return result
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release()
return os.path.abspath(output_path)
except Exception as e: except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}") self.LOG.error(f"提取视频首帧时出错: {e}")