167 lines
6.6 KiB
Python
167 lines
6.6 KiB
Python
import logging
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
import xml.etree.ElementTree as ET
|
||
|
||
import dacite
|
||
|
||
from gewechat.client import gewe_client
|
||
from gewechat.response.model.group.chatroom_member_detail import ChatroomMemberDetail
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.plugin_interface import PluginStatus
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from message_util import MessageUtil # 导入消息工具类
|
||
|
||
|
||
class GroupMemberChangePlugin(MessagePluginInterface):
|
||
"""群成员变更监控插件"""
|
||
@property
|
||
def name(self) -> str:
|
||
return "群成员变更监控"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "监控群成员变动并发送通知"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "Trae AI"
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 创建消息工具实例message_util
|
||
self.message_util: MessageUtil = context.get("message_util")
|
||
|
||
self.LOG.info(f"{self.name} 插件初始化完成")
|
||
return True
|
||
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理接收到的消息"""
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.info(f"插件执行: {self.name}:{content}")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
xml_content = str(content).strip().replace("\n", "").replace("\t", "")
|
||
root = ET.fromstring(xml_content)
|
||
|
||
if root.tag != "sysmsg":
|
||
return False, "非本次需要处理消息"
|
||
|
||
# 检查是否是进群消息
|
||
if root.attrib.get("type") == "sysmsgtemplate":
|
||
sys_msg_template = root.find("sysmsgtemplate")
|
||
if sys_msg_template is None:
|
||
return False, "非本次需要处理消息"
|
||
|
||
template = sys_msg_template.find("content_template")
|
||
if template is None:
|
||
return False, "非本次需要处理消息"
|
||
|
||
template_type = template.attrib.get("type")
|
||
if template_type not in ["tmpl_type_profile", "tmpl_type_profilewithrevoke"]:
|
||
return False, "非本次需要处理消息"
|
||
|
||
template_text = template.find("template").text
|
||
|
||
if '"$names$"加入了群聊' in template_text: # 直接加入群聊
|
||
new_members = self._parse_member_info(root, "names")
|
||
elif '"$username$"邀请"$names$"加入了群聊' in template_text: # 通过邀请加入群聊
|
||
new_members = self._parse_member_info(root, "names")
|
||
elif '你邀请"$names$"加入了群聊' in template_text: # 自己邀请成员加入群聊
|
||
new_members = self._parse_member_info(root, "names")
|
||
elif '"$adder$"通过扫描"$from$"分享的二维码加入群聊' in template_text: # 通过二维码加入群聊
|
||
new_members = self._parse_member_info(root, "adder")
|
||
elif '"$adder$"通过"$from$"的邀请二维码加入群聊' in template_text:
|
||
new_members = self._parse_member_info(root, "adder")
|
||
else:
|
||
self.LOG.warning(f"未知的入群方式: {template_text}")
|
||
return False, "非本次需要处理消息"
|
||
|
||
if not new_members:
|
||
return False, "非本次需要处理消息"
|
||
|
||
for member in new_members:
|
||
wxid = member["wxid"]
|
||
nickname = member["nickname"]
|
||
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
member_wxids = [wxid]
|
||
profile: ChatroomMemberDetail = dacite.from_dict(ChatroomMemberDetail,
|
||
gewe_client.client.get_chatroom_member_detail(
|
||
gewe_client.client.app_id,
|
||
member_wxids))
|
||
if profile.ret == 200:
|
||
gewe_client.client.post_link(gewe_client.client.app_id, sender,
|
||
title=f"👏欢迎 {nickname} 加入群聊!🎉",
|
||
description=f"⌚时间:{now}\n",
|
||
url="https://hot.imsyy.top/#/",
|
||
thumb_url=profile.data[0].big_head_img_url)
|
||
return True, "已发送进群欢迎语"
|
||
return False, "无需执行"
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
"""插件支持的命令列表"""
|
||
return []
|
||
|
||
def get_help(self) -> str:
|
||
"""获取插件帮助信息"""
|
||
return "群成员变更监控插件:自动监控群成员变动并发送通知。"
|
||
|
||
def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]:
|
||
"""解析新成员信息"""
|
||
new_members = []
|
||
try:
|
||
# 查找指定链接中的成员列表
|
||
names_link = root.find(f".//link[@name='{link_name}']")
|
||
if names_link is None:
|
||
return new_members
|
||
|
||
memberlist = names_link.find("memberlist")
|
||
|
||
if memberlist is None:
|
||
return new_members
|
||
|
||
for member in memberlist.findall("member"):
|
||
username = member.find("username").text
|
||
nickname = member.find("nickname").text
|
||
new_members.append({
|
||
"wxid": username,
|
||
"nickname": nickname
|
||
})
|
||
|
||
except Exception as e:
|
||
self.LOG.warning(f"解析新成员信息失败: {e}")
|
||
|
||
return new_members
|