diff --git a/plugins/gscore_adapter/main.py b/plugins/gscore_adapter/main.py index 82091c6..478a5c9 100644 --- a/plugins/gscore_adapter/main.py +++ b/plugins/gscore_adapter/main.py @@ -1,7 +1,5 @@ from typing import Dict, Any, List, Optional, Tuple import json -import asyncio -import websockets import markdown from bs4 import BeautifulSoup from loguru import logger as _logger @@ -10,6 +8,7 @@ from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.decorator.plugin_decorators import plugin_stats_decorator +from utils.gscore_client import gs_core_client from wechat_ipad import WechatAPIClient @@ -88,7 +87,6 @@ class GsCoreAdapterPlugin(MessagePluginInterface): def __init__(self): super().__init__() - self.websocket = None self.gscore_url = "" self.bot: Optional[WechatAPIClient] = None self._commands = ["早柚", "重连早柚"] @@ -101,11 +99,13 @@ class GsCoreAdapterPlugin(MessagePluginInterface): plugin_cfg = self._config.get("GsCoreAdapter", {}) self.gscore_url = plugin_cfg.get("gscore_url", "") self._commands = plugin_cfg.get("command", self._commands) - # 连接早柚核心(未配置时记录提醒) if not self.gscore_url: self.LOG.warning(f"[{self.name}] gscore_url 未配置,跳过连接") else: - asyncio.create_task(self.connect()) + from utils.gscore_client import gs_core_client + async def handler(payload: dict): + await self.message_handler_from_core(payload) + gs_core_client.configure(self.gscore_url, handler) return True def start(self) -> bool: @@ -159,31 +159,15 @@ class GsCoreAdapterPlugin(MessagePluginInterface): "content": [MessageNode(msg_text).to_dict()], } try: - await self.send_message(json.dumps(payload, ensure_ascii=False)) - return True, "发送成功" - except Exception: + ok = await gs_core_client.send(json.dumps(payload, ensure_ascii=False)) + if ok: + return True, "发送成功" + return False, "发送失败" + except Exception as e: + self.LOG.error(f"[{self.name}] 发送消息到早柚核心失败: {e}") return False, "发送失败" return False, None - async def send_message(self, message: str): - if self.websocket: - await self.websocket.send(message.encode("utf-8")) - - async def receive_message(self): - if not self.websocket: - return - while True: - try: - message = await self.websocket.recv() - await self.message_handler(message) - except websockets.exceptions.ConnectionClosed as e: - self.LOG.warning(f"[{self.name}] WebSocket连接关闭: {e}") - break - except Exception as e: - self.LOG.exception(f"[{self.name}] 接收消息异常: {e}") - break - await asyncio.sleep(0) - def parse_markdown(self, md_text: str): try: html = markdown.markdown(md_text) @@ -199,15 +183,9 @@ class GsCoreAdapterPlugin(MessagePluginInterface): self.LOG.exception(f"[{self.name}] Markdown解析失败: {e}") return md_text, [] - async def message_handler(self, message: bytes): + async def message_handler_from_core(self, message_json: dict): if not self.bot: return - try: - message_str = message.decode("utf-8") - message_json = json.loads(message_str) - except Exception as e: - self.LOG.exception(f"[{self.name}] 解析核心返回失败: {e}") - return if message_json.get("bot_id") != "abot": return target_id = message_json.get("target_id", "") @@ -238,22 +216,5 @@ class GsCoreAdapterPlugin(MessagePluginInterface): except Exception as e: self.LOG.exception(f"[{self.name}] 转发消息失败(type={t}): {e}") continue - async def reconnect(self): - await self.close_connection() - await self.connect() - - async def close_connection(self): - if self.websocket: - await self.websocket.close() - self.websocket = None - - async def connect(self): - try: - self.LOG.info(f"[{self.name}] 连接早柚核心: {self.gscore_url}") - self.websocket = await websockets.connect(self.gscore_url, max_size=10**7) - asyncio.create_task(self.receive_message()) - except Exception as e: - self.LOG.error(f"[{self.name}] 连接早柚核心失败: {e}") - return False - return True + await gs_core_client.reconnect() diff --git a/utils/gscore_client.py b/utils/gscore_client.py new file mode 100644 index 0000000..2c92743 --- /dev/null +++ b/utils/gscore_client.py @@ -0,0 +1,94 @@ +import asyncio +import json +from typing import Optional, Awaitable, Callable + +import websockets +from loguru import logger + + +class GsCoreClient: + def __init__(self): + self._ws = None + self._url: str = "" + self._recv_task: Optional[asyncio.Task] = None + self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None + + def configure(self, url: str, handler: Callable[[dict], Awaitable[None]]): + self._url = url + self._message_handler = handler + + async def connect(self): + if not self._url: + logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接") + return False + if self._ws is not None: + return True + try: + logger.info(f"[GsCoreClient] 连接早柚核心: {self._url}") + self._ws = await websockets.connect(self._url, max_size=10**7) + loop = asyncio.get_running_loop() + self._recv_task = loop.create_task(self._recv_loop()) + return True + except Exception as e: + logger.error(f"[GsCoreClient] 连接早柚核心失败: {e}") + self._ws = None + return False + + async def _recv_loop(self): + if not self._ws: + return + while True: + try: + msg = await self._ws.recv() + if isinstance(msg, bytes): + msg_str = msg.decode("utf-8") + else: + msg_str = msg + try: + payload = json.loads(msg_str) + except Exception as e: + logger.exception(f"[GsCoreClient] 解析核心返回失败: {e}") + continue + if self._message_handler: + try: + await self._message_handler(payload) + except Exception as e: + logger.exception(f"[GsCoreClient] 处理核心返回失败: {e}") + except websockets.exceptions.ConnectionClosed as e: + logger.warning(f"[GsCoreClient] WebSocket连接关闭: {e}") + break + except Exception as e: + logger.exception(f"[GsCoreClient] 接收消息异常: {e}") + break + await asyncio.sleep(0) + + async def send(self, message: str) -> bool: + if not self._ws: + ok = await self.connect() + if not ok: + return False + try: + await self._ws.send(message.encode("utf-8")) + return True + except Exception as e: + logger.error(f"[GsCoreClient] 发送消息失败: {e}") + return False + + async def reconnect(self): + await self.close() + await self.connect() + + async def close(self): + if self._recv_task: + self._recv_task.cancel() + self._recv_task = None + if self._ws: + try: + await self._ws.close() + except Exception: + pass + self._ws = None + + +gs_core_client = GsCoreClient() +