加入了队列,并发8线程逻辑,解决阻塞问题。
This commit is contained in:
91
robot.py
91
robot.py
@@ -39,6 +39,14 @@ class Robot:
|
||||
self.config = config
|
||||
self.LOG = logger
|
||||
|
||||
# 添加消息队列和处理控制
|
||||
self.max_queue_size = 1000 # 设置队列最大容量
|
||||
self.message_queue = asyncio.Queue(maxsize=self.max_queue_size)
|
||||
self.message_processor_running = False
|
||||
self.message_processor_tasks = set() # 存储所有处理任务
|
||||
self.max_concurrent_tasks = 8 # 最大并发处理数
|
||||
self.processing_lock = asyncio.Lock() # 用于控制并发数
|
||||
|
||||
# wechat_ipad 相关属性
|
||||
self.ipad_bot: WechatAPIClient
|
||||
self.ipad_config = None
|
||||
@@ -141,10 +149,69 @@ class Robot:
|
||||
asyncio.set_event_loop(self.ipad_loop)
|
||||
self.ipad_loop.run_until_complete(self._wechat_ipad_core(server_ip, server_port))
|
||||
|
||||
async def _start_message_processor(self):
|
||||
"""启动消息处理器"""
|
||||
self.message_processor_running = True
|
||||
self.message_processor_tasks = set() # 存储所有处理任务
|
||||
self.LOG.info("消息处理器已启动")
|
||||
|
||||
async def _stop_message_processor(self):
|
||||
"""停止消息处理器"""
|
||||
self.message_processor_running = False
|
||||
self.message_processor_tasks.clear()
|
||||
self.LOG.info("消息处理器已停止")
|
||||
|
||||
async def _process_message_queue(self):
|
||||
"""处理消息队列中的消息"""
|
||||
while self.message_processor_running:
|
||||
try:
|
||||
# 从队列中获取消息,设置超时时间为1秒
|
||||
message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0)
|
||||
|
||||
# 检查当前运行的任务数
|
||||
async with self.processing_lock:
|
||||
if len(self.message_processor_tasks) >= self.max_concurrent_tasks:
|
||||
# 如果达到最大并发数,等待一个任务完成
|
||||
self.LOG.debug(f"当前并发任务数: {len(self.message_processor_tasks)},等待任务完成")
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
# 创建新的处理任务
|
||||
task = asyncio.create_task(self._process_single_message(message))
|
||||
self.message_processor_tasks.add(task)
|
||||
task.add_done_callback(self.message_processor_tasks.discard)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# 超时继续循环
|
||||
continue
|
||||
except Exception as e:
|
||||
self.LOG.error(f"消息处理器发生错误: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def _process_single_message(self, message):
|
||||
"""处理单个消息"""
|
||||
try:
|
||||
# 直接处理消息,移除超时限制
|
||||
await self._process_ipad_message(message)
|
||||
except Exception as e:
|
||||
self.LOG.error(f"处理消息失败: {e}")
|
||||
# 可以在这里添加重试逻辑
|
||||
try:
|
||||
# 记录失败的消息,可以考虑重试或持久化
|
||||
self.LOG.error(f"消息处理失败,消息ID: {message.msg_id}, 内容: {message.content}")
|
||||
except Exception as log_error:
|
||||
self.LOG.error(f"记录失败消息时出错: {log_error}")
|
||||
finally:
|
||||
# 标记任务完成
|
||||
self.message_queue.task_done()
|
||||
|
||||
async def _wechat_ipad_core(self, server_ip, server_port):
|
||||
"""wechat_ipad核心逻辑,基于bot-core.py"""
|
||||
try:
|
||||
self.LOG.info("启动wechat_ipad bot")
|
||||
# 启动消息处理器
|
||||
await self._start_message_processor()
|
||||
|
||||
# 调用登录接口
|
||||
self.ipad_bot = wechat_ipad.WechatAPIClient(server_ip, server_port)
|
||||
self.message_auto_revoke = MessageAutoRevoke(self.ipad_bot)
|
||||
@@ -238,24 +305,32 @@ class Robot:
|
||||
|
||||
data = data.get("AddMsgs")
|
||||
if data:
|
||||
tasks = []
|
||||
# 使用异步方式将消息放入队列
|
||||
for message in data:
|
||||
# self.LOG.debug(f"sync_message.处理消息消息内容: {message}")
|
||||
# 处理消息
|
||||
try:
|
||||
wxmsg: WxMessage = WxMessage.from_json(message)
|
||||
# 非阻塞方式将消息放入队列
|
||||
try:
|
||||
# 设置超时时间为0.1秒,避免阻塞太久
|
||||
await asyncio.wait_for(
|
||||
self.message_queue.put(wxmsg),
|
||||
timeout=0.1
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息")
|
||||
except asyncio.QueueFull:
|
||||
self.LOG.warning(f"消息队列已满(当前大小: {self.message_queue.qsize()}),丢弃消息")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}")
|
||||
continue # 跳过本条消息,继续处理下一条
|
||||
tasks.append(self._process_ipad_message(wxmsg))
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks)
|
||||
# 使用异步睡眠替代忙等待循环
|
||||
continue
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
self.LOG.exception(f"wechat_ipad客户端运行出错: {e}")
|
||||
self.ipad_running = False
|
||||
finally:
|
||||
# 停止消息处理器
|
||||
await self._stop_message_processor()
|
||||
|
||||
async def _handle_ipad_login(self, wxid, device_name, device_id):
|
||||
"""处理wechat_ipad登录"""
|
||||
|
||||
Reference in New Issue
Block a user