""" WechatHookBot - 主入口 基于个微大客户版 Hook API 的微信机器人框架 优化功能: - 优先级消息队列 - 自适应熔断器 - 配置热更新 - 性能监控 - 优雅关闭 """ import asyncio import signal import sys import time import tomllib from pathlib import Path from loguru import logger from WechatHook import NoveLoader, WechatHookClient from WechatHook.callbacks import ( add_callback_handler, remove_callback_handler, clear_all_callbacks, wechat_connect_callback, wechat_recv_callback, wechat_close_callback, CONNECT_CALLBACK, RECV_CALLBACK, CLOSE_CALLBACK ) from utils.hookbot import HookBot from utils.plugin_manager import PluginManager from utils.decorators import scheduler from utils.bot_utils import ( PriorityMessageQueue, MessagePriority, PRIORITY_MESSAGE_TYPES, AdaptiveCircuitBreaker, ConfigWatcher, PerformanceMonitor, get_performance_monitor ) class BotService: """机器人服务类""" def __init__(self): self.loader = None self.client = None self.hookbot = None self.plugin_manager = None self.process_id = None # 微信进程 ID self.socket_client_id = None # Socket 客户端 ID self.is_running = False self.is_shutting_down = False # 是否正在关闭 self.event_loop = None # 事件循环引用 # 消息队列和性能控制 self.message_queue: PriorityMessageQueue = None # 优先级消息队列 self.queue_config = {} self.concurrency_config = {} self.consumer_tasks = [] self.processing_semaphore = None # 自适应熔断器 self.circuit_breaker: AdaptiveCircuitBreaker = None # 配置热更新 self.config_watcher: ConfigWatcher = None # 性能监控 self.performance_monitor: PerformanceMonitor = None @CONNECT_CALLBACK(in_class=True) def on_connect(self, client_id): """连接回调""" logger.success(f"微信客户端已连接: {client_id}") self.socket_client_id = client_id @RECV_CALLBACK(in_class=True) def on_receive(self, client_id, msg_type, data): """接收消息回调""" # 减少日志输出,只记录关键消息类型 if msg_type == 11025: # 登录信息 logger.success(f"获取到登录信息: wxid={data.get('wxid', 'unknown')}, nickname={data.get('nickname', 'unknown')}") if self.hookbot: self.hookbot.update_profile(data.get('wxid', 'unknown'), data.get('nickname', 'unknown')) # 初始化 CDN(必须在登录后执行,才能使用协议 API) if self.client and self.event_loop: logger.info("正在初始化 CDN...") asyncio.run_coroutine_threadsafe( self.client.cdn_init(), self.event_loop ) return # 使用消息队列处理其他消息 if self.message_queue and self.event_loop: try: # 快速入队,不阻塞回调 asyncio.run_coroutine_threadsafe( self._enqueue_message(msg_type, data), self.event_loop ) except Exception as e: logger.error(f"消息入队失败: {e}") async def _enqueue_message(self, msg_type, data): """将消息加入优先级队列""" try: # 记录收到消息 if self.performance_monitor: self.performance_monitor.record_message_received() # 检查队列是否已满 if self.message_queue.full(): overflow_strategy = self.queue_config.get("overflow_strategy", "drop_oldest") if overflow_strategy == "drop_oldest": # 丢弃优先级最低的消息 if self.message_queue.drop_lowest_priority(): logger.warning("队列已满,丢弃优先级最低的消息") if self.performance_monitor: self.performance_monitor.record_message_dropped() elif overflow_strategy == "sampling": # 采样处理,随机丢弃(但高优先级消息不丢弃) import random priority = PRIORITY_MESSAGE_TYPES.get(msg_type, MessagePriority.NORMAL) if priority < MessagePriority.HIGH and random.random() < 0.5: logger.debug("队列压力大,采样丢弃消息") if self.performance_monitor: self.performance_monitor.record_message_dropped() return else: # degrade # 降级处理(但高优先级消息不丢弃) priority = PRIORITY_MESSAGE_TYPES.get(msg_type, MessagePriority.NORMAL) if priority < MessagePriority.HIGH: logger.warning("队列已满,降级处理") if self.performance_monitor: self.performance_monitor.record_message_dropped() return # 将消息放入优先级队列 await self.message_queue.put(msg_type, data) # 记录队列大小 if self.performance_monitor: self.performance_monitor.record_queue_size(self.message_queue.qsize()) except Exception as e: logger.error(f"消息入队异常: {e}") async def _message_consumer(self, consumer_id: int): """消息消费者协程 - 纯队列串行模式,避免并发触发风控""" logger.info(f"消息消费者 {consumer_id} 已启动(串行模式)") while self.is_running and not self.is_shutting_down: try: # 从队列获取消息,设置超时避免无限等待 msg_type, data = await asyncio.wait_for( self.message_queue.get(), timeout=1.0 ) # 检查熔断器状态 if self.circuit_breaker and self.circuit_breaker.is_open(): logger.debug("熔断器开启,跳过消息处理") self.circuit_breaker.record_rejection() self.message_queue.task_done() continue # 串行处理:等待当前消息处理完成后再处理下一条 timeout = self.concurrency_config.get("plugin_task_timeout_seconds", 720) start_time = time.time() try: await asyncio.wait_for( self.hookbot.process_message(msg_type, data), timeout=timeout ) # 记录成功 processing_time = time.time() - start_time if self.circuit_breaker: self.circuit_breaker.record_success() if self.performance_monitor: self.performance_monitor.record_message_processed(processing_time) except asyncio.TimeoutError: logger.warning(f"消息处理超时 (>{timeout}s): type={msg_type}") if self.circuit_breaker: self.circuit_breaker.record_failure() if self.performance_monitor: self.performance_monitor.record_message_failed() except Exception as e: logger.error(f"消息处理异常: {e}") if self.circuit_breaker: self.circuit_breaker.record_failure() if self.performance_monitor: self.performance_monitor.record_message_failed() # 标记任务完成 self.message_queue.task_done() # 更新熔断器统计 if self.performance_monitor and self.circuit_breaker: self.performance_monitor.update_circuit_breaker_stats( self.circuit_breaker.get_stats() ) # 消息间隔,避免发送太快触发风控 message_interval = self.concurrency_config.get("message_interval_ms", 100) if message_interval > 0: await asyncio.sleep(message_interval / 1000.0) except asyncio.TimeoutError: # 队列为空,继续等待 continue except asyncio.CancelledError: # 任务被取消,退出循环 logger.info(f"消费者 {consumer_id} 收到取消信号") break except Exception as e: logger.error(f"消费者 {consumer_id} 异常: {e}") await asyncio.sleep(0.1) # 短暂休息避免忙等 logger.info(f"消费者 {consumer_id} 已退出") @CLOSE_CALLBACK(in_class=True) def on_close(self, client_id): """断开连接回调""" logger.warning(f"微信客户端已断开: {client_id}") async def _wait_for_socket(self, timeout_seconds: int = 15) -> bool: """等待 socket 客户端连接""" elapsed = 0 while elapsed < timeout_seconds: if self.socket_client_id: return True await asyncio.sleep(1) elapsed += 1 logger.info(f"等待微信客户端连接中... ({elapsed}/{timeout_seconds}s)") return False async def initialize(self): """初始化系统""" logger.info("=" * 60) logger.info("WechatHookBot 启动中...") logger.info("=" * 60) # 保存事件循环引用 self.event_loop = asyncio.get_event_loop() # 读取配置 config_path = Path("main_config.toml") if not config_path.exists(): logger.error("配置文件不存在: main_config.toml") return False with open(config_path, "rb") as f: config = tomllib.load(f) # 初始化性能配置 self.queue_config = config.get("Queue", {}) self.concurrency_config = config.get("Concurrency", {}) # 创建优先级消息队列 queue_size = self.queue_config.get("max_size", 1000) self.message_queue = PriorityMessageQueue(maxsize=queue_size) logger.info(f"优先级消息队列已创建,容量: {queue_size}") # 创建并发控制信号量 max_concurrency = self.concurrency_config.get("plugin_max_concurrency", 8) self.processing_semaphore = asyncio.Semaphore(max_concurrency) logger.info(f"并发控制已设置,最大并发: {max_concurrency}") # 创建自适应熔断器 if self.concurrency_config.get("enable_circuit_breaker", True): self.circuit_breaker = AdaptiveCircuitBreaker( failure_threshold=self.concurrency_config.get("circuit_breaker_threshold", 10), success_threshold=3, initial_recovery_time=5.0, max_recovery_time=300.0 ) logger.info("自适应熔断器已创建") # 创建性能监控器 self.performance_monitor = get_performance_monitor() logger.info("性能监控器已创建") # 创建配置热更新监听器 self.config_watcher = ConfigWatcher("main_config.toml", check_interval=5.0) self.config_watcher.register_callback(self._on_config_update) await self.config_watcher.start() logger.info("配置热更新监听器已启动") # 不需要数据库(简化版本) # 获取 DLL 路径 hook_config = config.get("WechatHook", {}) loader_dll = hook_config.get("loader-dll", "libs/Loader.dll") helper_dll = hook_config.get("helper-dll", "libs/Helper.dll") # 创建共享内存(必须在创建 Loader 之前) from WechatHook.loader import create_shared_memory logger.info("创建共享内存...") self.shared_memory_handle, self.shared_memory_address = create_shared_memory() # 注册回调(必须在创建 Loader 之前) add_callback_handler(self) # 创建 Loader logger.info("加载 Loader.dll...") try: self.loader = NoveLoader(loader_dll) except Exception as e: logger.error(f"加载 Loader.dll 失败: {e}") return False try: version = self.loader.GetUserWeChatVersion() logger.info(f"检测到本机微信版本: {version}") except Exception as e: logger.warning(f"无法获取微信版本信息: {e}") # 注入微信 logger.info("注入微信...") self.process_id = self.loader.InjectWeChat(helper_dll) if not self.process_id: logger.error("注入微信失败") return False # 等待 socket 客户端回调 if not await self._wait_for_socket(timeout_seconds=20): logger.error("Socket 客户端未连接,请检查微信是否正在运行") return False # 额外等待 0.5s 确保稳定 await asyncio.sleep(0.5) self.client = WechatHookClient(self.loader, self.socket_client_id) # 创建 HookBot self.hookbot = HookBot(self.client) # 获取登录信息 logger.info("获取登录信息...") await self.client.get_login_info() await asyncio.sleep(2) # 增加等待时间确保回调执行 # 检查是否已通过回调获取到登录信息 if not self.hookbot.wxid: logger.warning("未能通过回调获取登录信息,使用占位符") self.hookbot.update_profile("unknown", "HookBot") # 初始化 CDN(必须在登录后执行,才能使用协议 API) logger.info("正在初始化 CDN...") await self.client.cdn_init() await asyncio.sleep(0.5) # 等待 CDN 初始化完成 # 加载插件 logger.info("加载插件...") self.plugin_manager = PluginManager() self.plugin_manager.set_bot(self.client) loaded_plugins = await self.plugin_manager.load_plugins(load_disabled=False) logger.success(f"已加载插件: {loaded_plugins}") # 启动消息消费者 consumer_count = self.queue_config.get("consumer_count", 1) for i in range(consumer_count): consumer_task = asyncio.create_task(self._message_consumer(i)) self.consumer_tasks.append(consumer_task) logger.success(f"已启动 {consumer_count} 个消息消费者") # 启动定时任务 if scheduler.state == 0: scheduler.start() logger.success("定时任务已启动") # 记录启动时间 import time self.start_time = int(time.time()) logger.info(f"启动时间: {self.start_time}") logger.success("=" * 60) logger.success("WechatHookBot 启动成功!") logger.success("=" * 60) return True def _on_config_update(self, new_config: dict): """配置热更新回调""" logger.info("正在应用新配置...") # 更新队列配置 self.queue_config = new_config.get("Queue", self.queue_config) # 更新并发配置 old_concurrency = self.concurrency_config self.concurrency_config = new_config.get("Concurrency", self.concurrency_config) # 更新熔断器配置 if self.circuit_breaker: new_threshold = self.concurrency_config.get("circuit_breaker_threshold", 10) if new_threshold != old_concurrency.get("circuit_breaker_threshold", 10): self.circuit_breaker.failure_threshold = new_threshold logger.info(f"熔断器阈值已更新: {new_threshold}") logger.success("配置热更新完成") async def run(self): """运行机器人""" if not await self.initialize(): return self.is_running = True # 启动定期性能报告 async def periodic_stats(): while self.is_running: await asyncio.sleep(300) # 每5分钟输出一次 if self.performance_monitor and self.is_running: self.performance_monitor.print_stats() stats_task = asyncio.create_task(periodic_stats()) try: logger.info("机器人正在运行,按 Ctrl+C 停止...") while self.is_running: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("收到停止信号...") finally: stats_task.cancel() await self.stop() async def stop(self): """优雅关闭机器人""" if self.is_shutting_down: return self.is_shutting_down = True logger.info("=" * 60) logger.info("正在优雅关闭机器人...") logger.info("=" * 60) # 1. 停止接收新消息 self.is_running = False logger.info("[1/7] 停止接收新消息") # 2. 等待队列中的消息处理完成(带超时) if self.message_queue and not self.message_queue.empty(): queue_size = self.message_queue.qsize() logger.info(f"[2/7] 等待队列中 {queue_size} 条消息处理完成...") try: await asyncio.wait_for( self.message_queue.join(), timeout=30 ) logger.info("[2/7] 队列消息已全部处理完成") except asyncio.TimeoutError: logger.warning("[2/7] 队列消息未在 30 秒内处理完成,强制清空") # 清空剩余消息 while not self.message_queue.empty(): try: self.message_queue.get_nowait() self.message_queue.task_done() except: break else: logger.info("[2/7] 队列为空,无需等待") # 3. 停止消息消费者 if self.consumer_tasks: logger.info(f"[3/7] 停止 {len(self.consumer_tasks)} 个消息消费者...") for task in self.consumer_tasks: task.cancel() await asyncio.gather(*self.consumer_tasks, return_exceptions=True) self.consumer_tasks.clear() logger.info("[3/7] 消息消费者已停止") else: logger.info("[3/7] 无消费者需要停止") # 4. 停止配置监听器 if self.config_watcher: logger.info("[4/7] 停止配置监听器...") await self.config_watcher.stop() logger.info("[4/7] 配置监听器已停止") else: logger.info("[4/7] 无配置监听器") # 5. 卸载插件 if self.plugin_manager: logger.info("[5/7] 卸载插件...") await self.plugin_manager.unload_plugins() logger.info("[5/7] 插件已卸载") else: logger.info("[5/7] 无插件需要卸载") # 6. 停止定时任务 if scheduler.running: logger.info("[6/7] 停止定时任务...") scheduler.shutdown() logger.info("[6/7] 定时任务已停止") else: logger.info("[6/7] 定时任务未运行") # 7. 清理回调和销毁微信连接 logger.info("[7/7] 清理资源...") remove_callback_handler(self) clear_all_callbacks() if self.loader: self.loader.DestroyWeChat() # 输出最终性能报告 if self.performance_monitor: logger.info("最终性能报告:") self.performance_monitor.print_stats() logger.success("=" * 60) logger.success("机器人已优雅关闭") logger.success("=" * 60) async def main(): """主函数""" # 读取性能配置 config_path = Path("main_config.toml") if config_path.exists(): with open(config_path, "rb") as f: config = tomllib.load(f) perf_config = config.get("Performance", {}) else: perf_config = {} # 配置日志 logger.remove() # 控制台日志(启动阶段始终启用,稳定后可配置禁用) console_enabled = perf_config.get("log_console_enabled", True) logger.add( sys.stdout, colorize=perf_config.get("log_colorize", True), format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", level=perf_config.get("log_level_console", "INFO"), filter=lambda record: console_enabled or "启动" in record["message"] or "初始化" in record["message"] or "成功" in record["message"] or "失败" in record["message"] or "错误" in record["message"] ) # 文件日志(始终启用) logger.add( "logs/hookbot.log", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", encoding="utf-8", rotation="5mb", # 减小文件大小 retention="1 week", # 缩短保留时间 level=perf_config.get("log_level_file", "INFO") ) # 创建并运行服务 service = BotService() await service.run() if __name__ == "__main__": # 检查 Python 版本 if sys.maxsize > 2**32: logger.error("请使用 32位 Python 运行此程序!") sys.exit(1) # 运行 asyncio.run(main())