diff --git a/robot.py b/robot.py index 31d52ff..48e310d 100644 --- a/robot.py +++ b/robot.py @@ -39,14 +39,6 @@ class Robot: self.config = config self.LOG = logger - # 添加消息队列和处理控制 - self.max_queue_size = 1000 # 设置队列最大容量 - self.message_queue = asyncio.Queue(maxsize=self.max_queue_size) - self.message_processor_running = False - self.message_processor_tasks = set() # 存储所有处理任务 - self.max_concurrent_tasks = 8 # 最大并发处理数 - self.processing_lock = asyncio.Lock() # 用于控制并发数 - # wechat_ipad 相关属性 self.ipad_bot: WechatAPIClient self.ipad_config = None @@ -149,69 +141,10 @@ class Robot: asyncio.set_event_loop(self.ipad_loop) self.ipad_loop.run_until_complete(self._wechat_ipad_core(server_ip, server_port)) - async def _start_message_processor(self): - """启动消息处理器""" - self.message_processor_running = True - self.message_processor_tasks = set() # 存储所有处理任务 - self.LOG.info("消息处理器已启动") - - async def _stop_message_processor(self): - """停止消息处理器""" - self.message_processor_running = False - self.message_processor_tasks.clear() - self.LOG.info("消息处理器已停止") - - async def _process_message_queue(self): - """处理消息队列中的消息""" - while self.message_processor_running: - try: - # 从队列中获取消息,设置超时时间为1秒 - message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0) - - # 检查当前运行的任务数 - async with self.processing_lock: - if len(self.message_processor_tasks) >= self.max_concurrent_tasks: - # 如果达到最大并发数,等待一个任务完成 - self.LOG.debug(f"当前并发任务数: {len(self.message_processor_tasks)},等待任务完成") - await asyncio.sleep(0.1) - continue - - # 创建新的处理任务 - task = asyncio.create_task(self._process_single_message(message)) - self.message_processor_tasks.add(task) - task.add_done_callback(self.message_processor_tasks.discard) - - except asyncio.TimeoutError: - # 超时继续循环 - continue - except Exception as e: - self.LOG.error(f"消息处理器发生错误: {e}") - await asyncio.sleep(1) - - async def _process_single_message(self, message): - """处理单个消息""" - try: - # 直接处理消息,移除超时限制 - await self._process_ipad_message(message) - except Exception as e: - self.LOG.error(f"处理消息失败: {e}") - # 可以在这里添加重试逻辑 - try: - # 记录失败的消息,可以考虑重试或持久化 - self.LOG.error(f"消息处理失败,消息ID: {message.msg_id}, 内容: {message.content}") - except Exception as log_error: - self.LOG.error(f"记录失败消息时出错: {log_error}") - finally: - # 标记任务完成 - self.message_queue.task_done() - async def _wechat_ipad_core(self, server_ip, server_port): """wechat_ipad核心逻辑,基于bot-core.py""" try: self.LOG.info("启动wechat_ipad bot") - # 启动消息处理器 - await self._start_message_processor() - # 调用登录接口 self.ipad_bot = wechat_ipad.WechatAPIClient(server_ip, server_port) self.message_auto_revoke = MessageAutoRevoke(self.ipad_bot) @@ -305,32 +238,24 @@ class Robot: data = data.get("AddMsgs") if data: - # 使用异步方式将消息放入队列 + tasks = [] for message in data: + # self.LOG.debug(f"sync_message.处理消息消息内容: {message}") + # 处理消息 try: wxmsg: WxMessage = WxMessage.from_json(message) - # 非阻塞方式将消息放入队列 - try: - # 设置超时时间为0.1秒,避免阻塞太久 - await asyncio.wait_for( - self.message_queue.put(wxmsg), - timeout=0.1 - ) - except asyncio.TimeoutError: - self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息") - except asyncio.QueueFull: - self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息") except Exception as e: self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}") - continue + continue # 跳过本条消息,继续处理下一条 + tasks.append(self._process_ipad_message(wxmsg)) + if tasks: + await asyncio.gather(*tasks) + # 使用异步睡眠替代忙等待循环 await asyncio.sleep(0.5) except Exception as e: self.LOG.exception(f"wechat_ipad客户端运行出错: {e}") self.ipad_running = False - finally: - # 停止消息处理器 - await self._stop_message_processor() async def _handle_ipad_login(self, wxid, device_name, device_id): """处理wechat_ipad登录"""