""" WechatHookBot - 主入口 基于新版 HTTP Hook API 的微信机器人框架 特点: - HTTP 回调接收消息 - HTTP API 发送消息 - 无需 DLL 注入(DLL 放到微信目录自动加载) - 优先级消息队列 - 自适应熔断器 - 配置热更新 - 性能监控 - 优雅关闭 """ import asyncio import signal import sys import time from pathlib import Path from loguru import logger from WechatHook import WechatHookClient from WechatHook.http_server import CallbackServer from WechatHook.message_types import normalize_from_callback, get_internal_msg_type from utils.hookbot import HookBot from utils.config_manager import get_config, get_main_config_path, get_project_root from utils.plugin_manager import PluginManager from utils.decorators import scheduler from utils.message_queue import PriorityMessageQueue, MessagePriority from utils.bot_utils import ( PRIORITY_MESSAGE_TYPES, AdaptiveCircuitBreaker, ConfigWatcher, PerformanceMonitor, get_performance_monitor ) from utils.operation_lock import OperationLock class BotService: """机器人服务类""" def __init__(self): self.base_dir = get_project_root() self.config_path = get_main_config_path() self.client: WechatHookClient = None self.callback_server: CallbackServer = None self.hookbot: HookBot = None self.plugin_manager: PluginManager = None self.is_running = False self.is_shutting_down = False self.event_loop = None # 消息队列和性能控制 self.message_queue: PriorityMessageQueue = None self.queue_config = {} self.concurrency_config = {} self.consumer_tasks = [] self.processing_semaphore = None # 自适应熔断器 self.circuit_breaker: AdaptiveCircuitBreaker = None # 配置热更新 self.config_watcher: ConfigWatcher = None # 性能监控 self.performance_monitor: PerformanceMonitor = None # 配置 self.config = {} # WebUI self.webui_server = None async def on_message_callback(self, message_type: str, data: dict): """ HTTP 回调消息处理 Args: message_type: 消息类型 (private_message/group_message/moments_message/chatroom_member_add/chatroom_member_remove/chatroom_info_change/chatroom_member_nickname_change) data: 原始消息数据 """ if OperationLock.is_paused(): logger.debug(f"更新中忽略消息: type={message_type}") return if self.is_shutting_down: logger.debug(f"关闭中忽略消息: type={message_type}") return # 跳过朋友圈消息 if message_type == "moments_message": logger.debug("跳过朋友圈消息") return # 处理群事件(event_type 类型的消息) if message_type in ["chatroom_member_add", "chatroom_member_remove", "chatroom_info_change", "chatroom_member_nickname_change"]: await self._handle_chatroom_event(message_type, data) return # 使用消息队列处理普通消息 if self.message_queue and self.event_loop: try: await self._enqueue_message(message_type, data) except Exception as e: logger.error(f"消息入队失败: {e}") else: logger.warning(f"消息队列未就绪: queue={self.message_queue is not None}, loop={self.event_loop is not None}") async def _handle_chatroom_event(self, event_type: str, data: dict): """ 处理群事件(event_type 类型的消息) Args: event_type: 事件类型 (chatroom_member_add/chatroom_member_remove/chatroom_info_change/chatroom_member_nickname_change) data: 原始事件数据 """ try: logger.info(f"[群事件] 收到事件: {event_type}") # 提取事件数据 event_data = data.get("data", {}) room_wxid = event_data.get("roomid", "") member_count = event_data.get("membercount", 0) member_list_data = event_data.get("memberlist", {}) # 构造标准化的消息格式 normalized_msg = { "MsgType": self._get_event_msg_type(event_type), "RoomWxid": room_wxid, "MemberCount": member_count, "MemberList": [], } # 处理成员列表(可能是单个对象或数组) if isinstance(member_list_data, dict): # 单个成员 member_info = { "wxid": member_list_data.get("userName", ""), "nickname": member_list_data.get("nickName", ""), "display_name": member_list_data.get("displayName", ""), "avatar": member_list_data.get("bigHeadImgUrl", ""), } normalized_msg["MemberList"].append(member_info) elif isinstance(member_list_data, list): # 多个成员 for member in member_list_data: member_info = { "wxid": member.get("userName", ""), "nickname": member.get("nickName", ""), "display_name": member.get("displayName", ""), "avatar": member.get("bigHeadImgUrl", ""), } normalized_msg["MemberList"].append(member_info) logger.info(f"[群事件] 标准化消息: room={room_wxid}, members={len(normalized_msg['MemberList'])}") # 直接触发事件(不经过消息队列) from utils.event_manager import EventManager await EventManager.emit(event_type, self.client, normalized_msg) except Exception as e: logger.error(f"处理群事件失败: {e}") import traceback logger.error(f"详细错误: {traceback.format_exc()}") def _get_event_msg_type(self, event_type: str) -> int: """获取事件对应的消息类型码""" from WechatHook.message_types import MessageType event_map = { "chatroom_member_add": MessageType.MT_CHATROOM_MEMBER_ADD, "chatroom_member_remove": MessageType.MT_CHATROOM_MEMBER_REMOVE, "chatroom_info_change": MessageType.MT_CHATROOM_INFO_CHANGE, "chatroom_member_nickname_change": MessageType.MT_CHATROOM_MEMBER_NICKNAME_CHANGE, } return event_map.get(event_type, 11046) async def _enqueue_message(self, message_type: str, data: dict): """将消息加入优先级队列""" try: # 记录收到消息 if self.performance_monitor: self.performance_monitor.record_message_received() # 获取内部消息类型 wechat_msg_type = str(data.get("msgType", "1")) internal_type = get_internal_msg_type(wechat_msg_type, data) priority = PRIORITY_MESSAGE_TYPES.get(internal_type, MessagePriority.NORMAL) # 将消息放入优先级队列 # 存储 (message_type, data) 元组 accepted = await self.message_queue.put( internal_type, {"_callback_type": message_type, **data}, priority=priority ) if not accepted: if self.performance_monitor: self.performance_monitor.record_message_dropped() return # 记录队列大小 if self.performance_monitor: self.performance_monitor.record_queue_size(self.message_queue.qsize()) except Exception as e: logger.error(f"消息入队异常: {e}") async def _ensure_consumer_count(self, target_count: int): """按目标数量调整消费者协程。""" target_count = max(int(target_count), 1) current_count = len(self.consumer_tasks) if target_count > current_count: for consumer_id in range(current_count, target_count): consumer_task = asyncio.create_task(self._message_consumer(consumer_id)) self.consumer_tasks.append(consumer_task) logger.info(f"消息消费者数量已扩容到 {target_count}") return if target_count < current_count: tasks_to_stop = self.consumer_tasks[target_count:] self.consumer_tasks = self.consumer_tasks[:target_count] for task in tasks_to_stop: task.cancel() await asyncio.gather(*tasks_to_stop, return_exceptions=True) logger.info(f"消息消费者数量已缩容到 {target_count}") async def _message_consumer(self, consumer_id: int): """消息消费者协程""" logger.info(f"消息消费者 {consumer_id} 已启动") while True: if self.is_shutting_down and (not self.message_queue or self.message_queue.empty()): break try: if OperationLock.is_paused(): await OperationLock.wait_if_paused() continue message_acquired = False msg_type = None data = None try: # 从队列获取消息 msg_type, data = await asyncio.wait_for( self.message_queue.get(), timeout=1.0 ) message_acquired = True # 检查熔断器状态 if self.circuit_breaker and self.circuit_breaker.is_open(): logger.debug("熔断器开启,跳过消息处理") self.circuit_breaker.record_rejection() continue # 标准化消息 callback_type = data.pop("_callback_type", "private_message") normalized_msg = normalize_from_callback(callback_type, data) # 从消息中提取群成员信息并缓存 if callback_type == "group_message" and self.client: sender_profile = data.get("sender_profile") or {} new_chatroom_data = sender_profile.get("newChatroomData") or {} members = new_chatroom_data.get("chatRoomMember") or [] room_id = normalized_msg.get("RoomWxid", "") if members and room_id: self.client.update_chatroom_members_cache(room_id, members) # 处理消息 timeout = self.concurrency_config.get("plugin_task_timeout_seconds", 720) start_time = time.time() try: if self.processing_semaphore: async with self.processing_semaphore: await asyncio.wait_for( self.hookbot.process_message(msg_type, normalized_msg), timeout=timeout ) else: await asyncio.wait_for( self.hookbot.process_message(msg_type, normalized_msg), timeout=timeout ) processing_time = time.time() - start_time if self.circuit_breaker: self.circuit_breaker.record_success() if self.performance_monitor: self.performance_monitor.record_message_processed(processing_time) except asyncio.TimeoutError: logger.warning(f"消息处理超时 (>{timeout}s): type={msg_type}") if self.circuit_breaker: self.circuit_breaker.record_failure() if self.performance_monitor: self.performance_monitor.record_message_failed() except Exception as e: logger.error(f"消息处理异常: {e}") if self.circuit_breaker: self.circuit_breaker.record_failure() if self.performance_monitor: self.performance_monitor.record_message_failed() # 更新熔断器统计 if self.performance_monitor and self.circuit_breaker: self.performance_monitor.update_circuit_breaker_stats( self.circuit_breaker.get_stats() ) # 消息间隔 message_interval = self.concurrency_config.get("message_interval_ms", 100) if message_interval > 0: await asyncio.sleep(message_interval / 1000.0) except asyncio.TimeoutError: if self.is_shutting_down and self.message_queue and self.message_queue.empty(): break continue finally: if message_acquired and self.message_queue: self.message_queue.task_done() except asyncio.CancelledError: logger.info(f"消费者 {consumer_id} 收到取消信号") break except Exception as e: logger.error(f"消费者 {consumer_id} 异常: {e}") import traceback logger.error(traceback.format_exc()) await asyncio.sleep(0.1) logger.info(f"消费者 {consumer_id} 已退出") async def initialize(self): """初始化系统""" logger.info("=" * 60) logger.info("WechatHookBot 启动中... (HTTP 协议版本)") logger.info("=" * 60) self.event_loop = asyncio.get_event_loop() # 读取配置 config_path = self.config_path if not config_path.exists(): logger.error(f"配置文件不存在: {config_path}") return False self.config = get_config().get_all() # 初始化性能配置 self.queue_config = self.config.get("Queue", {}) self.concurrency_config = self.config.get("Concurrency", {}) # 创建优先级消息队列 self.message_queue = PriorityMessageQueue.from_config(self.queue_config) logger.info( f"优先级消息队列已创建,容量: {self.message_queue.maxsize}, " f"溢出策略: {self.message_queue.overflow_strategy.value}" ) # 创建并发控制信号量 max_concurrency = self.concurrency_config.get("plugin_max_concurrency", 8) self.processing_semaphore = asyncio.Semaphore(max_concurrency) logger.info(f"并发控制已设置,最大并发: {max_concurrency}") # 创建自适应熔断器 if self.concurrency_config.get("enable_circuit_breaker", True): self.circuit_breaker = AdaptiveCircuitBreaker( failure_threshold=self.concurrency_config.get("circuit_breaker_threshold", 10), success_threshold=3, initial_recovery_time=5.0, max_recovery_time=300.0 ) logger.info("自适应熔断器已创建") # 创建性能监控器 self.performance_monitor = get_performance_monitor() logger.info("性能监控器已创建") # 创建配置热更新监听器 self.config_watcher = ConfigWatcher(str(self.config_path), check_interval=5.0) self.config_watcher.register_callback(self._on_config_update) await self.config_watcher.start() logger.info("配置热更新监听器已启动") # 获取 HTTP 配置 http_config = self.config.get("HttpHook", {}) api_base_url = http_config.get("api-url", "http://127.0.0.1:8888") callback_host = http_config.get("callback-host", "0.0.0.0") callback_port = http_config.get("callback-port", 9999) # 创建 HTTP 客户端 logger.info(f"连接 Hook API: {api_base_url}") self.client = WechatHookClient(base_url=api_base_url) # 创建 HookBot self.hookbot = HookBot(self.client) # 微信初始化(刷新好友列表、群列表缓存) logger.info("执行微信初始化...") if await self.client.wechat_init(): logger.success("微信初始化成功") else: logger.warning("微信初始化失败,部分功能可能受影响") # 获取登录信息 logger.info("获取登录信息...") login_info = await self.client.get_login_info() if login_info and self.client.wxid: logger.success(f"获取登录信息成功: wxid={self.client.wxid}, nickname={self.client.nickname}") self.hookbot.update_profile(self.client.wxid, self.client.nickname) else: # 使用配置中的备用信息 bot_config = self.config.get("Bot", {}) fallback_wxid = bot_config.get("wxid", "unknown") fallback_nickname = bot_config.get("nickname", "HookBot") logger.warning(f"获取登录信息失败,使用配置中的备用信息: {fallback_wxid}") self.hookbot.update_profile(fallback_wxid, fallback_nickname) # 创建并启动回调服务器 logger.info(f"启动回调服务器: {callback_host}:{callback_port}") self.callback_server = CallbackServer(host=callback_host, port=callback_port) self.callback_server.add_message_handler(self.on_message_callback) if not await self.callback_server.start(): logger.error("回调服务器启动失败") return False # 启动 WebUI webui_config = self.config.get("WebUI", {}) if webui_config.get("enabled", False): try: from utils.webui import WebUIServer webui_host = webui_config.get("host", "0.0.0.0") webui_port = webui_config.get("port", 5001) self.webui_server = WebUIServer(host=webui_host, port=webui_port, config_path=str(self.config_path)) await self.webui_server.start() except Exception as e: logger.error(f"WebUI 启动失败: {e}") # 加载插件 logger.info("加载插件...") self.plugin_manager = PluginManager() self.plugin_manager.set_bot(self.client) loaded_plugins = await self.plugin_manager.load_plugins(load_disabled=False) logger.success(f"已加载插件: {loaded_plugins}") # 启动消息消费者 consumer_count = self.queue_config.get("consumer_count", 1) await self._ensure_consumer_count(consumer_count) logger.success(f"已启动 {consumer_count} 个消息消费者") # 启动定时任务 if scheduler.state == 0: scheduler.start() logger.success("定时任务已启动") # 记录启动时间 self.start_time = int(time.time()) logger.success("=" * 60) logger.success("WechatHookBot 启动成功!") logger.success(f"回调地址: http://{callback_host}:{callback_port}") logger.success("请确保 Hook 已配置正确的回调地址") logger.success("=" * 60) return True async def _on_config_update(self, new_config: dict): """配置热更新回调""" logger.info("正在应用新配置...") self.config = new_config or self.config old_queue = self.queue_config self.queue_config = new_config.get("Queue", self.queue_config) old_concurrency = self.concurrency_config self.concurrency_config = new_config.get("Concurrency", self.concurrency_config) if self.message_queue: await self.message_queue.update_config( maxsize=self.queue_config.get("max_size", self.message_queue.maxsize), overflow_strategy=self.queue_config.get("overflow_strategy", self.message_queue.overflow_strategy.value), sampling_rate=self.queue_config.get("sampling_rate", self.message_queue.sampling_rate), ) logger.info( f"消息队列配置已更新: max_size={self.message_queue.maxsize}, " f"overflow={self.message_queue.overflow_strategy.value}" ) enable_circuit_breaker = self.concurrency_config.get("enable_circuit_breaker", True) if enable_circuit_breaker and not self.circuit_breaker: self.circuit_breaker = AdaptiveCircuitBreaker( failure_threshold=self.concurrency_config.get("circuit_breaker_threshold", 10), success_threshold=3, initial_recovery_time=5.0, max_recovery_time=300.0 ) logger.info("已按新配置启用熔断器") elif not enable_circuit_breaker and self.circuit_breaker: self.circuit_breaker = None logger.info("已按新配置禁用熔断器") if self.circuit_breaker: new_threshold = self.concurrency_config.get("circuit_breaker_threshold", 10) if new_threshold != old_concurrency.get("circuit_breaker_threshold", 10): self.circuit_breaker.failure_threshold = new_threshold logger.info(f"熔断器阈值已更新: {new_threshold}") new_max_concurrency = self.concurrency_config.get("plugin_max_concurrency", 8) if new_max_concurrency != old_concurrency.get("plugin_max_concurrency", 8): self.processing_semaphore = asyncio.Semaphore(new_max_concurrency) logger.info(f"插件并发上限已更新: {new_max_concurrency}") new_consumer_count = self.queue_config.get("consumer_count", len(self.consumer_tasks) or 1) if new_consumer_count != len(self.consumer_tasks): await self._ensure_consumer_count(new_consumer_count) if self.queue_config.get("consumer_count") != old_queue.get("consumer_count"): logger.info(f"消息消费者数量已更新: {new_consumer_count}") logger.success("配置热更新完成") async def run(self): """运行机器人""" if not await self.initialize(): return self.is_running = True # 启动定期性能报告 async def periodic_stats(): while self.is_running: await asyncio.sleep(300) if self.performance_monitor and self.is_running: self.performance_monitor.print_stats() stats_task = asyncio.create_task(periodic_stats()) try: logger.info("机器人正在运行,按 Ctrl+C 停止...") while self.is_running: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("收到停止信号...") finally: stats_task.cancel() await self.stop() async def stop(self): """优雅关闭机器人""" if self.is_shutting_down: return self.is_shutting_down = True logger.info("=" * 60) logger.info("正在优雅关闭机器人...") logger.info("=" * 60) # 1. 停止接收新消息 self.is_running = False logger.info("[1/7] 停止接收新消息") if self.callback_server: logger.info("[1/7] 关闭消息入口...") await self.callback_server.stop() logger.info("[1/7] 消息入口已关闭") # 2. 等待队列中的消息处理完成 if self.message_queue and not self.message_queue.empty(): queue_size = self.message_queue.qsize() logger.info(f"[2/7] 等待队列中 {queue_size} 条消息处理完成...") try: await asyncio.wait_for(self.message_queue.join(), timeout=30) logger.info("[2/7] 队列消息已全部处理完成") except asyncio.TimeoutError: logger.warning("[2/7] 队列消息未在 30 秒内处理完成,将在停止消费者后清空剩余消息") else: logger.info("[2/7] 队列为空,无需等待") # 3. 停止消息消费者 if self.consumer_tasks: logger.info(f"[3/7] 停止 {len(self.consumer_tasks)} 个消息消费者...") for task in self.consumer_tasks: task.cancel() await asyncio.gather(*self.consumer_tasks, return_exceptions=True) self.consumer_tasks.clear() logger.info("[3/7] 消息消费者已停止") else: logger.info("[3/7] 无消费者需要停止") # 4. 回调服务器已在前面关闭,这里仅补充日志 if self.callback_server: logger.info("[4/7] 回调服务器已停止") else: logger.info("[4/7] 无回调服务器") # 4.5 停止 WebUI if self.webui_server: await self.webui_server.stop() # 5. 停止配置监听器 if self.config_watcher: logger.info("[5/7] 停止配置监听器...") await self.config_watcher.stop() logger.info("[5/7] 配置监听器已停止") else: logger.info("[5/7] 无配置监听器") # 6. 卸载插件 if self.plugin_manager: logger.info("[6/7] 卸载插件...") await self.plugin_manager.unload_plugins() logger.info("[6/7] 插件已卸载") else: logger.info("[6/7] 无插件需要卸载") # 7. 停止定时任务和关闭客户端 logger.info("[7/7] 清理资源...") if self.message_queue and not self.message_queue.empty(): remaining = self.message_queue.qsize() self.message_queue.clear() logger.warning(f"[7/7] 已清空剩余队列消息: {remaining}") if scheduler.running: scheduler.shutdown() if self.client: await self.client.close() # 输出最终性能报告 if self.performance_monitor: logger.info("最终性能报告:") self.performance_monitor.print_stats() logger.success("=" * 60) logger.success("机器人已优雅关闭") logger.success("=" * 60) async def main(): """主函数""" # 读取性能配置 project_root = get_project_root() logs_dir = project_root / "logs" logs_dir.mkdir(parents=True, exist_ok=True) config_path = get_main_config_path() if config_path.exists(): perf_config = get_config().get_section("Performance") else: perf_config = {} # 配置日志 logger.remove() console_enabled = perf_config.get("log_console_enabled", True) logger.add( sys.stdout, colorize=perf_config.get("log_colorize", True), format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", level=perf_config.get("log_level_console", "INFO"), filter=lambda record: console_enabled or "启动" in record["message"] or "初始化" in record["message"] or "成功" in record["message"] or "失败" in record["message"] or "错误" in record["message"] ) logger.add( str(logs_dir / "hookbot.log"), format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", encoding="utf-8", rotation="5mb", retention="1 week", level=perf_config.get("log_level_file", "INFO") ) # WebUI 日志 sink try: from utils.webui import loguru_sink logger.add( loguru_sink, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", level=perf_config.get("log_level_console", "INFO"), ) except Exception: pass # 创建并运行服务 service = BotService() await service.run() if __name__ == "__main__": # 注意:新协议不再需要 32 位 Python asyncio.run(main())