From 2dc77b01e7eb240a457d2ddcee78919e1f5cc2e5 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 28 Aug 2025 17:16:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E4=BF=A1=E5=8F=B7=E9=87=8F?= =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E6=A8=A1=E5=BC=8F=EF=BC=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robot.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/robot.py b/robot.py index 28f53b2..7410c3a 100644 --- a/robot.py +++ b/robot.py @@ -29,6 +29,9 @@ from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType +# 定义全局信号量,限制最大并发 10 +sem = asyncio.Semaphore(20) + class Robot: """个性化自己的机器人 @@ -240,7 +243,6 @@ class Robot: data = data.get("AddMsgs") if data: - tasks = [] for message in data: # self.LOG.debug(f"sync_message.处理消息消息内容: {message}") # 处理消息 @@ -255,16 +257,25 @@ class Robot: except Exception as e: self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}") continue # 跳过本条消息,继续处理下一条 - tasks.append(self._process_ipad_message(wxmsg)) - if tasks: - await asyncio.gather(*tasks) + # 创建独立任务,不阻塞下一条消息 + # 并发执行,限制最大并发数 + xx = asyncio.create_task(self._process_with_semaphore(wxmsg)) # 使用异步睡眠替代忙等待循环 - await asyncio.sleep(0.5) + await asyncio.sleep(2) except Exception as e: self.LOG.exception(f"wechat_ipad客户端运行出错: {e}") self.ipad_running = False + # 在类里直接写一个内联 async 方法(不额外抽取新的对外方法) + + async def _process_with_semaphore(self, wxmsg): + async with sem: + try: + await self._process_ipad_message(wxmsg) + except Exception as e: + self.LOG.error(f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}") + async def _handle_ipad_login(self, wxid, device_name, device_id): """处理wechat_ipad登录""" while not await self.ipad_bot.is_logged_in(wxid):