Files
abot/plugins/group_auto_invite/main.py
2026-01-16 13:34:37 +08:00

284 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import redis
import re
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
class GroupAutoInvitePlugin(MessagePluginInterface):
"""自动加群功能插件"""
# 功能权限常量
FEATURE_KEY = "GROUP_AUTO_INVITE"
FEATURE_DESCRIPTION = "🤝 自动加群功能 [#加群配置, #加群]"
@property
def name(self) -> str:
return "自动加群功能"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供自动加群功能,支持通过关键词邀请用户加入指定群聊"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "#加群配置|" # 使用前缀来识别命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# Redis 中存储群组映射的前缀
self.mapping_prefix = "group:group_mapping:"
self._commands = []
self.bot: WechatAPIClient = None
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 获取Redis连接池
self.redis_pool = context.get("redis_pool")
# 从配置中获取命令和启用状态
plugin_config = self._config.get("GroupAutoInvite", {})
self._commands = plugin_config.get("command", ["#加群配置"])
self.command_format = plugin_config.get("command-format", "#加群配置|help")
self.enable = plugin_config.get("enable", True)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def _get_redis_connection(self):
"""从连接池获取 Redis 连接"""
return redis.Redis(connection_pool=self.redis_pool)
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False
# 处理加群配置命令
if content.startswith("#加群配置|"):
return True
# 处理加群请求 - 只在私聊中处理
if re.search(r"^#加群\s+(\w+)$", content):
# 如果是在群聊中发送的加群请求,不处理
if roomid:
self.LOG.info(f"⚠️ 请私聊机器人发送 \"#加群 key\" 命令加入群聊")
return False
return True
return False
@plugin_stats_decorator(plugin_name="自动加群功能")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
self.bot: WechatAPIClient = message.get("bot")
# 处理加群配置命令
if content.startswith("#加群配置|"):
return await self._handle_config_command(content, sender, roomid)
# 处理加群请求
match = re.search(r"^#加群\s+(\w+)$", content)
if match:
return await self._handle_join_request(match.group(1), sender, roomid)
return False, "无法处理的消息"
async def _handle_config_command(self, content: str, sender: str, roomid: str) -> Tuple[
bool, Optional[str]]:
"""处理配置命令"""
# 检查是否为管理员
admin_list = GroupBotManager.get_admin_list()
if sender not in admin_list:
await self.bot.send_text_message((roomid if roomid else sender), "⚠️ 权限不足,只有管理员才能配置群邀请功能",
sender)
return True, "权限不足"
# 解析命令
command = content.replace("#加群配置|", "").strip()
result = self.process_command(command)
# 发送结果
await self.bot.send_text_message((roomid if roomid else sender), result, sender)
return True, "配置命令处理成功"
async def _handle_join_request(self, key: str, sender: str, roomid: str) -> Tuple[
bool, Optional[str]]:
"""处理加群请求"""
try:
# 获取对应的群ID
group_id = self.get_first_group_id(key)
# 检查是否找到群ID
if isinstance(group_id, str) and "没有关联的群ID" in group_id:
await self.bot.send_text_message(sender, f"⚠️ 未找到关键词 '{key}' 对应的群聊")
return True, "未找到群聊"
# 判断是否在群里面,如果在,则不添加
con = ContactManager.get_instance()
members = con.get_group_members(group_id)
# 如果在群里面,则不添加
if sender in members:
await self.bot.send_text_message(sender, f"⚠️ 你已经在群聊中了,无需重复添加")
return True, "你已经在群聊中了"
# 发送邀请
self.LOG.info(f"邀请用户 {sender} 加入群 {group_id}")
result = await self.bot.invite_chatroom_member(sender, group_id)
if result:
await self.bot.send_text_message(sender, f"✅ 已发送邀请,请查看群聊邀请通知")
return True, "邀请发送成功"
else:
await self.bot.send_text_message(sender, f"❌ 邀请发送失败,请稍后再试")
return False, "邀请发送失败"
except Exception as e:
self.LOG.error(f"处理加群请求出错: {e}")
await self.bot.send_text_message(sender, f"❌ 处理加群请求出错: {e}")
return False, f"处理出错: {e}"
def add_mapping(self, key, group_id):
"""添加群组ID到指定key"""
try:
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
return f"群ID {group_id} 已存在于关键词 {key}"
else:
self._get_redis_connection().sadd(self.mapping_prefix + key, group_id)
return f"已添加: {key} -> {group_id}"
except redis.RedisError as e:
return f"操作失败: {e}"
def del_mapping(self, key, group_id):
"""删除指定key下的某个群组ID"""
try:
if self._get_redis_connection().sismember(self.mapping_prefix + key, group_id):
self._get_redis_connection().srem(self.mapping_prefix + key, group_id)
return f"已删除: {key} -> {group_id}"
else:
return f"群ID {group_id} 不存在于关键词 {key}"
except redis.RedisError as e:
return f"操作失败: {e}"
def get_group_ids(self, key):
"""获取指定key下的所有群组ID"""
try:
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
if group_ids:
return f"关键词 {key} 的群ID列表: {', '.join(group_ids)}"
else:
return f"关键词 '{key}' 没有关联的群ID"
except redis.RedisError as e:
return f"操作失败: {e}"
def get_first_group_id(self, key):
"""获取指定key的第一个群组ID"""
try:
group_ids = self._get_redis_connection().smembers(self.mapping_prefix + key)
if group_ids:
first_group_id = next(iter(group_ids))
self.LOG.info(f"关键词 {key} 的第一个群ID: {first_group_id}")
return first_group_id
else:
return f"关键词 '{key}' 没有关联的群ID"
except redis.RedisError as e:
return f"操作失败: {e}"
def process_command(self, command):
"""处理命令行输入"""
command_parts = command.split()
if len(command_parts) == 0:
return "无效命令"
cmd = command_parts[0]
if cmd == "add" and len(command_parts) == 3:
key = command_parts[1]
group_id = command_parts[2]
return self.add_mapping(key, group_id)
elif cmd == "del" and len(command_parts) == 3:
key = command_parts[1]
group_id = command_parts[2]
return self.del_mapping(key, group_id)
elif cmd == "get" and len(command_parts) == 2:
key = command_parts[1]
return self.get_group_ids(key)
elif cmd == "get_first" and len(command_parts) == 2:
key = command_parts[1]
return self.get_first_group_id(key)
elif cmd == "help":
commands = (
"群自动邀请配置命令:\n"
"'#加群配置|add key group_id' - 添加群组ID\n"
"'#加群配置|del key group_id' - 删除群组ID\n"
"'#加群配置|get key' - 获取所有群组ID\n"
"'#加群配置|get_first key' - 获取第一个群组ID\n"
"\n"
"用户加群命令:\n"
"'#加群 key' - 请求加入关键词对应的群聊\n"
)
return commands
else:
return "未知命令或参数数量错误,请输入: #加群配置|help 获取帮助"
def get_help(self) -> str:
"""获取插件帮助信息"""
return """自动加群功能插件:
1. 管理员可以通过 #加群配置 命令管理群聊映射
2. 用户可以通过 #加群 关键词 命令请求加入指定群聊
3. 使用 #加群配置|help 获取详细命令说明"""