diff --git a/requirements.txt b/requirements.txt index 6a2f4ae..2936797 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,4 +44,9 @@ numpy~=1.26.4 gewechat-client==0.1.5 fastapi~=0.115.12 uvicorn~=0.34.2 -dacite~=1.9.2 \ No newline at end of file +dacite~=1.9.2 +qrcode~=7.4.2 +pysilk~=0.0.1 +pydub~=0.25.1 +pymediainfo~=7.0.1 +loguru~=0.7.3 \ No newline at end of file diff --git a/robot.py b/robot.py index c52b98d..f9f6160 100644 --- a/robot.py +++ b/robot.py @@ -58,7 +58,7 @@ class Robot(Job): self.LOG.info(f"数据库连接管理器初始化完成") # 为了兼容现有代码,保留原有的连接池 - self.db_pool = self.db_manager.mysql_pool + self.db_pool = self.db_manager .mysql_pool self.redis_pool = self.db_manager.redis_pool self.contacts_db = ContactsDBOperator(self.db_manager) diff --git a/wechat_ipad/__init__.py b/wechat_ipad/__init__.py new file mode 100644 index 0000000..cc98352 --- /dev/null +++ b/wechat_ipad/__init__.py @@ -0,0 +1,9 @@ + +# 只导入错误类 +from wechat_ipad.errors import * + +# 设置包名 +__name__ = "wechat_api" + +# 导出 WechatAPIClient 类,但避免循环导入 +from wechat_ipad.client import WechatAPIClient diff --git a/wechat_ipad/bot-core.py b/wechat_ipad/bot-core.py new file mode 100644 index 0000000..e30c286 --- /dev/null +++ b/wechat_ipad/bot-core.py @@ -0,0 +1,149 @@ +import asyncio +import time +import tomllib + +import toml + +import wechat_ipad +# 明确导入需要的类 +from loguru import logger + + +async def bot_core(): + # 读取 config.toml 文件 + with open("config.toml", "rb") as f: + config = tomllib.load(f) + logger.info("启动bot") + # 获取 server_url 内容 + server_url = config.get("server_url", "") + if server_url == "": + raise ValueError("server_url 不能为空") + server_ip = config.get("server_ip", "") + server_port = config.get("server_port", 8058) + # 调用登录接口 + bot = wechat_ipad.WechatAPIClient(server_ip, server_port) + wxid = config.get("wxid", "") + device_name = config.get("device_name", "") + device_id = config.get("device_id", "") + if device_name == "": + device_name = bot.create_device_name() + if device_id == "": + device_id = bot.create_device_id() + + if not await bot.is_logged_in(wxid): + while not await bot.is_logged_in(wxid): + # 需要登录 + try: + if await bot.get_cached_info(wxid): + # 尝试唤醒登录 + uuid = await bot.awaken_login(wxid) + logger.info("获取到登录uuid: {}", uuid) + else: + # 二维码登录 + if not device_name: + device_name = bot.create_device_name() + if not device_id: + device_id = bot.create_device_id() + uuid, url = await bot.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True) + logger.info("获取到登录uuid: {}", uuid) + logger.info("获取到登录二维码: {}", url) + except: + # 二维码登录 + if not device_name: + device_name = bot.create_device_name() + if not device_id: + device_id = bot.create_device_id() + uuid, url = await bot.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True) + logger.info("获取到登录uuid: {}", uuid) + logger.info("获取到登录二维码: {}", url) + + while True: + logger.info(f"uuid: {uuid}, url: {url}") + stat, data = await bot.check_login_uuid(uuid, device_id=device_id) + if stat: + break + logger.info("等待登录中,过期倒计时:{}", data) + await asyncio.sleep(5) + # 保存登录信息 + config["wxid"] = bot.wxid + config["device_name"] = device_name + config["device_id"] = device_id + with open("config.toml", "w", encoding="utf-8") as f: + toml.dump(config, f) + # 获取登录账号信息 + bot.wxid = data.get("acctSectResp").get("userName") + bot.nickname = data.get("acctSectResp").get("nickName") + bot.alias = data.get("acctSectResp").get("alias") + bot.phone = data.get("acctSectResp").get("bindMobile") + + logger.info("登录账号信息: wxid: {} 昵称: {} 微信号: {} 手机号: {}", bot.wxid, bot.nickname, bot.alias, + bot.phone) + break + else: # 已登录 + bot.wxid = wxid + profile = await bot.get_profile() + + bot.nickname = profile.get("NickName").get("string") + bot.alias = profile.get("Alias") + bot.phone = profile.get("BindMobile").get("string") + + logger.info("登录账号信息: wxid: {} 昵称: {} 微信号: {} 手机号: {}", bot.wxid, bot.nickname, bot.alias, + bot.phone) + + logger.info("登录设备信息: device_name: {} device_id: {}", device_name, device_id) + + logger.info("登录成功") + + # 开启自动心跳(作为后台任务) + async def heartbeat_task(): + logger.success("开启心跳!") + while True: + try: + success = await bot.heartbeat() + if not success: + logger.warning("心跳失败") + except Exception as e: + logger.error("heartbeat:{}", e) + await asyncio.sleep(5) + + # 创建心跳任务但不等待它完成 + asyncio.create_task(heartbeat_task()) # noqa + + # 先接受堆积消息 + logger.info("处理堆积消息中") + count = 0 + while True: + data = await bot.sync_message() + data = data.get("AddMsgs") + if not data: + if count > 2: + break + else: + count += 1 + continue + + logger.debug("接受到 {} 条消息", len(data)) + await asyncio.sleep(1) + logger.success("处理堆积消息完毕") + + logger.success("开始处理消息") + while True: + now = time.time() + + try: + data = await bot.sync_message() + except Exception as e: + logger.warning("获取新消息失败 {}", e) + await asyncio.sleep(5) + continue + + data = data.get("AddMsgs") + if data: + for message in data: + logger.info("message: {}".format(message)) + # 使用异步睡眠替代忙等待循环 + await asyncio.sleep(0.5) + + +if __name__ == '__main__': + asyncio.run(bot_core()) diff --git a/wechat_ipad/client/__init__.py b/wechat_ipad/client/__init__.py new file mode 100644 index 0000000..bfaa79d --- /dev/null +++ b/wechat_ipad/client/__init__.py @@ -0,0 +1,43 @@ +from wechat_ipad import UserLoggedOut +from wechat_ipad.client.firends import FriendMixin +from wechat_ipad.client.group import ChatroomMixin +from wechat_ipad.client.login import LoginMixin +from wechat_ipad.client.message import MessageMixin +from wechat_ipad.client.tools import ToolMixin +from wechat_ipad.client.user import UserMixin + + +class WechatAPIClient(LoginMixin, MessageMixin, FriendMixin, ChatroomMixin, UserMixin, + ToolMixin): + + # 这里都是需要结合多个功能的方法 + + async def send_at_message(self, wxid: str, content: str, at: list[str]) -> tuple[int, int, int]: + """发送@消息 + + Args: + wxid (str): 接收人 + content (str): 消息内容 + at (list[str]): 要@的用户ID列表 + + Returns: + tuple[int, int, int]: 包含以下三个值的元组: + - ClientMsgid (int): 客户端消息ID + - CreateTime (int): 创建时间 + - NewMsgId (int): 新消息ID + + Raises: + UserLoggedOut: 用户未登录时抛出 + BanProtection: 新设备登录4小时内操作时抛出 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + output = "" + for id in at: + nickname = await self.get_nickname(id) + output += f"@{nickname}\u2005" + + output += content + + return await self.send_text_message(wxid, output, at) diff --git a/wechat_ipad/client/base.py b/wechat_ipad/client/base.py new file mode 100644 index 0000000..c737af9 --- /dev/null +++ b/wechat_ipad/client/base.py @@ -0,0 +1,103 @@ +from dataclasses import dataclass + +from wechat_ipad.errors import * + + +@dataclass +class Proxy: + """代理(无效果,别用!) + + Args: + ip (str): 代理服务器IP地址 + port (int): 代理服务器端口 + username (str, optional): 代理认证用户名. 默认为空字符串 + password (str, optional): 代理认证密码. 默认为空字符串 + """ + ip: str + port: int + username: str = "" + password: str = "" + + +@dataclass +class Section: + """数据段配置类 + + Args: + data_len (int): 数据长度 + start_pos (int): 起始位置 + """ + data_len: int + start_pos: int + + +class WechatAPIClientBase: + """微信API客户端基类 + + Args: + ip (str): 服务器IP地址 + port (int): 服务器端口 + + Attributes: + wxid (str): 微信ID + nickname (str): 昵称 + alias (str): 别名 + phone (str): 手机号 + """ + def __init__(self, ip: str, port: int): + self.ip = ip + self.port = port + + self.wxid = "" + self.nickname = "" + self.alias = "" + self.phone = "" + + # 调用所有 Mixin 的初始化方法 + super().__init__() + + @staticmethod + def error_handler(json_resp): + """处理API响应中的错误码 + + Args: + json_resp (dict): API响应的JSON数据 + + Raises: + ValueError: 参数错误时抛出 + MarshallingError: 序列化错误时抛出 + UnmarshallingError: 反序列化错误时抛出 + MMTLSError: MMTLS初始化错误时抛出 + PacketError: 数据包长度错误时抛出 + UserLoggedOut: 用户已退出登录时抛出 + ParsePacketError: 解析数据包错误时抛出 + DatabaseError: 数据库错误时抛出 + Exception: 其他类型错误时抛出 + """ + code = json_resp.get("Code") + if code == -1: # 参数错误 + raise ValueError(json_resp.get("Message")) + elif code == -2: # 其他错误 + raise Exception(json_resp.get("Message")) + elif code == -3: # 序列化错误 + raise MarshallingError(json_resp.get("Message")) + elif code == -4: # 反序列化错误 + raise UnmarshallingError(json_resp.get("Message")) + elif code == -5: # MMTLS初始化错误 + raise MMTLSError(json_resp.get("Message")) + elif code == -6: # 收到的数据包长度错误 + raise PacketError(json_resp.get("Message")) + elif code == -7: # 已退出登录 + raise UserLoggedOut("Already logged out") + elif code == -8: # 链接过期 + raise Exception(json_resp.get("Message")) + elif code == -9: # 解析数据包错误 + raise ParsePacketError(json_resp.get("Message")) + elif code == -10: # 数据库错误 + raise DatabaseError(json_resp.get("Message")) + elif code == -11: # 登陆异常 + raise UserLoggedOut(json_resp.get("Message")) + elif code == -12: # 操作过于频繁 + raise Exception(json_resp.get("Message")) + elif code == -13: # 上传失败 + raise Exception(json_resp.get("Message")) diff --git a/wechat_ipad/client/firends.py b/wechat_ipad/client/firends.py new file mode 100644 index 0000000..3cdc913 --- /dev/null +++ b/wechat_ipad/client/firends.py @@ -0,0 +1,148 @@ +from typing import Union + +import aiohttp + +from wechat_ipad import UserLoggedOut +from wechat_ipad.client.base import WechatAPIClientBase + + +class FriendMixin(WechatAPIClientBase): + async def accept_friend(self, scene: int, v1: str, v2: str) -> bool: + """接受好友请求 + + 主动添加好友单天上限如下所示:1小时内上限为 5个,超过上限时,无法发出好友请求,也收不到好友请求。 + + - 新账号:5/天 + - 注册超过7天:10个/天 + - 注册满3个月&&近期登录过该电脑:15/天 + - 注册满6个月&&近期经常登录过该电脑:20/天 + - 注册满6个月&&近期频繁登陆过该电脑:30/天 + - 注册1年以上&&一直登录:50/天 + - 上一次通过好友到下一次通过间隔20-40s + - 收到加人申请,到通过好友申请(每天最多通过300个好友申请),间隔30s+(随机时间) + + Args: + scene: 来源 在消息的xml获取 + v1: v1key + v2: v2key + + Returns: + bool: 操作是否成功 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Scene": scene, "V1": v1, "V2": v2} + response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/PassVerify', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) + + async def get_contact(self, wxid: Union[str, list[str]]) -> Union[dict, list[dict]]: + """获取联系人信息 + + Args: + wxid: 联系人wxid, 可以是多个wxid在list里,也可查询chatroom + + Returns: + Union[dict, list[dict]]: 单个联系人返回dict,多个联系人返回list[dict] + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + if isinstance(wxid, list): + wxid = ",".join(wxid) + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "RequestWxids": wxid} + response = await session.post(f'http://{self.ip}:{self.port}/GetContact', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + contact_list = json_resp.get("Data").get("ContactList") + if len(contact_list) == 1: + return contact_list[0] + else: + return contact_list + else: + self.error_handler(json_resp) + + async def get_contract_detail(self, wxid: Union[str, list[str]], chatroom: str = "") -> list: + """获取联系人详情 + + Args: + wxid: 联系人wxid + chatroom: 群聊wxid + + Returns: + list: 联系人详情列表 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + if isinstance(wxid, list): + if len(wxid) > 20: + raise ValueError("一次最多查询20个联系人") + wxid = ",".join(wxid) + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "RequestWxids": wxid, "Chatroom": chatroom} + response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractDetail', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("ContactList") + else: + self.error_handler(json_resp) + + async def get_contract_list(self, wx_seq: int = 0, chatroom_seq: int = 0) -> dict: + """获取联系人列表 + + Args: + wx_seq: 联系人序列 + chatroom_seq: 群聊序列 + + Returns: + dict: 联系人列表数据 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "CurrentWxcontactSeq": wx_seq, "CurrentChatroomContactSeq": chatroom_seq} + response = await session.post(f'http://{self.ip}:{self.port}/api/Friend/GetContractList', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data") + else: + self.error_handler(json_resp) + + async def get_nickname(self, wxid: Union[str, list[str]]) -> Union[str, list[str]]: + """获取用户昵称 + + Args: + wxid: 用户wxid,可以是单个wxid或最多20个wxid的列表 + + Returns: + Union[str, list[str]]: 如果输入单个wxid返回str,如果输入wxid列表则返回对应的昵称列表 + """ + data = await self.get_contract_detail(wxid) + + if isinstance(wxid, str): + try: + return data[0].get("NickName").get("string") + except: + return "" + else: + result = [] + for contact in data: + try: + result.append(contact.get("NickName").get("string")) + except: + result.append("") + return result diff --git a/wechat_ipad/client/group.py b/wechat_ipad/client/group.py new file mode 100644 index 0000000..46318c7 --- /dev/null +++ b/wechat_ipad/client/group.py @@ -0,0 +1,148 @@ +from typing import Union, Any + +import aiohttp + +from wechat_ipad.client.base import WechatAPIClientBase +from wechat_ipad.errors import UserLoggedOut + + +class ChatroomMixin(WechatAPIClientBase): + async def add_chatroom_member(self, chatroom: str, wxid: str) -> bool: + """添加群成员(群聊最多40人) + + Args: + chatroom: 群聊wxid + wxid: 要添加的wxid + + Returns: + bool: 成功返回True, 失败False或者报错 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid} + response = await session.post(f'http://{self.ip}:{self.port}/AddChatroomMember', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) + + async def get_chatroom_announce(self, chatroom: str) -> dict: + """获取群聊公告 + + Args: + chatroom: 群聊id + + Returns: + dict: 群聊信息字典 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom} + response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomInfo', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + data = dict(json_resp.get("Data")) + data.pop("BaseResponse") + return data + else: + self.error_handler(json_resp) + + async def get_chatroom_info(self, chatroom: str) -> dict: + """获取群聊信息 + + Args: + chatroom: 群聊id + + Returns: + dict: 群聊信息字典 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom} + response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomInfoNoAnnounce', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("ContactList")[0] + else: + self.error_handler(json_resp) + + async def get_chatroom_member_list(self, chatroom: str) -> list[dict]: + """获取群聊成员列表 + + Args: + chatroom: 群聊id + + Returns: + list[dict]: 群聊成员列表 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom} + response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomMemberDetail', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("NewChatroomData").get("ChatRoomMember") + else: + self.error_handler(json_resp) + + async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]: + """获取群聊二维码 + + Args: + chatroom: 群聊id + + Returns: + dict: {"base64": 二维码的base64, "description": 二维码描述} + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom} + response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomQRCode', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + data = json_resp.get("Data") + return {"base64": data.get("qrcode").get("buffer"), "description": data.get("revokeQrcodeWording")} + else: + self.error_handler(json_resp) + + async def invite_chatroom_member(self, wxid: Union[str, list], chatroom: str) -> bool: + """邀请群聊成员(群聊大于40人) + + Args: + wxid: 要邀请的用户wxid或wxid列表 + chatroom: 群聊id + + Returns: + bool: 成功返回True, 失败False或者报错 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + if isinstance(wxid, list): + wxid = ",".join(wxid) + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid} + response = await session.post(f'http://{self.ip}:{self.port}/InviteChatroomMember', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) diff --git a/wechat_ipad/client/login.py b/wechat_ipad/client/login.py new file mode 100644 index 0000000..dbbc631 --- /dev/null +++ b/wechat_ipad/client/login.py @@ -0,0 +1,230 @@ +import hashlib +import string +from random import choice +from typing import Union + +import aiohttp +import qrcode + +from wechat_ipad.client.base import * + +from loguru import logger + + +class LoginMixin(WechatAPIClientBase): + + async def get_qr_code(self, device_name: str, device_id: str = "", proxy: Proxy = None, print_qr: bool = False) -> ( + str, str): + """获取登录二维码。 + + Args: + device_name (str): 设备名称 + device_id (str, optional): 设备ID. Defaults to "". + proxy (Proxy, optional): 代理信息. Defaults to None. + print_qr (bool, optional): 是否在控制台打印二维码. Defaults to False. + + Returns: + tuple[str, str]: 返回登录二维码的UUID和URL + + Raises: + 根据error_handler处理错误 + """ + async with aiohttp.ClientSession() as session: + json_param = {'DeviceName': device_name, 'DeviceID': device_id} + if proxy: + json_param['ProxyInfo'] = {'ProxyIp': f'{proxy.ip}:{proxy.port}', + 'ProxyPassword': proxy.password, + 'ProxyUser': proxy.username} + + response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetQR', json=json_param) + json_resp = await response.json() + logger.info("get_qr_code:{}", json_resp) + if json_resp.get("Success"): + + if print_qr: + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) + qr.add_data(f'http://weixin.qq.com/x/{json_resp.get("Data").get("Uuid")}') + qr.make(fit=True) + qr.print_ascii() + + return json_resp.get("Data").get("Uuid"), json_resp.get("Data").get("QrUrl") + else: + self.error_handler(json_resp) + + async def check_login_uuid(self, uuid: str, device_id: str = "") -> tuple[bool, Union[dict, int]]: + """检查登录的UUID状态。 + + Args: + uuid (str): 登录的UUID + device_id (str, optional): 设备ID. Defaults to "". + + Returns: + tuple[bool, Union[dict, int]]: 如果登录成功返回(True, 用户信息),否则返回(False, 过期时间) + + Raises: + 根据error_handler处理错误 + """ + async with aiohttp.ClientSession() as session: + response = await session.post(f'http://{self.ip}:{self.port}/api/Login/CheckQR?uuid={uuid}') + json_resp = await response.json() + logger.info("check_login_uuid:{}", json_resp) + if json_resp.get("Success"): + if json_resp.get("Message") == "登陆成功": + self.wxid = json_resp.get("Data").get("acctSectResp").get("userName") + self.nickname = json_resp.get("Data").get("acctSectResp").get("nickName") + return True, json_resp.get("Data") + else: + return False, json_resp.get("Data").get("expiredTime") + else: + self.error_handler(json_resp) + + async def log_out(self) -> bool: + """登出当前账号。 + + Returns: + bool: 登出成功返回True,否则返回False + + Raises: + UserLoggedOut: 如果未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid} + response = await session.post(f'http://{self.ip}:{self.port}/Logout', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + elif json_resp.get("Success"): + return False + else: + self.error_handler(json_resp) + + async def awaken_login(self, wxid: str = "") -> str: + """唤醒登录。 + + Args: + wxid (str, optional): 要唤醒的微信ID. Defaults to "". + + Returns: + str: 返回新的登录UUID + + Raises: + Exception: 如果未提供wxid且未登录 + LoginError: 如果无法获取UUID + 根据error_handler处理错误 + """ + if not wxid and not self.wxid: + raise Exception("Please login using QRCode first") + + if not wxid and self.wxid: + wxid = self.wxid + + async with aiohttp.ClientSession() as session: + response = await session.post(f'http://{self.ip}:{self.port}/api/Login/Awaken?wxid={wxid}') + json_resp = await response.json() + + if json_resp.get("Success") and json_resp.get("Data").get("QrCodeResponse").get("Uuid"): + return json_resp.get("Data").get("QrCodeResponse").get("Uuid") + elif not json_resp.get("Data").get("QrCodeResponse").get("Uuid"): + raise LoginError("Please login using QRCode first") + else: + self.error_handler(json_resp) + + async def get_cached_info(self, wxid: str = None) -> dict: + """获取登录缓存信息。 + + Args: + wxid (str, optional): 要查询的微信ID. Defaults to None. + + Returns: + dict: 返回缓存信息,如果未提供wxid且未登录返回空字典 + """ + if not wxid: + wxid = self.wxid + + if not wxid: + return {} + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": wxid} + response = await session.post(f'http://{self.ip}:{self.port}/api/Login/GetCacheInfo', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data") + else: + return {} + + async def heartbeat(self) -> bool: + """发送心跳包。 + + Returns: + bool: 成功返回True,否则返回False + + Raises: + UserLoggedOut: 如果未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + response = await session.post(f'http://{self.ip}:{self.port}/api/Login/HeartBeat?wxid={self.wxid}') + json_resp = await response.json() + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) + + @staticmethod + def create_device_name() -> str: + """生成一个随机的设备名。 + + Returns: + str: 返回生成的设备名 + """ + first_names = [ + "Oliver", "Emma", "Liam", "Ava", "Noah", "Sophia", "Elijah", "Isabella", + "James", "Mia", "William", "Amelia", "Benjamin", "Harper", "Lucas", "Evelyn", + "Henry", "Abigail", "Alexander", "Ella", "Jackson", "Scarlett", "Sebastian", + "Grace", "Aiden", "Chloe", "Matthew", "Zoey", "Samuel", "Lily", "David", + "Aria", "Joseph", "Riley", "Carter", "Nora", "Owen", "Luna", "Daniel", + "Sofia", "Gabriel", "Ellie", "Matthew", "Avery", "Isaac", "Mila", "Leo", + "Julian", "Layla" + ] + + last_names = [ + "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", + "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", + "Thomas", "Taylor", "Moore", "Jackson", "Martin", "Lee", "Perez", "Thompson", + "White", "Harris", "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson", "Walker", + "Young", "Allen", "King", "Wright", "Scott", "Torres", "Nguyen", "Hill", + "Flores", "Green", "Adams", "Nelson", "Baker", "Hall", "Rivera", "Campbell", + "Mitchell", "Carter", "Roberts", "Gomez", "Phillips", "Evans" + ] + + return choice(first_names) + " " + choice(last_names) + "'s Pad" + + @staticmethod + def create_device_id(s: str = "") -> str: + """生成设备ID。 + + Args: + s (str, optional): 用于生成ID的字符串. Defaults to "". + + Returns: + str: 返回生成的设备ID + """ + if s == "" or s == "string": + s = ''.join(choice(string.ascii_letters) for _ in range(15)) + md5_hash = hashlib.md5(s.encode()).hexdigest() + return "49" + md5_hash[2:] diff --git a/wechat_ipad/client/message.py b/wechat_ipad/client/message.py new file mode 100644 index 0000000..16c9f75 --- /dev/null +++ b/wechat_ipad/client/message.py @@ -0,0 +1,627 @@ +import asyncio +import base64 +import os +from asyncio import Future +from asyncio import Queue, sleep +from io import BytesIO +from pathlib import Path +from typing import Union + +import aiohttp +import logging +from pymediainfo import MediaInfo + +import pysilk +from pydub import AudioSegment + +from wechat_ipad import UserLoggedOut +from wechat_ipad.client.base import WechatAPIClientBase + + +class MessageMixin(WechatAPIClientBase): + def __init__(self, ip: str, port: int): + # 初始化消息队列 + super().__init__(ip, port) + self._message_queue = Queue() + self._is_processing = False + + async def _process_message_queue(self): + """ + 处理消息队列的异步方法 + """ + if self._is_processing: + return + + self._is_processing = True + while True: + if self._message_queue.empty(): + self._is_processing = False + break + + func, args, kwargs, future = await self._message_queue.get() + try: + result = await func(*args, **kwargs) + future.set_result(result) + except Exception as e: + future.set_exception(e) + finally: + self._message_queue.task_done() + await sleep(1) # 消息发送间隔1秒 + + async def _queue_message(self, func, *args, **kwargs): + """ + 将消息添加到队列 + """ + future = Future() + await self._message_queue.put((func, args, kwargs, future)) + + if not self._is_processing: + asyncio.create_task(self._process_message_queue()) + + return await future + + async def revoke_message(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int) -> bool: + """撤回消息。 + + Args: + wxid (str): 接收人wxid + client_msg_id (int): 发送消息的返回值 + create_time (int): 发送消息的返回值 + new_msg_id (int): 发送消息的返回值 + + Returns: + bool: 成功返回True,失败返回False + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "ClientMsgId": client_msg_id, "CreateTime": create_time, + "NewMsgId": new_msg_id} + response = await session.post(f'http://{self.ip}:{self.port}/RevokeMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("消息撤回成功: 对方wxid:{} ClientMsgId:{} CreateTime:{} NewMsgId:{}", + wxid, + client_msg_id, + new_msg_id) + return True + else: + self.error_handler(json_resp) + + async def send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]: + """发送文本消息。 + + Args: + wxid (str): 接收人wxid + content (str): 消息内容 + at (list, str, optional): 要@的用户 + + Returns: + tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_text_message, wxid, content, at) + + async def _send_text_message(self, wxid: str, content: str, at: list[str] = None) -> tuple[int, int, int]: + """ + 实际发送文本消息的方法 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + if isinstance(at, str): + at_str = at + elif isinstance(at, list): + if at is None: + at = [] + at_str = ",".join(at) + else: + raise ValueError("Argument 'at' should be str or list") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": content, "Type": 1, "At": at_str} + response = await session.post(f'http://{self.ip}:{self.port}/SendTextMsg', json=json_param) + json_resp = await response.json() + if json_resp.get("Success"): + logging.info("发送文字消息: 对方wxid:{} at:{} 内容:{}", wxid, at, content) + data = json_resp.get("Data") + return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[ + 0].get("NewMsgId") + else: + self.error_handler(json_resp) + + async def send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]: + """发送图片消息。 + + Args: + wxid (str): 接收人wxid + image (str, byte, os.PathLike): 图片,支持base64字符串,图片byte,图片路径 + + Returns: + tuple[int, int, int]: 返回(ClientImgId, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + ValueError: image_path和image_base64都为空或都不为空时 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_image_message, wxid, image) + + async def _send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[ + int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + if isinstance(image, str): + pass + elif isinstance(image, bytes): + image = base64.b64encode(image).decode() + elif isinstance(image, os.PathLike): + with open(image, 'rb') as f: + image = base64.b64encode(f.read()).decode() + else: + raise ValueError("Argument 'image' can only be str, bytes, or os.PathLike") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": image} + response = await session.post(f'http://{self.ip}:{self.port}/SendImageMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + json_param.pop('Base64') + logging.info("发送图片消息: 对方wxid:{} 图片base64略", wxid) + data = json_resp.get("Data") + return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid") + else: + self.error_handler(json_resp) + + async def send_video_message(self, wxid: str, video: Union[str, bytes, os.PathLike], + image: [str, bytes, os.PathLike] = None): + """发送视频消息。不推荐使用,上传速度很慢300KB/s。如要使用,可压缩视频,或者发送链接卡片而不是视频。 + + Args: + wxid (str): 接收人wxid + video (str, bytes, os.PathLike): 视频 接受base64字符串,字节,文件路径 + image (str, bytes, os.PathLike): 视频封面图片 接受base64字符串,字节,文件路径 + + Returns: + tuple[int, int]: 返回(ClientMsgid, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + ValueError: 视频或图片参数都为空或都不为空时 + 根据error_handler处理错误 + """ + if not image: + image = Path(os.path.join(Path(__file__).resolve().parent, "fallback.png")) + # get video base64 and duration + if isinstance(video, str): + vid_base64 = video + video = base64.b64decode(video) + file_len = len(video) + media_info = MediaInfo.parse(BytesIO(video)) + elif isinstance(video, bytes): + vid_base64 = base64.b64encode(video).decode() + file_len = len(video) + media_info = MediaInfo.parse(BytesIO(video)) + elif isinstance(video, os.PathLike): + with open(video, "rb") as f: + file_len = len(f.read()) + vid_base64 = base64.b64encode(f.read()).decode() + media_info = MediaInfo.parse(video) + else: + raise ValueError("video should be str, bytes, or path") + duration = media_info.tracks[0].duration + + # get image base64 + if isinstance(image, str): + image_base64 = image + elif isinstance(image, bytes): + image_base64 = base64.b64encode(image).decode() + elif isinstance(image, os.PathLike): + with open(image, "rb") as f: + image_base64 = base64.b64encode(f.read()).decode() + else: + raise ValueError("image should be str, bytes, or path") + + # 打印预估时间,300KB/s + predict_time = int(file_len / 1024 / 300) + logging.info("开始发送视频: 对方wxid:{} 视频base64略 图片base64略 预计耗时:{}秒", wxid, predict_time) + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": vid_base64, "ImageBase64": image_base64, + "PlayLength": duration} + async with session.post(f'http://{self.ip}:{self.port}/SendVideoMsg', json=json_param) as resp: + json_resp = await resp.json() + + if json_resp.get("Success"): + json_param.pop('Base64') + json_param.pop('ImageBase64') + logging.info("发送视频成功: 对方wxid:{} 时长:{} 视频base64略 图片base64略", wxid, duration) + data = json_resp.get("Data") + return data.get("clientMsgId"), data.get("newMsgId") + else: + self.error_handler(json_resp) + + async def send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \ + tuple[int, int, int]: + """发送语音消息。 + + Args: + wxid (str): 接收人wxid + voice (str, bytes, os.PathLike): 语音 接受base64字符串,字节,文件路径 + format (str, optional): 语音格式,支持amr/wav/mp3. Defaults to "amr". + + Returns: + tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + ValueError: voice_path和voice_base64都为空或都不为空时,或format不支持时 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_voice_message, wxid, voice, format) + + async def _send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \ + tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + elif format not in ["amr", "wav", "mp3"]: + raise ValueError("format must be one of amr, wav, mp3") + + # read voice to byte + if isinstance(voice, str): + voice_byte = base64.b64decode(voice) + elif isinstance(voice, bytes): + voice_byte = voice + elif isinstance(voice, os.PathLike): + with open(voice, "rb") as f: + voice_byte = f.read() + else: + raise ValueError("voice should be str, bytes, or path") + + # get voice duration and b64 + if format.lower() == "amr": + audio = AudioSegment.from_file(BytesIO(voice_byte), format="amr") + voice_base64 = base64.b64encode(voice_byte).decode() + elif format.lower() == "wav": + audio = AudioSegment.from_file(BytesIO(voice_byte), format="wav").set_channels(1) + audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate)) + voice_base64 = base64.b64encode( + await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode() + elif format.lower() == "mp3": + audio = AudioSegment.from_file(BytesIO(voice_byte), format="mp3").set_channels(1) + audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate)) + voice_base64 = base64.b64encode( + await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode() + else: + raise ValueError("format must be one of amr, wav, mp3") + + duration = len(audio) + + format_dict = {"amr": 0, "wav": 4, "mp3": 4} + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": voice_base64, "VoiceTime": duration, + "Type": format_dict[format]} + response = await session.post(f'http://{self.ip}:{self.port}/SendVoiceMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + json_param.pop('Base64') + logging.info("发送语音消息: 对方wxid:{} 时长:{} 格式:{} 音频base64略", wxid, duration, format) + data = json_resp.get("Data") + return int(data.get("ClientMsgId")), data.get("CreateTime"), data.get("NewMsgId") + else: + self.error_handler(json_resp) + + @staticmethod + def _get_closest_frame_rate(frame_rate: int) -> int: + supported = [8000, 12000, 16000, 24000] + closest_rate = None + smallest_diff = float('inf') + for num in supported: + diff = abs(frame_rate - num) + if diff < smallest_diff: + smallest_diff = diff + closest_rate = num + + return closest_rate + + async def send_link_message(self, wxid: str, url: str, title: str = "", description: str = "", + thumb_url: str = "") -> tuple[str, int, int]: + """发送链接消息。 + + Args: + wxid (str): 接收人wxid + url (str): 跳转链接 + title (str, optional): 标题. Defaults to "". + description (str, optional): 描述. Defaults to "". + thumb_url (str, optional): 缩略图链接. Defaults to "". + + Returns: + tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_link_message, wxid, url, title, description, thumb_url) + + async def _send_link_message(self, wxid: str, url: str, title: str = "", description: str = "", + thumb_url: str = "") -> tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Url": url, "Title": title, "Desc": description, + "ThumbUrl": thumb_url} + response = await session.post(f'http://{self.ip}:{self.port}/SendShareLink', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("发送链接消息: 对方wxid:{} 链接:{} 标题:{} 描述:{} 缩略图链接:{}", + wxid, + url, + title, + description, + thumb_url) + data = json_resp.get("Data") + return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId") + else: + self.error_handler(json_resp) + + async def send_emoji_message(self, wxid: str, md5: str, total_length: int) -> list[dict]: + """发送表情消息。 + + Args: + wxid (str): 接收人wxid + md5 (str): 表情md5值 + total_length (int): 表情总长度 + + Returns: + list[dict]: 返回表情项列表(list of emojiItem) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_emoji_message, wxid, md5, total_length) + + async def _send_emoji_message(self, wxid: str, md5: str, total_length: int) -> tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Md5": md5, "TotalLen": total_length} + response = await session.post(f'http://{self.ip}:{self.port}/SendEmojiMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("发送表情消息: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length) + return json_resp.get("Data").get("emojiItem") + else: + self.error_handler(json_resp) + + async def send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[ + int, int, int]: + """发送名片消息。 + + Args: + wxid (str): 接收人wxid + card_wxid (str): 名片用户的wxid + card_nickname (str): 名片用户的昵称 + card_alias (str, optional): 名片用户的备注. Defaults to "". + + Returns: + tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_card_message, wxid, card_wxid, card_nickname, card_alias) + + async def _send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[ + int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "CardWxid": card_wxid, "CardAlias": card_alias, + "CardNickname": card_nickname} + response = await session.post(f'http://{self.ip}:{self.port}/SendCardMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("发送名片消息: 对方wxid:{} 名片wxid:{} 名片备注:{} 名片昵称:{}", wxid, + card_wxid, + card_alias, + card_nickname) + data = json_resp.get("Data") + return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[ + 0].get("NewMsgId") + else: + self.error_handler(json_resp) + + async def send_app_message(self, wxid: str, xml: str, type: int) -> tuple[str, int, int]: + """发送应用消息。 + + Args: + wxid (str): 接收人wxid + xml (str): 应用消息的xml内容 + type (int): 应用消息类型 + + Returns: + tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_app_message, wxid, xml, type) + + async def _send_app_message(self, wxid: str, xml: str, type: int) -> tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Xml": xml, "Type": type} + response = await session.post(f'http://{self.ip}:{self.port}/SendAppMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + json_param["Xml"] = json_param["Xml"].replace("\n", "") + logging.info("发送app消息: 对方wxid:{} 类型:{} xml:{}", wxid, type, json_param["Xml"]) + return json_resp.get("Data").get("clientMsgId"), json_resp.get("Data").get( + "createTime"), json_resp.get("Data").get("newMsgId") + else: + self.error_handler(json_resp) + + async def send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[str, int, int]: + """转发文件消息。 + + Args: + wxid (str): 接收人wxid + xml (str): 要转发的文件消息xml内容 + + Returns: + tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_cdn_file_msg, wxid, xml) + + async def _send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} + response = await session.post(f'http://{self.ip}:{self.port}/SendCDNFileMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("转发文件消息: 对方wxid:{} xml:{}", wxid, xml) + data = json_resp.get("Data") + return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId") + else: + self.error_handler(json_resp) + + async def send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[str, int, int]: + """转发图片消息。 + + Args: + wxid (str): 接收人wxid + xml (str): 要转发的图片消息xml内容 + + Returns: + tuple[str, int, int]: 返回(ClientImgId, CreateTime, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_cdn_img_msg, wxid, xml) + + async def _send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[int, int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} + response = await session.post(f'http://{self.ip}:{self.port}/SendCDNImgMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("转发图片消息: 对方wxid:{} xml:{}", wxid, xml) + data = json_resp.get("Data") + return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid") + else: + self.error_handler(json_resp) + + async def send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[str, int]: + """转发视频消息。 + + Args: + wxid (str): 接收人wxid + xml (str): 要转发的视频消息xml内容 + + Returns: + tuple[str, int]: 返回(ClientMsgid, NewMsgId) + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 登录新设备后4小时内操作 + 根据error_handler处理错误 + """ + return await self._queue_message(self._send_cdn_video_msg, wxid, xml) + + async def _send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[int, int]: + if not self.wxid: + raise UserLoggedOut("请先登录") + + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml} + response = await session.post(f'http://{self.ip}:{self.port}/SendCDNVideoMsg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + logging.info("转发视频消息: 对方wxid:{} xml:{}", wxid, xml) + data = json_resp.get("Data") + return data.get("clientMsgId"), data.get("newMsgId") + else: + self.error_handler(json_resp) + + async def sync_message(self) -> dict: + """同步消息。 + + Returns: + dict: 返回同步到的消息数据 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: + json_param = {"Wxid": self.wxid, "Scene": 0, "Synckey": ""} + response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Sync', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data") + else: + self.error_handler(json_resp) diff --git a/wechat_ipad/client/tools.py b/wechat_ipad/client/tools.py new file mode 100644 index 0000000..df55d3b --- /dev/null +++ b/wechat_ipad/client/tools.py @@ -0,0 +1,357 @@ +import base64 +import io +import os + +import aiohttp +import pysilk +from pydub import AudioSegment + +from wechat_ipad import UserLoggedOut +from wechat_ipad.client.base import WechatAPIClientBase, Proxy + + +class ToolMixin(WechatAPIClientBase): + async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str: + """CDN下载高清图片。 + + Args: + aeskey (str): 图片的AES密钥 + cdnmidimgurl (str): 图片的CDN URL + + Returns: + str: 图片的base64编码字符串 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "AesKey": aeskey, "Cdnmidimgurl": cdnmidimgurl} + response = await session.post(f'http://{self.ip}:{self.port}/CdnDownloadImg', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data") + else: + self.error_handler(json_resp) + + async def download_voice(self, msg_id: str, voiceurl: str, length: int) -> str: + """下载语音文件。 + + Args: + msg_id (str): 消息的msgid + voiceurl (str): 语音的url,从xml获取 + length (int): 语音长度,从xml获取 + + Returns: + str: 语音的base64编码字符串 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "MsgId": msg_id, "Voiceurl": voiceurl, "Length": length} + response = await session.post(f'http://{self.ip}:{self.port}/DownloadVoice', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("data").get("buffer") + else: + self.error_handler(json_resp) + + async def download_attach(self, attach_id: str) -> dict: + """下载附件。 + + Args: + attach_id (str): 附件ID + + Returns: + dict: 附件数据 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "AttachId": attach_id} + response = await session.post(f'http://{self.ip}:{self.port}/DownloadAttach', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("data").get("buffer") + else: + self.error_handler(json_resp) + + async def download_video(self, msg_id) -> str: + """下载视频。 + + Args: + msg_id (str): 消息的msg_id + + Returns: + str: 视频的base64编码字符串 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "MsgId": msg_id} + response = await session.post(f'http://{self.ip}:{self.port}/DownloadVideo', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("data").get("buffer") + else: + self.error_handler(json_resp) + + async def set_step(self, count: int) -> bool: + """设置步数。 + + Args: + count (int): 要设置的步数 + + Returns: + bool: 成功返回True,失败返回False + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 风控保护: 新设备登录后4小时内请挂机 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "StepCount": count} + response = await session.post(f'http://{self.ip}:{self.port}/SetStep', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) + + async def set_proxy(self, proxy: Proxy) -> bool: + """设置代理。 + + Args: + proxy (Proxy): 代理配置对象 + + Returns: + bool: 成功返回True,失败返回False + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, + "Proxy": {"ProxyIp": f"{proxy.ip}:{proxy.port}", + "ProxyUser": proxy.username, + "ProxyPassword": proxy.password}} + response = await session.post(f'http://{self.ip}:{self.port}/SetProxy', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return True + else: + self.error_handler(json_resp) + + async def check_database(self) -> bool: + """检查数据库状态。 + + Returns: + bool: 数据库正常返回True,否则返回False + """ + async with aiohttp.ClientSession() as session: + response = await session.get(f'http://{self.ip}:{self.port}/CheckDatabaseOK') + json_resp = await response.json() + + if json_resp.get("Running"): + return True + else: + return False + + @staticmethod + def base64_to_file(base64_str: str, file_name: str, file_path: str) -> bool: + """将base64字符串转换为文件并保存。 + + Args: + base64_str (str): base64编码的字符串 + file_name (str): 要保存的文件名 + file_path (str): 文件保存路径 + + Returns: + bool: 转换成功返回True,失败返回False + """ + try: + os.makedirs(file_path, exist_ok=True) + + # 拼接完整的文件路径 + full_path = os.path.join(file_path, file_name) + + # 移除可能存在的 base64 头部信息 + if ',' in base64_str: + base64_str = base64_str.split(',')[1] + + # 解码 base64 并写入文件 + with open(full_path, 'wb') as f: + f.write(base64.b64decode(base64_str)) + + return True + + except Exception as e: + return False + + @staticmethod + def file_to_base64(file_path: str) -> str: + """将文件转换为base64字符串。 + + Args: + file_path (str): 文件路径 + + Returns: + str: base64编码的字符串 + """ + with open(file_path, 'rb') as f: + return base64.b64encode(f.read()).decode() + + @staticmethod + def base64_to_byte(base64_str: str) -> bytes: + """将base64字符串转换为bytes。 + + Args: + base64_str (str): base64编码的字符串 + + Returns: + bytes: 解码后的字节数据 + """ + # 移除可能存在的 base64 头部信息 + if ',' in base64_str: + base64_str = base64_str.split(',')[1] + + return base64.b64decode(base64_str) + + @staticmethod + def byte_to_base64(byte: bytes) -> str: + """将bytes转换为base64字符串。 + + Args: + byte (bytes): 字节数据 + + Returns: + str: base64编码的字符串 + """ + return base64.b64encode(byte).decode("utf-8") + + @staticmethod + async def silk_byte_to_byte_wav_byte(silk_byte: bytes) -> bytes: + """将silk字节转换为wav字节。 + + Args: + silk_byte (bytes): silk格式的字节数据 + + Returns: + bytes: wav格式的字节数据 + """ + return await pysilk.async_decode(silk_byte, to_wav=True) + + @staticmethod + def wav_byte_to_amr_byte(wav_byte: bytes) -> bytes: + """将WAV字节数据转换为AMR格式。 + + Args: + wav_byte (bytes): WAV格式的字节数据 + + Returns: + bytes: AMR格式的字节数据 + + Raises: + Exception: 转换失败时抛出异常 + """ + try: + # 从字节数据创建 AudioSegment 对象 + audio = AudioSegment.from_wav(io.BytesIO(wav_byte)) + + # 设置 AMR 编码的标准参数 + audio = audio.set_frame_rate(8000).set_channels(1) + + # 创建一个字节缓冲区来存储 AMR 数据 + output = io.BytesIO() + + # 导出为 AMR 格式 + audio.export(output, format="amr") + + # 获取字节数据 + return output.getvalue() + + except Exception as e: + raise Exception(f"转换WAV到AMR失败: {str(e)}") + + @staticmethod + def wav_byte_to_amr_base64(wav_byte: bytes) -> str: + """将WAV字节数据转换为AMR格式的base64字符串。 + + Args: + wav_byte (bytes): WAV格式的字节数据 + + Returns: + str: AMR格式的base64编码字符串 + """ + return base64.b64encode(ToolMixin.wav_byte_to_amr_byte(wav_byte)).decode() + + @staticmethod + async def wav_byte_to_silk_byte(wav_byte: bytes) -> bytes: + """将WAV字节数据转换为silk格式。 + + Args: + wav_byte (bytes): WAV格式的字节数据 + + Returns: + bytes: silk格式的字节数据 + """ + # get pcm data + audio = AudioSegment.from_wav(io.BytesIO(wav_byte)) + pcm = audio.raw_data + return await pysilk.async_encode(pcm, data_rate=audio.frame_rate, sample_rate=audio.frame_rate) + + @staticmethod + async def wav_byte_to_silk_base64(wav_byte: bytes) -> str: + """将WAV字节数据转换为silk格式的base64字符串。 + + Args: + wav_byte (bytes): WAV格式的字节数据 + + Returns: + str: silk格式的base64编码字符串 + """ + return base64.b64encode(await ToolMixin.wav_byte_to_silk_byte(wav_byte)).decode() + + @staticmethod + async def silk_base64_to_wav_byte(silk_base64: str) -> bytes: + """将silk格式的base64字符串转换为WAV字节数据。 + + Args: + silk_base64 (str): silk格式的base64编码字符串 + + Returns: + bytes: WAV格式的字节数据 + """ + return await ToolMixin.silk_byte_to_byte_wav_byte(base64.b64decode(silk_base64)) \ No newline at end of file diff --git a/wechat_ipad/client/user.py b/wechat_ipad/client/user.py new file mode 100644 index 0000000..7dbc43d --- /dev/null +++ b/wechat_ipad/client/user.py @@ -0,0 +1,81 @@ +import aiohttp + +from wechat_ipad import UserLoggedOut +from wechat_ipad.client.base import WechatAPIClientBase + +from loguru import logger + + +class UserMixin(WechatAPIClientBase): + async def get_profile(self, wxid: str = None) -> dict: + """获取用户信息。 + + Args: + wxid (str, optional): 用户wxid. Defaults to None. + + Returns: + dict: 用户信息字典 + + Raises: + UserLoggedOut: 未登录时调用 + 根据error_handler处理错误 + """ + if not self.wxid and not wxid: + raise UserLoggedOut("请先登录") + + if not wxid: + wxid = self.wxid + + async with aiohttp.ClientSession() as session: + response = await session.post(f'http://{self.ip}:{self.port}/api/User/GetContractProfile?wxid={wxid}') + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("userInfo") + else: + self.error_handler(json_resp) + + async def get_my_qrcode(self, style: int = 0) -> str: + """获取个人二维码。 + + Args: + style (int, optional): 二维码样式. Defaults to 0. + + Returns: + str: 图片的base64编码字符串 + + Raises: + UserLoggedOut: 未登录时调用 + BanProtection: 风控保护: 新设备登录后4小时内请挂机 + 根据error_handler处理错误 + """ + if not self.wxid: + raise UserLoggedOut("请先登录") + + async with aiohttp.ClientSession() as session: + json_param = {"Wxid": self.wxid, "Style": style} + response = await session.post(f'http://{self.ip}:{self.port}/GetMyQRCode', json=json_param) + json_resp = await response.json() + + if json_resp.get("Success"): + return json_resp.get("Data").get("qrcode").get("buffer") + else: + self.error_handler(json_resp) + + async def is_logged_in(self, wxid: str = None) -> bool: + """检查是否登录。 + + Args: + wxid (str, optional): 用户wxid. Defaults to None. + + Returns: + bool: 已登录返回True,未登录返回False + """ + if not wxid: + wxid = self.wxid + try: + await self.get_profile(wxid) + return True + except Exception as e: + logger.error("is_logged_in:{}", e) + return False diff --git a/wechat_ipad/config.toml b/wechat_ipad/config.toml new file mode 100644 index 0000000..1945710 --- /dev/null +++ b/wechat_ipad/config.toml @@ -0,0 +1,6 @@ +server_url = "http://192.168.2.170:8058/" +wxid = "wxid_ts8v7yk4g5c522" +device_id = "4951dbd94b77b4e48f398b39e3857606" +device_name = "Isaac Campbell's Pad" +server_ip = "192.168.2.170" +server_port = "8058" diff --git a/wechat_ipad/errors.py b/wechat_ipad/errors.py new file mode 100644 index 0000000..e186583 --- /dev/null +++ b/wechat_ipad/errors.py @@ -0,0 +1,35 @@ +class MarshallingError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class UnmarshallingError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class MMTLSError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class PacketError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class ParsePacketError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class DatabaseError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class LoginError(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class UserLoggedOut(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class BanProtection(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs)