Files
abot/wechat_ipad/providers/server_864/runtime.py
2026-05-07 14:20:55 +08:00

435 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import time
from typing import Any, Awaitable, Callable
import toml
AsyncCallback = Callable[..., Awaitable[None]]
class Server864RuntimeMixin:
"""864 provider 的运行时编排。
设计说明:
1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”;
2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”;
3. 这样能先把现有 Robot 主链路无感切到 864后续若要补 WS 监听也只需在本目录内演进。
"""
def _init_runtime_state(self) -> None:
self._runtime_running = False
def stop_runtime(self) -> None:
self._runtime_running = False
def is_runtime_running(self) -> bool:
return bool(getattr(self, "_runtime_running", False))
async def run_runtime(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_ready: AsyncCallback,
on_history_message: AsyncCallback,
on_message: AsyncCallback,
on_idle_payload: AsyncCallback | None = None,
on_logout: AsyncCallback | None = None,
on_runtime_state_change: AsyncCallback | None = None,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
) -> None:
"""启动 864 provider 的运行时主循环。"""
del on_history_message
server_key = str(ipad_config.get("server_key", "") or "").strip()
if not server_key:
raise ValueError("server_864 启动失败:缺少 server_key请在 .env 中配置 WECHAT_SERVER_KEY")
self.server_key = server_key
login_qr_api = str(ipad_config.get("login_qr_api", "new_x") or "new_x").strip()
login_way = str(ipad_config.get("login_way", "mac") or "mac").strip()
await self._ensure_login(
ipad_config=ipad_config,
state_path=state_path,
logger=logger,
on_login_qr_update=on_login_qr_update,
on_login_qr_cleared=on_login_qr_cleared,
login_qr_api=login_qr_api,
login_way=login_way,
)
await on_login_ready(self.get_login_identity())
logger.info("server_864 登录成功")
await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger)
try:
logger.info("开始处理 server_864 消息轮询")
while self.is_runtime_running():
try:
data_temp = await self.sync_message()
except Exception as e:
logger.error(f"server_864 获取新消息失败: {e}")
await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout")
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
break
data = data_temp.get("AddMsgs") or []
if data:
for message in data:
await self._safe_callback(on_message, message, logger=logger, callback_name="on_message")
elif on_idle_payload:
await self._safe_callback(
on_idle_payload,
data_temp,
logger=logger,
callback_name="on_idle_payload",
)
await asyncio.sleep(2)
finally:
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
async def _ensure_login(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
login_qr_api: str = "new_x",
login_way: str = "mac",
) -> None:
"""确保 864 已完成登录。"""
if await self.is_logged_in():
await self._refresh_identity_from_profile(logger=logger)
# 864 在“服务端已经在线、ABOT 只是后启动”的场景下会直接走这里:
# 1. 之前这条分支只清理二维码态,没有补写本地 runtime_state
# 2. 这会让用户误以为“明明已经登录成功,却没有生成 provider 状态文件”;
# 3. 因此这里也统一补一次落盘,让首次接管现有登录态与扫码新登录的结果保持一致。
ipad_config["wxid"] = self.wxid
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
logger=logger,
)
await self._safe_callback(
on_login_qr_cleared,
{
"status": "logged_in",
"status_text": "已检测到现有登录态",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
return
# 先探测一次 864 当前阶段,让 Dashboard 能直接区分“等服务端准备”和“需要扫码”:
# 1. 864 的未登录态并不只有一种,部分场景其实是远端连接对象还没建好;
# 2. 若首页始终只显示“未登录”,运维很难判断下一步是等服务端还是去扫码;
# 3. 这里把差异压缩成轻量阶段字段,供前端直接展示,不改动核心登录流程。
login_stage_snapshot = await self._probe_login_stage()
await self._safe_callback(
on_login_qr_update,
login_stage_snapshot,
logger=logger,
callback_name="on_login_qr_update",
)
uuid, url = await self.get_qr_code(print_qr=True, login_qr_api=login_qr_api, login_way=login_way)
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
while True:
is_logged_in, login_status = await self.check_login_status()
if is_logged_in:
await self._safe_callback(
on_login_qr_cleared,
{
"status": "confirmed",
"status_text": "扫码登录成功",
"uuid": uuid,
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
break
# 864 的登录状态查询会回传当前 uuid 和有效期:
# 1. 真实联调中已确认 `CheckLoginStatus` 会返回 `uuid/effective_time`
# 2. 这些值应优先作为 Dashboard 的二维码倒计时与当前扫码目标来源;
# 3. 一旦 server 侧切换了新的 uuid这里也要及时覆盖本地展示态避免前端一直盯着旧码。
latest_uuid = str(login_status.get("uuid", "") or uuid).strip() or uuid
effective_time = int(login_status.get("effective_time", 0) or 0)
verification_url = str(login_status.get("verification_url", "") or "").strip()
if latest_uuid != uuid:
uuid = latest_uuid
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
url = f"https://api.2dcode.biz/v1/create-qr-code?data={scan_url}" if scan_url else url
provider_stage = "verification_required" if verification_url else ("waiting_scan" if uuid else "login_required")
# 864 在“已扫码但待安全验证”阶段,`msg/loginState` 往往是空字符串:
# 1. 若直接套用默认“等待扫码登录”,用户会误以为还没扫上;
# 2. 因此这里优先识别 `verification_url`,给 Dashboard 一个更准确的引导文案;
# 3. 只有完全拿不到状态提示时,才回退到普通扫码等待文案。
raw_status_text = str(login_status.get("msg") or login_status.get("loginState") or "").strip()
if verification_url and not raw_status_text:
status_text = "扫码已完成,请继续打开验证链接完成安全验证"
else:
status_text = raw_status_text or "等待扫码登录"
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": effective_time if effective_time > 0 else None,
"status": "waiting",
"status_text": status_text,
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": provider_stage,
"connection_ready": False,
"login_required": True,
"verification_url": verification_url,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
# 若 server 已明确告知二维码失效,则立即重新申请一张新码:
# 1. 这能避免 Dashboard 一直展示一张已经不可扫的旧二维码;
# 2. 也能让新环境登录时的交互与 855 保持一致,都是“过期就自动刷新”;
# 3. 重新申请后直接回到当前 while 顶部继续轮询新的 uuid 状态。
if effective_time <= 0:
uuid, url = await self.get_qr_code(print_qr=True, login_qr_api=login_qr_api, login_way=login_way)
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "二维码已刷新,等待扫码登录",
"login_source": "refresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
await asyncio.sleep(5)
await self._wait_init_ready(logger=logger)
await self._refresh_identity_from_profile(logger=logger)
ipad_config["wxid"] = self.wxid
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
logger=logger,
)
async def _probe_login_stage(self) -> dict[str, Any]:
"""探测 864 当前登录阶段,供 Dashboard 展示更准确的运维状态。"""
default_payload = {
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "login_required",
"connection_ready": False,
"login_required": True,
}
try:
login_status = await self.get_login_status(auto_login=False)
except Exception as e:
error_message = str(e).strip()
if "该链接不存在" in error_message:
return {
**default_payload,
"status_text": "864 服务端连接尚未建立,等待创建登录连接",
"provider_stage": "connection_pending",
"login_required": False,
}
if "重新登录" in error_message or "未登录" in error_message:
return {
**default_payload,
"status_text": error_message or "当前登录态已失效,等待重新扫码登录",
}
return {
**default_payload,
"status": "unavailable",
"status_text": error_message or "864 登录状态暂不可用",
"provider_stage": "status_unavailable",
}
if isinstance(login_status, dict):
login_message = str(
login_status.get("msg")
or login_status.get("message")
or login_status.get("loginState")
or ""
).strip()
if "在线" in login_message or str(login_status.get("loginState", "")).strip().lower() == "online":
return {
"status": "logged_in",
"status_text": "已检测到现有登录态",
"login_source": "runtime_state",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
}
if "重新登录" in login_message or "未登录" in login_message:
return {
**default_payload,
"status_text": login_message or "当前登录态已失效,等待重新扫码登录",
}
return default_payload
async def _wait_init_ready(self, *, logger) -> None:
"""等待 864 server 侧初始化完成。"""
for _ in range(30):
try:
if await self.get_init_status():
return
except Exception as e:
error_message = str(e).strip()
# 864 不同版本对“初始化状态”接口的支持并不一致:
# 1. 实际联调里已经出现“登录成功后消息同步正常,但 GetInItStatus 返回该链接不存在”的情况;
# 2. 这说明它更像一个附加探针,而不是启动主链路的硬前置条件;
# 3. 因此遇到这种兼容性错误时直接放行,避免机器人已经可用却持续打印误导性告警。
if "该链接不存在" in error_message:
logger.info("server_864 当前版本未提供稳定的初始化状态查询,跳过该探针并继续启动")
return
logger.warning(f"server_864 检查初始化状态失败: {error_message}")
await asyncio.sleep(2)
async def _refresh_identity_from_profile(self, *, logger) -> None:
"""从 864 的资料接口刷新当前登录身份。"""
try:
profile = await self.get_profile()
except Exception as e:
error_message = str(e).strip()
# 864 有些版本在消息链路可用后,资料接口仍可能短时间不可用:
# 1. 此时若直接抛异常,会让“已经登录成功”的启动流程被资料查询反向拖垮;
# 2. 项目主链路真正依赖的是后续消息同步与发送能力,而不是这里的展示性资料;
# 3. 所以这里改成降级告警,保留已有身份字段,等后续再由可用接口补齐。
logger.warning(f"server_864 刷新登录账号资料失败,继续沿用当前缓存身份: {error_message}")
return
self.wxid = str(
profile.get("UserName")
or profile.get("userName")
or profile.get("Wxid")
or profile.get("wxid")
or self.wxid
)
nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or ""
if isinstance(nickname, dict):
nickname = nickname.get("string", "")
alias = profile.get("Alias") or profile.get("alias") or ""
phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or ""
if isinstance(phone, dict):
phone = phone.get("string", "")
signature = profile.get("Signature") or profile.get("signature") or ""
self.nickname = str(nickname or "")
self.alias = str(alias or "")
self.phone = str(phone or "")
self.signature = str(signature or "")
logger.info(
f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
)
@staticmethod
def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None:
"""保存 864 provider 的本地登录缓存。"""
try:
normalized_path = str(state_path or "").strip()
if not normalized_path:
return
state_dir = os.path.dirname(normalized_path)
if state_dir:
os.makedirs(state_dir, exist_ok=True)
with open(normalized_path, "w", encoding="utf-8") as f:
toml.dump(state_payload, f)
# 这里显式打印最终落盘路径,方便线上排查“文件写到哪里去了”:
# 1. runtime_state 当前使用相对路径,最终位置会受启动工作目录影响;
# 2. 用户在切到 Docker / Windows 服务 / 脚本托管时,最容易在这里产生认知偏差;
# 3. 直接记录绝对路径后,出现“文件没找到”时就能立刻定位到真实写入位置。
logger.info(f"server_864 本地状态已写入: {os.path.abspath(normalized_path)}")
except Exception as e:
logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}")
async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None:
self._runtime_running = running
if on_runtime_state_change:
await self._safe_callback(
on_runtime_state_change,
running,
logger=logger,
callback_name="on_runtime_state_change",
)
async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None:
if callback is None:
return
try:
await callback(*args)
except Exception as e:
logger.exception(f"执行回调失败: {callback_name}, error: {e}")
def get_login_identity(self) -> dict[str, Any]:
"""返回统一登录身份结构。"""
return {
"wxid": self.wxid,
"nickname": self.nickname,
"alias": self.alias,
"phone": self.phone,
"signature": getattr(self, "signature", ""),
"device_name": "",
"device_id": "",
}