From 4ad5a690d9d93903670b61f54c79bb0886e601db Mon Sep 17 00:00:00 2001 From: liuwei Date: Sun, 27 Apr 2025 10:54:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4Windows=E7=9A=84=E9=87=8D?= =?UTF-8?q?=E5=90=AF=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gewechat/client/get_chatroom_members.py | 2 +- plugins/system_updater/main.py | 51 --- requirements.txt | 1 - win_click.py | 457 ------------------------ 4 files changed, 1 insertion(+), 510 deletions(-) delete mode 100644 win_click.py diff --git a/gewechat/client/get_chatroom_members.py b/gewechat/client/get_chatroom_members.py index bfb78f6..3250289 100644 --- a/gewechat/client/get_chatroom_members.py +++ b/gewechat/client/get_chatroom_members.py @@ -45,7 +45,7 @@ def set_call_back(): payload = json.dumps({ "token": "cb43f52db27e4a56bb6ec7da54373582", - "callbackUrl": "http://192.168.2.210:8999/gewechat/message/callback" + "callbackUrl": "http://192.168.2.212:8999/gewechat/message/callback" }) headers = { 'X-GEWE-TOKEN': 'cb43f52db27e4a56bb6ec7da54373582', diff --git a/plugins/system_updater/main.py b/plugins/system_updater/main.py index 654d531..a292509 100644 --- a/plugins/system_updater/main.py +++ b/plugins/system_updater/main.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- import logging -import os -import threading from typing import Dict, Any, List, Optional, Tuple from plugin_common.message_plugin_interface import MessagePluginInterface @@ -9,36 +7,6 @@ from plugin_common.plugin_interface import PluginStatus from utils.robot_cmd.robot_command import Feature, PermissionStatus -# 动态导入win_click.py -def import_win_click(): - """动态导入win_click模块""" - try: - # 首先尝试从项目根目录导入 - project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) - win_click_path = os.path.join(project_root, "win_click.py") - - if not os.path.exists(win_click_path): - # 尝试在utils目录下查找 - win_click_path = os.path.join(project_root, "utils", "win_click.py") - - if os.path.exists(win_click_path): - import importlib.util - import sys - - spec = importlib.util.spec_from_file_location("win_click", win_click_path) - win_click = importlib.util.module_from_spec(spec) - sys.modules["win_click"] = win_click - spec.loader.exec_module(win_click) - return win_click - else: - return None - except Exception as e: - print(f"导入win_click模块失败: {e}") - return None - - -win_click = import_win_click() - class SystemUpdaterPlugin(MessagePluginInterface): """系统更新插件""" @@ -88,9 +56,6 @@ class SystemUpdaterPlugin(MessagePluginInterface): self.admin_wxids = plugin_config.get("admin_wxids", []) self.enable = plugin_config.get("enable", True) - # 检查win_click模块是否可用 - if win_click is None: - self.LOG.error("无法导入win_click模块,插件功能将受限") self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True @@ -152,26 +117,10 @@ class SystemUpdaterPlugin(MessagePluginInterface): except: pass - # 检查win_click模块是否可用 - if win_click is None: - self.message_util.send_text("⚠️ 无法执行更新操作,系统缺少必要的组件", - (roomid if roomid else sender), sender) - return True, "缺少win_click模块" - # 发送更新通知 self.message_util.send_text(f"🔄 系统即将更新并重启,等待时间设置为{wait_time}秒...", (roomid if roomid else sender), sender) - # 启动更新流程 - def update_thread(): - try: - self.LOG.info(f"开始系统更新流程,等待时间: {wait_time}秒") - win_click.update_system(wait_time) - except Exception as e: - self.LOG.error(f"系统更新失败: {e}") - - # 在新线程中启动更新,避免阻塞消息处理 - threading.Thread(target=update_thread, daemon=True).start() return True, "系统更新中" diff --git a/requirements.txt b/requirements.txt index 740a13b..6a2f4ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -40,7 +40,6 @@ tomli~=2.2.1 PyAutoGUI~=0.9.54 psutil~=6.1.1 numpy~=1.26.4 -pywin32==306 gewechat-client==0.1.5 fastapi~=0.115.12 diff --git a/win_click.py b/win_click.py deleted file mode 100644 index a7790e1..0000000 --- a/win_click.py +++ /dev/null @@ -1,457 +0,0 @@ -import pyautogui -import time -import os -import subprocess -import sys -import signal -import psutil -from PIL import Image, ImageGrab -import win32gui -import win32con -import win32process - - -def kill_process_by_name(process_name): - """终止指定名称的进程""" - killed = False - for proc in psutil.process_iter(['pid', 'name']): - try: - if process_name.lower() in proc.info['name'].lower(): - print(f"正在终止进程: {proc.info['name']} (PID: {proc.info['pid']})") - try: - process = psutil.Process(proc.info['pid']) - process.terminate() - killed = True - except Exception as e: - print(f"终止进程时出错: {e}") - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - pass - return killed - - -def kill_current_python_process(): - """结束当前Python程序及相关进程""" - current_pid = os.getpid() - parent_pid = psutil.Process(current_pid).ppid() - print(f"当前Python进程PID: {current_pid}, 父进程PID: {parent_pid}") - - # 标记当前进程自己 - this_process = psutil.Process(current_pid) - - # 收集需要结束的进程列表 - processes_to_kill = [] - - # 查找所有Python进程和cmd进程 - print("查找所有相关进程...") - for proc in psutil.process_iter(['pid', 'name', 'cmdline']): - try: - proc_pid = proc.info['pid'] - proc_name = proc.info['name'].lower() if proc.info['name'] else "" - - # 跳过当前进程 - if proc_pid == current_pid: - continue - - # 检查是否是Python相关进程 - if 'python' in proc_name or proc_name.endswith('.exe'): - try: - # 获取完整进程对象和命令行 - full_proc = psutil.Process(proc_pid) - cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else '' - - # 检查是否是相关进程 - 更广泛的匹配条件 - if ('python' in cmdline or - 'wechatrobot' in cmdline or - 'robot' in cmdline or - 'main' in cmdline or - 'win_click' in cmdline or - os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()): - processes_to_kill.append(full_proc) - print(f"找到相关Python进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...") - except Exception as e: - print(f"检查进程 {proc_pid} 时出错: {e}") - - # 检查是否是cmd进程或与bot_start相关的进程 - if proc_name in ['cmd.exe', 'powershell.exe', 'conhost.exe'] or 'cmd' in proc_name: - try: - full_proc = psutil.Process(proc_pid) - cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else '' - - # 检查是否是与bot_start或restart_system相关的cmd进程 - if ('bot_start' in cmdline or 'restart_system' in cmdline or - os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()): - processes_to_kill.append(full_proc) - print(f"找到相关CMD进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...") - except Exception as e: - print(f"检查CMD进程 {proc_pid} 时出错: {e}") - - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: - print(f"检查进程时出错: {e}") - continue - - # 确保不杀死当前进程和系统进程 - processes_to_kill = [p for p in processes_to_kill - if p.pid != current_pid and p.pid != 1 and p.pid != parent_pid] - - # 按进程树层级排序(子进程先杀) - def get_tree_depth(process): - depth = 0 - try: - parent_id = process.ppid() - while parent_id != 0 and parent_id != 1: - depth += 1 - parent = psutil.Process(parent_id) - parent_id = parent.ppid() - except: - pass - return depth - - # 排序进程,子进程先终止 - processes_to_kill.sort(key=get_tree_depth, reverse=True) - - # 定义终止进程的函数 - def terminate_processes(): - print(f"开始终止 {len(processes_to_kill)} 个相关进程...") - killed_pids = [] - - # 首先尝试温和终止所有进程 - for proc in processes_to_kill: - try: - if proc.is_running() and proc.pid not in killed_pids: - print(f"温和终止进程: {proc.name()} (PID: {proc.pid})") - proc.terminate() - killed_pids.append(proc.pid) - except Exception as e: - print(f"温和终止进程 {proc.pid} 时出错: {e}") - - # 等待一段时间让进程自行终止 - print("等待进程自行终止...") - time.sleep(2) - - # 强制终止仍然存活的进程 - for proc in processes_to_kill: - try: - if proc.is_running() and proc.pid not in killed_pids: - print(f"强制终止进程: {proc.name()} (PID: {proc.pid})") - proc.kill() - killed_pids.append(proc.pid) - except Exception as e: - print(f"强制终止进程 {proc.pid} 时出错: {e}") - - # 确保进程终止 - still_running = [] - for proc in processes_to_kill: - try: - if proc.is_running(): - still_running.append(proc) - except: - pass - - # 使用操作系统级命令终止顽固进程 - if still_running: - print(f"仍有 {len(still_running)} 个进程未终止,尝试使用系统命令终止...") - for proc in still_running: - try: - # 使用taskkill强制终止进程和子进程 - os.system(f"taskkill /F /PID {proc.pid} /T") - print(f"使用taskkill终止进程及其子进程: PID {proc.pid}") - except Exception as e: - print(f"使用taskkill终止进程 {proc.pid} 时出错: {e}") - - # 全局终止所有Python进程(最后的保险措施) - print("使用系统命令终止所有残留的Python进程...") - os.system("taskkill /F /IM python.exe /T 2>nul") - os.system("taskkill /F /IM pythonw.exe /T 2>nul") - - print("进程终止操作完成") - - # 返回终止进程的函数 - return terminate_processes - - -def find_wechat_window(): - """查找微信窗口并将其激活""" - # 方法1: 通过窗口标题查找 - wechat_window = win32gui.FindWindow(None, "微信") - if wechat_window == 0: - # 备用方法: 尝试查找部分标题匹配的窗口 - def callback(hwnd, windows): - if win32gui.IsWindowVisible(hwnd) and "微信" in win32gui.GetWindowText(hwnd): - windows.append(hwnd) - return True - - windows = [] - win32gui.EnumWindows(callback, windows) - - if windows: - wechat_window = windows[0] - - # 方法2: 如果窗口标题查找失败,通过进程名查找 - if wechat_window == 0: - print("通过窗口标题未找到微信,尝试通过进程查找...") - for proc in psutil.process_iter(['pid', 'name']): - if proc.info['name'] and 'WeChat' in proc.info['name']: - pid = proc.info['pid'] - - # 查找与此PID关联的窗口 - def enum_windows_callback(hwnd, target_pid): - _, found_pid = win32process.GetWindowThreadProcessId(hwnd) - if found_pid == target_pid and win32gui.IsWindowVisible(hwnd): - target_pid.append(hwnd) - return True - - target_hwnds = [] - win32gui.EnumWindows(lambda hwnd, param: enum_windows_callback(hwnd, target_hwnds), pid) - - if target_hwnds: - wechat_window = target_hwnds[0] - break - - if wechat_window != 0: - print(f"找到微信窗口: {win32gui.GetWindowText(wechat_window)}") - # 还原最小化的窗口 - if win32gui.IsIconic(wechat_window): - win32gui.ShowWindow(wechat_window, win32con.SW_RESTORE) - # 激活并前置窗口 - win32gui.SetForegroundWindow(wechat_window) - return True - else: - print("未找到微信窗口") - return False - - -def is_green_pixel(r, g, b): - """判断像素是否为绿色""" - # 微信的绿色按钮大约是 RGB(7, 193, 96) - return g > 150 and g > r * 1.5 and g > b * 1.5 - - -def find_and_click_wechat_login(max_retries=3, retry_interval=5): - """查找并点击微信登录按钮 - - Args: - max_retries: 最大重试次数 - retry_interval: 重试间隔(秒) - """ - for attempt in range(max_retries): - # 切换到微信窗口 - if not find_wechat_window(): - print(f"尝试 {attempt + 1}/{max_retries}: 未找到微信窗口,等待 {retry_interval} 秒后重试...") - time.sleep(retry_interval) - continue - - # 等待窗口完全加载 - time.sleep(2) - - try: - print(f"尝试 {attempt + 1}/{max_retries}: 查找登录按钮...") - # 截取屏幕 - screenshot = ImageGrab.grab() - - # 创建一个绿色区域的掩码 - green_areas = [] - width, height = screenshot.size - - # 分析图像,识别绿色区域 - for y in range(0, height, 5): # 每5个像素采样一次以提高效率 - for x in range(0, width, 5): - r, g, b = screenshot.getpixel((x, y)) - if is_green_pixel(r, g, b): - # 发现绿色像素,向四周扩散检查是否为按钮 - left, top, right, bottom = x, y, x, y - # 向右扩散 - for nx in range(x, min(x + 300, width)): - r, g, b = screenshot.getpixel((nx, y)) - if not is_green_pixel(r, g, b): - break - right = nx - # 向下扩散 - for ny in range(y, min(y + 60, height)): - r, g, b = screenshot.getpixel((x, ny)) - if not is_green_pixel(r, g, b): - break - bottom = ny - - width_area = right - left - height_area = bottom - top - - # 如果区域符合按钮尺寸 - if 100 < width_area < 300 and 30 < height_area < 60: - green_areas.append((left, top, right, bottom)) - - login_button_found = False - for left, top, right, bottom in green_areas: - # 点击按钮中心 - center_x = (left + right) // 2 - center_y = (top + bottom) // 2 - pyautogui.click(center_x, center_y) - print(f"已点击位置: ({center_x}, {center_y})") - login_button_found = True - break - - # 方法2:如果图像识别失败,尝试使用固定位置 - if not login_button_found: - print("未通过图像识别找到登录按钮,尝试使用备用方法...") - - # 获取屏幕分辨率 - screen_width, screen_height = pyautogui.size() - - # 估计登录按钮位置 (通常在窗口中下部偏右) - button_x = screen_width // 2 # 水平中心 - button_y = (screen_height // 2) + 100 # 垂直中心偏下 - - # 移动到估计位置并点击 - pyautogui.click(button_x, button_y) - print(f"已点击估计位置: ({button_x}, {button_y})") - - print("点击操作完成") - return True - - except Exception as e: - print(f"尝试 {attempt + 1}/{max_retries} 发生错误: {e}") - if attempt < max_retries - 1: - print(f"等待 {retry_interval} 秒后重试...") - time.sleep(retry_interval) - - print(f"经过 {max_retries} 次尝试后仍未成功点击登录按钮") - return False - - -def update_system(wait_time=15): - """完整的系统更新流程""" - print("=== 开始系统更新流程 ===") - - # 步骤1: 结束微信进程 - print("步骤1: 正在结束微信进程...") - if kill_process_by_name("WeChat"): - print("微信进程已终止") - else: - print("未找到运行中的微信进程") - - # 步骤2: 获取终止当前Python进程的函数 - terminate_processes = kill_current_python_process() - - # 步骤3: 准备重启系统 - print("步骤3: 准备重启系统...") - - # 查找bot_start.bat文件 - current_dir = os.path.dirname(os.path.abspath(__file__)) - bat_path = os.path.join(current_dir, "bot_start.bat") - - if not os.path.exists(bat_path): - # 向上查找一级目录 - parent_dir = os.path.dirname(current_dir) - bat_path = os.path.join(parent_dir, "bot_start.bat") - - if os.path.exists(bat_path): - print(f"找到启动脚本: {bat_path}") - - # 创建一个临时的启动脚本,用于在当前进程结束后启动系统 - temp_bat = os.path.join(os.environ.get('TEMP', os.getcwd()), "restart_system.bat") - with open(temp_bat, "w") as f: - f.write(f"""@echo off -echo 等待旧进程结束... -timeout /t 5 /nobreak > nul -echo 强制终止残留Python进程... -taskkill /F /IM python.exe /T 2>nul -taskkill /F /IM pythonw.exe /T 2>nul -timeout /t 2 /nobreak > nul -echo 再次确认无残留进程... -taskkill /F /IM python.exe /T 2>nul -taskkill /F /IM pythonw.exe /T 2>nul -echo 重新启动系统... -cd /d "{os.path.dirname(bat_path)}" -call "{bat_path}" -echo 等待系统启动和微信加载 ({wait_time}秒)... -timeout /t {wait_time} /nobreak > nul -echo 尝试登录微信... -start "" "python" "{os.path.abspath(__file__)}" --login-only -echo 清理临时脚本... -(goto) 2>nul & del "%~f0" & exit -""") - - # 确保临时脚本具有执行权限 - os.chmod(temp_bat, 0o755) - - print(f"启动临时脚本执行重启: {temp_bat}") - - # 创建分离进程运行批处理 - try: - # 方法1: 使用START命令启动独立进程 - os.system(f'start "" "{temp_bat}"') - - # 方法2: 作为备份机制,也使用subprocess启动 - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - subprocess.Popen( - temp_bat, - shell=True, - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS, - startupinfo=startupinfo - ) - - print("已通过两种方法启动临时脚本") - except Exception as e: - print(f"启动临时脚本时出错: {e}") - # 尝试直接回退到启动原始脚本 - try: - print(f"尝试直接启动原始脚本: {bat_path}") - original_dir = os.getcwd() - os.chdir(os.path.dirname(bat_path)) - subprocess.Popen( - f'cmd /c "{bat_path}"', - shell=True, - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS - ) - os.chdir(original_dir) - except Exception as e2: - print(f"启动原始脚本时出错: {e2}") - - # 等待确保脚本启动 - print("等待4秒确保启动脚本已开始执行...") - time.sleep(4) - - # 多重终止保障 - try: - # 确保使用系统命令终止所有Python进程 - print("使用系统命令终止所有Python相关进程...") - os.system("taskkill /F /IM python.exe /T 2>nul") - os.system("taskkill /F /IM pythonw.exe /T 2>nul") - - # 结束当前Python相关进程 - print("使用psutil终止当前进程...") - terminate_processes() - except Exception as e: - print(f"终止进程时出错: {e}") - - print("强制退出当前进程") - os._exit(0) # 使用os._exit确保立即退出 - else: - print(f"未找到启动脚本 bot_start.bat,无法自动重启系统") - return False - - -if __name__ == "__main__": - # 检查是否只需要执行登录 - if "--login-only" in sys.argv: - print("仅执行微信登录...") - find_and_click_wechat_login() - else: - # 显示选项菜单 - print("==== 系统工具 ====") - print("1. 查找并点击微信登录") - print("2. 更新系统 (结束微信、更新代码、重启系统、自动登录)") - print("0. 退出") - - try: - choice = input("请选择操作 [0-2]: ") - if choice == "1": - find_and_click_wechat_login() - elif choice == "2": - update_system() - else: - print("退出程序") - except KeyboardInterrupt: - print("\n程序被中断") - except Exception as e: - print(f"发生错误: {e}")