From 15c5971cefd883b2d43654c55992d97a898e202b Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 30 Dec 2025 09:01:28 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=9B=BE=E7=89=87=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E9=80=BB=E8=BE=91=EF=BC=8C=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A15=E5=88=86=E9=92=9F=E4=B8=80=E6=AC=A1=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E5=9B=BE=E7=89=87=E6=B6=88=E6=81=AF=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E5=AD=98=E6=A1=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/message_storage.py | 24 +++++ main.py | 6 ++ utils/wechat/message_to_db.py | 163 ++++++++++++++++++++++++---------- 3 files changed, 146 insertions(+), 47 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index fad374e..6535d01 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -257,3 +257,27 @@ class MessageStorageDB(BaseDBOperator): """ return self.execute_query(sql, tuple(params)) or [] + + def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: + """获取最近N分钟内未处理图片的消息(image_path IS NULL) + + Args: + minutes_ago: 查询最近多少分钟的消息,默认10分钟 + limit: 每次最多处理多少条,默认50条 + + Returns: + 包含消息ID、群ID、消息XML等信息的列表 + """ + sql = """ + SELECT message_id, group_id, message_xml, timestamp + FROM messages + WHERE message_type = '3' + AND image_path IS NULL + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) + AND message_xml IS NOT NULL + AND message_xml != '' + ORDER BY timestamp ASC + LIMIT %s + """ + params = (minutes_ago, limit) + return self.execute_query(sql, params) or [] diff --git a/main.py b/main.py index ea1fbe0..323eef9 100644 --- a/main.py +++ b/main.py @@ -141,6 +141,12 @@ def jobs(robot: Robot): await manager.update_image_cache() logger.info("图片缓存更新完成") + # ✅ 每2分钟处理一次待下载的图片消息(串行处理,避免数据库锁竞争) + @async_job.every_minutes(5) + async def process_pending_images_job(): + if hasattr(robot, 'message_storage') and robot.message_storage: + await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20) + if __name__ == "__main__": diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 27b8715..ae094b0 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -9,6 +9,7 @@ import imghdr import re from threading import Lock +from typing import Dict from db.connection import DBConnectionManager from db.levels_db import LevelsDBOperator @@ -113,38 +114,46 @@ class MessageStorage: } def process_image(self, msg: WxMessage): - """提交图片处理任务(同步环境,带背压)""" - - if msg.msg_type != MessageType.IMAGE or not self.client: - return False - - # ===== 背压:限制在途任务 ===== - with self._image_task_lock: - if self._image_task_inflight >= self.MAX_IMAGE_TASKS: - logger.warning("图片任务过多,暂时丢弃一条") - return False - self._image_task_inflight += 1 - - future = self.image_executor.submit(self._process_image_task, msg) - future.add_done_callback(self._process_image_done) - + """图片消息已通过 archive_message 存入数据库,不再实时处理 + 改为定时任务批量处理,减少对主流程的影响和数据库锁竞争 + """ + # 图片消息已经通过 archive_message 存入数据库 + # 定时任务会定期扫描并处理未下载的图片 + logger.debug(f"图片消息已记录,等待定时任务处理: msg_id={msg.msg_id}, roomid={msg.roomid}") return True - def _process_image_task(self, msg: WxMessage): - """实际执行图片处理的任务函数(同步高性能版)""" + def _process_image_from_db(self, db_record: Dict) -> Dict: + """从数据库记录处理图片(用于定时任务) + + Args: + db_record: 数据库记录,包含 message_id, group_id, message_xml 等 + + Returns: + 处理结果字典 + """ + message_id = db_record.get('message_id') + group_id = db_record.get('group_id', 'unknown') + xml_content = db_record.get('message_xml', '') + + if not self.client or not message_id or not xml_content: + return { + 'success': False, + 'message_id': message_id, + 'error': "缺少必要参数" + } + try: - if not self.client or not msg.msg_id: - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "实例不存在或消息ID无效"} - xml_content = msg.content.clean_content - # ===== 1. 正则提取参数(替代 XML)===== aeskey_match = self._aeskey_re.search(xml_content) cdn_match = self._cdn_re.search(xml_content) if not aeskey_match or not cdn_match: - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "XML 中未找到图片参数"} + return { + 'success': False, + 'message_id': message_id, + 'error': "XML 中未找到图片参数" + } + aeskey = aeskey_match.group(1) cdnthumburl = cdn_match.group(1) @@ -157,56 +166,116 @@ class MessageStorage: ) ) except Exception as e: - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "图片下载失败"} + logger.error(f"图片下载失败 message_id={message_id}: {e}") + return { + 'success': False, + 'message_id': message_id, + 'error': f"图片下载失败: {str(e)}" + } if not base64_str: - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "图片下载失败"} + return { + 'success': False, + 'message_id': message_id, + 'error': "图片下载失败:返回为空" + } # ===== 3. base64 解码 ===== try: data = base64.b64decode(base64_str) except Exception as e: - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "图片下载失败"} + logger.error(f"图片解码失败 message_id={message_id}: {e}") + return { + 'success': False, + 'message_id': message_id, + 'error': f"图片解码失败: {str(e)}" + } # ===== 4. 构建路径 ===== - room_id = msg.roomid or "unknown" + room_id = group_id or "unknown" group_dir = os.path.join(self.image_dir, room_id) os.makedirs(group_dir, exist_ok=True) - # 微信图片默认 jpg,没必要 imghdr - file_name = f"{msg.msg_id}.jpg" + # 微信图片默认 jpg + file_name = f"{message_id}.jpg" file_path = os.path.join(group_dir, file_name) - skipped = False - web_path = None # ===== 5. 写文件 ===== - if not os.path.isfile(file_path): + skipped = False + if os.path.isfile(file_path): + skipped = True + logger.debug(f"图片文件已存在,跳过保存: {room_id}-{file_name}") + else: with open(file_path, "wb") as f: f.write(data) - else: - skipped = True - logger.warning(f"跳过图片保存{room_id}-{file_name}") + + # ===== 6. 更新数据库(串行更新,避免锁竞争)===== if not skipped: - # ===== 6. 更新数据库 ===== web_path = f"/static/images/{room_id}/{file_name}" - self.message_db.update_message_image_file_path(msg.msg_id, web_path) + success = self.message_db.update_message_image_file_path(message_id, web_path) + if success: + logger.debug(f"图片处理成功: message_id={message_id}, path={web_path}") + else: + logger.warning(f"图片路径更新失败: message_id={message_id}") + return { + 'success': False, + 'message_id': message_id, + 'error': "数据库更新失败" + } return { "success": True, - "message_id": msg.msg_id, + "message_id": message_id, "roomid": room_id, - "sender": msg.sender, - "file_path": web_path, + "file_path": f"/static/images/{room_id}/{file_name}" if not skipped else None, "skipped": skipped } except Exception as e: - logger.exception("图片处理出错") - return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, - 'error': "图片下载失败"} + logger.exception(f"处理图片出错 message_id={message_id}") + return { + 'success': False, + 'message_id': message_id, + 'error': f"处理出错: {str(e)}" + } + + async def process_pending_images(self, minutes_ago: int = 10, batch_size: int = 20): + """定时任务:批量处理未下载的图片消息(串行处理,避免锁竞争) + + Args: + minutes_ago: 处理最近多少分钟的消息,默认10分钟 + batch_size: 每次处理多少条,默认20条 + """ + if not self.client: + logger.warning("微信客户端未初始化,跳过图片处理") + return + + try: + # 查询未处理的图片消息 + pending_messages = self.message_db.get_pending_image_messages(minutes_ago, batch_size) + + if not pending_messages: + logger.debug(f"未发现待处理的图片消息(最近{minutes_ago}分钟)") + return + + logger.info(f"开始处理 {len(pending_messages)} 条待处理图片消息") + success_count = 0 + fail_count = 0 + + # 串行处理,避免并发更新数据库导致锁竞争 + for msg_record in pending_messages: + result = self._process_image_from_db(msg_record) + if result.get('success'): + success_count += 1 + else: + fail_count += 1 + error = result.get('error', '未知错误') + logger.warning(f"图片处理失败 message_id={result.get('message_id')}: {error}") + + logger.info(f"图片处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}") + + except Exception as e: + logger.exception(f"定时处理图片任务出错: {e}") def _process_image_done(self, future): """任务完成统一回调(极轻量)"""