74 lines
3.5 KiB
Python
74 lines
3.5 KiB
Python
from loguru import logger
|
||
|
||
from wechat_ipad.errors import UserLoggedOut
|
||
|
||
from wechat_ipad.providers.server_864.base import Server864APIClientBase
|
||
|
||
|
||
class UserMixin(Server864APIClientBase):
|
||
"""864 用户资料相关接口。"""
|
||
|
||
async def get_profile(self, wxid: str = None) -> dict:
|
||
"""获取当前登录账号的资料。"""
|
||
del wxid
|
||
data = await self._request_data("get", "/user/GetProfile", timeout=20)
|
||
return dict(data or {})
|
||
|
||
async def get_profile_info_ext(self, wxid: str = None) -> dict:
|
||
"""获取扩展资料。
|
||
|
||
说明:
|
||
1. 864 当前公开接口只直接暴露“当前账号资料”;
|
||
2. 为了兼容项目现有调用面,这里先从主资料中补常见字段;
|
||
3. 若后续发现 864 某个版本提供了专门扩展接口,再局部替换即可。
|
||
"""
|
||
del wxid
|
||
profile = await self.get_profile()
|
||
return dict(profile.get("userInfoExt", {}) or profile.get("UserInfoExt", {}) or profile)
|
||
|
||
async def get_my_qrcode(self, style: int = 8) -> str:
|
||
"""获取当前登录账号个人二维码。"""
|
||
data = await self._request_data(
|
||
"post",
|
||
"/user/GetMyQrCode",
|
||
json_body={"Style": int(style), "Recover": False},
|
||
timeout=20,
|
||
)
|
||
qrcode_info = self._pick_first(data, "qrcode", "Qrcode", "QrCode") or {}
|
||
return str(self._pick_first(qrcode_info, "buffer", "Buffer", "src", "Src") or "")
|
||
|
||
async def is_logged_in(self, wxid: str = None) -> bool:
|
||
"""检查 864 当前账号是否在线。"""
|
||
del wxid
|
||
try:
|
||
# 优先使用 864 自己的登录状态接口判断在线态:
|
||
# 1. `GetProfile` 在某些版本里会比真实登录态更早失效,容易把“已登录但资料接口异常”误判成未登录;
|
||
# 2. 用户当前给出的 `GetLoginStatus` 正是更贴近 server 自身会话状态的一条探针;
|
||
# 3. 因此这里先走登录状态接口,只有它也无法确认时,才回退到资料接口兜底。
|
||
login_status = await self.get_login_status(auto_login=False)
|
||
if self._is_online_from_login_status_payload(login_status):
|
||
identity = self._extract_login_identity_from_status(login_status)
|
||
if identity.get("wxid"):
|
||
self.wxid = identity["wxid"]
|
||
if identity.get("nickname"):
|
||
self.nickname = identity["nickname"]
|
||
if identity.get("alias"):
|
||
self.alias = identity["alias"]
|
||
if identity.get("phone"):
|
||
self.phone = identity["phone"]
|
||
if identity.get("signature"):
|
||
self.signature = identity["signature"]
|
||
return True
|
||
await self.get_profile()
|
||
return True
|
||
except UserLoggedOut as e:
|
||
# “未登录 / 需要重新登录”是 864 登录引导中的常规状态,不应该长期污染 error 日志:
|
||
# 1. 当前首页会主动引导扫码,这类返回本质上只是“当前还没登录完成”;
|
||
# 2. 若仍按 error 记录,运维排查时很难分清真正异常和正常登录态切换;
|
||
# 3. 因此这里降级为 info,只保留可读状态文本。
|
||
logger.info("server_864 is_logged_in:{}", e)
|
||
return False
|
||
except Exception as e:
|
||
logger.warning("server_864 is_logged_in:{}", e)
|
||
return False
|