from typing import Dict, Any, List, Optional, Tuple from message_util import MessageUtil from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.wechat.contact_manager import ContactManager from db.connection import DBConnectionManager from db.group_virtual_redis import GroupVirtualRedisDB from wechat_ipad.models.message import WxMessage class GroupVirtualPlugin(MessagePluginInterface): """跨群聊天插件""" @property def name(self) -> str: return "跨群聊天" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "将多个微信群组合成一个虚拟聊天组,实现群组间消息的自动转发" @property def author(self) -> str: return "Trae AI" @property def command_prefix(self) -> Optional[str]: return None # 不使用命令前缀,因为不处理命令 @property def commands(self) -> List[str]: return [] # 不处理任何命令 def __init__(self): super().__init__() self.data = None self.message_cache = set() # 用于防止消息循环转发 self.group_virtual_redis = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self.message_util: MessageUtil = context.get("message_util") # 初始化配置 self.enable = self._config.get("GroupVirtual", {}).get("enable", True) # 初始化Redis数据库操作类 self.db_manager = DBConnectionManager.get_instance() self.group_virtual_redis = GroupVirtualRedisDB(self.db_manager) # 从Redis加载数据 self.data = self.group_virtual_redis.load_chat_groups() self.LOG.info(f"[{self.name}] 插件初始化完成") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False # 只处理群消息 roomid = message.get("roomid", "") if not roomid: return False # 检查该群是否在任何虚拟聊天组中 chat_groups = self._get_chat_groups_by_group_id(roomid) return len(chat_groups) > 0 @plugin_stats_decorator(plugin_name="跨群聊天") def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" roomid = message.get("roomid", "") sender = message.get("sender", "") full_wx_msg: WxMessage = message.get("full_wx_msg") # 检查是否是机器人自己发送的消息 if full_wx_msg.from_self(): return False, "不转发自己的消息" # 获取该群所在的所有虚拟聊天组 chat_groups = self._get_chat_groups_by_group_id(roomid) if not chat_groups: return False, "该群不在任何虚拟聊天组中" # 生成消息唯一标识(用于防止循环转发) message_id = f"{message.get('id')}_{message.get('timestamp')}" if message_id in self.message_cache: return False, "消息已经转发过" # 添加到缓存 self.message_cache.add(message_id) # 缓存大小控制 if len(self.message_cache) > 1000: self.message_cache = set(list(self.message_cache)[-500:]) # 处理每个虚拟聊天组的转发 for chat_group in chat_groups: self._forward_message_in_chat_group(message, chat_group) return True, "消息已转发" def _forward_message_in_chat_group(self, message: Dict[str, Any], chat_group: Dict[str, Any]): """在虚拟聊天组内转发消息""" roomid = message.get("roomid", "") sender = message.get("sender", "") con = ContactManager.get_instance() # 获取发送者昵称 sender_name = con.get_group_name(roomid, sender) or "未知用户" # 获取源群名称 source_group_name = None for group in chat_group["groups"]: if group["id"] == roomid: source_group_name = group["name"] break if not source_group_name: source_group_name = "未知群组" # 构建转发消息前缀 prefix = f"[{sender_name}@{source_group_name}]:" # 根据消息类型处理 msg_type = message.get("type", 0) # 文本消息 if msg_type == 1: content = message.get("content", "") forward_content = f"{prefix}{content}" # 转发到其他群 for group in chat_group["groups"]: if group["id"] != roomid: # 不转发回源群 self.message_util.send_text(forward_content, group["id"]) # 图片消息 暂时不支持。 # elif msg_type == 3: # # 先发送前缀 # for group in chat_group["groups"]: # if group["id"] != roomid: # self.message_util.send_text(f"{prefix}[图片]", group["id"]) # # 转发图片 # image_path = message.get("file_path", "") # if image_path and os.path.exists(image_path): # self.message_util.send_image(image_path, group["id"]) def _get_chat_group(self, chat_group_id: str) -> Optional[Dict[str, Any]]: """获取虚拟聊天组""" return self.group_virtual_redis.get_chat_group(chat_group_id) def _get_chat_groups_by_group_id(self, group_id: str) -> List[Dict[str, Any]]: """获取包含指定群的所有虚拟聊天组""" return self.group_virtual_redis.get_chat_groups_by_group_id(group_id) def _save_data(self): """保存数据到Redis""" self.group_virtual_redis.save_chat_groups(self.data)