429 lines
16 KiB
Python
429 lines
16 KiB
Python
import pyautogui
|
||
import time
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import signal
|
||
import psutil
|
||
from PIL import Image, ImageGrab
|
||
import win32gui
|
||
import win32con
|
||
import win32process
|
||
|
||
|
||
def kill_process_by_name(process_name):
|
||
"""终止指定名称的进程"""
|
||
killed = False
|
||
for proc in psutil.process_iter(['pid', 'name']):
|
||
try:
|
||
if process_name.lower() in proc.info['name'].lower():
|
||
print(f"正在终止进程: {proc.info['name']} (PID: {proc.info['pid']})")
|
||
try:
|
||
process = psutil.Process(proc.info['pid'])
|
||
process.terminate()
|
||
killed = True
|
||
except Exception as e:
|
||
print(f"终止进程时出错: {e}")
|
||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||
pass
|
||
return killed
|
||
|
||
|
||
def kill_current_python_process():
|
||
"""结束当前Python程序及相关进程"""
|
||
current_pid = os.getpid()
|
||
parent_pid = psutil.Process(current_pid).ppid()
|
||
print(f"当前Python进程PID: {current_pid}, 父进程PID: {parent_pid}")
|
||
|
||
# 标记当前进程自己
|
||
this_process = psutil.Process(current_pid)
|
||
|
||
# 收集需要结束的进程列表
|
||
processes_to_kill = []
|
||
|
||
# 查找所有Python进程和cmd进程
|
||
print("查找所有相关进程...")
|
||
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
|
||
try:
|
||
proc_pid = proc.info['pid']
|
||
proc_name = proc.info['name'].lower() if proc.info['name'] else ""
|
||
|
||
# 跳过当前进程
|
||
if proc_pid == current_pid:
|
||
continue
|
||
|
||
# 检查是否是Python相关进程
|
||
if 'python' in proc_name or proc_name.endswith('.exe'):
|
||
try:
|
||
# 获取完整进程对象和命令行
|
||
full_proc = psutil.Process(proc_pid)
|
||
cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else ''
|
||
|
||
# 检查是否是相关进程 - 更广泛的匹配条件
|
||
if ('python' in cmdline or
|
||
'wechatrobot' in cmdline or
|
||
'robot' in cmdline or
|
||
'main' in cmdline or
|
||
'win_click' in cmdline or
|
||
os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()):
|
||
processes_to_kill.append(full_proc)
|
||
print(f"找到相关Python进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...")
|
||
except Exception as e:
|
||
print(f"检查进程 {proc_pid} 时出错: {e}")
|
||
|
||
# 检查是否是cmd进程或与bot_start相关的进程
|
||
if proc_name in ['cmd.exe', 'powershell.exe', 'conhost.exe'] or 'cmd' in proc_name:
|
||
try:
|
||
full_proc = psutil.Process(proc_pid)
|
||
cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else ''
|
||
|
||
# 检查是否是与bot_start或restart_system相关的cmd进程
|
||
if ('bot_start' in cmdline or 'restart_system' in cmdline or
|
||
os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()):
|
||
processes_to_kill.append(full_proc)
|
||
print(f"找到相关CMD进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...")
|
||
except Exception as e:
|
||
print(f"检查CMD进程 {proc_pid} 时出错: {e}")
|
||
|
||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
|
||
print(f"检查进程时出错: {e}")
|
||
continue
|
||
|
||
# 确保不杀死当前进程和系统进程
|
||
processes_to_kill = [p for p in processes_to_kill
|
||
if p.pid != current_pid and p.pid != 1 and p.pid != parent_pid]
|
||
|
||
# 按进程树层级排序(子进程先杀)
|
||
def get_tree_depth(process):
|
||
depth = 0
|
||
try:
|
||
parent_id = process.ppid()
|
||
while parent_id != 0 and parent_id != 1:
|
||
depth += 1
|
||
parent = psutil.Process(parent_id)
|
||
parent_id = parent.ppid()
|
||
except:
|
||
pass
|
||
return depth
|
||
|
||
# 排序进程,子进程先终止
|
||
processes_to_kill.sort(key=get_tree_depth, reverse=True)
|
||
|
||
# 定义终止进程的函数
|
||
def terminate_processes():
|
||
print(f"开始终止 {len(processes_to_kill)} 个相关进程...")
|
||
killed_pids = []
|
||
|
||
# 首先尝试温和终止所有进程
|
||
for proc in processes_to_kill:
|
||
try:
|
||
if proc.is_running() and proc.pid not in killed_pids:
|
||
print(f"温和终止进程: {proc.name()} (PID: {proc.pid})")
|
||
proc.terminate()
|
||
killed_pids.append(proc.pid)
|
||
except Exception as e:
|
||
print(f"温和终止进程 {proc.pid} 时出错: {e}")
|
||
|
||
# 等待一段时间让进程自行终止
|
||
print("等待进程自行终止...")
|
||
time.sleep(2)
|
||
|
||
# 强制终止仍然存活的进程
|
||
for proc in processes_to_kill:
|
||
try:
|
||
if proc.is_running() and proc.pid not in killed_pids:
|
||
print(f"强制终止进程: {proc.name()} (PID: {proc.pid})")
|
||
proc.kill()
|
||
killed_pids.append(proc.pid)
|
||
except Exception as e:
|
||
print(f"强制终止进程 {proc.pid} 时出错: {e}")
|
||
|
||
# 确保进程终止
|
||
still_running = []
|
||
for proc in processes_to_kill:
|
||
try:
|
||
if proc.is_running():
|
||
still_running.append(proc)
|
||
except:
|
||
pass
|
||
|
||
# 使用操作系统级命令终止顽固进程
|
||
if still_running:
|
||
print(f"仍有 {len(still_running)} 个进程未终止,尝试使用系统命令终止...")
|
||
for proc in still_running:
|
||
try:
|
||
# 使用taskkill强制终止进程和子进程
|
||
os.system(f"taskkill /F /PID {proc.pid} /T")
|
||
print(f"使用taskkill终止进程及其子进程: PID {proc.pid}")
|
||
except Exception as e:
|
||
print(f"使用taskkill终止进程 {proc.pid} 时出错: {e}")
|
||
|
||
# 全局终止所有Python进程(最后的保险措施)
|
||
print("使用系统命令终止所有残留的Python进程...")
|
||
os.system("taskkill /F /IM python.exe /T 2>nul")
|
||
os.system("taskkill /F /IM pythonw.exe /T 2>nul")
|
||
|
||
print("进程终止操作完成")
|
||
|
||
# 返回终止进程的函数
|
||
return terminate_processes
|
||
|
||
|
||
def find_wechat_window():
|
||
"""查找微信窗口并将其激活"""
|
||
# 方法1: 通过窗口标题查找
|
||
wechat_window = win32gui.FindWindow(None, "微信")
|
||
if wechat_window == 0:
|
||
# 备用方法: 尝试查找部分标题匹配的窗口
|
||
def callback(hwnd, windows):
|
||
if win32gui.IsWindowVisible(hwnd) and "微信" in win32gui.GetWindowText(hwnd):
|
||
windows.append(hwnd)
|
||
return True
|
||
|
||
windows = []
|
||
win32gui.EnumWindows(callback, windows)
|
||
|
||
if windows:
|
||
wechat_window = windows[0]
|
||
|
||
# 方法2: 如果窗口标题查找失败,通过进程名查找
|
||
if wechat_window == 0:
|
||
print("通过窗口标题未找到微信,尝试通过进程查找...")
|
||
for proc in psutil.process_iter(['pid', 'name']):
|
||
if proc.info['name'] and 'WeChat' in proc.info['name']:
|
||
pid = proc.info['pid']
|
||
|
||
# 查找与此PID关联的窗口
|
||
def enum_windows_callback(hwnd, target_pid):
|
||
_, found_pid = win32process.GetWindowThreadProcessId(hwnd)
|
||
if found_pid == target_pid and win32gui.IsWindowVisible(hwnd):
|
||
target_pid.append(hwnd)
|
||
return True
|
||
|
||
target_hwnds = []
|
||
win32gui.EnumWindows(lambda hwnd, param: enum_windows_callback(hwnd, target_hwnds), pid)
|
||
|
||
if target_hwnds:
|
||
wechat_window = target_hwnds[0]
|
||
break
|
||
|
||
if wechat_window != 0:
|
||
print(f"找到微信窗口: {win32gui.GetWindowText(wechat_window)}")
|
||
# 还原最小化的窗口
|
||
if win32gui.IsIconic(wechat_window):
|
||
win32gui.ShowWindow(wechat_window, win32con.SW_RESTORE)
|
||
# 激活并前置窗口
|
||
win32gui.SetForegroundWindow(wechat_window)
|
||
return True
|
||
else:
|
||
print("未找到微信窗口")
|
||
return False
|
||
|
||
|
||
def is_green_pixel(r, g, b):
|
||
"""判断像素是否为绿色"""
|
||
# 微信的绿色按钮大约是 RGB(7, 193, 96)
|
||
return g > 150 and g > r * 1.5 and g > b * 1.5
|
||
|
||
|
||
def find_and_click_wechat_login(max_retries=3, retry_interval=5):
|
||
"""查找并点击微信登录按钮
|
||
|
||
Args:
|
||
max_retries: 最大重试次数
|
||
retry_interval: 重试间隔(秒)
|
||
"""
|
||
for attempt in range(max_retries):
|
||
# 切换到微信窗口
|
||
if not find_wechat_window():
|
||
print(f"尝试 {attempt + 1}/{max_retries}: 未找到微信窗口,等待 {retry_interval} 秒后重试...")
|
||
time.sleep(retry_interval)
|
||
continue
|
||
|
||
# 等待窗口完全加载
|
||
time.sleep(2)
|
||
|
||
try:
|
||
print(f"尝试 {attempt + 1}/{max_retries}: 查找登录按钮...")
|
||
# 截取屏幕
|
||
screenshot = ImageGrab.grab()
|
||
|
||
# 创建一个绿色区域的掩码
|
||
green_areas = []
|
||
width, height = screenshot.size
|
||
|
||
# 分析图像,识别绿色区域
|
||
for y in range(0, height, 5): # 每5个像素采样一次以提高效率
|
||
for x in range(0, width, 5):
|
||
r, g, b = screenshot.getpixel((x, y))
|
||
if is_green_pixel(r, g, b):
|
||
# 发现绿色像素,向四周扩散检查是否为按钮
|
||
left, top, right, bottom = x, y, x, y
|
||
# 向右扩散
|
||
for nx in range(x, min(x + 300, width)):
|
||
r, g, b = screenshot.getpixel((nx, y))
|
||
if not is_green_pixel(r, g, b):
|
||
break
|
||
right = nx
|
||
# 向下扩散
|
||
for ny in range(y, min(y + 60, height)):
|
||
r, g, b = screenshot.getpixel((x, ny))
|
||
if not is_green_pixel(r, g, b):
|
||
break
|
||
bottom = ny
|
||
|
||
width_area = right - left
|
||
height_area = bottom - top
|
||
|
||
# 如果区域符合按钮尺寸
|
||
if 100 < width_area < 300 and 30 < height_area < 60:
|
||
green_areas.append((left, top, right, bottom))
|
||
|
||
login_button_found = False
|
||
for left, top, right, bottom in green_areas:
|
||
# 点击按钮中心
|
||
center_x = (left + right) // 2
|
||
center_y = (top + bottom) // 2
|
||
pyautogui.click(center_x, center_y)
|
||
print(f"已点击位置: ({center_x}, {center_y})")
|
||
login_button_found = True
|
||
break
|
||
|
||
# 方法2:如果图像识别失败,尝试使用固定位置
|
||
if not login_button_found:
|
||
print("未通过图像识别找到登录按钮,尝试使用备用方法...")
|
||
|
||
# 获取屏幕分辨率
|
||
screen_width, screen_height = pyautogui.size()
|
||
|
||
# 估计登录按钮位置 (通常在窗口中下部偏右)
|
||
button_x = screen_width // 2 # 水平中心
|
||
button_y = (screen_height // 2) + 100 # 垂直中心偏下
|
||
|
||
# 移动到估计位置并点击
|
||
pyautogui.click(button_x, button_y)
|
||
print(f"已点击估计位置: ({button_x}, {button_y})")
|
||
|
||
print("点击操作完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"尝试 {attempt + 1}/{max_retries} 发生错误: {e}")
|
||
if attempt < max_retries - 1:
|
||
print(f"等待 {retry_interval} 秒后重试...")
|
||
time.sleep(retry_interval)
|
||
|
||
print(f"经过 {max_retries} 次尝试后仍未成功点击登录按钮")
|
||
return False
|
||
|
||
|
||
def update_system(wait_time=15):
|
||
"""完整的系统更新流程"""
|
||
print("=== 开始系统更新流程 ===")
|
||
|
||
# 步骤1: 结束微信进程
|
||
print("步骤1: 正在结束微信进程...")
|
||
if kill_process_by_name("WeChat"):
|
||
print("微信进程已终止")
|
||
else:
|
||
print("未找到运行中的微信进程")
|
||
|
||
# 步骤2: 获取终止当前Python进程的函数
|
||
terminate_processes = kill_current_python_process()
|
||
|
||
# 步骤3: 准备重启系统
|
||
print("步骤3: 准备重启系统...")
|
||
|
||
# 查找bot_start.bat文件
|
||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||
bat_path = os.path.join(current_dir, "bot_start.bat")
|
||
|
||
if not os.path.exists(bat_path):
|
||
# 向上查找一级目录
|
||
parent_dir = os.path.dirname(current_dir)
|
||
bat_path = os.path.join(parent_dir, "bot_start.bat")
|
||
|
||
if os.path.exists(bat_path):
|
||
print(f"找到启动脚本: {bat_path}")
|
||
|
||
# 创建一个临时的启动脚本,用于在当前进程结束后启动系统
|
||
temp_bat = os.path.join(os.environ.get('TEMP', os.getcwd()), "restart_system.bat")
|
||
with open(temp_bat, "w") as f:
|
||
f.write(f"""@echo off
|
||
echo 等待旧进程结束...
|
||
timeout /t 5 /nobreak > nul
|
||
echo 强制终止残留Python进程...
|
||
taskkill /F /IM python.exe /T 2>nul
|
||
taskkill /F /IM pythonw.exe /T 2>nul
|
||
timeout /t 2 /nobreak > nul
|
||
echo 再次确认无残留进程...
|
||
taskkill /F /IM python.exe /T 2>nul
|
||
taskkill /F /IM pythonw.exe /T 2>nul
|
||
echo 重新启动系统...
|
||
start "" "{bat_path}"
|
||
echo 等待系统启动和微信加载 ({wait_time}秒)...
|
||
timeout /t {wait_time} /nobreak > nul
|
||
echo 尝试登录微信...
|
||
start "" "python" "{os.path.abspath(__file__)}" --login-only
|
||
echo 清理临时脚本...
|
||
(goto) 2>nul & del "%~f0" & exit
|
||
""")
|
||
|
||
# 启动临时脚本
|
||
print(f"启动临时脚本执行重启: {temp_bat}")
|
||
# 使用startupinfo隐藏cmd窗口,防止重复窗口
|
||
startupinfo = subprocess.STARTUPINFO()
|
||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||
startupinfo.wShowWindow = subprocess.SW_HIDE
|
||
|
||
# 使用subprocess.Popen而不是run,这样不会等待它完成
|
||
subprocess.Popen(["cmd", "/c", temp_bat],
|
||
shell=True,
|
||
creationflags=subprocess.CREATE_NEW_CONSOLE,
|
||
startupinfo=startupinfo)
|
||
|
||
# 等待脚本启动
|
||
print("已启动临时脚本,等待3秒...")
|
||
time.sleep(3)
|
||
|
||
# 直接调用全局终止方法确保干净退出
|
||
print("终止所有Python相关进程...")
|
||
os.system("taskkill /F /IM python.exe /T 2>nul")
|
||
|
||
# 结束当前Python相关进程并退出
|
||
print("结束当前进程...")
|
||
terminate_processes()
|
||
|
||
# 确保进程退出
|
||
print("强制退出当前进程")
|
||
os._exit(0) # 使用os._exit确保立即退出
|
||
else:
|
||
print(f"未找到启动脚本 bot_start.bat,无法自动重启系统")
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 检查是否只需要执行登录
|
||
if "--login-only" in sys.argv:
|
||
print("仅执行微信登录...")
|
||
find_and_click_wechat_login()
|
||
else:
|
||
# 显示选项菜单
|
||
print("==== 系统工具 ====")
|
||
print("1. 查找并点击微信登录")
|
||
print("2. 更新系统 (结束微信、更新代码、重启系统、自动登录)")
|
||
print("0. 退出")
|
||
|
||
try:
|
||
choice = input("请选择操作 [0-2]: ")
|
||
if choice == "1":
|
||
find_and_click_wechat_login()
|
||
elif choice == "2":
|
||
update_system()
|
||
else:
|
||
print("退出程序")
|
||
except KeyboardInterrupt:
|
||
print("\n程序被中断")
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|