调整 message_util 的发送text方法名,方便替换wcf.send_text。加入了虚拟群组管理功能;

This commit is contained in:
liuwei
2025-04-18 12:05:16 +08:00
parent e2c1375e54
commit 5b0a703b46
18 changed files with 1173 additions and 116 deletions

View File

@@ -0,0 +1,141 @@
# 跨群聊天插件需求分析与文档
## 1. 需求概述
开发一个名为"跨群聊天"的微信机器人插件,该插件允许用户将多个微信群组合成一个虚拟聊天组,实现群组间消息的自动转发,使不同群的成员能够进行跨群交流。
## 2. 功能需求
### 2.1 核心功能
1. **群组管理**
- 创建虚拟聊天组
- 向虚拟聊天组中添加/删除微信群
- 查看当前所有虚拟聊天组及其包含的群
- 解散虚拟聊天组
2. **消息转发**
- 自动捕获虚拟聊天组内任一群的消息
- 将消息转发至同一虚拟聊天组内的其他群
- 支持文本、图片等消息类型的转发(不支持语音、表情)
3. **消息标识**
- 转发的消息需标明原始发送者信息(昵称)
- 标明消息来源群组
- 避免消息循环转发
### 2.2 扩展功能
1. **权限控制**
- 设置管理员权限,只有管理员可以管理虚拟聊天组
- 支持黑名单功能,可以屏蔽特定用户的消息不被转发
2. **消息过滤**
- 支持关键词过滤,含有特定关键词的消息不转发
- 支持消息类型过滤,可选择只转发特定类型的消息(仅支持文本、图片)
3. **统计分析**
- 记录转发消息数量
- 统计各群组活跃度
## 3. 技术架构
### 3.1 数据结构
1. **虚拟聊天组(ChatGroup)**
- ID: 唯一标识符
- 名称: 虚拟聊天组名称
- 群组列表: 包含的微信群ID列表
- 创建时间
- 创建者
2. **消息记录(Message)**
- 消息ID
- 原始群ID
- 发送者ID
- 消息内容
- 消息类型
- 发送时间
- 转发状态
### 3.2 接口设计
1. **管理接口**
- 创建虚拟聊天组: `createChatGroup(name, creatorId)`
- 添加群到虚拟聊天组: `addGroupToChatGroup(chatGroupId, wxGroupId)`
- 从虚拟聊天组移除群: `removeGroupFromChatGroup(chatGroupId, wxGroupId)`
- 解散虚拟聊天组: `deleteChatGroup(chatGroupId)`
- 查询虚拟聊天组: `listChatGroups()`, `getChatGroupDetail(chatGroupId)`
2. **消息处理接口**
- 消息接收: `receiveMessage(wxGroupId, senderId, content, type)`
- 消息转发: `forwardMessage(message, targetGroupIds)`
- 消息过滤: `filterMessage(message, rules)`
## 4. 数据存储
使用JSON文件存储虚拟聊天组配置和相关数据
```json
{
"chatGroups": [
{
"id": "cg001",
"name": "技术交流联盟",
"groups": ["wxid_group1", "wxid_group2", "wxid_group3"],
"createdAt": "2023-05-01T10:00:00Z",
"createdBy": "admin_user_id"
}
]
}
```
## 5. 用户交互
### 5.1 命令格式
用户通过在群内发送特定格式的命令来管理虚拟聊天组:
- 创建虚拟聊天组: `#创建跨群 [名称]`
- 添加群到虚拟聊天组: `#添加群 [虚拟聊天组ID] [当前群名称]`
- 查看所有虚拟聊天组: `#查看跨群列表`
- 查看虚拟聊天组详情: `#查看跨群 [虚拟聊天组ID]`
- 退出虚拟聊天组: `#退出跨群 [虚拟聊天组ID]`
- 解散虚拟聊天组: `#解散跨群 [虚拟聊天组ID]`
### 5.2 消息展示格式
转发的消息格式示例:
```
[来自群"技术交流群"]
张三:这是一条测试消息
```
## 6. 实现步骤
1. 创建插件基础结构
2. 实现数据存储模块
3. 实现群组管理功能
4. 实现消息捕获与转发功能
5. 实现用户命令解析
6. 添加消息过滤功能
7. 实现权限控制
8. 测试与优化
## 7. 注意事项
1. 避免消息循环转发
2. 确保消息转发的实时性
3. 注意微信API的限制避免触发封号风险
4. 保护用户隐私,不存储敏感信息
5. 考虑高并发情况下的性能问题
## 8. 后续优化方向
1. 支持更多消息类型的转发
2. 提供Web管理界面
3. 增加更细粒度的权限控制
4. 支持定时消息和定向转发
5. 添加AI智能过滤功能
这份需求文档提供了跨群聊天插件的基本框架和实现思路,可以根据实际开发过程进行调整和完善。

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入GroupVirtualPlugin类
from .main import GroupVirtualPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return GroupVirtualPlugin()

View File

@@ -0,0 +1,3 @@
enable = true
command = ["#创建跨群", "#添加群", "#查看跨群列表", "#查看跨群", "#退出跨群", "#解散跨群"]

View File

@@ -0,0 +1,184 @@
import logging
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.wechat.contact_manager import ContactManager
from db.connection import DBConnectionManager
from db.group_virtual_redis import GroupVirtualRedisDB
class GroupVirtualPlugin(MessagePluginInterface):
"""跨群聊天插件"""
@property
def name(self) -> str:
return "跨群聊天"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "将多个微信群组合成一个虚拟聊天组,实现群组间消息的自动转发"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return None # 不使用命令前缀,因为不处理命令
@property
def commands(self) -> List[str]:
return [] # 不处理任何命令
def __init__(self):
super().__init__()
self.data = None
self.message_cache = set() # 用于防止消息循环转发
self.group_virtual_redis = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf: Wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util: MessageUtil = context.get("message_util")
# 初始化配置
self.enable = self._config.get("GroupVirtual", {}).get("enable", True)
# 初始化Redis数据库操作类
self.db_manager = DBConnectionManager.get_instance()
self.group_virtual_redis = GroupVirtualRedisDB(self.db_manager)
# 从Redis加载数据
self.data = self.group_virtual_redis.load_chat_groups()
self.LOG.info(f"[{self.name}] 插件初始化完成")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
# 只处理群消息
roomid = message.get("roomid", "")
if not roomid:
return False
# 检查该群是否在任何虚拟聊天组中
chat_groups = self._get_chat_groups_by_group_id(roomid)
return len(chat_groups) > 0
@plugin_stats_decorator(plugin_name="跨群聊天")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
roomid = message.get("roomid", "")
sender = message.get("sender", "")
# 检查是否是机器人自己发送的消息
if sender == self.wcf.self_wxid:
return False, "不转发自己的消息"
# 获取该群所在的所有虚拟聊天组
chat_groups = self._get_chat_groups_by_group_id(roomid)
if not chat_groups:
return False, "该群不在任何虚拟聊天组中"
# 生成消息唯一标识(用于防止循环转发)
message_id = f"{message.get('id')}_{message.get('timestamp')}"
if message_id in self.message_cache:
return False, "消息已经转发过"
# 添加到缓存
self.message_cache.add(message_id)
# 缓存大小控制
if len(self.message_cache) > 1000:
self.message_cache = set(list(self.message_cache)[-500:])
# 处理每个虚拟聊天组的转发
for chat_group in chat_groups:
self._forward_message_in_chat_group(message, chat_group)
return True, "消息已转发"
def _forward_message_in_chat_group(self, message: Dict[str, Any], chat_group: Dict[str, Any]):
"""在虚拟聊天组内转发消息"""
roomid = message.get("roomid", "")
sender = message.get("sender", "")
con = ContactManager.get_instance()
# 获取发送者昵称
sender_name = con.get_contact_name(sender) or "未知用户"
# 获取源群名称
source_group_name = None
for group in chat_group["groups"]:
if group["id"] == roomid:
source_group_name = group["name"]
break
if not source_group_name:
source_group_name = "未知群组"
# 构建转发消息前缀
prefix = f"[{sender_name}@{source_group_name}]"
# 根据消息类型处理
msg_type = message.get("type", 0)
# 文本消息
if msg_type == 1:
content = message.get("content", "")
forward_content = f"{prefix}{content}"
# 转发到其他群
for group in chat_group["groups"]:
if group["id"] != roomid: # 不转发回源群
self.message_util.send_text(forward_content, group["id"])
# 图片消息 暂时不支持。
# elif msg_type == 3:
# # 先发送前缀
# for group in chat_group["groups"]:
# if group["id"] != roomid:
# self.message_util.send_text(f"{prefix}[图片]", group["id"])
# # 转发图片
# image_path = message.get("file_path", "")
# if image_path and os.path.exists(image_path):
# self.message_util.send_image(image_path, group["id"])
def _get_chat_group(self, chat_group_id: str) -> Optional[Dict[str, Any]]:
"""获取虚拟聊天组"""
return self.group_virtual_redis.get_chat_group(chat_group_id)
def _get_chat_groups_by_group_id(self, group_id: str) -> List[Dict[str, Any]]:
"""获取包含指定群的所有虚拟聊天组"""
return self.group_virtual_redis.get_chat_groups_by_group_id(group_id)
def _save_data(self):
"""保存数据到Redis"""
self.group_virtual_redis.save_chat_groups(self.data)