From a05b0d03b6839d80ee6a9d9ac30170a842b9f7c7 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 2 Feb 2026 10:22:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=B8=80=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/gscore_client.py | 80 ++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/utils/gscore_client.py b/utils/gscore_client.py index 2c92743..70b3fff 100644 --- a/utils/gscore_client.py +++ b/utils/gscore_client.py @@ -12,6 +12,7 @@ class GsCoreClient: self._url: str = "" self._recv_task: Optional[asyncio.Task] = None self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None + self._is_closing = False # 用于区分是“意外断开”还是“主动关闭” def configure(self, url: str, handler: Callable[[dict], Awaitable[None]]): self._url = url @@ -24,10 +25,17 @@ class GsCoreClient: if self._ws is not None: return True try: - logger.info(f"[GsCoreClient] 连接早柚核心: {self._url}") - self._ws = await websockets.connect(self._url, max_size=10**7) + logger.info(f"[GsCoreClient] 正在连接早柚核心: {self._url}") + self._ws = await websockets.connect(self._url, max_size=10 ** 7) + self._is_closing = False + + # 确保旧任务已清理 + if self._recv_task and not self._recv_task.done(): + self._recv_task.cancel() + loop = asyncio.get_running_loop() self._recv_task = loop.create_task(self._recv_loop()) + logger.success("[GsCoreClient] 连接成功") return True except Exception as e: logger.error(f"[GsCoreClient] 连接早柚核心失败: {e}") @@ -35,50 +43,67 @@ class GsCoreClient: return False async def _recv_loop(self): - if not self._ws: - return - while True: + """接收消息循环,包含自动重连触发""" + while not self._is_closing: try: + if not self._ws: + break + msg = await self._ws.recv() - if isinstance(msg, bytes): - msg_str = msg.decode("utf-8") - else: - msg_str = msg + msg_str = msg.decode("utf-8") if isinstance(msg, bytes) else msg + try: payload = json.loads(msg_str) - except Exception as e: - logger.exception(f"[GsCoreClient] 解析核心返回失败: {e}") - continue - if self._message_handler: - try: + if self._message_handler: await self._message_handler(payload) - except Exception as e: - logger.exception(f"[GsCoreClient] 处理核心返回失败: {e}") + except Exception as e: + logger.exception(f"[GsCoreClient] 解析或处理核心返回失败: {e}") + except websockets.exceptions.ConnectionClosed as e: - logger.warning(f"[GsCoreClient] WebSocket连接关闭: {e}") - break + if not self._is_closing: + logger.warning(f"[GsCoreClient] WebSocket连接意外断开 ({e.code}),5秒后尝试重连...") + break # 跳出循环,触发下方的重连 + else: + break except Exception as e: logger.exception(f"[GsCoreClient] 接收消息异常: {e}") break await asyncio.sleep(0) + # 退出循环后的重连逻辑 + if not self._is_closing: + self._ws = None + await asyncio.sleep(5) + await self.connect() + async def send(self, message: str) -> bool: + """发送消息,支持失败重连一次""" if not self._ws: - ok = await self.connect() - if not ok: + if not await self.connect(): return False try: await self._ws.send(message.encode("utf-8")) return True - except Exception as e: - logger.error(f"[GsCoreClient] 发送消息失败: {e}") + except (websockets.exceptions.ConnectionClosed, Exception) as e: + logger.error(f"[GsCoreClient] 发送失败 ({e}),尝试重连并重发...") + # 立即清理并重连 + if await self.reconnect(): + try: + # 重连成功后重试发送 + await self._ws.send(message.encode("utf-8")) + return True + except Exception as re_e: + logger.error(f"[GsCoreClient] 重连后发送依然失败: {re_e}") return False async def reconnect(self): - await self.close() - await self.connect() + logger.info("[GsCoreClient] 正在执行重连流程...") + await self.close(manual=False) # 标记为非手动关闭,允许 recv_loop 重启 + return await self.connect() - async def close(self): + async def close(self, manual: bool = True): + """关闭连接。manual=True 表示人为停止,不再自动重连""" + self._is_closing = manual if self._recv_task: self._recv_task.cancel() self._recv_task = None @@ -88,7 +113,8 @@ class GsCoreClient: except Exception: pass self._ws = None + if manual: + logger.info("[GsCoreClient] 连接已手动关闭") -gs_core_client = GsCoreClient() - +gs_core_client = GsCoreClient() \ No newline at end of file