feature:1.加群功能插件化;2.成员变更不监听消息,使用定时检查策略;

This commit is contained in:
liuwei
2025-03-20 14:23:09 +08:00
parent ce837d241a
commit 333a87e34c
15 changed files with 584 additions and 358 deletions

View File

@@ -5,6 +5,7 @@ from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
@@ -40,7 +41,8 @@ class BeautyLegPlugin(MessagePluginInterface):
def __init__(self):
super().__init__()
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "beautyleg", "download_dir")
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"beautyleg", "download_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
@@ -50,7 +52,7 @@ class BeautyLegPlugin(MessagePluginInterface):
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.message_util: MessageUtil = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("Beautyleg", {}).get("command", ["美腿", "腿来"])
@@ -105,8 +107,8 @@ class BeautyLegPlugin(MessagePluginInterface):
# 获取随机图片
random_file_path = self._get_random_file_from_dir(self.image_folder)
if not random_file_path:
wcf.send_text(f"\n❌未找到美腿图片资源",
(roomid if roomid else sender), sender)
self.message_util.send_text_msg(f"\n❌未找到美腿图片资源",
(roomid if roomid else sender), sender)
return True, "未找到图片资源"
# 发送图片
@@ -147,4 +149,4 @@ class BeautyLegPlugin(MessagePluginInterface):
self.LOG.error("No image files found in the directory (including subdirectories).")
return None
return random.choice(image_files)
return random.choice(image_files)

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入GroupAutoInvitePlugin类
from .main import GroupAutoInvitePlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return GroupAutoInvitePlugin()

View File

@@ -0,0 +1,13 @@
[GroupAutoInvite]
enable = true
command = ["#加群配置", "#加群"]
command-format = """
🔧群自动邀请配置命令:
'#加群配置|add key group_id' - 添加群组ID
'#加群配置|del key group_id' - 删除群组ID
'#加群配置|get key' - 获取所有群组ID
'#加群配置|get_first key' - 获取第一个群组ID
用户加群命令:
'#加群 key' - 请求加入关键词对应的群聊
"""

View File

@@ -0,0 +1,280 @@
import logging
import redis
import re
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf, WxMsg
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class GroupAutoInvitePlugin(MessagePluginInterface):
"""自动加群功能插件"""
@property
def name(self) -> str:
return "自动加群功能"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供自动加群功能,支持通过关键词邀请用户加入指定群聊"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "#加群配置|" # 使用前缀来识别命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
# Redis 中存储群组映射的前缀
self.mapping_prefix = "group:group_mapping:"
self.redis_pool = None
self.LOG = None
self.wcf = None
self.gbm = None
self.enable = True
self._commands = []
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
if not self.wcf:
self.LOG.error("无法获取wcf对象插件初始化失败")
return False
# 获取群管理器
self.gbm = context.get("gbm")
if not self.gbm:
self.LOG.error("无法获取群管理器对象,插件初始化失败")
return False
# 获取Redis连接池
self.redis_pool = context.get("redis_pool")
if not self.redis_pool:
self.LOG.error("无法获取Redis连接池插件初始化失败")
return False
# 从配置中获取命令和启用状态
plugin_config = self._config.get("GroupAutoInvite", {})
self._commands = plugin_config.get("command", ["#加群配置"])
self.command_format = plugin_config.get("command-format", "#加群配置|help")
self.enable = plugin_config.get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def _get_redis_connection(self):
"""从连接池获取 Redis 连接"""
return redis.Redis(connection_pool=self.redis_pool)
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
# 处理加群配置命令
if content.startswith("#加群配置|"):
return True
# 处理加群请求 - 只在私聊中处理
if re.search(r"^#加群\s+(\w+)$", content):
# 如果是在群聊中发送的加群请求,不处理
if roomid:
self.LOG.info(f"⚠️ 请私聊机器人发送 \"#加群 key\" 命令加入群聊")
return False
return True
return False
@plugin_stats_decorator(plugin_name="自动加群功能")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 处理加群配置命令
if content.startswith("#加群配置|"):
return self._handle_config_command(content, sender, roomid, wcf, gbm)
# 处理加群请求
match = re.search(r"^#加群\s+(\w+)$", content)
if match:
return self._handle_join_request(match.group(1), sender, roomid, wcf, gbm)
return False, "无法处理的消息"
def _handle_config_command(self, content: str, sender: str, roomid: str, wcf: Wcf, gbm: GroupBotManager) -> Tuple[bool, Optional[str]]:
"""处理配置命令"""
# 检查是否为管理员
admin_list = self.gbm.get_admin_list()
if sender not in admin_list:
wcf.send_text("⚠️ 权限不足,只有管理员才能配置群邀请功能",
(roomid if roomid else sender), sender)
return True, "权限不足"
# 解析命令
command = content.replace("#加群配置|", "").strip()
result = self.process_command(command)
# 发送结果
wcf.send_text(result, (roomid if roomid else sender), sender)
return True, "配置命令处理成功"
def _handle_join_request(self, key: str, sender: str, roomid: str, wcf: Wcf, gbm: GroupBotManager) -> Tuple[bool, Optional[str]]:
"""处理加群请求"""
try:
# 获取对应的群ID
group_id = self.get_first_group_id(key)
# 检查是否找到群ID
if isinstance(group_id, str) and "没有关联的群ID" in group_id:
wcf.send_text(f"⚠️ 未找到关键词 '{key}' 对应的群聊", sender)
return True, "未找到群聊"
# 发送邀请
self.LOG.info(f"邀请用户 {sender} 加入群 {group_id}")
result = wcf.invite_chatroom_members(group_id, sender)
if result:
wcf.send_text(f"✅ 已发送邀请,请查看群聊邀请通知", sender)
return True, "邀请发送成功"
else:
wcf.send_text(f"❌ 邀请发送失败,请稍后再试", sender)
return True, "邀请发送失败"
except Exception as e:
self.LOG.error(f"处理加群请求出错: {e}")
wcf.send_text(f"❌ 处理加群请求出错: {e}", sender)
return True, f"处理出错: {e}"
def add_mapping(self, key, group_id):
"""添加群组ID到指定key"""
try:
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
return f"群ID {group_id} 已存在于关键词 {key}"
else:
self._get_redis_connection().sadd(self.mapping_prefix + key, group_id)
return f"已添加: {key} -> {group_id}"
except redis.RedisError as e:
return f"操作失败: {e}"
def del_mapping(self, key, group_id):
"""删除指定key下的某个群组ID"""
try:
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
self._get_redis_connection().srem(self.mapping_prefix + key, group_id)
return f"已删除: {key} -> {group_id}"
else:
return f"群ID {group_id} 不存在于关键词 {key}"
except redis.RedisError as e:
return f"操作失败: {e}"
def get_group_ids(self, key):
"""获取指定key下的所有群组ID"""
try:
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
if group_ids:
return f"关键词 {key} 的群ID列表: {', '.join(group_ids)}"
else:
return f"关键词 '{key}' 没有关联的群ID"
except redis.RedisError as e:
return f"操作失败: {e}"
def get_first_group_id(self, key):
"""获取指定key的第一个群组ID"""
try:
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
if group_ids:
first_group_id = next(iter(group_ids))
self.LOG.info(f"关键词 {key} 的第一个群ID: {first_group_id}")
return first_group_id
else:
return f"关键词 '{key}' 没有关联的群ID"
except redis.RedisError as e:
return f"操作失败: {e}"
def process_command(self, command):
"""处理命令行输入"""
command_parts = command.split()
if len(command_parts) == 0:
return "无效命令"
cmd = command_parts[0]
if cmd == "add" and len(command_parts) == 3:
key = command_parts[1]
group_id = command_parts[2]
return self.add_mapping(key, group_id)
elif cmd == "del" and len(command_parts) == 3:
key = command_parts[1]
group_id = command_parts[2]
return self.del_mapping(key, group_id)
elif cmd == "get" and len(command_parts) == 2:
key = command_parts[1]
return self.get_group_ids(key)
elif cmd == "get_first" and len(command_parts) == 2:
key = command_parts[1]
return self.get_first_group_id(key)
elif cmd == "help":
commands = (
"群自动邀请配置命令:\n"
"'#加群配置|add key group_id' - 添加群组ID\n"
"'#加群配置|del key group_id' - 删除群组ID\n"
"'#加群配置|get key' - 获取所有群组ID\n"
"'#加群配置|get_first key' - 获取第一个群组ID\n"
"\n"
"用户加群命令:\n"
"'#加群 key' - 请求加入关键词对应的群聊\n"
)
return commands
else:
return "未知命令或参数数量错误,请输入: #加群配置|help 获取帮助"
def get_help(self) -> str:
"""获取插件帮助信息"""
return """自动加群功能插件:
1. 管理员可以通过 #加群配置 命令管理群聊映射
2. 用户可以通过 #加群 关键词 命令请求加入指定群聊
3. 使用 #加群配置|help 获取详细命令说明"""

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入GroupMemberChangePlugin类
from .main import GroupMemberChangePlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return GroupMemberChangePlugin()

View File

@@ -0,0 +1,3 @@
[GroupMemberChange]
enable = true
check_interval = 30 # 检查间隔,单位为秒

View File

@@ -0,0 +1,247 @@
import logging
import threading
import time
from datetime import datetime
from typing import Dict, Any, List, Optional, Set, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class GroupMemberChangePlugin(MessagePluginInterface):
"""群成员变更监控插件"""
@property
def name(self) -> str:
return "群成员变更监控"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "监控群成员变动并发送通知"
@property
def author(self) -> str:
return "Trae AI"
def __init__(self):
super().__init__()
self.status = PluginStatus.LOADED
self.wcf = None
self.gbm = None
self.LOG = logging.getLogger(f"Plugin.{self.name}")
# 初始化本地缓存字典,使用 group_id 作为键
self.local_members = {}
# 监控线程
self.monitor_thread = None
self.stop_flag = False
# 检查间隔时间(秒)
self.check_interval = 30
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf: Wcf = context.get("wcf")
if not self.wcf:
self.LOG.error("无法获取wcf对象插件初始化失败")
return False
# 获取群管理器
self.gbm = context.get("gbm")
if not self.gbm:
self.LOG.error("无法获取群管理器对象,插件初始化失败")
return False
# 从配置中获取启用状态和检查间隔
plugin_config = self._config.get("GroupMemberChange", {})
self.check_interval = plugin_config.get("check_interval", 30)
self.LOG.info(f"{self.name} 插件初始化完成,检查间隔: {self.check_interval}")
return True
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理接收到的消息"""
# 此插件主要通过定时任务工作,不处理消息
return False, "无需执行"
def start(self) -> bool:
"""启动插件"""
if self.status == PluginStatus.RUNNING:
self.LOG.warning(f"{self.name} 插件已经在运行中")
return True
self.stop_flag = False
# 启动监控线程
self.monitor_thread = threading.Thread(target=self._monitor_groups, daemon=True)
self.monitor_thread.start()
self.status = PluginStatus.RUNNING
self.LOG.info(f"[{self.name}] 插件已启动")
return True
def stop(self) -> bool:
"""停止插件"""
if self.status != PluginStatus.RUNNING:
self.LOG.warning(f"{self.name} 插件未在运行中")
return True
self.stop_flag = True
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=5)
self.status = PluginStatus.STOPPED
self.LOG.info(f"[{self.name}] 插件已停止")
return True
def _monitor_groups(self):
"""监控群成员变化的线程函数"""
self.LOG.info("群成员监控线程已启动")
while not self.stop_flag:
try:
# 获取所有启用了机器人的群组
group_list = self.gbm.get_group_list()
for group_id in group_list:
# 检查群是否启用了成员变更提醒功能
if self.gbm.get_group_permission(group_id, Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.ENABLED:
self._check_group_members(group_id)
# 等待指定的时间间隔
for _ in range(self.check_interval):
if self.stop_flag:
break
time.sleep(1)
except Exception as e:
self.LOG.error(f"监控群成员变化时发生错误: {e}", exc_info=True)
time.sleep(5) # 发生错误时短暂休眠
self.LOG.info("群成员监控线程已停止")
def _check_group_members(self, group_id: str):
"""检查指定群的成员变化"""
try:
# 获取当前群成员
current_members = self.wcf.get_chatroom_members(group_id)
# 如果是首次检查该群
if group_id not in self.local_members:
self.LOG.info(f"首次检查群 {group_id},记录当前成员")
self.local_members[group_id] = current_members
return
# 获取上次记录的成员
previous_members = self.local_members[group_id]
# 比较成员变化
current_member_ids = set(current_members.keys())
previous_member_ids = set(previous_members.keys())
# 找出退群的成员
left_members = previous_member_ids - current_member_ids
# 找出新加入的成员
joined_members = current_member_ids - previous_member_ids
# 如果有成员变化
if left_members or joined_members:
self.LOG.info(f"{group_id} 成员发生变化: {len(joined_members)}人加入, {len(left_members)}人退出")
# 处理退群成员
for wxid in left_members:
nickname = previous_members[wxid]
member_info = self._get_member_info(wxid, nickname)
self._send_leave_notification(group_id, member_info)
# 处理新加入成员
for wxid in joined_members:
nickname = current_members[wxid]
# 可以在这里添加欢迎新成员的代码
# 更新本地缓存
self.local_members[group_id] = current_members
except Exception as e:
self.LOG.error(f"检查群 {group_id} 成员变化时发生错误: {e}", exc_info=True)
def _get_member_info(self, wxid: str, nickname: str) -> Dict[str, str]:
"""获取成员详细信息"""
try:
# 尝试获取成员详细信息
member_info = {
"wxid": wxid,
"nickname": nickname,
"remark": "",
"gender": "未知",
"region": "未知"
}
# 尝试获取用户详情
user_info = self.wcf.get_user_info(wxid)
if user_info:
member_info["remark"] = user_info.get("remark", "")
# 性别转换
gender_code = user_info.get("gender", 0)
if gender_code == 1:
member_info["gender"] = ""
elif gender_code == 2:
member_info["gender"] = ""
# 地区信息
country = user_info.get("country", "")
province = user_info.get("province", "")
city = user_info.get("city", "")
if country or province or city:
member_info["region"] = f"{country} {province} {city}".strip()
return member_info
except Exception as e:
self.LOG.error(f"获取成员 {wxid} 信息时发生错误: {e}")
return {
"wxid": wxid,
"nickname": nickname,
"remark": "",
"gender": "未知",
"region": "未知"
}
def _send_leave_notification(self, group_id: str, member_info: Dict[str, str]):
"""发送成员退群通知"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f"""【退群提醒】
用户: {member_info['nickname']} ({member_info['gender']})
群备注: {member_info['remark']}
微信号: {member_info['wxid']}
地区: {member_info['region']}
退群时间: {now_time}
"""
self.wcf.send_text(message, group_id)
self.LOG.info(f"已发送退群通知: {member_info['nickname']} 退出群 {group_id}")
@property
def commands(self) -> List[str]:
"""插件支持的命令列表"""
return []
def get_help(self) -> str:
"""获取插件帮助信息"""
return "群成员变更监控插件:自动监控群成员变动并发送通知。"

View File

@@ -5,6 +5,7 @@ from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
@@ -51,13 +52,13 @@ class VideoManPlugin(MessagePluginInterface):
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.message_util: MessageUtil = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("VideoMan", {}).get("command", ["猛男", "肌肉", "帅哥"])
self.command_format = self._config.get("VideoMan", {}).get("command-format", "猛男")
self.enable = self._config.get("VideoMan", {}).get("enable", True)
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
@@ -104,10 +105,10 @@ class VideoManPlugin(MessagePluginInterface):
try:
# 下载视频
file_abspath = self._download_video("https://api.guiguiya.com/api/video/fuji?type=json")
if not file_abspath:
wcf.send_text(f"\n❌视频下载失败,请稍后再试",
(roomid if roomid else sender), sender)
self.message_util.send_text_msg(f"\n❌视频下载失败,请稍后再试",
(roomid if roomid else sender), sender)
return True, "视频下载失败"
# 发送视频
@@ -117,8 +118,6 @@ class VideoManPlugin(MessagePluginInterface):
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
wcf.send_text(f"\n❌请求出错:{e}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _download_video(self, api_url):
@@ -130,43 +129,43 @@ class VideoManPlugin(MessagePluginInterface):
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
save_path = os.path.join(self.download_dir, "video.mp4")
try:
# 获取视频URL
response = requests.get(api_url)
if response.status_code != 200:
self.LOG.error(f"API请求失败HTTP状态码: {response.status_code}")
return None
data = response.json()
video_url = data.get("url")
if not video_url:
self.LOG.error("API响应中没有找到视频URL")
return None
# 下载视频
video_response = requests.get(video_url, stream=True)
if video_response.status_code != 200:
self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status_code}")
return None
# 保存视频
with open(save_path, "wb") as file:
for chunk in video_response.iter_content(chunk_size=1024):
if chunk: # 过滤空块
file.write(chunk)
abs_path = os.path.abspath(save_path)
self.LOG.info(f"视频已下载至: {abs_path}")
return abs_path
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None
return None