import asyncio import os import time from typing import Any, Awaitable, Callable import toml AsyncCallback = Callable[..., Awaitable[None]] class Server864RuntimeMixin: """864 provider 的运行时编排。 设计说明: 1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”; 2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”; 3. 这样能先把现有 Robot 主链路无感切到 864,后续若要补 WS 监听也只需在本目录内演进。 """ def _init_runtime_state(self) -> None: self._runtime_running = False def stop_runtime(self) -> None: self._runtime_running = False def is_runtime_running(self) -> bool: return bool(getattr(self, "_runtime_running", False)) async def run_runtime( self, *, ipad_config: dict, state_path: str, logger, on_login_ready: AsyncCallback, on_history_message: AsyncCallback, on_message: AsyncCallback, on_idle_payload: AsyncCallback | None = None, on_logout: AsyncCallback | None = None, on_runtime_state_change: AsyncCallback | None = None, on_login_qr_update: AsyncCallback | None = None, on_login_qr_cleared: AsyncCallback | None = None, ) -> None: """启动 864 provider 的运行时主循环。""" del on_history_message server_key = str(ipad_config.get("server_key", "") or "").strip() if not server_key: raise ValueError("server_864 启动失败:缺少 server_key,请在 .env 中配置 WECHAT_SERVER_KEY") self.server_key = server_key await self._ensure_login( ipad_config=ipad_config, state_path=state_path, logger=logger, on_login_qr_update=on_login_qr_update, on_login_qr_cleared=on_login_qr_cleared, ) await on_login_ready(self.get_login_identity()) logger.info("server_864 登录成功") await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger) try: logger.info("开始处理 server_864 消息轮询") while self.is_runtime_running(): try: data_temp = await self.sync_message() except Exception as e: logger.error(f"server_864 获取新消息失败: {e}") await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout") await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger) break data = data_temp.get("AddMsgs") or [] if data: for message in data: await self._safe_callback(on_message, message, logger=logger, callback_name="on_message") elif on_idle_payload: await self._safe_callback( on_idle_payload, data_temp, logger=logger, callback_name="on_idle_payload", ) await asyncio.sleep(2) finally: await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger) async def _ensure_login( self, *, ipad_config: dict, state_path: str, logger, on_login_qr_update: AsyncCallback | None = None, on_login_qr_cleared: AsyncCallback | None = None, ) -> None: """确保 864 已完成登录。""" if await self.is_logged_in(): await self._refresh_identity_from_profile(logger=logger) await self._safe_callback( on_login_qr_cleared, {"status": "logged_in", "status_text": "已检测到现有登录态"}, logger=logger, callback_name="on_login_qr_cleared", ) return uuid, url = await self.get_qr_code(print_qr=True) scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else "" await self._safe_callback( on_login_qr_update, { "uuid": uuid, "url": url, "scan_url": scan_url, "expires_in": None, "status": "waiting", "status_text": "等待扫码登录", "login_source": "fresh_qr", }, logger=logger, callback_name="on_login_qr_update", ) while True: is_logged_in, login_status = await self.check_login_status() if is_logged_in: await self._safe_callback( on_login_qr_cleared, {"status": "confirmed", "status_text": "扫码登录成功", "uuid": uuid}, logger=logger, callback_name="on_login_qr_cleared", ) break # 864 的登录状态查询会回传当前 uuid 和有效期: # 1. 真实联调中已确认 `CheckLoginStatus` 会返回 `uuid/effective_time`; # 2. 这些值应优先作为 Dashboard 的二维码倒计时与当前扫码目标来源; # 3. 一旦 server 侧切换了新的 uuid,这里也要及时覆盖本地展示态,避免前端一直盯着旧码。 latest_uuid = str(login_status.get("uuid", "") or uuid).strip() or uuid effective_time = int(login_status.get("effective_time", 0) or 0) if latest_uuid != uuid: uuid = latest_uuid scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else "" url = f"https://api.2dcode.biz/v1/create-qr-code?data={scan_url}" if scan_url else url await self._safe_callback( on_login_qr_update, { "uuid": uuid, "url": url, "scan_url": scan_url, "expires_in": effective_time if effective_time > 0 else None, "status": "waiting", "status_text": str(login_status.get("msg") or login_status.get("loginState") or "等待扫码登录"), "login_source": "fresh_qr", }, logger=logger, callback_name="on_login_qr_update", ) # 若 server 已明确告知二维码失效,则立即重新申请一张新码: # 1. 这能避免 Dashboard 一直展示一张已经不可扫的旧二维码; # 2. 也能让新环境登录时的交互与 855 保持一致,都是“过期就自动刷新”; # 3. 重新申请后直接回到当前 while 顶部继续轮询新的 uuid 状态。 if effective_time <= 0: uuid, url = await self.get_qr_code(print_qr=True) scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else "" await self._safe_callback( on_login_qr_update, { "uuid": uuid, "url": url, "scan_url": scan_url, "expires_in": None, "status": "waiting", "status_text": "二维码已刷新,等待扫码登录", "login_source": "refresh_qr", }, logger=logger, callback_name="on_login_qr_update", ) await asyncio.sleep(5) await self._wait_init_ready(logger=logger) await self._refresh_identity_from_profile(logger=logger) ipad_config["wxid"] = self.wxid ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) self._save_runtime_state( state_path=state_path, state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]}, logger=logger, ) async def _wait_init_ready(self, *, logger) -> None: """等待 864 server 侧初始化完成。""" for _ in range(30): try: if await self.get_init_status(): return except Exception as e: logger.warning(f"server_864 检查初始化状态失败: {e}") await asyncio.sleep(2) async def _refresh_identity_from_profile(self, *, logger) -> None: """从 864 的资料接口刷新当前登录身份。""" profile = await self.get_profile() self.wxid = str( profile.get("UserName") or profile.get("userName") or profile.get("Wxid") or profile.get("wxid") or self.wxid ) nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or "" if isinstance(nickname, dict): nickname = nickname.get("string", "") alias = profile.get("Alias") or profile.get("alias") or "" phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or "" if isinstance(phone, dict): phone = phone.get("string", "") signature = profile.get("Signature") or profile.get("signature") or "" self.nickname = str(nickname or "") self.alias = str(alias or "") self.phone = str(phone or "") self.signature = str(signature or "") logger.info( f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}" ) @staticmethod def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None: """保存 864 provider 的本地登录缓存。""" try: normalized_path = str(state_path or "").strip() if not normalized_path: return state_dir = os.path.dirname(normalized_path) if state_dir: os.makedirs(state_dir, exist_ok=True) with open(normalized_path, "w", encoding="utf-8") as f: toml.dump(state_payload, f) except Exception as e: logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}") async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None: self._runtime_running = running if on_runtime_state_change: await self._safe_callback( on_runtime_state_change, running, logger=logger, callback_name="on_runtime_state_change", ) async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None: if callback is None: return try: await callback(*args) except Exception as e: logger.exception(f"执行回调失败: {callback_name}, error: {e}") def get_login_identity(self) -> dict[str, Any]: """返回统一登录身份结构。""" return { "wxid": self.wxid, "nickname": self.nickname, "alias": self.alias, "phone": self.phone, "signature": getattr(self, "signature", ""), "device_name": "", "device_id": "", }