Files
abot/utils/gscore_client.py
2026-02-02 10:35:41 +08:00

124 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from typing import Optional, Awaitable, Callable
import websockets
from loguru import logger
class GsCoreClient:
def __init__(self):
self._ws = None
self._url: str = ""
self._token: str = "" # 新增 token 存储
self._recv_task: Optional[asyncio.Task] = None
self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
self._is_closing = False
def configure(self, url: str, handler: Callable[[dict], Awaitable[None]], token: str = "liuwei"):
"""配置连接信息,默认 token 为 liuwei"""
self._url = url
self._message_handler = handler
self._token = token
async def connect(self):
if not self._url:
logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
return False
if self._ws is not None:
return True
try:
logger.info(f"[GsCoreClient] 正在连接早柚核心: {self._url}")
# 在 Header 中携带 Token 进行鉴权
headers = {
"Authorization": f"Bearer {self._token}",
"Token": self._token # 兼容某些直接校验 Token 字段的服务端
}
self._ws = await websockets.connect(
self._url,
max_size=10 ** 7,
extra_headers=headers
)
self._is_closing = False
if self._recv_task and not self._recv_task.done():
self._recv_task.cancel()
loop = asyncio.get_running_loop()
self._recv_task = loop.create_task(self._recv_loop())
logger.success("[GsCoreClient] 连接成功且鉴权通过")
return True
except Exception as e:
logger.error(f"[GsCoreClient] 连接或鉴权失败: {e}")
self._ws = None
return False
async def _recv_loop(self):
while not self._is_closing:
try:
if not self._ws:
break
msg = await self._ws.recv()
msg_str = msg.decode("utf-8") if isinstance(msg, bytes) else msg
try:
payload = json.loads(msg_str)
if self._message_handler:
await self._message_handler(payload)
except Exception as e:
logger.exception(f"[GsCoreClient] 解析核心返回失败: {e}")
except websockets.exceptions.ConnectionClosed as e:
if not self._is_closing:
logger.warning(f"[GsCoreClient] WebSocket连接断开 ({e.code})5秒后重连...")
break
else:
break
except Exception as e:
logger.exception(f"[GsCoreClient] 接收消息异常: {e}")
break
await asyncio.sleep(0)
if not self._is_closing:
self._ws = None
await asyncio.sleep(5)
await self.connect()
async def send(self, message: str) -> bool:
if not self._ws:
if not await self.connect():
return False
try:
await self._ws.send(message.encode("utf-8"))
return True
except (websockets.exceptions.ConnectionClosed, Exception) as e:
logger.error(f"[GsCoreClient] 发送失败,尝试重连: {e}")
if await self.reconnect():
try:
await self._ws.send(message.encode("utf-8"))
return True
except Exception:
pass
return False
async def reconnect(self):
await self.close(manual=False)
return await self.connect()
async def close(self, manual: bool = True):
self._is_closing = manual
if self._recv_task:
self._recv_task.cancel()
self._recv_task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
gs_core_client = GsCoreClient()