Files
abot/wechat_ipad/providers/server_864/group.py
liuwei ff33edb0d1 新增864 provider并打通server_key配置
- 新增 server_864 独立 provider 目录,接入登录、消息轮询、联系人、群资料、用户资料与朋友圈基础能力

- 扩展 gateway、robot 与配置归一化逻辑,支持 server_864/864 别名和 WECHAT_SERVER_KEY

- 更新配置示例与多版本适配路线图,明确 864 第一版接入范围和后续待补项
2026-05-07 11:24:33 +08:00

91 lines
3.3 KiB
Python

from typing import Any, Union
from wechat_ipad.providers.server_864.base import Server864APIClientBase
class ChatroomMixin(Server864APIClientBase):
"""864 群聊相关接口。"""
async def get_chatroom_info(self, chatroom: str) -> dict:
"""获取单个群详情。"""
data = await self._request_data(
"post",
"/group/GetChatRoomInfo",
json_body={"ChatRoomWxIdList": [chatroom]},
timeout=30,
)
if isinstance(data, list):
return dict(data[0] or {}) if data else {}
if not isinstance(data, dict):
return {}
contact_list = data.get("ContactList") or data.get("contactList") or []
if contact_list:
return dict(contact_list[0] or {})
return dict(data)
async def get_chatroom_member_list(self, chatroom: str) -> list[dict]:
"""获取群成员列表。"""
data = await self._request_data(
"post",
"/group/GetChatroomMemberDetail",
json_body={"ChatRoomName": chatroom},
timeout=30,
)
if isinstance(data, list):
return list(data)
if not isinstance(data, dict):
return []
return list(
data.get("ChatRoomMember")
or data.get("chatRoomMember")
or (data.get("NewChatroomData") or {}).get("ChatRoomMember", [])
or (data.get("newChatroomData") or {}).get("chatRoomMember", [])
or data.get("MemberList")
or []
)
async def get_chatroom_announce(self, chatroom: str) -> dict:
"""获取群公告详情。"""
data = await self._request_data(
"post",
"/group/GetChatRoomInfoDetail",
json_body={"ChatRoomName": chatroom},
timeout=30,
)
return dict(data or {})
async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]:
"""获取群二维码。"""
data = await self._request_data(
"post",
"/group/GetChatroomQrCode",
json_body={"ChatRoomName": chatroom},
timeout=30,
)
return {
"base64": str(self._pick_first(data, "QrCode", "qrcode", "QrCodeBase64", "base64") or ""),
"description": str(self._pick_first(data, "Desc", "desc", "Description", "description") or ""),
}
async def get_chatroom_nickname(self, wxid: Union[str, list[str]], chatroom: str) -> Union[str, list[str]]:
"""获取成员在群内的展示昵称。"""
members = await self.get_chatroom_member_list(chatroom)
member_map = {}
for member in members:
member_wxid = str(member.get("UserName", "") or "")
nickname = member.get("DisplayName") or member.get("NickName") or member_wxid
member_map[member_wxid] = nickname
if isinstance(wxid, str):
return str(member_map.get(wxid, ""))
return [str(member_map.get(single_wxid, "")) for single_wxid in wxid]
async def get_chatroom_member_detail(self, wxid: str, chatroom: str) -> dict:
"""获取单个群成员详情。"""
members = await self.get_chatroom_member_list(chatroom)
for member in members:
if str(member.get("UserName", "") or "") == wxid:
return dict(member)
return {}