调整代码

This commit is contained in:
liuwei
2025-04-23 16:11:06 +08:00
parent b919a3834e
commit fc757151e4
11 changed files with 345 additions and 214 deletions

View File

@@ -4,6 +4,10 @@ import time
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import xml.etree.ElementTree as ET
from gewechat.client import gewe_client
from gewechat.response.gewe_resp import GeweResponse
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
@@ -61,160 +65,73 @@ class GroupMemberChangePlugin(MessagePluginInterface):
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理接收到的消息"""
# 此插件主要通过定时任务工作,不处理消息
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.DISABLED:
return False, "没有权限"
xml_content = str(content).strip().replace("\n", "").replace("\t", "")
root = ET.fromstring(xml_content)
if root.tag != "sysmsg":
return False, "非本次需要处理消息"
# 检查是否是进群消息
if root.attrib.get("type") == "sysmsgtemplate":
sys_msg_template = root.find("sysmsgtemplate")
if sys_msg_template is None:
return False, "非本次需要处理消息"
template = sys_msg_template.find("content_template")
if template is None:
return False, "非本次需要处理消息"
template_type = template.attrib.get("type")
if template_type not in ["tmpl_type_profile", "tmpl_type_profilewithrevoke"]:
return False, "非本次需要处理消息"
template_text = template.find("template").text
if '"$names$"加入了群聊' in template_text: # 直接加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$username$"邀请"$names$"加入了群聊' in template_text: # 通过邀请加入群聊
new_members = self._parse_member_info(root, "names")
elif '你邀请"$names$"加入了群聊' in template_text: # 自己邀请成员加入群聊
new_members = self._parse_member_info(root, "names")
elif '"$adder$"通过扫描"$from$"分享的二维码加入群聊' in template_text: # 通过二维码加入群聊
new_members = self._parse_member_info(root, "adder")
elif '"$adder$"通过"$from$"的邀请二维码加入群聊' in template_text:
new_members = self._parse_member_info(root, "adder")
else:
self.LOG.warning(f"未知的入群方式: {template_text}")
return False, "非本次需要处理消息"
if not new_members:
return False, "非本次需要处理消息"
for member in new_members:
wxid = member["wxid"]
nickname = member["nickname"]
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
member_wxids = [wxid]
profile = gewe_client.client.get_chatroom_member_detail(gewe_client.client.app_id, member_wxids)
data = GeweResponse(profile).get_data()
gewe_client.client.post_link(gewe_client.client.app_id, sender,
title=f"👏欢迎 {nickname} 加入群聊!🎉",
description=f"⌚时间:{now}\n",
url="",
thumb_url=data[0].get("BigHeadImgUrl", ""))
return True, "已发送进群欢迎语"
return False, "无需执行"
def start(self) -> bool:
"""启动插件"""
if self.status == PluginStatus.RUNNING:
self.LOG.warning(f"{self.name} 插件已经在运行中")
return True
self.stop_flag = False
# 启动监控线程
self.monitor_thread = threading.Thread(target=self._monitor_groups, daemon=True)
self.monitor_thread.start()
self.status = PluginStatus.RUNNING
self.LOG.info(f"[{self.name}] 插件已启动")
return True
def stop(self) -> bool:
"""停止插件"""
if self.status != PluginStatus.RUNNING:
self.LOG.warning(f"{self.name} 插件未在运行中")
return True
self.stop_flag = True
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=5)
self.status = PluginStatus.STOPPED
self.LOG.info(f"[{self.name}] 插件已停止")
return True
def _monitor_groups(self):
"""监控群成员变化的线程函数"""
self.LOG.info("群成员监控线程已启动")
while not self.stop_flag:
try:
# 获取所有启用了机器人的群组
group_list = GroupBotManager.get_group_list()
for group_id in group_list:
# 检查群是否启用了成员变更提醒功能
if GroupBotManager.get_group_permission(group_id,
Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.ENABLED:
self._check_group_members(group_id)
# 等待指定的时间间隔
for _ in range(self.check_interval):
if self.stop_flag:
break
time.sleep(1)
except Exception as e:
self.LOG.error(f"监控群成员变化时发生错误: {e}", exc_info=True)
time.sleep(5) # 发生错误时短暂休眠
self.LOG.info("群成员监控线程已停止")
def _check_group_members(self, group_id: str):
"""检查指定群的成员变化"""
try:
# 获取当前群成员
current_members = self.message_util.get_chatroom_members(group_id)
# 添加安全检查:如果获取到的成员列表为空,可能是接口异常
if not current_members:
self.LOG.warning(f"获取群 {group_id} 成员列表为空,可能是接口异常,跳过本次检查")
return
# 如果是首次检查该群
if group_id not in self.local_members:
self.LOG.info(f"首次检查群 {group_id},记录当前成员")
self.local_members[group_id] = current_members
return
# 获取上次记录的成员
previous_members = self.local_members[group_id]
# 添加安全检查:如果上次记录的成员为空,重新初始化
if not previous_members:
self.LOG.warning(f"{group_id} 上次记录的成员列表为空,重新初始化")
self.local_members[group_id] = current_members
return
# 比较成员变化
current_member_ids = set(current_members.keys())
previous_member_ids = set(previous_members.keys())
# 找出退群的成员
left_members = previous_member_ids - current_member_ids
# 添加安全检查:如果退群人数超过阈值,可能是异常情况
if len(left_members) > len(previous_member_ids) * 0.5: # 如果超过50%的成员"退群"
self.LOG.warning(
f"{group_id} 检测到超过50%的成员退群 ({len(left_members)}/{len(previous_member_ids)}),判定为异常情况,跳过通知")
# 更新本地缓存但不发送通知
self.local_members[group_id] = current_members
return
# 找出新加入的成员
joined_members = current_member_ids - previous_member_ids
# 如果有成员变化
if left_members or joined_members:
self.LOG.info(f"{group_id} 成员发生变化: {len(joined_members)}人加入, {len(left_members)}人退出")
# 处理退群成员
for wxid in left_members:
nickname = previous_members[wxid]
self._send_leave_notification(group_id, wxid, nickname)
# 处理新加入成员
for wxid in joined_members:
nickname = current_members[wxid]
self._send_join_notification(group_id, wxid, nickname) # 添加欢迎新成员的功能
# 更新本地缓存
self.local_members[group_id] = current_members
except Exception as e:
self.LOG.error(f"检查群 {group_id} 成员变化时发生错误: {e}", exc_info=True)
# 异常情况下不更新缓存,避免错误数据导致误判
def _send_leave_notification(self, group_id: str, wxid: str, nickname: str):
"""发送成员退群通知"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f"""【退群提醒】
用户: {nickname}
微信号: {wxid}
退群时间: {now_time}
"""
# 使用message_util发送消息
self.message_util.send_text(message, group_id)
self.LOG.info(f"已发送退群通知: {nickname} 退出群 {group_id}")
def _send_join_notification(self, group_id: str, wxid: str, nickname: str):
"""发送成员入群通知"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f"""
欢迎新成员: {nickname}
⌚️: {now_time}
"""
# 使用message_util发送消息 编写一个 send_rich_text 方法
self.message_util.send_rich_text("bot", "gh_bot", f"欢迎 {nickname} 加入群聊", message,
"https://hot.imsyy.top/#/",
"https://mmbiz.qpic.cn/mmbiz_png/bS1825ympzGML4gV6ibEFiaNA8Ycv6kCOo6tAwy5VntjeUGS0O2QQYeiakgIcpeFR9e0uCwl6nuQGib6f39xxkeFUQ/640?wx_fmt=png&",
group_id)
self.LOG.info(f"已发送入群通知: {nickname} 加入群 {group_id}")
@property
def commands(self) -> List[str]:
"""插件支持的命令列表"""
@@ -223,3 +140,30 @@ class GroupMemberChangePlugin(MessagePluginInterface):
def get_help(self) -> str:
"""获取插件帮助信息"""
return "群成员变更监控插件:自动监控群成员变动并发送通知。"
def _parse_member_info(self, root: ET.Element, link_name: str = "names") -> list[dict]:
"""解析新成员信息"""
new_members = []
try:
# 查找指定链接中的成员列表
names_link = root.find(f".//link[@name='{link_name}']")
if names_link is None:
return new_members
memberlist = names_link.find("memberlist")
if memberlist is None:
return new_members
for member in memberlist.findall("member"):
username = member.find("username").text
nickname = member.find("nickname").text
new_members.append({
"wxid": username,
"nickname": nickname
})
except Exception as e:
self.LOG.warning(f"解析新成员信息失败: {e}")
return new_members

View File

@@ -1,9 +1,9 @@
import logging
import requests
import lz4.block as lb
from typing import Dict, Any, List, Optional, Tuple
from gewechat.client import gewe_client
from gewechat.response.gewe_resp import GeweResponse
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
@@ -93,7 +93,7 @@ class MusicPlugin(MessagePluginInterface):
# 检查命令格式
if len(content.split(" ")) == 1:
self.message_util.send_text(f"❌命令格式错误!\n{self.command_format}",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return False, "命令格式错误"
# 检查权限
@@ -108,11 +108,11 @@ class MusicPlugin(MessagePluginInterface):
song_info = self._search_song(user_song_name)
if not song_info or not song_info.get("play_url"):
self.message_util.send_text(f"❌未找到歌曲:{user_song_name}",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return False, "未找到歌曲"
# 发送音乐
self._send_music_message(wcf, song_info, roomid or sender)
self._send_music_message(song_info, roomid or sender)
return True, "发送成功"
except Exception as e:
@@ -143,7 +143,7 @@ class MusicPlugin(MessagePluginInterface):
self.LOG.error(f"搜索歌曲出错: {e}")
return {}
def _send_music_message(self, wcf, song_info: Dict[str, Any], receiver: str) -> bool:
def _send_music_message(self, song_info: Dict[str, Any], receiver: str) -> bool:
"""发送音乐消息"""
try:
song_name = song_info.get("song_name", "")
@@ -201,19 +201,11 @@ class MusicPlugin(MessagePluginInterface):
</appinfo>
<commenturl />
</msg>"""
# # 修改消息数据库里面的消息content内容
# text_bytes = xml_message.encode('utf-8')
# compressed_data = lb.compress(text_bytes, store_size=False).hex()
#
# data = self.message_util.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
# self.message_util.query_sql('MSG0.db',
# f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3,
# IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
# )
#
# result = self.message_util.forward_msg(data[0]["MsgSvrID"], receiver)
# self.LOG.info(f"插件化:点歌发送结果: {result}")
return True
resp = gewe_client.client.post_app_msg(gewe_client.client.app_id, receiver, xml_message)
data = GeweResponse(resp)
self.LOG.info(f"发送音乐消息:{data}")
if data.is_success:
return True
except Exception as e:
self.LOG.error(f"发送音乐消息出错: {e}")

View File

@@ -110,18 +110,18 @@ class PluginManagerPlugin(MessagePluginInterface):
# 根据子命令执行相应操作
command_handlers = {
"列表": self._list_plugins,
"启用": lambda w, s, r: self._operate_plugin(plugin_name, w, s, r, self._enable_plugin),
"禁用": lambda w, s, r: self._operate_plugin(plugin_name, w, s, r, self._disable_plugin),
"重载": lambda w, s, r: self._operate_plugin(plugin_name, w, s, r, self._reload_plugin),
"卸载": lambda w, s, r: self._operate_plugin(plugin_name, w, s, r, self._unload_plugin),
"启用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._enable_plugin),
"禁用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._disable_plugin),
"重载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._reload_plugin),
"卸载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._unload_plugin),
# 修改这一行,使用 lambda 函数而不是直接调用
"加载": lambda w, s, r: self._load_plugin(plugin_name, w, s, r),
"信息": lambda w, s, r: self._operate_plugin(plugin_name, w, s, r, self._plugin_info)
"加载": lambda s, r: self._load_plugin(plugin_name, s, r),
"信息": lambda s, r: self._operate_plugin(plugin_name, s, r, self._plugin_info)
}
handler = command_handlers.get(sub_command)
if handler and (sub_command == "列表" or plugin_name):
return handler(wcf, sender, roomid)
return handler(sender, roomid)
else:
self.message_util.send_text(f"❌未知命令或缺少参数!\n{self.command_format}", target, sender)
return True, "未知命令"
@@ -154,7 +154,7 @@ class PluginManagerPlugin(MessagePluginInterface):
self.message_util.send_text(message, target, sender)
return True, "列出插件成功"
def _operate_plugin(self, plugin_name: str, wcf, sender: str, roomid: str,
def _operate_plugin(self, plugin_name: str, sender: str, roomid: str,
operation_func) -> Tuple[bool, str]:
"""通用插件操作函数"""
target = roomid if roomid else sender
@@ -172,9 +172,9 @@ class PluginManagerPlugin(MessagePluginInterface):
return True, "不能对插件管理插件自身执行此操作"
# 执行具体操作
return operation_func(display_name, wcf, sender, roomid)
return operation_func(display_name, sender, roomid)
def _enable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""启用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
@@ -198,7 +198,7 @@ class PluginManagerPlugin(MessagePluginInterface):
self.message_util.send_text(f"❌插件 {plugin_name} 启用失败", target, sender)
return False, f"插件 {plugin_name} 启用失败"
def _disable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""禁用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
@@ -222,7 +222,7 @@ class PluginManagerPlugin(MessagePluginInterface):
self.message_util.send_text(f"❌插件 {plugin_name} 禁用失败", target, sender)
return False, f"插件 {plugin_name} 禁用失败"
def _reload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""重载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
@@ -247,7 +247,7 @@ class PluginManagerPlugin(MessagePluginInterface):
self.message_util.send_text(f"❌插件 {plugin_name} 重载失败", target, sender)
return False, f"插件 {plugin_name} 重载失败"
def _unload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""卸载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
@@ -267,7 +267,7 @@ class PluginManagerPlugin(MessagePluginInterface):
self.message_util.send_text(f"❌插件 {plugin_name} 卸载失败", target, sender)
return False, f"插件 {plugin_name} 卸载失败"
def _load_plugin(self, plugin_name: str, wcf, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""加载插件"""
# 对于加载操作,我们直接使用目录名作为模块名
# 检查插件目录是否存在
@@ -275,7 +275,7 @@ class PluginManagerPlugin(MessagePluginInterface):
if not os.path.exists(plugin_dir):
if not silent:
self.message_util.send_text(f"❌插件目录 {plugin_dir} 不存在",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return False, f"插件目录 {plugin_dir} 不存在"
# 检查插件是否已加载 - 遍历所有插件查找模块名匹配的
@@ -284,7 +284,7 @@ class PluginManagerPlugin(MessagePluginInterface):
if existing_module_name == plugin_name:
if not silent:
self.message_util.send_text(f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return True, f"插件 {existing_plugin.name} 已经加载"
try:
@@ -293,34 +293,34 @@ class PluginManagerPlugin(MessagePluginInterface):
if plugin:
if not silent:
self.message_util.send_text(f"✅插件 {plugin.name} 加载成功",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return True, f"插件 {plugin.name} 加载成功"
else:
if not silent:
self.message_util.send_text(f"❌插件 {plugin_name} 加载失败",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return False, f"插件 {plugin_name} 加载失败"
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 出错: {e}")
if not silent:
self.message_util.send_text(f"❌加载插件出错: {str(e)}",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return False, f"加载插件出错: {e}"
def _plugin_info(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""查看插件详情"""
# 查找匹配的插件名称
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
self.message_util.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(display_name)
if not plugin:
self.message_util.send_text(f"❌插件 {display_name} 不存在",
(roomid if roomid else sender), sender)
(roomid if roomid else sender), sender)
return True, f"插件 {display_name} 不存在"
# 获取插件模块名

View File

@@ -103,7 +103,7 @@ class VideoManPlugin(MessagePluginInterface):
try:
# 下载视频
file_abspath = self._download_video("https://api.guiguiya.com/api/video/fuji?type=json")
#FIXME 需要换成web容器地址。否则无法获取。
if not file_abspath:
self.message_util.send_text(f"\n❌视频下载失败,请稍后再试",
(roomid if roomid else sender), sender)