from datetime import datetime from typing import Dict, Any, List, Optional, Tuple import xml.etree.ElementTree as ET import dacite from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from message_util import MessageUtil # 导入消息工具类 from wechat_ipad import WechatAPIClient class GroupMemberChangePlugin(MessagePluginInterface): """群成员变更监控插件""" @property def name(self) -> str: return "群成员变更监控" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "监控群成员变动并发送通知" @property def author(self) -> str: return "Trae AI" def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def __init__(self): super().__init__() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.info(f"正在初始化 {self.name} 插件...") # 创建消息工具实例message_util self.message_util: MessageUtil = context.get("message_util") self.LOG.info(f"{self.name} 插件初始化完成") return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" return True async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理接收到的消息""" content = message.get("content") if hasattr(content, "clean_content"): content = content.clean_content content = str(content).strip() self.LOG.info(f"插件执行: {self.name}:{content}") if not content or " 👏欢迎 {nickname} 加入群聊!🎉 ⌚时间:{now} view 5 0 https://hot.imsyy.top/#/ {head_url} 0 0 0 Jyunere 0 1 """ await bot.send_link_xml_message(xml_content, roomid) return True, "已发送进群欢迎语" return False, "无需执行" @property def commands(self) -> List[str]: """插件支持的命令列表""" return [] def get_help(self) -> str: """获取插件帮助信息""" return "群成员变更监控插件:自动监控群成员变动并发送通知。" def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]: """解析新成员信息""" new_members = [] try: # 查找指定链接中的成员列表 names_link = root.find(f".//link[@name='{link_name}']") if names_link is None: return new_members memberlist = names_link.find("memberlist") if memberlist is None: return new_members for member in memberlist.findall("member"): username = member.find("username").text nickname = member.find("nickname").text new_members.append({ "wxid": username, "nickname": nickname }) except Exception as e: self.LOG.warning(f"解析新成员信息失败: {e}") return new_members