import asyncio import json from loguru import logger from utils.gscore_client import gs_core_client # 模拟处理从服务端接收到的消息 async def my_message_handler(payload: dict): logger.info(f"成功接收到核心消息: {payload}") async def run_test(): # 1. 配置信息 (确保 URL 和 Token 正确) # 脚本会自动将其转换为 ws://192.168.2.240:8765/ws/abot?token=liuwei target_url = "ws://192.168.2.240:8765/ws/abot" test_token = "liuwei" logger.info("--- 开始 GsCoreClient 测试 ---") gs_core_client.configure( url=target_url, handler=my_message_handler, token=test_token ) # 2. 尝试连接 success = await gs_core_client.connect() if not success: logger.error("连接失败,请检查服务端 IP 或端口是否开放。") return # 3. 测试发送消息 # 模拟一个简单的指令或数据包 test_data = { "action": "test_echo", "content": "Hello GsCore!", "sender": "TestScript" } logger.info("尝试发送测试数据...") send_ok = await gs_core_client.send(json.dumps(test_data)) if send_ok: logger.success("消息发送指令已提交") else: logger.error("消息发送失败") # 4. 挂起运行,观察重连 logger.info("脚本将保持运行 60 秒。") logger.info("提示:此时你可以尝试重启服务端,验证脚本是否会自动执行 1012 错误后的重连...") try: for i in range(60): await asyncio.sleep(1) if i % 10 == 0 and i > 0: # 每10秒心跳一下,确保连接还活着 await gs_core_client.send(json.dumps({"type": "heartbeat", "index": i})) except KeyboardInterrupt: logger.warning("用户手动停止测试") finally: await gs_core_client.close() logger.info("--- 测试结束 ---") if __name__ == "__main__": try: asyncio.run(run_test()) except Exception as e: logger.critical(f"测试脚本崩溃: {e}")