import asyncio
import inspect
import json
from typing import Awaitable, Callable, Optional, Union

import websockets
from loguru import logger


class GsCoreClient:
    """WebSocket client for a GsCore endpoint.

    The client opens an authenticated WebSocket connection, runs a background
    task that decodes every incoming frame as JSON and hands the result to an
    async handler, and re-establishes the connection after it drops.
    """

    # Seconds to wait before each automatic reconnection attempt.
    _RECONNECT_DELAY = 5

    def __init__(self) -> None:
        self._ws = None
        self._url: str = ""
        self._token: str = ""
        self._recv_task: Optional[asyncio.Task] = None
        self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
        # True only after a manual close(); stops the receive loop from
        # reconnecting.
        self._is_closing = False

    def configure(
        self,
        url: str,
        handler: Callable[[dict], Awaitable[None]],
        token: str = "liuwei",
    ) -> None:
        """Store the connection settings used by later ``connect()`` calls.

        Args:
            url: WebSocket URL of the server.
            handler: Coroutine function awaited with each decoded JSON message.
            token: Value sent in the ``Authorization`` (as a Bearer token) and
                ``Token`` handshake headers.
        """
        # NOTE(review): the default token is a hard-coded credential; callers
        # should pass their own value.
        self._url = url
        self._message_handler = handler
        self._token = token

    @staticmethod
    def _headers_kwarg() -> str:
        """Return the keyword ``websockets.connect`` uses for custom headers.

        The legacy client takes ``extra_headers``; the newer asyncio client
        takes ``additional_headers``. Passing the wrong one raises
        ``TypeError`` during the connection attempt, so the installed
        signature is inspected instead of guessing.
        """
        try:
            params = inspect.signature(websockets.connect).parameters
        except (TypeError, ValueError):
            # Signature not introspectable: fall back to the legacy name.
            return "extra_headers"
        if "additional_headers" in params:
            return "additional_headers"
        return "extra_headers"

    async def connect(self) -> bool:
        """Open the connection and start the background receive task.

        Returns:
            ``True`` if a connection already exists or was established,
            ``False`` if no URL is configured or the attempt failed.
        """
        if not self._url:
            logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
            return False

        if self._ws is not None:
            return True

        try:
            logger.info(f"[GsCoreClient] 正在连接早柚核心: {self._url}")

            # The token is sent under both header names.
            headers = {
                "Authorization": f"Bearer {self._token}",
                "Token": self._token,
            }

            self._ws = await websockets.connect(
                self._url,
                max_size=10 ** 7,
                **{self._headers_kwarg(): headers},
            )
            self._is_closing = False

            # Replace any previous receive task. When connect() is called from
            # the receive task's own reconnect phase this cancels the caller,
            # which is harmless because that task returns right afterwards
            # without awaiting again.
            if self._recv_task and not self._recv_task.done():
                self._recv_task.cancel()

            loop = asyncio.get_running_loop()
            self._recv_task = loop.create_task(self._recv_loop())
            logger.success("[GsCoreClient] 连接成功且鉴权通过")
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 连接或鉴权失败: {e}")
            self._ws = None
            return False

    async def _recv_loop(self) -> None:
        """Receive messages until the connection ends, then reconnect.

        Each frame is decoded (bytes as UTF-8), parsed as JSON and passed to
        the configured handler. Decoding, parsing and handler errors are
        logged and do not end the loop. When the socket fails and the client
        was not closed manually, the old socket is closed and ``connect()`` is
        retried every ``_RECONNECT_DELAY`` seconds until it succeeds.
        """
        while not self._is_closing:
            try:
                if not self._ws:
                    break

                msg = await self._ws.recv()

                try:
                    # Decoding is part of per-message handling so that a badly
                    # encoded frame is logged instead of dropping the link.
                    msg_str = msg.decode("utf-8") if isinstance(msg, bytes) else msg
                    payload = json.loads(msg_str)
                    if self._message_handler:
                        await self._message_handler(payload)
                except Exception as e:
                    logger.exception(f"[GsCoreClient] 解析核心返回失败: {e}")

            except websockets.exceptions.ConnectionClosed as e:
                if not self._is_closing:
                    logger.warning(
                        f"[GsCoreClient] WebSocket连接断开 ({e.code}),5秒后重连..."
                    )
                break
            except Exception as e:
                logger.exception(f"[GsCoreClient] 接收消息异常: {e}")
                break

            # Short pause between reads; this also caps throughput at roughly
            # ten messages per second.
            await asyncio.sleep(0.1)

        if self._is_closing:
            return

        # Detach the dead socket first so concurrent callers see "not
        # connected", then release it; it may still be open if the loop ended
        # on an error other than ConnectionClosed.
        stale, self._ws = self._ws, None
        if stale is not None:
            try:
                await stale.close()
            except Exception as exc:
                logger.debug(f"[GsCoreClient] 关闭旧连接时出现异常(已忽略): {exc}")

        # Keep trying: a successful connect() starts a fresh receive task, so
        # this one simply ends.
        while not self._is_closing:
            await asyncio.sleep(self._RECONNECT_DELAY)
            if await self.connect():
                return

    async def send(self, message: Union[str, bytes]) -> bool:
        """Send ``message`` as a binary frame, reconnecting once on failure.

        Args:
            message: Text (encoded as UTF-8 before sending) or raw bytes.

        Returns:
            ``True`` if the message was sent, ``False`` otherwise.
        """
        if self._ws is None and not await self.connect():
            return False

        data = message.encode("utf-8") if isinstance(message, str) else message

        try:
            await self._ws.send(data)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 发送失败,尝试重连: {e}")

        if not await self.reconnect():
            return False

        try:
            await self._ws.send(data)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 重连后发送仍然失败: {e}")
            return False

    async def reconnect(self) -> bool:
        """Drop the current connection and open a new one.

        Returns:
            The result of ``connect()``.
        """
        await self.close(manual=False)
        return await self.connect()

    async def close(self, manual: bool = True) -> None:
        """Stop the receive task and close the socket.

        Args:
            manual: ``True`` marks the client as deliberately closed, which
                prevents the receive loop from reconnecting. ``False`` is used
                for an internal teardown before reconnecting.
        """
        self._is_closing = manual

        task, self._recv_task = self._recv_task, None
        if task:
            task.cancel()

        # Clear the reference before awaiting so the client never keeps
        # pointing at a socket that is being closed.
        sock, self._ws = self._ws, None
        if sock is not None:
            try:
                await sock.close()
            except Exception as exc:
                # Best effort: the socket is being discarded anyway.
                logger.debug(f"[GsCoreClient] 关闭连接时出现异常(已忽略): {exc}")


gs_core_client = GsCoreClient()