From 45aeb2096004d37c75b16a97b97d2895aaa80302 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 31 Mar 2025 13:37:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9B=B4=E6=96=B0=E8=84=9A?= =?UTF-8?q?=E6=9C=AC=E7=BC=96=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/system_updater/__init__.py | 2 + plugins/system_updater/config.toml | 6 + plugins/system_updater/main.py | 183 +++++++++++++++ win_click.py | 351 ++++++++++++++++++++--------- 4 files changed, 438 insertions(+), 104 deletions(-) create mode 100644 plugins/system_updater/__init__.py create mode 100644 plugins/system_updater/config.toml create mode 100644 plugins/system_updater/main.py diff --git a/plugins/system_updater/__init__.py b/plugins/system_updater/__init__.py new file mode 100644 index 0000000..1ab2939 --- /dev/null +++ b/plugins/system_updater/__init__.py @@ -0,0 +1,2 @@ +# 系统更新插件 +from .main import get_plugin \ No newline at end of file diff --git a/plugins/system_updater/config.toml b/plugins/system_updater/config.toml new file mode 100644 index 0000000..0775bac --- /dev/null +++ b/plugins/system_updater/config.toml @@ -0,0 +1,6 @@ +[SystemUpdater] +enable = true +commands = ["更新系统", "系统更新", "重启系统", "更新重启"] +wait_time = 15 +# 设置管理员微信ID,只有这些ID可以执行更新操作 +admin_wxids = ["Jyunere"] # 在此添加管理员微信ID,例如 ["wxid_123456", "wxid_abcdef"] \ No newline at end of file diff --git a/plugins/system_updater/main.py b/plugins/system_updater/main.py new file mode 100644 index 0000000..143acaa --- /dev/null +++ b/plugins/system_updater/main.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +import logging +import os +import threading +from typing import Dict, Any, List, Optional, Tuple + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + + +# 动态导入win_click.py +def import_win_click(): + """动态导入win_click模块""" + try: + # 首先尝试从项目根目录导入 + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) + win_click_path = os.path.join(project_root, "win_click.py") + + if not os.path.exists(win_click_path): + # 尝试在utils目录下查找 + win_click_path = os.path.join(project_root, "utils", "win_click.py") + + if os.path.exists(win_click_path): + import importlib.util + import sys + + spec = importlib.util.spec_from_file_location("win_click", win_click_path) + win_click = importlib.util.module_from_spec(spec) + sys.modules["win_click"] = win_click + spec.loader.exec_module(win_click) + return win_click + else: + return None + except Exception as e: + print(f"导入win_click模块失败: {e}") + return None + + +win_click = import_win_click() + + +class SystemUpdaterPlugin(MessagePluginInterface): + """系统更新插件""" + + @property + def name(self) -> str: + return "系统更新" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供系统更新和自动登录功能" + + @property + def author(self) -> str: + return "AI Assistant" + + @property + def command_prefix(self) -> Optional[str]: + return "" + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + self.admin_wxids = [] + self.wait_time = 15 # 默认等待15秒 + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.message_util = context.get("message_util") + self.config = self._config + + # 从配置中获取命令和其他设置 + plugin_config = self._config.get("SystemUpdater", {}) + self._commands = plugin_config.get("commands", ["更新系统", "系统更新"]) + self.wait_time = plugin_config.get("wait_time", 15) + self.admin_wxids = plugin_config.get("admin_wxids", []) + self.enable = plugin_config.get("enable", True) + + # 检查win_click模块是否可用 + if win_click is None: + self.LOG.error("无法导入win_click模块,插件功能将受限") + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + + # 检查是否是命令 + if content in self._commands: + return True + + # 检查是否是带参数的更新命令 + for cmd in self._commands: + if content.startswith(f"{cmd} "): + return True + + return False + + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf = message.get("wcf") + gbm = message.get("gbm", None) + + # 检查权限 + if self.admin_wxids and sender not in self.admin_wxids: + wcf.send_text("⚠️ 您没有执行此操作的权限", + (roomid if roomid else sender), sender) + return True, "无权限" + + # 如果是群消息,检查群权限 + if roomid and gbm and hasattr(gbm, 'get_group_permission'): + if gbm.get_group_permission(roomid, Feature.ROBOT) == PermissionStatus.DISABLED: + return False, "机器人功能已禁用" + + # 提取等待时间参数 + wait_time = self.wait_time + command = content.split(" ")[0] + + if len(content.split(" ")) > 1: + try: + param = content.split(" ")[1].strip() + if param.isdigit(): + wait_time = int(param) + self.LOG.info(f"使用自定义等待时间: {wait_time}秒") + except: + pass + + # 检查win_click模块是否可用 + if win_click is None: + wcf.send_text("⚠️ 无法执行更新操作,系统缺少必要的组件", + (roomid if roomid else sender), sender) + return True, "缺少win_click模块" + + # 发送更新通知 + wcf.send_text(f"🔄 系统即将更新并重启,等待时间设置为{wait_time}秒...", + (roomid if roomid else sender), sender) + + # 启动更新流程 + def update_thread(): + try: + self.LOG.info(f"开始系统更新流程,等待时间: {wait_time}秒") + win_click.update_system(wait_time) + except Exception as e: + self.LOG.error(f"系统更新失败: {e}") + + # 在新线程中启动更新,避免阻塞消息处理 + threading.Thread(target=update_thread, daemon=True).start() + + return True, "系统更新中" + + +# 插件入口点 +def get_plugin(): + return SystemUpdaterPlugin() diff --git a/win_click.py b/win_click.py index 6ce3519..e6a6fd5 100644 --- a/win_click.py +++ b/win_click.py @@ -1,12 +1,70 @@ import pyautogui import time import os -import numpy as np +import subprocess +import sys +import signal +import psutil from PIL import Image, ImageGrab import win32gui import win32con import win32process -import psutil + + +def kill_process_by_name(process_name): + """终止指定名称的进程""" + killed = False + for proc in psutil.process_iter(['pid', 'name']): + try: + if process_name.lower() in proc.info['name'].lower(): + print(f"正在终止进程: {proc.info['name']} (PID: {proc.info['pid']})") + try: + process = psutil.Process(proc.info['pid']) + process.terminate() + killed = True + except Exception as e: + print(f"终止进程时出错: {e}") + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + return killed + + +def kill_current_python_process(): + """结束当前Python程序""" + current_pid = os.getpid() + parent_pid = psutil.Process(current_pid).ppid() + print(f"当前Python进程PID: {current_pid}, 父进程PID: {parent_pid}") + + # 收集需要在退出前结束的进程 + pids_to_kill = [] + + # 查找所有可能与当前进程相关的Python进程 + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + try: + # 如果是Python进程并且不是当前进程 + if ('python' in proc.info['name'].lower() and + proc.info['pid'] != current_pid and + proc.info['pid'] != parent_pid): + # 检查命令行参数,看是否与当前脚本相关 + cmdline = ' '.join(proc.info['cmdline']).lower() + script_name = os.path.basename(__file__).lower() + if script_name in cmdline or "win_click.py" in cmdline: + pids_to_kill.append(proc.info['pid']) + print(f"将结束Python进程: {proc.info['name']} (PID: {proc.info['pid']})") + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + + # 注册程序退出时要杀死的进程 + def on_exit(): + for pid in pids_to_kill: + try: + os.kill(pid, signal.SIGTERM) + except: + pass + + # 返回退出函数,由调用者决定何时执行 + return on_exit + def find_wechat_window(): """查找微信窗口并将其激活""" @@ -18,33 +76,34 @@ def find_wechat_window(): if win32gui.IsWindowVisible(hwnd) and "微信" in win32gui.GetWindowText(hwnd): windows.append(hwnd) return True - + windows = [] win32gui.EnumWindows(callback, windows) - + if windows: wechat_window = windows[0] - + # 方法2: 如果窗口标题查找失败,通过进程名查找 if wechat_window == 0: print("通过窗口标题未找到微信,尝试通过进程查找...") for proc in psutil.process_iter(['pid', 'name']): if proc.info['name'] and 'WeChat' in proc.info['name']: pid = proc.info['pid'] + # 查找与此PID关联的窗口 def enum_windows_callback(hwnd, target_pid): _, found_pid = win32process.GetWindowThreadProcessId(hwnd) if found_pid == target_pid and win32gui.IsWindowVisible(hwnd): target_pid.append(hwnd) return True - + target_hwnds = [] win32gui.EnumWindows(lambda hwnd, param: enum_windows_callback(hwnd, target_hwnds), pid) - + if target_hwnds: wechat_window = target_hwnds[0] break - + if wechat_window != 0: print(f"找到微信窗口: {win32gui.GetWindowText(wechat_window)}") # 还原最小化的窗口 @@ -57,108 +116,192 @@ def find_wechat_window(): print("未找到微信窗口") return False + def is_green_pixel(r, g, b): """判断像素是否为绿色""" # 微信的绿色按钮大约是 RGB(7, 193, 96) - return g > 150 and g > r*1.5 and g > b*1.5 + return g > 150 and g > r * 1.5 and g > b * 1.5 + + +def find_and_click_wechat_login(max_retries=3, retry_interval=5): + """查找并点击微信登录按钮 + + Args: + max_retries: 最大重试次数 + retry_interval: 重试间隔(秒) + """ + for attempt in range(max_retries): + # 切换到微信窗口 + if not find_wechat_window(): + print(f"尝试 {attempt + 1}/{max_retries}: 未找到微信窗口,等待 {retry_interval} 秒后重试...") + time.sleep(retry_interval) + continue + + # 等待窗口完全加载 + time.sleep(2) -def find_and_click_wechat_login(): - """查找并点击微信登录按钮""" - # 切换到微信窗口 - if not find_wechat_window(): - print("无法找到微信窗口,正在尝试启动微信...") try: - # 尝试启动微信(常见安装路径) - wechat_paths = [ - r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe", - r"C:\Program Files\Tencent\WeChat\WeChat.exe", - r"D:\Program Files (x86)\Tencent\WeChat\WeChat.exe", - r"D:\Program Files\Tencent\WeChat\WeChat.exe" - ] - - for path in wechat_paths: - if os.path.exists(path): - os.startfile(path) - print(f"正在启动微信: {path}") - time.sleep(5) # 等待微信启动 - if find_wechat_window(): - break - else: - print("未找到微信安装路径,请手动启动微信") - return + print(f"尝试 {attempt + 1}/{max_retries}: 查找登录按钮...") + # 截取屏幕 + screenshot = ImageGrab.grab() + + # 创建一个绿色区域的掩码 + green_areas = [] + width, height = screenshot.size + + # 分析图像,识别绿色区域 + for y in range(0, height, 5): # 每5个像素采样一次以提高效率 + for x in range(0, width, 5): + r, g, b = screenshot.getpixel((x, y)) + if is_green_pixel(r, g, b): + # 发现绿色像素,向四周扩散检查是否为按钮 + left, top, right, bottom = x, y, x, y + # 向右扩散 + for nx in range(x, min(x + 300, width)): + r, g, b = screenshot.getpixel((nx, y)) + if not is_green_pixel(r, g, b): + break + right = nx + # 向下扩散 + for ny in range(y, min(y + 60, height)): + r, g, b = screenshot.getpixel((x, ny)) + if not is_green_pixel(r, g, b): + break + bottom = ny + + width_area = right - left + height_area = bottom - top + + # 如果区域符合按钮尺寸 + if 100 < width_area < 300 and 30 < height_area < 60: + green_areas.append((left, top, right, bottom)) + + login_button_found = False + for left, top, right, bottom in green_areas: + # 点击按钮中心 + center_x = (left + right) // 2 + center_y = (top + bottom) // 2 + pyautogui.click(center_x, center_y) + print(f"已点击位置: ({center_x}, {center_y})") + login_button_found = True + break + + # 方法2:如果图像识别失败,尝试使用固定位置 + if not login_button_found: + print("未通过图像识别找到登录按钮,尝试使用备用方法...") + + # 获取屏幕分辨率 + screen_width, screen_height = pyautogui.size() + + # 估计登录按钮位置 (通常在窗口中下部偏右) + button_x = screen_width // 2 # 水平中心 + button_y = (screen_height // 2) + 100 # 垂直中心偏下 + + # 移动到估计位置并点击 + pyautogui.click(button_x, button_y) + print(f"已点击估计位置: ({button_x}, {button_y})") + + print("点击操作完成") + return True + except Exception as e: - print(f"启动微信时出错: {e}") - return + print(f"尝试 {attempt + 1}/{max_retries} 发生错误: {e}") + if attempt < max_retries - 1: + print(f"等待 {retry_interval} 秒后重试...") + time.sleep(retry_interval) + + print(f"经过 {max_retries} 次尝试后仍未成功点击登录按钮") + return False + + +def update_system(wait_time=15): + """完整的系统更新流程 - # 等待窗口完全加载 - time.sleep(2) - - try: - print("尝试查找登录按钮...") - # 截取屏幕 - screenshot = ImageGrab.grab() - screenshot_np = np.array(screenshot) - - # 创建一个绿色区域的掩码 - green_areas = [] - width, height = screenshot.size - - # 分析图像,识别绿色区域 - for y in range(0, height, 5): # 每5个像素采样一次以提高效率 - for x in range(0, width, 5): - r, g, b = screenshot.getpixel((x, y)) - if is_green_pixel(r, g, b): - # 发现绿色像素,向四周扩散检查是否为按钮 - left, top, right, bottom = x, y, x, y - # 向右扩散 - for nx in range(x, min(x+300, width)): - r, g, b = screenshot.getpixel((nx, y)) - if not is_green_pixel(r, g, b): - break - right = nx - # 向下扩散 - for ny in range(y, min(y+60, height)): - r, g, b = screenshot.getpixel((x, ny)) - if not is_green_pixel(r, g, b): - break - bottom = ny - - width_area = right - left - height_area = bottom - top - - # 如果区域符合按钮尺寸 - if 100 < width_area < 300 and 30 < height_area < 60: - green_areas.append((left, top, right, bottom)) - - login_button_found = False - for left, top, right, bottom in green_areas: - # 点击按钮中心 - center_x = (left + right) // 2 - center_y = (top + bottom) // 2 - pyautogui.click(center_x, center_y) - print(f"已点击位置: ({center_x}, {center_y})") - login_button_found = True - break - - # 方法2:如果图像识别失败,尝试使用固定位置 - if not login_button_found: - print("未通过图像识别找到登录按钮,尝试使用备用方法...") - - # 获取屏幕分辨率 - screen_width, screen_height = pyautogui.size() - - # 估计登录按钮位置 (通常在窗口中下部偏右) - button_x = screen_width // 2 # 水平中心 - button_y = (screen_height // 2) + 100 # 垂直中心偏下 - - # 移动到估计位置并点击 - pyautogui.click(button_x, button_y) - print(f"已点击估计位置: ({button_x}, {button_y})") - - print("点击操作完成") - - except Exception as e: - print(f"发生错误: {e}") + Args: + wait_time: 等待微信启动的秒数,默认15秒 + """ + print("=== 开始系统更新流程 ===") + + # 步骤1: 结束微信进程 + print("步骤1: 正在结束微信进程...") + if kill_process_by_name("WeChat"): + print("微信进程已终止") + else: + print("未找到运行中的微信进程") + + # 步骤2: 注册结束当前Python进程的函数 + exit_handler = kill_current_python_process() + + # 步骤3: 准备重启系统 + print("步骤3: 准备重启系统...") + + # 查找bot_start.bat文件 + current_dir = os.path.dirname(os.path.abspath(__file__)) + bat_path = os.path.join(current_dir, "bot_start.bat") + + if not os.path.exists(bat_path): + # 向上查找一级目录 + parent_dir = os.path.dirname(current_dir) + bat_path = os.path.join(parent_dir, "bot_start.bat") + + if os.path.exists(bat_path): + print(f"找到启动脚本: {bat_path}") + + # 创建一个临时的启动脚本,用于在当前进程结束后启动系统 + temp_bat = os.path.join(os.environ.get('TEMP', os.getcwd()), "restart_system.bat") + with open(temp_bat, "w") as f: + f.write(f"""@echo off +echo 等待旧进程结束... +timeout /t 2 /nobreak > nul +echo 重新启动系统... +start "" "{bat_path}" +echo 等待系统启动和微信加载 ({wait_time}秒)... +timeout /t {wait_time} /nobreak > nul +echo 尝试登录微信... +start "" "python" "{os.path.abspath(__file__)}" --login-only +exit +""") + + # 启动临时脚本 + print("启动临时脚本执行重启...") + # 使用subprocess.Popen而不是run,这样不会等待它完成 + subprocess.Popen(["cmd", "/c", temp_bat], + shell=True, + creationflags=subprocess.CREATE_NEW_CONSOLE) + + # 等待一小段时间确保脚本开始运行 + time.sleep(1) + + # 结束当前Python相关进程并退出 + print("结束当前进程...") + exit_handler() + sys.exit(0) + else: + print(f"未找到启动脚本 bot_start.bat,无法自动重启系统") + return False + if __name__ == "__main__": - find_and_click_wechat_login() \ No newline at end of file + # 检查是否只需要执行登录 + if "--login-only" in sys.argv: + print("仅执行微信登录...") + find_and_click_wechat_login() + else: + # 显示选项菜单 + print("==== 系统工具 ====") + print("1. 查找并点击微信登录") + print("2. 更新系统 (结束微信、更新代码、重启系统、自动登录)") + print("0. 退出") + + try: + choice = input("请选择操作 [0-2]: ") + if choice == "1": + find_and_click_wechat_login() + elif choice == "2": + update_system() + else: + print("退出程序") + except KeyboardInterrupt: + print("\n程序被中断") + except Exception as e: + print(f"发生错误: {e}")