From ad98fe6d7725b4e12a197c47b78ea68a08f9c59e Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 25 Dec 2025 17:41:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=BE=E7=89=87=E4=BF=9D=E5=AD=98=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/wechat/message_to_db.py | 207 ++++++++++++++++++++-------------- 1 file changed, 121 insertions(+), 86 deletions(-) diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 0e91a14..75e40d5 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -7,6 +7,9 @@ import os import base64 import imghdr +import re +from threading import Lock + from db.connection import DBConnectionManager from db.levels_db import LevelsDBOperator from db.message_storage import MessageStorageDB @@ -41,8 +44,20 @@ class MessageStorage: self.client = client # 图片处理相关初始化 - self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) # 专用于图片处理的线程池 + self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8) # 专用于图片处理的线程池 self.image_tasks = [] + # 图片任务在途控制 + self._image_task_inflight = 0 + self._image_task_lock = Lock() + self.MAX_IMAGE_TASKS = 50 # 可调,20~100 之间 + + # 事件循环(只创建一次,替代 asyncio.run) + self._image_loop = asyncio.new_event_loop() + + # 正则(替代 XML 解析) + self._aeskey_re = re.compile(r'aeskey="(.*?)"') + self._cdn_re = re.compile(r'cdnthumburl="(.*?)"') + # 修改为项目根目录下的 static/images self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images") # 确保图片存储目录存在 @@ -98,107 +113,127 @@ class MessageStorage: } def process_image(self, msg: WxMessage): - """异步处理图片消息,与消息存档分离""" - if msg.msg_type != MessageType.IMAGE or not self.client: # 不是图片消息或没有client实例 + """提交图片处理任务(同步环境,带背压)""" + + if msg.msg_type != MessageType.IMAGE or not self.client: return False - # 提交任务到图片处理线程池 + # ===== 背压:限制在途任务 ===== + with self._image_task_lock: + if self._image_task_inflight >= self.MAX_IMAGE_TASKS: + logger.warning("图片任务过多,暂时丢弃一条") + return False + self._image_task_inflight += 1 + future = self.image_executor.submit(self._process_image_task, msg) - # 添加回调函数 - future.add_done_callback(self._process_image_callback) - # 将任务添加到待处理列表 - self.image_tasks.append(future) - # 清理已完成的任务 - self._cleanup_completed_tasks() + future.add_done_callback(self._process_image_done) + return True def _process_image_task(self, msg: WxMessage): - """实际执行图片处理的任务函数""" + """实际执行图片处理的任务函数(同步高性能版)""" try: - # 使用wcf下载图片,确保图片存在 - if self.client and msg.msg_id: - # 从msg中提取xml内容,获取xml里面的参数 - xml_content = msg.content.clean_content + if not self.client or not msg.msg_id: + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "实例不存在或消息ID无效"} + xml_content = msg.content.clean_content - root = ET.fromstring(xml_content) - img_elem = root.find("img") - if img_elem is not None: - aeskey = img_elem.attrib.get("aeskey", "") - cdnthumburl = img_elem.attrib.get("cdnthumburl", "") - base64_str = asyncio.run(self.client.download_image(aeskey=aeskey, cdnmidimgurl=cdnthumburl)) - if base64_str: - group_dir = os.path.join(self.image_dir, msg.roomid or "unknown") - if not os.path.exists(group_dir): - os.makedirs(group_dir, exist_ok=True) - data = base64.b64decode(base64_str) - kind = imghdr.what(None, h=data) - ext = "jpg" if kind == "jpeg" else (kind if kind else "png") - file_name = f"{msg.msg_id}.{ext}" - file_path = os.path.join(group_dir, file_name) - skipped = False - if not os.path.exists(file_path): - try: - with open(file_path, "wb") as f: - f.write(data) - except Exception as e: - return { - 'success': False, - 'message_id': msg.msg_id, - 'roomid': msg.roomid, - 'sender': msg.sender, - 'error': f"图片保存失败: {e}" - } - else: - skipped = True - web_path = f"/static/images/{msg.roomid}/{file_name}" - self.message_db.update_message_image_file_path(msg.msg_id, web_path) - return { - 'success': True, - 'message_id': msg.msg_id, - 'roomid': msg.roomid, - 'sender': msg.sender, - 'file_path': web_path, - 'skipped': skipped - } - else: - return { - 'success': False, - 'message_id': msg.msg_id, - 'roomid': msg.roomid, - 'sender': msg.sender, - 'error': "图片下载失败" - } + # ===== 1. 正则提取参数(替代 XML)===== + aeskey_match = self._aeskey_re.search(xml_content) + cdn_match = self._cdn_re.search(xml_content) + + if not aeskey_match or not cdn_match: + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "XML 中未找到图片参数"} + aeskey = aeskey_match.group(1) + cdnthumburl = cdn_match.group(1) + + # ===== 2. 下载图片(复用事件循环)===== + try: + base64_str = self._image_loop.run_until_complete( + self.client.download_image( + aeskey=aeskey, + cdnmidimgurl=cdnthumburl + ) + ) + except Exception as e: + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "图片下载失败"} + + if not base64_str: + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "图片下载失败"} + + # ===== 3. base64 解码 ===== + try: + data = base64.b64decode(base64_str) + except Exception as e: + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "图片下载失败"} + + # ===== 4. 构建路径 ===== + room_id = msg.roomid or "unknown" + group_dir = os.path.join(self.image_dir, room_id) + os.makedirs(group_dir, exist_ok=True) + + # 微信图片默认 jpg,没必要 imghdr + file_name = f"{msg.msg_id}.jpg" + file_path = os.path.join(group_dir, file_name) + + skipped = False + + # ===== 5. 写文件 ===== + if not os.path.isfile(file_path): + with open(file_path, "wb") as f: + f.write(data) else: - return { - 'success': False, - 'message_id': msg.msg_id, - 'roomid': msg.roomid, - 'sender': msg.sender, - 'error': "实例不存在或消息ID无效" - } - except Exception as e: - logger.error(f"图片处理出错: {msg.msg_id}, 错误: {e}") + skipped = True + + # ===== 6. 更新数据库 ===== + web_path = f"/static/images/{room_id}/{file_name}" + self.message_db.update_message_image_file_path(msg.msg_id, web_path) + return { - 'success': False, - 'message_id': msg.msg_id, - 'roomid': msg.roomid, - 'sender': msg.sender, - 'error': str(e) + "success": True, + "message_id": msg.msg_id, + "roomid": room_id, + "sender": msg.sender, + "file_path": web_path, + "skipped": skipped } - def _process_image_callback(self, future): - """处理异步图片处理任务完成后的回调""" + except Exception as e: + logger.exception("图片处理出错") + return {'success': False, 'message_id': msg.msg_id, 'roomid': msg.roomid, 'sender': msg.sender, + 'error': "图片下载失败"} + + def _process_image_done(self, future): + """任务完成统一回调(极轻量)""" try: result = future.result() - if result['success']: - skipped_info = " (已存在)" if result.get('skipped') else "" - logger.info(f"图片处理成功{skipped_info}: {result['roomid']}:{result['sender']}:{result['message_id']}") - else: - error_msg = result.get('error', '未知错误') - logger.error( - f"图片处理失败: {result.get('roomid', '')}:{result.get('sender', '')}:{result.get('message_id', '')} - {error_msg}") + self._process_image_callback(result) except Exception as e: logger.error(f"处理图片回调时出错: {e}") + finally: + # ⚠️ 无论成功失败,都必须释放在途计数 + with self._image_task_lock: + self._image_task_inflight -= 1 + + def _process_image_callback(self, result): + if result['success']: + skipped_info = " (已存在)" if result.get('skipped') else "" + logger.info( + f"图片处理成功{skipped_info}: " + f"{result['roomid']}:{result['sender']}:{result['message_id']}" + ) + else: + logger.error( + f"图片处理失败: " + f"{result.get('roomid', '')}:" + f"{result.get('sender', '')}:" + f"{result.get('message_id', '')} - " + f"{result.get('error', '未知错误')}" + ) def _archive_callback(self, future): """处理异步存档任务完成后的回调"""