Files
abot/wechat_ipad/providers/server_864/runtime.py

620 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import re
import time
from typing import Any, Awaitable, Callable
import toml
AsyncCallback = Callable[..., Awaitable[None]]
class Server864RuntimeMixin:
"""864 provider 的运行时编排。
设计说明:
1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”;
2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”;
3. 这样能先把现有 Robot 主链路无感切到 864后续若要补 WS 监听也只需在本目录内演进。
"""
def _init_runtime_state(self) -> None:
self._runtime_running = False
def stop_runtime(self) -> None:
self._runtime_running = False
def is_runtime_running(self) -> bool:
return bool(getattr(self, "_runtime_running", False))
@staticmethod
def _normalize_login_runtime_message(message: str) -> str:
"""把 864 登录阶段抛出的原始错误整理成更适合前端展示的短文本。"""
raw_message = str(message or "").strip()
if not raw_message:
return "864 登录流程发生异常,请查看服务端日志"
# 864 某些失败场景会把整段 XML 直接塞进错误信息里:
# 1. 终端日志能看到完整 XML但前端直接展示这段原文会非常难读
# 2. 这里优先提取 `<Content><![CDATA[...]]></Content>` 里的真正提示语;
# 3. 这样像“客户端版本过低”这种关键原因可以直接显示给运维人员。
content_match = re.search(r"<Content><!\[CDATA\[(.*?)\]\]></Content>", raw_message, flags=re.S)
if content_match:
extracted = str(content_match.group(1) or "").strip()
if extracted:
raw_message = extracted
# 把换行和多余空格压平,避免弹窗里出现大段不可读排版。
normalized = " ".join(part for part in re.split(r"\s+", raw_message) if part)
if "客户端版本过低" in normalized:
return f"{normalized} 请更换更高版本的 864 client/server 组合后再登录。"
return normalized
@staticmethod
def _normalize_post_scan_failure_message(message: str) -> str:
"""把扫码成功后的收口阶段错误翻译成更接近真实原因的提示。"""
normalized = Server864RuntimeMixin._normalize_login_runtime_message(message)
if "该链接不存在" in normalized:
# 864 在扫码成功后如果 ManualAuth 失败,会直接把长连接停掉:
# 1. ABOT 随后再调 `GetInItStatus/GetProfile` 时,就只会看到“该链接不存在”;
# 2. 但这并不是根因,根因往往已经出现在服务端控制台,例如“当前客户端版本过低”;
# 3. 因此前端这里给出更贴近真实情况的兼容提示,帮助用户把注意力放到版本兼容上。
return (
"扫码成功后服务端登录收口失败,连接已被主动关闭。"
"这通常表示当前 864 client/server 版本组合不兼容;"
"若服务端日志已出现“当前客户端版本过低”,请升级到更高版本后再登录。"
)
return normalized
async def run_runtime(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_ready: AsyncCallback,
on_history_message: AsyncCallback,
on_message: AsyncCallback,
on_idle_payload: AsyncCallback | None = None,
on_logout: AsyncCallback | None = None,
on_runtime_state_change: AsyncCallback | None = None,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
) -> None:
"""启动 864 provider 的运行时主循环。"""
del on_history_message
server_key = str(ipad_config.get("server_key", "") or "").strip()
if not server_key:
raise ValueError("server_864 启动失败:缺少 server_key请在 .env 中配置 WECHAT_SERVER_KEY")
self.server_key = server_key
login_qr_api = str(ipad_config.get("login_qr_api", "new_x") or "new_x").strip()
login_way = str(ipad_config.get("login_way", "mac") or "mac").strip()
await self._ensure_login(
ipad_config=ipad_config,
state_path=state_path,
logger=logger,
on_login_qr_update=on_login_qr_update,
on_login_qr_cleared=on_login_qr_cleared,
login_qr_api=login_qr_api,
login_way=login_way,
)
login_identity = self.get_login_identity()
# 在 provider 侧再做一次硬校验:
# 1. `_ensure_login()` 已经负责等待真正可用的登录态,但运行期代码后续可能继续演进;
# 2. 这里补一道收口保护,可以避免未来某次重构把“空身份”再次漏进 Robot 业务层;
# 3. 一旦这里失败,说明当前 provider 还没有拿到稳定账号身份,必须继续停留在登录阶段。
if not self._has_login_identity(login_identity):
raise RuntimeError("server_864 登录流程未拿到可用账号身份,已阻止进入后台业务初始化")
await on_login_ready(login_identity)
logger.info("server_864 登录成功")
await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger)
try:
logger.info("开始处理 server_864 消息轮询")
while self.is_runtime_running():
try:
data_temp = await self.sync_message()
except Exception as e:
logger.error(f"server_864 获取新消息失败: {e}")
await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout")
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
break
data = data_temp.get("AddMsgs") or []
if data:
for message in data:
await self._safe_callback(on_message, message, logger=logger, callback_name="on_message")
elif on_idle_payload:
await self._safe_callback(
on_idle_payload,
data_temp,
logger=logger,
callback_name="on_idle_payload",
)
await asyncio.sleep(2)
finally:
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
async def _ensure_login(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
login_qr_api: str = "new_x",
login_way: str = "mac",
) -> None:
"""确保 864 已完成登录。"""
if await self.is_logged_in():
identity_ready = await self._refresh_identity_from_profile(logger=logger)
if not identity_ready:
raise RuntimeError("当前未拿到可用账号身份,请重新扫码登录")
# 864 在“服务端已经在线、ABOT 只是后启动”的场景下会直接走这里:
# 1. 之前这条分支只清理二维码态,没有补写本地 runtime_state
# 2. 这会让用户误以为“明明已经登录成功,却没有生成 provider 状态文件”;
# 3. 因此这里也统一补一次落盘,让首次接管现有登录态与扫码新登录的结果保持一致。
ipad_config["wxid"] = self.wxid
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
logger=logger,
)
await self._safe_callback(
on_login_qr_cleared,
{
"status": "logged_in",
"status_text": "已检测到现有登录态",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
return
while True:
# 这里把“申请二维码 -> 轮询扫码 -> 登录收口”包成一轮完整会话:
# 1. 864 某些失败不是用户操作问题,而是服务端在扫码后主动中断当前连接;
# 2. 对这类场景,最合理的行为不是让线程退出,而是把错误留在前端并回到下一轮扫码;
# 3. 这样可以满足“没登录成功就不要进入后台,但页面持续引导重新登录”的产品预期。
login_stage_snapshot = await self._probe_login_stage()
await self._safe_callback(
on_login_qr_update,
login_stage_snapshot,
logger=logger,
callback_name="on_login_qr_update",
)
uuid, url = await self.get_qr_code(print_qr=True, login_qr_api=login_qr_api, login_way=login_way)
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
effective_time = 0
raw_state = 0
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
login_completed = False
while True:
try:
is_logged_in, login_status = await self.check_login_status()
except Exception as e:
error_message = self._normalize_login_runtime_message(str(e))
# 轮询阶段失败时只更新前端,不中断 provider 主线程:
# 1. 线上最常见的是本轮登录连接已被服务端丢弃,继续崩线程只会让运维反复重启;
# 2. 这里先把明确错误展示给前端,再回到外层重新申请二维码;
# 3. 这样用户在新环境里可以直接继续扫码,不需要手工重启 ABOT。
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"status": "unavailable",
"status_text": error_message,
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "status_unavailable",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
logger.warning(f"server_864 登录状态轮询失败,本轮二维码作废并等待重新登录: {error_message}")
await asyncio.sleep(2)
break
if is_logged_in:
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": effective_time if effective_time > 0 else None,
"status": "confirmed",
# 864 进入 `state=2` 后,已经拿到了 wxid / wxnewpass 等关键登录信息:
# 1. 这说明扫码本身已经成功,但并不代表 ABOT 后续初始化已经全部完成;
# 2. 若这里直接关闭弹窗,后面的版本过低/初始化失败就无法同步给前端;
# 3. 因此先保持弹窗可见,把阶段切到“登录收口中”,等真正初始化完成后再清理。
"status_text": "扫码成功,等待服务端完成登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "login_finalizing",
"connection_ready": True,
"login_required": False,
"raw_state": max(raw_state, 2),
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
login_completed = True
break
# 864 的登录状态查询会回传当前 uuid 和有效期:
# 1. 真实联调中已确认 `CheckLoginStatus` 会返回 `uuid/effective_time`
# 2. 这些值应优先作为 Dashboard 的二维码倒计时与当前扫码目标来源;
# 3. 一旦 server 侧切换了新的 uuid这里也要及时覆盖本地展示态避免前端一直盯着旧码。
latest_uuid = str(login_status.get("uuid", "") or uuid).strip() or uuid
effective_time = int(login_status.get("effective_time", 0) or 0)
raw_state = int(login_status.get("raw_state", login_status.get("state", 0)) or 0)
verification_url = str(login_status.get("verification_url", "") or "").strip()
nick_name = str(login_status.get("nick_name", "") or "").strip()
head_img_url = str(login_status.get("head_img_url", "") or "").strip()
if latest_uuid != uuid:
uuid = latest_uuid
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
url = f"https://api.2dcode.biz/v1/create-qr-code?data={scan_url}" if scan_url else url
# 864 的 `CheckLoginStatus` 存在一类“状态缓存已终态,但服务端连接没有继续推进”的情况:
# 1. 实测里扫码并完成安全验证后,接口可能停在 `state=4 + VerificationUrl`,前端会一直看见旧状态;
# 2. 结合服务端源码,这类 `state=4` 更接近终态/失效态,而不是可继续等待的中间态;
# 3. 因此这里把它单独识别出来,避免 Dashboard 长时间卡在“等待安全验证”的旧文案。
raw_status_text = str(login_status.get("msg") or login_status.get("loginState") or "").strip()
if raw_state == 0:
# 864 有时会把上一轮登录留下的 VerificationUrl 一起带回来:
# 1. 但当 `raw_state=0` 时,真实含义仍然是“当前二维码等待扫码”;
# 2. 若继续优先展示旧验证链接,前端就会出现“明明没扫却提示去安全验证”的误导;
# 3. 因此这里明确以原始状态为准,并主动忽略这类陈旧 VerificationUrl。
provider_stage = "waiting_scan" if uuid else "login_required"
status = "waiting"
status_text = raw_status_text or "等待扫码登录"
verification_url = ""
nick_name = ""
head_img_url = ""
elif raw_state == 4:
provider_stage = "login_required"
status = "expired"
if verification_url:
status_text = "安全验证链路已结束,但服务端未完成登录收口,正在准备刷新二维码"
else:
status_text = raw_status_text or "二维码状态已结束,正在准备刷新二维码"
elif raw_state == 1:
# 864 在 `state=1` 时通常已经识别出扫码账号,但还没推进到最终登录完成:
# 1. 这时接口经常会同时回传昵称、头像以及 VerificationUrl
# 2. 但从用户体验上看,它更接近“已扫码,等待服务端确认”,不应该继续只提示去点链接;
# 3. 因此这里单独映射成可读阶段,方便 Dashboard 展示更准确的过程状态。
provider_stage = "scan_confirmed"
status = "waiting"
display_name = nick_name or "当前微信账号"
status_text = raw_status_text or f"已扫码:{display_name},等待服务端确认登录"
elif verification_url:
provider_stage = "verification_required"
status = "waiting"
if not raw_status_text:
status_text = "扫码已完成,请继续打开验证链接完成安全验证"
else:
status_text = raw_status_text
else:
provider_stage = "waiting_scan" if uuid else "login_required"
status = "waiting"
status_text = raw_status_text or "等待扫码登录"
if verification_url and raw_state not in {0, 4} and not raw_status_text:
status_text = "扫码已完成,请继续打开验证链接完成安全验证"
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": effective_time if effective_time > 0 else None,
"status": status,
"status_text": status_text,
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": provider_stage,
"connection_ready": False,
"login_required": True,
"verification_url": verification_url,
"raw_state": raw_state,
"nick_name": nick_name,
"head_img_url": head_img_url,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
# 若 server 已明确告知二维码失效或停在终态缓存,则立即结束本轮、回到外层申请新二维码:
# 1. 这能避免 Dashboard 一直展示一张已经不可扫的旧二维码;
# 2. 864 某些版本即使 `effective_time` 还大于 0也可能已经停在 `state=4` 终态缓存里;
# 3. 因此这里补充 `raw_state == 4` 的刷新条件,让前端能尽快得到新的登录入口。
if effective_time <= 0 or raw_state == 4:
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "二维码已失效,正在准备新二维码",
"login_source": "refresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
break
await asyncio.sleep(5)
if not login_completed:
continue
try:
await self._wait_init_ready(logger=logger)
identity_ready = await self._refresh_identity_from_profile(logger=logger)
if not identity_ready:
raise RuntimeError("扫码完成后未获取到可用账号身份,请重新扫码登录")
ipad_config["wxid"] = self.wxid
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
logger=logger,
)
return
except Exception as e:
error_message = self._normalize_post_scan_failure_message(str(e))
# 扫码成功后的收口阶段同样需要把异常同步给前端:
# 1. 这类错误往往发生在“state=2 之后但正式进入业务前”,最容易被误判成前端没刷新;
# 2. 例如当前用户遇到的“客户端版本过低”就是在这个阶段由服务端主动断开;
# 3. 这里不再直接抛异常退出,而是把错误留在弹窗里并继续等待下一轮扫码。
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"status": "unavailable",
"status_text": error_message,
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "status_unavailable",
"connection_ready": False,
"login_required": True,
"raw_state": max(raw_state, 2),
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
logger.warning(f"server_864 登录收口失败,继续停留在扫码引导态等待重新登录: {error_message}")
await asyncio.sleep(2)
@staticmethod
def _has_login_identity(login_identity: dict[str, Any] | None) -> bool:
"""判断当前 provider 是否已经拿到可用于进入业务层的账号身份。"""
payload = dict(login_identity or {})
# 这里把“可用身份”的标准压到最小:
# 1. 864 有些接口可能先拿到昵称、后拿到 wxid也可能顺序相反
# 2. 但两者都为空时,业务层无法知道当前到底是谁登录了;
# 3. 因此至少要拿到 `wxid` 或 `nickname` 之一,才允许继续进入 Robot 初始化。
return bool(str(payload.get("wxid", "") or "").strip() or str(payload.get("nickname", "") or "").strip())
async def _probe_login_stage(self) -> dict[str, Any]:
"""探测 864 当前登录阶段,供 Dashboard 展示更准确的运维状态。"""
default_payload = {
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "login_required",
"connection_ready": False,
"login_required": True,
}
try:
login_status = await self.get_login_status(auto_login=False)
except Exception as e:
error_message = str(e).strip()
if "该链接不存在" in error_message:
return {
**default_payload,
"status_text": "864 服务端连接尚未建立,等待创建登录连接",
"provider_stage": "connection_pending",
"login_required": False,
}
if "重新登录" in error_message or "未登录" in error_message:
return {
**default_payload,
"status_text": error_message or "当前登录态已失效,等待重新扫码登录",
}
return {
**default_payload,
"status": "unavailable",
"status_text": error_message or "864 登录状态暂不可用",
"provider_stage": "status_unavailable",
}
if isinstance(login_status, dict):
login_message = str(
login_status.get("msg")
or login_status.get("message")
or login_status.get("loginState")
or ""
).strip()
if "在线" in login_message or str(login_status.get("loginState", "")).strip().lower() == "online":
return {
"status": "logged_in",
"status_text": "已检测到现有登录态",
"login_source": "runtime_state",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
}
if "重新登录" in login_message or "未登录" in login_message:
return {
**default_payload,
"status_text": login_message or "当前登录态已失效,等待重新扫码登录",
}
return default_payload
async def _wait_init_ready(self, *, logger) -> None:
"""等待 864 server 侧初始化完成。"""
for _ in range(30):
try:
if await self.get_init_status():
return
except Exception as e:
error_message = str(e).strip()
# 864 不同版本对“初始化状态”接口的支持并不一致:
# 1. 实际联调里已经出现“登录成功后消息同步正常,但 GetInItStatus 返回该链接不存在”的情况;
# 2. 这说明它更像一个附加探针,而不是启动主链路的硬前置条件;
# 3. 因此遇到这种兼容性错误时直接放行,避免机器人已经可用却持续打印误导性告警。
if "该链接不存在" in error_message:
logger.info("server_864 当前版本未提供稳定的初始化状态查询,跳过该探针并继续启动")
return
logger.warning(f"server_864 检查初始化状态失败: {error_message}")
await asyncio.sleep(2)
async def _refresh_identity_from_profile(self, *, logger) -> bool:
"""从 864 的资料接口刷新当前登录身份。"""
try:
profile = await self.get_profile()
except Exception as e:
error_message = str(e).strip()
# 864 有些版本在消息链路可用后,资料接口仍可能短时间不可用:
# 1. 此时若直接抛异常,会让“已经登录成功”的启动流程被资料查询反向拖垮;
# 2. 但如果当前连 `wxid/nickname` 都没有,就不能再假装“已经有可用身份”;
# 3. 因此这里返回布尔值,由上层决定是继续使用已知身份,还是把登录流程判定为失败。
if self.wxid or self.nickname:
logger.warning(f"server_864 刷新登录账号资料失败,继续沿用当前已获取身份: {error_message}")
return True
logger.warning(f"server_864 刷新登录账号资料失败,当前尚未拿到可用账号身份: {error_message}")
return False
self.wxid = str(
profile.get("UserName")
or profile.get("userName")
or profile.get("Wxid")
or profile.get("wxid")
or self.wxid
)
nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or ""
if isinstance(nickname, dict):
nickname = nickname.get("string", "")
alias = profile.get("Alias") or profile.get("alias") or ""
phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or ""
if isinstance(phone, dict):
phone = phone.get("string", "")
signature = profile.get("Signature") or profile.get("signature") or ""
self.nickname = str(nickname or "")
self.alias = str(alias or "")
self.phone = str(phone or "")
self.signature = str(signature or "")
logger.info(
f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
)
return bool(self.wxid or self.nickname)
@staticmethod
def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None:
"""保存 864 provider 的本地登录缓存。"""
try:
normalized_path = str(state_path or "").strip()
if not normalized_path:
return
state_dir = os.path.dirname(normalized_path)
if state_dir:
os.makedirs(state_dir, exist_ok=True)
with open(normalized_path, "w", encoding="utf-8") as f:
toml.dump(state_payload, f)
# 这里显式打印最终落盘路径,方便线上排查“文件写到哪里去了”:
# 1. runtime_state 当前使用相对路径,最终位置会受启动工作目录影响;
# 2. 用户在切到 Docker / Windows 服务 / 脚本托管时,最容易在这里产生认知偏差;
# 3. 直接记录绝对路径后,出现“文件没找到”时就能立刻定位到真实写入位置。
logger.info(f"server_864 本地状态已写入: {os.path.abspath(normalized_path)}")
except Exception as e:
logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}")
async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None:
self._runtime_running = running
if on_runtime_state_change:
await self._safe_callback(
on_runtime_state_change,
running,
logger=logger,
callback_name="on_runtime_state_change",
)
async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None:
if callback is None:
return
try:
await callback(*args)
except Exception as e:
logger.exception(f"执行回调失败: {callback_name}, error: {e}")
def get_login_identity(self) -> dict[str, Any]:
"""返回统一登录身份结构。"""
return {
"wxid": self.wxid,
"nickname": self.nickname,
"alias": self.alias,
"phone": self.phone,
"signature": getattr(self, "signature", ""),
"device_name": "",
"device_id": "",
}