Files
abot/plugins/group_member_change/main.py
2025-04-30 17:03:20 +08:00

217 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import xml.etree.ElementTree as ET
import dacite
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from message_util import MessageUtil # 导入消息工具类
from wechat_ipad import WechatAPIClient
class GroupMemberChangePlugin(MessagePluginInterface):
"""群成员变更监控插件"""
@property
def name(self) -> str:
return "群成员变更监控"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "监控群成员变动并发送通知"
@property
def author(self) -> str:
return "Trae AI"
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def __init__(self):
super().__init__()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 创建消息工具实例message_util
self.message_util: MessageUtil = context.get("message_util")
self.LOG.info(f"{self.name} 插件初始化完成")
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
return True
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理接收到的消息"""
content = message.get("content")
if hasattr(content, "clean_content"):
content = content.clean_content
content = str(content).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
if not content or "<sysmsg" not in content:
return False, "非系统消息"
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.DISABLED:
return False, "没有权限"
xml_content = str(content).strip().replace("\n", "").replace("\t", "")
root = ET.fromstring(xml_content)
if root.tag != "sysmsg":
return False, "非本次需要处理消息"
# 检查是否是进群消息
if root.attrib.get("type") == "sysmsgtemplate":
sys_msg_template = root.find("sysmsgtemplate")
if sys_msg_template is None:
return False, "非本次需要处理消息"
template = sys_msg_template.find("content_template")
if template is None:
return False, "非本次需要处理消息"
template_type = template.attrib.get("type")
if template_type not in ["tmpl_type_profile", "tmpl_type_profilewithrevoke"]:
return False, "非本次需要处理消息"
template_text = template.find("template").text
if '"$names$"加入了群聊' in template_text: # 直接加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$username$"邀请"$names$"加入了群聊' in template_text: # 通过邀请加入群聊
new_members = self._parse_member_info(root, "names")
elif '你邀请"$names$"加入了群聊' in template_text: # 自己邀请成员加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$adder$"通过扫描"$from$"分享的二维码加入群聊' in template_text: # 通过二维码加入群聊
new_members = self._parse_member_info(root, "adder")
elif '"$adder$"通过"$from$"的邀请二维码加入群聊' in template_text:
new_members = self._parse_member_info(root, "adder")
else:
self.LOG.warning(f"未知的入群方式: {template_text}")
return False, "非本次需要处理消息"
if not new_members:
return False, "非本次需要处理消息"
for member in new_members:
wxid = member["wxid"]
nickname = member["nickname"]
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
member_wxids = [wxid]
await bot.send_at_message(roomid, f"👏欢迎 {nickname} 加入群聊!🎉", member_wxids)
members = await bot.get_chatroom_member_detail(wxid, roomid)
head_url = members.get("SmallHeadImgUrl") or members.get("BigHeadImgUrl") or ""
xml_content = f"""
<appmsg appid="" sdkver="1">
<title>👏欢迎 {nickname} 加入群聊!🎉</title>
<des>⌚时间:{now}</des>
<action>view</action>
<type>5</type>
<showtype>0</showtype>
<content />
<url>https://hot.imsyy.top/#/</url>
<dataurl />
<lowurl />
<lowdataurl />
<recorditem />
<thumburl>{head_url}</thumburl>
<messageaction />
<laninfo />
<extinfo />
<sourceusername />
<sourcedisplayname />
<commenturl />
<appattach>
<totallen>0</totallen>
<attachid />
<emoticonmd5 />
<fileext />
<aeskey />
</appattach>
<webviewshared>
<publisherId />
<publisherReqId>0</publisherReqId>
</webviewshared>
<weappinfo>
<pagepath />
<username />
<appid />
<appservicetype>0</appservicetype>
</weappinfo>
<websearch />
</appmsg>
<fromusername>Jyunere</fromusername>
<scene>0</scene>
<appinfo>
<version>1</version>
<appname></appname>
</appinfo>
<commenturl></commenturl>
"""
await bot.send_link_xml_message(xml_content, roomid)
return True, "已发送进群欢迎语"
return False, "无需执行"
@property
def commands(self) -> List[str]:
"""插件支持的命令列表"""
return []
def get_help(self) -> str:
"""获取插件帮助信息"""
return "群成员变更监控插件:自动监控群成员变动并发送通知。"
def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]:
"""解析新成员信息"""
new_members = []
try:
# 查找指定链接中的成员列表
names_link = root.find(f".//link[@name='{link_name}']")
if names_link is None:
return new_members
memberlist = names_link.find("memberlist")
if memberlist is None:
return new_members
for member in memberlist.findall("member"):
username = member.find("username").text
nickname = member.find("nickname").text
new_members.append({
"wxid": username,
"nickname": nickname
})
except Exception as e:
self.LOG.warning(f"解析新成员信息失败: {e}")
return new_members