调整图片下载逻辑,定时任务5分钟一次进行图片消息下载存档。

This commit is contained in:
liuwei
2025-12-30 09:01:28 +08:00
parent ec7ec1c363
commit 15c5971cef
3 changed files with 146 additions and 47 deletions

View File

@@ -257,3 +257,27 @@ class MessageStorageDB(BaseDBOperator):
"""
return self.execute_query(sql, tuple(params)) or []
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
"""获取最近N分钟内未处理图片的消息image_path IS NULL
Args:
minutes_ago: 查询最近多少分钟的消息默认10分钟
limit: 每次最多处理多少条默认50条
Returns:
包含消息ID、群ID、消息XML等信息的列表
"""
sql = """
SELECT message_id, group_id, message_xml, timestamp
FROM messages
WHERE message_type = '3'
AND image_path IS NULL
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
AND message_xml IS NOT NULL
AND message_xml != ''
ORDER BY timestamp ASC
LIMIT %s
"""
params = (minutes_ago, limit)
return self.execute_query(sql, params) or []

View File

@@ -141,6 +141,12 @@ def jobs(robot: Robot):
await manager.update_image_cache()
logger.info("图片缓存更新完成")
# ✅ 每2分钟处理一次待下载的图片消息串行处理避免数据库锁竞争
@async_job.every_minutes(5)
async def process_pending_images_job():
if hasattr(robot, 'message_storage') and robot.message_storage:
await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20)
if __name__ == "__main__":

View File

@@ -9,6 +9,7 @@ import imghdr
import re
from threading import Lock
from typing import Dict
from db.connection import DBConnectionManager
from db.levels_db import LevelsDBOperator
@@ -113,38 +114,46 @@ class MessageStorage:
}
def process_image(self, msg: WxMessage):
"""提交图片处理任务(同步环境,带背压)"""
if msg.msg_type != MessageType.IMAGE or not self.client:
return False
# ===== 背压:限制在途任务 =====
with self._image_task_lock:
if self._image_task_inflight >= self.MAX_IMAGE_TASKS:
logger.warning("图片任务过多,暂时丢弃一条")
return False
self._image_task_inflight += 1
future = self.image_executor.submit(self._process_image_task, msg)
future.add_done_callback(self._process_image_done)
"""图片消息已通过 archive_message 存入数据库,不再实时处理
改为定时任务批量处理,减少对主流程的影响和数据库锁竞争
"""
# 图片消息已经通过 archive_message 存入数据库
# 定时任务会定期扫描并处理未下载的图片
logger.debug(f"图片消息已记录,等待定时任务处理: msg_id={msg.msg_id}, roomid={msg.roomid}")
return True
def _process_image_task(self, msg: WxMessage):
"""实际执行图片处理的任务函数(同步高性能版)"""
def _process_image_from_db(self, db_record: Dict) -> Dict:
"""从数据库记录处理图片(用于定时任务)
Args:
db_record: 数据库记录,包含 message_id, group_id, message_xml 等
Returns:
处理结果字典
"""
message_id = db_record.get('message_id')
group_id = db_record.get('group_id', 'unknown')
xml_content = db_record.get('message_xml', '')
if not self.client or not message_id or not xml_content:
return {
'success': False,
'message_id': message_id,
'error': "缺少必要参数"
}
try:
if not self.client or not msg.msg_id:
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "实例不存在或消息ID无效"}
xml_content = msg.content.clean_content
# ===== 1. 正则提取参数(替代 XML=====
aeskey_match = self._aeskey_re.search(xml_content)
cdn_match = self._cdn_re.search(xml_content)
if not aeskey_match or not cdn_match:
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "XML 中未找到图片参数"}
return {
'success': False,
'message_id': message_id,
'error': "XML 中未找到图片参数"
}
aeskey = aeskey_match.group(1)
cdnthumburl = cdn_match.group(1)
@@ -157,56 +166,116 @@ class MessageStorage:
)
)
except Exception as e:
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "图片下载失败"}
logger.error(f"图片下载失败 message_id={message_id}: {e}")
return {
'success': False,
'message_id': message_id,
'error': f"图片下载失败: {str(e)}"
}
if not base64_str:
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "图片下载失败"}
return {
'success': False,
'message_id': message_id,
'error': "图片下载失败:返回为空"
}
# ===== 3. base64 解码 =====
try:
data = base64.b64decode(base64_str)
except Exception as e:
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "图片下载失败"}
logger.error(f"图片解码失败 message_id={message_id}: {e}")
return {
'success': False,
'message_id': message_id,
'error': f"图片解码失败: {str(e)}"
}
# ===== 4. 构建路径 =====
room_id = msg.roomid or "unknown"
room_id = group_id or "unknown"
group_dir = os.path.join(self.image_dir, room_id)
os.makedirs(group_dir, exist_ok=True)
# 微信图片默认 jpg,没必要 imghdr
file_name = f"{msg.msg_id}.jpg"
# 微信图片默认 jpg
file_name = f"{message_id}.jpg"
file_path = os.path.join(group_dir, file_name)
skipped = False
web_path = None
# ===== 5. 写文件 =====
if not os.path.isfile(file_path):
skipped = False
if os.path.isfile(file_path):
skipped = True
logger.debug(f"图片文件已存在,跳过保存: {room_id}-{file_name}")
else:
with open(file_path, "wb") as f:
f.write(data)
else:
skipped = True
logger.warning(f"跳过图片保存{room_id}-{file_name}")
# ===== 6. 更新数据库(串行更新,避免锁竞争)=====
if not skipped:
# ===== 6. 更新数据库 =====
web_path = f"/static/images/{room_id}/{file_name}"
self.message_db.update_message_image_file_path(msg.msg_id, web_path)
success = self.message_db.update_message_image_file_path(message_id, web_path)
if success:
logger.debug(f"图片处理成功: message_id={message_id}, path={web_path}")
else:
logger.warning(f"图片路径更新失败: message_id={message_id}")
return {
'success': False,
'message_id': message_id,
'error': "数据库更新失败"
}
return {
"success": True,
"message_id": msg.msg_id,
"message_id": message_id,
"roomid": room_id,
"sender": msg.sender,
"file_path": web_path,
"file_path": f"/static/images/{room_id}/{file_name}" if not skipped else None,
"skipped": skipped
}
except Exception as e:
logger.exception("图片处理出错")
return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender,
'error': "图片下载失败"}
logger.exception(f"处理图片出错 message_id={message_id}")
return {
'success': False,
'message_id': message_id,
'error': f"处理出错: {str(e)}"
}
async def process_pending_images(self, minutes_ago: int = 10, batch_size: int = 20):
"""定时任务:批量处理未下载的图片消息(串行处理,避免锁竞争)
Args:
minutes_ago: 处理最近多少分钟的消息默认10分钟
batch_size: 每次处理多少条默认20条
"""
if not self.client:
logger.warning("微信客户端未初始化,跳过图片处理")
return
try:
# 查询未处理的图片消息
pending_messages = self.message_db.get_pending_image_messages(minutes_ago, batch_size)
if not pending_messages:
logger.debug(f"未发现待处理的图片消息(最近{minutes_ago}分钟)")
return
logger.info(f"开始处理 {len(pending_messages)} 条待处理图片消息")
success_count = 0
fail_count = 0
# 串行处理,避免并发更新数据库导致锁竞争
for msg_record in pending_messages:
result = self._process_image_from_db(msg_record)
if result.get('success'):
success_count += 1
else:
fail_count += 1
error = result.get('error', '未知错误')
logger.warning(f"图片处理失败 message_id={result.get('message_id')}: {error}")
logger.info(f"图片处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}")
except Exception as e:
logger.exception(f"定时处理图片任务出错: {e}")
def _process_image_done(self, future):
"""任务完成统一回调(极轻量)"""