Files
abot/win_click.py
2025-03-31 14:23:11 +08:00

458 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pyautogui
import time
import os
import subprocess
import sys
import signal
import psutil
from PIL import Image, ImageGrab
import win32gui
import win32con
import win32process
def kill_process_by_name(process_name):
"""终止指定名称的进程"""
killed = False
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() in proc.info['name'].lower():
print(f"正在终止进程: {proc.info['name']} (PID: {proc.info['pid']})")
try:
process = psutil.Process(proc.info['pid'])
process.terminate()
killed = True
except Exception as e:
print(f"终止进程时出错: {e}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return killed
def kill_current_python_process():
"""结束当前Python程序及相关进程"""
current_pid = os.getpid()
parent_pid = psutil.Process(current_pid).ppid()
print(f"当前Python进程PID: {current_pid}, 父进程PID: {parent_pid}")
# 标记当前进程自己
this_process = psutil.Process(current_pid)
# 收集需要结束的进程列表
processes_to_kill = []
# 查找所有Python进程和cmd进程
print("查找所有相关进程...")
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
proc_pid = proc.info['pid']
proc_name = proc.info['name'].lower() if proc.info['name'] else ""
# 跳过当前进程
if proc_pid == current_pid:
continue
# 检查是否是Python相关进程
if 'python' in proc_name or proc_name.endswith('.exe'):
try:
# 获取完整进程对象和命令行
full_proc = psutil.Process(proc_pid)
cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else ''
# 检查是否是相关进程 - 更广泛的匹配条件
if ('python' in cmdline or
'wechatrobot' in cmdline or
'robot' in cmdline or
'main' in cmdline or
'win_click' in cmdline or
os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()):
processes_to_kill.append(full_proc)
print(f"找到相关Python进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...")
except Exception as e:
print(f"检查进程 {proc_pid} 时出错: {e}")
# 检查是否是cmd进程或与bot_start相关的进程
if proc_name in ['cmd.exe', 'powershell.exe', 'conhost.exe'] or 'cmd' in proc_name:
try:
full_proc = psutil.Process(proc_pid)
cmdline = ' '.join(full_proc.cmdline()).lower() if full_proc.cmdline() else ''
# 检查是否是与bot_start或restart_system相关的cmd进程
if ('bot_start' in cmdline or 'restart_system' in cmdline or
os.path.dirname(os.path.abspath(__file__)).lower() in cmdline.lower()):
processes_to_kill.append(full_proc)
print(f"找到相关CMD进程: {full_proc.name()} (PID: {proc_pid}), 命令行: {cmdline[:50]}...")
except Exception as e:
print(f"检查CMD进程 {proc_pid} 时出错: {e}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
print(f"检查进程时出错: {e}")
continue
# 确保不杀死当前进程和系统进程
processes_to_kill = [p for p in processes_to_kill
if p.pid != current_pid and p.pid != 1 and p.pid != parent_pid]
# 按进程树层级排序(子进程先杀)
def get_tree_depth(process):
depth = 0
try:
parent_id = process.ppid()
while parent_id != 0 and parent_id != 1:
depth += 1
parent = psutil.Process(parent_id)
parent_id = parent.ppid()
except:
pass
return depth
# 排序进程,子进程先终止
processes_to_kill.sort(key=get_tree_depth, reverse=True)
# 定义终止进程的函数
def terminate_processes():
print(f"开始终止 {len(processes_to_kill)} 个相关进程...")
killed_pids = []
# 首先尝试温和终止所有进程
for proc in processes_to_kill:
try:
if proc.is_running() and proc.pid not in killed_pids:
print(f"温和终止进程: {proc.name()} (PID: {proc.pid})")
proc.terminate()
killed_pids.append(proc.pid)
except Exception as e:
print(f"温和终止进程 {proc.pid} 时出错: {e}")
# 等待一段时间让进程自行终止
print("等待进程自行终止...")
time.sleep(2)
# 强制终止仍然存活的进程
for proc in processes_to_kill:
try:
if proc.is_running() and proc.pid not in killed_pids:
print(f"强制终止进程: {proc.name()} (PID: {proc.pid})")
proc.kill()
killed_pids.append(proc.pid)
except Exception as e:
print(f"强制终止进程 {proc.pid} 时出错: {e}")
# 确保进程终止
still_running = []
for proc in processes_to_kill:
try:
if proc.is_running():
still_running.append(proc)
except:
pass
# 使用操作系统级命令终止顽固进程
if still_running:
print(f"仍有 {len(still_running)} 个进程未终止,尝试使用系统命令终止...")
for proc in still_running:
try:
# 使用taskkill强制终止进程和子进程
os.system(f"taskkill /F /PID {proc.pid} /T")
print(f"使用taskkill终止进程及其子进程: PID {proc.pid}")
except Exception as e:
print(f"使用taskkill终止进程 {proc.pid} 时出错: {e}")
# 全局终止所有Python进程最后的保险措施
print("使用系统命令终止所有残留的Python进程...")
os.system("taskkill /F /IM python.exe /T 2>nul")
os.system("taskkill /F /IM pythonw.exe /T 2>nul")
print("进程终止操作完成")
# 返回终止进程的函数
return terminate_processes
def find_wechat_window():
"""查找微信窗口并将其激活"""
# 方法1: 通过窗口标题查找
wechat_window = win32gui.FindWindow(None, "微信")
if wechat_window == 0:
# 备用方法: 尝试查找部分标题匹配的窗口
def callback(hwnd, windows):
if win32gui.IsWindowVisible(hwnd) and "微信" in win32gui.GetWindowText(hwnd):
windows.append(hwnd)
return True
windows = []
win32gui.EnumWindows(callback, windows)
if windows:
wechat_window = windows[0]
# 方法2: 如果窗口标题查找失败,通过进程名查找
if wechat_window == 0:
print("通过窗口标题未找到微信,尝试通过进程查找...")
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] and 'WeChat' in proc.info['name']:
pid = proc.info['pid']
# 查找与此PID关联的窗口
def enum_windows_callback(hwnd, target_pid):
_, found_pid = win32process.GetWindowThreadProcessId(hwnd)
if found_pid == target_pid and win32gui.IsWindowVisible(hwnd):
target_pid.append(hwnd)
return True
target_hwnds = []
win32gui.EnumWindows(lambda hwnd, param: enum_windows_callback(hwnd, target_hwnds), pid)
if target_hwnds:
wechat_window = target_hwnds[0]
break
if wechat_window != 0:
print(f"找到微信窗口: {win32gui.GetWindowText(wechat_window)}")
# 还原最小化的窗口
if win32gui.IsIconic(wechat_window):
win32gui.ShowWindow(wechat_window, win32con.SW_RESTORE)
# 激活并前置窗口
win32gui.SetForegroundWindow(wechat_window)
return True
else:
print("未找到微信窗口")
return False
def is_green_pixel(r, g, b):
"""判断像素是否为绿色"""
# 微信的绿色按钮大约是 RGB(7, 193, 96)
return g > 150 and g > r * 1.5 and g > b * 1.5
def find_and_click_wechat_login(max_retries=3, retry_interval=5):
"""查找并点击微信登录按钮
Args:
max_retries: 最大重试次数
retry_interval: 重试间隔(秒)
"""
for attempt in range(max_retries):
# 切换到微信窗口
if not find_wechat_window():
print(f"尝试 {attempt + 1}/{max_retries}: 未找到微信窗口,等待 {retry_interval} 秒后重试...")
time.sleep(retry_interval)
continue
# 等待窗口完全加载
time.sleep(2)
try:
print(f"尝试 {attempt + 1}/{max_retries}: 查找登录按钮...")
# 截取屏幕
screenshot = ImageGrab.grab()
# 创建一个绿色区域的掩码
green_areas = []
width, height = screenshot.size
# 分析图像,识别绿色区域
for y in range(0, height, 5): # 每5个像素采样一次以提高效率
for x in range(0, width, 5):
r, g, b = screenshot.getpixel((x, y))
if is_green_pixel(r, g, b):
# 发现绿色像素,向四周扩散检查是否为按钮
left, top, right, bottom = x, y, x, y
# 向右扩散
for nx in range(x, min(x + 300, width)):
r, g, b = screenshot.getpixel((nx, y))
if not is_green_pixel(r, g, b):
break
right = nx
# 向下扩散
for ny in range(y, min(y + 60, height)):
r, g, b = screenshot.getpixel((x, ny))
if not is_green_pixel(r, g, b):
break
bottom = ny
width_area = right - left
height_area = bottom - top
# 如果区域符合按钮尺寸
if 100 < width_area < 300 and 30 < height_area < 60:
green_areas.append((left, top, right, bottom))
login_button_found = False
for left, top, right, bottom in green_areas:
# 点击按钮中心
center_x = (left + right) // 2
center_y = (top + bottom) // 2
pyautogui.click(center_x, center_y)
print(f"已点击位置: ({center_x}, {center_y})")
login_button_found = True
break
# 方法2如果图像识别失败尝试使用固定位置
if not login_button_found:
print("未通过图像识别找到登录按钮,尝试使用备用方法...")
# 获取屏幕分辨率
screen_width, screen_height = pyautogui.size()
# 估计登录按钮位置 (通常在窗口中下部偏右)
button_x = screen_width // 2 # 水平中心
button_y = (screen_height // 2) + 100 # 垂直中心偏下
# 移动到估计位置并点击
pyautogui.click(button_x, button_y)
print(f"已点击估计位置: ({button_x}, {button_y})")
print("点击操作完成")
return True
except Exception as e:
print(f"尝试 {attempt + 1}/{max_retries} 发生错误: {e}")
if attempt < max_retries - 1:
print(f"等待 {retry_interval} 秒后重试...")
time.sleep(retry_interval)
print(f"经过 {max_retries} 次尝试后仍未成功点击登录按钮")
return False
def update_system(wait_time=15):
"""完整的系统更新流程"""
print("=== 开始系统更新流程 ===")
# 步骤1: 结束微信进程
print("步骤1: 正在结束微信进程...")
if kill_process_by_name("WeChat"):
print("微信进程已终止")
else:
print("未找到运行中的微信进程")
# 步骤2: 获取终止当前Python进程的函数
terminate_processes = kill_current_python_process()
# 步骤3: 准备重启系统
print("步骤3: 准备重启系统...")
# 查找bot_start.bat文件
current_dir = os.path.dirname(os.path.abspath(__file__))
bat_path = os.path.join(current_dir, "bot_start.bat")
if not os.path.exists(bat_path):
# 向上查找一级目录
parent_dir = os.path.dirname(current_dir)
bat_path = os.path.join(parent_dir, "bot_start.bat")
if os.path.exists(bat_path):
print(f"找到启动脚本: {bat_path}")
# 创建一个临时的启动脚本,用于在当前进程结束后启动系统
temp_bat = os.path.join(os.environ.get('TEMP', os.getcwd()), "restart_system.bat")
with open(temp_bat, "w") as f:
f.write(f"""@echo off
echo 等待旧进程结束...
timeout /t 5 /nobreak > nul
echo 强制终止残留Python进程...
taskkill /F /IM python.exe /T 2>nul
taskkill /F /IM pythonw.exe /T 2>nul
timeout /t 2 /nobreak > nul
echo 再次确认无残留进程...
taskkill /F /IM python.exe /T 2>nul
taskkill /F /IM pythonw.exe /T 2>nul
echo 重新启动系统...
cd /d "{os.path.dirname(bat_path)}"
call "{bat_path}"
echo 等待系统启动和微信加载 ({wait_time}秒)...
timeout /t {wait_time} /nobreak > nul
echo 尝试登录微信...
start "" "python" "{os.path.abspath(__file__)}" --login-only
echo 清理临时脚本...
(goto) 2>nul & del "%~f0" & exit
""")
# 确保临时脚本具有执行权限
os.chmod(temp_bat, 0o755)
print(f"启动临时脚本执行重启: {temp_bat}")
# 创建分离进程运行批处理
try:
# 方法1: 使用START命令启动独立进程
os.system(f'start "" "{temp_bat}"')
# 方法2: 作为备份机制也使用subprocess启动
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen(
temp_bat,
shell=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS,
startupinfo=startupinfo
)
print("已通过两种方法启动临时脚本")
except Exception as e:
print(f"启动临时脚本时出错: {e}")
# 尝试直接回退到启动原始脚本
try:
print(f"尝试直接启动原始脚本: {bat_path}")
original_dir = os.getcwd()
os.chdir(os.path.dirname(bat_path))
subprocess.Popen(
f'cmd /c "{bat_path}"',
shell=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS
)
os.chdir(original_dir)
except Exception as e2:
print(f"启动原始脚本时出错: {e2}")
# 等待确保脚本启动
print("等待4秒确保启动脚本已开始执行...")
time.sleep(4)
# 多重终止保障
try:
# 确保使用系统命令终止所有Python进程
print("使用系统命令终止所有Python相关进程...")
os.system("taskkill /F /IM python.exe /T 2>nul")
os.system("taskkill /F /IM pythonw.exe /T 2>nul")
# 结束当前Python相关进程
print("使用psutil终止当前进程...")
terminate_processes()
except Exception as e:
print(f"终止进程时出错: {e}")
print("强制退出当前进程")
os._exit(0) # 使用os._exit确保立即退出
else:
print(f"未找到启动脚本 bot_start.bat无法自动重启系统")
return False
if __name__ == "__main__":
# 检查是否只需要执行登录
if "--login-only" in sys.argv:
print("仅执行微信登录...")
find_and_click_wechat_login()
else:
# 显示选项菜单
print("==== 系统工具 ====")
print("1. 查找并点击微信登录")
print("2. 更新系统 (结束微信、更新代码、重启系统、自动登录)")
print("0. 退出")
try:
choice = input("请选择操作 [0-2]: ")
if choice == "1":
find_and_click_wechat_login()
elif choice == "2":
update_system()
else:
print("退出程序")
except KeyboardInterrupt:
print("\n程序被中断")
except Exception as e:
print(f"发生错误: {e}")