124 lines
4.2 KiB
Python
124 lines
4.2 KiB
Python
import asyncio
|
||
import json
|
||
from typing import Optional, Awaitable, Callable
|
||
|
||
import websockets
|
||
from loguru import logger
|
||
|
||
|
||
class GsCoreClient:
    """Reconnecting WebSocket client that forwards JSON messages to a handler.

    Call :meth:`configure` first, then :meth:`connect` (or just :meth:`send`,
    which connects on demand). Every incoming frame is decoded as UTF-8,
    parsed as JSON and passed to the configured coroutine handler. When the
    connection drops without a manual :meth:`close`, the receive loop keeps
    trying to reconnect every ``_RECONNECT_DELAY`` seconds.
    """

    # Seconds to wait between reconnect attempts made by the receive loop.
    _RECONNECT_DELAY = 5
    # Value passed as ``max_size`` to ``websockets.connect``.
    _MAX_MESSAGE_SIZE = 10 ** 7

    def __init__(self):
        self._ws = None
        self._url: str = ""
        self._token: str = ""
        self._recv_task: Optional[asyncio.Task] = None
        self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
        # True only after a manual close(); tells the receive loop not to reconnect.
        self._is_closing = False

    def configure(self, url: str, handler: Callable[[dict], Awaitable[None]], token: str = "liuwei"):
        """Store the endpoint URL, the message handler and the auth token.

        Args:
            url: WebSocket endpoint; an empty value makes :meth:`connect` a no-op.
            handler: Coroutine function awaited with each parsed JSON payload.
            token: Value sent as the ``token`` query parameter.
        """
        self._url = url
        self._message_handler = handler
        self._token = token

    async def connect(self) -> bool:
        """Open the connection and start the receive loop.

        Returns:
            True if a connection is (already) open, False if no URL is
            configured or the connection attempt failed. Never raises for
            connection errors; they are logged instead.
        """
        if not self._url:
            logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
            return False
        if self._ws is not None:
            return True

        # Function-scope import keeps this block self-contained.
        from urllib.parse import quote

        try:
            # Compatibility approach: the token travels as a URL query parameter
            # instead of a header, e.g. ws://ip:port/ws/abot?token=<token>.
            separator = "&" if "?" in self._url else "?"
            endpoint = f"{self._url}{separator}token="

            # The token is masked so the secret never reaches the logs.
            logger.info(f"[GsCoreClient] 正在尝试连接 (兼容模式): {endpoint}***")

            # extra_headers is deliberately omitted to avoid tripping the
            # argument checks of the underlying create_connection call.
            self._ws = await websockets.connect(
                endpoint + quote(self._token, safe=""),
                max_size=self._MAX_MESSAGE_SIZE,
            )
            self._is_closing = False

            # Stop a previous receive loop, unless connect() is being called
            # from inside that loop (cancelling the running task would abort
            # the caller itself at its next await).
            previous = self._recv_task
            if (
                previous is not None
                and not previous.done()
                and previous is not asyncio.current_task()
            ):
                previous.cancel()

            self._recv_task = asyncio.get_running_loop().create_task(self._recv_loop())
            logger.success("[GsCoreClient] 连接成功")
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 连接失败: {e}")
            self._ws = None
            return False

    async def _dispatch(self, raw) -> None:
        """Decode one frame, parse it as JSON and await the handler.

        Any failure (bad encoding, invalid JSON, handler error) is logged and
        swallowed so that a single bad message does not end the receive loop.
        """
        try:
            text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
            payload = json.loads(text)
            if self._message_handler:
                await self._message_handler(payload)
        except Exception as e:
            logger.exception(f"[GsCoreClient] 解析返回失败: {e}")

    async def _recv_loop(self):
        """Receive frames until the connection ends, then reconnect.

        The loop is bound to the socket that was current when it started; if
        another code path replaces ``self._ws`` the loop exits quietly instead
        of reading from the new socket alongside its own receive loop.
        """
        ws = self._ws
        while ws is not None and self._ws is ws and not self._is_closing:
            try:
                raw = await ws.recv()
            except websockets.exceptions.ConnectionClosed as e:
                if not self._is_closing:
                    logger.warning(f"[GsCoreClient] 连接断开 ({e.code}),5秒后重连...")
                break
            except Exception as e:
                logger.exception(f"[GsCoreClient] 接收异常: {e}")
                break
            await self._dispatch(raw)

        if self._is_closing:
            return
        if self._ws is not None and self._ws is not ws:
            # A newer connection (with its own receive loop) already exists.
            return

        # Drop the dead socket and close it so the transport is not leaked.
        if self._ws is ws:
            self._ws = None
        if ws is not None:
            try:
                await ws.close()
            except Exception as e:
                logger.debug(f"[GsCoreClient] error while closing stale connection: {e}")

        # Keep retrying until connected, manually closed, or unconfigured.
        while not self._is_closing and self._url:
            await asyncio.sleep(self._RECONNECT_DELAY)
            if self._is_closing:
                return
            if await self.connect():
                return

    async def send(self, message: str) -> bool:
        """Send ``message`` as a UTF-8 encoded binary frame.

        Connects on demand. If the first attempt fails, reconnects once and
        retries once.

        Returns:
            True if the frame was handed to the socket, False otherwise.
        """
        if self._ws is None and not await self.connect():
            return False

        payload = message.encode("utf-8")
        try:
            await self._ws.send(payload)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 发送失败,尝试重连: {e}")

        if not await self.reconnect():
            return False
        try:
            await self._ws.send(payload)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] send failed again after reconnect: {e}")
            return False

    async def reconnect(self) -> bool:
        """Close the current connection (non-manually) and connect again."""
        await self.close(manual=False)
        return await self.connect()

    async def close(self, manual: bool = True):
        """Stop the receive loop and close the socket.

        Args:
            manual: True for a deliberate shutdown (disables automatic
                reconnects); False when closing as part of a reconnect.
        """
        self._is_closing = manual

        task, self._recv_task = self._recv_task, None
        # Never cancel the task that is executing this call: that would raise
        # CancelledError in the caller at its next await.
        if task is not None and task is not asyncio.current_task():
            task.cancel()

        ws, self._ws = self._ws, None
        if ws is not None:
            try:
                await ws.close()
            except Exception as e:
                # Best effort: the socket is being discarded either way.
                logger.debug(f"[GsCoreClient] error while closing connection: {e}")
|
||
|
||
|
||
# Shared module-level client instance; call configure() on it before use.
gs_core_client = GsCoreClient()