# -*- coding: utf-8 -*- import os import subprocess from loguru import logger from typing import Dict, Any, List, Optional, Tuple from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from utils.robot_cmd.robot_command import Feature, PermissionStatus from wechat_ipad import WechatAPIClient class SystemUpdaterPlugin(MessagePluginInterface): """系统更新插件""" @property def name(self) -> str: return "系统更新" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供系统更新和自动登录功能" @property def author(self) -> str: return "AI Assistant" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.admin_wxids = [] self.wait_time = 5 # 默认等待15秒 self.bot: WechatAPIClient = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.message_util = context.get("message_util") self.config = self._config # 从配置中获取命令和其他设置 plugin_config = self._config.get("SystemUpdater", {}) self._commands = plugin_config.get("commands", ["更新系统", "系统更新"]) self.wait_time = plugin_config.get("wait_time", 5) self.admin_wxids = plugin_config.get("admin_wxids", []) self.enable = plugin_config.get("enable", True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: self.status = PluginStatus.RUNNING return True def stop(self) -> bool: self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() # 检查是否是命令 if content in self._commands: return True # 检查是否是带参数的更新命令 for cmd in self._commands: if content.startswith(f"{cmd} "): return True return False async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm = message.get("gbm", None) self.bot: WechatAPIClient = message.get("bot") # 检查权限 if self.admin_wxids and sender not in self.admin_wxids: await self.bot.send_text_message((roomid if roomid else sender), "⚠️ 您没有执行此操作的权限", sender) return True, "无权限" # 如果是群消息,检查群权限 if roomid and gbm and hasattr(gbm, 'get_group_permission'): if gbm.get_group_permission(roomid, Feature.ROBOT) == PermissionStatus.DISABLED: return False, "机器人功能已禁用" # 提取等待时间参数 wait_time = self.wait_time command = content.split(" ")[0] if len(content.split(" ")) > 1: try: param = content.split(" ")[1].strip() if param.isdigit(): wait_time = int(param) self.LOG.info(f"使用自定义等待时间: {wait_time}秒") except: pass # 发送更新通知 await self.bot.send_text_message((roomid if roomid else sender), f"🔄 系统即将更新并重启,等待时间设置为{wait_time}秒...", sender) # 执行系统更新脚本 self._execute_system_update() return True, "系统更新中" def _execute_system_update(self): """执行系统更新操作""" try: # 使用相对路径调用 restart.sh 脚本 result = subprocess.run( ["/home/liuwei/wechatbot/WeChatRobot/restart.sh"], # 使用绝对路径 check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 打印执行结果 logger.info(f"更新命令执行成功: {result.stdout.decode()}") except subprocess.CalledProcessError as e: # 如果 subprocess 执行出错,输出 stderr 信息 logger.error(f"执行更新命令时出错: {e.stderr.decode()}") except Exception as e: # 捕获其他异常 logger.error(f"系统更新失败: {str(e)}") # 插件入口点 def get_plugin(): return SystemUpdaterPlugin()