加入图片缓存,每次从文件系统中提取相关的图片,加载成bytes,后续使用时直接从缓存中提取。减少IO读取次数,提高发送性能
This commit is contained in:
@@ -2,7 +2,8 @@ import os
|
||||
import time
|
||||
import random
|
||||
import asyncio
|
||||
from typing import Optional, List
|
||||
from typing import Optional, List, Dict
|
||||
from collections import deque
|
||||
|
||||
import logging
|
||||
|
||||
@@ -16,10 +17,15 @@ class ImageCacheManager:
|
||||
LAST_UPDATE_TIME_KEY = "group:images:last_update_time"
|
||||
BATCH_SIZE = 500
|
||||
|
||||
def __init__(self, image_folder: str):
|
||||
def __init__(self, image_folder: str, cache_size: int = 5):
|
||||
self.image_folder = image_folder
|
||||
self.redis = DBConnectionManager.get_instance().get_redis_connection()
|
||||
self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
|
||||
|
||||
# 内存缓存相关
|
||||
self.cache_size = cache_size
|
||||
self.image_bytes_cache = deque(maxlen=cache_size) # 使用deque作为LRU缓存
|
||||
self.is_refilling = False # 防止重复填充缓存
|
||||
|
||||
def _get_last_update_time(self) -> float:
|
||||
ts = self.redis.get(self.LAST_UPDATE_TIME_KEY)
|
||||
@@ -112,3 +118,86 @@ class ImageCacheManager:
|
||||
except Exception as e:
|
||||
logger.error(f"获取随机图片失败: {e}")
|
||||
return None
|
||||
|
||||
def _load_image_bytes(self, image_path: str) -> Optional[bytes]:
|
||||
"""从磁盘加载图片的bytes数据"""
|
||||
try:
|
||||
with open(image_path, 'rb') as f:
|
||||
return f.read()
|
||||
except Exception as e:
|
||||
logger.error(f"读取图片文件失败 {image_path}: {e}")
|
||||
return None
|
||||
|
||||
def _refill_cache(self):
|
||||
"""重新填充缓存"""
|
||||
if self.is_refilling:
|
||||
return
|
||||
|
||||
self.is_refilling = True
|
||||
try:
|
||||
logger.info("开始重新填充图片缓存...")
|
||||
|
||||
# 获取多个随机图片路径
|
||||
redis_key = self.IMAGE_KEY_PREFIX + "all"
|
||||
image_paths = []
|
||||
|
||||
# 获取比缓存大小多一点的图片路径,以防有些文件读取失败
|
||||
for _ in range(self.cache_size + 2):
|
||||
try:
|
||||
img = self.redis.srandmember(redis_key)
|
||||
if img:
|
||||
path = img.decode('utf-8') if isinstance(img, bytes) else img
|
||||
# 检查路径是否已经在缓存中
|
||||
existing_paths = [item['path'] for item in self.image_bytes_cache]
|
||||
if path not in existing_paths:
|
||||
image_paths.append(path)
|
||||
except Exception as e:
|
||||
logger.error(f"获取随机图片路径失败: {e}")
|
||||
continue
|
||||
|
||||
# 加载图片bytes并添加到缓存
|
||||
loaded_count = 0
|
||||
for path in image_paths:
|
||||
if loaded_count >= self.cache_size:
|
||||
break
|
||||
|
||||
image_bytes = self._load_image_bytes(path)
|
||||
if image_bytes:
|
||||
self.image_bytes_cache.append({
|
||||
'path': path,
|
||||
'bytes': image_bytes
|
||||
})
|
||||
loaded_count += 1
|
||||
|
||||
logger.info(f"缓存填充完成,新增 {loaded_count} 张图片")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"填充缓存失败: {e}")
|
||||
finally:
|
||||
self.is_refilling = False
|
||||
|
||||
def get_cached_image_bytes(self) -> Optional[Dict[str, any]]:
|
||||
"""
|
||||
从缓存中获取图片bytes数据
|
||||
返回格式: {'path': str, 'bytes': bytes}
|
||||
"""
|
||||
# 如果缓存为空,立即填充
|
||||
if not self.image_bytes_cache:
|
||||
self._refill_cache()
|
||||
if not self.image_bytes_cache:
|
||||
return None
|
||||
|
||||
# 从缓存中取出一个图片
|
||||
cached_image = self.image_bytes_cache.popleft()
|
||||
|
||||
# 如果缓存中只剩最后一个,异步填充缓存
|
||||
if len(self.image_bytes_cache) <= 1:
|
||||
# 使用线程池异步填充,避免阻塞
|
||||
import threading
|
||||
threading.Thread(target=self._refill_cache, daemon=True).start()
|
||||
|
||||
return cached_image
|
||||
|
||||
def get_cached_image_count(self) -> int:
|
||||
"""获取当前缓存中的图片数量"""
|
||||
return len(self.image_bytes_cache)
|
||||
|
||||
Reference in New Issue
Block a user