搭建 legacy_855 独立 provider 骨架

- 新增微信 Gateway 与 Provider 基类,为多版本 server 切换预留入口
- 将 855/859 现有协议实现迁入 providers/legacy_855 独立目录管理
- 保留 WechatAPIClient 旧命名出口,先维持调用面兼容并带上运行资源文件
This commit is contained in:
liuwei
2026-05-07 09:49:06 +08:00
parent 33dea46e1b
commit 99d226c092
15 changed files with 2247 additions and 2 deletions

View File

@@ -5,5 +5,11 @@ from wechat_ipad.errors import *
# 设置包名
__name__ = "wechat_api"
# 导出 WechatAPIClient 类,但避免循环导入
from wechat_ipad.client import WechatAPIClient
# 导出新版 Provider 与 Gateway 入口:
# 1. `WechatAPIClient` 继续保留原有命名,尽量减少第一阶段替换成本;
# 2. 实际实现已切换到 `providers/legacy_855/` 目录,不再要求继续复用旧 client 目录;
# 3. `WechatGateway` 作为后续多版本切换入口,对 Robot 改造时可逐步接入。
from wechat_ipad.gateway import WechatGateway
from wechat_ipad.providers.legacy_855 import Legacy855WechatClient as WechatAPIClient
__all__ = [name for name in globals().keys() if not name.startswith("_")]

33
wechat_ipad/gateway.py Normal file
View File

@@ -0,0 +1,33 @@
from typing import Any, Dict, Type
from wechat_ipad.providers.legacy_855 import Legacy855WechatClient
class WechatGateway:
"""按 server_type 选择具体 Provider 的薄网关。
当前策略:
1. Gateway 只负责选择 Provider并把调用透传出去
2. 不在这里承载协议差异或运行时细节,避免再次形成新的“大中台”;
3. 第一阶段默认只完整支持 `legacy_855`,后续接入 864 时在映射表中扩展即可。
"""
_PROVIDER_MAP: Dict[str, Type[Legacy855WechatClient]] = {
"legacy_855": Legacy855WechatClient,
"855": Legacy855WechatClient,
"859": Legacy855WechatClient,
}
def __init__(self, ip: str, port: int, server_type: str = "legacy_855", **kwargs: Any):
normalized_server_type = str(server_type or "legacy_855").strip().lower()
provider_cls = self._PROVIDER_MAP.get(normalized_server_type)
if provider_cls is None:
raise ValueError(f"不支持的 wechat provider 类型: {server_type}")
self.server_type = normalized_server_type
self.provider = provider_cls(ip=ip, port=port, **kwargs)
def __getattr__(self, item: str) -> Any:
"""将未显式实现的属性/方法透传给具体 Provider。"""
return getattr(self.provider, item)

View File

@@ -0,0 +1,15 @@
from abc import ABC
class WechatProviderBase(ABC):
"""微信接入 Provider 的轻量基类。
设计说明:
1. 当前阶段不强制所有 Provider 继承一整套复杂抽象接口,只提供一个共同的语义入口;
2. 这里保留 `provider_name`、`server_type` 两个最基础标识,便于 Gateway 与日志识别;
3. 后续如需补统一生命周期方法,可继续在该基类上增量扩展,而不影响现有阅读体验。
"""
provider_name = "base"
server_type = "base"

View File

@@ -0,0 +1 @@
# 多版本微信接入 Provider 包。

View File

@@ -0,0 +1,4 @@
from wechat_ipad.providers.legacy_855.provider import Legacy855WechatClient
__all__ = ["Legacy855WechatClient"]

View File

@@ -0,0 +1,103 @@
from dataclasses import dataclass
from wechat_ipad.errors import *
@dataclass
class Proxy:
"""代理(无效果,别用!)
Args:
ip (str): 代理服务器IP地址
port (int): 代理服务器端口
username (str, optional): 代理认证用户名. 默认为空字符串
password (str, optional): 代理认证密码. 默认为空字符串
"""
ip: str
port: int
username: str = ""
password: str = ""
@dataclass
class Section:
"""数据段配置类
Args:
data_len (int): 数据长度
start_pos (int): 起始位置
"""
data_len: int
start_pos: int
class WechatAPIClientBase:
"""微信API客户端基类
Args:
ip (str): 服务器IP地址
port (int): 服务器端口
Attributes:
wxid (str): 微信ID
nickname (str): 昵称
alias (str): 别名
phone (str): 手机号
"""
def __init__(self, ip: str, port: int):
self.ip = ip
self.port = port
self.wxid = ""
self.nickname = ""
self.alias = ""
self.phone = ""
# 调用所有 Mixin 的初始化方法
super().__init__()
@staticmethod
def error_handler(json_resp):
"""处理API响应中的错误码
Args:
json_resp (dict): API响应的JSON数据
Raises:
ValueError: 参数错误时抛出
MarshallingError: 序列化错误时抛出
UnmarshallingError: 反序列化错误时抛出
MMTLSError: MMTLS初始化错误时抛出
PacketError: 数据包长度错误时抛出
UserLoggedOut: 用户已退出登录时抛出
ParsePacketError: 解析数据包错误时抛出
DatabaseError: 数据库错误时抛出
Exception: 其他类型错误时抛出
"""
code = json_resp.get("Code")
if code == -1: # 参数错误
raise ValueError(json_resp.get("Message"))
elif code == -2: # 其他错误
raise Exception(json_resp.get("Message"))
elif code == -3: # 序列化错误
raise MarshallingError(json_resp.get("Message"))
elif code == -4: # 反序列化错误
raise UnmarshallingError(json_resp.get("Message"))
elif code == -5: # MMTLS初始化错误
raise MMTLSError(json_resp.get("Message"))
elif code == -6: # 收到的数据包长度错误
raise PacketError(json_resp.get("Message"))
elif code == -7: # 已退出登录
raise UserLoggedOut("Already logged out")
elif code == -8: # 链接过期
raise Exception(json_resp.get("Message"))
elif code == -9: # 解析数据包错误
raise ParsePacketError(json_resp.get("Message"))
elif code == -10: # 数据库错误
raise DatabaseError(json_resp.get("Message"))
elif code == -11: # 登陆异常
raise UserLoggedOut(json_resp.get("Message"))
elif code == -12: # 操作过于频繁
raise Exception(json_resp.get("Message"))
elif code == -13: # 上传失败
raise Exception(json_resp.get("Message"))

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

View File

@@ -0,0 +1,145 @@
import base64
import os
from typing import Union
import aiofiles
import aiohttp
from wechat_ipad import UserLoggedOut
from wechat_ipad.models.friend_circle_info import build_friend_circle_xml
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase
class FriendCircleMixin(WechatAPIClientBase):
async def get_friend_circle_list(self, max_id: int = 0, first_page_md5: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Maxid": max_id, "Fristpagemd5": first_page_md5}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetList", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def get_friend_circle_detail(self, towxid: str, max_id: int = 0, first_page_md5: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Towxid": towxid,
"Maxid": max_id,
"Fristpagemd5": first_page_md5
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetDetail", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def get_friend_circle_id_detail(self, object_id: Union[str, int], towxid: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Towxid": towxid, "Id": int(object_id)}
response = await session.post(
f"http://{self.ip}:{self.port}/api/FriendCircle/GetIdDetail",
json=json_param
)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def publish_friend_circle(self, content: str, media_items: list[dict] | None = None,
blacklist: str = "", with_user_list: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
xml_content = build_friend_circle_xml(self.wxid, content, media_items=media_items)
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Content": xml_content,
"BlackList": blacklist,
"WithUserList": with_user_list
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Messages", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def friend_circle_comment(self, object_id: str, content: str = "", type: int = 2,
reply_comment_id: int = 0) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Id": str(object_id),
"Type": int(type),
"Content": content,
"ReplyCommnetId": int(reply_comment_id)
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Comment", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def friend_circle_operation(self, object_id: str, type: int, comment_id: int = 0) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Id": str(object_id),
"Type": int(type),
"CommnetId": int(comment_id)
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Operation", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def sync_friend_circle(self, sync_key: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Synckey": sync_key}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/MmSnsSync", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def upload_friend_circle_media(self, media: Union[str, bytes, os.PathLike]) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(media, str):
media_base64 = media.split(",", 1)[1] if "," in media else media
elif isinstance(media, bytes):
media_base64 = base64.b64encode(media).decode()
elif isinstance(media, os.PathLike):
async with aiofiles.open(media, "rb") as f:
media_base64 = base64.b64encode(await f.read()).decode()
else:
raise ValueError("media should be str, bytes, or path")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Base64": media_base64}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Upload", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)

View File

@@ -0,0 +1,149 @@
from typing import Union
import aiohttp
from wechat_ipad import UserLoggedOut
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase
class FriendMixin(WechatAPIClientBase):
async def accept_friend(self, scene: int, v1: str, v2: str) -> bool:
"""接受好友请求
主动添加好友单天上限如下所示1小时内上限为 5个超过上限时无法发出好友请求也收不到好友请求。
- 新账号5/天
- 注册超过7天10个/天
- 注册满3个月&&近期登录过该电脑15/天
- 注册满6个月&&近期经常登录过该电脑20/天
- 注册满6个月&&近期频繁登陆过该电脑30/天
- 注册1年以上&&一直登录50/天
- 上一次通过好友到下一次通过间隔20-40s
- 收到加人申请到通过好友申请每天最多通过300个好友申请间隔30s+(随机时间)
Args:
scene: 来源 在消息的xml获取
v1: v1key
v2: v2key
Returns:
bool: 操作是否成功
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Scene": scene, "V1": v1, "V2": v2}
response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/PassVerify', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_contact(self, wxid: Union[str, list[str]]) -> Union[dict, list[dict]]:
"""获取联系人信息
Args:
wxid: 联系人wxid, 可以是多个wxid在list里也可查询chatroom
Returns:
Union[dict, list[dict]]: 单个联系人返回dict多个联系人返回list[dict]
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(wxid, list):
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "RequestWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractDetail', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
contact_list = json_resp.get("Data").get("ContactList")
if len(contact_list) == 1:
return contact_list[0]
else:
return contact_list
else:
self.error_handler(json_resp)
async def get_contract_detail(self, wxid: Union[str, list[str]], chatroom: str = "") -> list:
"""获取联系人详情
Args:
wxid: 联系人wxid
chatroom: 群聊wxid
Returns:
list: 联系人详情列表
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(wxid, list):
if len(wxid) > 20:
raise ValueError("一次最多查询20个联系人")
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Towxids": wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractDetail', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("ContactList")
else:
self.error_handler(json_resp)
async def get_contract_list(self, wx_seq: int = 0, chatroom_seq: int = 0) -> list:
"""获取联系人用户名列表
Args:
wx_seq: 联系人序列
chatroom_seq: 群聊序列
Returns:
list: 联系人用户名列表
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "CurrentWxcontactSeq": wx_seq, "CurrentChatroomContactSeq": chatroom_seq}
response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractList', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
# 直接返回联系人用户名列表
return json_resp.get("Data").get("ContactUsernameList", [])
else:
self.error_handler(json_resp)
async def get_nickname(self, wxid: Union[str, list[str]]) -> Union[str, list[str]]:
"""获取用户昵称
Args:
wxid: 用户wxid可以是单个wxid或最多20个wxid的列表
Returns:
Union[str, list[str]]: 如果输入单个wxid返回str如果输入wxid列表则返回对应的昵称列表
"""
data = await self.get_contract_detail(wxid)
if isinstance(wxid, str):
try:
return data[0].get("NickName").get("string")
except:
return ""
else:
result = []
for contact in data:
try:
result.append(contact.get("NickName").get("string"))
except:
result.append("")
return result

View File

@@ -0,0 +1,203 @@
from typing import Union, Any
import aiohttp
from wechat_ipad.errors import UserLoggedOut
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase
class ChatroomMixin(WechatAPIClientBase):
async def add_chatroom_member(self, chatroom: str, wxid: str) -> bool:
"""添加群成员(群聊最多40人)
Args:
chatroom: 群聊wxid
wxid: 要添加的wxid
Returns:
bool: 成功返回True, 失败False或者报错
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AddChatroomMember', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_chatroom_announce(self, chatroom: str) -> dict:
"""获取群聊公告
Args:
chatroom: 群聊id
Returns:
dict: 群聊信息字典
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "QID": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomInfoDetail',
json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
data = dict(json_resp.get("Data"))
data.pop("BaseResponse")
return data
else:
self.error_handler(json_resp)
async def get_chatroom_info(self, chatroom: str) -> dict:
"""获取群聊信息
Args:
chatroom: 群聊id
Returns:
dict: 群聊信息字典
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "QID": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomInfo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("ContactList")[0]
else:
self.error_handler(json_resp)
async def get_chatroom_member_list(self, chatroom: str) -> list[dict]:
"""获取群聊成员列表
Args:
chatroom: 群聊id
Returns:
list[dict]: 群聊成员列表
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "QID": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomMemberDetail',
json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("NewChatroomData").get("ChatRoomMember")
else:
self.error_handler(json_resp)
async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]:
"""获取群聊二维码
Args:
chatroom: 群聊id
Returns:
dict: {"base64": 二维码的base64, "description": 二维码描述}
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomQRCode', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
data = json_resp.get("Data")
return {"base64": data.get("qrcode").get("buffer"), "description": data.get("revokeQrcodeWording")}
else:
self.error_handler(json_resp)
async def invite_chatroom_member(self, wxid: Union[str, list], chatroom: str) -> bool:
"""邀请群聊成员(群聊大于40人)
Args:
wxid: 要邀请的用户wxid或wxid列表
chatroom: 群聊id
Returns:
bool: 成功返回True, 失败False或者报错
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(wxid, list):
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ChatRoomName": chatroom, "ToWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/api/Group/InviteChatRoomMember',
json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_chatroom_nickname(self, wxid: Union[str, list[str]], chatroom: str) -> Union[str, list[str]]:
"""获取用户昵称
Args:
wxid: 用户wxid可以是单个wxid或最多20个wxid的列表
chatroom: 群聊id
Returns:
Union[str, list[str]]: 如果输入单个wxid返回str如果输入wxid列表则返回对应的昵称列表
"""
data = await self.get_chatroom_member_list(chatroom)
if isinstance(wxid, str):
# 单个wxid的情况
for member in data:
if member.get("UserName") == wxid:
# 优先返回DisplayName如果不存在则返回NickName
return member.get("DisplayName") or member.get("NickName") or wxid
return "" # 如果没找到对应的成员,返回空字符串
else:
# wxid列表的情况
result = []
for single_wxid in wxid:
found = False
for member in data:
if member.get("UserName") == single_wxid:
# 优先返回DisplayName如果不存在则返回NickName
result.append(member.get("DisplayName") or member.get("NickName") or wxid)
found = True
break
if not found:
result.append(wxid) # 如果没找到对应的成员,添加空字符串
return result
async def get_chatroom_member_detail(self, wxid: str, chatroom: str) -> dict:
"""获取用户昵称
Args:
wxid: 用户wxid可以是单个wxid或最多20个wxid的列表
chatroom: 群聊id
Returns:
Union[str, list[str]]: 如果输入单个wxid返回str如果输入wxid列表则返回对应的昵称列表
"""
data = await self.get_chatroom_member_list(chatroom)
for member in data:
if member.get("UserName") == wxid:
# 优先返回DisplayName如果不存在则返回NickName
return member
return {} # 如果没找到对应的成员,返回空字符串

View File

@@ -0,0 +1,273 @@
import hashlib
import string
from random import choice
from typing import Union
import aiohttp
import qrcode
from loguru import logger
from wechat_ipad.providers.legacy_855.base import *
class LoginMixin(WechatAPIClientBase):
async def get_qr_code(self, device_name: str, device_id: str = "", proxy: Proxy = None, print_qr: bool = False) -> (
str, str):
"""获取登录二维码。
Args:
device_name (str): 设备名称
device_id (str, optional): 设备ID. Defaults to "".
proxy (Proxy, optional): 代理信息. Defaults to None.
print_qr (bool, optional): 是否在控制台打印二维码. Defaults to False.
Returns:
tuple[str, str]: 返回登录二维码的UUID和URL
Raises:
根据error_handler处理错误
"""
async with aiohttp.ClientSession() as session:
json_param = {'DeviceName': device_name, 'DeviceID': device_id}
if proxy:
json_param['ProxyInfo'] = {'ProxyIp': f'{proxy.ip}:{proxy.port}',
'ProxyPassword': proxy.password,
'ProxyUser': proxy.username}
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetQR', json=json_param)
json_resp = await response.json()
logger.debug("get_qr_code:{}", json_resp)
if json_resp.get("Success"):
if print_qr:
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(f'http://weixin.qq.com/x/{json_resp.get("Data").get("Uuid")}')
qr.make(fit=True)
qr.print_ascii()
return json_resp.get("Data").get("Uuid"), json_resp.get("Data").get("QrUrl")
else:
self.error_handler(json_resp)
async def check_login_uuid(self, uuid: str, device_id: str = "") -> tuple[bool, Union[dict, int]]:
"""检查登录的UUID状态。
Args:
uuid (str): 登录的UUID
device_id (str, optional): 设备ID. Defaults to "".
Returns:
tuple[bool, Union[dict, int]]: 如果登录成功返回(True, 用户信息),否则返回(False, 过期时间)
Raises:
根据error_handler处理错误
"""
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/CheckQR?uuid={uuid}')
json_resp = await response.json()
logger.debug("check_login_uuid:{}", json_resp)
if json_resp.get("Success"):
if json_resp.get("Message") == "登陆成功":
self.wxid = json_resp.get("Data").get("acctSectResp").get("userName")
self.nickname = json_resp.get("Data").get("acctSectResp").get("nickName")
return True, json_resp.get("Data")
else:
return False, json_resp.get("Data").get("expiredTime")
else:
self.error_handler(json_resp)
async def log_out(self) -> bool:
"""登出当前账号。
Returns:
bool: 登出成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/LogOut?wxid={self.wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return True
elif json_resp.get("Success"):
return False
else:
self.error_handler(json_resp)
async def awaken_login(self, wxid: str = "") -> str:
"""唤醒登录。
Args:
wxid (str, optional): 要唤醒的微信ID. Defaults to "".
Returns:
str: 返回新的登录UUID
Raises:
Exception: 如果未提供wxid且未登录
LoginError: 如果无法获取UUID
根据error_handler处理错误
"""
if not wxid and not self.wxid:
raise Exception("Please login using QRCode first")
if not wxid and self.wxid:
wxid = self.wxid
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/Awaken?wxid={wxid}')
json_resp = await response.json()
if json_resp.get("Success") and json_resp.get("Data").get("QrCodeResponse").get("Uuid"):
return json_resp.get("Data").get("QrCodeResponse").get("Uuid")
elif not json_resp.get("Data").get("QrCodeResponse").get("Uuid"):
raise LoginError("Please login using QRCode first")
else:
self.error_handler(json_resp)
async def get_cached_info(self, wxid: str = None) -> dict:
"""获取登录缓存信息。
Args:
wxid (str, optional): 要查询的微信ID. Defaults to None.
Returns:
dict: 返回缓存信息如果未提供wxid且未登录返回空字典
"""
if not wxid:
wxid = self.wxid
if not wxid:
return {}
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetCacheInfo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
return {}
async def heartbeat(self) -> bool:
"""发送心跳包。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
# logger.debug(f'heartbeat: http://{self.ip}:{self.port}/api/Login/HeartBeat?wxid={self.wxid}')
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/HeartBeat?wxid={self.wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def heartbeat_long(self) -> bool:
"""发送心跳包。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
# logger.debug(f'heartbeat long: http://{self.ip}:{self.port}/api/Login/HeartBeatLong?wxid={self.wxid}')
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/HeartBeatLong?wxid={self.wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def twice_auto_auth(self) -> bool:
"""二次登录。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/Login/TwiceAutoAuth?wxid={self.wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
logger.error("Twice Auto Auth Failed")
return False
@staticmethod
def create_device_name() -> str:
"""生成一个随机的设备名。
Returns:
str: 返回生成的设备名
"""
first_names = [
"Oliver", "Emma", "Liam", "Ava", "Noah", "Sophia", "Elijah", "Isabella",
"James", "Mia", "William", "Amelia", "Benjamin", "Harper", "Lucas", "Evelyn",
"Henry", "Abigail", "Alexander", "Ella", "Jackson", "Scarlett", "Sebastian",
"Grace", "Aiden", "Chloe", "Matthew", "Zoey", "Samuel", "Lily", "David",
"Aria", "Joseph", "Riley", "Carter", "Nora", "Owen", "Luna", "Daniel",
"Sofia", "Gabriel", "Ellie", "Matthew", "Avery", "Isaac", "Mila", "Leo",
"Julian", "Layla"
]
last_names = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis",
"Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson",
"Thomas", "Taylor", "Moore", "Jackson", "Martin", "Lee", "Perez", "Thompson",
"White", "Harris", "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson", "Walker",
"Young", "Allen", "King", "Wright", "Scott", "Torres", "Nguyen", "Hill",
"Flores", "Green", "Adams", "Nelson", "Baker", "Hall", "Rivera", "Campbell",
"Mitchell", "Carter", "Roberts", "Gomez", "Phillips", "Evans"
]
return choice(first_names) + " " + choice(last_names) + "'s Pad"
@staticmethod
def create_device_id(s: str = "") -> str:
"""生成设备ID。
Args:
s (str, optional): 用于生成ID的字符串. Defaults to "".
Returns:
str: 返回生成的设备ID
"""
if s == "" or s == "string":
s = ''.join(choice(string.ascii_letters) for _ in range(15))
md5_hash = hashlib.md5(s.encode()).hexdigest()
return "49" + md5_hash[2:]

View File

@@ -0,0 +1,750 @@
import asyncio
import base64
import os
import time
from asyncio import Future
from asyncio import Queue, sleep
from io import BytesIO
from pathlib import Path
from typing import Union
import aiohttp
import aiofiles
import pysilk
from loguru import logger
from pydub import AudioSegment
from pymediainfo import MediaInfo
from utils.video_utils import get_first_frame, get_first_frame_bytes
from utils.trace_context import format_trace_prefix
from wechat_ipad import UserLoggedOut
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase
class MessageMixin(WechatAPIClientBase):
def __init__(self):
# 初始化消息队列:
# 1. 这里不再重复要求外层传 ip/port统一复用基类已经初始化好的连接信息
# 2. 保留现有串行发送队列语义,避免第一阶段替换后消息发送节奏发生变化;
# 3. 后续若要按 provider 粒度调优发送节流,可继续在本目录内演进,不影响其他 provider。
super().__init__()
self._message_queue = Queue()
self._is_processing = False
self.logging = logger
async def _process_message_queue(self):
"""
处理消息队列的异步方法
"""
if self._is_processing:
return
self._is_processing = True
while True:
if self._message_queue.empty():
self._is_processing = False
break
func, args, kwargs, future = await self._message_queue.get()
try:
result = await func(*args, **kwargs)
future.set_result(result)
except Exception as e:
future.set_exception(e)
finally:
self._message_queue.task_done()
await sleep(1) # 消息发送间隔1秒
async def _queue_message(self, func, *args, **kwargs):
"""
将消息添加到队列
"""
future = Future()
await self._message_queue.put((func, args, kwargs, future))
if not self._is_processing:
asyncio.create_task(self._process_message_queue())
return await future
async def revoke_message(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int) -> bool:
"""撤回消息。
{
"ClientMsgId": 0,
"CreateTime": 0,
"NewMsgId": 0,
"ToUserName": "string",
"Wxid": "string"
}
Args:
wxid (str): 接收人wxid
client_msg_id (int): 发送消息的返回值
create_time (int): 发送消息的返回值
new_msg_id (int): 发送消息的返回值
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToUserName": wxid, "ClientMsgId": client_msg_id,
"CreateTime": create_time,
"NewMsgId": new_msg_id}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Revoke', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
self.logging.info("消息撤回成功: 对方wxid:{} ClientMsgId:{} CreateTime:{} NewMsgId:{}",
wxid,
client_msg_id,
create_time,
new_msg_id) # 确保四个参数都正确传入
return True
else:
self.error_handler(json_resp)
async def send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]:
"""发送文本消息。
Args:
wxid (str): 接收人wxid
content (str): 消息内容
at (list, str, optional): 要@的用户
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_text_message, wxid, content, at)
async def _send_text_message(self, wxid: str, content: str, at: list[str] = None) -> tuple[int, int, int]:
"""
实际发送文本消息的方法
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(at, str):
at_str = at
elif isinstance(at, list):
if at is None:
at = []
at_str = ",".join(at)
else:
raise ValueError("Argument 'at' should be str or list")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": content, "Type": 1, "At": at_str}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendTxt', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
# 发送动作也带上 trace_id便于把“某条入站消息最终发了什么”直接串起来。
self.logging.info("{}发送文字消息: 对方wxid:{} at:{} 内容:{}",
format_trace_prefix(), wxid, at, content)
data = json_resp.get("Data")
return data.get("List")[0].get("ClientMsgId"), data.get("List")[0].get("CreateTime"), data.get("List")[
0].get("NewMsgId")
else:
self.error_handler(json_resp)
async def send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]:
"""发送图片消息。
Args:
wxid (str): 接收人wxid
image (str, byte, os.PathLike): 图片支持base64字符串图片byte图片路径
Returns:
tuple[int, int, int]: 返回(ClientImgId, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: image_path和image_base64都为空或都不为空时
根据error_handler处理错误
"""
return await self._queue_message(self._send_image_message, wxid, image)
async def _send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[
int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(image, str):
pass
elif isinstance(image, bytes):
image = base64.b64encode(image).decode()
elif isinstance(image, os.PathLike):
async with aiofiles.open(image, 'rb') as f:
image = base64.b64encode(await f.read()).decode()
else:
raise ValueError("Argument 'image' can only be str, bytes, or os.PathLike")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": image}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/UploadImg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
json_param.pop('Base64')
# 图片日志不打印 base64 内容,但保留 trace_id便于关联具体发送动作。
self.logging.info("{}发送图片消息: 对方wxid:{} 图片base64略",
format_trace_prefix(), wxid)
data = json_resp.get("Data")
self.logging.debug("发送图片消息成功,返回:{}", data)
return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("NewMsgId")
else:
self.error_handler(json_resp)
async def send_video_message(self, wxid: str, video: Union[str, bytes, os.PathLike],
image: [str, bytes, os.PathLike] = None):
"""发送视频消息。不推荐使用上传速度很慢300KB/s。如要使用可压缩视频或者发送链接卡片而不是视频。
Args:
wxid (str): 接收人wxid
video (str, bytes, os.PathLike): 视频 接受base64字符串字节文件路径
image (str, bytes, os.PathLike): 视频封面图片 接受base64字符串字节文件路径
Returns:
tuple[int, int]: 返回(ClientMsgid, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: 视频或图片参数都为空或都不为空时
根据error_handler处理错误
"""
has_image = False
if not image:
image = Path(os.path.join(Path(__file__).resolve().parent, "fallback.png"))
else:
has_image = True
# get video base64 and duration
if isinstance(video, str):
vid_base64 = video
video = base64.b64decode(video)
file_len = len(video)
media_info = MediaInfo.parse(BytesIO(video))
elif isinstance(video, bytes):
vid_base64 = base64.b64encode(video).decode()
file_len = len(video)
media_info = MediaInfo.parse(BytesIO(video))
# 如果没有传入首帧,则自己提取一次
if not has_image:
first_frame = get_first_frame_bytes(video, f"frame_{int(time.time())}.jpg")
if first_frame:
image = Path(first_frame)
elif isinstance(video, os.PathLike):
video_path = Path(video)
if not video_path.exists():
raise ValueError(f"Video file does not exist: {video_path}")
async with aiofiles.open(video_path, "rb") as f:
video_bytes = await f.read()
file_len = len(video_bytes)
vid_base64 = base64.b64encode(video_bytes).decode()
media_info = MediaInfo.parse(video_path)
# 如果没有传入首帧,则自己提取一次
if not has_image:
first_frame = get_first_frame(video_path, f"frame_{int(time.time())}.jpg")
if first_frame:
image = Path(first_frame)
else:
raise ValueError("video should be str, bytes, or path")
# 获取视频时长
duration = None
for track in media_info.tracks:
if track.track_type == "Video" and track.duration is not None:
duration = int(track.duration / 1000) # 将毫秒转换为秒
break
if duration is None:
duration = 1
self.logging.error(f"无法从视频文件获取时长: {video}")
# get image base64
if isinstance(image, str):
image_base64 = image
elif isinstance(image, bytes):
image_base64 = base64.b64encode(image).decode()
elif isinstance(image, os.PathLike):
async with aiofiles.open(image, "rb") as f:
image_base64 = base64.b64encode(await f.read()).decode()
else:
raise ValueError("image should be str, bytes, or path")
# self.logging.debug(f"vid_base64:{vid_base64}")
# self.logging.debug(f"images_base64:{image_base64}")
# 打印预估时间300KB/s
predict_time = int(file_len / 1024 / 300)
self.logging.debug("开始发送视频: 对方wxid:{} 视频base64略 图片base64略 预计耗时:{}", wxid, predict_time)
# self.logging.debug(f"image{image};image_base64:{image_base64}")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": "data:video/mp4;base64," + vid_base64,
"ImageBase64": "data:image/jpeg;base64," + image_base64,
"PlayLength": duration}
# self.logging.debug(f"json_param:{json_param}")
async with session.post(f'http://{self.ip}:{self.port}/api/Msg/SendVideo', json=json_param) as resp:
json_resp = await resp.json()
# self.logging.debug(f"json_resp:{json_resp}")
if json_resp.get("Success"):
json_param.pop('Base64')
json_param.pop('ImageBase64')
self.logging.info("发送视频成功: 对方wxid:{} 时长:{} 视频base64略 图片base64略", wxid, duration)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \
tuple[int, int, int]:
"""发送语音消息。
Args:
wxid (str): 接收人wxid
voice (str, bytes, os.PathLike): 语音 接受base64字符串字节文件路径
format (str, optional): 语音格式支持amr/wav/mp3. Defaults to "amr".
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: voice_path和voice_base64都为空或都不为空时或format不支持时
根据error_handler处理错误
"""
return await self._queue_message(self._send_voice_message, wxid, voice, format)
async def _send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "mar") -> \
tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif format not in ["amr", "wav", "mp3", "silk", "speex"]:
raise ValueError("format must be one of amr, wav, mp3")
# read voice to byte
if isinstance(voice, str):
voice_byte = base64.b64decode(voice)
elif isinstance(voice, bytes):
voice_byte = voice
elif isinstance(voice, os.PathLike):
async with aiofiles.open(voice, "rb") as f:
voice_byte = await f.read()
else:
raise ValueError("voice should be str, bytes, or path")
voice_type = 0
# get voice duration and b64
if format.lower() == "amr":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="amr")
voice_base64 = base64.b64encode(voice_byte).decode()
elif format.lower() == "wav":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="wav").set_channels(1)
self.logging.debug(f"1audio.frame_rate: {audio.frame_rate}")
audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate))
self.logging.debug(f"2audio.frame_rate: {audio.frame_rate}")
audio = audio.set_channels(1).set_sample_width(2) # 16-bit PCM
logger.info(
f"音频处理: 格式={format}, 采样率={audio.frame_rate}, 声道数={audio.channels}, 时长={len(audio) / 1000}s")
voice_base64 = base64.b64encode(
await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode()
voice_type = 4
elif format.lower() == "mp3":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="mp3").set_channels(1)
audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate))
voice_base64 = base64.b64encode(
await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode()
voice_type = 4
else:
raise ValueError("format must be one of amr, wav, mp3")
duration = len(audio)
# Type AMR = 0, MP3 = 2, SILK = 4, SPEEX = 1, WAVE = 3 VoiceTime :音频长度 1000为一秒
format_dict = {"amr": 0, "wav": 3, "mp3": 2, "silk": 4, "speex": 1}
# {
# "Base64": "string",
# "ToWxid": "string",
# "Type": 0,
# "VoiceTime": 0,
# "Wxid": "string"
# }
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": voice_base64, "VoiceTime": duration,
"Type": voice_type}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendVoice', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
json_param.pop('Base64')
self.logging.info("发送语音消息: 对方wxid:{} 时长:{} 格式:{} 音频base64略", wxid, duration, format)
data = json_resp.get("Data")
return int(data.get("ClientMsgId")), data.get("CreateTime"), data.get("NewMsgId")
else:
self.error_handler(json_resp)
@staticmethod
def _get_closest_frame_rate(frame_rate: int) -> int:
supported = [8000, 12000, 16000, 24000]
closest_rate = None
smallest_diff = float('inf')
for num in supported:
diff = abs(frame_rate - num)
if diff < smallest_diff:
smallest_diff = diff
closest_rate = num
return closest_rate
async def send_link_xml_message(self, xml: str, towxid: str) -> tuple[str, int, int]:
"""发送链接消息。
{
"ToWxid": "string",
"Type": 0,
"Wxid": "string",
"Xml": "string"
}
Args:
xml (str): 发送的内容
towxid (str):接收人
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_link_xml_message, xml, towxid)
async def send_link_message(self, wxid: str, url: str, title: str = "", description: str = "",
thumb_url: str = "") -> tuple[str, int, int]:
"""发送链接消息。
{
"ToWxid": "string",
"Type": 0,
"Wxid": "string",
"Xml": "string"
}
Args:
wxid (str): 接收人wxid
url (str): 跳转链接
title (str, optional): 标题. Defaults to "".
description (str, optional): 描述. Defaults to "".
thumb_url (str, optional): 缩略图链接. Defaults to "".
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_link_message, wxid, url, title, description, thumb_url)
async def _send_link_xml_message(self, xml: str, towxid: str) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": towxid, "Xml": xml, "Type": 0}
logger.debug(f"_send_link_xml_message{xml}")
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareLink', json=json_param)
json_resp = await response.json()
logger.info(f"_send_link_xml_message resp:{json_resp}")
if json_resp.get("Success"):
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def _send_link_message(self, wxid: str, url: str, title: str = "", description: str = "",
thumb_url: str = "") -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Url": url, "Title": title, "Desc": description,
"ThumbUrl": thumb_url}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareLink', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
self.logging.info("发送链接消息: 对方wxid:{} 链接:{} 标题:{} 描述:{} 缩略图链接:{}",
wxid,
url,
title,
description,
thumb_url)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_emoji_message(self, wxid: str, md5: str, total_length: int) -> list[dict]:
"""发送表情消息。
Args:
wxid (str): 接收人wxid
md5 (str): 表情md5值
total_length (int): 表情总长度
Returns:
list[dict]: 返回表情项列表(list of emojiItem)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_emoji_message, wxid, md5, total_length)
async def _send_emoji_message(self, wxid: str, md5: str, total_length: int) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
# 表情发送接口历史上最容易出现“接口长时间不返回,导致整个消息队列被拖住”的问题,
# 因此这里单独加总超时和更细的日志,方便区分“参数错误”和“接口无响应”两类故障。
timeout = aiohttp.ClientTimeout(total=20)
async with aiohttp.ClientSession(timeout=timeout) as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Md5": md5, "TotalLen": total_length}
try:
self.logging.info("开始发送表情消息: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length)
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendEmoji', json=json_param)
json_resp = await response.json(content_type=None)
except asyncio.TimeoutError as exc:
self.logging.error("发送表情消息超时: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length)
raise TimeoutError("SendEmoji 接口调用超时") from exc
if json_resp.get("Success"):
data = json_resp.get("Data") or {}
self.logging.info("发送表情消息成功: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length)
return data.get("emojiItem") or data.get("EmojiItem") or data
else:
self.logging.error("发送表情消息失败: 对方wxid:{} md5:{} 总长度:{} resp:{}",
wxid, md5, total_length, json_resp)
self.error_handler(json_resp)
async def send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[
int, int, int]:
"""发送名片消息。
Args:
wxid (str): 接收人wxid
card_wxid (str): 名片用户的wxid
card_nickname (str): 名片用户的昵称
card_alias (str, optional): 名片用户的备注. Defaults to "".
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_card_message, wxid, card_wxid, card_nickname, card_alias)
async def _send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[
int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with (aiohttp.ClientSession() as session):
json_param = {
"CardAlias": card_alias,
"CardNickName": card_nickname,
"CardWxId": card_wxid,
"ToWxid": wxid,
"Wxid": self.wxid
}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareCard', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
self.logging.info("发送名片消息: 对方wxid:{} 名片wxid:{} 名片备注:{} 名片昵称:{}", wxid,
card_wxid,
card_alias,
card_nickname)
data = json_resp.get("Data")
return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[
0].get("NewMsgId")
else:
self.error_handler(json_resp)
async def send_app_message(self, wxid: str, xml: str, type: int) -> tuple[str, int, int]:
"""发送应用消息。
Args:
wxid (str): 接收人wxid
xml (str): 应用消息的xml内容
type (int): 应用消息类型
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_app_message, wxid, xml, type)
async def _send_app_message(self, wxid: str, xml: str, type: int = 0) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
# {
# "ToWxid": "string",
# "Type": 0,
# "Wxid": "string",
# "Xml": "string"
# }
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Xml": xml, "Type": type}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendApp', json=json_param)
json_resp = await response.json()
logger.info(f"json_resp: {json_resp}")
if json_resp.get("Success"):
json_param["Xml"] = json_param["Xml"].replace("\n", "")
self.logging.info("发送app消息: 对方wxid:{} 类型:{} xml:{}", wxid, type, json_param["Xml"])
return json_resp.get("Data").get("clientMsgId"), json_resp.get("Data").get(
"createTime"), json_resp.get("Data").get("newMsgId")
else:
self.error_handler(json_resp)
async def send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[str, int, int]:
"""转发文件消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的文件消息xml内容
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_file_msg, wxid, xml)
async def _send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNFile', json=json_param)
json_resp = await response.json()
self.logging.debug("json_resp: %s", json_resp)
if json_resp.get("Success"):
self.logging.info("转发文件消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[str, int, int]:
"""转发图片消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的图片消息xml内容
Returns:
tuple[str, int, int]: 返回(ClientImgId, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_img_msg, wxid, xml)
async def _send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNImg', json=json_param)
json_resp = await response.json()
self.logging.debug("json_resp: %s", json_resp)
if json_resp.get("Success"):
self.logging.info("转发图片消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid")
else:
self.error_handler(json_resp)
async def send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[str, int]:
"""转发视频消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的视频消息xml内容
Returns:
tuple[str, int]: 返回(ClientMsgid, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_video_msg, wxid, xml)
async def _send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNVideo', json=json_param)
json_resp = await response.json()
self.logging.debug("json_resp: %s", json_resp)
if json_resp.get("Success"):
self.logging.info("转发视频消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def sync_message(self) -> dict:
"""同步消息。
Returns:
dict: 返回同步到的消息数据
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
json_param = {"Wxid": self.wxid, "Scene": 0, "Synckey": ""}
response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Sync', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
self.error_handler(json_resp)

View File

@@ -0,0 +1,50 @@
from wechat_ipad import UserLoggedOut
from wechat_ipad.provider_base import WechatProviderBase
from wechat_ipad.providers.legacy_855.friend_circle import FriendCircleMixin
from wechat_ipad.providers.legacy_855.friends import FriendMixin
from wechat_ipad.providers.legacy_855.group import ChatroomMixin
from wechat_ipad.providers.legacy_855.login import LoginMixin
from wechat_ipad.providers.legacy_855.message import MessageMixin
from wechat_ipad.providers.legacy_855.tools import ToolMixin
from wechat_ipad.providers.legacy_855.user import UserMixin
class Legacy855WechatClient(
LoginMixin,
MessageMixin,
FriendCircleMixin,
FriendMixin,
ChatroomMixin,
UserMixin,
ToolMixin,
WechatProviderBase,
):
"""855/859 风格 server 的独立 Provider。
说明:
1. 这里不再直接依赖旧 `wechat_ipad/client/` 目录,而是将当前现网协议实现收口到独立 provider 目录;
2. 对外仍继续暴露与旧 `WechatAPIClient` 基本兼容的方法名,减少第一阶段替换成本;
3. 第二阶段接入 864 时,会新增独立 provider而不是继续向本类堆条件分支。
"""
provider_name = "legacy_855"
server_type = "legacy_855"
async def send_at_message(self, wxid: str, content: str, at: list[str]) -> tuple[int, int, int]:
"""发送 @ 消息,兼容现有插件调用方式。"""
if not self.wxid:
raise UserLoggedOut("请先登录")
output = ""
if wxid.endswith("@chatroom"):
for at_id in at:
nickname = await self.get_chatroom_nickname(at_id, wxid)
output += f"@{nickname}\u2005"
output += "\n"
output += content
else:
output = content
return await self.send_text_message(wxid, output, at)

View File

@@ -0,0 +1,402 @@
import base64
import io
import os
import aiofiles
import aiohttp
import pysilk
from pydub import AudioSegment
from wechat_ipad import UserLoggedOut
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase, Proxy
class ToolMixin(WechatAPIClientBase):
async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str:
"""CDN下载高清图片。
{
"Wxid": "string",
"FileNo": "string",
"FileAesKey": "string"
}
Args:
aeskey (str): 图片的AES密钥
cdnmidimgurl (str): 图片的CDN URL
Returns:
str: 图片的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "FileAesKey": aeskey, "FileNo": cdnmidimgurl}
response = await session.post(f'http://{self.ip}:{self.port}/api/Tools/CdnDownloadImage', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("Image")
else:
self.error_handler(json_resp)
async def download_voice(self, msg_id: str, voiceurl: str, length: int) -> str:
"""下载语音文件。
Args:
msg_id (str): 消息的msgid
voiceurl (str): 语音的url从xml获取
length (int): 语音长度从xml获取
Returns:
str: 语音的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "MsgId": msg_id, "Voiceurl": voiceurl, "Length": length}
response = await session.post(f'http://{self.ip}:{self.port}/DownloadVoice', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def download_attach_xml(self, xml_str: str) -> str:
# 读取消息信息,进行处理
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_str)
appmsg = root.find("appmsg")
appid = appmsg.attrib.get("appid", "")
appattach = appmsg.find("appattach")
attach_id = appattach.findtext("attachid", "")
datalen = int(appattach.findtext("totallen", "0"))
username = root.findtext("fromusername", "")
return self.download_attach(attach_id, datalen, username, appid)
async def download_attach(self, attach_id: str, datalen: int, username: str, appid: str) -> str:
"""下载附件。
{
"AppID": "wx6618f1cfc6c132f8",
"AttachId": "@cdn_3057020100044b304902010002042d0c366c02032df7950204d35d06af0204681af942042438363966373134342d663961352d343065612d623038662d3062643730663335343731370204052400050201000405004c54a100_c57ad24ba4e9ceeb3c5e10e33361028d_1",
"DataLen": 1160,
"Section": {
"DataLen": 1160,
"StartPos": 0
},
"UserName": "Jyunere",
"Wxid": "wxid_ts8v7yk4g5c522"
}
Args:
attach_id (str): 附件ID
Returns:
dict: 附件数据
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "AttachId": attach_id, "DataLen": datalen,
"Section": {"DataLen": datalen, "StartPos": 0}, "UserName": username, "AppID": appid}
response = await session.post(f'http://{self.ip}:{self.port}/api/Tools/DownloadFile', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def download_video(self, msg_id) -> str:
"""下载视频。
Args:
msg_id (str): 消息的msg_id
Returns:
str: 视频的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "MsgId": msg_id}
response = await session.post(f'http://{self.ip}:{self.port}/DownloadVideo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def friend_circle_upload(self, base64: str) -> str:
# / FriendCircle / Upload
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Base64": base64}
response = await session.post(f'http://{self.ip}:{self.port}/api/FriendCircle/Upload', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("ThumbUrls")[0].get("Url")
else:
self.error_handler(json_resp)
async def set_step(self, count: int) -> bool:
"""设置步数。
Args:
count (int): 要设置的步数
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 风控保护: 新设备登录后4小时内请挂机
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "StepCount": count}
response = await session.post(f'http://{self.ip}:{self.port}/SetStep', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def set_proxy(self, proxy: Proxy) -> bool:
"""设置代理。
Args:
proxy (Proxy): 代理配置对象
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid,
"Proxy": {"ProxyIp": f"{proxy.ip}:{proxy.port}",
"ProxyUser": proxy.username,
"ProxyPassword": proxy.password}}
response = await session.post(f'http://{self.ip}:{self.port}/SetProxy', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def check_database(self) -> bool:
"""检查数据库状态。
Returns:
bool: 数据库正常返回True否则返回False
"""
async with aiohttp.ClientSession() as session:
response = await session.get(f'http://{self.ip}:{self.port}/CheckDatabaseOK')
json_resp = await response.json()
if json_resp.get("Running"):
return True
else:
return False
@staticmethod
async def base64_to_file(base64_str: str, file_name: str, file_path: str) -> bool:
"""将base64字符串转换为文件并保存。
Args:
base64_str (str): base64编码的字符串
file_name (str): 要保存的文件名
file_path (str): 文件保存路径
Returns:
bool: 转换成功返回True失败返回False
"""
try:
os.makedirs(file_path, exist_ok=True)
# 拼接完整的文件路径
full_path = os.path.join(file_path, file_name)
# 移除可能存在的 base64 头部信息
if ',' in base64_str:
base64_str = base64_str.split(',')[1]
# 解码 base64 并写入文件
async with aiofiles.open(full_path, 'wb') as f:
await f.write(base64.b64decode(base64_str))
return True
except Exception as e:
return False
@staticmethod
async def file_to_base64(file_path: str) -> str:
"""将文件转换为base64字符串。
Args:
file_path (str): 文件路径
Returns:
str: base64编码的字符串
"""
async with aiofiles.open(file_path, 'rb') as f:
return base64.b64encode(await f.read()).decode()
@staticmethod
def base64_to_byte(base64_str: str) -> bytes:
"""将base64字符串转换为bytes。
Args:
base64_str (str): base64编码的字符串
Returns:
bytes: 解码后的字节数据
"""
# 移除可能存在的 base64 头部信息
if ',' in base64_str:
base64_str = base64_str.split(',')[1]
return base64.b64decode(base64_str)
@staticmethod
def byte_to_base64(byte: bytes) -> str:
"""将bytes转换为base64字符串。
Args:
byte (bytes): 字节数据
Returns:
str: base64编码的字符串
"""
return base64.b64encode(byte).decode("utf-8")
@staticmethod
async def silk_byte_to_byte_wav_byte(silk_byte: bytes) -> bytes:
"""将silk字节转换为wav字节。
Args:
silk_byte (bytes): silk格式的字节数据
Returns:
bytes: wav格式的字节数据
"""
return await pysilk.async_decode(silk_byte, to_wav=True)
@staticmethod
def wav_byte_to_amr_byte(wav_byte: bytes) -> bytes:
"""将WAV字节数据转换为AMR格式。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
bytes: AMR格式的字节数据
Raises:
Exception: 转换失败时抛出异常
"""
try:
# 从字节数据创建 AudioSegment 对象
audio = AudioSegment.from_wav(io.BytesIO(wav_byte))
# 设置 AMR 编码的标准参数
audio = audio.set_frame_rate(8000).set_channels(1)
# 创建一个字节缓冲区来存储 AMR 数据
output = io.BytesIO()
# 导出为 AMR 格式
audio.export(output, format="amr")
# 获取字节数据
return output.getvalue()
except Exception as e:
raise Exception(f"转换WAV到AMR失败: {str(e)}")
@staticmethod
def wav_byte_to_amr_base64(wav_byte: bytes) -> str:
"""将WAV字节数据转换为AMR格式的base64字符串。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
str: AMR格式的base64编码字符串
"""
return base64.b64encode(ToolMixin.wav_byte_to_amr_byte(wav_byte)).decode()
@staticmethod
async def wav_byte_to_silk_byte(wav_byte: bytes) -> bytes:
"""将WAV字节数据转换为silk格式。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
bytes: silk格式的字节数据
"""
# get pcm data
audio = AudioSegment.from_wav(io.BytesIO(wav_byte))
pcm = audio.raw_data
return await pysilk.async_encode(pcm, data_rate=audio.frame_rate, sample_rate=audio.frame_rate)
@staticmethod
async def wav_byte_to_silk_base64(wav_byte: bytes) -> str:
"""将WAV字节数据转换为silk格式的base64字符串。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
str: silk格式的base64编码字符串
"""
return base64.b64encode(await ToolMixin.wav_byte_to_silk_byte(wav_byte)).decode()
@staticmethod
async def silk_base64_to_wav_byte(silk_base64: str) -> bytes:
"""将silk格式的base64字符串转换为WAV字节数据。
Args:
silk_base64 (str): silk格式的base64编码字符串
Returns:
bytes: WAV格式的字节数据
"""
return await ToolMixin.silk_byte_to_byte_wav_byte(base64.b64decode(silk_base64))

View File

@@ -0,0 +1,111 @@
import json
import aiohttp
from wechat_ipad import UserLoggedOut
from wechat_ipad.providers.legacy_855.base import WechatAPIClientBase
from loguru import logger
class UserMixin(WechatAPIClientBase):
async def get_profile(self, wxid: str = None) -> dict:
"""获取用户信息。
Args:
wxid (str, optional): 用户wxid. Defaults to None.
Returns:
dict: 用户信息字典
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid and not wxid:
raise UserLoggedOut("请先登录")
if not wxid:
wxid = self.wxid
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/User/GetContractProfile?wxid={wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("userInfo")
else:
self.error_handler(json_resp)
async def get_profile_info_ext(self, wxid: str = None) -> dict:
"""获取用户扩展信息。
Args:
wxid (str, optional): 用户wxid. Defaults to None.
Returns:
dict: 用户信息字典
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid and not wxid:
raise UserLoggedOut("请先登录")
if not wxid:
wxid = self.wxid
async with aiohttp.ClientSession() as session:
response = await session.post(f'http://{self.ip}:{self.port}/api/User/GetContractProfile?wxid={wxid}')
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("userInfoExt")
else:
self.error_handler(json_resp)
async def get_my_qrcode(self, style: int = 0) -> str:
"""获取个人二维码。
Args:
style (int, optional): 二维码样式. Defaults to 0.
Returns:
str: 图片的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 风控保护: 新设备登录后4小时内请挂机
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Style": style}
response = await session.post(f'http://{self.ip}:{self.port}/GetMyQRCode', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("qrcode").get("buffer")
else:
self.error_handler(json_resp)
async def is_logged_in(self, wxid: str = None) -> bool:
"""检查是否登录。
Args:
wxid (str, optional): 用户wxid. Defaults to None.
Returns:
bool: 已登录返回True未登录返回False
"""
if not wxid:
wxid = self.wxid
try:
await self.get_profile(wxid)
return True
except Exception as e:
logger.error("is_logged_in:{}", e)
return False