221 lines
8.2 KiB
Python
221 lines
8.2 KiB
Python
from typing import Dict, Any, List, Optional, Tuple
|
|
import json
|
|
import markdown
|
|
from bs4 import BeautifulSoup
|
|
from loguru import logger as _logger
|
|
|
|
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
|
from base.plugin_common.plugin_interface import PluginStatus
|
|
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
|
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
|
from utils.gscore_client import gs_core_client
|
|
from wechat_ipad import WechatAPIClient
|
|
|
|
|
|
class MessageNode:
    """One text segment of a message payload.

    The node type is always ``"text"``; ``data`` carries the message string
    passed to the constructor.
    """

    def __init__(self, message: str):
        self.type, self.data = "text", message

    def to_dict(self):
        """Return the node as a plain ``{"type": ..., "data": ...}`` dict."""
        return dict(type=self.type, data=self.data)
|
|
|
|
|
|
class SenderDict:
    """Sender description built from a contact-info dict.

    Reads ``Country``, ``NickName`` (normally a dict with a ``"string"``
    entry), ``Sex`` and ``SmallHeadImgUrl`` from ``user``. All other fields
    are fixed placeholder values.
    """

    def __init__(self, user: dict):
        # Tolerate a missing or malformed contact record instead of raising
        # AttributeError: every field then falls back to its default.
        if not isinstance(user, dict):
            user = {}

        # NickName is normally {"string": "..."}; it may also be absent,
        # None, or a bare string, none of which should crash the lookup.
        raw_nick = user.get("NickName")
        if isinstance(raw_nick, dict):
            nickname = raw_nick.get("string") or ""
        elif isinstance(raw_nick, str):
            nickname = raw_nick
        else:
            nickname = ""

        sex_code = user.get("Sex")
        if sex_code == 1:
            sex = "男"
        elif sex_code == 2:
            sex = "女"
        else:
            sex = "未知"

        self.age = 0
        self.area = user.get("Country", "未知")
        self.card = nickname
        self.level = ""
        self.nickname = nickname
        self.role = "owner"
        self.sex = sex
        self.title = ""
        self.user_id = 0
        # Attribute name keeps the original (misspelled) form so existing
        # readers of ``.avater`` keep working.
        self.avater = user.get("SmallHeadImgUrl", "")

    def to_dict(self):
        """Return the sender fields as a plain dict.

        The avatar URL is exported under both the historical ``"avater"`` key
        and the correctly spelled ``"avatar"`` key.
        NOTE(review): the consumer presumably looks up ``"avatar"`` — confirm
        against the core's sender schema; the old key is kept for
        backward compatibility.
        """
        return {
            "age": self.age,
            "area": self.area,
            "card": self.card,
            "level": self.level,
            "nickname": self.nickname,
            "role": self.role,
            "sex": self.sex,
            "title": self.title,
            "user_id": self.user_id,
            "avater": self.avater,
            "avatar": self.avater,
        }
|
|
|
|
|
|
class GsCoreAdapterPlugin(MessagePluginInterface):
    """Message plugin that relays commands to the core service and forwards replies.

    Incoming chat messages whose first space-separated token is one of the
    configured commands are handled by :meth:`process_message`: ``早柚`` is
    serialized to JSON and sent through ``gs_core_client``; ``重连早柚``
    (admins only) triggers a reconnect. Replies pushed back by the core
    arrive in :meth:`message_handler_from_core` and are delivered with the
    bot client captured from the first forwarded message.
    """

    FEATURE_KEY = "GS_CORE_ADAPTER"
    FEATURE_DESCRIPTION = "🌐 早柚核心适配 [早柚, 开启早柚, 关闭早柚, 重连早柚]"

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "早柚适配器"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "0.0.1"

    @property
    def description(self) -> str:
        """Short description of the plugin."""
        return "将指令转发至早柚核心并回传处理结果"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "xuangeer"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; this plugin uses none (empty string)."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Command words currently accepted by :meth:`can_process`."""
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        """Feature key, taken from ``FEATURE_KEY``."""
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        """Feature description, taken from ``FEATURE_DESCRIPTION``."""
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.gscore_url = ""
        # Bot client used to deliver replies from the core; captured lazily
        # from the first forwarded message in process_message.
        self.bot: Optional[WechatAPIClient] = None
        self._commands = ["早柚", "重连早柚"]
        self.feature = None

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Read the plugin configuration and register the core reply handler.

        Args:
            context: Plugin context; only ``"event_system"`` is read here.

        Returns:
            Always ``True``. When ``gscore_url`` is not configured a warning
            is logged and the core client is left unconfigured.
        """
        self.LOG = _logger
        self.event_system = context.get("event_system")
        self.feature = self.register_feature()

        # ``or {}`` also covers a config section that exists but is null.
        plugin_cfg = self._config.get("GsCoreAdapter") or {}
        self.gscore_url = plugin_cfg.get("gscore_url", "")

        configured = plugin_cfg.get("command", self._commands)
        if configured is None:
            configured = self._commands
        elif isinstance(configured, str):
            # A bare string would turn ``command in self._commands`` into a
            # substring test; wrap it so matching stays exact.
            configured = [configured]
        self._commands = list(configured)

        if not self.gscore_url:
            self.LOG.warning(f"[{self.name}] gscore_url 未配置,跳过连接")
            return True

        async def handler(payload: dict):
            await self.message_handler_from_core(payload)

        gs_core_client.configure(self.gscore_url, handler)
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether the first space-separated token is a known command."""
        content = str(message.get("content", "")).strip()
        command = content.split(" ")[0] if content else ""
        return command in self._commands

    @plugin_stats_decorator(plugin_name="早柚适配器")
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a chat message addressed to this plugin.

        Args:
            message: Message dict; the keys ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot`` are read.

        Returns:
            A ``(success, detail)`` tuple. ``(False, None)`` is returned when
            the first token is neither ``早柚`` nor ``重连早柚``.
        """
        content = str(message.get("content", "")).strip()
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        command = content.split(" ")[0]

        if command == "重连早柚":
            return await self._handle_reconnect(bot, roomid, sender)
        if command == "早柚":
            msg_text = content[len(command):].strip()
            return await self._forward_to_core(bot, gbm, roomid, sender, msg_text)
        return False, None

    async def _handle_reconnect(self, bot, roomid, sender) -> Tuple[bool, Optional[str]]:
        """Reconnect to the core on behalf of an admin and report the outcome."""
        reply_to = roomid if roomid else sender
        if sender not in GroupBotManager.get_admin_list():
            await bot.send_text_message(reply_to, "无管理员权限", sender)
            return True, "权限不足"

        try:
            await self.reconnect()
        except Exception as e:
            # Tell the requester instead of letting the error escape with no
            # reply at all.
            self.LOG.exception(f"[{self.name}] 重连早柚核心失败: {e}")
            await bot.send_text_message(reply_to, "重连早柚失败", sender)
            return False, "重连失败"

        await bot.send_text_message(reply_to, "重连早柚成功", sender)
        return True, "重连成功"

    async def _forward_to_core(self, bot, gbm, roomid, sender, msg_text: str) -> Tuple[bool, Optional[str]]:
        """Build the core payload for one user message and send it."""
        if roomid and gbm and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Remember the bot client so replies from the core can be delivered.
        if self.bot is None:
            self.bot = bot

        try:
            user = await bot.get_contact(sender)
        except Exception as e:
            self.LOG.warning(f"[{self.name}] 获取用户信息失败: {e}")
            return False, "获取用户信息失败"
        if not isinstance(user, dict):
            # SenderDict reads dict keys; fall back to defaults rather than
            # crash on an unexpected contact result.
            user = {}

        payload = {
            "bot_id": "abot",
            "sender": SenderDict(user).to_dict(),
            "bot_self_id": bot.wxid,
            "msg_id": "",
            "user_type": "group" if roomid else "direct",
            "group_id": roomid or "",
            "user_id": sender,
            "user_pm": 1 if sender in GroupBotManager.get_admin_list() else 6,
            "content": [MessageNode(msg_text).to_dict()],
        }

        try:
            ok = await gs_core_client.send(json.dumps(payload, ensure_ascii=False))
        except Exception as e:
            self.LOG.error(f"[{self.name}] 发送消息到早柚核心失败: {e}")
            return False, "发送失败"
        if ok:
            return True, "发送成功"
        return False, "发送失败"

    def parse_markdown(self, md_text: str) -> Tuple[str, List[str]]:
        """Split markdown into plain text and the list of image sources.

        The markdown is rendered to HTML and parsed; the text content and the
        ``src`` of every ``<img>`` are returned. On any failure the error is
        logged and ``(md_text, [])`` is returned unchanged.
        """
        try:
            html = markdown.markdown(md_text)
            soup = BeautifulSoup(html, "html.parser")
            text = soup.get_text()
            images = [src for src in (img.get("src") for img in soup.find_all("img")) if src]
            return text, images
        except Exception as e:
            self.LOG.exception(f"[{self.name}] Markdown解析失败: {e}")
            return md_text, []

    @staticmethod
    def _to_image_payload(data: Any) -> str:
        """Rewrite a ``base64://`` prefix into a ``data:image/jpg;base64,`` prefix."""
        return str(data).replace("base64://", "data:image/jpg;base64,")

    async def message_handler_from_core(self, message_json: dict):
        """Deliver a reply received from the core through the captured bot.

        Nothing is sent when no bot has been captured yet or when ``bot_id``
        is not ``"abot"``. Each entry of ``content`` is dispatched on its
        ``type`` (``node``, ``text``, ``markdown``, ``image``); a failure in
        one entry is logged and the remaining entries are still processed.
        """
        if not self.bot:
            return
        if message_json.get("bot_id") != "abot":
            return

        target_id = message_json.get("target_id", "")
        for msg in message_json.get("content") or []:
            # Skip malformed entries instead of aborting the whole reply.
            if not isinstance(msg, dict):
                continue
            t = msg.get("type")
            if not t:
                continue
            try:
                if t == "node":
                    for node in msg.get("data") or []:
                        if not isinstance(node, dict):
                            continue
                        node_type = node.get("type")
                        if node_type == "text":
                            await self.bot.send_text_message(target_id, node.get("data", ""))
                        elif node_type in ("image", "b64", "url"):
                            await self.bot.send_image_message(
                                target_id, self._to_image_payload(node.get("data", ""))
                            )
                elif t == "text":
                    await self.bot.send_text_message(target_id, msg.get("data", ""))
                elif t == "markdown":
                    text, images = self.parse_markdown(msg.get("data", ""))
                    # Skip empty text so an image-only markdown reply does not
                    # emit a blank message before its images.
                    if str(text).strip():
                        await self.bot.send_text_message(target_id, text)
                    for img in images:
                        await self.bot.send_image_message(target_id, img)
                elif t == "image":
                    await self.bot.send_image_message(
                        target_id, self._to_image_payload(msg.get("data", ""))
                    )
            except Exception as e:
                self.LOG.exception(f"[{self.name}] 转发消息失败(type={t}): {e}")
                continue

    async def reconnect(self):
        """Ask the core client to reconnect."""
        await gs_core_client.reconnect()
|