响应指令媒资发送增加内存缓存机制

1. 在趣味指令插件中新增媒资缓存:首次发送读磁盘,后续优先从内存读取,减少重复I/O。

2. 缓存键包含路径+mtime+size,文件更新后可自动回源读取新内容。

3. 增加单文件上限与总容量上限,并采用LRU淘汰策略防止内存膨胀。

4. 图片语音视频发送链路改为优先使用缓存字节数据发送。
This commit is contained in:
liuwei
2026-04-23 13:32:40 +08:00
parent 3c7becd94f
commit c2bc110c57
2 changed files with 133 additions and 8 deletions

View File

@@ -1,2 +1,8 @@
[FunCommandPlay] [FunCommandPlay]
enable = true enable = true
# 媒资缓存开关:开启后首次发送读磁盘,后续复用内存缓存。
media-cache-enable = true
# 单个媒资文件允许缓存的最大体积MB
media-cache-max-file-mb = 80
# 媒资缓存总上限MB超限后按最近最少使用策略淘汰。
media-cache-max-total-mb = 200

View File

@@ -9,6 +9,8 @@
import asyncio import asyncio
import os import os
import threading
from collections import OrderedDict
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
@@ -64,6 +66,16 @@ class FunCommandPlayPlugin(MessagePluginInterface):
self.feature = self.register_feature() self.feature = self.register_feature()
self.rule_service: Optional[FunCommandRuleService] = None self.rule_service: Optional[FunCommandRuleService] = None
self.enable = True self.enable = True
# 媒资缓存开关:默认开启,首次发送时读磁盘,后续直接走内存,降低 I/O 开销。
self.media_cache_enable = True
# 单个文件允许缓存的最大体积(字节)。超过阈值则只发送不入缓存,防止单文件挤爆内存。
self.media_cache_max_file_bytes = 80 * 1024 * 1024
# 全局缓存总上限(字节)。采用 LRU 淘汰,越久未使用越先被移除。
self.media_cache_max_total_bytes = 200 * 1024 * 1024
# 缓存结构key -> bytes。OrderedDict 用于维护访问顺序,实现 LRU。
self._media_cache: "OrderedDict[str, bytes]" = OrderedDict()
self._media_cache_total_bytes = 0
self._media_cache_lock = threading.RLock()
def initialize(self, context: Dict[str, Any]) -> bool: def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件。""" """初始化插件。"""
@@ -78,6 +90,17 @@ class FunCommandPlayPlugin(MessagePluginInterface):
# 读取开关配置:保留插件级总开关,便于快速停用。 # 读取开关配置:保留插件级总开关,便于快速停用。
plugin_cfg = self._config.get("FunCommandPlay", {}) plugin_cfg = self._config.get("FunCommandPlay", {})
self.enable = bool(plugin_cfg.get("enable", True)) self.enable = bool(plugin_cfg.get("enable", True))
# 媒资缓存参数支持通过配置覆盖,便于你按机器内存规模灵活调整。
# 约定单位为 MB方便运维理解和调整。
self.media_cache_enable = bool(plugin_cfg.get("media-cache-enable", True))
self.media_cache_max_file_bytes = max(
int(plugin_cfg.get("media-cache-max-file-mb", 80) or 80),
1
) * 1024 * 1024
self.media_cache_max_total_bytes = max(
int(plugin_cfg.get("media-cache-max-total-mb", 200) or 200),
10
) * 1024 * 1024
# 初始化规则服务,确保首次启动就有表结构。 # 初始化规则服务,确保首次启动就有表结构。
redis_client = db_manager.get_redis_connection() redis_client = db_manager.get_redis_connection()
@@ -92,6 +115,98 @@ class FunCommandPlayPlugin(MessagePluginInterface):
self.LOG.debug(f"[{self.name}] 插件初始化完成") self.LOG.debug(f"[{self.name}] 插件初始化完成")
return True return True
@staticmethod
def _build_media_cache_key(media_kind: str, media_path: str) -> str:
"""构建媒资缓存键。
关键点:
1. 键里带上绝对路径 + 文件修改时间 + 文件大小,确保文件内容变更后会自动命中新键。
2. 不依赖手动清缓存,更新本地文件后下一次发送会自然回源读取新内容。
"""
abs_path = os.path.abspath(media_path)
stat = os.stat(abs_path)
return f"{media_kind}:{abs_path}:{int(stat.st_mtime_ns)}:{int(stat.st_size)}"
def _try_get_media_bytes_from_cache(self, cache_key: str) -> Optional[bytes]:
"""尝试从内存缓存读取媒资字节。
读取后会把条目移动到末尾,表示最近访问,配合 LRU 淘汰策略使用。
"""
with self._media_cache_lock:
cached = self._media_cache.get(cache_key)
if cached is None:
return None
self._media_cache.move_to_end(cache_key)
return cached
def _put_media_bytes_to_cache(self, cache_key: str, payload: bytes) -> None:
"""写入媒资到内存缓存,并执行 LRU 淘汰。
规则:
1. 单文件超过阈值不入缓存。
2. 新写入前先清理旧键占用(若覆盖)。
3. 超过总容量时,从最久未使用项开始淘汰。
"""
if not self.media_cache_enable:
return
if payload is None:
return
payload_size = len(payload)
if payload_size <= 0:
return
if payload_size > self.media_cache_max_file_bytes:
return
with self._media_cache_lock:
old_payload = self._media_cache.pop(cache_key, None)
if old_payload is not None:
self._media_cache_total_bytes -= len(old_payload)
self._media_cache[cache_key] = payload
self._media_cache_total_bytes += payload_size
self._media_cache.move_to_end(cache_key)
while self._media_cache_total_bytes > self.media_cache_max_total_bytes and self._media_cache:
stale_key, stale_payload = self._media_cache.popitem(last=False)
self._media_cache_total_bytes -= len(stale_payload)
self.LOG.debug(f"[{self.name}] 媒资缓存淘汰: {stale_key}")
def _load_media_bytes(self, media_kind: str, media_path: str) -> Optional[bytes]:
"""加载媒资字节:先缓存,后磁盘。
行为:
1. 首次发送读取磁盘并缓存。
2. 后续发送命中缓存,避免重复磁盘 I/O。
"""
if not media_path:
return None
if not os.path.exists(media_path):
self.LOG.warning(f"[{self.name}] 媒资路径不存在: {media_path}")
return None
try:
cache_key = self._build_media_cache_key(media_kind=media_kind, media_path=media_path)
except Exception as e:
self.LOG.warning(f"[{self.name}] 构建媒资缓存键失败,将回退磁盘读取: path={media_path}, error={e}")
cache_key = ""
if cache_key:
cached = self._try_get_media_bytes_from_cache(cache_key)
if cached is not None:
return cached
try:
with open(media_path, "rb") as fp:
payload = fp.read()
except Exception as e:
self.LOG.error(f"[{self.name}] 读取媒资文件失败: path={media_path}, error={e}")
return None
if cache_key:
self._put_media_bytes_to_cache(cache_key, payload)
return payload
def start(self) -> bool: def start(self) -> bool:
self.status = PluginStatus.RUNNING self.status = PluginStatus.RUNNING
self.LOG.info(f"[{self.name}] 插件已启动") self.LOG.info(f"[{self.name}] 插件已启动")
@@ -238,28 +353,32 @@ class FunCommandPlayPlugin(MessagePluginInterface):
if action_type == "image": if action_type == "image":
image_path = self._render_template(str(action.get("path", "") or ""), context) image_path = self._render_template(str(action.get("path", "") or ""), context)
if image_path and os.path.exists(image_path): image_bytes = self._load_media_bytes(media_kind="image", media_path=image_path)
await bot.send_image_message(target_id, Path(image_path)) if image_bytes:
await bot.send_image_message(target_id, image_bytes)
return return
if action_type == "voice": if action_type == "voice":
voice_path = self._render_template(str(action.get("path", "") or ""), context) voice_path = self._render_template(str(action.get("path", "") or ""), context)
voice_format = str(action.get("format", "") or "").strip().lower() voice_format = str(action.get("format", "") or "").strip().lower()
if voice_path and os.path.exists(voice_path): voice_bytes = self._load_media_bytes(media_kind="voice", media_path=voice_path)
if voice_bytes:
if not voice_format: if not voice_format:
suffix = Path(voice_path).suffix.lower() suffix = Path(voice_path).suffix.lower()
voice_format = "wav" if suffix == ".wav" else "mp3" voice_format = "wav" if suffix == ".wav" else "mp3"
await bot.send_voice_message(target_id, Path(voice_path), voice_format) await bot.send_voice_message(target_id, voice_bytes, voice_format)
return return
if action_type == "video": if action_type == "video":
video_path = self._render_template(str(action.get("path", "") or ""), context) video_path = self._render_template(str(action.get("path", "") or ""), context)
cover_path = self._render_template(str(action.get("cover_path", "") or ""), context) cover_path = self._render_template(str(action.get("cover_path", "") or ""), context)
if video_path and os.path.exists(video_path): video_bytes = self._load_media_bytes(media_kind="video", media_path=video_path)
if cover_path and os.path.exists(cover_path): if video_bytes:
await bot.send_video_message(target_id, Path(video_path), Path(cover_path)) cover_bytes = self._load_media_bytes(media_kind="image", media_path=cover_path) if cover_path else None
if cover_bytes:
await bot.send_video_message(target_id, video_bytes, cover_bytes)
else: else:
await bot.send_video_message(target_id, Path(video_path)) await bot.send_video_message(target_id, video_bytes)
return return
if action_type == "link": if action_type == "link":