1594 lines
54 KiB
Python
1594 lines
54 KiB
Python
"""
|
||
WechatHookClient - 微信 Hook API 客户端
|
||
|
||
封装所有微信操作的高级 API(HTTP 协议版本)
|
||
"""
|
||
|
||
import asyncio
|
||
import hashlib
|
||
import os
|
||
import shutil
|
||
import time
|
||
from pathlib import Path
|
||
from typing import List, Dict, Optional
|
||
from loguru import logger
|
||
from xml.sax.saxutils import escape as xml_escape
|
||
|
||
from .http_client import HttpClient
|
||
|
||
|
||
class WechatHookClient:
|
||
"""
|
||
微信 Hook API 客户端
|
||
|
||
提供统一的异步 API 接口(HTTP 协议)
|
||
"""
|
||
|
||
def __init__(self, base_url: str = "http://127.0.0.1:8888", timeout: float = 30.0):
|
||
"""
|
||
初始化客户端
|
||
|
||
Args:
|
||
base_url: Hook API 的基础 URL
|
||
timeout: 请求超时时间
|
||
"""
|
||
self.base_url = base_url
|
||
self.http_client = HttpClient(base_url=base_url, timeout=timeout)
|
||
|
||
# 登录信息缓存
|
||
self._login_info: Optional[Dict] = None
|
||
self._wxid: str = ""
|
||
self._nickname: str = ""
|
||
|
||
# 群成员缓存 {chatroom_id: {wxid: member_info}}
|
||
self._chatroom_members_cache: Dict[str, Dict[str, Dict]] = {}
|
||
self._db_tables_cache: Dict[str, List[str]] = {}
|
||
self._db_table_columns_cache: Dict[str, Dict[str, List[str]]] = {}
|
||
self._db_chatroom_list_cache: Dict[str, List[str]] = {}
|
||
self._db_names_cache: List[str] = []
|
||
|
||
# 媒体下载缓存(微信图片/视频)
|
||
base_dir = Path(__file__).resolve().parent.parent
|
||
self._media_cache_dir = base_dir / "temp" / "wechat_media_cache"
|
||
self._media_cache_dir.mkdir(parents=True, exist_ok=True)
|
||
self._media_cache_ttl = 3600
|
||
self._media_cache_locks: Dict[str, asyncio.Lock] = {}
|
||
|
||
logger.info(f"WechatHookClient 初始化: base_url={base_url}")
|
||
|
||
async def close(self):
|
||
"""关闭客户端"""
|
||
await self.http_client.close()
|
||
logger.info("WechatHookClient 已关闭")
|
||
|
||
@property
|
||
def wxid(self) -> str:
|
||
"""获取当前登录的 wxid"""
|
||
return self._wxid
|
||
|
||
@property
|
||
def nickname(self) -> str:
|
||
"""获取当前登录的昵称"""
|
||
return self._nickname
|
||
|
||
def update_profile(self, wxid: str, nickname: str):
|
||
"""
|
||
更新登录信息
|
||
|
||
Args:
|
||
wxid: 微信 ID
|
||
nickname: 昵称
|
||
"""
|
||
self._wxid = wxid
|
||
self._nickname = nickname
|
||
logger.info(f"更新登录信息: wxid={wxid}, nickname={nickname}")
|
||
|
||
def update_chatroom_members_cache(self, chatroom_id: str, members: List[Dict]):
|
||
"""
|
||
更新群成员缓存(从消息回调中提取)
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
members: 群成员列表
|
||
"""
|
||
if not chatroom_id or not members:
|
||
return
|
||
if chatroom_id not in self._chatroom_members_cache:
|
||
self._chatroom_members_cache[chatroom_id] = {}
|
||
for member in members:
|
||
wxid = member.get("userName", "")
|
||
if wxid:
|
||
self._chatroom_members_cache[chatroom_id][wxid] = member
|
||
|
||
def get_cached_member_info(self, chatroom_id: str, user_wxid: str) -> Optional[Dict]:
|
||
"""从缓存获取群成员信息"""
|
||
if chatroom_id in self._chatroom_members_cache:
|
||
return self._chatroom_members_cache[chatroom_id].get(user_wxid)
|
||
return None
|
||
|
||
async def _log_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""):
|
||
"""记录机器人发送的消息到 MessageLogger"""
|
||
try:
|
||
from utils.message_hook import log_bot_message
|
||
await log_bot_message(to_wxid, content, msg_type, media_url)
|
||
except Exception as e:
|
||
logger.debug(f"记录机器人消息失败(可忽略): {e}")
|
||
|
||
async def send_message(self, to_wxid: str, msg_type: str, content: str) -> bool:
|
||
"""
|
||
统一发送消息入口(文本/图片/视频/文件/XML)
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
msg_type: 消息类型(text/image/video/file/xml)
|
||
content: 文本内容或文件路径或 XML
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, msg_type, content)
|
||
|
||
async def _send_message(self, to_wxid: str, msg_type: str, content: str) -> bool:
|
||
"""统一发送消息实现,集中日志与类型处理"""
|
||
msg_type = (msg_type or "").lower()
|
||
|
||
if msg_type == "text":
|
||
result = await self.http_client.send_text(to_wxid, content)
|
||
if result:
|
||
await self._log_bot_message(to_wxid, content, "text")
|
||
return result
|
||
|
||
if msg_type == "image":
|
||
result = await self.http_client.send_image(to_wxid, content)
|
||
if result:
|
||
filename = os.path.basename(content)
|
||
await self._log_bot_message(to_wxid, f"[图片] {filename}", "image", content)
|
||
return result
|
||
|
||
if msg_type == "video":
|
||
# 新协议可能使用文件发送接口发送视频
|
||
result = await self.http_client.send_file(to_wxid, content)
|
||
if result:
|
||
filename = os.path.basename(content)
|
||
await self._log_bot_message(to_wxid, f"[视频] {filename}", "video", content)
|
||
return result
|
||
|
||
if msg_type == "file":
|
||
result = await self.http_client.send_file(to_wxid, content)
|
||
if result:
|
||
filename = os.path.basename(content)
|
||
await self._log_bot_message(to_wxid, f"[文件] {filename}", "file", content)
|
||
return result
|
||
|
||
if msg_type == "xml":
|
||
return await self._send_xml_message(to_wxid, content)
|
||
|
||
logger.error(f"不支持的消息类型: {msg_type}")
|
||
return False
|
||
|
||
async def _send_xml_message(self, to_wxid: str, xml: str) -> bool:
|
||
"""发送 XML 消息(如聊天记录、音乐卡片等)"""
|
||
# 尝试提取 appmsg 内容和 type,使用新协议
|
||
try:
|
||
import xml.etree.ElementTree as ET
|
||
# 尝试解析 XML
|
||
root = ET.fromstring(xml)
|
||
|
||
# 查找 appmsg 元素
|
||
appmsg = root.find(".//appmsg")
|
||
if appmsg is None and root.tag == "appmsg":
|
||
appmsg = root
|
||
|
||
if appmsg is not None:
|
||
# 提取 type
|
||
type_elem = appmsg.find("type")
|
||
msg_type = type_elem.text if type_elem is not None else "5"
|
||
|
||
# 将 appmsg 转换回字符串
|
||
appmsg_content = ET.tostring(appmsg, encoding="unicode")
|
||
|
||
result = await self.http_client.send_app_msg(to_wxid, appmsg_content, msg_type)
|
||
if result:
|
||
await self._log_bot_message(to_wxid, "[XML消息]", "xml")
|
||
return result
|
||
except Exception as e:
|
||
logger.warning(f"解析 XML 失败,尝试旧协议: {e}")
|
||
|
||
# 回退到旧协议
|
||
result = await self.http_client.send_xml(to_wxid, xml)
|
||
if result:
|
||
await self._log_bot_message(to_wxid, "[XML消息]", "xml")
|
||
return result
|
||
|
||
# ==================== 消息发送 ====================
|
||
|
||
async def send_text(self, to_wxid: str, content: str) -> bool:
|
||
"""
|
||
发送文本消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
content: 文本内容
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, "text", content)
|
||
|
||
async def send_image(self, to_wxid: str, image_path: str) -> bool:
|
||
"""
|
||
发送图片消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
image_path: 图片文件路径
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, "image", image_path)
|
||
|
||
async def send_file(self, to_wxid: str, file_path: str) -> bool:
|
||
"""
|
||
发送文件消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
file_path: 文件路径
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, "file", file_path)
|
||
|
||
async def send_media(self, to_wxid: str, file_path: str, media_type: str = "") -> bool:
|
||
"""
|
||
统一发送媒体消息(图片/视频/文件)
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
file_path: 文件路径
|
||
media_type: 媒体类型(image/video/file,可选)
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
media_type = (media_type or "").lower()
|
||
|
||
if media_type in {"image", "img", "pic", "picture", "photo"}:
|
||
return await self._send_message(to_wxid, "image", file_path)
|
||
if media_type in {"video", "vid"}:
|
||
return await self._send_message(to_wxid, "video", file_path)
|
||
if media_type in {"file", "doc", "document"}:
|
||
return await self._send_message(to_wxid, "file", file_path)
|
||
|
||
ext = os.path.splitext(file_path)[1].lower()
|
||
if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
|
||
return await self._send_message(to_wxid, "image", file_path)
|
||
if ext in {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".webm"}:
|
||
return await self._send_message(to_wxid, "video", file_path)
|
||
|
||
return await self._send_message(to_wxid, "file", file_path)
|
||
|
||
async def send_video(self, to_wxid: str, video_path: str) -> bool:
|
||
"""
|
||
发送视频消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
video_path: 视频文件路径
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, "video", video_path)
|
||
|
||
async def send_at_message(
|
||
self,
|
||
chatroom_id: str,
|
||
content: str,
|
||
at_list: List[str]
|
||
) -> bool:
|
||
"""
|
||
发送群聊 @ 消息
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
content: 消息内容
|
||
at_list: 要 @ 的 wxid 列表,["notify@all"] 表示 @所有人
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
# 将列表转换为逗号分隔的字符串
|
||
wxids = ",".join(at_list)
|
||
return await self.http_client.send_at_text(chatroom_id, content, wxids)
|
||
|
||
async def send_card(
|
||
self,
|
||
to_wxid: str,
|
||
card_wxid: str,
|
||
card_nickname: str = ""
|
||
) -> bool:
|
||
"""
|
||
发送名片消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
card_wxid: 名片的 wxid
|
||
card_nickname: 名片昵称(新协议不需要)
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self.http_client.send_card(to_wxid, card_wxid)
|
||
|
||
async def send_link(
|
||
self,
|
||
to_wxid: str,
|
||
title: str,
|
||
desc: str,
|
||
url: str,
|
||
thumb_url: str = ""
|
||
) -> bool:
|
||
"""
|
||
发送链接消息
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
title: 链接标题
|
||
desc: 链接描述
|
||
url: 链接地址
|
||
thumb_url: 缩略图 URL
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
# 默认缩略图
|
||
if not thumb_url:
|
||
thumb_url = "https://www.functen.cn/static/img/709a3f34713ef07b09d524bee2df69d6.DY.webp"
|
||
|
||
safe_title = xml_escape(str(title or ""), {'"': """, "'": "'"})
|
||
safe_desc = xml_escape(str(desc or ""), {'"': """, "'": "'"})
|
||
safe_url = xml_escape(str(url or ""), {'"': """, "'": "'"})
|
||
safe_thumb_url = xml_escape(str(thumb_url or ""), {'"': """, "'": "'"})
|
||
|
||
# 构建 appmsg XML 内容(新协议不需要外层 <msg> 标签)
|
||
appmsg_content = f'''<appmsg appid="" sdkver="0">
|
||
<title>{safe_title}</title>
|
||
<des>{safe_desc}</des>
|
||
<type>5</type>
|
||
<url>{safe_url}</url>
|
||
<thumburl>{safe_thumb_url}</thumburl>
|
||
</appmsg>'''
|
||
# 使用新协议发送卡片消息,type=5 为链接卡片
|
||
return await self.http_client.send_app_msg(to_wxid, appmsg_content, "5")
|
||
|
||
async def send_link_card(
|
||
self,
|
||
to_wxid: str,
|
||
title: str,
|
||
desc: str,
|
||
url: str,
|
||
image_url: str = ""
|
||
) -> bool:
|
||
"""
|
||
发送链接卡片
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
title: 卡片标题
|
||
desc: 卡片描述
|
||
url: 链接地址
|
||
image_url: 卡片图片 URL
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self.send_link(to_wxid, title, desc, url, image_url)
|
||
|
||
async def revoke_message(self, msg_id: str) -> bool:
|
||
"""
|
||
撤回消息
|
||
|
||
Args:
|
||
msg_id: 消息 ID (newMsgId)
|
||
|
||
Returns:
|
||
是否撤回成功
|
||
"""
|
||
return await self.http_client.revoke_message(msg_id)
|
||
|
||
async def send_xml(self, to_wxid: str, xml: str) -> bool:
|
||
"""
|
||
发送 XML 消息(如聊天记录、音乐卡片等)
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
xml: XML 内容(可以是完整的 <msg><appmsg>...</appmsg></msg> 或只有 <appmsg>...</appmsg>)
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self._send_message(to_wxid, "xml", xml)
|
||
|
||
async def get_group_member_contact(self, room_id: str, member_wxid: str) -> Optional[Dict]:
|
||
"""
|
||
获取群成员的详细联系人信息(包含头像等)
|
||
|
||
Args:
|
||
room_id: 群聊 ID
|
||
member_wxid: 成员 wxid
|
||
|
||
Returns:
|
||
成员详细信息,失败返回 None
|
||
"""
|
||
return await self.http_client.get_group_member_contact(room_id, member_wxid)
|
||
|
||
# ==================== CDN 相关(新协议简化) ====================
|
||
|
||
async def cdn_init(self) -> bool:
|
||
"""
|
||
初始化 CDN 环境
|
||
|
||
新协议不需要单独初始化 CDN
|
||
|
||
Returns:
|
||
始终返回 True
|
||
"""
|
||
logger.info("新协议无需初始化 CDN")
|
||
return True
|
||
|
||
async def cdn_download(self, file_id: str, aes_key: str, save_path: str, file_type: int = 2) -> bool:
|
||
"""
|
||
CDN 下载(兼容接口)
|
||
|
||
新协议使用 download_image/download_video
|
||
|
||
Returns:
|
||
是否下载成功
|
||
"""
|
||
logger.warning("cdn_download 在新协议中不可用,请使用 download_image/download_video")
|
||
return False
|
||
|
||
async def cdn_upload(self, file_path: str, file_type: int = 1) -> Optional[Dict]:
|
||
"""
|
||
CDN 上传(兼容接口)
|
||
|
||
新协议直接发送文件,无需先上传
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
logger.warning("cdn_upload 在新协议中不需要,直接使用 send_image/send_file")
|
||
return None
|
||
|
||
async def send_cdn_image(self, to_wxid: str, file_path: str) -> bool:
|
||
"""
|
||
发送图片(兼容接口)
|
||
|
||
Args:
|
||
to_wxid: 接收者 wxid
|
||
file_path: 图片文件路径
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self.send_image(to_wxid, file_path)
|
||
|
||
# ==================== 好友管理 ====================
|
||
|
||
async def get_friend_list(self) -> List[Dict]:
|
||
"""
|
||
获取好友列表
|
||
|
||
Returns:
|
||
好友列表
|
||
"""
|
||
return await self.http_client.get_friend_list()
|
||
|
||
async def get_friend_info(self, wxid: str) -> Optional[Dict]:
|
||
"""
|
||
获取好友信息
|
||
|
||
Args:
|
||
wxid: 好友 wxid
|
||
|
||
Returns:
|
||
好友信息字典
|
||
"""
|
||
return await self.http_client.get_friend_info(wxid)
|
||
|
||
async def add_friend(
|
||
self,
|
||
wxid: str,
|
||
verify_msg: str = "",
|
||
scene: int = 3
|
||
) -> bool:
|
||
"""
|
||
添加好友
|
||
|
||
Args:
|
||
wxid: 要添加的 wxid
|
||
verify_msg: 验证消息
|
||
scene: 添加场景(3=搜索,15=名片)
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
return await self.http_client.add_friend(wxid, verify_msg, scene)
|
||
|
||
async def accept_friend(self, v3: str, v4: str, scene: int) -> bool:
|
||
"""
|
||
同意好友请求
|
||
|
||
Args:
|
||
v3: 好友请求的 v3 参数
|
||
v4: 好友请求的 v4 参数
|
||
scene: 场景值
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.accept_friend(v3, v4, scene)
|
||
|
||
async def delete_friend(self, wxid: str) -> bool:
|
||
"""
|
||
删除好友
|
||
|
||
Args:
|
||
wxid: 要删除的好友 wxid
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.delete_friend(wxid)
|
||
|
||
async def set_friend_remark(self, wxid: str, remark: str) -> bool:
|
||
"""
|
||
修改好友备注
|
||
|
||
Args:
|
||
wxid: 好友 wxid
|
||
remark: 新备注
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.set_friend_remark(wxid, remark)
|
||
|
||
# ==================== 群聊管理 ====================
|
||
|
||
async def get_chatroom_list(self, force_refresh: bool = False) -> List[Dict]:
|
||
"""
|
||
获取群聊列表
|
||
|
||
Returns:
|
||
群聊列表
|
||
"""
|
||
# 新协议可能需要从好友列表中筛选
|
||
friends = await self.get_friend_list()
|
||
chatrooms = []
|
||
room_ids = set()
|
||
for friend in friends:
|
||
contact = friend.get("contact", {})
|
||
username = contact.get("userName", {})
|
||
if isinstance(username, dict):
|
||
wxid = username.get("String", "")
|
||
else:
|
||
wxid = str(username)
|
||
if wxid.endswith("@chatroom"):
|
||
chatrooms.append(friend)
|
||
room_ids.add(wxid)
|
||
|
||
# 从本地数据库补全未保存到通讯录的群聊
|
||
db_rooms = await self._get_chatroom_list_from_db(force_refresh=force_refresh)
|
||
for room_id in db_rooms:
|
||
if room_id not in room_ids:
|
||
chatrooms.append(room_id)
|
||
room_ids.add(room_id)
|
||
return chatrooms
|
||
|
||
async def _get_chatroom_list_from_db(self, force_refresh: bool = False) -> List[str]:
|
||
"""从本地 contact.db 尝试获取群聊列表(兜底)"""
|
||
if not force_refresh and "contact.db" in self._db_chatroom_list_cache:
|
||
cached = self._db_chatroom_list_cache.get("contact.db", [])
|
||
if cached:
|
||
return cached[:]
|
||
|
||
candidates = set()
|
||
queries = [
|
||
("contact.db", "select chatroomname as room_id from chat_room_info_detail"),
|
||
("contact.db", "select username as room_id from chat_room_info_detail"),
|
||
("contact.db", "select chatroomname, username from chat_room_info_detail"),
|
||
("contact.db", "select username as room_id from rcontact where username like '%@chatroom'"),
|
||
("contact.db", "select username as room_id from chatroom"),
|
||
("contact.db", "select chatroomname as room_id from chatroom"),
|
||
]
|
||
|
||
for db_name, sql_fmt in queries:
|
||
rows = await self.http_client.sqlite_exec(db_name, sql_fmt)
|
||
if not rows:
|
||
continue
|
||
for row in rows:
|
||
room_id = ""
|
||
if isinstance(row, str):
|
||
room_id = row
|
||
elif isinstance(row, dict):
|
||
for key in ("room_id", "chatroomname", "username", "userName", "UserName"):
|
||
value = row.get(key)
|
||
if isinstance(value, dict):
|
||
value = value.get("String", "") or value.get("string", "")
|
||
if isinstance(value, str) and value:
|
||
room_id = value
|
||
break
|
||
if room_id and room_id.endswith("@chatroom"):
|
||
candidates.add(room_id)
|
||
|
||
if not candidates:
|
||
candidates.update(await self._scan_chatrooms_from_db("contact.db"))
|
||
|
||
if force_refresh:
|
||
db_names = await self._get_db_names()
|
||
for db_name in db_names:
|
||
if db_name == "contact.db":
|
||
continue
|
||
candidates.update(await self._scan_chatrooms_from_db(db_name))
|
||
|
||
result = list(candidates)
|
||
if result:
|
||
self._db_chatroom_list_cache["contact.db"] = result[:]
|
||
return result
|
||
|
||
async def _scan_chatrooms_from_db(self, db_name: str) -> List[str]:
|
||
"""扫描数据库表,尝试找出所有群聊 ID"""
|
||
results = set()
|
||
tables = await self._list_db_tables(db_name)
|
||
for table in tables:
|
||
table_lower = table.lower()
|
||
if "chatroom" not in table_lower and "room" not in table_lower and "contact" not in table_lower:
|
||
continue
|
||
columns = await self._get_table_columns(db_name, table)
|
||
for col in columns:
|
||
col_lower = col.lower()
|
||
if (
|
||
"chatroom" in col_lower
|
||
or col_lower in {"username", "user_name", "roomid", "room_id", "chatroomname"}
|
||
):
|
||
sql = f'select distinct "{col}" as room_id from "{table}" where "{col}" like \'%@chatroom\''
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
for row in rows:
|
||
room_id = ""
|
||
if isinstance(row, dict):
|
||
room_id = row.get("room_id", "") or row.get(col, "")
|
||
elif isinstance(row, str):
|
||
room_id = row
|
||
if isinstance(room_id, dict):
|
||
room_id = room_id.get("String", "") or room_id.get("string", "")
|
||
if room_id and isinstance(room_id, str) and room_id.endswith("@chatroom"):
|
||
results.add(room_id)
|
||
return list(results)
|
||
|
||
async def _list_db_tables(self, db_name: str) -> List[str]:
|
||
"""获取数据库表名列表"""
|
||
if db_name in self._db_tables_cache:
|
||
cached = self._db_tables_cache.get(db_name, [])
|
||
if cached:
|
||
return cached[:]
|
||
|
||
rows = await self.http_client.sqlite_exec(db_name, "select name from sqlite_master where type='table'")
|
||
tables = []
|
||
for row in rows:
|
||
name = ""
|
||
if isinstance(row, dict):
|
||
name = row.get("name", "")
|
||
elif isinstance(row, str):
|
||
name = row
|
||
if name:
|
||
tables.append(name)
|
||
if tables:
|
||
self._db_tables_cache[db_name] = tables[:]
|
||
return tables
|
||
|
||
async def _get_db_names(self, force_refresh: bool = False) -> List[str]:
|
||
"""获取可用数据库名称列表"""
|
||
if self._db_names_cache and not force_refresh:
|
||
return self._db_names_cache[:]
|
||
|
||
handles = await self.http_client.get_db_handle()
|
||
names = []
|
||
for item in handles:
|
||
if isinstance(item, dict):
|
||
name = item.get("name", "")
|
||
if name:
|
||
names.append(name)
|
||
if names:
|
||
self._db_names_cache = names[:]
|
||
return names
|
||
|
||
async def _get_table_columns(self, db_name: str, table: str) -> List[str]:
|
||
"""获取表字段列表"""
|
||
if db_name in self._db_table_columns_cache:
|
||
cached = self._db_table_columns_cache[db_name].get(table)
|
||
if cached:
|
||
return cached[:]
|
||
|
||
sql = f'pragma table_info("{table}")'
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
columns = []
|
||
for row in rows:
|
||
name = ""
|
||
if isinstance(row, dict):
|
||
name = row.get("name", "")
|
||
elif isinstance(row, str):
|
||
name = row
|
||
if name:
|
||
columns.append(name)
|
||
|
||
self._db_table_columns_cache.setdefault(db_name, {})[table] = columns[:]
|
||
return columns
|
||
|
||
async def get_chatroom_members(self, chatroom_id: str) -> List[Dict]:
|
||
"""
|
||
获取群成员列表
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
|
||
Returns:
|
||
群成员列表
|
||
"""
|
||
info = await self.http_client.get_chatroom_info(chatroom_id)
|
||
detail_map = {}
|
||
total_count = 0
|
||
result = []
|
||
if info:
|
||
new_data = info.get("newChatroomData", {})
|
||
for m in new_data.get("chatRoomMember", []) or []:
|
||
wxid = m.get("userName", "") or m.get("wxid", "")
|
||
if wxid:
|
||
detail_map[wxid] = m
|
||
|
||
all_list = info.get("allMemberUserNameList") or []
|
||
if all_list:
|
||
for entry in all_list:
|
||
if isinstance(entry, dict):
|
||
wxid = entry.get("String", "") or entry.get("string", "")
|
||
else:
|
||
wxid = str(entry)
|
||
if not wxid:
|
||
continue
|
||
detail = detail_map.get(wxid, {})
|
||
result.append({
|
||
"wxid": wxid,
|
||
"nickname": detail.get("nickName", ""),
|
||
"display_name": detail.get("displayName", ""),
|
||
"avatar": detail.get("bigHeadImgUrl", ""),
|
||
})
|
||
total_count = int(info.get("allMemberCount") or 0)
|
||
if result and (not total_count or total_count == len(result)):
|
||
return result
|
||
|
||
# 兜底:使用原有接口
|
||
members = await self.http_client.get_chatroom_members(chatroom_id)
|
||
for m in members:
|
||
wxid = m.get("userName", "")
|
||
if not wxid:
|
||
continue
|
||
if any(item.get("wxid") == wxid for item in result):
|
||
continue
|
||
result.append({
|
||
"wxid": wxid,
|
||
"nickname": m.get("nickName", ""),
|
||
"display_name": m.get("displayName", ""),
|
||
"avatar": m.get("bigHeadImgUrl", ""),
|
||
})
|
||
if result and (not total_count or total_count == len(result)):
|
||
return result
|
||
|
||
# 再兜底:尝试从本地数据库补全群成员
|
||
db_wxids = await self._get_chatroom_members_from_db(chatroom_id)
|
||
if db_wxids:
|
||
existing = {m.get("wxid") for m in result if m.get("wxid")}
|
||
for wxid in db_wxids:
|
||
if wxid in existing:
|
||
continue
|
||
detail = detail_map.get(wxid, {})
|
||
result.append({
|
||
"wxid": wxid,
|
||
"nickname": detail.get("nickName", ""),
|
||
"display_name": detail.get("displayName", ""),
|
||
"avatar": detail.get("bigHeadImgUrl", ""),
|
||
})
|
||
return result
|
||
|
||
async def _get_chatroom_members_from_db(self, chatroom_id: str) -> List[str]:
|
||
"""从本地数据库尝试获取群成员 wxid 列表"""
|
||
if not chatroom_id or not chatroom_id.endswith("@chatroom"):
|
||
return []
|
||
|
||
room_id = chatroom_id.replace("'", "''")
|
||
|
||
# 常见表/字段兜底
|
||
list_queries = [
|
||
("contact.db", "chatroom", "chatroomname", "memberlist"),
|
||
("contact.db", "chatroom", "chatroomname", "member_list"),
|
||
("contact.db", "chat_room_info_detail", "chatroomname", "memberlist"),
|
||
("contact.db", "chat_room_info_detail", "username", "memberlist"),
|
||
("contact.db", "rcontact", "username", "memberlist"),
|
||
]
|
||
row_queries = [
|
||
("contact.db", "chatroom_member", "chatroomname", "membername"),
|
||
("contact.db", "chatroom_member", "chatroomname", "username"),
|
||
("contact.db", "chatroom_member", "roomid", "username"),
|
||
("contact.db", "chatroom_member", "room_id", "username"),
|
||
]
|
||
|
||
members = await self._try_member_list_queries(list_queries, room_id)
|
||
if members:
|
||
return members
|
||
|
||
members = await self._try_member_row_queries(row_queries, room_id)
|
||
if members:
|
||
return members
|
||
|
||
# 扫描数据库表兜底
|
||
members = await self._scan_chatroom_members_from_db("contact.db", room_id)
|
||
if members:
|
||
return members
|
||
|
||
db_names = await self._get_db_names()
|
||
for db_name in db_names:
|
||
if db_name == "contact.db":
|
||
continue
|
||
members = await self._scan_chatroom_members_from_db(db_name, room_id)
|
||
if members:
|
||
return members
|
||
|
||
return []
|
||
|
||
def _parse_member_list_value(self, value: str) -> List[str]:
|
||
if not value:
|
||
return []
|
||
if not isinstance(value, str):
|
||
return []
|
||
separators = [",", ";", "|", "\n", "\t", " "]
|
||
for sep in separators:
|
||
if sep in value:
|
||
parts = [p.strip() for p in value.split(sep)]
|
||
return [p for p in parts if p and not p.endswith("@chatroom")]
|
||
return [value] if value and not value.endswith("@chatroom") else []
|
||
|
||
async def _try_member_list_queries(self, queries, room_id: str) -> List[str]:
|
||
members = []
|
||
for db_name, table, room_col, list_col in queries:
|
||
sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\''
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
for row in rows:
|
||
value = ""
|
||
if isinstance(row, dict):
|
||
value = row.get("member_list", "") or row.get(list_col, "")
|
||
elif isinstance(row, str):
|
||
value = row
|
||
members.extend(self._parse_member_list_value(value))
|
||
if members:
|
||
return list(dict.fromkeys(members))
|
||
return []
|
||
|
||
async def _try_member_row_queries(self, queries, room_id: str) -> List[str]:
|
||
members = []
|
||
for db_name, table, room_col, member_col in queries:
|
||
sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\''
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
for row in rows:
|
||
value = ""
|
||
if isinstance(row, dict):
|
||
value = row.get("member_id", "") or row.get(member_col, "")
|
||
elif isinstance(row, str):
|
||
value = row
|
||
if isinstance(value, dict):
|
||
value = value.get("String", "") or value.get("string", "")
|
||
if isinstance(value, str) and value and not value.endswith("@chatroom"):
|
||
members.append(value)
|
||
if members:
|
||
return list(dict.fromkeys(members))
|
||
return []
|
||
|
||
async def _scan_chatroom_members_from_db(self, db_name: str, room_id: str) -> List[str]:
|
||
members = []
|
||
tables = await self._list_db_tables(db_name)
|
||
for table in tables:
|
||
table_lower = table.lower()
|
||
if "chatroom" not in table_lower and "room" not in table_lower:
|
||
continue
|
||
|
||
columns = await self._get_table_columns(db_name, table)
|
||
if not columns:
|
||
continue
|
||
|
||
room_cols = [c for c in columns if "chatroom" in c.lower() or c.lower() in {"roomid", "room_id", "chatroomname"}]
|
||
list_cols = [c for c in columns if "member" in c.lower() and "list" in c.lower()]
|
||
member_cols = [
|
||
c for c in columns
|
||
if ("member" in c.lower() or c.lower() in {"username", "user_name", "wxid"})
|
||
and "list" not in c.lower()
|
||
]
|
||
|
||
for room_col in room_cols:
|
||
for list_col in list_cols:
|
||
sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\''
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
for row in rows:
|
||
value = ""
|
||
if isinstance(row, dict):
|
||
value = row.get("member_list", "") or row.get(list_col, "")
|
||
elif isinstance(row, str):
|
||
value = row
|
||
members.extend(self._parse_member_list_value(value))
|
||
if members:
|
||
return list(dict.fromkeys(members))
|
||
|
||
for member_col in member_cols:
|
||
if member_col == room_col:
|
||
continue
|
||
sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\''
|
||
rows = await self.http_client.sqlite_exec(db_name, sql)
|
||
for row in rows:
|
||
value = ""
|
||
if isinstance(row, dict):
|
||
value = row.get("member_id", "") or row.get(member_col, "")
|
||
elif isinstance(row, str):
|
||
value = row
|
||
if isinstance(value, dict):
|
||
value = value.get("String", "") or value.get("string", "")
|
||
if isinstance(value, str) and value and not value.endswith("@chatroom"):
|
||
members.append(value)
|
||
if members:
|
||
return list(dict.fromkeys(members))
|
||
|
||
return list(dict.fromkeys(members))
|
||
|
||
async def get_chatroom_info(self, chatroom_id: str) -> Optional[Dict]:
|
||
"""
|
||
获取群信息
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
|
||
Returns:
|
||
群信息字典
|
||
"""
|
||
return await self.http_client.get_chatroom_info(chatroom_id)
|
||
|
||
async def create_chatroom(self, member_list: List[str]) -> Optional[str]:
|
||
"""
|
||
创建群聊
|
||
|
||
Args:
|
||
member_list: 成员 wxid 列表(至少2人)
|
||
|
||
Returns:
|
||
新群聊的 chatroom_id
|
||
"""
|
||
return await self.http_client.create_chatroom(member_list)
|
||
|
||
async def invite_to_chatroom(
|
||
self,
|
||
chatroom_id: str,
|
||
wxid_list: List[str]
|
||
) -> bool:
|
||
"""
|
||
邀请进群
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
wxid_list: 要邀请的 wxid 列表
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.invite_to_chatroom(chatroom_id, wxid_list)
|
||
|
||
async def remove_chatroom_member(
|
||
self,
|
||
chatroom_id: str,
|
||
wxid_list: List[str]
|
||
) -> bool:
|
||
"""
|
||
踢出群成员
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
wxid_list: 要踢出的 wxid 列表
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.remove_chatroom_member(chatroom_id, wxid_list)
|
||
|
||
async def quit_chatroom(self, chatroom_id: str) -> bool:
|
||
"""
|
||
退出群聊
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.quit_chatroom(chatroom_id)
|
||
|
||
async def set_chatroom_name(self, chatroom_id: str, name: str) -> bool:
|
||
"""
|
||
修改群名称
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
name: 新群名称
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
# 新协议可能需要使用不同的 API
|
||
logger.warning("set_chatroom_name 在新协议中可能不可用")
|
||
return False
|
||
|
||
async def set_chatroom_announcement(
|
||
self,
|
||
chatroom_id: str,
|
||
announcement: str
|
||
) -> bool:
|
||
"""
|
||
修改群公告
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
announcement: 群公告内容
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
return await self.http_client.set_chatroom_announcement(chatroom_id, announcement)
|
||
|
||
async def set_my_chatroom_nickname(
|
||
self,
|
||
chatroom_id: str,
|
||
nickname: str
|
||
) -> bool:
|
||
"""
|
||
修改我的群昵称
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
nickname: 新昵称
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
# 新协议可能需要使用不同的 API
|
||
logger.warning("set_my_chatroom_nickname 在新协议中可能不可用")
|
||
return False
|
||
|
||
# ==================== 初始化 ====================
|
||
|
||
async def wechat_init(self) -> bool:
|
||
"""
|
||
微信初始化好友列表、群列表
|
||
|
||
每天需要调用一次,用于刷新缓存
|
||
"""
|
||
return await self.http_client.wechat_init()
|
||
|
||
# ==================== 登录信息 ====================
|
||
|
||
async def get_login_info(self) -> Optional[Dict]:
|
||
"""
|
||
获取当前登录信息
|
||
|
||
Returns:
|
||
登录信息字典
|
||
"""
|
||
result = await self.http_client.get_self_info()
|
||
if result:
|
||
# 尝试提取 wxid 和 nickname
|
||
contact = result.get("contact", result)
|
||
if isinstance(contact, dict):
|
||
username = contact.get("userName", {})
|
||
nickname = contact.get("nickName", {})
|
||
|
||
if isinstance(username, dict):
|
||
self._wxid = username.get("String", "")
|
||
else:
|
||
self._wxid = str(username) if username else ""
|
||
|
||
if isinstance(nickname, dict):
|
||
self._nickname = nickname.get("String", "")
|
||
else:
|
||
self._nickname = str(nickname) if nickname else ""
|
||
|
||
self._login_info = result
|
||
logger.info(f"获取登录信息成功: wxid={self._wxid}, nickname={self._nickname}")
|
||
return result
|
||
|
||
async def get_user_info_in_chatroom(self, chatroom_id: str, user_wxid: str, max_retries: int = 1) -> Optional[Dict]:
|
||
"""
|
||
获取群内用户详细信息
|
||
|
||
Args:
|
||
chatroom_id: 群聊 ID
|
||
user_wxid: 用户 wxid
|
||
max_retries: 最大重试次数
|
||
|
||
Returns:
|
||
用户详细信息字典
|
||
"""
|
||
# 1. 优先从缓存获取(消息回调中已缓存)
|
||
cached = self.get_cached_member_info(chatroom_id, user_wxid)
|
||
if cached:
|
||
return cached
|
||
|
||
# 2. 尝试 API(可能返回 502)
|
||
for attempt in range(max_retries + 1):
|
||
try:
|
||
# 从群成员列表中查找
|
||
members = await self.get_chatroom_members(chatroom_id)
|
||
for member in members:
|
||
if member.get("wxid") == user_wxid or member.get("userName") == user_wxid:
|
||
return member
|
||
|
||
if attempt < max_retries:
|
||
await asyncio.sleep(0.5)
|
||
|
||
except Exception as e:
|
||
logger.debug(f"获取群内用户信息失败: {e}")
|
||
if attempt < max_retries:
|
||
await asyncio.sleep(0.5)
|
||
|
||
return None
|
||
|
||
# ==================== 下载功能 ====================
|
||
|
||
def _build_media_cache_key(
|
||
self,
|
||
media_type: str,
|
||
message: Optional[Dict] = None,
|
||
msg_id: Optional[int] = None,
|
||
file_id: str = "",
|
||
aes_key: str = ""
|
||
) -> str:
|
||
"""构建媒体缓存 key(用于复用下载结果)"""
|
||
media_type = (media_type or "").lower()
|
||
key_parts = [media_type]
|
||
|
||
if msg_id is None and message:
|
||
raw = message.get("_raw", message)
|
||
msg_id = (
|
||
raw.get("newMsgId")
|
||
or message.get("MsgId")
|
||
or raw.get("msgId")
|
||
or message.get("msgId")
|
||
)
|
||
|
||
if msg_id is not None and str(msg_id):
|
||
key_parts.append(f"msg:{msg_id}")
|
||
elif file_id and aes_key:
|
||
key_parts.append(f"cdn:{file_id}:{aes_key}")
|
||
else:
|
||
return ""
|
||
|
||
key_source = "|".join(key_parts)
|
||
key_hash = hashlib.sha1(key_source.encode("utf-8")).hexdigest()
|
||
return f"{media_type}_{key_hash}"
|
||
|
||
def _media_cache_path(self, cache_key: str) -> Path:
|
||
"""获取缓存文件路径"""
|
||
return self._media_cache_dir / f"{cache_key}.bin"
|
||
|
||
def _is_media_cache_valid(self, cache_path: Path) -> bool:
|
||
"""检查缓存是否有效"""
|
||
try:
|
||
if not cache_path.exists():
|
||
return False
|
||
if cache_path.stat().st_size <= 0:
|
||
return False
|
||
age = time.time() - cache_path.stat().st_mtime
|
||
if age > self._media_cache_ttl:
|
||
cache_path.unlink(missing_ok=True)
|
||
return False
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
def _copy_cached_media(self, cache_path: Path, save_path: str) -> Optional[str]:
|
||
"""从缓存复制媒体到目标路径"""
|
||
try:
|
||
target_path = Path(save_path)
|
||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||
if target_path.exists():
|
||
target_path.unlink()
|
||
shutil.copy2(cache_path, target_path)
|
||
os.utime(cache_path, None)
|
||
return str(target_path)
|
||
except Exception as e:
|
||
logger.warning(f"媒体缓存复制失败: {e}")
|
||
return None
|
||
|
||
def _store_media_cache(self, cache_key: str, source_path: str) -> None:
|
||
"""写入媒体缓存"""
|
||
try:
|
||
source = Path(source_path)
|
||
if not source.exists() or source.stat().st_size <= 0:
|
||
return
|
||
|
||
cache_path = self._media_cache_path(cache_key)
|
||
tmp_path = cache_path.with_suffix(".tmp")
|
||
tmp_path.parent.mkdir(parents=True, exist_ok=True)
|
||
shutil.copy2(source, tmp_path)
|
||
tmp_path.replace(cache_path)
|
||
os.utime(cache_path, None)
|
||
except Exception as e:
|
||
logger.warning(f"写入媒体缓存失败: {e}")
|
||
|
||
async def download_wechat_media(
|
||
self,
|
||
media_type: str,
|
||
save_path: str,
|
||
message: Optional[Dict] = None,
|
||
msg_id: Optional[int] = None,
|
||
total_len: int = 0,
|
||
to_user: str = "",
|
||
from_user: str = "",
|
||
file_id: str = "",
|
||
aes_key: str = "",
|
||
prefer_original: bool = True,
|
||
timeout: float = 60.0
|
||
) -> Optional[str]:
|
||
"""
|
||
统一微信消息媒体下载入口(图片/视频)
|
||
|
||
Args:
|
||
media_type: 媒体类型(image/video)
|
||
save_path: 保存路径
|
||
message: 原始消息数据(优先使用)
|
||
msg_id: 消息 ID (svrid)
|
||
total_len: 文件总大小
|
||
to_user: 接收者 wxid(图片引用消息)
|
||
from_user: 发送者 wxid(图片引用消息)
|
||
file_id: CDN 文件ID(图片)
|
||
aes_key: AES 密钥(图片)
|
||
prefer_original: 是否优先下载原图
|
||
timeout: 单次下载超时(秒)
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
media_type = (media_type or "").lower()
|
||
|
||
cache_key = self._build_media_cache_key(media_type, message, msg_id, file_id, aes_key)
|
||
if cache_key:
|
||
lock = self._media_cache_locks.setdefault(cache_key, asyncio.Lock())
|
||
async with lock:
|
||
cache_path = self._media_cache_path(cache_key)
|
||
if self._is_media_cache_valid(cache_path):
|
||
cached = self._copy_cached_media(cache_path, save_path)
|
||
if cached:
|
||
logger.debug(f"媒体缓存命中: {cache_key}")
|
||
return cached
|
||
|
||
result = await self._download_wechat_media_direct(
|
||
media_type=media_type,
|
||
save_path=save_path,
|
||
message=message,
|
||
msg_id=msg_id,
|
||
total_len=total_len,
|
||
to_user=to_user,
|
||
from_user=from_user,
|
||
file_id=file_id,
|
||
aes_key=aes_key,
|
||
prefer_original=prefer_original,
|
||
timeout=timeout
|
||
)
|
||
|
||
if result and result != "expired":
|
||
self._store_media_cache(cache_key, result)
|
||
return result
|
||
|
||
return await self._download_wechat_media_direct(
|
||
media_type=media_type,
|
||
save_path=save_path,
|
||
message=message,
|
||
msg_id=msg_id,
|
||
total_len=total_len,
|
||
to_user=to_user,
|
||
from_user=from_user,
|
||
file_id=file_id,
|
||
aes_key=aes_key,
|
||
prefer_original=prefer_original,
|
||
timeout=timeout
|
||
)
|
||
|
||
async def _download_wechat_media_direct(
|
||
self,
|
||
media_type: str,
|
||
save_path: str,
|
||
message: Optional[Dict] = None,
|
||
msg_id: Optional[int] = None,
|
||
total_len: int = 0,
|
||
to_user: str = "",
|
||
from_user: str = "",
|
||
file_id: str = "",
|
||
aes_key: str = "",
|
||
prefer_original: bool = True,
|
||
timeout: float = 60.0
|
||
) -> Optional[str]:
|
||
"""实际执行微信媒体下载(不含缓存逻辑)"""
|
||
if media_type in {"image", "img", "pic", "picture", "photo"}:
|
||
if message:
|
||
return await self.download_image(message, save_path)
|
||
if msg_id is not None:
|
||
return await self.download_image_by_id(msg_id, total_len, save_path, to_user, from_user)
|
||
if file_id and aes_key:
|
||
return await self.download_image_by_cdn(
|
||
file_id=file_id,
|
||
aes_key=aes_key,
|
||
save_path=save_path,
|
||
prefer_original=prefer_original,
|
||
timeout=timeout
|
||
)
|
||
logger.error("download_wechat_media 缺少图片下载参数")
|
||
return None
|
||
|
||
if media_type in {"video", "vid"}:
|
||
if message:
|
||
return await self.download_video(message, save_path)
|
||
if msg_id is not None:
|
||
return await self.download_video_by_id(msg_id, total_len, save_path)
|
||
logger.error("download_wechat_media 缺少视频下载参数")
|
||
return None
|
||
|
||
logger.error(f"download_wechat_media 不支持的媒体类型: {media_type}")
|
||
return None
|
||
|
||
async def download_image(
|
||
self,
|
||
message: Dict,
|
||
save_path: str
|
||
) -> Optional[str]:
|
||
"""
|
||
下载图片(使用新CDN接口)
|
||
|
||
Args:
|
||
message: 原始消息数据
|
||
save_path: 保存路径
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
# 从 XML 中提取 fileid 和 aeskey
|
||
content = message.get("Content", "")
|
||
if not content:
|
||
logger.error("图片消息缺少 Content 字段")
|
||
return None
|
||
|
||
try:
|
||
import xml.etree.ElementTree as ET
|
||
root = ET.fromstring(content)
|
||
img = root.find(".//img")
|
||
if img is None:
|
||
logger.error("无法从 XML 中找到 img 标签")
|
||
return None
|
||
|
||
# 打印所有可用的属性,帮助调试
|
||
logger.debug(f"图片XML属性: {dict(img.attrib)}")
|
||
|
||
# 提取 fileid 和 aeskey
|
||
# 注意:fileid 可能在不同的字段中
|
||
fileid = (img.get("cdnbigimgurl") or
|
||
img.get("cdnmidimgurl") or
|
||
img.get("cdnhdimgurl") or
|
||
img.get("fileid") or "")
|
||
aeskey = img.get("aeskey", "")
|
||
|
||
# 提取缩略图专用参数
|
||
thumb_fileid = img.get("cdnthumburl", "")
|
||
thumb_aeskey = img.get("cdnthumbaeskey", "") or img.get("aeskey", "") # 降级使用原图key
|
||
|
||
if not fileid or not aeskey:
|
||
logger.error(f"缺少必要参数: fileid={'有' if fileid else '无'}, aeskey={'有' if aeskey else '无'}")
|
||
logger.error(f"可用属性: {list(img.attrib.keys())}")
|
||
return None
|
||
|
||
logger.info(f"提取CDN参数: fileid={fileid[:50]}..., aeskey={aeskey[:20]}...")
|
||
if thumb_fileid:
|
||
logger.debug(f"缩略图参数: thumb_fileid={thumb_fileid[:50]}..., thumb_aeskey={thumb_aeskey[:20]}...")
|
||
|
||
# 优先下载原图
|
||
logger.debug("优先尝试下载原图")
|
||
result = await self.http_client.cdn_download_image(
|
||
fileid=fileid,
|
||
aeskey=aeskey,
|
||
save_path=save_path,
|
||
img_type=1 # 1=原图
|
||
)
|
||
|
||
if result:
|
||
import os
|
||
for i in range(20):
|
||
if os.path.exists(result) and os.path.getsize(result) > 0:
|
||
logger.info(f"原图下载成功: {result}, size={os.path.getsize(result)}")
|
||
return result
|
||
await asyncio.sleep(0.5)
|
||
|
||
logger.debug("原图下载失败,尝试缩略图")
|
||
|
||
# 降级到缩略图
|
||
if thumb_fileid and thumb_aeskey:
|
||
result = await self.http_client.cdn_download_image(
|
||
fileid=thumb_fileid,
|
||
aeskey=thumb_aeskey,
|
||
save_path=save_path,
|
||
img_type=2, # 2=缩略图
|
||
timeout=30.0
|
||
)
|
||
|
||
if result:
|
||
import os
|
||
for i in range(20):
|
||
if os.path.exists(result) and os.path.getsize(result) > 0:
|
||
logger.info(f"缩略图下载成功: {result}, size={os.path.getsize(result)}")
|
||
return result
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 如果原图和缩略图都失败,记录错误
|
||
logger.error("图片下载失败:原图和缩略图均无法下载")
|
||
|
||
except Exception as e:
|
||
logger.error(f"下载图片失败: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
return None
|
||
|
||
async def download_image_by_cdn(
|
||
self,
|
||
file_id: str,
|
||
aes_key: str,
|
||
save_path: str,
|
||
prefer_original: bool = True,
|
||
timeout: float = 60.0
|
||
) -> Optional[str]:
|
||
"""
|
||
通过 CDN 参数下载图片(新协议)
|
||
|
||
Args:
|
||
file_id: CDN 文件ID(如 cdnbigimgurl)
|
||
aes_key: AES 密钥
|
||
save_path: 保存路径
|
||
prefer_original: 是否优先下载原图
|
||
timeout: 单次下载超时(秒)
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
if not file_id or not aes_key:
|
||
logger.error("缺少必要参数: file_id 或 aes_key 为空")
|
||
return None
|
||
|
||
img_types = [1, 2] if prefer_original else [2, 1]
|
||
|
||
for img_type in img_types:
|
||
result = await self.http_client.cdn_download_image(
|
||
fileid=file_id,
|
||
aeskey=aes_key,
|
||
save_path=save_path,
|
||
img_type=img_type,
|
||
timeout=timeout
|
||
)
|
||
|
||
if result:
|
||
for _ in range(20):
|
||
if os.path.exists(result) and os.path.getsize(result) > 0:
|
||
return result
|
||
await asyncio.sleep(0.5)
|
||
|
||
logger.error("图片下载失败:原图和缩略图均无法下载")
|
||
return None
|
||
|
||
async def download_video(
|
||
self,
|
||
message: Dict,
|
||
save_path: str
|
||
) -> Optional[str]:
|
||
"""
|
||
下载视频
|
||
|
||
Args:
|
||
message: 原始消息数据
|
||
save_path: 保存路径
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
raw_data = message.get("_raw", message)
|
||
|
||
# 获取消息 ID
|
||
msg_id = int(raw_data.get("msgId", 0))
|
||
new_msg_id = int(raw_data.get("newMsgId", 0))
|
||
|
||
# 从 XML 内容中提取 total_len
|
||
total_len = 0
|
||
content = message.get("Content", "")
|
||
if content:
|
||
try:
|
||
import xml.etree.ElementTree as ET
|
||
root = ET.fromstring(content)
|
||
video = root.find(".//videomsg")
|
||
if video is not None:
|
||
total_len = int(video.get("length", 0))
|
||
except Exception:
|
||
pass
|
||
|
||
if not total_len:
|
||
logger.warning("无法获取视频长度,尝试使用默认值")
|
||
|
||
return await self.http_client.download_video(
|
||
msg_id=msg_id,
|
||
new_msg_id=new_msg_id,
|
||
total_len=total_len,
|
||
save_path=save_path
|
||
)
|
||
|
||
async def download_video_by_id(
|
||
self,
|
||
msg_id: int,
|
||
total_len: int,
|
||
save_path: str
|
||
) -> Optional[str]:
|
||
"""
|
||
通过消息ID下载视频(用于引用消息场景)
|
||
|
||
Args:
|
||
msg_id: 消息 ID (svrid)
|
||
total_len: 视频总长度
|
||
save_path: 保存路径
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
# 对于引用消息,new_msg_id 可以使用 msg_id
|
||
return await self.http_client.download_video(
|
||
msg_id=msg_id,
|
||
new_msg_id=msg_id,
|
||
total_len=total_len,
|
||
save_path=save_path
|
||
)
|
||
|
||
async def download_image_by_id(
|
||
self,
|
||
msg_id: int,
|
||
total_len: int,
|
||
save_path: str,
|
||
to_user: str = "",
|
||
from_user: str = ""
|
||
) -> Optional[str]:
|
||
"""
|
||
通过消息ID下载图片(用于引用消息场景)
|
||
|
||
Args:
|
||
msg_id: 消息 ID (svrid)
|
||
total_len: 图片总大小
|
||
save_path: 保存路径
|
||
to_user: 接收者 wxid(可选)
|
||
from_user: 发送者 wxid(可选)
|
||
|
||
Returns:
|
||
保存路径,失败返回 None
|
||
"""
|
||
return await self.http_client.download_image(
|
||
to_user=to_user,
|
||
from_user=from_user,
|
||
msg_id=msg_id,
|
||
total_len=total_len,
|
||
save_path=save_path
|
||
)
|