182 lines
5.9 KiB
Python
182 lines
5.9 KiB
Python
# -*- coding: utf-8 -*-
|
||
import logging
|
||
import os
|
||
import threading
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.plugin_interface import PluginStatus
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus
|
||
|
||
|
||
# 动态导入win_click.py
|
||
def import_win_click():
|
||
"""动态导入win_click模块"""
|
||
try:
|
||
# 首先尝试从项目根目录导入
|
||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
|
||
win_click_path = os.path.join(project_root, "win_click.py")
|
||
|
||
if not os.path.exists(win_click_path):
|
||
# 尝试在utils目录下查找
|
||
win_click_path = os.path.join(project_root, "utils", "win_click.py")
|
||
|
||
if os.path.exists(win_click_path):
|
||
import importlib.util
|
||
import sys
|
||
|
||
spec = importlib.util.spec_from_file_location("win_click", win_click_path)
|
||
win_click = importlib.util.module_from_spec(spec)
|
||
sys.modules["win_click"] = win_click
|
||
spec.loader.exec_module(win_click)
|
||
return win_click
|
||
else:
|
||
return None
|
||
except Exception as e:
|
||
print(f"导入win_click模块失败: {e}")
|
||
return None
|
||
|
||
|
||
win_click = import_win_click()
|
||
|
||
|
||
class SystemUpdaterPlugin(MessagePluginInterface):
|
||
"""系统更新插件"""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "系统更新"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供系统更新和自动登录功能"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "AI Assistant"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return ""
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.admin_wxids = []
|
||
self.wait_time = 15 # 默认等待15秒
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.message_util = context.get("message_util")
|
||
self.config = self._config
|
||
|
||
# 从配置中获取命令和其他设置
|
||
plugin_config = self._config.get("SystemUpdater", {})
|
||
self._commands = plugin_config.get("commands", ["更新系统", "系统更新"])
|
||
self.wait_time = plugin_config.get("wait_time", 15)
|
||
self.admin_wxids = plugin_config.get("admin_wxids", [])
|
||
self.enable = plugin_config.get("enable", True)
|
||
|
||
# 检查win_click模块是否可用
|
||
if win_click is None:
|
||
self.LOG.error("无法导入win_click模块,插件功能将受限")
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
|
||
# 检查是否是命令
|
||
if content in self._commands:
|
||
return True
|
||
|
||
# 检查是否是带参数的更新命令
|
||
for cmd in self._commands:
|
||
if content.startswith(f"{cmd} "):
|
||
return True
|
||
|
||
return False
|
||
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
gbm = message.get("gbm", None)
|
||
|
||
# 检查权限
|
||
if self.admin_wxids and sender not in self.admin_wxids:
|
||
self.message_util.send_text("⚠️ 您没有执行此操作的权限",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "无权限"
|
||
|
||
# 如果是群消息,检查群权限
|
||
if roomid and gbm and hasattr(gbm, 'get_group_permission'):
|
||
if gbm.get_group_permission(roomid, Feature.ROBOT) == PermissionStatus.DISABLED:
|
||
return False, "机器人功能已禁用"
|
||
|
||
# 提取等待时间参数
|
||
wait_time = self.wait_time
|
||
command = content.split(" ")[0]
|
||
|
||
if len(content.split(" ")) > 1:
|
||
try:
|
||
param = content.split(" ")[1].strip()
|
||
if param.isdigit():
|
||
wait_time = int(param)
|
||
self.LOG.info(f"使用自定义等待时间: {wait_time}秒")
|
||
except:
|
||
pass
|
||
|
||
# 检查win_click模块是否可用
|
||
if win_click is None:
|
||
self.message_util.send_text("⚠️ 无法执行更新操作,系统缺少必要的组件",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "缺少win_click模块"
|
||
|
||
# 发送更新通知
|
||
self.message_util.send_text(f"🔄 系统即将更新并重启,等待时间设置为{wait_time}秒...",
|
||
(roomid if roomid else sender), sender)
|
||
|
||
# 启动更新流程
|
||
def update_thread():
|
||
try:
|
||
self.LOG.info(f"开始系统更新流程,等待时间: {wait_time}秒")
|
||
win_click.update_system(wait_time)
|
||
except Exception as e:
|
||
self.LOG.error(f"系统更新失败: {e}")
|
||
|
||
# 在新线程中启动更新,避免阻塞消息处理
|
||
threading.Thread(target=update_thread, daemon=True).start()
|
||
|
||
return True, "系统更新中"
|
||
|
||
|
||
# 插件入口点
|
||
def get_plugin():
|
||
return SystemUpdaterPlugin()
|