377 lines
16 KiB
Python
377 lines
16 KiB
Python
import asyncio
|
||
import os
|
||
import time
|
||
from typing import Any, Awaitable, Callable
|
||
|
||
import toml
|
||
|
||
|
||
AsyncCallback = Callable[..., Awaitable[None]]
|
||
|
||
|
||
class Server864RuntimeMixin:
|
||
"""864 provider 的运行时编排。
|
||
|
||
设计说明:
|
||
1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”;
|
||
2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”;
|
||
3. 这样能先把现有 Robot 主链路无感切到 864,后续若要补 WS 监听也只需在本目录内演进。
|
||
"""
|
||
|
||
def _init_runtime_state(self) -> None:
|
||
self._runtime_running = False
|
||
|
||
def stop_runtime(self) -> None:
|
||
self._runtime_running = False
|
||
|
||
def is_runtime_running(self) -> bool:
|
||
return bool(getattr(self, "_runtime_running", False))
|
||
|
||
async def run_runtime(
|
||
self,
|
||
*,
|
||
ipad_config: dict,
|
||
state_path: str,
|
||
logger,
|
||
on_login_ready: AsyncCallback,
|
||
on_history_message: AsyncCallback,
|
||
on_message: AsyncCallback,
|
||
on_idle_payload: AsyncCallback | None = None,
|
||
on_logout: AsyncCallback | None = None,
|
||
on_runtime_state_change: AsyncCallback | None = None,
|
||
on_login_qr_update: AsyncCallback | None = None,
|
||
on_login_qr_cleared: AsyncCallback | None = None,
|
||
) -> None:
|
||
"""启动 864 provider 的运行时主循环。"""
|
||
del on_history_message
|
||
server_key = str(ipad_config.get("server_key", "") or "").strip()
|
||
if not server_key:
|
||
raise ValueError("server_864 启动失败:缺少 server_key,请在 .env 中配置 WECHAT_SERVER_KEY")
|
||
self.server_key = server_key
|
||
|
||
await self._ensure_login(
|
||
ipad_config=ipad_config,
|
||
state_path=state_path,
|
||
logger=logger,
|
||
on_login_qr_update=on_login_qr_update,
|
||
on_login_qr_cleared=on_login_qr_cleared,
|
||
)
|
||
|
||
await on_login_ready(self.get_login_identity())
|
||
logger.info("server_864 登录成功")
|
||
|
||
await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger)
|
||
|
||
try:
|
||
logger.info("开始处理 server_864 消息轮询")
|
||
while self.is_runtime_running():
|
||
try:
|
||
data_temp = await self.sync_message()
|
||
except Exception as e:
|
||
logger.error(f"server_864 获取新消息失败: {e}")
|
||
await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout")
|
||
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
|
||
break
|
||
|
||
data = data_temp.get("AddMsgs") or []
|
||
if data:
|
||
for message in data:
|
||
await self._safe_callback(on_message, message, logger=logger, callback_name="on_message")
|
||
elif on_idle_payload:
|
||
await self._safe_callback(
|
||
on_idle_payload,
|
||
data_temp,
|
||
logger=logger,
|
||
callback_name="on_idle_payload",
|
||
)
|
||
|
||
await asyncio.sleep(2)
|
||
finally:
|
||
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
|
||
|
||
async def _ensure_login(
|
||
self,
|
||
*,
|
||
ipad_config: dict,
|
||
state_path: str,
|
||
logger,
|
||
on_login_qr_update: AsyncCallback | None = None,
|
||
on_login_qr_cleared: AsyncCallback | None = None,
|
||
) -> None:
|
||
"""确保 864 已完成登录。"""
|
||
if await self.is_logged_in():
|
||
await self._refresh_identity_from_profile(logger=logger)
|
||
await self._safe_callback(
|
||
on_login_qr_cleared,
|
||
{
|
||
"status": "logged_in",
|
||
"status_text": "已检测到现有登录态",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "logged_in",
|
||
"connection_ready": True,
|
||
"login_required": False,
|
||
},
|
||
logger=logger,
|
||
callback_name="on_login_qr_cleared",
|
||
)
|
||
return
|
||
|
||
# 先探测一次 864 当前阶段,让 Dashboard 能直接区分“等服务端准备”和“需要扫码”:
|
||
# 1. 864 的未登录态并不只有一种,部分场景其实是远端连接对象还没建好;
|
||
# 2. 若首页始终只显示“未登录”,运维很难判断下一步是等服务端还是去扫码;
|
||
# 3. 这里把差异压缩成轻量阶段字段,供前端直接展示,不改动核心登录流程。
|
||
login_stage_snapshot = await self._probe_login_stage()
|
||
await self._safe_callback(
|
||
on_login_qr_update,
|
||
login_stage_snapshot,
|
||
logger=logger,
|
||
callback_name="on_login_qr_update",
|
||
)
|
||
|
||
uuid, url = await self.get_qr_code(print_qr=True)
|
||
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
|
||
await self._safe_callback(
|
||
on_login_qr_update,
|
||
{
|
||
"uuid": uuid,
|
||
"url": url,
|
||
"scan_url": scan_url,
|
||
"expires_in": None,
|
||
"status": "waiting",
|
||
"status_text": "等待扫码登录",
|
||
"login_source": "fresh_qr",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "waiting_scan",
|
||
"connection_ready": False,
|
||
"login_required": True,
|
||
},
|
||
logger=logger,
|
||
callback_name="on_login_qr_update",
|
||
)
|
||
|
||
while True:
|
||
is_logged_in, login_status = await self.check_login_status()
|
||
if is_logged_in:
|
||
await self._safe_callback(
|
||
on_login_qr_cleared,
|
||
{
|
||
"status": "confirmed",
|
||
"status_text": "扫码登录成功",
|
||
"uuid": uuid,
|
||
"provider_name": "server_864",
|
||
"provider_stage": "logged_in",
|
||
"connection_ready": True,
|
||
"login_required": False,
|
||
},
|
||
logger=logger,
|
||
callback_name="on_login_qr_cleared",
|
||
)
|
||
break
|
||
|
||
# 864 的登录状态查询会回传当前 uuid 和有效期:
|
||
# 1. 真实联调中已确认 `CheckLoginStatus` 会返回 `uuid/effective_time`;
|
||
# 2. 这些值应优先作为 Dashboard 的二维码倒计时与当前扫码目标来源;
|
||
# 3. 一旦 server 侧切换了新的 uuid,这里也要及时覆盖本地展示态,避免前端一直盯着旧码。
|
||
latest_uuid = str(login_status.get("uuid", "") or uuid).strip() or uuid
|
||
effective_time = int(login_status.get("effective_time", 0) or 0)
|
||
if latest_uuid != uuid:
|
||
uuid = latest_uuid
|
||
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
|
||
url = f"https://api.2dcode.biz/v1/create-qr-code?data={scan_url}" if scan_url else url
|
||
|
||
await self._safe_callback(
|
||
on_login_qr_update,
|
||
{
|
||
"uuid": uuid,
|
||
"url": url,
|
||
"scan_url": scan_url,
|
||
"expires_in": effective_time if effective_time > 0 else None,
|
||
"status": "waiting",
|
||
"status_text": str(login_status.get("msg") or login_status.get("loginState") or "等待扫码登录"),
|
||
"login_source": "fresh_qr",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "waiting_scan" if uuid else "login_required",
|
||
"connection_ready": False,
|
||
"login_required": True,
|
||
},
|
||
logger=logger,
|
||
callback_name="on_login_qr_update",
|
||
)
|
||
|
||
# 若 server 已明确告知二维码失效,则立即重新申请一张新码:
|
||
# 1. 这能避免 Dashboard 一直展示一张已经不可扫的旧二维码;
|
||
# 2. 也能让新环境登录时的交互与 855 保持一致,都是“过期就自动刷新”;
|
||
# 3. 重新申请后直接回到当前 while 顶部继续轮询新的 uuid 状态。
|
||
if effective_time <= 0:
|
||
uuid, url = await self.get_qr_code(print_qr=True)
|
||
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
|
||
await self._safe_callback(
|
||
on_login_qr_update,
|
||
{
|
||
"uuid": uuid,
|
||
"url": url,
|
||
"scan_url": scan_url,
|
||
"expires_in": None,
|
||
"status": "waiting",
|
||
"status_text": "二维码已刷新,等待扫码登录",
|
||
"login_source": "refresh_qr",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "waiting_scan",
|
||
"connection_ready": False,
|
||
"login_required": True,
|
||
},
|
||
logger=logger,
|
||
callback_name="on_login_qr_update",
|
||
)
|
||
await asyncio.sleep(5)
|
||
|
||
await self._wait_init_ready(logger=logger)
|
||
await self._refresh_identity_from_profile(logger=logger)
|
||
ipad_config["wxid"] = self.wxid
|
||
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self._save_runtime_state(
|
||
state_path=state_path,
|
||
state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]},
|
||
logger=logger,
|
||
)
|
||
|
||
async def _probe_login_stage(self) -> dict[str, Any]:
|
||
"""探测 864 当前登录阶段,供 Dashboard 展示更准确的运维状态。"""
|
||
default_payload = {
|
||
"status": "waiting",
|
||
"status_text": "等待扫码登录",
|
||
"login_source": "fresh_qr",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "login_required",
|
||
"connection_ready": False,
|
||
"login_required": True,
|
||
}
|
||
try:
|
||
login_status = await self.get_login_status(auto_login=False)
|
||
except Exception as e:
|
||
error_message = str(e).strip()
|
||
if "该链接不存在" in error_message:
|
||
return {
|
||
**default_payload,
|
||
"status_text": "864 服务端连接尚未建立,等待创建登录连接",
|
||
"provider_stage": "connection_pending",
|
||
"login_required": False,
|
||
}
|
||
if "重新登录" in error_message or "未登录" in error_message:
|
||
return {
|
||
**default_payload,
|
||
"status_text": error_message or "当前登录态已失效,等待重新扫码登录",
|
||
}
|
||
return {
|
||
**default_payload,
|
||
"status": "unavailable",
|
||
"status_text": error_message or "864 登录状态暂不可用",
|
||
"provider_stage": "status_unavailable",
|
||
}
|
||
|
||
if isinstance(login_status, dict):
|
||
login_message = str(
|
||
login_status.get("msg")
|
||
or login_status.get("message")
|
||
or login_status.get("loginState")
|
||
or ""
|
||
).strip()
|
||
if "在线" in login_message or str(login_status.get("loginState", "")).strip().lower() == "online":
|
||
return {
|
||
"status": "logged_in",
|
||
"status_text": "已检测到现有登录态",
|
||
"login_source": "runtime_state",
|
||
"provider_name": "server_864",
|
||
"provider_stage": "logged_in",
|
||
"connection_ready": True,
|
||
"login_required": False,
|
||
}
|
||
if "重新登录" in login_message or "未登录" in login_message:
|
||
return {
|
||
**default_payload,
|
||
"status_text": login_message or "当前登录态已失效,等待重新扫码登录",
|
||
}
|
||
return default_payload
|
||
|
||
async def _wait_init_ready(self, *, logger) -> None:
|
||
"""等待 864 server 侧初始化完成。"""
|
||
for _ in range(30):
|
||
try:
|
||
if await self.get_init_status():
|
||
return
|
||
except Exception as e:
|
||
logger.warning(f"server_864 检查初始化状态失败: {e}")
|
||
await asyncio.sleep(2)
|
||
|
||
async def _refresh_identity_from_profile(self, *, logger) -> None:
|
||
"""从 864 的资料接口刷新当前登录身份。"""
|
||
profile = await self.get_profile()
|
||
self.wxid = str(
|
||
profile.get("UserName")
|
||
or profile.get("userName")
|
||
or profile.get("Wxid")
|
||
or profile.get("wxid")
|
||
or self.wxid
|
||
)
|
||
nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or ""
|
||
if isinstance(nickname, dict):
|
||
nickname = nickname.get("string", "")
|
||
alias = profile.get("Alias") or profile.get("alias") or ""
|
||
phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or ""
|
||
if isinstance(phone, dict):
|
||
phone = phone.get("string", "")
|
||
signature = profile.get("Signature") or profile.get("signature") or ""
|
||
|
||
self.nickname = str(nickname or "")
|
||
self.alias = str(alias or "")
|
||
self.phone = str(phone or "")
|
||
self.signature = str(signature or "")
|
||
logger.info(
|
||
f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
|
||
)
|
||
|
||
@staticmethod
|
||
def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None:
|
||
"""保存 864 provider 的本地登录缓存。"""
|
||
try:
|
||
normalized_path = str(state_path or "").strip()
|
||
if not normalized_path:
|
||
return
|
||
state_dir = os.path.dirname(normalized_path)
|
||
if state_dir:
|
||
os.makedirs(state_dir, exist_ok=True)
|
||
with open(normalized_path, "w", encoding="utf-8") as f:
|
||
toml.dump(state_payload, f)
|
||
except Exception as e:
|
||
logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}")
|
||
|
||
async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None:
|
||
self._runtime_running = running
|
||
if on_runtime_state_change:
|
||
await self._safe_callback(
|
||
on_runtime_state_change,
|
||
running,
|
||
logger=logger,
|
||
callback_name="on_runtime_state_change",
|
||
)
|
||
|
||
async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None:
|
||
if callback is None:
|
||
return
|
||
try:
|
||
await callback(*args)
|
||
except Exception as e:
|
||
logger.exception(f"执行回调失败: {callback_name}, error: {e}")
|
||
|
||
def get_login_identity(self) -> dict[str, Any]:
|
||
"""返回统一登录身份结构。"""
|
||
return {
|
||
"wxid": self.wxid,
|
||
"nickname": self.nickname,
|
||
"alias": self.alias,
|
||
"phone": self.phone,
|
||
"signature": getattr(self, "signature", ""),
|
||
"device_name": "",
|
||
"device_id": "",
|
||
}
|