自动更新脚本编写

This commit is contained in:
liuwei
2025-03-31 13:37:19 +08:00
parent 4639e6d685
commit 45aeb20960
4 changed files with 438 additions and 104 deletions

View File

@@ -0,0 +1,2 @@
# 系统更新插件
from .main import get_plugin

View File

@@ -0,0 +1,6 @@
[SystemUpdater]
enable = true
commands = ["更新系统", "系统更新", "重启系统", "更新重启"]
wait_time = 15
# 设置管理员微信ID只有这些ID可以执行更新操作
admin_wxids = ["Jyunere"] # 在此添加管理员微信ID例如 ["wxid_123456", "wxid_abcdef"]

View File

@@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
import logging
import os
import threading
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
# 动态导入win_click.py
def import_win_click():
"""动态导入win_click模块"""
try:
# 首先尝试从项目根目录导入
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
win_click_path = os.path.join(project_root, "win_click.py")
if not os.path.exists(win_click_path):
# 尝试在utils目录下查找
win_click_path = os.path.join(project_root, "utils", "win_click.py")
if os.path.exists(win_click_path):
import importlib.util
import sys
spec = importlib.util.spec_from_file_location("win_click", win_click_path)
win_click = importlib.util.module_from_spec(spec)
sys.modules["win_click"] = win_click
spec.loader.exec_module(win_click)
return win_click
else:
return None
except Exception as e:
print(f"导入win_click模块失败: {e}")
return None
win_click = import_win_click()
class SystemUpdaterPlugin(MessagePluginInterface):
"""系统更新插件"""
@property
def name(self) -> str:
return "系统更新"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供系统更新和自动登录功能"
@property
def author(self) -> str:
return "AI Assistant"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.admin_wxids = []
self.wait_time = 15 # 默认等待15秒
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.message_util = context.get("message_util")
self.config = self._config
# 从配置中获取命令和其他设置
plugin_config = self._config.get("SystemUpdater", {})
self._commands = plugin_config.get("commands", ["更新系统", "系统更新"])
self.wait_time = plugin_config.get("wait_time", 15)
self.admin_wxids = plugin_config.get("admin_wxids", [])
self.enable = plugin_config.get("enable", True)
# 检查win_click模块是否可用
if win_click is None:
self.LOG.error("无法导入win_click模块插件功能将受限")
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
# 检查是否是命令
if content in self._commands:
return True
# 检查是否是带参数的更新命令
for cmd in self._commands:
if content.startswith(f"{cmd} "):
return True
return False
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf = message.get("wcf")
gbm = message.get("gbm", None)
# 检查权限
if self.admin_wxids and sender not in self.admin_wxids:
wcf.send_text("⚠️ 您没有执行此操作的权限",
(roomid if roomid else sender), sender)
return True, "无权限"
# 如果是群消息,检查群权限
if roomid and gbm and hasattr(gbm, 'get_group_permission'):
if gbm.get_group_permission(roomid, Feature.ROBOT) == PermissionStatus.DISABLED:
return False, "机器人功能已禁用"
# 提取等待时间参数
wait_time = self.wait_time
command = content.split(" ")[0]
if len(content.split(" ")) > 1:
try:
param = content.split(" ")[1].strip()
if param.isdigit():
wait_time = int(param)
self.LOG.info(f"使用自定义等待时间: {wait_time}")
except:
pass
# 检查win_click模块是否可用
if win_click is None:
wcf.send_text("⚠️ 无法执行更新操作,系统缺少必要的组件",
(roomid if roomid else sender), sender)
return True, "缺少win_click模块"
# 发送更新通知
wcf.send_text(f"🔄 系统即将更新并重启,等待时间设置为{wait_time}秒...",
(roomid if roomid else sender), sender)
# 启动更新流程
def update_thread():
try:
self.LOG.info(f"开始系统更新流程,等待时间: {wait_time}")
win_click.update_system(wait_time)
except Exception as e:
self.LOG.error(f"系统更新失败: {e}")
# 在新线程中启动更新,避免阻塞消息处理
threading.Thread(target=update_thread, daemon=True).start()
return True, "系统更新中"
# 插件入口点
def get_plugin():
return SystemUpdaterPlugin()

View File

@@ -1,12 +1,70 @@
import pyautogui
import time
import os
import numpy as np
import subprocess
import sys
import signal
import psutil
from PIL import Image, ImageGrab
import win32gui
import win32con
import win32process
import psutil
def kill_process_by_name(process_name):
"""终止指定名称的进程"""
killed = False
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() in proc.info['name'].lower():
print(f"正在终止进程: {proc.info['name']} (PID: {proc.info['pid']})")
try:
process = psutil.Process(proc.info['pid'])
process.terminate()
killed = True
except Exception as e:
print(f"终止进程时出错: {e}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return killed
def kill_current_python_process():
"""结束当前Python程序"""
current_pid = os.getpid()
parent_pid = psutil.Process(current_pid).ppid()
print(f"当前Python进程PID: {current_pid}, 父进程PID: {parent_pid}")
# 收集需要在退出前结束的进程
pids_to_kill = []
# 查找所有可能与当前进程相关的Python进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
# 如果是Python进程并且不是当前进程
if ('python' in proc.info['name'].lower() and
proc.info['pid'] != current_pid and
proc.info['pid'] != parent_pid):
# 检查命令行参数,看是否与当前脚本相关
cmdline = ' '.join(proc.info['cmdline']).lower()
script_name = os.path.basename(__file__).lower()
if script_name in cmdline or "win_click.py" in cmdline:
pids_to_kill.append(proc.info['pid'])
print(f"将结束Python进程: {proc.info['name']} (PID: {proc.info['pid']})")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
# 注册程序退出时要杀死的进程
def on_exit():
for pid in pids_to_kill:
try:
os.kill(pid, signal.SIGTERM)
except:
pass
# 返回退出函数,由调用者决定何时执行
return on_exit
def find_wechat_window():
"""查找微信窗口并将其激活"""
@@ -18,33 +76,34 @@ def find_wechat_window():
if win32gui.IsWindowVisible(hwnd) and "微信" in win32gui.GetWindowText(hwnd):
windows.append(hwnd)
return True
windows = []
win32gui.EnumWindows(callback, windows)
if windows:
wechat_window = windows[0]
# 方法2: 如果窗口标题查找失败,通过进程名查找
if wechat_window == 0:
print("通过窗口标题未找到微信,尝试通过进程查找...")
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] and 'WeChat' in proc.info['name']:
pid = proc.info['pid']
# 查找与此PID关联的窗口
def enum_windows_callback(hwnd, target_pid):
_, found_pid = win32process.GetWindowThreadProcessId(hwnd)
if found_pid == target_pid and win32gui.IsWindowVisible(hwnd):
target_pid.append(hwnd)
return True
target_hwnds = []
win32gui.EnumWindows(lambda hwnd, param: enum_windows_callback(hwnd, target_hwnds), pid)
if target_hwnds:
wechat_window = target_hwnds[0]
break
if wechat_window != 0:
print(f"找到微信窗口: {win32gui.GetWindowText(wechat_window)}")
# 还原最小化的窗口
@@ -57,108 +116,192 @@ def find_wechat_window():
print("未找到微信窗口")
return False
def is_green_pixel(r, g, b):
"""判断像素是否为绿色"""
# 微信的绿色按钮大约是 RGB(7, 193, 96)
return g > 150 and g > r*1.5 and g > b*1.5
return g > 150 and g > r * 1.5 and g > b * 1.5
def find_and_click_wechat_login(max_retries=3, retry_interval=5):
"""查找并点击微信登录按钮
Args:
max_retries: 最大重试次数
retry_interval: 重试间隔(秒)
"""
for attempt in range(max_retries):
# 切换到微信窗口
if not find_wechat_window():
print(f"尝试 {attempt + 1}/{max_retries}: 未找到微信窗口,等待 {retry_interval} 秒后重试...")
time.sleep(retry_interval)
continue
# 等待窗口完全加载
time.sleep(2)
def find_and_click_wechat_login():
"""查找并点击微信登录按钮"""
# 切换到微信窗口
if not find_wechat_window():
print("无法找到微信窗口,正在尝试启动微信...")
try:
# 尝试启动微信(常见安装路径)
wechat_paths = [
r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe",
r"C:\Program Files\Tencent\WeChat\WeChat.exe",
r"D:\Program Files (x86)\Tencent\WeChat\WeChat.exe",
r"D:\Program Files\Tencent\WeChat\WeChat.exe"
]
for path in wechat_paths:
if os.path.exists(path):
os.startfile(path)
print(f"正在启动微信: {path}")
time.sleep(5) # 等待微信启动
if find_wechat_window():
break
else:
print("未找到微信安装路径,请手动启动微信")
return
print(f"尝试 {attempt + 1}/{max_retries}: 查找登录按钮...")
# 截取屏幕
screenshot = ImageGrab.grab()
# 创建一个绿色区域的掩码
green_areas = []
width, height = screenshot.size
# 分析图像,识别绿色区域
for y in range(0, height, 5): # 每5个像素采样一次以提高效率
for x in range(0, width, 5):
r, g, b = screenshot.getpixel((x, y))
if is_green_pixel(r, g, b):
# 发现绿色像素,向四周扩散检查是否为按钮
left, top, right, bottom = x, y, x, y
# 向右扩散
for nx in range(x, min(x + 300, width)):
r, g, b = screenshot.getpixel((nx, y))
if not is_green_pixel(r, g, b):
break
right = nx
# 向下扩散
for ny in range(y, min(y + 60, height)):
r, g, b = screenshot.getpixel((x, ny))
if not is_green_pixel(r, g, b):
break
bottom = ny
width_area = right - left
height_area = bottom - top
# 如果区域符合按钮尺寸
if 100 < width_area < 300 and 30 < height_area < 60:
green_areas.append((left, top, right, bottom))
login_button_found = False
for left, top, right, bottom in green_areas:
# 点击按钮中心
center_x = (left + right) // 2
center_y = (top + bottom) // 2
pyautogui.click(center_x, center_y)
print(f"已点击位置: ({center_x}, {center_y})")
login_button_found = True
break
# 方法2如果图像识别失败尝试使用固定位置
if not login_button_found:
print("未通过图像识别找到登录按钮,尝试使用备用方法...")
# 获取屏幕分辨率
screen_width, screen_height = pyautogui.size()
# 估计登录按钮位置 (通常在窗口中下部偏右)
button_x = screen_width // 2 # 水平中心
button_y = (screen_height // 2) + 100 # 垂直中心偏下
# 移动到估计位置并点击
pyautogui.click(button_x, button_y)
print(f"已点击估计位置: ({button_x}, {button_y})")
print("点击操作完成")
return True
except Exception as e:
print(f"启动微信时出错: {e}")
return
print(f"尝试 {attempt + 1}/{max_retries} 发生错误: {e}")
if attempt < max_retries - 1:
print(f"等待 {retry_interval} 秒后重试...")
time.sleep(retry_interval)
print(f"经过 {max_retries} 次尝试后仍未成功点击登录按钮")
return False
def update_system(wait_time=15):
"""完整的系统更新流程
# 等待窗口完全加载
time.sleep(2)
try:
print("尝试查找登录按钮...")
# 截取屏幕
screenshot = ImageGrab.grab()
screenshot_np = np.array(screenshot)
# 创建一个绿色区域的掩码
green_areas = []
width, height = screenshot.size
# 分析图像,识别绿色区域
for y in range(0, height, 5): # 每5个像素采样一次以提高效率
for x in range(0, width, 5):
r, g, b = screenshot.getpixel((x, y))
if is_green_pixel(r, g, b):
# 发现绿色像素,向四周扩散检查是否为按钮
left, top, right, bottom = x, y, x, y
# 向右扩散
for nx in range(x, min(x+300, width)):
r, g, b = screenshot.getpixel((nx, y))
if not is_green_pixel(r, g, b):
break
right = nx
# 向下扩散
for ny in range(y, min(y+60, height)):
r, g, b = screenshot.getpixel((x, ny))
if not is_green_pixel(r, g, b):
break
bottom = ny
width_area = right - left
height_area = bottom - top
# 如果区域符合按钮尺寸
if 100 < width_area < 300 and 30 < height_area < 60:
green_areas.append((left, top, right, bottom))
login_button_found = False
for left, top, right, bottom in green_areas:
# 点击按钮中心
center_x = (left + right) // 2
center_y = (top + bottom) // 2
pyautogui.click(center_x, center_y)
print(f"已点击位置: ({center_x}, {center_y})")
login_button_found = True
break
# 方法2如果图像识别失败尝试使用固定位置
if not login_button_found:
print("未通过图像识别找到登录按钮,尝试使用备用方法...")
# 获取屏幕分辨率
screen_width, screen_height = pyautogui.size()
# 估计登录按钮位置 (通常在窗口中下部偏右)
button_x = screen_width // 2 # 水平中心
button_y = (screen_height // 2) + 100 # 垂直中心偏下
# 移动到估计位置并点击
pyautogui.click(button_x, button_y)
print(f"已点击估计位置: ({button_x}, {button_y})")
print("点击操作完成")
except Exception as e:
print(f"发生错误: {e}")
Args:
wait_time: 等待微信启动的秒数默认15秒
"""
print("=== 开始系统更新流程 ===")
# 步骤1: 结束微信进程
print("步骤1: 正在结束微信进程...")
if kill_process_by_name("WeChat"):
print("微信进程已终止")
else:
print("未找到运行中的微信进程")
# 步骤2: 注册结束当前Python进程的函数
exit_handler = kill_current_python_process()
# 步骤3: 准备重启系统
print("步骤3: 准备重启系统...")
# 查找bot_start.bat文件
current_dir = os.path.dirname(os.path.abspath(__file__))
bat_path = os.path.join(current_dir, "bot_start.bat")
if not os.path.exists(bat_path):
# 向上查找一级目录
parent_dir = os.path.dirname(current_dir)
bat_path = os.path.join(parent_dir, "bot_start.bat")
if os.path.exists(bat_path):
print(f"找到启动脚本: {bat_path}")
# 创建一个临时的启动脚本,用于在当前进程结束后启动系统
temp_bat = os.path.join(os.environ.get('TEMP', os.getcwd()), "restart_system.bat")
with open(temp_bat, "w") as f:
f.write(f"""@echo off
echo 等待旧进程结束...
timeout /t 2 /nobreak > nul
echo 重新启动系统...
start "" "{bat_path}"
echo 等待系统启动和微信加载 ({wait_time}秒)...
timeout /t {wait_time} /nobreak > nul
echo 尝试登录微信...
start "" "python" "{os.path.abspath(__file__)}" --login-only
exit
""")
# 启动临时脚本
print("启动临时脚本执行重启...")
# 使用subprocess.Popen而不是run这样不会等待它完成
subprocess.Popen(["cmd", "/c", temp_bat],
shell=True,
creationflags=subprocess.CREATE_NEW_CONSOLE)
# 等待一小段时间确保脚本开始运行
time.sleep(1)
# 结束当前Python相关进程并退出
print("结束当前进程...")
exit_handler()
sys.exit(0)
else:
print(f"未找到启动脚本 bot_start.bat无法自动重启系统")
return False
if __name__ == "__main__":
find_and_click_wechat_login()
# 检查是否只需要执行登录
if "--login-only" in sys.argv:
print("仅执行微信登录...")
find_and_click_wechat_login()
else:
# 显示选项菜单
print("==== 系统工具 ====")
print("1. 查找并点击微信登录")
print("2. 更新系统 (结束微信、更新代码、重启系统、自动登录)")
print("0. 退出")
try:
choice = input("请选择操作 [0-2]: ")
if choice == "1":
find_and_click_wechat_login()
elif choice == "2":
update_system()
else:
print("退出程序")
except KeyboardInterrupt:
print("\n程序被中断")
except Exception as e:
print(f"发生错误: {e}")