Files
abot/utils/gscore_client.py
2026-02-02 10:43:51 +08:00

124 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from typing import Optional, Awaitable, Callable
from urllib.parse import quote

import websockets
from loguru import logger
class GsCoreClient:
    """Asynchronous WebSocket client with a background receive loop.

    After ``configure()`` supplies a URL, handler and token, ``connect()``
    opens the socket and starts a task that reads messages, JSON-decodes
    them and awaits the configured handler with the decoded payload.
    When the connection drops (and ``close()`` was not requested), the
    receive task keeps trying to reconnect every 5 seconds.
    """

    def __init__(self):
        """Create an unconfigured, disconnected client."""
        self._ws = None
        self._url: str = ""
        self._token: str = ""
        self._recv_task: Optional[asyncio.Task] = None
        self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
        # True only after a manual close(); tells the receive loop not to reconnect.
        self._is_closing = False

    def configure(
        self,
        url: str,
        handler: Callable[[dict], Awaitable[None]],
        token: str = "liuwei",
    ):
        """Store the connection settings; does not open a connection.

        Args:
            url: WebSocket URL to connect to. An empty value makes
                ``connect()`` a no-op that returns False.
            handler: Coroutine function awaited with every decoded message.
            token: Value sent as the ``token`` query parameter.
        """
        self._url = url
        self._message_handler = handler
        self._token = token

    def _build_connect_url(self) -> str:
        """Return the configured URL with the token appended as a query parameter.

        The token is percent-encoded so characters such as ``&``, ``#`` or
        spaces cannot corrupt the query string.
        """
        separator = "&" if "?" in self._url else "?"
        encoded_token = quote(self._token, safe="")
        return f"{self._url}{separator}token={encoded_token}"

    async def connect(self):
        """Open the WebSocket and start the receive task.

        Returns:
            True if a connection is (already) established, False if the URL
            is not configured or the connection attempt failed.
        """
        if not self._url:
            logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
            return False
        if self._ws is not None:
            return True
        try:
            # Compatibility mode: the token travels in the query string
            # (e.g. ws://ip:port/ws/abot?token=...) instead of a header,
            # because passing extra_headers tripped an argument check in the
            # underlying create_connection call.
            connector_url = self._build_connect_url()
            # Log only the base URL so the token never ends up in log files.
            logger.info(f"[GsCoreClient] 正在尝试连接 (兼容模式): {self._url}")
            self._ws = await websockets.connect(
                connector_url,
                max_size=10 ** 7,
            )
            self._is_closing = False
            previous = self._recv_task
            # connect() may be called from inside the receive task itself
            # (auto-reconnect); never cancel the task we are running in.
            if (
                previous
                and not previous.done()
                and previous is not asyncio.current_task()
            ):
                previous.cancel()
            loop = asyncio.get_running_loop()
            self._recv_task = loop.create_task(self._recv_loop())
            logger.success("[GsCoreClient] 连接成功")
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 连接失败: {e}")
            self._ws = None
            return False

    async def _recv_loop(self):
        """Read messages until the socket fails, then reconnect unless closing."""
        while not self._is_closing:
            try:
                if not self._ws:
                    break
                msg = await self._ws.recv()
                try:
                    # Decoding lives inside the inner try so one malformed
                    # frame is logged and skipped instead of dropping the
                    # whole connection.
                    msg_str = msg.decode("utf-8") if isinstance(msg, bytes) else msg
                    payload = json.loads(msg_str)
                    if self._message_handler:
                        await self._message_handler(payload)
                except Exception as e:
                    logger.exception(f"[GsCoreClient] 解析返回失败: {e}")
            except websockets.exceptions.ConnectionClosed as e:
                if not self._is_closing:
                    logger.warning(f"[GsCoreClient] 连接断开 ({e.code})5秒后重连...")
                break
            except Exception as e:
                logger.exception(f"[GsCoreClient] 接收异常: {e}")
                break
            # NOTE(review): short pause between messages kept from the
            # original; presumably there to yield to other tasks - confirm.
            await asyncio.sleep(0.1)

        if self._is_closing:
            return

        # Detach the dead socket first so concurrent send() calls reconnect
        # instead of using it, then release it best-effort.
        stale, self._ws = self._ws, None
        if stale is not None:
            try:
                await stale.close()
            except Exception as e:
                logger.debug(f"[GsCoreClient] 关闭旧连接时出错: {e}")

        # Keep retrying until connected or a manual close() is requested.
        # A successful connect() starts a fresh receive task, so this one ends.
        while not self._is_closing:
            await asyncio.sleep(5)
            if await self.connect():
                break

    async def send(self, message: str) -> bool:
        """Send ``message`` as a UTF-8 encoded binary frame.

        Connects first when there is no socket. If the send fails, the
        client reconnects once and retries once.

        Args:
            message: Text to send; ``bytes`` are passed through unchanged.

        Returns:
            True if the message was sent, False otherwise.
        """
        if not self._ws:
            if not await self.connect():
                return False
        data = message.encode("utf-8") if isinstance(message, str) else message
        try:
            await self._ws.send(data)
            return True
        except Exception as e:
            logger.error(f"[GsCoreClient] 发送失败,尝试重连: {e}")
        if not await self.reconnect():
            return False
        try:
            await self._ws.send(data)
            return True
        except Exception as e:
            # Report the second failure instead of swallowing it silently.
            logger.error(f"[GsCoreClient] 重连后发送失败: {e}")
            return False

    async def reconnect(self):
        """Drop the current connection and open a new one.

        Returns:
            The result of ``connect()``: True on success, False otherwise.
        """
        # manual=False keeps _is_closing False so auto-reconnect stays enabled.
        await self.close(manual=False)
        return await self.connect()

    async def close(self, manual: bool = True):
        """Cancel the receive task and close the socket.

        Args:
            manual: True (default) marks the client as intentionally closed,
                which stops the receive loop from reconnecting. False is
                used internally when closing as part of a reconnect.
        """
        self._is_closing = manual
        if self._recv_task:
            self._recv_task.cancel()
            self._recv_task = None
        # Clear the attribute before awaiting so the client is already in the
        # "disconnected" state even if the close handshake fails.
        ws, self._ws = self._ws, None
        if ws is not None:
            try:
                await ws.close()
            except Exception as e:
                # Closing is best-effort; record the error but do not raise.
                logger.debug(f"[GsCoreClient] 关闭连接时出错: {e}")
# Module-level client instance created at import time. It starts
# unconfigured: connect() returns False until configure() supplies a URL.
gs_core_client = GsCoreClient()