diff --git a/docs/wechat_ipad多版本Server适配路线图.md b/docs/wechat_ipad多版本Server适配路线图.md index b8b1946..b109efa 100644 --- a/docs/wechat_ipad多版本Server适配路线图.md +++ b/docs/wechat_ipad多版本Server适配路线图.md @@ -24,11 +24,12 @@ - 已将 855 的登录、历史消息拉取、心跳、长心跳、消息轮询、掉线二次登录恢复迁入 `legacy_855` provider - 已将 [robot.py](/d:/learn/abot/robot.py:1) 精简为“注册回调 + 业务处理”,不再直接维护 855 的运行时主循环 - 已补上 `Legacy855WechatClient` 的显式初始化入口,避免 provider 多继承构造链不稳定 +- 已删除历史 `wechat_ipad/client/` 目录,避免后续误回退到旧实现 当前尚未完成的关键项: - 855 provider 仍需完成一轮“当前项目实际依赖接口”的可上线回归验证 -- 855 provider 仍需继续梳理“项目真实使用到的接口覆盖面”,确认是否还有遗漏的旧能力残留在历史目录 +- 855 provider 仍需继续梳理“项目真实使用到的接口覆盖面”,确认是否还有遗漏能力未纳入 provider 目录 - 864 provider 尚未开始接入,当前统一接口仍主要围绕 855 第一阶段目标进行验证 因此,当前状态可以定义为: @@ -46,11 +47,11 @@ - 历史版本曾直接依赖 `wechat_ipad/config.toml`,当前已开始切向 `config.yaml + .env` - `Robot` 的实例化入口虽然已切到 `WechatGateway`,但配置读取与业务初始化仍在主程序中 - 855 的运行时职责已经迁入 provider,但 864 尚未接入验证,统一抽象仍需继续收敛 -- `wechat_ipad/client/*.py` 仍作为历史目录存在,接口路径、请求体、返回结构都面向旧 server 编写 +- 公共 `models/` 仍在服务主链路,而多版本协议实现已正式收口到 `providers/*/` 这导致一个结果: -- 如果继续在历史目录或 `Robot` 主链路里堆版本判断,后续 server 版本变化时改动面仍会再次放大 +- 如果继续在 `Robot` 主链路里堆版本判断,后续 server 版本变化时改动面仍会再次放大 ### 2.2 855 / 864 的核心差异 @@ -106,7 +107,7 @@ 考虑到当前项目已经依赖 855/859 风格能力在真实环境运行,第一阶段不能只做“主链路最小闭环”,而要做到: -- 855 provider 能完整替代当前 `Robot + wechat_ipad/client` 的现网接入方式 +- 855 provider 能完整替代当前 `Robot + 旧接入实现` 的现网接入方式 - 替换后可以直接上线运行 - 业务行为、插件调用、后台功能、消息归档链路不发生明显退化 @@ -167,9 +168,9 @@ Gateway 不负责: 说明: -- 不再要求 855 继续复用现有 `wechat_ipad/client/` +- 不再保留旧 `wechat_ipad/client/` 作为正式实现入口 - 更推荐每个 provider 自带独立目录,内部自行管理登录、消息、联系人、群信息等协议实现 -- 这样可以把不同 server 的协议差异彻底隔离,避免为了兼容新版本继续污染旧 client +- 这样可以把不同 server 的协议差异彻底隔离,避免为了兼容新版本继续污染历史实现 ## 5. 为什么要把“运行模型”抽出来 @@ -379,7 +380,7 @@ wechat_ipad/ - 让 `Robot` 依赖 Gateway - 由 `providers/legacy_855/` 完整承接当前 855/859 接入能力 -- 替换后可以直接上线,不依赖旧 `wechat_ipad/client/` +- 替换后可以直接上线,不依赖历史 `client` 目录 建议步骤: @@ -450,7 +451,7 @@ wechat_ipad/ 1. 不建议一次抽象所有微信功能 2. 不建议为每个 API endpoint 单独建类 3. 不建议让 `Robot` 持续保留 `if server_type == ...` 的运行逻辑分叉 -4. 不建议强行让 864 复用当前 `client/*.py` +4. 不建议让 864 回退复用已经移除的旧 client 结构 5. 不建议现在就引入太多类型层、事件总线或复杂工厂模式 ## 13. 结论 diff --git a/wechat_ipad/client/__init__.py b/wechat_ipad/client/__init__.py deleted file mode 100644 index 2a45e8d..0000000 --- a/wechat_ipad/client/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.friend_circle import FriendCircleMixin -from wechat_ipad.client.firends import FriendMixin -from wechat_ipad.client.group import ChatroomMixin -from wechat_ipad.client.login import LoginMixin -from wechat_ipad.client.message import MessageMixin -from wechat_ipad.client.tools import ToolMixin -from wechat_ipad.client.user import UserMixin - - -class WechatAPIClient(LoginMixin, MessageMixin, FriendCircleMixin, FriendMixin, ChatroomMixin, UserMixin, - ToolMixin): - - # 这里都是需要结合多个功能的方法 - - async def send_at_message(self, wxid: str, content: str, at: list[str]) -> tuple[int, int, int]: - """发送@消息 - - Args: - wxid (str): 接收人 - content (str): 消息内容 - at (list[str]): 要@的用户ID列表 - - Returns: - tuple[int, int, int]: 包含以下三个值的元组: - - ClientMsgid (int): 客户端消息ID - - CreateTime (int): 创建时间 - - NewMsgId (int): 新消息ID - - Raises: - UserLoggedOut: 用户未登录时抛出 - BanProtection: 新设备登录4小时内操作时抛出 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - output = "" - # 如果@不是在群里对话,则直接发送消息 - if wxid.endswith("@chatroom"): - for at_id in at: - nickname = await self.get_chatroom_nickname(at_id, wxid) - output += f"@{nickname}\u2005" - - output += "\n" - output += content - else: - output = content - - return await self.send_text_message(wxid, output, at) diff --git a/wechat_ipad/client/base.py b/wechat_ipad/client/base.py deleted file mode 100644 index c737af9..0000000 --- a/wechat_ipad/client/base.py +++ /dev/null @@ -1,103 +0,0 @@ -from dataclasses import dataclass - -from wechat_ipad.errors import * - - -@dataclass -class Proxy: - """代理(无效果,别用!) - - Args: - ip (str): 代理服务器IP地址 - port (int): 代理服务器端口 - username (str, optional): 代理认证用户名. 默认为空字符串 - password (str, optional): 代理认证密码. 默认为空字符串 - """ - ip: str - port: int - username: str = "" - password: str = "" - - -@dataclass -class Section: - """数据段配置类 - - Args: - data_len (int): 数据长度 - start_pos (int): 起始位置 - """ - data_len: int - start_pos: int - - -class WechatAPIClientBase: - """微信API客户端基类 - - Args: - ip (str): 服务器IP地址 - port (int): 服务器端口 - - Attributes: - wxid (str): 微信ID - nickname (str): 昵称 - alias (str): 别名 - phone (str): 手机号 - """ - def __init__(self, ip: str, port: int): - self.ip = ip - self.port = port - - self.wxid = "" - self.nickname = "" - self.alias = "" - self.phone = "" - - # 调用所有 Mixin 的初始化方法 - super().__init__() - - @staticmethod - def error_handler(json_resp): - """处理API响应中的错误码 - - Args: - json_resp (dict): API响应的JSON数据 - - Raises: - ValueError: 参数错误时抛出 - MarshallingError: 序列化错误时抛出 - UnmarshallingError: 反序列化错误时抛出 - MMTLSError: MMTLS初始化错误时抛出 - PacketError: 数据包长度错误时抛出 - UserLoggedOut: 用户已退出登录时抛出 - ParsePacketError: 解析数据包错误时抛出 - DatabaseError: 数据库错误时抛出 - Exception: 其他类型错误时抛出 - """ - code = json_resp.get("Code") - if code == -1: # 参数错误 - raise ValueError(json_resp.get("Message")) - elif code == -2: # 其他错误 - raise Exception(json_resp.get("Message")) - elif code == -3: # 序列化错误 - raise MarshallingError(json_resp.get("Message")) - elif code == -4: # 反序列化错误 - raise UnmarshallingError(json_resp.get("Message")) - elif code == -5: # MMTLS初始化错误 - raise MMTLSError(json_resp.get("Message")) - elif code == -6: # 收到的数据包长度错误 - raise PacketError(json_resp.get("Message")) - elif code == -7: # 已退出登录 - raise UserLoggedOut("Already logged out") - elif code == -8: # 链接过期 - raise Exception(json_resp.get("Message")) - elif code == -9: # 解析数据包错误 - raise ParsePacketError(json_resp.get("Message")) - elif code == -10: # 数据库错误 - raise DatabaseError(json_resp.get("Message")) - elif code == -11: # 登陆异常 - raise UserLoggedOut(json_resp.get("Message")) - elif code == -12: # 操作过于频繁 - raise Exception(json_resp.get("Message")) - elif code == -13: # 上传失败 - raise Exception(json_resp.get("Message")) diff --git a/wechat_ipad/client/fallback.png b/wechat_ipad/client/fallback.png deleted file mode 100644 index e081b27..0000000 Binary files a/wechat_ipad/client/fallback.png and /dev/null differ diff --git a/wechat_ipad/client/firends.py b/wechat_ipad/client/firends.py deleted file mode 100644 index f9d13a0..0000000 --- a/wechat_ipad/client/firends.py +++ /dev/null @@ -1,149 +0,0 @@ -from typing import Union - -import aiohttp - -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.base import WechatAPIClientBase - - -class FriendMixin(WechatAPIClientBase): - async def accept_friend(self, scene: int, v1: str, v2: str) -> bool: - """接受好友请求 - - 主动添加好友单天上限如下所示:1小时内上限为 5个,超过上限时,无法发出好友请求,也收不到好友请求。 - - - 新账号:5/天 - - 注册超过7天:10个/天 - - 注册满3个月&&近期登录过该电脑:15/天 - - 注册满6个月&&近期经常登录过该电脑:20/天 - - 注册满6个月&&近期频繁登陆过该电脑:30/天 - - 注册1年以上&&一直登录:50/天 - - 上一次通过好友到下一次通过间隔20-40s - - 收到加人申请,到通过好友申请(每天最多通过300个好友申请),间隔30s+(随机时间) - - Args: - scene: 来源 在消息的xml获取 - v1: v1key - v2: v2key - - Returns: - bool: 操作是否成功 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Scene": scene, "V1": v1, "V2": v2} - response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/PassVerify', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def get_contact(self, wxid: Union[str, list[str]]) -> Union[dict, list[dict]]: - """获取联系人信息 - - Args: - wxid: 联系人wxid, 可以是多个wxid在list里,也可查询chatroom - - Returns: - Union[dict, list[dict]]: 单个联系人返回dict,多个联系人返回list[dict] - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(wxid, list): - wxid = ",".join(wxid) - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "RequestWxids": wxid} - response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractDetail', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - contact_list = json_resp.get("Data").get("ContactList") - if len(contact_list) == 1: - return contact_list[0] - else: - return contact_list - else: - self.error_handler(json_resp) - - async def get_contract_detail(self, wxid: Union[str, list[str]], chatroom: str = "") -> list: - """获取联系人详情 - - Args: - wxid: 联系人wxid - chatroom: 群聊wxid - - Returns: - list: 联系人详情列表 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(wxid, list): - if len(wxid) > 20: - raise ValueError("一次最多查询20个联系人") - wxid = ",".join(wxid) - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Towxids": wxid, "Chatroom": chatroom} - response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractDetail', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("ContactList") - else: - self.error_handler(json_resp) - - async def get_contract_list(self, wx_seq: int = 0, chatroom_seq: int = 0) -> list: - """获取联系人用户名列表 - - Args: - wx_seq: 联系人序列 - chatroom_seq: 群聊序列 - - Returns: - list: 联系人用户名列表 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "CurrentWxcontactSeq": wx_seq, "CurrentChatroomContactSeq": chatroom_seq} - response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractList', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - # 直接返回联系人用户名列表 - return json_resp.get("Data").get("ContactUsernameList", []) - else: - self.error_handler(json_resp) - - async def get_nickname(self, wxid: Union[str, list[str]]) -> Union[str, list[str]]: - """获取用户昵称 - - Args: - wxid: 用户wxid,可以是单个wxid或最多20个wxid的列表 - - Returns: - Union[str, list[str]]: 如果输入单个wxid返回str,如果输入wxid列表则返回对应的昵称列表 - """ - data = await self.get_contract_detail(wxid) - - if isinstance(wxid, str): - try: - return data[0].get("NickName").get("string") - except: - return "" - else: - result = [] - for contact in data: - try: - result.append(contact.get("NickName").get("string")) - except: - result.append("") - return result diff --git a/wechat_ipad/client/friend_circle.py b/wechat_ipad/client/friend_circle.py deleted file mode 100644 index b149476..0000000 --- a/wechat_ipad/client/friend_circle.py +++ /dev/null @@ -1,145 +0,0 @@ -import base64 -import os -from typing import Union - -import aiofiles -import aiohttp - -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.base import WechatAPIClientBase -from wechat_ipad.models.friend_circle_info import build_friend_circle_xml - - -class FriendCircleMixin(WechatAPIClientBase): - async def get_friend_circle_list(self, max_id: int = 0, first_page_md5: str = "") -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Maxid": max_id, "Fristpagemd5": first_page_md5} - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetList", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def get_friend_circle_detail(self, towxid: str, max_id: int = 0, first_page_md5: str = "") -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = { - "Wxid": self.wxid, - "Towxid": towxid, - "Maxid": max_id, - "Fristpagemd5": first_page_md5 - } - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetDetail", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def get_friend_circle_id_detail(self, object_id: Union[str, int], towxid: str = "") -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Towxid": towxid, "Id": int(object_id)} - response = await session.post( - f"http://{self.ip}:{self.port}/api/FriendCircle/GetIdDetail", - json=json_param - ) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def publish_friend_circle(self, content: str, media_items: list[dict] | None = None, - blacklist: str = "", with_user_list: str = "") -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - xml_content = build_friend_circle_xml(self.wxid, content, media_items=media_items) - async with aiohttp.ClientSession() as session: - json_param = { - "Wxid": self.wxid, - "Content": xml_content, - "BlackList": blacklist, - "WithUserList": with_user_list - } - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Messages", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def friend_circle_comment(self, object_id: str, content: str = "", type: int = 2, - reply_comment_id: int = 0) -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = { - "Wxid": self.wxid, - "Id": str(object_id), - "Type": int(type), - "Content": content, - "ReplyCommnetId": int(reply_comment_id) - } - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Comment", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def friend_circle_operation(self, object_id: str, type: int, comment_id: int = 0) -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = { - "Wxid": self.wxid, - "Id": str(object_id), - "Type": int(type), - "CommnetId": int(comment_id) - } - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Operation", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def sync_friend_circle(self, sync_key: str = "") -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Synckey": sync_key} - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/MmSnsSync", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) - - async def upload_friend_circle_media(self, media: Union[str, bytes, os.PathLike]) -> dict: - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(media, str): - media_base64 = media.split(",", 1)[1] if "," in media else media - elif isinstance(media, bytes): - media_base64 = base64.b64encode(media).decode() - elif isinstance(media, os.PathLike): - async with aiofiles.open(media, "rb") as f: - media_base64 = base64.b64encode(await f.read()).decode() - else: - raise ValueError("media should be str, bytes, or path") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Base64": media_base64} - response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Upload", json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - return json_resp.get("Data", {}) - self.error_handler(json_resp) diff --git a/wechat_ipad/client/group.py b/wechat_ipad/client/group.py deleted file mode 100644 index 5cd2449..0000000 --- a/wechat_ipad/client/group.py +++ /dev/null @@ -1,203 +0,0 @@ -from typing import Union, Any - -import aiohttp - -from wechat_ipad.client.base import WechatAPIClientBase -from wechat_ipad.errors import UserLoggedOut - - -class ChatroomMixin(WechatAPIClientBase): - async def add_chatroom_member(self, chatroom: str, wxid: str) -> bool: - """添加群成员(群聊最多40人) - - Args: - chatroom: 群聊wxid - wxid: 要添加的wxid - - Returns: - bool: 成功返回True, 失败False或者报错 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid} - response = await session.post(f'http://{self.ip}:{self.port}/AddChatroomMember', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def get_chatroom_announce(self, chatroom: str) -> dict: - """获取群聊公告 - - Args: - chatroom: 群聊id - - Returns: - dict: 群聊信息字典 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "QID": chatroom} - response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomInfoDetail', - json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - data = dict(json_resp.get("Data")) - data.pop("BaseResponse") - return data - else: - self.error_handler(json_resp) - - async def get_chatroom_info(self, chatroom: str) -> dict: - """获取群聊信息 - - Args: - chatroom: 群聊id - - Returns: - dict: 群聊信息字典 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "QID": chatroom} - response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomInfo', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("ContactList")[0] - else: - self.error_handler(json_resp) - - async def get_chatroom_member_list(self, chatroom: str) -> list[dict]: - """获取群聊成员列表 - - Args: - chatroom: 群聊id - - Returns: - list[dict]: 群聊成员列表 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "QID": chatroom} - response = await session.post(f'http://{self.ip}:{self.port}/api/Group/GetChatRoomMemberDetail', - json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("NewChatroomData").get("ChatRoomMember") - else: - self.error_handler(json_resp) - - async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]: - """获取群聊二维码 - - Args: - chatroom: 群聊id - - Returns: - dict: {"base64": 二维码的base64, "description": 二维码描述} - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Chatroom": chatroom} - response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomQRCode', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - data = json_resp.get("Data") - return {"base64": data.get("qrcode").get("buffer"), "description": data.get("revokeQrcodeWording")} - else: - self.error_handler(json_resp) - - async def invite_chatroom_member(self, wxid: Union[str, list], chatroom: str) -> bool: - """邀请群聊成员(群聊大于40人) - - Args: - wxid: 要邀请的用户wxid或wxid列表 - chatroom: 群聊id - - Returns: - bool: 成功返回True, 失败False或者报错 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(wxid, list): - wxid = ",".join(wxid) - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ChatRoomName": chatroom, "ToWxids": wxid} - response = await session.post(f'http://{self.ip}:{self.port}/api/Group/InviteChatRoomMember', - json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def get_chatroom_nickname(self, wxid: Union[str, list[str]], chatroom: str) -> Union[str, list[str]]: - """获取用户昵称 - - Args: - wxid: 用户wxid,可以是单个wxid或最多20个wxid的列表 - chatroom: 群聊id - - Returns: - Union[str, list[str]]: 如果输入单个wxid返回str,如果输入wxid列表则返回对应的昵称列表 - """ - data = await self.get_chatroom_member_list(chatroom) - - if isinstance(wxid, str): - # 单个wxid的情况 - for member in data: - if member.get("UserName") == wxid: - # 优先返回DisplayName,如果不存在则返回NickName - return member.get("DisplayName") or member.get("NickName") or wxid - return "" # 如果没找到对应的成员,返回空字符串 - else: - # wxid列表的情况 - result = [] - for single_wxid in wxid: - found = False - for member in data: - if member.get("UserName") == single_wxid: - # 优先返回DisplayName,如果不存在则返回NickName - result.append(member.get("DisplayName") or member.get("NickName") or wxid) - found = True - break - if not found: - result.append(wxid) # 如果没找到对应的成员,添加空字符串 - return result - - async def get_chatroom_member_detail(self, wxid: str, chatroom: str) -> dict: - """获取用户昵称 - - Args: - wxid: 用户wxid,可以是单个wxid或最多20个wxid的列表 - chatroom: 群聊id - - Returns: - Union[str, list[str]]: 如果输入单个wxid返回str,如果输入wxid列表则返回对应的昵称列表 - """ - data = await self.get_chatroom_member_list(chatroom) - - for member in data: - if member.get("UserName") == wxid: - # 优先返回DisplayName,如果不存在则返回NickName - return member - return {} # 如果没找到对应的成员,返回空字符串 diff --git a/wechat_ipad/client/login.py b/wechat_ipad/client/login.py deleted file mode 100644 index 0ea67ff..0000000 --- a/wechat_ipad/client/login.py +++ /dev/null @@ -1,274 +0,0 @@ -import hashlib -import string -from random import choice -from typing import Union - -import aiohttp -import qrcode - -from wechat_ipad.client.base import * - -from loguru import logger - - -class LoginMixin(WechatAPIClientBase): - - async def get_qr_code(self, device_name: str, device_id: str = "", proxy: Proxy = None, print_qr: bool = False) -> ( - str, str): - """获取登录二维码。 - - Args: - device_name (str): 设备名称 - device_id (str, optional): 设备ID. Defaults to "". - proxy (Proxy, optional): 代理信息. Defaults to None. - print_qr (bool, optional): 是否在控制台打印二维码. Defaults to False. - - Returns: - tuple[str, str]: 返回登录二维码的UUID和URL - - Raises: - 根据error_handler处理错误 - """ - async with aiohttp.ClientSession() as session: - json_param = {'DeviceName': device_name, 'DeviceID': device_id} - if proxy: - json_param['ProxyInfo'] = {'ProxyIp': f'{proxy.ip}:{proxy.port}', - 'ProxyPassword': proxy.password, - 'ProxyUser': proxy.username} - - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetQR', json=json_param) - json_resp = await response.json() - logger.debug("get_qr_code:{}", json_resp) - if json_resp.get("Success"): - - if print_qr: - qr = qrcode.QRCode( - version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4, - ) - qr.add_data(f'http://weixin.qq.com/x/{json_resp.get("Data").get("Uuid")}') - qr.make(fit=True) - qr.print_ascii() - - return json_resp.get("Data").get("Uuid"), json_resp.get("Data").get("QrUrl") - else: - self.error_handler(json_resp) - - async def check_login_uuid(self, uuid: str, device_id: str = "") -> tuple[bool, Union[dict, int]]: - """检查登录的UUID状态。 - - Args: - uuid (str): 登录的UUID - device_id (str, optional): 设备ID. Defaults to "". - - Returns: - tuple[bool, Union[dict, int]]: 如果登录成功返回(True, 用户信息),否则返回(False, 过期时间) - - Raises: - 根据error_handler处理错误 - """ - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/CheckQR?uuid={uuid}') - json_resp = await response.json() - logger.debug("check_login_uuid:{}", json_resp) - if json_resp.get("Success"): - if json_resp.get("Message") == "登陆成功": - self.wxid = json_resp.get("Data").get("acctSectResp").get("userName") - self.nickname = json_resp.get("Data").get("acctSectResp").get("nickName") - return True, json_resp.get("Data") - else: - return False, json_resp.get("Data").get("expiredTime") - else: - self.error_handler(json_resp) - - async def log_out(self) -> bool: - """登出当前账号。 - - Returns: - bool: 登出成功返回True,否则返回False - - Raises: - UserLoggedOut: 如果未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/LogOut?wxid={self.wxid}') - json_resp = await response.json() - - if json_resp.get("Success"): - return True - elif json_resp.get("Success"): - return False - else: - self.error_handler(json_resp) - - async def awaken_login(self, wxid: str = "") -> str: - """唤醒登录。 - - Args: - wxid (str, optional): 要唤醒的微信ID. Defaults to "". - - Returns: - str: 返回新的登录UUID - - Raises: - Exception: 如果未提供wxid且未登录 - LoginError: 如果无法获取UUID - 根据error_handler处理错误 - """ - if not wxid and not self.wxid: - raise Exception("Please login using QRCode first") - - if not wxid and self.wxid: - wxid = self.wxid - - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/Awaken?wxid={wxid}') - json_resp = await response.json() - - if json_resp.get("Success") and json_resp.get("Data").get("QrCodeResponse").get("Uuid"): - return json_resp.get("Data").get("QrCodeResponse").get("Uuid") - elif not json_resp.get("Data").get("QrCodeResponse").get("Uuid"): - raise LoginError("Please login using QRCode first") - else: - self.error_handler(json_resp) - - async def get_cached_info(self, wxid: str = None) -> dict: - """获取登录缓存信息。 - - Args: - wxid (str, optional): 要查询的微信ID. Defaults to None. - - Returns: - dict: 返回缓存信息,如果未提供wxid且未登录返回空字典 - """ - if not wxid: - wxid = self.wxid - - if not wxid: - return {} - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": wxid} - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetCacheInfo', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data") - else: - return {} - - async def heartbeat(self) -> bool: - """发送心跳包。 - - Returns: - bool: 成功返回True,否则返回False - - Raises: - UserLoggedOut: 如果未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - # logger.debug(f'heartbeat: http://{self.ip}:{self.port}/api/Login/HeartBeat?wxid={self.wxid}') - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/HeartBeat?wxid={self.wxid}') - json_resp = await response.json() - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def heartbeat_long(self) -> bool: - """发送心跳包。 - - Returns: - bool: 成功返回True,否则返回False - - Raises: - UserLoggedOut: 如果未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - # logger.debug(f'heartbeat long: http://{self.ip}:{self.port}/api/Login/HeartBeatLong?wxid={self.wxid}') - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/HeartBeatLong?wxid={self.wxid}') - json_resp = await response.json() - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def twice_auto_auth(self) -> bool: - """二次登录。 - - Returns: - bool: 成功返回True,否则返回False - - Raises: - UserLoggedOut: 如果未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/Login/TwiceAutoAuth?wxid={self.wxid}') - json_resp = await response.json() - if json_resp.get("Success"): - return True - else: - logger.error("Twice Auto Auth Failed") - return False - - @staticmethod - def create_device_name() -> str: - """生成一个随机的设备名。 - - Returns: - str: 返回生成的设备名 - """ - first_names = [ - "Oliver", "Emma", "Liam", "Ava", "Noah", "Sophia", "Elijah", "Isabella", - "James", "Mia", "William", "Amelia", "Benjamin", "Harper", "Lucas", "Evelyn", - "Henry", "Abigail", "Alexander", "Ella", "Jackson", "Scarlett", "Sebastian", - "Grace", "Aiden", "Chloe", "Matthew", "Zoey", "Samuel", "Lily", "David", - "Aria", "Joseph", "Riley", "Carter", "Nora", "Owen", "Luna", "Daniel", - "Sofia", "Gabriel", "Ellie", "Matthew", "Avery", "Isaac", "Mila", "Leo", - "Julian", "Layla" - ] - - last_names = [ - "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", - "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", - "Thomas", "Taylor", "Moore", "Jackson", "Martin", "Lee", "Perez", "Thompson", - "White", "Harris", "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson", "Walker", - "Young", "Allen", "King", "Wright", "Scott", "Torres", "Nguyen", "Hill", - "Flores", "Green", "Adams", "Nelson", "Baker", "Hall", "Rivera", "Campbell", - "Mitchell", "Carter", "Roberts", "Gomez", "Phillips", "Evans" - ] - - return choice(first_names) + " " + choice(last_names) + "'s Pad" - - @staticmethod - def create_device_id(s: str = "") -> str: - """生成设备ID。 - - Args: - s (str, optional): 用于生成ID的字符串. Defaults to "". - - Returns: - str: 返回生成的设备ID - """ - if s == "" or s == "string": - s = ''.join(choice(string.ascii_letters) for _ in range(15)) - md5_hash = hashlib.md5(s.encode()).hexdigest() - return "49" + md5_hash[2:] diff --git a/wechat_ipad/client/message.py b/wechat_ipad/client/message.py deleted file mode 100644 index 14cde47..0000000 --- a/wechat_ipad/client/message.py +++ /dev/null @@ -1,747 +0,0 @@ -import asyncio -import base64 -import os -import time -from asyncio import Future -from asyncio import Queue, sleep -from io import BytesIO -from pathlib import Path -from typing import Union - -import aiohttp -import pysilk -from loguru import logger -from pydub import AudioSegment -from pymediainfo import MediaInfo -import aiofiles - -from utils.video_utils import get_first_frame, get_first_frame_bytes -from utils.trace_context import format_trace_prefix -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.base import WechatAPIClientBase - - -class MessageMixin(WechatAPIClientBase): - def __init__(self, ip: str, port: int): - # 初始化消息队列 - super().__init__(ip, port) - self._message_queue = Queue() - self._is_processing = False - self.logging = logger - - async def _process_message_queue(self): - """ - 处理消息队列的异步方法 - """ - if self._is_processing: - return - - self._is_processing = True - while True: - if self._message_queue.empty(): - self._is_processing = False - break - - func, args, kwargs, future = await self._message_queue.get() - try: - result = await func(*args, **kwargs) - future.set_result(result) - except Exception as e: - future.set_exception(e) - finally: - self._message_queue.task_done() - await sleep(1) # 消息发送间隔1秒 - - async def _queue_message(self, func, *args, **kwargs): - """ - 将消息添加到队列 - """ - future = Future() - await self._message_queue.put((func, args, kwargs, future)) - - if not self._is_processing: - asyncio.create_task(self._process_message_queue()) - - return await future - - async def revoke_message(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int) -> bool: - """撤回消息。 - { - "ClientMsgId": 0, - "CreateTime": 0, - "NewMsgId": 0, - "ToUserName": "string", - "Wxid": "string" - } - Args: - wxid (str): 接收人wxid - client_msg_id (int): 发送消息的返回值 - create_time (int): 发送消息的返回值 - new_msg_id (int): 发送消息的返回值 - - Returns: - bool: 成功返回True,失败返回False - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToUserName": wxid, "ClientMsgId": client_msg_id, - "CreateTime": create_time, - "NewMsgId": new_msg_id} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Revoke', json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - self.logging.info("消息撤回成功: 对方wxid:{} ClientMsgId:{} CreateTime:{} NewMsgId:{}", - wxid, - client_msg_id, - create_time, - new_msg_id) # 确保四个参数都正确传入 - return True - else: - self.error_handler(json_resp) - - async def send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]: - """发送文本消息。 - - Args: - wxid (str): 接收人wxid - content (str): 消息内容 - at (list, str, optional): 要@的用户 - - Returns: - tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_text_message, wxid, content, at) - - async def _send_text_message(self, wxid: str, content: str, at: list[str] = None) -> tuple[int, int, int]: - """ - 实际发送文本消息的方法 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(at, str): - at_str = at - elif isinstance(at, list): - if at is None: - at = [] - at_str = ",".join(at) - else: - raise ValueError("Argument 'at' should be str or list") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": content, "Type": 1, "At": at_str} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendTxt', json=json_param) - json_resp = await response.json() - if json_resp.get("Success"): - # 发送动作也带上 trace_id,便于把“某条入站消息最终发了什么”直接串起来。 - self.logging.info("{}发送文字消息: 对方wxid:{} at:{} 内容:{}", - format_trace_prefix(), wxid, at, content) - data = json_resp.get("Data") - return data.get("List")[0].get("ClientMsgId"), data.get("List")[0].get("CreateTime"), data.get("List")[ - 0].get("NewMsgId") - else: - self.error_handler(json_resp) - - async def send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]: - """发送图片消息。 - - Args: - wxid (str): 接收人wxid - image (str, byte, os.PathLike): 图片,支持base64字符串,图片byte,图片路径 - - Returns: - tuple[int, int, int]: 返回(ClientImgId, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - ValueError: image_path和image_base64都为空或都不为空时 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_image_message, wxid, image) - - async def _send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[ - int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - if isinstance(image, str): - pass - elif isinstance(image, bytes): - image = base64.b64encode(image).decode() - elif isinstance(image, os.PathLike): - async with aiofiles.open(image, 'rb') as f: - image = base64.b64encode(await f.read()).decode() - else: - raise ValueError("Argument 'image' can only be str, bytes, or os.PathLike") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": image} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/UploadImg', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - json_param.pop('Base64') - # 图片日志不打印 base64 内容,但保留 trace_id,便于关联具体发送动作。 - self.logging.info("{}发送图片消息: 对方wxid:{} 图片base64略", - format_trace_prefix(), wxid) - data = json_resp.get("Data") - self.logging.debug("发送图片消息成功,返回:{}", data) - return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("NewMsgId") - else: - self.error_handler(json_resp) - - async def send_video_message(self, wxid: str, video: Union[str, bytes, os.PathLike], - image: [str, bytes, os.PathLike] = None): - """发送视频消息。不推荐使用,上传速度很慢300KB/s。如要使用,可压缩视频,或者发送链接卡片而不是视频。 - - Args: - wxid (str): 接收人wxid - video (str, bytes, os.PathLike): 视频 接受base64字符串,字节,文件路径 - image (str, bytes, os.PathLike): 视频封面图片 接受base64字符串,字节,文件路径 - - Returns: - tuple[int, int]: 返回(ClientMsgid, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - ValueError: 视频或图片参数都为空或都不为空时 - 根据error_handler处理错误 - """ - has_image = False - if not image: - image = Path(os.path.join(Path(__file__).resolve().parent, "fallback.png")) - else: - has_image = True - # get video base64 and duration - if isinstance(video, str): - vid_base64 = video - video = base64.b64decode(video) - file_len = len(video) - media_info = MediaInfo.parse(BytesIO(video)) - elif isinstance(video, bytes): - vid_base64 = base64.b64encode(video).decode() - file_len = len(video) - media_info = MediaInfo.parse(BytesIO(video)) - # 如果没有传入首帧,则自己提取一次 - if not has_image: - first_frame = get_first_frame_bytes(video, f"frame_{int(time.time())}.jpg") - if first_frame: - image = Path(first_frame) - elif isinstance(video, os.PathLike): - video_path = Path(video) - if not video_path.exists(): - raise ValueError(f"Video file does not exist: {video_path}") - async with aiofiles.open(video_path, "rb") as f: - video_bytes = await f.read() - file_len = len(video_bytes) - vid_base64 = base64.b64encode(video_bytes).decode() - media_info = MediaInfo.parse(video_path) - # 如果没有传入首帧,则自己提取一次 - if not has_image: - first_frame = get_first_frame(video_path, f"frame_{int(time.time())}.jpg") - if first_frame: - image = Path(first_frame) - else: - raise ValueError("video should be str, bytes, or path") - # 获取视频时长 - duration = None - for track in media_info.tracks: - if track.track_type == "Video" and track.duration is not None: - duration = int(track.duration / 1000) # 将毫秒转换为秒 - break - if duration is None: - duration = 1 - self.logging.error(f"无法从视频文件获取时长: {video}") - # get image base64 - if isinstance(image, str): - image_base64 = image - elif isinstance(image, bytes): - image_base64 = base64.b64encode(image).decode() - elif isinstance(image, os.PathLike): - async with aiofiles.open(image, "rb") as f: - image_base64 = base64.b64encode(await f.read()).decode() - else: - raise ValueError("image should be str, bytes, or path") - # self.logging.debug(f"vid_base64:{vid_base64}") - # self.logging.debug(f"images_base64:{image_base64}") - # 打印预估时间,300KB/s - predict_time = int(file_len / 1024 / 300) - self.logging.debug("开始发送视频: 对方wxid:{} 视频base64略 图片base64略 预计耗时:{}秒", wxid, predict_time) - # self.logging.debug(f"image:{image};image_base64:{image_base64}") - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": "data:video/mp4;base64," + vid_base64, - "ImageBase64": "data:image/jpeg;base64," + image_base64, - "PlayLength": duration} - # self.logging.debug(f"json_param::{json_param}") - async with session.post(f'http://{self.ip}:{self.port}/api/Msg/SendVideo', json=json_param) as resp: - json_resp = await resp.json() - # self.logging.debug(f"json_resp:{json_resp}") - if json_resp.get("Success"): - json_param.pop('Base64') - json_param.pop('ImageBase64') - self.logging.info("发送视频成功: 对方wxid:{} 时长:{} 视频base64略 图片base64略", wxid, duration) - data = json_resp.get("Data") - return data.get("clientMsgId"), data.get("newMsgId") - else: - self.error_handler(json_resp) - - async def send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \ - tuple[int, int, int]: - """发送语音消息。 - - Args: - wxid (str): 接收人wxid - voice (str, bytes, os.PathLike): 语音 接受base64字符串,字节,文件路径 - format (str, optional): 语音格式,支持amr/wav/mp3. Defaults to "amr". - - Returns: - tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - ValueError: voice_path和voice_base64都为空或都不为空时,或format不支持时 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_voice_message, wxid, voice, format) - - async def _send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "mar") -> \ - tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - elif format not in ["amr", "wav", "mp3", "silk", "speex"]: - raise ValueError("format must be one of amr, wav, mp3") - - # read voice to byte - if isinstance(voice, str): - voice_byte = base64.b64decode(voice) - elif isinstance(voice, bytes): - voice_byte = voice - elif isinstance(voice, os.PathLike): - async with aiofiles.open(voice, "rb") as f: - voice_byte = await f.read() - else: - raise ValueError("voice should be str, bytes, or path") - voice_type = 0 - # get voice duration and b64 - if format.lower() == "amr": - audio = AudioSegment.from_file(BytesIO(voice_byte), format="amr") - voice_base64 = base64.b64encode(voice_byte).decode() - elif format.lower() == "wav": - audio = AudioSegment.from_file(BytesIO(voice_byte), format="wav").set_channels(1) - self.logging.debug(f"1audio.frame_rate: {audio.frame_rate}") - audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate)) - self.logging.debug(f"2audio.frame_rate: {audio.frame_rate}") - audio = audio.set_channels(1).set_sample_width(2) # 16-bit PCM - logger.info( - f"音频处理: 格式={format}, 采样率={audio.frame_rate}, 声道数={audio.channels}, 时长={len(audio) / 1000}s") - voice_base64 = base64.b64encode( - await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode() - voice_type = 4 - elif format.lower() == "mp3": - audio = AudioSegment.from_file(BytesIO(voice_byte), format="mp3").set_channels(1) - audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate)) - voice_base64 = base64.b64encode( - await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode() - voice_type = 4 - else: - raise ValueError("format must be one of amr, wav, mp3") - - duration = len(audio) - # Type: AMR = 0, MP3 = 2, SILK = 4, SPEEX = 1, WAVE = 3 VoiceTime :音频长度 1000为一秒 - format_dict = {"amr": 0, "wav": 3, "mp3": 2, "silk": 4, "speex": 1} - # { - # "Base64": "string", - # "ToWxid": "string", - # "Type": 0, - # "VoiceTime": 0, - # "Wxid": "string" - # } - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": voice_base64, "VoiceTime": duration, - "Type": voice_type} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendVoice', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - json_param.pop('Base64') - self.logging.info("发送语音消息: 对方wxid:{} 时长:{} 格式:{} 音频base64略", wxid, duration, format) - data = json_resp.get("Data") - return int(data.get("ClientMsgId")), data.get("CreateTime"), data.get("NewMsgId") - else: - self.error_handler(json_resp) - - @staticmethod - def _get_closest_frame_rate(frame_rate: int) -> int: - supported = [8000, 12000, 16000, 24000] - closest_rate = None - smallest_diff = float('inf') - for num in supported: - diff = abs(frame_rate - num) - if diff < smallest_diff: - smallest_diff = diff - closest_rate = num - - return closest_rate - - async def send_link_xml_message(self, xml: str, towxid: str) -> tuple[str, int, int]: - """发送链接消息。 - { - "ToWxid": "string", - "Type": 0, - "Wxid": "string", - "Xml": "string" - } - Args: - xml (str): 发送的内容 - towxid (str):接收人 - - Returns: - tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - - return await self._queue_message(self._send_link_xml_message, xml, towxid) - - async def send_link_message(self, wxid: str, url: str, title: str = "", description: str = "", - thumb_url: str = "") -> tuple[str, int, int]: - """发送链接消息。 - { - "ToWxid": "string", - "Type": 0, - "Wxid": "string", - "Xml": "string" - } - Args: - wxid (str): 接收人wxid - url (str): 跳转链接 - title (str, optional): 标题. Defaults to "". - description (str, optional): 描述. Defaults to "". - thumb_url (str, optional): 缩略图链接. Defaults to "". - - Returns: - tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - - return await self._queue_message(self._send_link_message, wxid, url, title, description, thumb_url) - - async def _send_link_xml_message(self, xml: str, towxid: str) -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": towxid, "Xml": xml, "Type": 0} - logger.debug(f"_send_link_xml_message:{xml}") - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareLink', json=json_param) - json_resp = await response.json() - logger.info(f"_send_link_xml_message resp:{json_resp}") - if json_resp.get("Success"): - data = json_resp.get("Data") - return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId") - else: - self.error_handler(json_resp) - - async def _send_link_message(self, wxid: str, url: str, title: str = "", description: str = "", - thumb_url: str = "") -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Url": url, "Title": title, "Desc": description, - "ThumbUrl": thumb_url} - - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareLink', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - self.logging.info("发送链接消息: 对方wxid:{} 链接:{} 标题:{} 描述:{} 缩略图链接:{}", - wxid, - url, - title, - description, - thumb_url) - data = json_resp.get("Data") - return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId") - else: - self.error_handler(json_resp) - - async def send_emoji_message(self, wxid: str, md5: str, total_length: int) -> list[dict]: - """发送表情消息。 - - Args: - wxid (str): 接收人wxid - md5 (str): 表情md5值 - total_length (int): 表情总长度 - - Returns: - list[dict]: 返回表情项列表(list of emojiItem) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_emoji_message, wxid, md5, total_length) - - async def _send_emoji_message(self, wxid: str, md5: str, total_length: int) -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - # 表情发送接口历史上最容易出现“接口长时间不返回,导致整个消息队列被拖住”的问题, - # 因此这里单独加总超时和更细的日志,方便区分“参数错误”和“接口无响应”两类故障。 - timeout = aiohttp.ClientTimeout(total=20) - async with aiohttp.ClientSession(timeout=timeout) as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Md5": md5, "TotalLen": total_length} - try: - self.logging.info("开始发送表情消息: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length) - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendEmoji', json=json_param) - json_resp = await response.json(content_type=None) - except asyncio.TimeoutError as exc: - self.logging.error("发送表情消息超时: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length) - raise TimeoutError("SendEmoji 接口调用超时") from exc - - if json_resp.get("Success"): - data = json_resp.get("Data") or {} - self.logging.info("发送表情消息成功: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length) - return data.get("emojiItem") or data.get("EmojiItem") or data - else: - self.logging.error("发送表情消息失败: 对方wxid:{} md5:{} 总长度:{} resp:{}", - wxid, md5, total_length, json_resp) - self.error_handler(json_resp) - - async def send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[ - int, int, int]: - """发送名片消息。 - - Args: - wxid (str): 接收人wxid - card_wxid (str): 名片用户的wxid - card_nickname (str): 名片用户的昵称 - card_alias (str, optional): 名片用户的备注. Defaults to "". - - Returns: - tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_card_message, wxid, card_wxid, card_nickname, card_alias) - - async def _send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[ - int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with (aiohttp.ClientSession() as session): - - json_param = { - "CardAlias": card_alias, - "CardNickName": card_nickname, - "CardWxId": card_wxid, - "ToWxid": wxid, - "Wxid": self.wxid - } - - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/ShareCard', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - self.logging.info("发送名片消息: 对方wxid:{} 名片wxid:{} 名片备注:{} 名片昵称:{}", wxid, - card_wxid, - card_alias, - card_nickname) - data = json_resp.get("Data") - return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[ - 0].get("NewMsgId") - else: - self.error_handler(json_resp) - - async def send_app_message(self, wxid: str, xml: str, type: int) -> tuple[str, int, int]: - """发送应用消息。 - - Args: - wxid (str): 接收人wxid - xml (str): 应用消息的xml内容 - type (int): 应用消息类型 - - Returns: - tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_app_message, wxid, xml, type) - - async def _send_app_message(self, wxid: str, xml: str, type: int = 0) -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - # { - # "ToWxid": "string", - # "Type": 0, - # "Wxid": "string", - # "Xml": "string" - # } - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Xml": xml, "Type": type} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendApp', json=json_param) - json_resp = await response.json() - logger.info(f"json_resp: {json_resp}") - if json_resp.get("Success"): - json_param["Xml"] = json_param["Xml"].replace("\n", "") - self.logging.info("发送app消息: 对方wxid:{} 类型:{} xml:{}", wxid, type, json_param["Xml"]) - return json_resp.get("Data").get("clientMsgId"), json_resp.get("Data").get( - "createTime"), json_resp.get("Data").get("newMsgId") - else: - self.error_handler(json_resp) - - async def send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[str, int, int]: - """转发文件消息。 - - Args: - wxid (str): 接收人wxid - xml (str): 要转发的文件消息xml内容 - - Returns: - tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_cdn_file_msg, wxid, xml) - - async def _send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNFile', json=json_param) - json_resp = await response.json() - self.logging.debug("json_resp: %s", json_resp) - if json_resp.get("Success"): - self.logging.info("转发文件消息: 对方wxid:{} xml:{}", wxid, xml) - data = json_resp.get("Data") - return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId") - else: - self.error_handler(json_resp) - - async def send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[str, int, int]: - """转发图片消息。 - - Args: - wxid (str): 接收人wxid - xml (str): 要转发的图片消息xml内容 - - Returns: - tuple[str, int, int]: 返回(ClientImgId, CreateTime, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_cdn_img_msg, wxid, xml) - - async def _send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[int, int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNImg', json=json_param) - json_resp = await response.json() - self.logging.debug("json_resp: %s", json_resp) - - if json_resp.get("Success"): - self.logging.info("转发图片消息: 对方wxid:{} xml:{}", wxid, xml) - data = json_resp.get("Data") - return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid") - else: - self.error_handler(json_resp) - - async def send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[str, int]: - """转发视频消息。 - - Args: - wxid (str): 接收人wxid - xml (str): 要转发的视频消息xml内容 - - Returns: - tuple[str, int]: 返回(ClientMsgid, NewMsgId) - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 登录新设备后4小时内操作 - 根据error_handler处理错误 - """ - return await self._queue_message(self._send_cdn_video_msg, wxid, xml) - - async def _send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[int, int]: - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/SendCDNVideo', json=json_param) - json_resp = await response.json() - - self.logging.debug("json_resp: %s", json_resp) - if json_resp.get("Success"): - self.logging.info("转发视频消息: 对方wxid:{} xml:{}", wxid, xml) - data = json_resp.get("Data") - return data.get("clientMsgId"), data.get("newMsgId") - else: - self.error_handler(json_resp) - - async def sync_message(self) -> dict: - """同步消息。 - - Returns: - dict: 返回同步到的消息数据 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: - json_param = {"Wxid": self.wxid, "Scene": 0, "Synckey": ""} - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Sync', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data") - else: - self.error_handler(json_resp) diff --git a/wechat_ipad/client/tools.py b/wechat_ipad/client/tools.py deleted file mode 100644 index 4b9b5bd..0000000 --- a/wechat_ipad/client/tools.py +++ /dev/null @@ -1,402 +0,0 @@ -import base64 -import io -import os -import aiofiles - -import aiohttp -import pysilk -from pydub import AudioSegment - -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.base import WechatAPIClientBase, Proxy - - -class ToolMixin(WechatAPIClientBase): - async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str: - """CDN下载高清图片。 - { - "Wxid": "string", - "FileNo": "string", - "FileAesKey": "string" - } - Args: - aeskey (str): 图片的AES密钥 - cdnmidimgurl (str): 图片的CDN URL - - Returns: - str: 图片的base64编码字符串 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "FileAesKey": aeskey, "FileNo": cdnmidimgurl} - response = await session.post(f'http://{self.ip}:{self.port}/api/Tools/CdnDownloadImage', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("Image") - else: - self.error_handler(json_resp) - - async def download_voice(self, msg_id: str, voiceurl: str, length: int) -> str: - """下载语音文件。 - - Args: - msg_id (str): 消息的msgid - voiceurl (str): 语音的url,从xml获取 - length (int): 语音长度,从xml获取 - - Returns: - str: 语音的base64编码字符串 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "MsgId": msg_id, "Voiceurl": voiceurl, "Length": length} - response = await session.post(f'http://{self.ip}:{self.port}/DownloadVoice', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("data").get("buffer") - else: - self.error_handler(json_resp) - - async def download_attach_xml(self, xml_str: str) -> str: - # 读取消息信息,进行处理 - import xml.etree.ElementTree as ET - root = ET.fromstring(xml_str) - appmsg = root.find("appmsg") - appid = appmsg.attrib.get("appid", "") - appattach = appmsg.find("appattach") - attach_id = appattach.findtext("attachid", "") - datalen = int(appattach.findtext("totallen", "0")) - username = root.findtext("fromusername", "") - - return self.download_attach(attach_id, datalen, username, appid) - - async def download_attach(self, attach_id: str, datalen: int, username: str, appid: str) -> str: - """下载附件。 - { - "AppID": "wx6618f1cfc6c132f8", - "AttachId": "@cdn_3057020100044b304902010002042d0c366c02032df7950204d35d06af0204681af942042438363966373134342d663961352d343065612d623038662d3062643730663335343731370204052400050201000405004c54a100_c57ad24ba4e9ceeb3c5e10e33361028d_1", - "DataLen": 1160, - "Section": { - "DataLen": 1160, - "StartPos": 0 - }, - "UserName": "Jyunere", - "Wxid": "wxid_ts8v7yk4g5c522" - } - Args: - attach_id (str): 附件ID - - Returns: - dict: 附件数据 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "AttachId": attach_id, "DataLen": datalen, - "Section": {"DataLen": datalen, "StartPos": 0}, "UserName": username, "AppID": appid} - response = await session.post(f'http://{self.ip}:{self.port}/api/Tools/DownloadFile', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("data").get("buffer") - else: - self.error_handler(json_resp) - - async def download_video(self, msg_id) -> str: - """下载视频。 - - Args: - msg_id (str): 消息的msg_id - - Returns: - str: 视频的base64编码字符串 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "MsgId": msg_id} - response = await session.post(f'http://{self.ip}:{self.port}/DownloadVideo', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("data").get("buffer") - else: - self.error_handler(json_resp) - - async def friend_circle_upload(self, base64: str) -> str: - # / FriendCircle / Upload - - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Base64": base64} - response = await session.post(f'http://{self.ip}:{self.port}/api/FriendCircle/Upload', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("ThumbUrls")[0].get("Url") - else: - self.error_handler(json_resp) - - async def set_step(self, count: int) -> bool: - """设置步数。 - - Args: - count (int): 要设置的步数 - - Returns: - bool: 成功返回True,失败返回False - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 风控保护: 新设备登录后4小时内请挂机 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "StepCount": count} - response = await session.post(f'http://{self.ip}:{self.port}/SetStep', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def set_proxy(self, proxy: Proxy) -> bool: - """设置代理。 - - Args: - proxy (Proxy): 代理配置对象 - - Returns: - bool: 成功返回True,失败返回False - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, - "Proxy": {"ProxyIp": f"{proxy.ip}:{proxy.port}", - "ProxyUser": proxy.username, - "ProxyPassword": proxy.password}} - response = await session.post(f'http://{self.ip}:{self.port}/SetProxy', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return True - else: - self.error_handler(json_resp) - - async def check_database(self) -> bool: - """检查数据库状态。 - - Returns: - bool: 数据库正常返回True,否则返回False - """ - async with aiohttp.ClientSession() as session: - response = await session.get(f'http://{self.ip}:{self.port}/CheckDatabaseOK') - json_resp = await response.json() - - if json_resp.get("Running"): - return True - else: - return False - - @staticmethod - async def base64_to_file(base64_str: str, file_name: str, file_path: str) -> bool: - """将base64字符串转换为文件并保存。 - - Args: - base64_str (str): base64编码的字符串 - file_name (str): 要保存的文件名 - file_path (str): 文件保存路径 - - Returns: - bool: 转换成功返回True,失败返回False - """ - try: - os.makedirs(file_path, exist_ok=True) - - # 拼接完整的文件路径 - full_path = os.path.join(file_path, file_name) - - # 移除可能存在的 base64 头部信息 - if ',' in base64_str: - base64_str = base64_str.split(',')[1] - - # 解码 base64 并写入文件 - async with aiofiles.open(full_path, 'wb') as f: - await f.write(base64.b64decode(base64_str)) - - return True - - except Exception as e: - return False - - @staticmethod - async def file_to_base64(file_path: str) -> str: - """将文件转换为base64字符串。 - - Args: - file_path (str): 文件路径 - - Returns: - str: base64编码的字符串 - """ - async with aiofiles.open(file_path, 'rb') as f: - return base64.b64encode(await f.read()).decode() - - @staticmethod - def base64_to_byte(base64_str: str) -> bytes: - """将base64字符串转换为bytes。 - - Args: - base64_str (str): base64编码的字符串 - - Returns: - bytes: 解码后的字节数据 - """ - # 移除可能存在的 base64 头部信息 - if ',' in base64_str: - base64_str = base64_str.split(',')[1] - - return base64.b64decode(base64_str) - - @staticmethod - def byte_to_base64(byte: bytes) -> str: - """将bytes转换为base64字符串。 - - Args: - byte (bytes): 字节数据 - - Returns: - str: base64编码的字符串 - """ - return base64.b64encode(byte).decode("utf-8") - - @staticmethod - async def silk_byte_to_byte_wav_byte(silk_byte: bytes) -> bytes: - """将silk字节转换为wav字节。 - - Args: - silk_byte (bytes): silk格式的字节数据 - - Returns: - bytes: wav格式的字节数据 - """ - return await pysilk.async_decode(silk_byte, to_wav=True) - - @staticmethod - def wav_byte_to_amr_byte(wav_byte: bytes) -> bytes: - """将WAV字节数据转换为AMR格式。 - - Args: - wav_byte (bytes): WAV格式的字节数据 - - Returns: - bytes: AMR格式的字节数据 - - Raises: - Exception: 转换失败时抛出异常 - """ - try: - # 从字节数据创建 AudioSegment 对象 - audio = AudioSegment.from_wav(io.BytesIO(wav_byte)) - - # 设置 AMR 编码的标准参数 - audio = audio.set_frame_rate(8000).set_channels(1) - - # 创建一个字节缓冲区来存储 AMR 数据 - output = io.BytesIO() - - # 导出为 AMR 格式 - audio.export(output, format="amr") - - # 获取字节数据 - return output.getvalue() - - except Exception as e: - raise Exception(f"转换WAV到AMR失败: {str(e)}") - - @staticmethod - def wav_byte_to_amr_base64(wav_byte: bytes) -> str: - """将WAV字节数据转换为AMR格式的base64字符串。 - - Args: - wav_byte (bytes): WAV格式的字节数据 - - Returns: - str: AMR格式的base64编码字符串 - """ - return base64.b64encode(ToolMixin.wav_byte_to_amr_byte(wav_byte)).decode() - - @staticmethod - async def wav_byte_to_silk_byte(wav_byte: bytes) -> bytes: - """将WAV字节数据转换为silk格式。 - - Args: - wav_byte (bytes): WAV格式的字节数据 - - Returns: - bytes: silk格式的字节数据 - """ - # get pcm data - audio = AudioSegment.from_wav(io.BytesIO(wav_byte)) - pcm = audio.raw_data - return await pysilk.async_encode(pcm, data_rate=audio.frame_rate, sample_rate=audio.frame_rate) - - @staticmethod - async def wav_byte_to_silk_base64(wav_byte: bytes) -> str: - """将WAV字节数据转换为silk格式的base64字符串。 - - Args: - wav_byte (bytes): WAV格式的字节数据 - - Returns: - str: silk格式的base64编码字符串 - """ - return base64.b64encode(await ToolMixin.wav_byte_to_silk_byte(wav_byte)).decode() - - @staticmethod - async def silk_base64_to_wav_byte(silk_base64: str) -> bytes: - """将silk格式的base64字符串转换为WAV字节数据。 - - Args: - silk_base64 (str): silk格式的base64编码字符串 - - Returns: - bytes: WAV格式的字节数据 - """ - return await ToolMixin.silk_byte_to_byte_wav_byte(base64.b64decode(silk_base64)) diff --git a/wechat_ipad/client/user.py b/wechat_ipad/client/user.py deleted file mode 100644 index 91edf7d..0000000 --- a/wechat_ipad/client/user.py +++ /dev/null @@ -1,111 +0,0 @@ -import json - -import aiohttp - -from wechat_ipad import UserLoggedOut -from wechat_ipad.client.base import WechatAPIClientBase - -from loguru import logger - - -class UserMixin(WechatAPIClientBase): - async def get_profile(self, wxid: str = None) -> dict: - """获取用户信息。 - - Args: - wxid (str, optional): 用户wxid. Defaults to None. - - Returns: - dict: 用户信息字典 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid and not wxid: - raise UserLoggedOut("请先登录") - - if not wxid: - wxid = self.wxid - - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/User/GetContractProfile?wxid={wxid}') - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("userInfo") - else: - self.error_handler(json_resp) - - async def get_profile_info_ext(self, wxid: str = None) -> dict: - """获取用户扩展信息。 - - Args: - wxid (str, optional): 用户wxid. Defaults to None. - - Returns: - dict: 用户信息字典 - - Raises: - UserLoggedOut: 未登录时调用 - 根据error_handler处理错误 - """ - if not self.wxid and not wxid: - raise UserLoggedOut("请先登录") - - if not wxid: - wxid = self.wxid - - async with aiohttp.ClientSession() as session: - response = await session.post(f'http://{self.ip}:{self.port}/api/User/GetContractProfile?wxid={wxid}') - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("userInfoExt") - else: - self.error_handler(json_resp) - - async def get_my_qrcode(self, style: int = 0) -> str: - """获取个人二维码。 - - Args: - style (int, optional): 二维码样式. Defaults to 0. - - Returns: - str: 图片的base64编码字符串 - - Raises: - UserLoggedOut: 未登录时调用 - BanProtection: 风控保护: 新设备登录后4小时内请挂机 - 根据error_handler处理错误 - """ - if not self.wxid: - raise UserLoggedOut("请先登录") - - async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "Style": style} - response = await session.post(f'http://{self.ip}:{self.port}/GetMyQRCode', json=json_param) - json_resp = await response.json() - - if json_resp.get("Success"): - return json_resp.get("Data").get("qrcode").get("buffer") - else: - self.error_handler(json_resp) - - async def is_logged_in(self, wxid: str = None) -> bool: - """检查是否登录。 - - Args: - wxid (str, optional): 用户wxid. Defaults to None. - - Returns: - bool: 已登录返回True,未登录返回False - """ - if not wxid: - wxid = self.wxid - try: - await self.get_profile(wxid) - return True - except Exception as e: - logger.error("is_logged_in:{}", e) - return False