早柚模块接入。

This commit is contained in:
liuwei
2026-01-22 11:59:21 +08:00
parent ac8ebeffc8
commit 54bdc3a971
2 changed files with 107 additions and 52 deletions

View File

@@ -1,7 +1,5 @@
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
import json import json
import asyncio
import websockets
import markdown import markdown
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from loguru import logger as _logger from loguru import logger as _logger
@@ -10,6 +8,7 @@ from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.gscore_client import gs_core_client
from wechat_ipad import WechatAPIClient from wechat_ipad import WechatAPIClient
@@ -88,7 +87,6 @@ class GsCoreAdapterPlugin(MessagePluginInterface):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.websocket = None
self.gscore_url = "" self.gscore_url = ""
self.bot: Optional[WechatAPIClient] = None self.bot: Optional[WechatAPIClient] = None
self._commands = ["早柚", "重连早柚"] self._commands = ["早柚", "重连早柚"]
@@ -101,11 +99,13 @@ class GsCoreAdapterPlugin(MessagePluginInterface):
plugin_cfg = self._config.get("GsCoreAdapter", {}) plugin_cfg = self._config.get("GsCoreAdapter", {})
self.gscore_url = plugin_cfg.get("gscore_url", "") self.gscore_url = plugin_cfg.get("gscore_url", "")
self._commands = plugin_cfg.get("command", self._commands) self._commands = plugin_cfg.get("command", self._commands)
# 连接早柚核心(未配置时记录提醒)
if not self.gscore_url: if not self.gscore_url:
self.LOG.warning(f"[{self.name}] gscore_url 未配置,跳过连接") self.LOG.warning(f"[{self.name}] gscore_url 未配置,跳过连接")
else: else:
asyncio.create_task(self.connect()) from utils.gscore_client import gs_core_client
async def handler(payload: dict):
await self.message_handler_from_core(payload)
gs_core_client.configure(self.gscore_url, handler)
return True return True
def start(self) -> bool: def start(self) -> bool:
@@ -159,31 +159,15 @@ class GsCoreAdapterPlugin(MessagePluginInterface):
"content": [MessageNode(msg_text).to_dict()], "content": [MessageNode(msg_text).to_dict()],
} }
try: try:
await self.send_message(json.dumps(payload, ensure_ascii=False)) ok = await gs_core_client.send(json.dumps(payload, ensure_ascii=False))
return True, "发送成功" if ok:
except Exception: return True, "发送成功"
return False, "发送失败"
except Exception as e:
self.LOG.error(f"[{self.name}] 发送消息到早柚核心失败: {e}")
return False, "发送失败" return False, "发送失败"
return False, None return False, None
async def send_message(self, message: str):
if self.websocket:
await self.websocket.send(message.encode("utf-8"))
async def receive_message(self):
if not self.websocket:
return
while True:
try:
message = await self.websocket.recv()
await self.message_handler(message)
except websockets.exceptions.ConnectionClosed as e:
self.LOG.warning(f"[{self.name}] WebSocket连接关闭: {e}")
break
except Exception as e:
self.LOG.exception(f"[{self.name}] 接收消息异常: {e}")
break
await asyncio.sleep(0)
def parse_markdown(self, md_text: str): def parse_markdown(self, md_text: str):
try: try:
html = markdown.markdown(md_text) html = markdown.markdown(md_text)
@@ -199,15 +183,9 @@ class GsCoreAdapterPlugin(MessagePluginInterface):
self.LOG.exception(f"[{self.name}] Markdown解析失败: {e}") self.LOG.exception(f"[{self.name}] Markdown解析失败: {e}")
return md_text, [] return md_text, []
async def message_handler(self, message: bytes): async def message_handler_from_core(self, message_json: dict):
if not self.bot: if not self.bot:
return return
try:
message_str = message.decode("utf-8")
message_json = json.loads(message_str)
except Exception as e:
self.LOG.exception(f"[{self.name}] 解析核心返回失败: {e}")
return
if message_json.get("bot_id") != "abot": if message_json.get("bot_id") != "abot":
return return
target_id = message_json.get("target_id", "") target_id = message_json.get("target_id", "")
@@ -238,22 +216,5 @@ class GsCoreAdapterPlugin(MessagePluginInterface):
except Exception as e: except Exception as e:
self.LOG.exception(f"[{self.name}] 转发消息失败(type={t}): {e}") self.LOG.exception(f"[{self.name}] 转发消息失败(type={t}): {e}")
continue continue
async def reconnect(self): async def reconnect(self):
await self.close_connection() await gs_core_client.reconnect()
await self.connect()
async def close_connection(self):
if self.websocket:
await self.websocket.close()
self.websocket = None
async def connect(self):
try:
self.LOG.info(f"[{self.name}] 连接早柚核心: {self.gscore_url}")
self.websocket = await websockets.connect(self.gscore_url, max_size=10**7)
asyncio.create_task(self.receive_message())
except Exception as e:
self.LOG.error(f"[{self.name}] 连接早柚核心失败: {e}")
return False
return True

94
utils/gscore_client.py Normal file
View File

@@ -0,0 +1,94 @@
import asyncio
import json
from typing import Optional, Awaitable, Callable
import websockets
from loguru import logger
class GsCoreClient:
def __init__(self):
self._ws = None
self._url: str = ""
self._recv_task: Optional[asyncio.Task] = None
self._message_handler: Optional[Callable[[dict], Awaitable[None]]] = None
def configure(self, url: str, handler: Callable[[dict], Awaitable[None]]):
self._url = url
self._message_handler = handler
async def connect(self):
if not self._url:
logger.warning("[GsCoreClient] gscore_url 未配置,跳过连接")
return False
if self._ws is not None:
return True
try:
logger.info(f"[GsCoreClient] 连接早柚核心: {self._url}")
self._ws = await websockets.connect(self._url, max_size=10**7)
loop = asyncio.get_running_loop()
self._recv_task = loop.create_task(self._recv_loop())
return True
except Exception as e:
logger.error(f"[GsCoreClient] 连接早柚核心失败: {e}")
self._ws = None
return False
async def _recv_loop(self):
if not self._ws:
return
while True:
try:
msg = await self._ws.recv()
if isinstance(msg, bytes):
msg_str = msg.decode("utf-8")
else:
msg_str = msg
try:
payload = json.loads(msg_str)
except Exception as e:
logger.exception(f"[GsCoreClient] 解析核心返回失败: {e}")
continue
if self._message_handler:
try:
await self._message_handler(payload)
except Exception as e:
logger.exception(f"[GsCoreClient] 处理核心返回失败: {e}")
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"[GsCoreClient] WebSocket连接关闭: {e}")
break
except Exception as e:
logger.exception(f"[GsCoreClient] 接收消息异常: {e}")
break
await asyncio.sleep(0)
async def send(self, message: str) -> bool:
if not self._ws:
ok = await self.connect()
if not ok:
return False
try:
await self._ws.send(message.encode("utf-8"))
return True
except Exception as e:
logger.error(f"[GsCoreClient] 发送消息失败: {e}")
return False
async def reconnect(self):
await self.close()
await self.connect()
async def close(self):
if self._recv_task:
self._recv_task.cancel()
self._recv_task = None
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
gs_core_client = GsCoreClient()