图片保存性能优化
This commit is contained in:
@@ -7,6 +7,9 @@ import os
|
|||||||
import base64
|
import base64
|
||||||
import imghdr
|
import imghdr
|
||||||
|
|
||||||
|
import re
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
from db.connection import DBConnectionManager
|
from db.connection import DBConnectionManager
|
||||||
from db.levels_db import LevelsDBOperator
|
from db.levels_db import LevelsDBOperator
|
||||||
from db.message_storage import MessageStorageDB
|
from db.message_storage import MessageStorageDB
|
||||||
@@ -41,8 +44,20 @@ class MessageStorage:
|
|||||||
self.client = client
|
self.client = client
|
||||||
|
|
||||||
# 图片处理相关初始化
|
# 图片处理相关初始化
|
||||||
self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) # 专用于图片处理的线程池
|
self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8) # 专用于图片处理的线程池
|
||||||
self.image_tasks = []
|
self.image_tasks = []
|
||||||
|
# 图片任务在途控制
|
||||||
|
self._image_task_inflight = 0
|
||||||
|
self._image_task_lock = Lock()
|
||||||
|
self.MAX_IMAGE_TASKS = 50 # 可调,20~100 之间
|
||||||
|
|
||||||
|
# 事件循环(只创建一次,替代 asyncio.run)
|
||||||
|
self._image_loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
|
# 正则(替代 XML 解析)
|
||||||
|
self._aeskey_re = re.compile(r'aeskey="(.*?)"')
|
||||||
|
self._cdn_re = re.compile(r'cdnthumburl="(.*?)"')
|
||||||
|
|
||||||
# 修改为项目根目录下的 static/images
|
# 修改为项目根目录下的 static/images
|
||||||
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images")
|
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images")
|
||||||
# 确保图片存储目录存在
|
# 确保图片存储目录存在
|
||||||
@@ -98,107 +113,127 @@ class MessageStorage:
|
|||||||
}
|
}
|
||||||
|
|
||||||
def process_image(self, msg: WxMessage):
|
def process_image(self, msg: WxMessage):
|
||||||
"""异步处理图片消息,与消息存档分离"""
|
"""提交图片处理任务(同步环境,带背压)"""
|
||||||
if msg.msg_type != MessageType.IMAGE or not self.client: # 不是图片消息或没有client实例
|
|
||||||
|
if msg.msg_type != MessageType.IMAGE or not self.client:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 提交任务到图片处理线程池
|
# ===== 背压:限制在途任务 =====
|
||||||
|
with self._image_task_lock:
|
||||||
|
if self._image_task_inflight >= self.MAX_IMAGE_TASKS:
|
||||||
|
logger.warning("图片任务过多,暂时丢弃一条")
|
||||||
|
return False
|
||||||
|
self._image_task_inflight += 1
|
||||||
|
|
||||||
future = self.image_executor.submit(self._process_image_task, msg)
|
future = self.image_executor.submit(self._process_image_task, msg)
|
||||||
# 添加回调函数
|
future.add_done_callback(self._process_image_done)
|
||||||
future.add_done_callback(self._process_image_callback)
|
|
||||||
# 将任务添加到待处理列表
|
|
||||||
self.image_tasks.append(future)
|
|
||||||
# 清理已完成的任务
|
|
||||||
self._cleanup_completed_tasks()
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _process_image_task(self, msg: WxMessage):
|
def _process_image_task(self, msg: WxMessage):
|
||||||
"""实际执行图片处理的任务函数"""
|
"""实际执行图片处理的任务函数(同步高性能版)"""
|
||||||
try:
|
try:
|
||||||
# 使用wcf下载图片,确保图片存在
|
if not self.client or not msg.msg_id:
|
||||||
if self.client and msg.msg_id:
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
# 从msg中提取xml内容,获取xml里面的参数
|
'error': "实例不存在或消息ID无效"}
|
||||||
xml_content = msg.content.clean_content
|
xml_content = msg.content.clean_content
|
||||||
|
|
||||||
root = ET.fromstring(xml_content)
|
# ===== 1. 正则提取参数(替代 XML)=====
|
||||||
img_elem = root.find("img")
|
aeskey_match = self._aeskey_re.search(xml_content)
|
||||||
if img_elem is not None:
|
cdn_match = self._cdn_re.search(xml_content)
|
||||||
aeskey = img_elem.attrib.get("aeskey", "")
|
|
||||||
cdnthumburl = img_elem.attrib.get("cdnthumburl", "")
|
if not aeskey_match or not cdn_match:
|
||||||
base64_str = asyncio.run(self.client.download_image(aeskey=aeskey, cdnmidimgurl=cdnthumburl))
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
if base64_str:
|
'error': "XML 中未找到图片参数"}
|
||||||
group_dir = os.path.join(self.image_dir, msg.roomid or "unknown")
|
aeskey = aeskey_match.group(1)
|
||||||
if not os.path.exists(group_dir):
|
cdnthumburl = cdn_match.group(1)
|
||||||
os.makedirs(group_dir, exist_ok=True)
|
|
||||||
data = base64.b64decode(base64_str)
|
# ===== 2. 下载图片(复用事件循环)=====
|
||||||
kind = imghdr.what(None, h=data)
|
|
||||||
ext = "jpg" if kind == "jpeg" else (kind if kind else "png")
|
|
||||||
file_name = f"{msg.msg_id}.{ext}"
|
|
||||||
file_path = os.path.join(group_dir, file_name)
|
|
||||||
skipped = False
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
try:
|
try:
|
||||||
|
base64_str = self._image_loop.run_until_complete(
|
||||||
|
self.client.download_image(
|
||||||
|
aeskey=aeskey,
|
||||||
|
cdnmidimgurl=cdnthumburl
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
|
'error': "图片下载失败"}
|
||||||
|
|
||||||
|
if not base64_str:
|
||||||
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
|
'error': "图片下载失败"}
|
||||||
|
|
||||||
|
# ===== 3. base64 解码 =====
|
||||||
|
try:
|
||||||
|
data = base64.b64decode(base64_str)
|
||||||
|
except Exception as e:
|
||||||
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
|
'error': "图片下载失败"}
|
||||||
|
|
||||||
|
# ===== 4. 构建路径 =====
|
||||||
|
room_id = msg.roomid or "unknown"
|
||||||
|
group_dir = os.path.join(self.image_dir, room_id)
|
||||||
|
os.makedirs(group_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# 微信图片默认 jpg,没必要 imghdr
|
||||||
|
file_name = f"{msg.msg_id}.jpg"
|
||||||
|
file_path = os.path.join(group_dir, file_name)
|
||||||
|
|
||||||
|
skipped = False
|
||||||
|
|
||||||
|
# ===== 5. 写文件 =====
|
||||||
|
if not os.path.isfile(file_path):
|
||||||
with open(file_path, "wb") as f:
|
with open(file_path, "wb") as f:
|
||||||
f.write(data)
|
f.write(data)
|
||||||
except Exception as e:
|
|
||||||
return {
|
|
||||||
'success': False,
|
|
||||||
'message_id': msg.msg_id,
|
|
||||||
'roomid': msg.roomid,
|
|
||||||
'sender': msg.sender,
|
|
||||||
'error': f"图片保存失败: {e}"
|
|
||||||
}
|
|
||||||
else:
|
else:
|
||||||
skipped = True
|
skipped = True
|
||||||
web_path = f"/static/images/{msg.roomid}/{file_name}"
|
|
||||||
|
# ===== 6. 更新数据库 =====
|
||||||
|
web_path = f"/static/images/{room_id}/{file_name}"
|
||||||
self.message_db.update_message_image_file_path(msg.msg_id, web_path)
|
self.message_db.update_message_image_file_path(msg.msg_id, web_path)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'success': True,
|
"success": True,
|
||||||
'message_id': msg.msg_id,
|
"message_id": msg.msg_id,
|
||||||
'roomid': msg.roomid,
|
"roomid": room_id,
|
||||||
'sender': msg.sender,
|
"sender": msg.sender,
|
||||||
'file_path': web_path,
|
"file_path": web_path,
|
||||||
'skipped': skipped
|
"skipped": skipped
|
||||||
}
|
|
||||||
else:
|
|
||||||
return {
|
|
||||||
'success': False,
|
|
||||||
'message_id': msg.msg_id,
|
|
||||||
'roomid': msg.roomid,
|
|
||||||
'sender': msg.sender,
|
|
||||||
'error': "图片下载失败"
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
return {
|
|
||||||
'success': False,
|
|
||||||
'message_id': msg.msg_id,
|
|
||||||
'roomid': msg.roomid,
|
|
||||||
'sender': msg.sender,
|
|
||||||
'error': "实例不存在或消息ID无效"
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"图片处理出错: {msg.msg_id}, 错误: {e}")
|
|
||||||
return {
|
|
||||||
'success': False,
|
|
||||||
'message_id': msg.msg_id,
|
|
||||||
'roomid': msg.roomid,
|
|
||||||
'sender': msg.sender,
|
|
||||||
'error': str(e)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def _process_image_callback(self, future):
|
except Exception as e:
|
||||||
"""处理异步图片处理任务完成后的回调"""
|
logger.exception("图片处理出错")
|
||||||
|
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
|
||||||
|
'error': "图片下载失败"}
|
||||||
|
|
||||||
|
def _process_image_done(self, future):
|
||||||
|
"""任务完成统一回调(极轻量)"""
|
||||||
try:
|
try:
|
||||||
result = future.result()
|
result = future.result()
|
||||||
if result['success']:
|
self._process_image_callback(result)
|
||||||
skipped_info = " (已存在)" if result.get('skipped') else ""
|
|
||||||
logger.info(f"图片处理成功{skipped_info}: {result['roomid']}:{result['sender']}:{result['message_id']}")
|
|
||||||
else:
|
|
||||||
error_msg = result.get('error', '未知错误')
|
|
||||||
logger.error(
|
|
||||||
f"图片处理失败: {result.get('roomid', '')}:{result.get('sender', '')}:{result.get('message_id', '')} - {error_msg}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"处理图片回调时出错: {e}")
|
logger.error(f"处理图片回调时出错: {e}")
|
||||||
|
finally:
|
||||||
|
# ⚠️ 无论成功失败,都必须释放在途计数
|
||||||
|
with self._image_task_lock:
|
||||||
|
self._image_task_inflight -= 1
|
||||||
|
|
||||||
|
def _process_image_callback(self, result):
|
||||||
|
if result['success']:
|
||||||
|
skipped_info = " (已存在)" if result.get('skipped') else ""
|
||||||
|
logger.info(
|
||||||
|
f"图片处理成功{skipped_info}: "
|
||||||
|
f"{result['roomid']}:{result['sender']}:{result['message_id']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
f"图片处理失败: "
|
||||||
|
f"{result.get('roomid', '')}:"
|
||||||
|
f"{result.get('sender', '')}:"
|
||||||
|
f"{result.get('message_id', '')} - "
|
||||||
|
f"{result.get('error', '未知错误')}"
|
||||||
|
)
|
||||||
|
|
||||||
def _archive_callback(self, future):
|
def _archive_callback(self, future):
|
||||||
"""处理异步存档任务完成后的回调"""
|
"""处理异步存档任务完成后的回调"""
|
||||||
|
|||||||
Reference in New Issue
Block a user