"""WebSocket client used to talk to the GsCore ("早柚核心") backend."""

import asyncio
import json
from typing import Optional, Awaitable, Callable

import websockets
from loguru import logger


class GsCoreClient:
    """Reconnecting WebSocket client.

    One background task per connection reads frames, decodes them as JSON and
    passes the resulting object to the configured async handler. When the
    connection drops unexpectedly the task keeps trying to reconnect until it
    succeeds or the client is closed manually.
    """

    # Seconds to wait between automatic reconnection attempts.
    _RECONNECT_DELAY = 5

    def __init__(self):
        self._ws = None  # active websocket connection, or None when disconnected
        self._url: str = ""
        self._recv_task: Optional[asyncio.Task] = None
        self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
        # Distinguishes a deliberate shutdown (True) from an unexpected drop (False).
        self._is_closing = False

    def configure(self, url: str, handler: Callable[[dict], Awaitable[None]]) -> None:
        """Store the server URL and the coroutine called for every decoded message."""
        self._url = url
        self._message_handler = handler

    async def connect(self) -> bool:
        """Open the connection and start the receive task.

        Returns:
            True if a connection is available when the call finishes (including
            the case where one already existed), False if no URL is configured
            or the connection attempt failed.
        """
        if not self._url:
            logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
            return False
        if self._ws is not None:
            return True

        try:
            logger.info(f"[GsCoreClient] 正在连接早柚核心: {self._url}")
            ws = await websockets.connect(self._url, max_size=10 ** 7)
        except Exception as e:
            logger.error(f"[GsCoreClient] 连接早柚核心失败: {e}")
            return False

        if self._ws is not None:
            # Another coroutine finished connecting while this one was waiting;
            # keep that connection and drop the redundant socket instead of
            # overwriting (and leaking) the one already in use.
            await self._close_quietly(ws)
            return True

        self._ws = ws
        self._is_closing = False
        # Make sure a receive task from a previous connection is gone.
        self._cancel_recv_task()
        # The new task starts running only after this coroutine yields, at
        # which point self._ws is the socket assigned above.
        self._recv_task = asyncio.get_running_loop().create_task(self._recv_loop())
        logger.success("[GsCoreClient] 连接成功")
        return True

    def _cancel_recv_task(self) -> None:
        """Cancel the tracked receive task unless it is the task calling us.

        connect()/close() can be reached from inside the receive task itself
        (its own reconnect logic, or a message handler that calls send()).
        Cancelling the running task there would abort the caller half-way, so
        in that case the task is left alone; its loop notices that the socket
        changed and exits by itself.
        """
        task = self._recv_task
        if task is None or task is asyncio.current_task():
            return
        self._recv_task = None
        if not task.done():
            task.cancel()

    @staticmethod
    async def _close_quietly(ws) -> None:
        """Close *ws* on a best-effort basis, logging instead of raising."""
        try:
            await ws.close()
        except Exception as e:
            logger.debug(f"[GsCoreClient] 关闭连接时出错,已忽略: {e}")

    async def _dispatch(self, msg) -> None:
        """Decode one received frame as JSON and hand it to the message handler.

        Decoding, parsing and handler errors are logged and swallowed so that a
        single bad message does not tear down the connection.
        """
        try:
            msg_str = msg.decode("utf-8") if isinstance(msg, bytes) else msg
            payload = json.loads(msg_str)
            if self._message_handler:
                await self._message_handler(payload)
        except Exception as e:
            logger.exception(f"[GsCoreClient] 解析或处理核心返回失败: {e}")

    async def _recv_loop(self) -> None:
        """Receive messages from the current connection; reconnect when it drops."""
        # Bind this loop to the socket it was started for. If self._ws is later
        # replaced, this loop has been superseded and must not read from (or
        # tear down) the newer connection.
        ws = self._ws

        while ws is not None and self._ws is ws and not self._is_closing:
            try:
                msg = await ws.recv()
            except websockets.exceptions.ConnectionClosed as e:
                if not self._is_closing and self._ws is ws:
                    logger.warning(
                        f"[GsCoreClient] WebSocket连接意外断开 ({e.code}),"
                        f"{self._RECONNECT_DELAY}秒后尝试重连..."
                    )
                break
            except Exception as e:
                logger.exception(f"[GsCoreClient] 接收消息异常: {e}")
                break
            await self._dispatch(msg)
            await asyncio.sleep(0)  # give other tasks a chance to run

        if self._is_closing:
            return
        if self._ws is not None and self._ws is not ws:
            # A newer connection (with its own receive task) is already in place.
            return

        # The connection was lost unexpectedly: release it, then keep retrying
        # until a connection exists again or the client is closed manually.
        self._ws = None
        if ws is not None:
            await self._close_quietly(ws)
        while not self._is_closing and self._url:
            await asyncio.sleep(self._RECONNECT_DELAY)
            if self._is_closing:
                return
            if await self.connect():
                return

    async def send(self, message: str) -> bool:
        """Send *message* as UTF-8 bytes, reconnecting and retrying once on failure.

        Returns:
            True if the message was sent, False otherwise.
        """
        if self._ws is None and not await self.connect():
            return False

        payload = message.encode("utf-8")
        try:
            await self._ws.send(payload)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 发送失败 ({e}),尝试重连并重发...")

        # Drop the broken connection, reconnect and retry exactly once.
        if not await self.reconnect():
            return False
        try:
            await self._ws.send(payload)
            return True
        except Exception as re_e:
            logger.error(f"[GsCoreClient] 重连后发送依然失败: {re_e}")
            return False

    async def reconnect(self) -> bool:
        """Tear down the current connection and open a new one.

        Returns:
            The result of connect().
        """
        logger.info("[GsCoreClient] 正在执行重连流程...")
        # Not a manual close, so automatic reconnection stays enabled.
        await self.close(manual=False)
        return await self.connect()

    async def close(self, manual: bool = True) -> None:
        """Close the connection.

        Args:
            manual: True means the user asked to stop, which disables automatic
                reconnection; False is used internally while reconnecting.
        """
        self._is_closing = manual
        self._cancel_recv_task()
        # Clear the reference before awaiting so nobody keeps using a socket
        # that is being closed, even if this coroutine is interrupted.
        ws, self._ws = self._ws, None
        if ws is not None:
            await self._close_quietly(ws)
        if manual:
            logger.info("[GsCoreClient] 连接已手动关闭")


gs_core_client = GsCoreClient()