测试信号量并行模式,
This commit is contained in:
21
robot.py
21
robot.py
@@ -29,6 +29,9 @@ from utils.wechat.message_to_db import MessageStorage
|
||||
from wechat_ipad import WechatAPIClient
|
||||
from wechat_ipad.models.message import WxMessage, MessageType
|
||||
|
||||
# 定义全局信号量,限制最大并发 10
|
||||
sem = asyncio.Semaphore(20)
|
||||
|
||||
|
||||
class Robot:
|
||||
"""个性化自己的机器人
|
||||
@@ -240,7 +243,6 @@ class Robot:
|
||||
|
||||
data = data.get("AddMsgs")
|
||||
if data:
|
||||
tasks = []
|
||||
for message in data:
|
||||
# self.LOG.debug(f"sync_message.处理消息消息内容: {message}")
|
||||
# 处理消息
|
||||
@@ -255,16 +257,25 @@ class Robot:
|
||||
except Exception as e:
|
||||
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}")
|
||||
continue # 跳过本条消息,继续处理下一条
|
||||
tasks.append(self._process_ipad_message(wxmsg))
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks)
|
||||
# 创建独立任务,不阻塞下一条消息
|
||||
# 并发执行,限制最大并发数
|
||||
xx = asyncio.create_task(self._process_with_semaphore(wxmsg))
|
||||
# 使用异步睡眠替代忙等待循环
|
||||
await asyncio.sleep(0.5)
|
||||
await asyncio.sleep(2)
|
||||
|
||||
except Exception as e:
|
||||
self.LOG.exception(f"wechat_ipad客户端运行出错: {e}")
|
||||
self.ipad_running = False
|
||||
|
||||
# 在类里直接写一个内联 async 方法(不额外抽取新的对外方法)
|
||||
|
||||
async def _process_with_semaphore(self, wxmsg):
|
||||
async with sem:
|
||||
try:
|
||||
await self._process_ipad_message(wxmsg)
|
||||
except Exception as e:
|
||||
self.LOG.error(f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}")
|
||||
|
||||
async def _handle_ipad_login(self, wxid, device_name, device_id):
|
||||
"""处理wechat_ipad登录"""
|
||||
while not await self.ipad_bot.is_logged_in(wxid):
|
||||
|
||||
Reference in New Issue
Block a user