284 lines
11 KiB
Python
284 lines
11 KiB
Python
import redis
|
||
import re
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
||
from utils.wechat.contact_manager import ContactManager
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class GroupAutoInvitePlugin(MessagePluginInterface):
|
||
"""自动加群功能插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "GROUP_AUTO_INVITE"
|
||
FEATURE_DESCRIPTION = "🤝 自动加群功能 [#加群配置, #加群]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "自动加群功能"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供自动加群功能,支持通过关键词邀请用户加入指定群聊"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "liu.wei"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "#加群配置|" # 使用前缀来识别命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
# Redis 中存储群组映射的前缀
|
||
self.mapping_prefix = "group:group_mapping:"
|
||
self._commands = []
|
||
self.bot: WechatAPIClient = None
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 获取Redis连接池
|
||
self.redis_pool = context.get("redis_pool")
|
||
|
||
# 从配置中获取命令和启用状态
|
||
plugin_config = self._config.get("GroupAutoInvite", {})
|
||
self._commands = plugin_config.get("command", ["#加群配置"])
|
||
self.command_format = plugin_config.get("command-format", "#加群配置|help")
|
||
self.enable = plugin_config.get("enable", True)
|
||
|
||
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def _get_redis_connection(self):
|
||
"""从连接池获取 Redis 连接"""
|
||
return redis.Redis(connection_pool=self.redis_pool)
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
roomid = message.get("roomid", "")
|
||
|
||
if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False
|
||
|
||
# 处理加群配置命令
|
||
if content.startswith("#加群配置|"):
|
||
return True
|
||
|
||
# 处理加群请求 - 只在私聊中处理
|
||
if re.search(r"^#加群\s+(\w+)$", content):
|
||
# 如果是在群聊中发送的加群请求,不处理
|
||
if roomid:
|
||
self.LOG.info(f"⚠️ 请私聊机器人发送 \"#加群 key\" 命令加入群聊")
|
||
return False
|
||
return True
|
||
return False
|
||
|
||
@plugin_stats_decorator(plugin_name="自动加群功能")
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
|
||
self.bot: WechatAPIClient = message.get("bot")
|
||
# 处理加群配置命令
|
||
if content.startswith("#加群配置|"):
|
||
return await self._handle_config_command(content, sender, roomid)
|
||
|
||
# 处理加群请求
|
||
match = re.search(r"^#加群\s+(\w+)$", content)
|
||
if match:
|
||
return await self._handle_join_request(match.group(1), sender, roomid)
|
||
|
||
return False, "无法处理的消息"
|
||
|
||
async def _handle_config_command(self, content: str, sender: str, roomid: str) -> Tuple[
|
||
bool, Optional[str]]:
|
||
"""处理配置命令"""
|
||
# 检查是否为管理员
|
||
admin_list = GroupBotManager.get_admin_list()
|
||
if sender not in admin_list:
|
||
await self.bot.send_text_message((roomid if roomid else sender), "⚠️ 权限不足,只有管理员才能配置群邀请功能",
|
||
sender)
|
||
return True, "权限不足"
|
||
|
||
# 解析命令
|
||
command = content.replace("#加群配置|", "").strip()
|
||
result = self.process_command(command)
|
||
|
||
# 发送结果
|
||
await self.bot.send_text_message((roomid if roomid else sender), result, sender)
|
||
return True, "配置命令处理成功"
|
||
|
||
async def _handle_join_request(self, key: str, sender: str, roomid: str) -> Tuple[
|
||
bool, Optional[str]]:
|
||
"""处理加群请求"""
|
||
try:
|
||
# 获取对应的群ID
|
||
group_id = self.get_first_group_id(key)
|
||
|
||
# 检查是否找到群ID
|
||
if isinstance(group_id, str) and "没有关联的群ID" in group_id:
|
||
await self.bot.send_text_message(sender, f"⚠️ 未找到关键词 '{key}' 对应的群聊")
|
||
return True, "未找到群聊"
|
||
# 判断是否在群里面,如果在,则不添加
|
||
con = ContactManager.get_instance()
|
||
members = con.get_group_members(group_id)
|
||
# 如果在群里面,则不添加
|
||
if sender in members:
|
||
await self.bot.send_text_message(sender, f"⚠️ 你已经在群聊中了,无需重复添加")
|
||
return True, "你已经在群聊中了"
|
||
# 发送邀请
|
||
self.LOG.info(f"邀请用户 {sender} 加入群 {group_id}")
|
||
result = await self.bot.invite_chatroom_member(sender, group_id)
|
||
|
||
if result:
|
||
await self.bot.send_text_message(sender, f"✅ 已发送邀请,请查看群聊邀请通知")
|
||
return True, "邀请发送成功"
|
||
else:
|
||
await self.bot.send_text_message(sender, f"❌ 邀请发送失败,请稍后再试")
|
||
return False, "邀请发送失败"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理加群请求出错: {e}")
|
||
await self.bot.send_text_message(sender, f"❌ 处理加群请求出错: {e}")
|
||
return False, f"处理出错: {e}"
|
||
|
||
def add_mapping(self, key, group_id):
|
||
"""添加群组ID到指定key"""
|
||
try:
|
||
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
|
||
return f"群ID {group_id} 已存在于关键词 {key} 中"
|
||
else:
|
||
self._get_redis_connection().sadd(self.mapping_prefix + key, group_id)
|
||
return f"已添加: {key} -> {group_id}"
|
||
except redis.RedisError as e:
|
||
return f"操作失败: {e}"
|
||
|
||
def del_mapping(self, key, group_id):
|
||
"""删除指定key下的某个群组ID"""
|
||
try:
|
||
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
|
||
self._get_redis_connection().srem(self.mapping_prefix + key, group_id)
|
||
return f"已删除: {key} -> {group_id}"
|
||
else:
|
||
return f"群ID {group_id} 不存在于关键词 {key} 中"
|
||
except redis.RedisError as e:
|
||
return f"操作失败: {e}"
|
||
|
||
def get_group_ids(self, key):
|
||
"""获取指定key下的所有群组ID"""
|
||
try:
|
||
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
|
||
if group_ids:
|
||
return f"关键词 {key} 的群ID列表: {', '.join(group_ids)}"
|
||
else:
|
||
return f"关键词 '{key}' 没有关联的群ID"
|
||
except redis.RedisError as e:
|
||
return f"操作失败: {e}"
|
||
|
||
def get_first_group_id(self, key):
|
||
"""获取指定key的第一个群组ID"""
|
||
try:
|
||
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
|
||
if group_ids:
|
||
first_group_id = next(iter(group_ids))
|
||
self.LOG.info(f"关键词 {key} 的第一个群ID: {first_group_id}")
|
||
return first_group_id
|
||
else:
|
||
return f"关键词 '{key}' 没有关联的群ID"
|
||
except redis.RedisError as e:
|
||
return f"操作失败: {e}"
|
||
|
||
def process_command(self, command):
|
||
"""处理命令行输入"""
|
||
command_parts = command.split()
|
||
if len(command_parts) == 0:
|
||
return "无效命令"
|
||
|
||
cmd = command_parts[0]
|
||
|
||
if cmd == "add" and len(command_parts) == 3:
|
||
key = command_parts[1]
|
||
group_id = command_parts[2]
|
||
return self.add_mapping(key, group_id)
|
||
|
||
elif cmd == "del" and len(command_parts) == 3:
|
||
key = command_parts[1]
|
||
group_id = command_parts[2]
|
||
return self.del_mapping(key, group_id)
|
||
|
||
elif cmd == "get" and len(command_parts) == 2:
|
||
key = command_parts[1]
|
||
return self.get_group_ids(key)
|
||
|
||
elif cmd == "get_first" and len(command_parts) == 2:
|
||
key = command_parts[1]
|
||
return self.get_first_group_id(key)
|
||
|
||
elif cmd == "help":
|
||
commands = (
|
||
"群自动邀请配置命令:\n"
|
||
"'#加群配置|add key group_id' - 添加群组ID\n"
|
||
"'#加群配置|del key group_id' - 删除群组ID\n"
|
||
"'#加群配置|get key' - 获取所有群组ID\n"
|
||
"'#加群配置|get_first key' - 获取第一个群组ID\n"
|
||
"\n"
|
||
"用户加群命令:\n"
|
||
"'#加群 key' - 请求加入关键词对应的群聊\n"
|
||
)
|
||
return commands
|
||
|
||
else:
|
||
return "未知命令或参数数量错误,请输入: #加群配置|help 获取帮助"
|
||
|
||
def get_help(self) -> str:
|
||
"""获取插件帮助信息"""
|
||
return """自动加群功能插件:
|
||
1. 管理员可以通过 #加群配置 命令管理群聊映射
|
||
2. 用户可以通过 #加群 关键词 命令请求加入指定群聊
|
||
3. 使用 #加群配置|help 获取详细命令说明"""
|