"""
WechatHookBot - 主入口
基于个微大客户版 Hook API 的微信机器人框架
优化功能:
- 优先级消息队列
- 自适应熔断器
- 配置热更新
- 性能监控
- 优雅关闭
"""
import asyncio
import signal
import sys
import time
import tomllib
from pathlib import Path
from loguru import logger
from WechatHook import NoveLoader, WechatHookClient
from WechatHook.callbacks import (
add_callback_handler,
remove_callback_handler,
clear_all_callbacks,
wechat_connect_callback,
wechat_recv_callback,
wechat_close_callback,
CONNECT_CALLBACK,
RECV_CALLBACK,
CLOSE_CALLBACK
)
from utils.hookbot import HookBot
from utils.plugin_manager import PluginManager
from utils.decorators import scheduler
from utils.bot_utils import (
PriorityMessageQueue,
MessagePriority,
PRIORITY_MESSAGE_TYPES,
AdaptiveCircuitBreaker,
ConfigWatcher,
PerformanceMonitor,
get_performance_monitor
)
class BotService:
"""机器人服务类"""
def __init__(self):
self.loader = None
self.client = None
self.hookbot = None
self.plugin_manager = None
self.process_id = None # 微信进程 ID
self.socket_client_id = None # Socket 客户端 ID
self.is_running = False
self.is_shutting_down = False # 是否正在关闭
self.event_loop = None # 事件循环引用
# 消息队列和性能控制
self.message_queue: PriorityMessageQueue = None # 优先级消息队列
self.queue_config = {}
self.concurrency_config = {}
self.consumer_tasks = []
self.processing_semaphore = None
# 自适应熔断器
self.circuit_breaker: AdaptiveCircuitBreaker = None
# 配置热更新
self.config_watcher: ConfigWatcher = None
# 性能监控
self.performance_monitor: PerformanceMonitor = None
@CONNECT_CALLBACK(in_class=True)
def on_connect(self, client_id):
"""连接回调"""
logger.success(f"微信客户端已连接: {client_id}")
self.socket_client_id = client_id
@RECV_CALLBACK(in_class=True)
def on_receive(self, client_id, msg_type, data):
"""接收消息回调"""
# 减少日志输出,只记录关键消息类型
if msg_type == 11025: # 登录信息
logger.success(f"获取到登录信息: wxid={data.get('wxid', 'unknown')}, nickname={data.get('nickname', 'unknown')}")
if self.hookbot:
self.hookbot.update_profile(data.get('wxid', 'unknown'), data.get('nickname', 'unknown'))
# 初始化 CDN(必须在登录后执行,才能使用协议 API)
if self.client and self.event_loop:
logger.info("正在初始化 CDN...")
asyncio.run_coroutine_threadsafe(
self.client.cdn_init(),
self.event_loop
)
return
# 使用消息队列处理其他消息
if self.message_queue and self.event_loop:
try:
# 快速入队,不阻塞回调
asyncio.run_coroutine_threadsafe(
self._enqueue_message(msg_type, data),
self.event_loop
)
except Exception as e:
logger.error(f"消息入队失败: {e}")
async def _enqueue_message(self, msg_type, data):
"""将消息加入优先级队列"""
try:
# 记录收到消息
if self.performance_monitor:
self.performance_monitor.record_message_received()
# 检查队列是否已满
if self.message_queue.full():
overflow_strategy = self.queue_config.get("overflow_strategy", "drop_oldest")
if overflow_strategy == "drop_oldest":
# 丢弃优先级最低的消息
if self.message_queue.drop_lowest_priority():
logger.warning("队列已满,丢弃优先级最低的消息")
if self.performance_monitor:
self.performance_monitor.record_message_dropped()
elif overflow_strategy == "sampling":
# 采样处理,随机丢弃(但高优先级消息不丢弃)
import random
priority = PRIORITY_MESSAGE_TYPES.get(msg_type, MessagePriority.NORMAL)
if priority < MessagePriority.HIGH and random.random() < 0.5:
logger.debug("队列压力大,采样丢弃消息")
if self.performance_monitor:
self.performance_monitor.record_message_dropped()
return
else: # degrade
# 降级处理(但高优先级消息不丢弃)
priority = PRIORITY_MESSAGE_TYPES.get(msg_type, MessagePriority.NORMAL)
if priority < MessagePriority.HIGH:
logger.warning("队列已满,降级处理")
if self.performance_monitor:
self.performance_monitor.record_message_dropped()
return
# 将消息放入优先级队列
await self.message_queue.put(msg_type, data)
# 记录队列大小
if self.performance_monitor:
self.performance_monitor.record_queue_size(self.message_queue.qsize())
except Exception as e:
logger.error(f"消息入队异常: {e}")
async def _message_consumer(self, consumer_id: int):
"""消息消费者协程 - 纯队列串行模式,避免并发触发风控"""
logger.info(f"消息消费者 {consumer_id} 已启动(串行模式)")
while self.is_running and not self.is_shutting_down:
try:
# 从队列获取消息,设置超时避免无限等待
msg_type, data = await asyncio.wait_for(
self.message_queue.get(),
timeout=1.0
)
# 检查熔断器状态
if self.circuit_breaker and self.circuit_breaker.is_open():
logger.debug("熔断器开启,跳过消息处理")
self.circuit_breaker.record_rejection()
self.message_queue.task_done()
continue
# 串行处理:等待当前消息处理完成后再处理下一条
timeout = self.concurrency_config.get("plugin_task_timeout_seconds", 720)
start_time = time.time()
try:
await asyncio.wait_for(
self.hookbot.process_message(msg_type, data),
timeout=timeout
)
# 记录成功
processing_time = time.time() - start_time
if self.circuit_breaker:
self.circuit_breaker.record_success()
if self.performance_monitor:
self.performance_monitor.record_message_processed(processing_time)
except asyncio.TimeoutError:
logger.warning(f"消息处理超时 (>{timeout}s): type={msg_type}")
if self.circuit_breaker:
self.circuit_breaker.record_failure()
if self.performance_monitor:
self.performance_monitor.record_message_failed()
except Exception as e:
logger.error(f"消息处理异常: {e}")
if self.circuit_breaker:
self.circuit_breaker.record_failure()
if self.performance_monitor:
self.performance_monitor.record_message_failed()
# 标记任务完成
self.message_queue.task_done()
# 更新熔断器统计
if self.performance_monitor and self.circuit_breaker:
self.performance_monitor.update_circuit_breaker_stats(
self.circuit_breaker.get_stats()
)
# 消息间隔,避免发送太快触发风控
message_interval = self.concurrency_config.get("message_interval_ms", 100)
if message_interval > 0:
await asyncio.sleep(message_interval / 1000.0)
except asyncio.TimeoutError:
# 队列为空,继续等待
continue
except asyncio.CancelledError:
# 任务被取消,退出循环
logger.info(f"消费者 {consumer_id} 收到取消信号")
break
except Exception as e:
logger.error(f"消费者 {consumer_id} 异常: {e}")
await asyncio.sleep(0.1) # 短暂休息避免忙等
logger.info(f"消费者 {consumer_id} 已退出")
@CLOSE_CALLBACK(in_class=True)
def on_close(self, client_id):
"""断开连接回调"""
logger.warning(f"微信客户端已断开: {client_id}")
async def _wait_for_socket(self, timeout_seconds: int = 15) -> bool:
"""等待 socket 客户端连接"""
elapsed = 0
while elapsed < timeout_seconds:
if self.socket_client_id:
return True
await asyncio.sleep(1)
elapsed += 1
logger.info(f"等待微信客户端连接中... ({elapsed}/{timeout_seconds}s)")
return False
async def initialize(self):
"""初始化系统"""
logger.info("=" * 60)
logger.info("WechatHookBot 启动中...")
logger.info("=" * 60)
# 保存事件循环引用
self.event_loop = asyncio.get_event_loop()
# 读取配置
config_path = Path("main_config.toml")
if not config_path.exists():
logger.error("配置文件不存在: main_config.toml")
return False
with open(config_path, "rb") as f:
config = tomllib.load(f)
# 初始化性能配置
self.queue_config = config.get("Queue", {})
self.concurrency_config = config.get("Concurrency", {})
# 创建优先级消息队列
queue_size = self.queue_config.get("max_size", 1000)
self.message_queue = PriorityMessageQueue(maxsize=queue_size)
logger.info(f"优先级消息队列已创建,容量: {queue_size}")
# 创建并发控制信号量
max_concurrency = self.concurrency_config.get("plugin_max_concurrency", 8)
self.processing_semaphore = asyncio.Semaphore(max_concurrency)
logger.info(f"并发控制已设置,最大并发: {max_concurrency}")
# 创建自适应熔断器
if self.concurrency_config.get("enable_circuit_breaker", True):
self.circuit_breaker = AdaptiveCircuitBreaker(
failure_threshold=self.concurrency_config.get("circuit_breaker_threshold", 10),
success_threshold=3,
initial_recovery_time=5.0,
max_recovery_time=300.0
)
logger.info("自适应熔断器已创建")
# 创建性能监控器
self.performance_monitor = get_performance_monitor()
logger.info("性能监控器已创建")
# 创建配置热更新监听器
self.config_watcher = ConfigWatcher("main_config.toml", check_interval=5.0)
self.config_watcher.register_callback(self._on_config_update)
await self.config_watcher.start()
logger.info("配置热更新监听器已启动")
# 不需要数据库(简化版本)
# 获取 DLL 路径
hook_config = config.get("WechatHook", {})
loader_dll = hook_config.get("loader-dll", "libs/Loader.dll")
helper_dll = hook_config.get("helper-dll", "libs/Helper.dll")
# 创建共享内存(必须在创建 Loader 之前)
from WechatHook.loader import create_shared_memory
logger.info("创建共享内存...")
self.shared_memory_handle, self.shared_memory_address = create_shared_memory()
# 注册回调(必须在创建 Loader 之前)
add_callback_handler(self)
# 创建 Loader
logger.info("加载 Loader.dll...")
try:
self.loader = NoveLoader(loader_dll)
except Exception as e:
logger.error(f"加载 Loader.dll 失败: {e}")
return False
try:
version = self.loader.GetUserWeChatVersion()
logger.info(f"检测到本机微信版本: {version}")
except Exception as e:
logger.warning(f"无法获取微信版本信息: {e}")
# 注入微信
logger.info("注入微信...")
self.process_id = self.loader.InjectWeChat(helper_dll)
if not self.process_id:
logger.error("注入微信失败")
return False
# 等待 socket 客户端回调
if not await self._wait_for_socket(timeout_seconds=20):
logger.error("Socket 客户端未连接,请检查微信是否正在运行")
return False
# 额外等待 0.5s 确保稳定
await asyncio.sleep(0.5)
self.client = WechatHookClient(self.loader, self.socket_client_id)
# 创建 HookBot
self.hookbot = HookBot(self.client)
# 获取登录信息
logger.info("获取登录信息...")
await self.client.get_login_info()
await asyncio.sleep(2) # 增加等待时间确保回调执行
# 检查是否已通过回调获取到登录信息
if not self.hookbot.wxid:
logger.warning("未能通过回调获取登录信息,使用占位符")
self.hookbot.update_profile("unknown", "HookBot")
# 初始化 CDN(必须在登录后执行,才能使用协议 API)
logger.info("正在初始化 CDN...")
await self.client.cdn_init()
await asyncio.sleep(0.5) # 等待 CDN 初始化完成
# 加载插件
logger.info("加载插件...")
self.plugin_manager = PluginManager()
self.plugin_manager.set_bot(self.client)
loaded_plugins = await self.plugin_manager.load_plugins(load_disabled=False)
logger.success(f"已加载插件: {loaded_plugins}")
# 启动消息消费者
consumer_count = self.queue_config.get("consumer_count", 1)
for i in range(consumer_count):
consumer_task = asyncio.create_task(self._message_consumer(i))
self.consumer_tasks.append(consumer_task)
logger.success(f"已启动 {consumer_count} 个消息消费者")
# 启动定时任务
if scheduler.state == 0:
scheduler.start()
logger.success("定时任务已启动")
# 记录启动时间
import time
self.start_time = int(time.time())
logger.info(f"启动时间: {self.start_time}")
logger.success("=" * 60)
logger.success("WechatHookBot 启动成功!")
logger.success("=" * 60)
return True
def _on_config_update(self, new_config: dict):
"""配置热更新回调"""
logger.info("正在应用新配置...")
# 更新队列配置
self.queue_config = new_config.get("Queue", self.queue_config)
# 更新并发配置
old_concurrency = self.concurrency_config
self.concurrency_config = new_config.get("Concurrency", self.concurrency_config)
# 更新熔断器配置
if self.circuit_breaker:
new_threshold = self.concurrency_config.get("circuit_breaker_threshold", 10)
if new_threshold != old_concurrency.get("circuit_breaker_threshold", 10):
self.circuit_breaker.failure_threshold = new_threshold
logger.info(f"熔断器阈值已更新: {new_threshold}")
logger.success("配置热更新完成")
async def run(self):
"""运行机器人"""
if not await self.initialize():
return
self.is_running = True
# 启动定期性能报告
async def periodic_stats():
while self.is_running:
await asyncio.sleep(300) # 每5分钟输出一次
if self.performance_monitor and self.is_running:
self.performance_monitor.print_stats()
stats_task = asyncio.create_task(periodic_stats())
try:
logger.info("机器人正在运行,按 Ctrl+C 停止...")
while self.is_running:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("收到停止信号...")
finally:
stats_task.cancel()
await self.stop()
async def stop(self):
"""优雅关闭机器人"""
if self.is_shutting_down:
return
self.is_shutting_down = True
logger.info("=" * 60)
logger.info("正在优雅关闭机器人...")
logger.info("=" * 60)
# 1. 停止接收新消息
self.is_running = False
logger.info("[1/7] 停止接收新消息")
# 2. 等待队列中的消息处理完成(带超时)
if self.message_queue and not self.message_queue.empty():
queue_size = self.message_queue.qsize()
logger.info(f"[2/7] 等待队列中 {queue_size} 条消息处理完成...")
try:
await asyncio.wait_for(
self.message_queue.join(),
timeout=30
)
logger.info("[2/7] 队列消息已全部处理完成")
except asyncio.TimeoutError:
logger.warning("[2/7] 队列消息未在 30 秒内处理完成,强制清空")
# 清空剩余消息
while not self.message_queue.empty():
try:
self.message_queue.get_nowait()
self.message_queue.task_done()
except:
break
else:
logger.info("[2/7] 队列为空,无需等待")
# 3. 停止消息消费者
if self.consumer_tasks:
logger.info(f"[3/7] 停止 {len(self.consumer_tasks)} 个消息消费者...")
for task in self.consumer_tasks:
task.cancel()
await asyncio.gather(*self.consumer_tasks, return_exceptions=True)
self.consumer_tasks.clear()
logger.info("[3/7] 消息消费者已停止")
else:
logger.info("[3/7] 无消费者需要停止")
# 4. 停止配置监听器
if self.config_watcher:
logger.info("[4/7] 停止配置监听器...")
await self.config_watcher.stop()
logger.info("[4/7] 配置监听器已停止")
else:
logger.info("[4/7] 无配置监听器")
# 5. 卸载插件
if self.plugin_manager:
logger.info("[5/7] 卸载插件...")
await self.plugin_manager.unload_plugins()
logger.info("[5/7] 插件已卸载")
else:
logger.info("[5/7] 无插件需要卸载")
# 6. 停止定时任务
if scheduler.running:
logger.info("[6/7] 停止定时任务...")
scheduler.shutdown()
logger.info("[6/7] 定时任务已停止")
else:
logger.info("[6/7] 定时任务未运行")
# 7. 清理回调和销毁微信连接
logger.info("[7/7] 清理资源...")
remove_callback_handler(self)
clear_all_callbacks()
if self.loader:
self.loader.DestroyWeChat()
# 输出最终性能报告
if self.performance_monitor:
logger.info("最终性能报告:")
self.performance_monitor.print_stats()
logger.success("=" * 60)
logger.success("机器人已优雅关闭")
logger.success("=" * 60)
async def main():
"""主函数"""
# 读取性能配置
config_path = Path("main_config.toml")
if config_path.exists():
with open(config_path, "rb") as f:
config = tomllib.load(f)
perf_config = config.get("Performance", {})
else:
perf_config = {}
# 配置日志
logger.remove()
# 控制台日志(启动阶段始终启用,稳定后可配置禁用)
console_enabled = perf_config.get("log_console_enabled", True)
logger.add(
sys.stdout,
colorize=perf_config.get("log_colorize", True),
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
level=perf_config.get("log_level_console", "INFO"),
filter=lambda record: console_enabled or "启动" in record["message"] or "初始化" in record["message"] or "成功" in record["message"] or "失败" in record["message"] or "错误" in record["message"]
)
# 文件日志(始终启用)
logger.add(
"logs/hookbot.log",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
encoding="utf-8",
rotation="5mb", # 减小文件大小
retention="1 week", # 缩短保留时间
level=perf_config.get("log_level_file", "INFO")
)
# 创建并运行服务
service = BotService()
await service.run()
if __name__ == "__main__":
# 检查 Python 版本
if sys.maxsize > 2**32:
logger.error("请使用 32位 Python 运行此程序!")
sys.exit(1)
# 运行
asyncio.run(main())