- 新增 server_864 独立 provider 目录,接入登录、消息轮询、联系人、群资料、用户资料与朋友圈基础能力 - 扩展 gateway、robot 与配置归一化逻辑,支持 server_864/864 别名和 WECHAT_SERVER_KEY - 更新配置示例与多版本适配路线图,明确 864 第一版接入范围和后续待补项
85 lines
3.1 KiB
Python
85 lines
3.1 KiB
Python
import qrcode
|
|
|
|
from wechat_ipad.providers.server_864.base import Server864APIClientBase
|
|
|
|
|
|
class LoginMixin(Server864APIClientBase):
|
|
"""864 登录相关接口。"""
|
|
|
|
async def get_qr_code(
|
|
self,
|
|
device_name: str = "",
|
|
device_id: str = "",
|
|
proxy=None,
|
|
print_qr: bool = False,
|
|
) -> tuple[str, str]:
|
|
"""获取 864 登录二维码。
|
|
|
|
说明:
|
|
1. 864 不依赖 855 的 `device_name/device_id` 入参,但保留参数签名以兼容上层调用;
|
|
2. `proxy` 当前仅保留兼容占位,后续如需补实际代理登录,可直接映射到 swagger 的 Proxy 字段;
|
|
3. 返回值继续保持 `(uuid, url)`,方便 Dashboard 与运行时共用同一套二维码展示逻辑。
|
|
"""
|
|
del device_name, device_id
|
|
proxy_value = ""
|
|
if proxy is not None:
|
|
proxy_value = getattr(proxy, "proxy", "") or ""
|
|
|
|
data = await self._request_data(
|
|
"post",
|
|
"/login/GetLoginQrCodeNew",
|
|
json_body={"Proxy": proxy_value, "Check": False},
|
|
timeout=30,
|
|
)
|
|
uuid = self._pick_first(data, "UUID", "Uuid", "uuid") or ""
|
|
qr_url = (
|
|
self._pick_first(data, "QrUrl", "QRUrl", "qrUrl")
|
|
or self._pick_first(self._pick_first(data, "Qrcode", "QrCode", "qrcode") or {}, "Src", "src")
|
|
or ""
|
|
)
|
|
|
|
if print_qr and uuid:
|
|
qr = qrcode.QRCode(
|
|
version=1,
|
|
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
|
box_size=10,
|
|
border=4,
|
|
)
|
|
qr.add_data(f"http://weixin.qq.com/x/{uuid}")
|
|
qr.make(fit=True)
|
|
qr.print_ascii()
|
|
|
|
return str(uuid), str(qr_url)
|
|
|
|
async def check_login_status(self) -> tuple[bool, dict]:
|
|
"""检查当前二维码登录状态。"""
|
|
data = await self._request_data("get", "/login/CheckLoginStatus", timeout=20)
|
|
normalized = dict(data or {})
|
|
state = int(normalized.get("state", 0) or 0)
|
|
login_state = str(normalized.get("loginState", "") or "").strip().lower()
|
|
return state == 2 or login_state == "online", normalized
|
|
|
|
async def get_init_status(self) -> bool:
|
|
"""检查 server 侧初始化是否完成。"""
|
|
data = await self._request_data("get", "/login/GetInItStatus", timeout=15)
|
|
return bool(data)
|
|
|
|
async def awaken_login(self, wxid: str = "") -> dict:
|
|
"""触发 864 的唤醒登录。"""
|
|
del wxid
|
|
return await self._request_data("post", "/login/WakeUpLogin", timeout=30)
|
|
|
|
async def get_login_status(self, auto_login: bool = True) -> dict:
|
|
"""获取 864 在线状态。"""
|
|
return await self._request_data(
|
|
"get",
|
|
"/login/GetLoginStatus",
|
|
params={"autoLogin": str(bool(auto_login)).lower()},
|
|
timeout=20,
|
|
)
|
|
|
|
async def log_out(self) -> bool:
|
|
"""退出当前 864 登录态。"""
|
|
await self._request_data("get", "/login/LogOutRequest", timeout=15)
|
|
return True
|