Files
abot/wechat_ipad/providers/server_864/runtime.py

402 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import time
from typing import Any, Awaitable, Callable
import toml
AsyncCallback = Callable[..., Awaitable[None]]
class Server864RuntimeMixin:
"""864 provider 的运行时编排。
设计说明:
1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”;
2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”;
3. 这样能先把现有 Robot 主链路无感切到 864后续若要补 WS 监听也只需在本目录内演进。
"""
def _init_runtime_state(self) -> None:
self._runtime_running = False
def stop_runtime(self) -> None:
self._runtime_running = False
def is_runtime_running(self) -> bool:
return bool(getattr(self, "_runtime_running", False))
async def run_runtime(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_ready: AsyncCallback,
on_history_message: AsyncCallback,
on_message: AsyncCallback,
on_idle_payload: AsyncCallback | None = None,
on_logout: AsyncCallback | None = None,
on_runtime_state_change: AsyncCallback | None = None,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
) -> None:
"""启动 864 provider 的运行时主循环。"""
del on_history_message
server_key = str(ipad_config.get("server_key", "") or "").strip()
if not server_key:
raise ValueError("server_864 启动失败:缺少 server_key请在 .env 中配置 WECHAT_SERVER_KEY")
self.server_key = server_key
login_qr_api = str(ipad_config.get("login_qr_api", "new_x") or "new_x").strip()
login_way = str(ipad_config.get("login_way", "mac") or "mac").strip()
await self._ensure_login(
ipad_config=ipad_config,
state_path=state_path,
logger=logger,
on_login_qr_update=on_login_qr_update,
on_login_qr_cleared=on_login_qr_cleared,
login_qr_api=login_qr_api,
login_way=login_way,
)
await on_login_ready(self.get_login_identity())
logger.info("server_864 登录成功")
await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger)
try:
logger.info("开始处理 server_864 消息轮询")
while self.is_runtime_running():
try:
data_temp = await self.sync_message()
except Exception as e:
logger.error(f"server_864 获取新消息失败: {e}")
await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout")
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
break
data = data_temp.get("AddMsgs") or []
if data:
for message in data:
await self._safe_callback(on_message, message, logger=logger, callback_name="on_message")
elif on_idle_payload:
await self._safe_callback(
on_idle_payload,
data_temp,
logger=logger,
callback_name="on_idle_payload",
)
await asyncio.sleep(2)
finally:
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
async def _ensure_login(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
login_qr_api: str = "new_x",
login_way: str = "mac",
) -> None:
"""确保 864 已完成登录。"""
if await self.is_logged_in():
await self._refresh_identity_from_profile(logger=logger)
await self._safe_callback(
on_login_qr_cleared,
{
"status": "logged_in",
"status_text": "已检测到现有登录态",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
return
# 先探测一次 864 当前阶段,让 Dashboard 能直接区分“等服务端准备”和“需要扫码”:
# 1. 864 的未登录态并不只有一种,部分场景其实是远端连接对象还没建好;
# 2. 若首页始终只显示“未登录”,运维很难判断下一步是等服务端还是去扫码;
# 3. 这里把差异压缩成轻量阶段字段,供前端直接展示,不改动核心登录流程。
login_stage_snapshot = await self._probe_login_stage()
await self._safe_callback(
on_login_qr_update,
login_stage_snapshot,
logger=logger,
callback_name="on_login_qr_update",
)
uuid, url = await self.get_qr_code(print_qr=True, login_qr_api=login_qr_api, login_way=login_way)
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
while True:
is_logged_in, login_status = await self.check_login_status()
if is_logged_in:
await self._safe_callback(
on_login_qr_cleared,
{
"status": "confirmed",
"status_text": "扫码登录成功",
"uuid": uuid,
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
break
# 864 的登录状态查询会回传当前 uuid 和有效期:
# 1. 真实联调中已确认 `CheckLoginStatus` 会返回 `uuid/effective_time`
# 2. 这些值应优先作为 Dashboard 的二维码倒计时与当前扫码目标来源;
# 3. 一旦 server 侧切换了新的 uuid这里也要及时覆盖本地展示态避免前端一直盯着旧码。
latest_uuid = str(login_status.get("uuid", "") or uuid).strip() or uuid
effective_time = int(login_status.get("effective_time", 0) or 0)
verification_url = str(login_status.get("verification_url", "") or "").strip()
if latest_uuid != uuid:
uuid = latest_uuid
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
url = f"https://api.2dcode.biz/v1/create-qr-code?data={scan_url}" if scan_url else url
provider_stage = "verification_required" if verification_url else ("waiting_scan" if uuid else "login_required")
# 864 在“已扫码但待安全验证”阶段,`msg/loginState` 往往是空字符串:
# 1. 若直接套用默认“等待扫码登录”,用户会误以为还没扫上;
# 2. 因此这里优先识别 `verification_url`,给 Dashboard 一个更准确的引导文案;
# 3. 只有完全拿不到状态提示时,才回退到普通扫码等待文案。
raw_status_text = str(login_status.get("msg") or login_status.get("loginState") or "").strip()
if verification_url and not raw_status_text:
status_text = "扫码已完成,请继续打开验证链接完成安全验证"
else:
status_text = raw_status_text or "等待扫码登录"
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": effective_time if effective_time > 0 else None,
"status": "waiting",
"status_text": status_text,
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": provider_stage,
"connection_ready": False,
"login_required": True,
"verification_url": verification_url,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
# 若 server 已明确告知二维码失效,则立即重新申请一张新码:
# 1. 这能避免 Dashboard 一直展示一张已经不可扫的旧二维码;
# 2. 也能让新环境登录时的交互与 855 保持一致,都是“过期就自动刷新”;
# 3. 重新申请后直接回到当前 while 顶部继续轮询新的 uuid 状态。
if effective_time <= 0:
uuid, url = await self.get_qr_code(print_qr=True, login_qr_api=login_qr_api, login_way=login_way)
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "二维码已刷新,等待扫码登录",
"login_source": "refresh_qr",
"provider_name": "server_864",
"provider_stage": "waiting_scan",
"connection_ready": False,
"login_required": True,
"login_qr_api": login_qr_api,
"login_way": login_way,
},
logger=logger,
callback_name="on_login_qr_update",
)
await asyncio.sleep(5)
await self._wait_init_ready(logger=logger)
await self._refresh_identity_from_profile(logger=logger)
ipad_config["wxid"] = self.wxid
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
logger=logger,
)
async def _probe_login_stage(self) -> dict[str, Any]:
"""探测 864 当前登录阶段,供 Dashboard 展示更准确的运维状态。"""
default_payload = {
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": "fresh_qr",
"provider_name": "server_864",
"provider_stage": "login_required",
"connection_ready": False,
"login_required": True,
}
try:
login_status = await self.get_login_status(auto_login=False)
except Exception as e:
error_message = str(e).strip()
if "该链接不存在" in error_message:
return {
**default_payload,
"status_text": "864 服务端连接尚未建立,等待创建登录连接",
"provider_stage": "connection_pending",
"login_required": False,
}
if "重新登录" in error_message or "未登录" in error_message:
return {
**default_payload,
"status_text": error_message or "当前登录态已失效,等待重新扫码登录",
}
return {
**default_payload,
"status": "unavailable",
"status_text": error_message or "864 登录状态暂不可用",
"provider_stage": "status_unavailable",
}
if isinstance(login_status, dict):
login_message = str(
login_status.get("msg")
or login_status.get("message")
or login_status.get("loginState")
or ""
).strip()
if "在线" in login_message or str(login_status.get("loginState", "")).strip().lower() == "online":
return {
"status": "logged_in",
"status_text": "已检测到现有登录态",
"login_source": "runtime_state",
"provider_name": "server_864",
"provider_stage": "logged_in",
"connection_ready": True,
"login_required": False,
}
if "重新登录" in login_message or "未登录" in login_message:
return {
**default_payload,
"status_text": login_message or "当前登录态已失效,等待重新扫码登录",
}
return default_payload
async def _wait_init_ready(self, *, logger) -> None:
"""等待 864 server 侧初始化完成。"""
for _ in range(30):
try:
if await self.get_init_status():
return
except Exception as e:
logger.warning(f"server_864 检查初始化状态失败: {e}")
await asyncio.sleep(2)
async def _refresh_identity_from_profile(self, *, logger) -> None:
"""从 864 的资料接口刷新当前登录身份。"""
profile = await self.get_profile()
self.wxid = str(
profile.get("UserName")
or profile.get("userName")
or profile.get("Wxid")
or profile.get("wxid")
or self.wxid
)
nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or ""
if isinstance(nickname, dict):
nickname = nickname.get("string", "")
alias = profile.get("Alias") or profile.get("alias") or ""
phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or ""
if isinstance(phone, dict):
phone = phone.get("string", "")
signature = profile.get("Signature") or profile.get("signature") or ""
self.nickname = str(nickname or "")
self.alias = str(alias or "")
self.phone = str(phone or "")
self.signature = str(signature or "")
logger.info(
f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
)
@staticmethod
def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None:
"""保存 864 provider 的本地登录缓存。"""
try:
normalized_path = str(state_path or "").strip()
if not normalized_path:
return
state_dir = os.path.dirname(normalized_path)
if state_dir:
os.makedirs(state_dir, exist_ok=True)
with open(normalized_path, "w", encoding="utf-8") as f:
toml.dump(state_payload, f)
except Exception as e:
logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}")
async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None:
self._runtime_running = running
if on_runtime_state_change:
await self._safe_callback(
on_runtime_state_change,
running,
logger=logger,
callback_name="on_runtime_state_change",
)
async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None:
if callback is None:
return
try:
await callback(*args)
except Exception as e:
logger.exception(f"执行回调失败: {callback_name}, error: {e}")
def get_login_identity(self) -> dict[str, Any]:
"""返回统一登录身份结构。"""
return {
"wxid": self.wxid,
"nickname": self.nickname,
"alias": self.alias,
"phone": self.phone,
"signature": getattr(self, "signature", ""),
"device_name": "",
"device_id": "",
}