Files
abot/wechat_ipad/client/group.py

204 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Union, Any
import aiohttp
from wechat_ipad.client.base import WechatAPIClientBase
from wechat_ipad.errors import UserLoggedOut
class ChatroomMixin(WechatAPIClientBase):
    """Chatroom (group chat) operations for the WeChat API client.

    Every request is an HTTP POST with a JSON body sent to
    ``http://{self.ip}:{self.port}<path>``. The attributes ``self.wxid``,
    ``self.ip``, ``self.port`` and the method ``self.error_handler`` are not
    defined in this class; presumably they come from ``WechatAPIClientBase``.
    """

    def _ensure_logged_in(self) -> None:
        """Raise ``UserLoggedOut`` when no account wxid is set on the client."""
        if not self.wxid:
            raise UserLoggedOut("请先登录")

    async def _post_chatroom_api(self, path: str, payload: dict) -> dict:
        """POST ``payload`` as JSON to ``path`` and return the decoded JSON reply.

        Args:
            path: URL path starting with ``/``, appended to the base address.
            payload: JSON-serialisable request body.

        Returns:
            dict: The decoded JSON response body.
        """
        async with aiohttp.ClientSession() as session:
            # Use the response as a context manager so the connection is
            # released deterministically once the body has been read.
            async with session.post(f'http://{self.ip}:{self.port}{path}', json=payload) as response:
                return await response.json()

    @staticmethod
    def _find_member(members: list, wxid: str):
        """Return the first member dict whose ``UserName`` equals ``wxid``, else None."""
        return next((member for member in members if member.get("UserName") == wxid), None)

    async def add_chatroom_member(self, chatroom: str, wxid: str) -> bool:
        """Add a member to a chatroom (for chatrooms of at most 40 people).

        Args:
            chatroom: wxid of the chatroom.
            wxid: wxid of the user to add.

        Returns:
            bool: True on success. On failure the response is passed to
            ``self.error_handler``; if that returns instead of raising,
            False is returned.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid}
        json_resp = await self._post_chatroom_api('/AddChatroomMember', json_param)
        if json_resp.get("Success"):
            return True
        self.error_handler(json_resp)
        # Only reached when error_handler returns; honour the documented bool contract.
        return False

    async def get_chatroom_announce(self, chatroom: str) -> dict:
        """Fetch the chatroom announcement via the detailed chatroom info endpoint.

        Args:
            chatroom: Chatroom id.

        Returns:
            dict: The ``Data`` object of the response with its ``BaseResponse``
            entry removed.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        json_param = {"Wxid": self.wxid, "QID": chatroom}
        json_resp = await self._post_chatroom_api('/api/Group/GetChatRoomInfoDetail', json_param)
        if json_resp.get("Success"):
            data = dict(json_resp.get("Data"))
            # Tolerate a response that carries no BaseResponse entry.
            data.pop("BaseResponse", None)
            return data
        self.error_handler(json_resp)

    async def get_chatroom_info(self, chatroom: str) -> dict:
        """Fetch chatroom information.

        Args:
            chatroom: Chatroom id.

        Returns:
            dict: The first entry of ``Data.ContactList`` in the response.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        json_param = {"Wxid": self.wxid, "QID": chatroom}
        json_resp = await self._post_chatroom_api('/api/Group/GetChatRoomInfo', json_param)
        if json_resp.get("Success"):
            return json_resp.get("Data").get("ContactList")[0]
        self.error_handler(json_resp)

    async def get_chatroom_member_list(self, chatroom: str) -> list[dict]:
        """Fetch the member list of a chatroom.

        Args:
            chatroom: Chatroom id.

        Returns:
            list[dict]: ``Data.NewChatroomData.ChatRoomMember`` from the response.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        json_param = {"Wxid": self.wxid, "QID": chatroom}
        json_resp = await self._post_chatroom_api('/api/Group/GetChatRoomMemberDetail', json_param)
        if json_resp.get("Success"):
            return json_resp.get("Data").get("NewChatroomData").get("ChatRoomMember")
        self.error_handler(json_resp)

    async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]:
        """Fetch the QR code of a chatroom.

        Args:
            chatroom: Chatroom id.

        Returns:
            dict: ``{"base64": <Data.qrcode.buffer>, "description": <Data.revokeQrcodeWording>}``.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
        json_resp = await self._post_chatroom_api('/GetChatroomQRCode', json_param)
        if json_resp.get("Success"):
            data = json_resp.get("Data")
            return {"base64": data.get("qrcode").get("buffer"), "description": data.get("revokeQrcodeWording")}
        self.error_handler(json_resp)

    async def invite_chatroom_member(self, wxid: Union[str, list], chatroom: str) -> bool:
        """Invite members to a chatroom (for chatrooms of more than 40 people).

        Args:
            wxid: A single wxid, or a list of wxids (sent comma-separated).
            chatroom: Chatroom id.

        Returns:
            bool: True on success. On failure the response is passed to
            ``self.error_handler``; if that returns instead of raising,
            False is returned.

        Raises:
            UserLoggedOut: If the client has no wxid (not logged in).
        """
        self._ensure_logged_in()
        if isinstance(wxid, list):
            wxid = ",".join(wxid)
        json_param = {"Wxid": self.wxid, "ChatRoomName": chatroom, "ToWxids": wxid}
        json_resp = await self._post_chatroom_api('/api/Group/InviteChatRoomMember', json_param)
        if json_resp.get("Success"):
            return True
        self.error_handler(json_resp)
        # Only reached when error_handler returns; honour the documented bool contract.
        return False

    async def get_chatroom_nickname(self, wxid: Union[str, list[str]], chatroom: str) -> Union[str, list[str]]:
        """Look up the name(s) of one or more users inside a chatroom.

        For each member the ``DisplayName`` is preferred, then ``NickName``,
        and finally the wxid itself when both are empty.

        Args:
            wxid: A single wxid, or a list of wxids (the original
                documentation mentions a maximum of 20).
            chatroom: Chatroom id.

        Returns:
            Union[str, list[str]]: For a single wxid, the resolved name, or
            ``""`` when the user is not a member. For a list, one string per
            input wxid in the same order; a wxid that is not a member is
            returned unchanged.
        """
        # The member list may be None if the request failed without raising.
        members = await self.get_chatroom_member_list(chatroom) or []

        if isinstance(wxid, str):
            member = self._find_member(members, wxid)
            if member is None:
                return ""
            return member.get("DisplayName") or member.get("NickName") or wxid

        names = []
        for single_wxid in wxid:
            member = self._find_member(members, single_wxid)
            if member is None:
                # Fall back to the individual wxid, never the whole input list.
                names.append(single_wxid)
            else:
                names.append(member.get("DisplayName") or member.get("NickName") or single_wxid)
        return names

    async def get_chatroom_member_detail(self, wxid: str, chatroom: str) -> dict:
        """Return the member-list entry of one user in a chatroom.

        Args:
            wxid: wxid of the user to look up.
            chatroom: Chatroom id.

        Returns:
            dict: The first member dict whose ``UserName`` equals ``wxid``,
            or an empty dict when the user is not a member.
        """
        # The member list may be None if the request failed without raising.
        members = await self.get_chatroom_member_list(chatroom) or []
        member = self._find_member(members, wxid)
        return {} if member is None else member