Files
abot/wechat_ipad/providers/legacy_855/runtime.py

494 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import time
from typing import Any, Awaitable, Callable
import toml
AsyncCallback = Callable[..., Awaitable[None]]
class Legacy855RuntimeMixin:
"""855/859 风格 server 的运行时编排实现。
设计说明:
1. 855 的差异不只是接口路径,而是“客户端自己负责保活和拉消息”的运行模型;
2. 这里把登录、心跳、历史消息消化、实时轮询、掉线恢复集中收口到 provider 内部;
3. `Robot` 只再关心“登录成功后做什么”“收到消息后怎么处理”,避免后续接 864 时继续改主链路。
"""
def _init_runtime_state(self) -> None:
"""初始化运行时状态字段。
这里不用 `__init__` 参与多继承链,而是由 provider 显式调用:
1. 现有 mixin 组合里已经有消息队列初始化逻辑;
2. 显式初始化更容易看清哪些状态只属于 855 runtime
3. 也能避免后续再因为 MRO 调用顺序引入隐蔽问题。
"""
self._runtime_running = False
self._runtime_recovery_lock = asyncio.Lock()
def stop_runtime(self) -> None:
"""请求停止当前 provider 的运行时主循环。"""
self._runtime_running = False
def is_runtime_running(self) -> bool:
"""返回当前 provider 运行时是否处于运行态。"""
return bool(getattr(self, "_runtime_running", False))
async def run_runtime(
self,
*,
ipad_config: dict,
state_path: str,
logger,
on_login_ready: AsyncCallback,
on_history_message: AsyncCallback,
on_message: AsyncCallback,
on_idle_payload: AsyncCallback | None = None,
on_logout: AsyncCallback | None = None,
on_runtime_state_change: AsyncCallback | None = None,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
) -> None:
"""启动 855 provider 的完整运行时。
参数说明:
1. `ipad_config` 与 `state_path` 由上层传入provider 只负责更新和落盘登录态;
2. `on_*` 回调保持尽量少,只暴露业务层真正需要接手的几个时机;
3. 这样既避免 `Robot` 再写协议细节,也不额外引入复杂的事件总线或状态机层。
"""
wxid = str(ipad_config.get("wxid", "") or "").strip()
device_name = str(ipad_config.get("device_name", "") or "").strip()
device_id = str(ipad_config.get("device_id", "") or "").strip()
if not device_name:
device_name = self.create_device_name()
if not device_id:
device_id = self.create_device_id()
await self._ensure_login(
wxid=wxid,
device_name=device_name,
device_id=device_id,
ipad_config=ipad_config,
state_path=state_path,
logger=logger,
on_login_qr_update=on_login_qr_update,
on_login_qr_cleared=on_login_qr_cleared,
)
# 登录后的项目初始化若失败,应直接中断启动:
# 1. 这里会初始化联系人缓存、插件注入、消息存储等关键依赖;
# 2. 如果仅记录异常继续运行,后续消息循环只会在半初始化状态下引发更多连锁问题;
# 3. 因此这里刻意不吞异常,让启动期问题尽早暴露。
await on_login_ready(self.get_login_identity(device_name=device_name, device_id=device_id))
logger.info(f"wechat_ipad登录设备信息: device_name: {device_name} device_id: {device_id}")
logger.info("wechat_ipad登录成功")
logger.info("处理堆积消息中")
await self._drain_startup_history(on_history_message=on_history_message, logger=logger)
logger.info("处理堆积消息完毕")
await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger)
heartbeat_task = asyncio.create_task(
self._heartbeat_loop(
heartbeat_func=self.heartbeat,
interval_seconds=60,
logger=logger,
loop_name="wechat_ipad心跳",
on_logout=on_logout,
on_runtime_state_change=on_runtime_state_change,
)
)
heartbeat_long_task = asyncio.create_task(
self._heartbeat_loop(
heartbeat_func=self.heartbeat_long,
interval_seconds=120,
logger=logger,
loop_name="wechat_ipad长连接心跳",
on_logout=on_logout,
on_runtime_state_change=on_runtime_state_change,
)
)
try:
logger.info("开始处理wechat_ipad消息")
while self.is_runtime_running():
try:
data_temp = await self.sync_message()
except Exception as e:
logger.error(f"获取新消息失败 {e}")
recovered = await self._try_recover_from_logout(
reason=e,
logger=logger,
on_logout=on_logout,
on_runtime_state_change=on_runtime_state_change,
)
if not recovered:
if not self.is_runtime_running():
break
await asyncio.sleep(5)
continue
data = data_temp.get("AddMsgs")
if data:
for message in data:
await self._safe_callback(
on_message,
message,
logger=logger,
callback_name="on_message",
)
else:
# 对于 855 而言,`Ret=0 + KeyBuf` 只是正常空轮询返回,不需要额外刷屏日志。
if on_idle_payload:
await self._safe_callback(
on_idle_payload,
data_temp,
logger=logger,
callback_name="on_idle_payload",
)
await asyncio.sleep(2)
finally:
heartbeat_task.cancel()
heartbeat_long_task.cancel()
await asyncio.gather(heartbeat_task, heartbeat_long_task, return_exceptions=True)
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
async def _ensure_login(
self,
*,
wxid: str,
device_name: str,
device_id: str,
ipad_config: dict,
state_path: str,
logger,
on_login_qr_update: AsyncCallback | None = None,
on_login_qr_cleared: AsyncCallback | None = None,
) -> None:
"""保证当前 provider 已完成登录,并把登录结果写回配置。
这里沿用现有 855 的行为:
1. 优先复用缓存唤醒;
2. 唤醒失败或无缓存时回退到二维码登录;
3. 登录成功后只把 wxid / device 信息写回本地状态文件,不再要求用户手工维护 TOML。
"""
if await self.is_logged_in(wxid):
self.wxid = wxid
profile = await self.get_profile()
self.nickname = profile.get("NickName", {}).get("string", "")
self.alias = profile.get("Alias", "")
self.phone = profile.get("BindMobile", {}).get("string", "")
self.signature = profile.get("Signature", "")
await self._safe_callback(
on_login_qr_cleared,
{
"status": "logged_in",
"status_text": "已检测到现有登录态",
},
logger=logger,
callback_name="on_login_qr_cleared",
)
logger.info(
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
)
return
while not await self.is_logged_in(wxid):
uuid = ""
url = ""
login_source = "fresh_qr"
try:
if await self.get_cached_info(wxid):
uuid = await self.awaken_login(wxid)
login_source = "awaken"
logger.info(f"获取到登录uuid: {uuid}")
else:
uuid, url = await self.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True)
logger.info(f"获取到登录uuid: {uuid}")
logger.info(f"获取到登录二维码: {url}")
except Exception as e:
logger.error(f"登录过程出错: {e}")
uuid, url = await self.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True)
login_source = "fresh_qr"
logger.info(f"获取到登录uuid: {uuid}")
logger.info(f"获取到登录二维码: {url}")
# 每次拿到新的 uuid 都立刻把二维码状态推给上层:
# 1. 这样 Dashboard 无需等待下一次轮询结果,就能立刻弹出二维码;
# 2. 即使是 awaken 登录没有返回图片 URL也可以先靠 uuid 生成扫码内容;
# 3. 后续倒计时再通过 check_login_uuid 的轮询结果持续刷新。
scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else ""
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": None,
"status": "waiting",
"status_text": "等待扫码登录",
"login_source": login_source,
},
logger=logger,
callback_name="on_login_qr_update",
)
while True:
logger.info(f"uuid: {uuid}, url: {url}")
stat, data = await self.check_login_uuid(uuid, device_id=device_id)
if stat:
await self._safe_callback(
on_login_qr_cleared,
{
"status": "confirmed",
"status_text": "扫码登录成功",
"uuid": uuid,
},
logger=logger,
callback_name="on_login_qr_cleared",
)
break
# 855 的扫码登录会返回剩余有效期:
# 1. 这里把它直接同步给上层Dashboard 就能展示实时倒计时;
# 2. 一旦倒计时归零,当前二维码已失效,应跳出内层循环重新申请新二维码;
# 3. 这样新环境登录时不会卡在一张已经过期的旧码上。
expires_in = int(data or 0)
qr_status = "expired" if expires_in <= 0 else "waiting"
qr_status_text = "二维码已过期,准备刷新" if expires_in <= 0 else "等待扫码登录"
await self._safe_callback(
on_login_qr_update,
{
"uuid": uuid,
"url": url,
"scan_url": scan_url,
"expires_in": expires_in,
"status": qr_status,
"status_text": qr_status_text,
"login_source": login_source,
},
logger=logger,
callback_name="on_login_qr_update",
)
logger.info(f"等待登录中,过期倒计时:{expires_in}")
if expires_in <= 0:
break
await asyncio.sleep(5)
if not stat:
# 当前二维码失效后回到外层 while 重新申请新二维码,
# 这样可以持续给 Dashboard 产出新的扫码入口。
continue
self._apply_login_result(data=data, logger=logger)
ipad_config["wxid"] = self.wxid
ipad_config["device_name"] = device_name
ipad_config["device_id"] = device_id
ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._save_runtime_state(
state_path=state_path,
state_payload={
"wxid": self.wxid,
"device_name": device_name,
"device_id": device_id,
"login_time": ipad_config["login_time"],
},
logger=logger,
)
break
@staticmethod
def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None:
"""把运行期登录状态写入本地缓存文件。
这里刻意只保存动态字段:
1. server_url / server_ip / server_port 已经统一走 `.env`
2. 本地状态文件只承载登录缓存,避免用户再维护两套静态配置;
3. 路径所在目录不存在时自动创建,兼容首次启动与 Docker 挂载目录。
"""
try:
normalized_path = str(state_path or "").strip()
if not normalized_path:
return
state_dir = os.path.dirname(normalized_path)
if state_dir:
os.makedirs(state_dir, exist_ok=True)
with open(normalized_path, "w", encoding="utf-8") as f:
toml.dump(state_payload, f)
except Exception as e:
logger.warning(f"写入 wechat_ipad 本地状态失败: path={state_path}, error={e}")
def _apply_login_result(self, *, data: dict, logger) -> None:
"""把登录接口返回的用户信息统一写回当前 provider。"""
acct_section = data.get("acctSectResp", {}) or {}
self.wxid = acct_section.get("userName", "")
self.nickname = acct_section.get("nickName", "")
self.alias = acct_section.get("alias", "")
self.phone = acct_section.get("bindMobile", "")
self.signature = data.get("Signature", "")
logger.info(
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
)
async def _drain_startup_history(self, *, on_history_message: AsyncCallback, logger) -> None:
"""在实时主循环前先消化堆积消息。
这里保持旧逻辑的退出条件:
1. 连续多次轮询不到 `AddMsgs` 才认为历史堆积已经处理完;
2. 每批历史消息仍交给上层回调决定如何归档;
3. provider 只负责拉取与调度,不把历史消息也混入实时业务处理。
"""
empty_rounds = 0
while True:
data = await self.sync_message()
add_msgs = data.get("AddMsgs")
if not add_msgs:
if empty_rounds > 2:
break
empty_rounds += 1
continue
logger.debug(f"接受到 {len(add_msgs)} 条历史消息,开始仅落库归档")
for raw_message in add_msgs:
await self._safe_callback(
on_history_message,
raw_message,
logger=logger,
callback_name="on_history_message",
)
await asyncio.sleep(1)
async def _heartbeat_loop(
self,
*,
heartbeat_func: Callable[[], Awaitable[bool]],
interval_seconds: int,
logger,
loop_name: str,
on_logout: AsyncCallback | None,
on_runtime_state_change: AsyncCallback | None,
) -> None:
"""统一承接心跳与长心跳循环,减少 855 provider 内部重复代码。"""
logger.info(f"开启{loop_name}")
while self.is_runtime_running():
try:
success = await heartbeat_func()
if success:
logger.debug(f"{loop_name}进行中")
else:
logger.warning(f"{loop_name}失败")
except Exception as e:
logger.error(f"{loop_name}: {e}")
recovered = await self._try_recover_from_logout(
reason=e,
logger=logger,
on_logout=on_logout,
on_runtime_state_change=on_runtime_state_change,
)
if not recovered and not self.is_runtime_running():
break
await asyncio.sleep(interval_seconds)
async def _try_recover_from_logout(
self,
*,
reason: Exception | str,
logger,
on_logout: AsyncCallback | None,
on_runtime_state_change: AsyncCallback | None,
) -> bool:
"""处理 855 provider 的掉线恢复逻辑。
关键点:
1. 855 的掉线恢复是 provider 运行模型的一部分,因此也应该收口在 provider 内部;
2. 这里用锁把恢复流程串行化,避免心跳线程与消息轮询线程同时触发二次登录;
3. 上层只接收一个“检测到掉线”的通知,用于发告警或记录运维日志。
"""
if not self._is_logout_reason(reason):
return False
async with self._runtime_recovery_lock:
# 进入锁后再判断一次,避免并发恢复时第二个协程重复执行二次登录。
if not self.is_runtime_running():
return False
await self._safe_callback(
on_logout,
str(reason),
logger=logger,
callback_name="on_logout",
)
try:
logger.info("定时进行二次登录动作")
resp = await self.twice_auto_auth()
if resp:
logger.info("定时二次登录成功!")
return True
logger.error("定时二次登录失败!")
except Exception as e:
logger.error(f"login_twice_auto_auth error: {e}")
await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger)
return False
async def _set_runtime_running(
self,
running: bool,
*,
on_runtime_state_change: AsyncCallback | None,
logger,
) -> None:
"""同步 provider 运行态,并通知上层镜像状态。"""
self._runtime_running = running
if on_runtime_state_change:
await self._safe_callback(
on_runtime_state_change,
running,
logger=logger,
callback_name="on_runtime_state_change",
)
async def _safe_callback(
self,
callback: AsyncCallback | None,
*args: Any,
logger,
callback_name: str,
) -> None:
"""统一保护上层回调,避免单个业务异常直接打断 provider 主循环。"""
if callback is None:
return
try:
await callback(*args)
except Exception as e:
logger.exception(f"执行回调失败: {callback_name}, error: {e}")
@staticmethod
def _is_logout_reason(reason: Exception | str) -> bool:
"""判断当前异常是否属于 855 provider 约定的掉线场景。"""
return "用户可能退出" in str(reason)
def get_login_identity(self, *, device_name: str = "", device_id: str = "") -> dict[str, Any]:
"""返回当前登录身份的轻量归一化结构。
第一阶段先继续使用 dict
1. 便于 `Robot` 直接消费,不额外引入 dataclass
2. 后续如果 864 也需要对齐结构,可以在 provider 内继续增量补字段;
3. 这里同时把 device 信息带上,方便上层统一打印和展示。
"""
return {
"wxid": self.wxid,
"nickname": self.nickname,
"alias": self.alias,
"phone": self.phone,
"signature": getattr(self, "signature", ""),
"device_name": device_name,
"device_id": device_id,
}