From 21e876c9376a09b6531055a255c054973c597e1c Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 6 Jun 2025 10:02:12 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E4=BA=86=E9=98=9F=E5=88=97?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E5=8F=918=E7=BA=BF=E7=A8=8B=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E8=A7=A3=E5=86=B3=E9=98=BB=E5=A1=9E=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robot.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/robot.py b/robot.py index 48e310d..31d52ff 100644 --- a/robot.py +++ b/robot.py @@ -39,6 +39,14 @@ class Robot: self.config = config self.LOG = logger + # 添加消息队列和处理控制 + self.max_queue_size = 1000 # 设置队列最大容量 + self.message_queue = asyncio.Queue(maxsize=self.max_queue_size) + self.message_processor_running = False + self.message_processor_tasks = set() # 存储所有处理任务 + self.max_concurrent_tasks = 8 # 最大并发处理数 + self.processing_lock = asyncio.Lock() # 用于控制并发数 + # wechat_ipad 相关属性 self.ipad_bot: WechatAPIClient self.ipad_config = None @@ -141,10 +149,69 @@ class Robot: asyncio.set_event_loop(self.ipad_loop) self.ipad_loop.run_until_complete(self._wechat_ipad_core(server_ip, server_port)) + async def _start_message_processor(self): + """启动消息处理器""" + self.message_processor_running = True + self.message_processor_tasks = set() # 存储所有处理任务 + self.LOG.info("消息处理器已启动") + + async def _stop_message_processor(self): + """停止消息处理器""" + self.message_processor_running = False + self.message_processor_tasks.clear() + self.LOG.info("消息处理器已停止") + + async def _process_message_queue(self): + """处理消息队列中的消息""" + while self.message_processor_running: + try: + # 从队列中获取消息,设置超时时间为1秒 + message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0) + + # 检查当前运行的任务数 + async with self.processing_lock: + if len(self.message_processor_tasks) >= self.max_concurrent_tasks: + # 如果达到最大并发数,等待一个任务完成 + self.LOG.debug(f"当前并发任务数: {len(self.message_processor_tasks)},等待任务完成") + await asyncio.sleep(0.1) + continue + + # 创建新的处理任务 + task = asyncio.create_task(self._process_single_message(message)) + self.message_processor_tasks.add(task) + task.add_done_callback(self.message_processor_tasks.discard) + + except asyncio.TimeoutError: + # 超时继续循环 + continue + except Exception as e: + self.LOG.error(f"消息处理器发生错误: {e}") + await asyncio.sleep(1) + + async def _process_single_message(self, message): + """处理单个消息""" + try: + # 直接处理消息,移除超时限制 + await self._process_ipad_message(message) + except Exception as e: + self.LOG.error(f"处理消息失败: {e}") + # 可以在这里添加重试逻辑 + try: + # 记录失败的消息,可以考虑重试或持久化 + self.LOG.error(f"消息处理失败,消息ID: {message.msg_id}, 内容: {message.content}") + except Exception as log_error: + self.LOG.error(f"记录失败消息时出错: {log_error}") + finally: + # 标记任务完成 + self.message_queue.task_done() + async def _wechat_ipad_core(self, server_ip, server_port): """wechat_ipad核心逻辑,基于bot-core.py""" try: self.LOG.info("启动wechat_ipad bot") + # 启动消息处理器 + await self._start_message_processor() + # 调用登录接口 self.ipad_bot = wechat_ipad.WechatAPIClient(server_ip, server_port) self.message_auto_revoke = MessageAutoRevoke(self.ipad_bot) @@ -238,24 +305,32 @@ class Robot: data = data.get("AddMsgs") if data: - tasks = [] + # 使用异步方式将消息放入队列 for message in data: - # self.LOG.debug(f"sync_message.处理消息消息内容: {message}") - # 处理消息 try: wxmsg: WxMessage = WxMessage.from_json(message) + # 非阻塞方式将消息放入队列 + try: + # 设置超时时间为0.1秒,避免阻塞太久 + await asyncio.wait_for( + self.message_queue.put(wxmsg), + timeout=0.1 + ) + except asyncio.TimeoutError: + self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息") + except asyncio.QueueFull: + self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息") except Exception as e: self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}") - continue # 跳过本条消息,继续处理下一条 - tasks.append(self._process_ipad_message(wxmsg)) - if tasks: - await asyncio.gather(*tasks) - # 使用异步睡眠替代忙等待循环 + continue await asyncio.sleep(0.5) except Exception as e: self.LOG.exception(f"wechat_ipad客户端运行出错: {e}") self.ipad_running = False + finally: + # 停止消息处理器 + await self._stop_message_processor() async def _handle_ipad_login(self, wxid, device_name, device_id): """处理wechat_ipad登录"""