Files
2025-12-03 15:48:44 +08:00

364 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
入群欢迎插件
当新成员加入群聊时,发送欢迎卡片
支持两种方式:
1. type=11098 群成员新增事件(如果 API 支持)
2. type=11058 系统消息解析(通用方式)
"""
import tomllib
from pathlib import Path
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message
# 定义事件装饰器
def on_chatroom_member_add(priority=50):
"""群成员新增装饰器"""
def decorator(func):
setattr(func, '_event_type', 'chatroom_member_add')
setattr(func, '_priority', min(max(priority, 0), 99))
return func
return decorator
def on_system_message(priority=50):
"""系统消息装饰器"""
def decorator(func):
setattr(func, '_event_type', 'system_message')
setattr(func, '_priority', min(max(priority, 0), 99))
return func
return decorator
def on_chatroom_info_change(priority=50):
"""群信息变化装饰器"""
def decorator(func):
setattr(func, '_event_type', 'chatroom_info_change')
setattr(func, '_priority', min(max(priority, 0), 99))
return func
return decorator
class GroupWelcome(PluginBase):
"""入群欢迎插件"""
# 插件元数据
description = "新成员入群时发送欢迎卡片"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
# 群成员缓存:{room_wxid: set(member_wxids)}
self.member_cache = {}
async def async_init(self):
"""插件异步初始化"""
# 读取配置
config_path = Path(__file__).parent / "config.toml"
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
logger.info("入群欢迎插件已加载")
@on_chatroom_member_add(priority=50)
async def on_chatroom_member_add(self, bot, message: dict):
"""处理群成员新增事件type=11098"""
logger.info(f"[GroupWelcome] 收到群成员新增事件,原始消息: {message}")
# 检查是否启用
if not self.config["behavior"]["enabled"]:
logger.warning("[GroupWelcome] 插件未启用,跳过处理")
return
room_wxid = message.get("RoomWxid", "")
member_list = message.get("MemberList", [])
logger.info(f"[GroupWelcome] 解析结果 - 群ID: {room_wxid}, 成员列表: {member_list}")
# 检查群聊过滤
if not self._should_welcome(room_wxid):
logger.info(f"[GroupWelcome] 群 {room_wxid} 不在欢迎列表中,跳过")
return
logger.success(f"[GroupWelcome] 群 {room_wxid} 有新成员加入: {len(member_list)}")
await self._process_new_members(bot, room_wxid, member_list)
@on_system_message(priority=50)
async def on_system_message(self, bot, message: dict):
"""处理系统消息type=11058解析入群通知"""
# 检查是否启用
if not self.config["behavior"]["enabled"]:
return
raw_msg = message.get("Content", "")
room_wxid = message.get("FromWxid", "")
# 只处理群聊系统消息
if not room_wxid.endswith("@chatroom"):
return
logger.info(f"[GroupWelcome] 收到系统消息: room={room_wxid}, msg={raw_msg}")
# 解析入群消息
# 格式1: "你邀请xxxx加入了群聊"
# 格式2: "xxxx邀请xxxx加入了群聊"
# 格式3: "xxxxx通过扫描你分享的二维码加入群聊"
# 格式4: "xxxxx通过扫描xxxxxx分享的二维码加入群聊"
import re
# 匹配入群消息
patterns = [
r"(.+?)通过扫描.+?二维码加入群聊", # 扫码入群
r"(?:你|.+?)邀请(.+?)加入了群聊", # 邀请入群
]
for pattern in patterns:
match = re.search(pattern, raw_msg)
if match:
nickname = match.group(1).strip()
logger.success(f"[GroupWelcome] 从系统消息解析到新成员: {nickname}")
# 检查群聊过滤
if not self._should_welcome(room_wxid):
logger.info(f"[GroupWelcome] 群 {room_wxid} 不在欢迎列表中,跳过")
return
# 构造成员列表(只有昵称,没有 wxid
member_list = [{"wxid": "", "nickname": nickname}]
await self._process_new_members(bot, room_wxid, member_list)
break
@on_chatroom_info_change(priority=50)
async def on_chatroom_info_change(self, bot, message: dict):
"""处理群信息变化事件type=11100用于扫码加群等场景"""
# 检查是否启用
if not self.config["behavior"]["enabled"]:
return
room_wxid = message.get("RoomWxid", "")
total_member = message.get("TotalMember", 0)
member_list = message.get("MemberList", [])
logger.info(f"[GroupWelcome] 收到群信息变化事件: room={room_wxid}, total_member={total_member}, has_member_list={len(member_list) > 0}")
# 检查群聊过滤
if not self._should_welcome(room_wxid):
return
# 如果有 member_list直接处理群创建或某些情况
if member_list:
logger.info(f"[GroupWelcome] 群信息变化事件包含成员列表,直接处理")
await self._process_new_members(bot, room_wxid, member_list)
return
# 没有 member_list需要通过对比缓存找出新成员
try:
# 获取当前群成员列表
current_members = await bot.get_chatroom_members(room_wxid)
if not current_members:
logger.warning(f"[GroupWelcome] 无法获取群成员列表: {room_wxid}")
return
current_wxids = {m.get("wxid") for m in current_members if m.get("wxid")}
# 如果缓存中没有这个群,初始化缓存(首次)
if room_wxid not in self.member_cache:
logger.info(f"[GroupWelcome] 首次记录群成员: {room_wxid}, 成员数: {len(current_wxids)}")
self.member_cache[room_wxid] = current_wxids
return
# 对比找出新成员
cached_wxids = self.member_cache[room_wxid]
new_wxids = current_wxids - cached_wxids
if new_wxids:
logger.success(f"[GroupWelcome] 检测到新成员: {len(new_wxids)}")
# 构造新成员列表
new_members = [m for m in current_members if m.get("wxid") in new_wxids]
# 更新缓存
self.member_cache[room_wxid] = current_wxids
# 处理新成员
await self._process_new_members(bot, room_wxid, new_members)
else:
# 成员数量没有增加,可能是其他信息变化
logger.debug(f"[GroupWelcome] 群信息变化但无新成员: {room_wxid}")
# 更新缓存(可能有成员退出)
self.member_cache[room_wxid] = current_wxids
except Exception as e:
logger.error(f"[GroupWelcome] 处理群信息变化失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
async def _process_new_members(self, bot, room_wxid: str, member_list: list):
"""处理新成员列表"""
# 为每个新成员发送欢迎卡片
for member in member_list:
wxid = member.get("wxid", "")
nickname = member.get("nickname", "新成员")
try:
# 如果有 wxid尝试获取详细信息包括头像
if wxid:
user_info = await bot.get_user_info_in_chatroom(room_wxid, wxid)
if user_info:
# 提取头像 URL
big_head_img_url = user_info.get("bigHeadImgUrl", "")
actual_nickname = user_info.get("nickName", {}).get("string", nickname)
logger.info(f"获取到新成员信息: {actual_nickname} ({wxid}), 头像: {big_head_img_url}")
# 发送欢迎卡片
await self._send_welcome_card(
bot, room_wxid, actual_nickname, big_head_img_url
)
else:
logger.warning(f"无法获取新成员 {wxid} 的详细信息,使用默认配置")
# 使用默认配置发送
await self._send_welcome_card(bot, room_wxid, nickname, "")
else:
# 没有 wxid从系统消息解析尝试通过昵称匹配获取 wxid 和头像
logger.info(f"[GroupWelcome] 尝试通过昵称匹配获取用户信息: {nickname}")
# 获取群成员列表(可能需要等待成员真正加入)
members = await bot.get_chatroom_members(room_wxid)
# 如果没找到等待1秒后重试一次
if not members or not any(nickname.strip('"') == m.get("nickname", "") for m in members):
import asyncio
await asyncio.sleep(1)
members = await bot.get_chatroom_members(room_wxid)
if members:
# 通过昵称或群内昵称匹配
matched_member = None
for member in members:
member_nickname = member.get("nickname", "")
member_display_name = member.get("display_name", "")
# 匹配昵称(去除引号)
if nickname.strip('"') == member_nickname or nickname.strip('"') == member_display_name:
matched_member = member
break
if matched_member:
member_wxid = matched_member.get("wxid", "")
member_nickname = matched_member.get("nickname", nickname)
logger.success(f"[GroupWelcome] 匹配成功: {member_nickname} ({member_wxid})")
# 获取成员详细信息(包括头像)
try:
user_info = await bot.get_user_info_in_chatroom(room_wxid, member_wxid)
if user_info:
member_avatar = user_info.get("bigHeadImgUrl", "")
logger.info(f"[GroupWelcome] 获取到成员头像: {member_avatar}")
await self._send_welcome_card(bot, room_wxid, member_nickname, member_avatar)
else:
logger.warning(f"[GroupWelcome] 无法获取成员详细信息,使用默认头像")
await self._send_welcome_card(bot, room_wxid, member_nickname, "")
except Exception as e:
logger.error(f"[GroupWelcome] 获取成员详细信息失败: {e}")
await self._send_welcome_card(bot, room_wxid, member_nickname, "")
else:
logger.warning(f"[GroupWelcome] 未找到匹配的成员: {nickname}")
await self._send_welcome_card(bot, room_wxid, nickname, "")
else:
logger.warning(f"[GroupWelcome] 无法获取群成员列表")
await self._send_welcome_card(bot, room_wxid, nickname, "")
except Exception as e:
logger.error(f"处理新成员 {nickname} 欢迎失败: {e}")
async def _send_welcome_card(
self, bot, room_wxid: str, nickname: str, image_url: str
):
"""发送欢迎卡片"""
welcome_config = self.config["welcome"]
# 替换变量
title = welcome_config["title"].replace("{nickname}", nickname)
desc = welcome_config["desc"].replace("{nickname}", nickname)
url = welcome_config["url"]
# 使用新成员的头像作为卡片图片
card_image_url = image_url if image_url else ""
try:
await bot.send_link_card(
to_wxid=room_wxid,
title=title,
desc=desc,
url=url,
image_url=card_image_url,
)
logger.success(f"已向 {nickname} 发送欢迎卡片")
except Exception as e:
logger.error(f"发送欢迎卡片失败: {e}")
def _should_welcome(self, room_wxid: str) -> bool:
"""判断是否应该发送欢迎"""
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
# 如果在禁用列表中,不欢迎
if room_wxid in disabled_groups:
return False
# 如果启用列表为空,对所有群生效
if not enabled_groups:
return True
# 否则只对启用列表中的群生效
return room_wxid in enabled_groups
@on_text_message(priority=90)
async def handle_test_command(self, bot, message: dict):
"""处理测试命令"""
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
is_group = message.get("IsGroup", False)
# 只处理群聊消息
if not is_group:
return
# 检查是否是测试入群命令
if content.startswith("/测试入群"):
parts = content.split()
if len(parts) < 2:
await bot.send_text(from_wxid, "用法: /测试入群 wxid")
return False
test_wxid = parts[1].strip()
logger.info(f"收到测试入群命令: wxid={test_wxid}")
try:
# 模拟群成员新增事件
test_message = {
"RoomWxid": from_wxid,
"MemberList": [{"wxid": test_wxid, "nickname": "测试用户"}],
}
await self.on_chatroom_member_add(bot, test_message)
await bot.send_text(from_wxid, f"已触发入群欢迎测试: {test_wxid}")
except Exception as e:
logger.error(f"测试入群欢迎失败: {e}")
await bot.send_text(from_wxid, f"测试失败: {e}")
return False # 阻止后续处理