# File viewer metadata: Python source, 192 lines, 7.5 KiB
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_WELCOME


class GroupMemberChangePlugin(MessagePluginInterface):
    """Plugin that watches group-membership system messages and welcomes new members.

    It reacts to ``<sysmsg type="sysmsgtemplate">`` payloads whose template
    describes a member joining a chat room, sends an @-mention greeting and a
    link-card welcome message, and records the new member's contact details.
    """

    # Feature-permission constants.
    FEATURE_KEY = "GROUP_MEMBER_CHANGE"
    FEATURE_DESCRIPTION = "👥 群成员变更监控 [自动监控群成员变动并发送通知]"

    # (template fragment, name of the <link> element that lists the new members).
    # The first fragment is also a substring of the invite templates
    # '"$username$"邀请"$names$"加入了群聊' and '你邀请"$names$"加入了群聊',
    # so it covers direct joins and both invite variants.
    _JOIN_TEMPLATES = (
        ('"$names$"加入了群聊', "names"),
        ('"$adder$"通过扫描"$from$"分享的二维码加入群聊', "adder"),
        ('"$adder$"通过"$from$"的邀请二维码加入群聊', "adder"),
    )

    # Template types that carry member-join notifications.
    _JOIN_TEMPLATE_TYPES = ("tmpl_type_profile", "tmpl_type_profilewithrevoke")

    @property
    def name(self) -> str:
        """Return the display name of the plugin."""
        return "群成员变更监控"

    @property
    def version(self) -> str:
        """Return the plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Return a short description of the plugin."""
        return "监控群成员变动并发送通知"

    @property
    def author(self) -> str:
        """Return the plugin author."""
        return "liu.wei"

    def start(self) -> bool:
        """Mark the plugin as running and report success."""
        self.LOG.debug(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped and report success."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def __init__(self):
        """Initialise the base plugin and register the feature permission."""
        super().__init__()
        # Register the feature permission used by the per-group check.
        self.feature = self.register_feature()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialise the plugin; ``context`` is accepted but not used here."""
        self.LOG.debug(f"正在初始化 {self.name} 插件...")

        self.LOG.debug(f"{self.name} 插件初始化完成")
        return True

    @staticmethod
    def _message_text(message: Dict[str, Any]) -> str:
        """Return the message content as stripped text.

        Content objects exposing ``clean_content`` are unwrapped first so that
        ``can_process`` and ``process_message`` inspect the same text.
        """
        content = message.get("content")
        if hasattr(content, "clean_content"):
            content = content.clean_content
        return "" if content is None else str(content).strip()

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the message content contains a ``<sysmsg`` element."""
        return "<sysmsg" in self._message_text(message)

    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a system message and welcome any members that joined.

        Args:
            message: Message dict; the keys read here are ``content``,
                ``roomid``, ``gbm`` (group permission manager) and ``bot``
                (API client).

        Returns:
            ``(True, text)`` when welcome messages were sent, otherwise
            ``(False, reason)``.
        """
        content = self._message_text(message)
        self.LOG.debug(f"插件执行: {self.name}:{content}")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # Permission check: skip groups where this feature is disabled.
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        xml_content = content.replace("\n", "").replace("\t", "")
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            # can_process only checks for a substring, so the payload may
            # still be malformed or truncated XML.
            self.LOG.warning(f"系统消息XML解析失败: {e}")
            return False, "非本次需要处理消息"

        if root.tag != "sysmsg":
            return False, "非本次需要处理消息"

        # Only template-style system messages can describe a member joining.
        if root.attrib.get("type") != "sysmsgtemplate":
            return False, "无需执行"

        sys_msg_template = root.find("sysmsgtemplate")
        if sys_msg_template is None:
            return False, "非本次需要处理消息"

        template = sys_msg_template.find("content_template")
        if template is None:
            return False, "非本次需要处理消息"

        if template.attrib.get("type") not in self._JOIN_TEMPLATE_TYPES:
            return False, "非本次需要处理消息"

        # findtext tolerates a missing or empty <template> child.
        template_text = template.findtext("template") or ""

        link_name = next(
            (link for marker, link in self._JOIN_TEMPLATES if marker in template_text),
            None,
        )
        if link_name is None:
            self.LOG.warning(f"未知的入群方式: {template_text}")
            return False, "非本次需要处理消息"

        new_members = self._parse_member_info(root, link_name)
        if not new_members:
            return False, "非本次需要处理消息"

        for member in new_members:
            await self._welcome_member(bot, roomid, member["wxid"], member["nickname"])
        return True, "已发送进群欢迎语"

    async def _welcome_member(self, bot: WechatAPIClient, roomid: str, wxid: str, nickname: str) -> None:
        """Greet one new member and record their contact details."""
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        await bot.send_at_message(roomid, f"👏欢迎 {nickname} 加入群聊!🎉", [wxid])

        # NOTE(review): assumes the detail call returns a dict (or a falsy
        # value when nothing is found) — confirm against the API client.
        detail = await bot.get_chatroom_member_detail(wxid, roomid) or {}
        head_url = detail.get("SmallHeadImgUrl") or detail.get("BigHeadImgUrl") or ""

        try:
            # Refresh the cached contact information.
            contacts = ContactManager.get_instance()
            contacts.update_head_image(wxid, head_url)
            contacts.update_group_members(roomid, wxid, nickname)
            # Persist the member only when details were actually returned.
            if detail:
                contact_db: ContactsDBOperator = ContactsDBOperator(DBConnectionManager.get_instance())
                member_details: List[Dict] = [detail]
                contact_db.save_chatroom_member_simple(roomid, member_details)
        except Exception as e:
            # Bookkeeping is best-effort; the welcome card is still sent.
            self.LOG.warning(f"新增群员信息失败: {e}")

        welcome_xml = LINK_XML_WELCOME.format(nickname=nickname, now=now, head_url=head_url)
        await bot.send_link_xml_message(welcome_xml, roomid)

    @property
    def commands(self) -> List[str]:
        """Return the commands supported by the plugin (none)."""
        return []

    @property
    def feature_key(self) -> Optional[str]:
        """Return the feature-permission key."""
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        """Return the feature-permission description."""
        return self.FEATURE_DESCRIPTION

    def get_help(self) -> str:
        """Return the plugin help text."""
        return "群成员变更监控插件:自动监控群成员变动并发送通知。"

    def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]:
        """Extract the new members listed under ``<link name=link_name>``.

        Args:
            root: Parsed ``<sysmsg>`` element.
            link_name: Value of the ``name`` attribute of the ``<link>``
                element whose ``<memberlist>`` holds the members.

        Returns:
            A list of ``{"wxid": ..., "nickname": ...}`` dicts; empty when the
            link or member list is absent. Members without a ``<username>``
            are skipped, and a missing nickname becomes an empty string.
        """
        new_members = []
        try:
            # First descendant <link> whose name matches, in document order.
            names_link = next(
                (link for link in root.iter("link") if link.get("name") == link_name),
                None,
            )
            if names_link is None:
                return new_members

            memberlist = names_link.find("memberlist")
            if memberlist is None:
                return new_members

            for member in memberlist.findall("member"):
                username = member.findtext("username")
                if not username:
                    # Skip a malformed entry instead of dropping the rest.
                    continue
                new_members.append({
                    "wxid": username,
                    "nickname": member.findtext("nickname") or "",
                })
        except Exception as e:
            # Parsing is best-effort: never raise, return what was collected.
            self.LOG.warning(f"解析新成员信息失败: {e}")

        return new_members
|