Files
abot/plugins/group_member_change/main.py
liuwei d4b7cb32f6 feat(群级配置): 新增MySQL+Redis持久缓存并接入进群欢迎差异化配置
新增群级插件配置表与服务层,采用MySQL持久化+Redis长期缓存(TTL=-1);后台新增群级插件配置管理页面与API,支持按群按插件维护JSON配置并在修改后同步回填MySQL和刷新Redis;已将群成员变更监控插件接入该配置,支持欢迎文案与卡片URL等按群差异化。
2026-04-20 10:42:46 +08:00

330 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from utils.group_plugin_config_service import GroupPluginConfigService
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, LINK_XML_WELCOME
class GroupMemberChangePlugin(MessagePluginInterface):
"""群成员变更监控插件"""
# 功能权限常量
FEATURE_KEY = "GROUP_MEMBER_CHANGE"
FEATURE_DESCRIPTION = "👥 群成员变更监控 [自动监控群成员变动并发送通知]"
SUPPORTED_TEMPLATE_TEXTS = {
'"$names$"加入了群聊',
'"$username$"邀请"$names$"加入了群聊',
'你邀请"$names$"加入了群聊',
'"$adder$"通过扫描"$from$"分享的二维码加入群聊',
'"$adder$"通过"$from$"的邀请二维码加入群聊'
}
@property
def name(self) -> str:
return "群成员变更监控"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "监控群成员变动并发送通知"
@property
def author(self) -> str:
return "liu.wei"
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def __init__(self):
super().__init__()
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 注入群级插件配置服务(由机器人系统上下文提供):
# 1. 优先通过该服务读取“按群差异化”的欢迎文案与卡片配置;
# 2. 未配置时保持原有默认欢迎行为,确保兼容老群。
self.group_plugin_config_service: Optional[GroupPluginConfigService] = context.get("group_plugin_config_service")
self.LOG.debug(f"{self.name} 插件初始化完成")
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
roomid = str(message.get("roomid", "") or "").strip()
if not roomid or not roomid.endswith("@chatroom"):
return False
gbm: GroupBotManager = message.get("gbm")
if gbm and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False
msg_type = message.get("type")
msg_type_value = getattr(msg_type, "value", msg_type)
if str(msg_type_value) not in {"10000", "10002"}:
return False
content = message.get("content")
if hasattr(content, "clean_content"):
content = content.clean_content
content = str(content or "").strip()
if not content or "<sysmsg" not in content:
return False
full_msg = message.get("full_wx_msg")
if full_msg and not full_msg.from_group():
return False
root = self._parse_sysmsg_root(content)
if root is None:
return False
return self._is_supported_join_event(root)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理接收到的消息"""
content = message.get("content")
if hasattr(content, "clean_content"):
content = content.clean_content
content = str(content).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
root = self._parse_sysmsg_root(content)
if root is None:
return False, "非本次需要处理消息"
# 检查是否是进群消息
if root.attrib.get("type") == "sysmsgtemplate":
sys_msg_template = root.find("sysmsgtemplate")
if sys_msg_template is None:
return False, "非本次需要处理消息"
template = sys_msg_template.find("content_template")
if template is None:
return False, "非本次需要处理消息"
template_type = template.attrib.get("type")
if template_type not in ["tmpl_type_profile", "tmpl_type_profilewithrevoke"]:
return False, "非本次需要处理消息"
template_text = template.find("template").text
if '"$names$"加入了群聊' in template_text: # 直接加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$username$"邀请"$names$"加入了群聊' in template_text: # 通过邀请加入群聊
new_members = self._parse_member_info(root, "names")
elif '你邀请"$names$"加入了群聊' in template_text: # 自己邀请成员加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$adder$"通过扫描"$from$"分享的二维码加入群聊' in template_text: # 通过二维码加入群聊
new_members = self._parse_member_info(root, "adder")
elif '"$adder$"通过"$from$"的邀请二维码加入群聊' in template_text:
new_members = self._parse_member_info(root, "adder")
else:
self.LOG.warning(f"未知的入群方式: {template_text}")
return False, "非本次需要处理消息"
if not new_members:
return False, "非本次需要处理消息"
for member in new_members:
wxid = member["wxid"]
nickname = member["nickname"]
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
member_wxids = [wxid]
members = await bot.get_chatroom_member_detail(wxid, roomid)
head_url = members.get("SmallHeadImgUrl") or members.get("BigHeadImgUrl") or ""
welcome_cfg = self._get_group_welcome_config(roomid)
variables = {
"nickname": nickname,
"wxid": wxid,
"group_id": roomid,
"now": now,
"head_url": head_url,
}
# 文本欢迎:支持后台按群关闭和模板自定义。
if bool(welcome_cfg.get("welcome_text_enabled", True)):
welcome_text = self._safe_format(
welcome_cfg.get("welcome_text_template", "👏欢迎 {nickname} 加入群聊!🎉"),
variables
)
await bot.send_at_message(roomid, welcome_text, member_wxids)
try:
# 更新联系人信息
ContactManager.get_instance().update_head_image(wxid, head_url)
ContactManager.get_instance().update_group_members(roomid, wxid, nickname)
# 入库
contact_db: ContactsDBOperator = ContactsDBOperator(DBConnectionManager.get_instance())
member_details: List[Dict] = [members]
contact_db.save_chatroom_member_simple(roomid, member_details)
except Exception as e:
self.LOG.warning(f"新增群员信息失败: {e}")
# 欢迎卡片:支持后台按群关闭,且可配置标题/描述/URL/缩略图。
if bool(welcome_cfg.get("welcome_card_enabled", True)):
if self.group_plugin_config_service:
xml_content = self._build_custom_welcome_card_xml(welcome_cfg, variables)
else:
# 老流程兼容:当未接入配置服务时沿用原模板。
xml_content = f"{LINK_XML_WELCOME}".format(nickname=nickname, now=now, head_url=head_url)
await bot.send_link_xml_message(xml_content, roomid)
return True, "已发送进群欢迎语"
return False, "无需执行"
def _parse_sysmsg_root(self, content: str) -> Optional[ET.Element]:
xml_content = str(content or "").strip().replace("\n", "").replace("\t", "")
if not xml_content or "<sysmsg" not in xml_content:
return None
try:
root = ET.fromstring(xml_content)
except ET.ParseError:
return None
if root.tag != "sysmsg":
return None
return root
def _is_supported_join_event(self, root: ET.Element) -> bool:
if root.attrib.get("type") != "sysmsgtemplate":
return False
sys_msg_template = root.find("sysmsgtemplate")
if sys_msg_template is None:
return False
template = sys_msg_template.find("content_template")
if template is None:
return False
template_type = template.attrib.get("type")
if template_type not in {"tmpl_type_profile", "tmpl_type_profilewithrevoke"}:
return False
template_node = template.find("template")
template_text = (template_node.text or "").strip() if template_node is not None else ""
if not template_text:
return False
return any(item in template_text for item in self.SUPPORTED_TEMPLATE_TEXTS)
@property
def commands(self) -> List[str]:
"""插件支持的命令列表"""
return []
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def get_help(self) -> str:
"""获取插件帮助信息"""
return "群成员变更监控插件:自动监控群成员变动并发送通知。"
def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]:
"""解析新成员信息"""
new_members = []
try:
# 查找指定链接中的成员列表
names_link = root.find(f".//link[@name='{link_name}']")
if names_link is None:
return new_members
memberlist = names_link.find("memberlist")
if memberlist is None:
return new_members
for member in memberlist.findall("member"):
username = member.find("username").text
nickname = member.find("nickname").text
new_members.append({
"wxid": username,
"nickname": nickname
})
except Exception as e:
self.LOG.warning(f"解析新成员信息失败: {e}")
return new_members
@staticmethod
def _safe_format(template: str, variables: Dict[str, Any]) -> str:
"""安全格式化模板,缺失变量时保留原占位符。"""
text = str(template or "")
for key, value in (variables or {}).items():
text = text.replace(f"{{{key}}}", str(value or ""))
return text
def _get_group_welcome_config(self, group_id: str) -> Dict[str, Any]:
"""读取群级欢迎配置,未配置时返回默认值。"""
default_cfg = {
"welcome_text_enabled": True,
"welcome_text_template": "👏欢迎 {nickname} 加入群聊!🎉",
"welcome_card_enabled": True,
"card_title_template": "👏欢迎 {nickname} 加入群聊!🎉",
"card_desc_template": "⌚时间:{now}",
"card_url": "https://newsnow.busiyi.world/",
"card_thumb_url": "{head_url}",
}
if not self.group_plugin_config_service:
return default_cfg
try:
cfg = self.group_plugin_config_service.get_config(
group_id=group_id,
plugin_name=self.name,
config_key="welcome",
default=default_cfg,
)
if not isinstance(cfg, dict):
return default_cfg
# 默认值兜底,避免后台只配置部分字段时缺项。
return {**default_cfg, **cfg}
except Exception as e:
self.LOG.warning(f"读取群级欢迎配置失败,回退默认配置: group={group_id}, error={e}")
return default_cfg
def _build_custom_welcome_card_xml(self, cfg: Dict[str, Any], variables: Dict[str, Any]) -> str:
"""根据群级配置构建欢迎卡片 XML。"""
title = self._safe_format(cfg.get("card_title_template", ""), variables)
desc = self._safe_format(cfg.get("card_desc_template", ""), variables)
url = self._safe_format(cfg.get("card_url", ""), variables)
thumb_url = self._safe_format(cfg.get("card_thumb_url", ""), variables)
return LINK_XML_NORMAL.format(
title=title,
des=desc,
url=url,
thumburl=thumb_url,
)