# Listing metadata from the code viewer: 212 lines, 7.1 KiB, Python
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from xml.sax.saxutils import escape

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient


class GroupMemberChangePlugin(MessagePluginInterface):
    """Watch group system messages and welcome members who join a chat room.

    The plugin accepts messages whose content contains a ``<sysmsg`` element,
    recognises the "joined the group" notification templates, extracts the
    newcomers from the message's member list and sends each of them an
    @-mention greeting followed by a link-card message.
    """

    # Result text used whenever a message is not a join notification we handle.
    _NOT_APPLICABLE = "非本次需要处理消息"

    # Template types that carry join notifications.
    _JOIN_TEMPLATE_TYPES = ("tmpl_type_profile", "tmpl_type_profilewithrevoke")

    # (template fragment, name of the <link> element listing the newcomers).
    # The first fragment is also a substring of the two "invited by" templates
    # ('"$username$"邀请"$names$"加入了群聊' and '你邀请"$names$"加入了群聊'),
    # so those variants are matched by it and need no entries of their own.
    _JOIN_TEMPLATES = (
        ('"$names$"加入了群聊', "names"),
        ('"$adder$"通过扫描"$from$"分享的二维码加入群聊', "adder"),
        ('"$adder$"通过"$from$"的邀请二维码加入群聊', "adder"),
    )

    # Link-card payload; placeholders are filled with XML-escaped values.
    _WELCOME_CARD_TEMPLATE = """
    <appmsg appid="" sdkver="1">
    <title>{title}</title>
    <des>{description}</des>
    <action>view</action>
    <type>5</type>
    <showtype>0</showtype>
    <content />
    <url>https://hot.imsyy.top/#/</url>
    <dataurl />
    <lowurl />
    <lowdataurl />
    <recorditem />
    <thumburl>{thumb_url}</thumburl>
    <messageaction />
    <laninfo />
    <extinfo />
    <sourceusername />
    <sourcedisplayname />
    <commenturl />
    <appattach>
    <totallen>0</totallen>
    <attachid />
    <emoticonmd5 />
    <fileext />
    <aeskey />
    </appattach>
    <webviewshared>
    <publisherId />
    <publisherReqId>0</publisherReqId>
    </webviewshared>
    <weappinfo>
    <pagepath />
    <username />
    <appid />
    <appservicetype>0</appservicetype>
    </weappinfo>
    <websearch />
    </appmsg>
    <fromusername>Jyunere</fromusername>
    <scene>0</scene>
    <appinfo>
    <version>1</version>
    <appname></appname>
    </appinfo>
    <commenturl></commenturl>
    """

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "群成员变更监控"

    @property
    def version(self) -> str:
        """Version string of the plugin."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of what the plugin does."""
        return "监控群成员变动并发送通知"

    @property
    def author(self) -> str:
        """Author of the plugin."""
        return "Trae AI"

    def start(self) -> bool:
        """Mark the plugin as running and report success."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped and report success."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def __init__(self):
        super().__init__()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialise the plugin; ``context`` is currently unused."""
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        self.LOG.info(f"{self.name} 插件初始化完成")
        return True

    @staticmethod
    def _message_text(message: Dict[str, Any]) -> str:
        """Return the message content as stripped text.

        Content objects exposing ``clean_content`` are unwrapped first; a
        missing content yields an empty string.
        """
        content = message.get("content")
        if hasattr(content, "clean_content"):
            content = content.clean_content
        return "" if content is None else str(content).strip()

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the message content contains a ``<sysmsg`` element."""
        # Use the same unwrapping as process_message so both agree on
        # non-string content objects.
        return "<sysmsg" in self._message_text(message)

    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Welcome every member announced by a group "joined" system message.

        Args:
            message: Incoming message dict. Reads ``content``, ``roomid``,
                ``gbm`` (group permission manager) and ``bot`` (API client).

        Returns:
            ``(True, text)`` once greetings were sent for all new members,
            otherwise ``(False, reason)``.
        """
        content = self._message_text(message)
        self.LOG.debug(f"插件执行: {self.name}:{content}")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # Respect the per-group feature switch.
        if roomid and gbm.get_group_permission(roomid, Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.DISABLED:
            return False, "没有权限"

        xml_content = content.replace("\n", "").replace("\t", "")
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as exc:
            # Malformed or prefixed payloads are simply not ours to handle.
            self.LOG.warning(f"解析系统消息XML失败: {exc}")
            return False, self._NOT_APPLICABLE

        if root.tag != "sysmsg":
            return False, self._NOT_APPLICABLE

        # Only template-based system messages can be join notifications.
        if root.attrib.get("type") != "sysmsgtemplate":
            return False, "无需执行"

        template_text = self._join_template_text(root)
        if not template_text:
            return False, self._NOT_APPLICABLE

        link_name = next(
            (link for fragment, link in self._JOIN_TEMPLATES if fragment in template_text),
            None,
        )
        if link_name is None:
            self.LOG.warning(f"未知的入群方式: {template_text}")
            return False, self._NOT_APPLICABLE

        new_members = self._parse_member_info(root, link_name)
        if not new_members:
            return False, self._NOT_APPLICABLE

        for member in new_members:
            await self._send_welcome(bot, roomid, member)
        return True, "已发送进群欢迎语"

    def _join_template_text(self, root: ET.Element) -> Optional[str]:
        """Return the join-template text of ``root``, or None if it has none.

        None is returned when the ``sysmsgtemplate`` / ``content_template`` /
        ``template`` chain is incomplete or the template type is not a join
        notification type.
        """
        sys_msg_template = root.find("sysmsgtemplate")
        if sys_msg_template is None:
            return None

        template = sys_msg_template.find("content_template")
        if template is None:
            return None

        if template.attrib.get("type") not in self._JOIN_TEMPLATE_TYPES:
            return None

        # findtext tolerates a missing <template> child instead of raising.
        return template.findtext("template")

    async def _send_welcome(self, bot: WechatAPIClient, roomid: str, member: Dict[str, str]) -> None:
        """Send the @-mention greeting and the welcome link card for one member."""
        wxid = member["wxid"]
        greeting = f"👏欢迎 {member['nickname']} 加入群聊!🎉"
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        await bot.send_at_message(roomid, greeting, [wxid])

        # The detail lookup may yield nothing; fall back to an empty avatar.
        detail = await bot.get_chatroom_member_detail(wxid, roomid) or {}
        head_url = detail.get("SmallHeadImgUrl") or detail.get("BigHeadImgUrl") or ""

        # Nicknames and URLs are user-controlled: escape them so characters
        # such as '&' or '<' cannot break the card XML.
        card_xml = self._WELCOME_CARD_TEMPLATE.format(
            title=escape(greeting),
            description=escape(f"⌚时间:{now}"),
            thumb_url=escape(head_url),
        )
        await bot.send_link_xml_message(card_xml, roomid)

    @property
    def commands(self) -> List[str]:
        """Commands handled by the plugin (none; it reacts to system messages)."""
        return []

    def get_help(self) -> str:
        """Return the help text of the plugin."""
        return "群成员变更监控插件:自动监控群成员变动并发送通知。"

    def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> List[Dict[str, str]]:
        """Extract the new members listed under ``<link name=link_name>``.

        Args:
            root: Parsed ``sysmsg`` element.
            link_name: Value of the ``name`` attribute of the link to read.

        Returns:
            A list of ``{"wxid": ..., "nickname": ...}`` dicts; empty when the
            link or its member list is absent. Members without a username are
            skipped, and a missing nickname falls back to the username.
        """
        new_members: List[Dict[str, str]] = []

        names_link = root.find(f".//link[@name='{link_name}']")
        if names_link is None:
            return new_members

        memberlist = names_link.find("memberlist")
        if memberlist is None:
            return new_members

        for member in memberlist.findall("member"):
            username = member.findtext("username")
            if not username:
                # Skip only the malformed entry rather than dropping the rest.
                self.LOG.warning("解析新成员信息失败: 缺少username")
                continue
            nickname = member.findtext("nickname") or username
            new_members.append({"wxid": username, "nickname": nickname})

        return new_members
|