Files
WechatHookBot/WechatHook/client.py
2025-12-03 15:48:44 +08:00

1332 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WechatHookClient - 微信 Hook API 客户端
封装所有微信操作的高级 API
"""
import asyncio
import json
import uuid
from typing import List, Dict, Optional
from loguru import logger
from .loader import NoveLoader
from .message_types import MessageType
from .callbacks import RECV_CALLBACK, add_callback_handler
class WechatHookClient:
"""
微信 Hook API 客户端
提供统一的异步 API 接口
"""
def __init__(self, loader: NoveLoader, client_id: int):
"""
初始化客户端
Args:
loader: NoveLoader 实例
client_id: 客户端 ID进程 ID
"""
self.loader = loader
self.client_id = client_id
# 存储待处理的API请求
self.pending_requests = {}
# 注册回调处理器
add_callback_handler(self)
logger.info(f"WechatHookClient 初始化: client_id={client_id}")
async def _log_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""):
"""记录机器人发送的消息到 MessageLogger"""
try:
logger.info(f"尝试记录机器人消息: {to_wxid} - {content[:50]}...")
from utils.message_hook import log_bot_message
await log_bot_message(to_wxid, content, msg_type, media_url)
logger.info(f"机器人消息记录成功")
except Exception as e:
logger.error(f"记录机器人消息失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
def _send_data(self, msg_type: int, data: dict) -> bool:
"""
发送数据到微信(同步)
Args:
msg_type: 消息类型
data: 消息数据
Returns:
是否发送成功
"""
payload = {
"type": msg_type,
"data": data
}
message = json.dumps(payload, ensure_ascii=False)
return self.loader.SendWeChatData(self.client_id, message)
async def _send_data_async(self, msg_type: int, data: dict) -> bool:
"""
发送数据到微信(异步)
Args:
msg_type: 消息类型
data: 消息数据
Returns:
是否发送成功
"""
return await asyncio.to_thread(self._send_data, msg_type, data)
# ==================== 消息发送 ====================
async def send_text(self, to_wxid: str, content: str) -> bool:
"""
发送文本消息
Args:
to_wxid: 接收者 wxid
content: 文本内容
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"content": content
}
result = await self._send_data_async(MessageType.MT_SEND_TEXT, data)
if result:
logger.info(f"发送文本成功: {to_wxid}")
# 记录机器人发送的消息到 MessageLogger
await self._log_bot_message(to_wxid, content, "text")
else:
logger.error(f"发送文本失败: {to_wxid}")
return result
async def send_image(self, to_wxid: str, image_path: str) -> bool:
"""
发送图片消息
Args:
to_wxid: 接收者 wxid
image_path: 图片文件路径
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"file": image_path
}
# 使用正确的图片发送API类型
result = await self._send_data_async(11040, data)
if result:
logger.info(f"发送图片成功: {to_wxid}")
# 记录机器人发送的图片消息
import os
filename = os.path.basename(image_path)
await self._log_bot_message(to_wxid, f"[图片] {filename}", "image", image_path)
else:
logger.error(f"发送图片失败: {to_wxid}")
return result
async def send_file(self, to_wxid: str, file_path: str) -> bool:
"""
发送文件消息(普通发送)
Args:
to_wxid: 接收者 wxid
file_path: 文件路径
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"file": file_path # 注意:参数名是 "file" 而不是 "file_path"
}
result = await self._send_data_async(11041, data) # 使用正确的 type=11041
if result:
logger.info(f"发送文件成功: {to_wxid}")
# 记录机器人发送的文件消息
import os
filename = os.path.basename(file_path)
await self._log_bot_message(to_wxid, f"[文件] {filename}", "file", file_path)
else:
logger.error(f"发送文件失败: {to_wxid}")
return result
async def send_video(self, to_wxid: str, video_path: str) -> bool:
"""
发送视频消息
Args:
to_wxid: 接收者 wxid
video_path: 视频文件路径
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"video_path": video_path
}
result = await self._send_data_async(11039, data)
if result:
logger.info(f"发送视频成功: {to_wxid}")
# 记录机器人发送的视频消息
import os
filename = os.path.basename(video_path)
await self._log_bot_message(to_wxid, f"[视频] {filename}", "video", video_path)
else:
logger.error(f"发送视频失败: {to_wxid}")
return result
async def send_at_message(
self,
chatroom_id: str,
content: str,
at_list: List[str]
) -> bool:
"""
发送群聊 @ 消息
Args:
chatroom_id: 群聊 ID
content: 消息内容
at_list: 要 @ 的 wxid 列表,["notify@all"] 表示 @所有人
Returns:
是否发送成功
"""
data = {
"chatroom_id": chatroom_id,
"content": content,
"at_list": at_list
}
result = await self._send_data_async(11040, data)
if result:
logger.info(f"发送 @ 消息成功: {chatroom_id}")
else:
logger.error(f"发送 @ 消息失败: {chatroom_id}")
return result
async def send_card(
self,
to_wxid: str,
card_wxid: str,
card_nickname: str
) -> bool:
"""
发送名片消息
Args:
to_wxid: 接收者 wxid
card_wxid: 名片的 wxid
card_nickname: 名片昵称
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"card_wxid": card_wxid,
"card_nickname": card_nickname
}
result = await self._send_data_async(11041, data)
if result:
logger.info(f"发送名片成功: {to_wxid}")
else:
logger.error(f"发送名片失败: {to_wxid}")
return result
async def send_link(
self,
to_wxid: str,
title: str,
desc: str,
url: str,
thumb_url: str = ""
) -> bool:
"""
发送链接消息
Args:
to_wxid: 接收者 wxid
title: 链接标题
desc: 链接描述
url: 链接地址
thumb_url: 缩略图 URL
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"title": title,
"desc": desc,
"url": url,
"thumb_url": thumb_url
}
result = await self._send_data_async(11042, data)
if result:
logger.info(f"发送链接成功: {to_wxid}")
else:
logger.error(f"发送链接失败: {to_wxid}")
return result
async def cdn_init(self) -> bool:
"""
初始化 CDN 环境
用于初始化 CDN 环境,收到登录消息后执行一次即可。
初始化后才能使用协议 API如获取群成员信息等
Returns:
是否初始化成功
"""
result = await self._send_data_async(11228, {})
if result:
logger.success("CDN 初始化成功")
else:
logger.error("CDN 初始化失败")
return result
async def cdn_download(self, file_id: str, aes_key: str, save_path: str, file_type: int = 2) -> bool:
"""
CDN 下载文件(图片/视频等)
Args:
file_id: 文件 ID从消息中获取的 cdnbigimgurl 等)
aes_key: AES 密钥
save_path: 保存路径
file_type: 文件类型 (1=原图, 2=中图, 3=缩略图, 4=视频, 5=文件&GIF)
Returns:
是否下载成功
"""
data = {
"file_id": file_id,
"file_type": file_type,
"aes_key": aes_key,
"save_path": save_path
}
result = await self._send_data_async(11230, data)
if result:
logger.info(f"CDN 下载成功: {save_path}")
else:
logger.error(f"CDN 下载失败: {file_id}")
return result
async def cdn_upload(self, file_path: str, file_type: int = 1) -> Optional[Dict]:
"""
CDN 上传文件
Args:
file_path: 文件路径
file_type: 文件类型 (1=原图, 2=中图, 3=缩略图, 4=视频, 5=文件&GIF)
Returns:
上传结果字典,包含 aes_key, file_md5 等信息
失败返回 None
"""
# 生成唯一请求ID
request_id = str(uuid.uuid4())
# 创建等待事件
event = asyncio.Event()
result_data = {"cdn_info": None}
# 存储待处理请求
request_key = f"cdn_upload_{request_id}"
self.pending_requests[request_key] = {
"request_id": request_id,
"event": event,
"result": result_data,
"type": "cdn_upload",
"file_path": file_path
}
data = {
"file_path": file_path,
"file_type": file_type
}
await self._send_data_async(11229, data)
logger.info(f"CDN 上传请求已发送: {file_path}")
# 等待回调结果
try:
await asyncio.wait_for(event.wait(), timeout=30)
cdn_info = result_data["cdn_info"]
if cdn_info and cdn_info.get("error_code") == 0:
logger.success(f"CDN 上传成功: {file_path}")
return cdn_info
else:
error_code = cdn_info.get("error_code") if cdn_info else "unknown"
logger.error(f"CDN 上传失败: {file_path}, error_code={error_code}")
return None
except asyncio.TimeoutError:
logger.error(f"CDN 上传超时: {file_path}")
return None
finally:
if request_key in self.pending_requests:
del self.pending_requests[request_key]
async def send_link_card(
self,
to_wxid: str,
title: str,
desc: str,
url: str,
image_url: str = ""
) -> bool:
"""
发送链接卡片CDN发送
Args:
to_wxid: 接收者 wxid
title: 卡片标题
desc: 卡片描述
url: 链接地址
image_url: 卡片图片 URL
Returns:
是否发送成功
"""
data = {
"to_wxid": to_wxid,
"title": title,
"desc": desc,
"url": url,
"image_url": image_url
}
result = await self._send_data_async(11236, data)
if result:
logger.info(f"发送链接卡片成功: {to_wxid}")
else:
logger.error(f"发送链接卡片失败: {to_wxid}")
return result
async def send_link_card_and_get_response(
self,
to_wxid: str,
title: str,
desc: str,
url: str,
image_url: str = "",
timeout_sec: int = 15
) -> Optional[dict]:
data = {
"to_wxid": to_wxid,
"title": title,
"desc": desc,
"url": url,
"image_url": image_url
}
request_key = f"send_link_card_{to_wxid}"
event = asyncio.Event()
self.pending_requests[request_key] = {"event": event, "result": None}
await self._send_data_async(11236, data)
try:
await asyncio.wait_for(event.wait(), timeout=timeout_sec)
result = self.pending_requests[request_key]["result"]
del self.pending_requests[request_key]
return result
except asyncio.TimeoutError:
del self.pending_requests[request_key]
return None
async def revoke_message(self, msg_id: str) -> bool:
"""
撤回消息
Args:
msg_id: 消息 ID
Returns:
是否撤回成功
"""
data = {"msg_id": msg_id}
result = await self._send_data_async(11043, data)
if result:
logger.info(f"撤回消息成功: {msg_id}")
else:
logger.error(f"撤回消息失败: {msg_id}")
return result
# ==================== 好友管理 ====================
async def get_friend_list(self) -> List[Dict]:
"""
获取好友列表
Returns:
好友列表
"""
# 需要实际测试确认返回格式
data = {}
await self._send_data_async(11050, data)
logger.info("请求好友列表")
return []
async def get_friend_info(self, wxid: str) -> Optional[Dict]:
"""
获取好友信息
Args:
wxid: 好友 wxid
Returns:
好友信息字典
"""
data = {"wxid": wxid}
await self._send_data_async(11051, data)
logger.info(f"请求好友信息: {wxid}")
return None
async def add_friend(
self,
wxid: str,
verify_msg: str = "",
scene: int = 3
) -> bool:
"""
添加好友
Args:
wxid: 要添加的 wxid
verify_msg: 验证消息
scene: 添加场景3=搜索15=名片)
Returns:
是否发送成功
"""
data = {
"wxid": wxid,
"verify_msg": verify_msg,
"scene": scene
}
result = await self._send_data_async(11052, data)
if result:
logger.info(f"发送好友请求成功: {wxid}")
else:
logger.error(f"发送好友请求失败: {wxid}")
return result
async def accept_friend(self, v3: str, v4: str, scene: int) -> bool:
"""
同意好友请求
Args:
v3: 好友请求的 v3 参数
v4: 好友请求的 v4 参数
scene: 场景值
Returns:
是否成功
"""
data = {
"v3": v3,
"v4": v4,
"scene": scene
}
result = await self._send_data_async(11053, data)
if result:
logger.info("同意好友请求成功")
else:
logger.error("同意好友请求失败")
return result
async def delete_friend(self, wxid: str) -> bool:
"""
删除好友
Args:
wxid: 要删除的好友 wxid
Returns:
是否成功
"""
data = {"wxid": wxid}
result = await self._send_data_async(11054, data)
if result:
logger.info(f"删除好友成功: {wxid}")
else:
logger.error(f"删除好友失败: {wxid}")
return result
async def set_friend_remark(self, wxid: str, remark: str) -> bool:
"""
修改好友备注
Args:
wxid: 好友 wxid
remark: 新备注
Returns:
是否成功
"""
data = {
"wxid": wxid,
"remark": remark
}
result = await self._send_data_async(11055, data)
if result:
logger.info(f"修改备注成功: {wxid} -> {remark}")
else:
logger.error(f"修改备注失败: {wxid}")
return result
# ==================== 群聊管理 ====================
async def get_chatroom_list(self) -> List[Dict]:
"""
获取群聊列表
Returns:
群聊列表
"""
data = {}
await self._send_data_async(11060, data)
logger.info("请求群聊列表")
return []
async def get_chatroom_members(self, chatroom_id: str) -> List[Dict]:
"""
获取群成员列表(使用协议 API
Args:
chatroom_id: 群聊 ID
Returns:
群成员列表,每个成员包含: wxid, nickname, display_name, avatar
"""
# 生成唯一请求ID
request_id = str(uuid.uuid4())
# 创建等待事件
event = asyncio.Event()
result_data = {"members": [], "success": False}
# 存储待处理请求
self.pending_requests[f"chatroom_info_{chatroom_id}"] = {
"request_id": request_id,
"event": event,
"result": result_data,
"type": "chatroom_info"
}
# 使用 type=11174 获取群信息(协议),包含成员列表和头像
data = {"wxid": chatroom_id}
await self._send_data_async(11174, data)
logger.info(f"请求群信息(协议): {chatroom_id}, request_id: {request_id}")
# 等待回调结果
members = await self._wait_for_chatroom_info(chatroom_id, timeout=15)
return members
async def _wait_for_chatroom_info(self, chatroom_id: str, timeout: int = 15) -> List[Dict]:
"""等待群信息回调type=11174"""
request_key = f"chatroom_info_{chatroom_id}"
if request_key not in self.pending_requests:
logger.error(f"未找到待处理的群信息请求: {chatroom_id}")
return []
request_info = self.pending_requests[request_key]
event = request_info["event"]
try:
# 等待回调事件,设置超时
await asyncio.wait_for(event.wait(), timeout=timeout)
# 获取结果
result = request_info["result"]["members"]
logger.success(f"获取群成员成功(协议): {chatroom_id}, 成员数: {len(result)}")
return result
except asyncio.TimeoutError:
logger.error(f"获取群信息超时: {chatroom_id}")
return []
finally:
# 清理请求
if request_key in self.pending_requests:
del self.pending_requests[request_key]
async def _wait_for_chatroom_members(self, chatroom_id: str, timeout: int = 15) -> List[Dict]:
"""等待群成员信息回调(已废弃,保留用于兼容)"""
if chatroom_id not in self.pending_requests:
logger.error(f"未找到待处理的群成员请求: {chatroom_id}")
return []
request_info = self.pending_requests[chatroom_id]
event = request_info["event"]
try:
# 等待回调事件,设置超时
await asyncio.wait_for(event.wait(), timeout=timeout)
# 获取结果
result = request_info["result"]["members"]
logger.success(f"获取群成员成功: {chatroom_id}, 成员数: {len(result)}")
return result
except asyncio.TimeoutError:
logger.error(f"获取群成员超时: {chatroom_id}")
return []
finally:
# 清理请求
if chatroom_id in self.pending_requests:
del self.pending_requests[chatroom_id]
async def _get_chatroom_members_fallback(self, chatroom_id: str) -> List[Dict]:
"""
备用方案使用11031获取群成员基本信息
Args:
chatroom_id: 群聊ID
Returns:
群成员列表仅包含wxid
"""
try:
# 生成唯一请求ID
request_id = str(uuid.uuid4())
# 创建等待事件
event = asyncio.Event()
result_data = {"chatrooms": []}
# 存储待处理请求
self.pending_requests[f"chatroom_list_{request_id}"] = {
"request_id": request_id,
"event": event,
"result": result_data,
"type": "chatroom_list",
"target_chatroom": chatroom_id
}
# 发送获取群聊列表请求(包含成员列表)
data = {"detail": 1}
await self._send_data_async(11031, data)
logger.info(f"请求群聊列表(备用方案): {chatroom_id}, request_id: {request_id}")
# 等待回调结果
try:
await asyncio.wait_for(event.wait(), timeout=15)
# 查找目标群聊的成员列表
chatrooms = result_data["chatrooms"]
for chatroom in chatrooms:
if chatroom.get("wxid") == chatroom_id:
member_list = chatroom.get("member_list", [])
# 转换为标准格式
members = []
for wxid in member_list:
members.append({
"wxid": wxid,
"nickname": wxid, # 使用wxid作为备用昵称
"display_name": "" # 群内昵称为空
})
logger.success(f"备用方案获取群成员成功: {chatroom_id}, 成员数: {len(members)}")
return members
logger.warning(f"在群聊列表中未找到目标群聊: {chatroom_id}")
return []
except asyncio.TimeoutError:
logger.error(f"备用方案获取群成员超时: {chatroom_id}")
return []
finally:
# 清理请求
key = f"chatroom_list_{request_id}"
if key in self.pending_requests:
del self.pending_requests[key]
except Exception as e:
logger.error(f"备用方案获取群成员失败: {e}")
return []
async def get_chatroom_info(self, chatroom_id: str) -> Optional[Dict]:
"""
获取群信息
Args:
chatroom_id: 群聊 ID
Returns:
群信息字典
"""
data = {"chatroom_id": chatroom_id}
await self._send_data_async(11062, data)
logger.info(f"请求群信息: {chatroom_id}")
return None
async def create_chatroom(self, member_list: List[str]) -> Optional[str]:
"""
创建群聊
Args:
member_list: 成员 wxid 列表至少2人
Returns:
新群聊的 chatroom_id
"""
data = {"member_list": member_list}
result = await self._send_data_async(11063, data)
if result:
logger.info("创建群聊成功")
else:
logger.error("创建群聊失败")
return None
async def invite_to_chatroom(
self,
chatroom_id: str,
wxid_list: List[str]
) -> bool:
"""
邀请进群
Args:
chatroom_id: 群聊 ID
wxid_list: 要邀请的 wxid 列表
Returns:
是否成功
"""
data = {
"chatroom_id": chatroom_id,
"wxid_list": wxid_list
}
result = await self._send_data_async(11064, data)
if result:
logger.info(f"邀请进群成功: {chatroom_id}")
else:
logger.error(f"邀请进群失败: {chatroom_id}")
return result
async def remove_chatroom_member(
self,
chatroom_id: str,
wxid_list: List[str]
) -> bool:
"""
踢出群成员
Args:
chatroom_id: 群聊 ID
wxid_list: 要踢出的 wxid 列表
Returns:
是否成功
"""
data = {
"chatroom_id": chatroom_id,
"wxid_list": wxid_list
}
result = await self._send_data_async(11065, data)
if result:
logger.info(f"踢出群成员成功: {chatroom_id}")
else:
logger.error(f"踢出群成员失败: {chatroom_id}")
return result
async def quit_chatroom(self, chatroom_id: str) -> bool:
"""
退出群聊
Args:
chatroom_id: 群聊 ID
Returns:
是否成功
"""
data = {"chatroom_id": chatroom_id}
result = await self._send_data_async(11066, data)
if result:
logger.info(f"退出群聊成功: {chatroom_id}")
else:
logger.error(f"退出群聊失败: {chatroom_id}")
return result
async def set_chatroom_name(self, chatroom_id: str, name: str) -> bool:
"""
修改群名称
Args:
chatroom_id: 群聊 ID
name: 新群名称
Returns:
是否成功
"""
data = {
"chatroom_id": chatroom_id,
"name": name
}
result = await self._send_data_async(11067, data)
if result:
logger.info(f"修改群名称成功: {chatroom_id}")
else:
logger.error(f"修改群名称失败: {chatroom_id}")
return result
async def set_chatroom_announcement(
self,
chatroom_id: str,
announcement: str
) -> bool:
"""
修改群公告
Args:
chatroom_id: 群聊 ID
announcement: 群公告内容
Returns:
是否成功
"""
data = {
"chatroom_id": chatroom_id,
"announcement": announcement
}
result = await self._send_data_async(11068, data)
if result:
logger.info(f"修改群公告成功: {chatroom_id}")
else:
logger.error(f"修改群公告失败: {chatroom_id}")
return result
async def set_my_chatroom_nickname(
self,
chatroom_id: str,
nickname: str
) -> bool:
"""
修改我的群昵称
Args:
chatroom_id: 群聊 ID
nickname: 新昵称
Returns:
是否成功
"""
data = {
"chatroom_id": chatroom_id,
"nickname": nickname
}
result = await self._send_data_async(11069, data)
if result:
logger.info(f"修改群昵称成功: {chatroom_id}")
else:
logger.error(f"修改群昵称失败: {chatroom_id}")
return result
# ==================== 登录信息 ====================
async def get_login_info(self) -> Optional[Dict]:
"""
获取当前登录信息
Returns:
登录信息字典
"""
data = {}
await self._send_data_async(MessageType.MT_GET_LOGIN_INFO, data)
logger.info("请求登录信息")
return None
async def get_user_info_in_chatroom(self, chatroom_id: str, user_wxid: str, max_retries: int = 2) -> Optional[Dict]:
"""
获取群内用户详细信息使用协议API
Args:
chatroom_id: 群聊 ID
user_wxid: 用户 wxid
max_retries: 最大重试次数
Returns:
用户详细信息字典
"""
for attempt in range(max_retries + 1):
# 生成唯一请求ID
request_id = str(uuid.uuid4())
# 创建等待事件
event = asyncio.Event()
result_data = {"user_info": None}
# 存储待处理请求
request_key = f"user_info_{chatroom_id}_{user_wxid}"
self.pending_requests[request_key] = {
"request_id": request_id,
"event": event,
"result": result_data,
"type": "user_info_in_chatroom",
"chatroom_id": chatroom_id,
"user_wxid": user_wxid
}
# 发送请求
data = {
"room_wxid": chatroom_id,
"wxid": user_wxid
}
await self._send_data_async(11174, data)
if attempt == 0:
logger.info(f"请求群内用户信息: chatroom={chatroom_id}, user={user_wxid}")
else:
logger.info(f"重试请求群内用户信息 (第{attempt}次): user={user_wxid}")
# 等待回调结果
try:
await asyncio.wait_for(event.wait(), timeout=15)
# 获取结果
user_info = result_data["user_info"]
if user_info:
logger.success(f"获取群内用户信息成功: {user_wxid}")
return user_info
else:
logger.warning(f"获取群内用户信息为空: {user_wxid}")
if attempt < max_retries:
await asyncio.sleep(1)
continue
return None
except asyncio.TimeoutError:
logger.warning(f"获取群内用户信息超时 (第{attempt + 1}次): {user_wxid}")
if attempt < max_retries:
await asyncio.sleep(1)
continue
logger.error(f"获取群内用户信息最终失败: {user_wxid}")
return None
finally:
# 清理请求
if request_key in self.pending_requests:
del self.pending_requests[request_key]
return None
@RECV_CALLBACK(in_class=True)
def on_api_response(self, client_id: int, msg_type: int, data: dict):
"""
处理API响应回调
Args:
client_id: 客户端ID
msg_type: 消息类型
data: 响应数据
"""
# 只处理本客户端的消息
if client_id != self.client_id:
return
# 处理群成员信息响应
if msg_type == 11032:
self._handle_chatroom_members_response(data)
# 处理群聊列表响应(备用方案)
elif msg_type == 11031:
self._handle_chatroom_list_response(data)
# 处理单个用户信息响应协议API
elif msg_type == 11174:
self._handle_user_info_in_chatroom_response(data)
# 处理CDN上传响应
elif msg_type == 11229:
self._handle_cdn_upload_response(data)
elif msg_type == 11236:
try:
to_user = data.get("toUserName", "")
request_key = f"send_link_card_{to_user}"
if request_key in self.pending_requests:
self.pending_requests[request_key]["result"] = data
self.pending_requests[request_key]["event"].set()
except Exception as e:
logger.error(f"处理链接卡片响应失败: {e}")
def _handle_chatroom_members_response(self, data: dict):
"""
处理群成员信息响应
Args:
data: 响应数据
"""
try:
# 检查是否有错误
errcode = data.get("errcode")
errmsg = data.get("errmsg", "")
if errcode is not None and errcode != 0:
logger.error(f"群成员信息API返回错误: errcode={errcode}, errmsg={errmsg}")
# 对于所有待处理的群成员请求,都触发事件(返回空结果)
for chatroom_id, request_info in list(self.pending_requests.items()):
if request_info.get("type") == "chatroom_members":
request_info["result"]["members"] = []
request_info["result"]["success"] = False
request_info["event"].set()
logger.warning(f"群成员请求因API错误而失败: {chatroom_id}")
return
group_wxid = data.get("group_wxid", "")
member_list = data.get("member_list", [])
logger.info(f"收到群成员信息响应: group_wxid={group_wxid}, 成员数={len(member_list)}")
# 查找对应的待处理请求
if group_wxid in self.pending_requests:
request_info = self.pending_requests[group_wxid]
# 存储结果数据
request_info["result"]["members"] = member_list
request_info["result"]["success"] = True
# 触发等待事件
request_info["event"].set()
logger.success(f"群成员信息处理完成: {group_wxid}")
else:
logger.warning(f"未找到对应的群成员请求: {group_wxid}")
except Exception as e:
logger.error(f"处理群成员信息响应失败: {e}")
# 触发所有待处理的群成员请求(返回空结果)
for chatroom_id, request_info in list(self.pending_requests.items()):
if request_info.get("type") == "chatroom_members":
request_info["result"]["members"] = []
request_info["result"]["success"] = False
request_info["event"].set()
def _handle_chatroom_list_response(self, data: dict):
"""
处理群聊列表响应(备用方案)
Args:
data: 响应数据
"""
try:
# data应该是一个包含群聊信息的列表
chatrooms = data if isinstance(data, list) else []
logger.info(f"收到群聊列表响应: 群聊数={len(chatrooms)}")
# 查找所有等待群聊列表的请求
for key, request_info in list(self.pending_requests.items()):
if key.startswith("chatroom_list_") and request_info.get("type") == "chatroom_list":
# 存储结果数据
request_info["result"]["chatrooms"] = chatrooms
# 触发等待事件
request_info["event"].set()
logger.success(f"群聊列表处理完成: {key}")
break
except Exception as e:
logger.error(f"处理群聊列表响应失败: {e}")
def _handle_user_info_in_chatroom_response(self, data: dict):
"""
处理群内用户信息响应11174 API
支持两种请求类型:
1. chatroom_info - 获取群信息(包含成员列表)
2. user_info_in_chatroom - 获取单个用户信息
Args:
data: 响应数据
"""
try:
# 检查基础响应
base_response = data.get("baseResponse", {})
ret_code = base_response.get("ret", -1)
if ret_code != 0:
logger.error(f"11174 API返回错误: ret={ret_code}")
# 触发所有相关的待处理请求
self._trigger_user_info_requests(None)
self._trigger_chatroom_info_requests([])
return
# 获取联系人列表
contact_list = data.get("contactList", [])
if not contact_list:
logger.warning("11174 响应中无联系人数据")
self._trigger_user_info_requests(None)
self._trigger_chatroom_info_requests([])
return
# 第一个联系人信息
contact_info = contact_list[0]
contact_wxid = contact_info.get("userName", {}).get("string", "")
# 判断是群聊还是个人
is_chatroom = contact_wxid.endswith("@chatroom")
if is_chatroom:
# 处理群信息请求
logger.info(f"收到群信息响应: chatroom_id={contact_wxid}")
# 提取群成员列表
new_chatroom_data = contact_info.get("newChatroomData", {})
member_list = new_chatroom_data.get("chatRoomMemberList", [])
# 转换成员数据格式
members = []
for member in member_list:
members.append({
"wxid": member.get("userName", ""),
"nickname": member.get("nickName", ""),
"display_name": member.get("displayName", ""),
"avatar": member.get("bigHeadImgUrl", ""),
"invite_by": member.get("inviteBy", "")
})
logger.info(f"解析到 {len(members)} 个群成员")
# 查找对应的待处理请求
request_key = f"chatroom_info_{contact_wxid}"
if request_key in self.pending_requests:
request_info = self.pending_requests[request_key]
request_info["result"]["members"] = members
request_info["result"]["success"] = True
request_info["event"].set()
logger.success(f"群信息处理完成: {contact_wxid}")
else:
logger.warning(f"未找到对应的群信息请求: {contact_wxid}")
else:
# 处理单个用户信息请求
logger.info(f"收到群内用户信息响应: user_wxid={contact_wxid}")
# 查找对应的待处理请求
for request_key, request_info in list(self.pending_requests.items()):
if (request_info.get("type") == "user_info_in_chatroom" and
request_info.get("user_wxid") == contact_wxid):
# 存储结果数据
request_info["result"]["user_info"] = contact_info
# 触发等待事件
request_info["event"].set()
logger.success(f"群内用户信息处理完成: {contact_wxid}")
return
logger.warning(f"未找到对应的群内用户信息请求: {contact_wxid}")
except Exception as e:
logger.error(f"处理 11174 响应失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
self._trigger_user_info_requests(None)
self._trigger_chatroom_info_requests([])
def _trigger_user_info_requests(self, user_info):
"""
触发所有待处理的用户信息请求
Args:
user_info: 用户信息None表示失败
"""
for request_key, request_info in list(self.pending_requests.items()):
if request_info.get("type") == "user_info_in_chatroom":
request_info["result"]["user_info"] = user_info
request_info["event"].set()
def _trigger_chatroom_info_requests(self, members):
"""
触发所有待处理的群信息请求
Args:
members: 群成员列表,空列表表示失败
"""
for request_key, request_info in list(self.pending_requests.items()):
if request_info.get("type") == "chatroom_info":
request_info["result"]["members"] = members
request_info["result"]["success"] = len(members) > 0
request_info["event"].set()
def _handle_cdn_upload_response(self, data: dict):
"""
处理CDN上传响应
Args:
data: 响应数据,包含 aes_key, file_md5, error_code 等
"""
try:
file_path = data.get("file_path", "")
error_code = data.get("error_code", -1)
logger.info(f"收到CDN上传响应: file_path={file_path}, error_code={error_code}")
# 查找对应的待处理请求
for request_key, request_info in list(self.pending_requests.items()):
if request_info.get("type") == "cdn_upload":
# 存储结果数据
request_info["result"]["cdn_info"] = data
# 触发等待事件
request_info["event"].set()
logger.success(f"CDN上传响应处理完成: {file_path}")
break
except Exception as e:
logger.error(f"处理CDN上传响应失败: {e}")
async def send_cdn_image(self, to_wxid: str, file_path: str) -> bool:
"""
通过CDN上传并发送图片
Args:
to_wxid: 接收者 wxid
file_path: 图片文件路径
Returns:
是否发送成功
"""
try:
# 1. 上传到CDN
cdn_info = await self.cdn_upload(file_path, file_type=1)
if not cdn_info:
logger.error(f"CDN上传失败无法发送图片: {file_path}")
return False
# 2. 使用CDN信息发送图片
data = {
"to_wxid": to_wxid,
"aes_key": cdn_info.get("aes_key", ""),
"file_md5": cdn_info.get("file_md5", ""),
"file_size": cdn_info.get("file_size", 0),
"mid_file_md5": cdn_info.get("mid_file_md5", ""),
"mid_file_size": cdn_info.get("mid_file_size", 0),
"thumb_file_md5": cdn_info.get("thumb_file_md5", ""),
"thumb_file_size": cdn_info.get("thumb_file_size", 0)
}
result = await self._send_data_async(11233, data)
if result:
logger.success(f"CDN图片发送成功: {to_wxid}")
import os
filename = os.path.basename(file_path)
await self._log_bot_message(to_wxid, f"[图片] {filename}", "image", file_path)
else:
logger.error(f"CDN图片发送失败: {to_wxid}")
return result
except Exception as e:
logger.error(f"发送CDN图片异常: {e}")
return False