206 lines
7.4 KiB
Python
206 lines
7.4 KiB
Python
import xml.etree.ElementTree as ET
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from db.connection import DBConnectionManager
|
||
from db.group_virtual_redis import GroupVirtualRedisDB
|
||
from utils.wechat.contact_manager import ContactManager
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.message import WxMessage, MessageType
|
||
|
||
|
||
class GroupVirtualPlugin(MessagePluginInterface):
|
||
"""跨群聊天插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "GROUP_VIRTUAL"
|
||
FEATURE_DESCRIPTION = "🔄 跨群聊天功能 [自动转发群组间消息]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "跨群聊天"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "将多个微信群组合成一个虚拟聊天组,实现群组间消息的自动转发"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "Trae AI"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return None # 不使用命令前缀,因为不处理命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return [] # 不处理任何命令
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.data = None
|
||
self.message_cache = set() # 用于防止消息循环转发
|
||
self.group_virtual_redis = None
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.event_system = context.get("event_system")
|
||
|
||
# 初始化配置
|
||
self.enable = self._config.get("GroupVirtual", {}).get("enable", True)
|
||
|
||
# 初始化Redis数据库操作类
|
||
self.db_manager = DBConnectionManager.get_instance()
|
||
self.group_virtual_redis = GroupVirtualRedisDB(self.db_manager)
|
||
|
||
# 从Redis加载数据
|
||
self.data = self.group_virtual_redis.load_chat_groups()
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
roomid = message.get("roomid", "")
|
||
|
||
# 只处理群消息
|
||
if not roomid:
|
||
return False
|
||
|
||
# 检查该群是否在任何虚拟聊天组中
|
||
chat_groups = self._get_chat_groups_by_group_id(roomid)
|
||
return len(chat_groups) > 0
|
||
|
||
# @plugin_stats_decorator(plugin_name="跨群聊天")
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
roomid = message.get("roomid", "")
|
||
sender = message.get("sender", "")
|
||
full_wx_msg: WxMessage = message.get("full_wx_msg")
|
||
content = message.get("content", "")
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
self.bot: WechatAPIClient = message.get("bot")
|
||
# 检查是否是机器人自己发送的消息
|
||
if full_wx_msg.from_self():
|
||
return False, "不转发自己的消息"
|
||
|
||
# 获取该群所在的所有虚拟聊天组
|
||
chat_groups = self._get_chat_groups_by_group_id(roomid)
|
||
if not chat_groups:
|
||
return False, "该群不在任何虚拟聊天组中"
|
||
|
||
# 生成消息唯一标识(用于防止循环转发)
|
||
message_id = f"{full_wx_msg.msg_id}_{full_wx_msg.create_time}"
|
||
if message_id in self.message_cache:
|
||
return False, "消息已经转发过"
|
||
|
||
# 添加到缓存
|
||
self.message_cache.add(message_id)
|
||
# 缓存大小控制
|
||
if len(self.message_cache) > 1000:
|
||
self.message_cache = set(list(self.message_cache)[-500:])
|
||
|
||
# 处理每个虚拟聊天组的转发
|
||
for chat_group in chat_groups:
|
||
await self._forward_message_in_chat_group(message, chat_group)
|
||
|
||
return True, "消息已转发"
|
||
|
||
async def _forward_message_in_chat_group(self, message: Dict[str, Any], chat_group: Dict[str, Any]):
|
||
"""在虚拟聊天组内转发消息"""
|
||
roomid = message.get("roomid", "")
|
||
sender = message.get("sender", "")
|
||
con = ContactManager.get_instance()
|
||
# 获取发送者昵称
|
||
sender_name = con.get_group_name(roomid, sender) or "未知用户"
|
||
|
||
# 获取源群名称
|
||
source_group_name = None
|
||
for group in chat_group["groups"]:
|
||
if group["id"] == roomid:
|
||
source_group_name = group["name"]
|
||
break
|
||
|
||
if not source_group_name:
|
||
source_group_name = "未知群组"
|
||
|
||
# 构建转发消息前缀
|
||
prefix = f"[{sender_name}@{source_group_name}]:"
|
||
|
||
# 根据消息类型处理
|
||
msg_type = message.get("type", 0)
|
||
|
||
# 文本消息
|
||
if msg_type == MessageType.TEXT:
|
||
content = message.get("content", "")
|
||
forward_content = f"{prefix}{content}"
|
||
self.LOG.debug(f"forward_content: {forward_content}")
|
||
# 转发到其他群
|
||
for group in chat_group["groups"]:
|
||
if group["id"] != roomid: # 不转发回源群
|
||
await self.bot.send_text_message(group["id"], forward_content)
|
||
|
||
# 图片消息 暂时不支持。
|
||
elif msg_type == MessageType.IMAGE:
|
||
# 先发送前缀
|
||
for group in chat_group["groups"]:
|
||
if group["id"] != roomid:
|
||
await self.bot.send_text_message(group["id"], f"{prefix}[图片]")
|
||
# 转发图片
|
||
xml_content = message.get("content", "")
|
||
self.LOG.debug(f"xml_content: {xml_content}")
|
||
await self.bot.send_cdn_img_msg(group["id"], xml_content)
|
||
elif msg_type == MessageType.VIDEO:
|
||
for group in chat_group["groups"]:
|
||
if group["id"] != roomid:
|
||
await self.bot.send_text_message(group["id"], f"{prefix}[视频]")
|
||
# 转发图片
|
||
xml_content = message.get("content", "")
|
||
self.LOG.debug(f"xml_content: {xml_content}")
|
||
await self.bot.send_cdn_video_msg(group["id"], xml_content)
|
||
|
||
def _get_chat_group(self, chat_group_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取虚拟聊天组"""
|
||
return self.group_virtual_redis.get_chat_group(chat_group_id)
|
||
|
||
def _get_chat_groups_by_group_id(self, group_id: str) -> List[Dict[str, Any]]:
|
||
"""获取包含指定群的所有虚拟聊天组"""
|
||
return self.group_virtual_redis.get_chat_groups_by_group_id(group_id)
|
||
|
||
def _save_data(self):
|
||
"""保存数据到Redis"""
|
||
self.group_virtual_redis.save_chat_groups(self.data)
|