import xml.etree.ElementTree as ET from datetime import datetime from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import LINK_XML_WELCOME class GroupMemberChangePlugin(MessagePluginInterface): """群成员变更监控插件""" # 功能权限常量 FEATURE_KEY = "GROUP_MEMBER_CHANGE" FEATURE_DESCRIPTION = "👥 群成员变更监控 [自动监控群成员变动并发送通知]" @property def name(self) -> str: return "群成员变更监控" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "监控群成员变动并发送通知" @property def author(self) -> str: return "liu.wei" def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def __init__(self): super().__init__() # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.info(f"正在初始化 {self.name} 插件...") self.LOG.info(f"{self.name} 插件初始化完成") return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" content = message.get("content") if not content or " Tuple[bool, Optional[str]]: """处理接收到的消息""" content = message.get("content") if hasattr(content, "clean_content"): content = content.clean_content content = str(content).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" xml_content = str(content).strip().replace("\n", "").replace("\t", "") root = ET.fromstring(xml_content) if root.tag != "sysmsg": return False, "非本次需要处理消息" # 检查是否是进群消息 if root.attrib.get("type") == "sysmsgtemplate": sys_msg_template = root.find("sysmsgtemplate") if sys_msg_template is None: return False, "非本次需要处理消息" template = sys_msg_template.find("content_template") if template is None: return False, "非本次需要处理消息" template_type = template.attrib.get("type") if template_type not in ["tmpl_type_profile", "tmpl_type_profilewithrevoke"]: return False, "非本次需要处理消息" template_text = template.find("template").text if '"$names$"加入了群聊' in template_text: # 直接加入群聊 new_members = self._parse_member_info(root, "names") elif '"$username$"邀请"$names$"加入了群聊' in template_text: # 通过邀请加入群聊 new_members = self._parse_member_info(root, "names") elif '你邀请"$names$"加入了群聊' in template_text: # 自己邀请成员加入群聊 new_members = self._parse_member_info(root, "names") elif '"$adder$"通过扫描"$from$"分享的二维码加入群聊' in template_text: # 通过二维码加入群聊 new_members = self._parse_member_info(root, "adder") elif '"$adder$"通过"$from$"的邀请二维码加入群聊' in template_text: new_members = self._parse_member_info(root, "adder") else: self.LOG.warning(f"未知的入群方式: {template_text}") return False, "非本次需要处理消息" if not new_members: return False, "非本次需要处理消息" for member in new_members: wxid = member["wxid"] nickname = member["nickname"] now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") member_wxids = [wxid] await bot.send_at_message(roomid, f"👏欢迎 {nickname} 加入群聊!🎉", member_wxids) members = await bot.get_chatroom_member_detail(wxid, roomid) head_url = members.get("SmallHeadImgUrl") or members.get("BigHeadImgUrl") or "" try: # 更新联系人信息 ContactManager.get_instance().update_head_image(wxid, head_url) ContactManager.get_instance().update_group_members(roomid, wxid, nickname) # 入库 contact_db: ContactsDBOperator = ContactsDBOperator(DBConnectionManager.get_instance()) member_details: List[Dict] = [members] contact_db.save_chatroom_member_simple(roomid, member_details) except Exception as e: self.LOG.warning(f"新增群员信息失败: {e}") xml_content = f"{LINK_XML_WELCOME}".format(nickname=nickname, now=now, head_url=head_url) await bot.send_link_xml_message(xml_content, roomid) return True, "已发送进群欢迎语" return False, "无需执行" @property def commands(self) -> List[str]: """插件支持的命令列表""" return [] @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def get_help(self) -> str: """获取插件帮助信息""" return "群成员变更监控插件:自动监控群成员变动并发送通知。" def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]: """解析新成员信息""" new_members = [] try: # 查找指定链接中的成员列表 names_link = root.find(f".//link[@name='{link_name}']") if names_link is None: return new_members memberlist = names_link.find("memberlist") if memberlist is None: return new_members for member in memberlist.findall("member"): username = member.find("username").text nickname = member.find("nickname").text new_members.append({ "wxid": username, "nickname": nickname }) except Exception as e: self.LOG.warning(f"解析新成员信息失败: {e}") return new_members