Files
WeChatHookBot/WechatHook/client.py

1594 lines
54 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WechatHookClient - 微信 Hook API 客户端
封装所有微信操作的高级 API(HTTP 协议版本)
"""
import asyncio
import hashlib
import os
import shutil
import time
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
from xml.sax.saxutils import escape as xml_escape
from .http_client import HttpClient
class WechatHookClient:
    """
    WeChat Hook API client.

    Provides a unified asynchronous API surface (HTTP protocol).
    """
def __init__(self, base_url: str = "http://127.0.0.1:8888", timeout: float = 30.0):
    """
    Set up the client state and the on-disk media cache directory.

    Args:
        base_url: Base URL of the Hook API.
        timeout: Request timeout handed to the HTTP client.
    """
    self.base_url = base_url
    self.http_client = HttpClient(base_url=base_url, timeout=timeout)

    # Cached login information.
    self._login_info: Optional[Dict] = None
    self._wxid: str = ""
    self._nickname: str = ""

    # Group member cache: {chatroom_id: {wxid: member_info}}.
    self._chatroom_members_cache: Dict[str, Dict[str, Dict]] = {}

    # Caches for local database lookups.
    self._db_tables_cache: Dict[str, List[str]] = {}
    self._db_table_columns_cache: Dict[str, Dict[str, List[str]]] = {}
    self._db_chatroom_list_cache: Dict[str, List[str]] = {}
    self._db_names_cache: List[str] = []

    # Media download cache (WeChat images / videos), two levels above this file.
    project_root = Path(__file__).resolve().parent.parent
    cache_dir = project_root / "temp" / "wechat_media_cache"
    self._media_cache_dir = cache_dir
    cache_dir.mkdir(parents=True, exist_ok=True)
    self._media_cache_ttl = 3600
    self._media_cache_locks: Dict[str, asyncio.Lock] = {}

    logger.info(f"WechatHookClient 初始化: base_url={base_url}")
async def close(self):
    """Close the underlying HTTP client and log the shutdown."""
    client = self.http_client
    await client.close()
    logger.info("WechatHookClient 已关闭")
@property
def wxid(self) -> str:
    """Return the wxid stored for the current login."""
    current = self._wxid
    return current
@property
def nickname(self) -> str:
    """Return the nickname stored for the current login."""
    current = self._nickname
    return current
def update_profile(self, wxid: str, nickname: str):
    """
    Overwrite the stored login identity.

    Args:
        wxid: WeChat ID.
        nickname: Display nickname.
    """
    self._wxid, self._nickname = wxid, nickname
    logger.info(f"更新登录信息: wxid={wxid}, nickname={nickname}")
def update_chatroom_members_cache(self, chatroom_id: str, members: List[Dict]):
    """
    Merge group members into the in-memory member cache.

    Members are keyed by their "userName" value; entries without one are
    skipped. Nothing happens when either argument is empty.

    Args:
        chatroom_id: Group chat ID.
        members: List of member dicts.
    """
    if not (chatroom_id and members):
        return
    room_cache = self._chatroom_members_cache.setdefault(chatroom_id, {})
    for info in members:
        member_id = info.get("userName", "")
        if member_id:
            room_cache[member_id] = info
def get_cached_member_info(self, chatroom_id: str, user_wxid: str) -> Optional[Dict]:
    """Look up a group member in the in-memory cache; None when not cached."""
    cache = self._chatroom_members_cache
    if chatroom_id not in cache:
        return None
    return cache[chatroom_id].get(user_wxid)
async def _log_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""):
    """Record a message sent by the bot via utils.message_hook (best effort)."""
    try:
        # Imported lazily; any failure here is logged at debug level and ignored.
        from utils.message_hook import log_bot_message as record_message
        await record_message(to_wxid, content, msg_type, media_url)
    except Exception as exc:
        logger.debug(f"记录机器人消息失败(可忽略): {exc}")
async def send_message(self, to_wxid: str, msg_type: str, content: str) -> bool:
    """
    Unified entry point for sending a message (text/image/video/file/xml).

    Args:
        to_wxid: Recipient wxid.
        msg_type: Message type: text/image/video/file/xml.
        content: Text content, a file path, or XML.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, msg_type, content)
    return outcome
async def _send_message(self, to_wxid: str, msg_type: str, content: str) -> bool:
"""统一发送消息实现,集中日志与类型处理"""
msg_type = (msg_type or "").lower()
if msg_type == "text":
result = await self.http_client.send_text(to_wxid, content)
if result:
await self._log_bot_message(to_wxid, content, "text")
return result
if msg_type == "image":
result = await self.http_client.send_image(to_wxid, content)
if result:
filename = os.path.basename(content)
await self._log_bot_message(to_wxid, f"[图片] {filename}", "image", content)
return result
if msg_type == "video":
# 新协议可能使用文件发送接口发送视频
result = await self.http_client.send_file(to_wxid, content)
if result:
filename = os.path.basename(content)
await self._log_bot_message(to_wxid, f"[视频] {filename}", "video", content)
return result
if msg_type == "file":
result = await self.http_client.send_file(to_wxid, content)
if result:
filename = os.path.basename(content)
await self._log_bot_message(to_wxid, f"[文件] {filename}", "file", content)
return result
if msg_type == "xml":
return await self._send_xml_message(to_wxid, content)
logger.error(f"不支持的消息类型: {msg_type}")
return False
async def _send_xml_message(self, to_wxid: str, xml: str) -> bool:
"""发送 XML 消息(如聊天记录、音乐卡片等)"""
# 尝试提取 appmsg 内容和 type使用新协议
try:
import xml.etree.ElementTree as ET
# 尝试解析 XML
root = ET.fromstring(xml)
# 查找 appmsg 元素
appmsg = root.find(".//appmsg")
if appmsg is None and root.tag == "appmsg":
appmsg = root
if appmsg is not None:
# 提取 type
type_elem = appmsg.find("type")
msg_type = type_elem.text if type_elem is not None else "5"
# 将 appmsg 转换回字符串
appmsg_content = ET.tostring(appmsg, encoding="unicode")
result = await self.http_client.send_app_msg(to_wxid, appmsg_content, msg_type)
if result:
await self._log_bot_message(to_wxid, "[XML消息]", "xml")
return result
except Exception as e:
logger.warning(f"解析 XML 失败,尝试旧协议: {e}")
# 回退到旧协议
result = await self.http_client.send_xml(to_wxid, xml)
if result:
await self._log_bot_message(to_wxid, "[XML消息]", "xml")
return result
# ==================== 消息发送 ====================
async def send_text(self, to_wxid: str, content: str) -> bool:
    """
    Send a text message.

    Args:
        to_wxid: Recipient wxid.
        content: Text content.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, "text", content)
    return outcome
async def send_image(self, to_wxid: str, image_path: str) -> bool:
    """
    Send an image message.

    Args:
        to_wxid: Recipient wxid.
        image_path: Path of the image file.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, "image", image_path)
    return outcome
async def send_file(self, to_wxid: str, file_path: str) -> bool:
    """
    Send a file message.

    Args:
        to_wxid: Recipient wxid.
        file_path: Path of the file.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, "file", file_path)
    return outcome
async def send_media(self, to_wxid: str, file_path: str, media_type: str = "") -> bool:
    """
    Send an image, video or file, choosing the message type automatically.

    An explicit `media_type` alias wins; otherwise the type is derived from
    the file extension, defaulting to a generic file.

    Args:
        to_wxid: Recipient wxid.
        file_path: Path of the file.
        media_type: Optional media type hint (image/video/file or an alias).

    Returns:
        Whether the message was sent.
    """
    requested = (media_type or "").lower()
    alias_groups = (
        ("image", {"image", "img", "pic", "picture", "photo"}),
        ("video", {"video", "vid"}),
        ("file", {"file", "doc", "document"}),
    )
    for target, aliases in alias_groups:
        if requested in aliases:
            return await self._send_message(to_wxid, target, file_path)

    # No usable hint: decide by extension.
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
        target = "image"
    elif suffix in {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".webm"}:
        target = "video"
    else:
        target = "file"
    return await self._send_message(to_wxid, target, file_path)
async def send_video(self, to_wxid: str, video_path: str) -> bool:
    """
    Send a video message.

    Args:
        to_wxid: Recipient wxid.
        video_path: Path of the video file.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, "video", video_path)
    return outcome
async def send_at_message(
    self,
    chatroom_id: str,
    content: str,
    at_list: List[str]
) -> bool:
    """
    Send a group message that @-mentions members.

    Args:
        chatroom_id: Group chat ID.
        content: Message content.
        at_list: wxids to mention; ["notify@all"] mentions everyone.

    Returns:
        Whether the message was sent.
    """
    # The HTTP client takes the mentioned wxids as one comma-separated string.
    joined_wxids = ",".join(at_list)
    outcome = await self.http_client.send_at_text(chatroom_id, content, joined_wxids)
    return outcome
async def send_card(
    self,
    to_wxid: str,
    card_wxid: str,
    card_nickname: str = ""
) -> bool:
    """
    Send a contact card.

    Args:
        to_wxid: Recipient wxid.
        card_wxid: wxid shown on the card.
        card_nickname: Card nickname; accepted but not forwarded to the
            HTTP client.

    Returns:
        Whether the message was sent.
    """
    outcome = await self.http_client.send_card(to_wxid, card_wxid)
    return outcome
async def send_link(
    self,
    to_wxid: str,
    title: str,
    desc: str,
    url: str,
    thumb_url: str = ""
) -> bool:
    """
    Send a link card (appmsg type 5).

    All user-supplied fields are XML-escaped, including both quote
    characters, before being embedded in the <appmsg> payload.

    Args:
        to_wxid: Recipient wxid.
        title: Link title.
        desc: Link description.
        url: Link target.
        thumb_url: Thumbnail URL; a built-in default is used when empty.

    Returns:
        Whether the message was sent.
    """
    if not thumb_url:
        thumb_url = "https://www.functen.cn/static/img/709a3f34713ef07b09d524bee2df69d6.DY.webp"

    # xml_escape handles &, < and > itself; quotes need explicit entities.
    quote_entities = {'"': "&quot;", "'": "&apos;"}

    def _safe(value) -> str:
        return xml_escape(str(value or ""), quote_entities)

    safe_title = _safe(title)
    safe_desc = _safe(desc)
    safe_url = _safe(url)
    safe_thumb_url = _safe(thumb_url)

    # Only the <appmsg> element is built here; no outer <msg> wrapper.
    appmsg_content = f'''<appmsg appid="" sdkver="0">
<title>{safe_title}</title>
<des>{safe_desc}</des>
<type>5</type>
<url>{safe_url}</url>
<thumburl>{safe_thumb_url}</thumburl>
</appmsg>'''
    return await self.http_client.send_app_msg(to_wxid, appmsg_content, "5")
async def send_link_card(
    self,
    to_wxid: str,
    title: str,
    desc: str,
    url: str,
    image_url: str = ""
) -> bool:
    """
    Send a link card; an alias of `send_link` with `image_url` as thumbnail.

    Args:
        to_wxid: Recipient wxid.
        title: Card title.
        desc: Card description.
        url: Link target.
        image_url: Card image URL.

    Returns:
        Whether the message was sent.
    """
    outcome = await self.send_link(to_wxid, title, desc, url, image_url)
    return outcome
async def revoke_message(self, msg_id: str) -> bool:
    """
    Revoke (recall) a message.

    Args:
        msg_id: Message ID (newMsgId).

    Returns:
        Whether the revoke succeeded.
    """
    outcome = await self.http_client.revoke_message(msg_id)
    return outcome
async def send_xml(self, to_wxid: str, xml: str) -> bool:
    """
    Send an XML message (e.g. chat records, music cards).

    Args:
        to_wxid: Recipient wxid.
        xml: XML content; either a full <msg><appmsg>...</appmsg></msg>
            document or only <appmsg>...</appmsg>.

    Returns:
        Whether the message was sent.
    """
    outcome = await self._send_message(to_wxid, "xml", xml)
    return outcome
async def get_group_member_contact(self, room_id: str, member_wxid: str) -> Optional[Dict]:
    """
    Fetch detailed contact info for one group member.

    Args:
        room_id: Group chat ID.
        member_wxid: Member wxid.

    Returns:
        Whatever the HTTP client returns for this member.
    """
    contact = await self.http_client.get_group_member_contact(room_id, member_wxid)
    return contact
# ==================== CDN 相关(新协议简化) ====================
async def cdn_init(self) -> bool:
    """
    Compatibility no-op for CDN initialisation.

    Only logs a notice that no CDN initialisation is needed.

    Returns:
        Always True.
    """
    initialised = True
    logger.info("新协议无需初始化 CDN")
    return initialised
async def cdn_download(self, file_id: str, aes_key: str, save_path: str, file_type: int = 2) -> bool:
    """
    Compatibility stub for CDN download.

    Performs no download; logs a warning pointing callers to
    download_image/download_video instead.

    Returns:
        Always False.
    """
    downloaded = False
    logger.warning("cdn_download 在新协议中不可用,请使用 download_image/download_video")
    return downloaded
async def cdn_upload(self, file_path: str, file_type: int = 1) -> Optional[Dict]:
    """
    Compatibility stub for CDN upload.

    Performs no upload; logs a warning pointing callers to
    send_image/send_file instead.

    Returns:
        Always None.
    """
    upload_info = None
    logger.warning("cdn_upload 在新协议中不需要,直接使用 send_image/send_file")
    return upload_info
async def send_cdn_image(self, to_wxid: str, file_path: str) -> bool:
    """
    Send an image (compatibility alias of `send_image`).

    Args:
        to_wxid: Recipient wxid.
        file_path: Path of the image file.

    Returns:
        Whether the message was sent.
    """
    outcome = await self.send_image(to_wxid, file_path)
    return outcome
# ==================== 好友管理 ====================
async def get_friend_list(self) -> List[Dict]:
    """
    Fetch the friend list from the HTTP client.

    Returns:
        The friend list.
    """
    friends = await self.http_client.get_friend_list()
    return friends
async def get_friend_info(self, wxid: str) -> Optional[Dict]:
    """
    Fetch information about one friend.

    Args:
        wxid: Friend wxid.

    Returns:
        Friend info dict as returned by the HTTP client.
    """
    info = await self.http_client.get_friend_info(wxid)
    return info
async def add_friend(
    self,
    wxid: str,
    verify_msg: str = "",
    scene: int = 3
) -> bool:
    """
    Send a friend request.

    Args:
        wxid: wxid to add.
        verify_msg: Verification message.
        scene: Add scene (3 = search, 15 = contact card).

    Returns:
        Whether the request was sent.
    """
    outcome = await self.http_client.add_friend(wxid, verify_msg, scene)
    return outcome
async def accept_friend(self, v3: str, v4: str, scene: int) -> bool:
    """
    Accept a friend request.

    Args:
        v3: v3 parameter of the friend request.
        v4: v4 parameter of the friend request.
        scene: Scene value.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.accept_friend(v3, v4, scene)
    return outcome
async def delete_friend(self, wxid: str) -> bool:
    """
    Delete a friend.

    Args:
        wxid: wxid of the friend to delete.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.delete_friend(wxid)
    return outcome
async def set_friend_remark(self, wxid: str, remark: str) -> bool:
    """
    Change a friend's remark name.

    Args:
        wxid: Friend wxid.
        remark: New remark.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.set_friend_remark(wxid, remark)
    return outcome
# ==================== 群聊管理 ====================
async def get_chatroom_list(self, force_refresh: bool = False) -> List[Dict]:
    """
    Collect the group chats known to this account.

    Group chats are first picked out of the friend list (entries whose
    contact userName ends with "@chatroom") and returned as the original
    friend dicts. Room ids found only in the local database are appended
    afterwards as plain strings, so the result can mix dicts and strings.

    Args:
        force_refresh: Forwarded to the database lookup.

    Returns:
        Friend dicts for group chats, followed by any extra room id strings.
    """
    friends = await self.get_friend_list()
    chatrooms = []
    room_ids = set()
    for friend in friends or []:
        if not isinstance(friend, dict):
            continue
        # "contact" may be missing, null, or not a dict; treat all as empty.
        contact = friend.get("contact")
        if not isinstance(contact, dict):
            continue
        username = contact.get("userName")
        if isinstance(username, dict):
            wxid = username.get("String", "")
        else:
            wxid = str(username) if username else ""
        if isinstance(wxid, str) and wxid.endswith("@chatroom"):
            chatrooms.append(friend)
            room_ids.add(wxid)

    # Add group chats that exist in the local database but not in the friend list.
    db_rooms = await self._get_chatroom_list_from_db(force_refresh=force_refresh)
    for room_id in db_rooms or []:
        if room_id not in room_ids:
            chatrooms.append(room_id)
            room_ids.add(room_id)
    return chatrooms
async def _get_chatroom_list_from_db(self, force_refresh: bool = False) -> List[str]:
"""从本地 contact.db 尝试获取群聊列表(兜底)"""
if not force_refresh and "contact.db" in self._db_chatroom_list_cache:
cached = self._db_chatroom_list_cache.get("contact.db", [])
if cached:
return cached[:]
candidates = set()
queries = [
("contact.db", "select chatroomname as room_id from chat_room_info_detail"),
("contact.db", "select username as room_id from chat_room_info_detail"),
("contact.db", "select chatroomname, username from chat_room_info_detail"),
("contact.db", "select username as room_id from rcontact where username like '%@chatroom'"),
("contact.db", "select username as room_id from chatroom"),
("contact.db", "select chatroomname as room_id from chatroom"),
]
for db_name, sql_fmt in queries:
rows = await self.http_client.sqlite_exec(db_name, sql_fmt)
if not rows:
continue
for row in rows:
room_id = ""
if isinstance(row, str):
room_id = row
elif isinstance(row, dict):
for key in ("room_id", "chatroomname", "username", "userName", "UserName"):
value = row.get(key)
if isinstance(value, dict):
value = value.get("String", "") or value.get("string", "")
if isinstance(value, str) and value:
room_id = value
break
if room_id and room_id.endswith("@chatroom"):
candidates.add(room_id)
if not candidates:
candidates.update(await self._scan_chatrooms_from_db("contact.db"))
if force_refresh:
db_names = await self._get_db_names()
for db_name in db_names:
if db_name == "contact.db":
continue
candidates.update(await self._scan_chatrooms_from_db(db_name))
result = list(candidates)
if result:
self._db_chatroom_list_cache["contact.db"] = result[:]
return result
async def _scan_chatrooms_from_db(self, db_name: str) -> List[str]:
"""扫描数据库表,尝试找出所有群聊 ID"""
results = set()
tables = await self._list_db_tables(db_name)
for table in tables:
table_lower = table.lower()
if "chatroom" not in table_lower and "room" not in table_lower and "contact" not in table_lower:
continue
columns = await self._get_table_columns(db_name, table)
for col in columns:
col_lower = col.lower()
if (
"chatroom" in col_lower
or col_lower in {"username", "user_name", "roomid", "room_id", "chatroomname"}
):
sql = f'select distinct "{col}" as room_id from "{table}" where "{col}" like \'%@chatroom\''
rows = await self.http_client.sqlite_exec(db_name, sql)
for row in rows:
room_id = ""
if isinstance(row, dict):
room_id = row.get("room_id", "") or row.get(col, "")
elif isinstance(row, str):
room_id = row
if isinstance(room_id, dict):
room_id = room_id.get("String", "") or room_id.get("string", "")
if room_id and isinstance(room_id, str) and room_id.endswith("@chatroom"):
results.add(room_id)
return list(results)
async def _list_db_tables(self, db_name: str) -> List[str]:
"""获取数据库表名列表"""
if db_name in self._db_tables_cache:
cached = self._db_tables_cache.get(db_name, [])
if cached:
return cached[:]
rows = await self.http_client.sqlite_exec(db_name, "select name from sqlite_master where type='table'")
tables = []
for row in rows:
name = ""
if isinstance(row, dict):
name = row.get("name", "")
elif isinstance(row, str):
name = row
if name:
tables.append(name)
if tables:
self._db_tables_cache[db_name] = tables[:]
return tables
async def _get_db_names(self, force_refresh: bool = False) -> List[str]:
"""获取可用数据库名称列表"""
if self._db_names_cache and not force_refresh:
return self._db_names_cache[:]
handles = await self.http_client.get_db_handle()
names = []
for item in handles:
if isinstance(item, dict):
name = item.get("name", "")
if name:
names.append(name)
if names:
self._db_names_cache = names[:]
return names
async def _get_table_columns(self, db_name: str, table: str) -> List[str]:
"""获取表字段列表"""
if db_name in self._db_table_columns_cache:
cached = self._db_table_columns_cache[db_name].get(table)
if cached:
return cached[:]
sql = f'pragma table_info("{table}")'
rows = await self.http_client.sqlite_exec(db_name, sql)
columns = []
for row in rows:
name = ""
if isinstance(row, dict):
name = row.get("name", "")
elif isinstance(row, str):
name = row
if name:
columns.append(name)
self._db_table_columns_cache.setdefault(db_name, {})[table] = columns[:]
return columns
async def get_chatroom_members(self, chatroom_id: str) -> List[Dict]:
    """
    Fetch the member list of a group chat, using up to three sources.

    1. The group info call: every id in "allMemberUserNameList", enriched
       with details from "newChatroomData.chatRoomMember" where present.
    2. The HTTP client's member list call, adding members not yet seen.
    3. Member ids found in the local database, adding ids not yet seen.

    After steps 1 and 2 the result is returned early when it is non-empty
    and either no member count is known or the count matches.

    Args:
        chatroom_id: Group chat ID.

    Returns:
        Dicts with the keys "wxid", "nickname", "display_name", "avatar".
    """
    def _entry(member_id, detail) -> Dict:
        return {
            "wxid": member_id,
            "nickname": detail.get("nickName", ""),
            "display_name": detail.get("displayName", ""),
            "avatar": detail.get("bigHeadImgUrl", ""),
        }

    def _unwrap(value):
        # Ids may arrive as plain values or as {"String": ...} wrappers.
        if isinstance(value, dict):
            return value.get("String", "") or value.get("string", "")
        return value

    def _is_complete() -> bool:
        return bool(result) and (not total_count or total_count == len(result))

    info = await self.http_client.get_chatroom_info(chatroom_id)
    detail_map = {}
    total_count = 0
    result = []
    seen = set()  # ids already in `result`, for O(1) duplicate checks

    if info:
        # Any of these keys may be present with a null value.
        new_data = info.get("newChatroomData") or {}
        for m in new_data.get("chatRoomMember") or []:
            wxid = m.get("userName", "") or m.get("wxid", "")
            if wxid:
                detail_map[wxid] = m
        for entry in info.get("allMemberUserNameList") or []:
            wxid = _unwrap(entry) if isinstance(entry, dict) else str(entry)
            if not wxid:
                continue
            result.append(_entry(wxid, detail_map.get(wxid, {})))
            seen.add(wxid)
        try:
            total_count = int(info.get("allMemberCount") or 0)
        except (TypeError, ValueError):
            # An unparsable count is treated as unknown.
            total_count = 0

    if _is_complete():
        return result

    # Fallback: the dedicated member list call.
    members = await self.http_client.get_chatroom_members(chatroom_id)
    for m in members or []:
        wxid = _unwrap(m.get("userName", ""))
        if not wxid or wxid in seen:
            continue
        result.append(_entry(wxid, m))
        seen.add(wxid)

    if _is_complete():
        return result

    # Last resort: member ids from the local database.
    db_wxids = await self._get_chatroom_members_from_db(chatroom_id)
    for wxid in db_wxids or []:
        if wxid in seen:
            continue
        result.append(_entry(wxid, detail_map.get(wxid, {})))
        seen.add(wxid)
    return result
async def _get_chatroom_members_from_db(self, chatroom_id: str) -> List[str]:
"""从本地数据库尝试获取群成员 wxid 列表"""
if not chatroom_id or not chatroom_id.endswith("@chatroom"):
return []
room_id = chatroom_id.replace("'", "''")
# 常见表/字段兜底
list_queries = [
("contact.db", "chatroom", "chatroomname", "memberlist"),
("contact.db", "chatroom", "chatroomname", "member_list"),
("contact.db", "chat_room_info_detail", "chatroomname", "memberlist"),
("contact.db", "chat_room_info_detail", "username", "memberlist"),
("contact.db", "rcontact", "username", "memberlist"),
]
row_queries = [
("contact.db", "chatroom_member", "chatroomname", "membername"),
("contact.db", "chatroom_member", "chatroomname", "username"),
("contact.db", "chatroom_member", "roomid", "username"),
("contact.db", "chatroom_member", "room_id", "username"),
]
members = await self._try_member_list_queries(list_queries, room_id)
if members:
return members
members = await self._try_member_row_queries(row_queries, room_id)
if members:
return members
# 扫描数据库表兜底
members = await self._scan_chatroom_members_from_db("contact.db", room_id)
if members:
return members
db_names = await self._get_db_names()
for db_name in db_names:
if db_name == "contact.db":
continue
members = await self._scan_chatroom_members_from_db(db_name, room_id)
if members:
return members
return []
def _parse_member_list_value(self, value: str) -> List[str]:
if not value:
return []
if not isinstance(value, str):
return []
separators = [",", ";", "|", "\n", "\t", " "]
for sep in separators:
if sep in value:
parts = [p.strip() for p in value.split(sep)]
return [p for p in parts if p and not p.endswith("@chatroom")]
return [value] if value and not value.endswith("@chatroom") else []
async def _try_member_list_queries(self, queries, room_id: str) -> List[str]:
members = []
for db_name, table, room_col, list_col in queries:
sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\''
rows = await self.http_client.sqlite_exec(db_name, sql)
for row in rows:
value = ""
if isinstance(row, dict):
value = row.get("member_list", "") or row.get(list_col, "")
elif isinstance(row, str):
value = row
members.extend(self._parse_member_list_value(value))
if members:
return list(dict.fromkeys(members))
return []
async def _try_member_row_queries(self, queries, room_id: str) -> List[str]:
members = []
for db_name, table, room_col, member_col in queries:
sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\''
rows = await self.http_client.sqlite_exec(db_name, sql)
for row in rows:
value = ""
if isinstance(row, dict):
value = row.get("member_id", "") or row.get(member_col, "")
elif isinstance(row, str):
value = row
if isinstance(value, dict):
value = value.get("String", "") or value.get("string", "")
if isinstance(value, str) and value and not value.endswith("@chatroom"):
members.append(value)
if members:
return list(dict.fromkeys(members))
return []
async def _scan_chatroom_members_from_db(self, db_name: str, room_id: str) -> List[str]:
members = []
tables = await self._list_db_tables(db_name)
for table in tables:
table_lower = table.lower()
if "chatroom" not in table_lower and "room" not in table_lower:
continue
columns = await self._get_table_columns(db_name, table)
if not columns:
continue
room_cols = [c for c in columns if "chatroom" in c.lower() or c.lower() in {"roomid", "room_id", "chatroomname"}]
list_cols = [c for c in columns if "member" in c.lower() and "list" in c.lower()]
member_cols = [
c for c in columns
if ("member" in c.lower() or c.lower() in {"username", "user_name", "wxid"})
and "list" not in c.lower()
]
for room_col in room_cols:
for list_col in list_cols:
sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\''
rows = await self.http_client.sqlite_exec(db_name, sql)
for row in rows:
value = ""
if isinstance(row, dict):
value = row.get("member_list", "") or row.get(list_col, "")
elif isinstance(row, str):
value = row
members.extend(self._parse_member_list_value(value))
if members:
return list(dict.fromkeys(members))
for member_col in member_cols:
if member_col == room_col:
continue
sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\''
rows = await self.http_client.sqlite_exec(db_name, sql)
for row in rows:
value = ""
if isinstance(row, dict):
value = row.get("member_id", "") or row.get(member_col, "")
elif isinstance(row, str):
value = row
if isinstance(value, dict):
value = value.get("String", "") or value.get("string", "")
if isinstance(value, str) and value and not value.endswith("@chatroom"):
members.append(value)
if members:
return list(dict.fromkeys(members))
return list(dict.fromkeys(members))
async def get_chatroom_info(self, chatroom_id: str) -> Optional[Dict]:
    """
    Fetch group chat information.

    Args:
        chatroom_id: Group chat ID.

    Returns:
        Group info dict as returned by the HTTP client.
    """
    info = await self.http_client.get_chatroom_info(chatroom_id)
    return info
async def create_chatroom(self, member_list: List[str]) -> Optional[str]:
    """
    Create a group chat.

    Args:
        member_list: Member wxids (at least two, per the original notes).

    Returns:
        The chatroom_id of the new group, as returned by the HTTP client.
    """
    new_room_id = await self.http_client.create_chatroom(member_list)
    return new_room_id
async def invite_to_chatroom(
    self,
    chatroom_id: str,
    wxid_list: List[str]
) -> bool:
    """
    Invite users into a group chat.

    Args:
        chatroom_id: Group chat ID.
        wxid_list: wxids to invite.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.invite_to_chatroom(chatroom_id, wxid_list)
    return outcome
async def remove_chatroom_member(
    self,
    chatroom_id: str,
    wxid_list: List[str]
) -> bool:
    """
    Remove members from a group chat.

    Args:
        chatroom_id: Group chat ID.
        wxid_list: wxids to remove.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.remove_chatroom_member(chatroom_id, wxid_list)
    return outcome
async def quit_chatroom(self, chatroom_id: str) -> bool:
    """
    Leave a group chat.

    Args:
        chatroom_id: Group chat ID.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.quit_chatroom(chatroom_id)
    return outcome
async def set_chatroom_name(self, chatroom_id: str, name: str) -> bool:
    """
    Rename a group chat (not implemented).

    Makes no request; only logs a warning.

    Args:
        chatroom_id: Group chat ID.
        name: New group name.

    Returns:
        Always False.
    """
    renamed = False
    logger.warning("set_chatroom_name 在新协议中可能不可用")
    return renamed
async def set_chatroom_announcement(
    self,
    chatroom_id: str,
    announcement: str
) -> bool:
    """
    Change a group chat's announcement.

    Args:
        chatroom_id: Group chat ID.
        announcement: Announcement text.

    Returns:
        Whether the call succeeded.
    """
    outcome = await self.http_client.set_chatroom_announcement(chatroom_id, announcement)
    return outcome
async def set_my_chatroom_nickname(
    self,
    chatroom_id: str,
    nickname: str
) -> bool:
    """
    Change the bot's own nickname in a group (not implemented).

    Makes no request; only logs a warning.

    Args:
        chatroom_id: Group chat ID.
        nickname: New nickname.

    Returns:
        Always False.
    """
    changed = False
    logger.warning("set_my_chatroom_nickname 在新协议中可能不可用")
    return changed
# ==================== 初始化 ====================
async def wechat_init(self) -> bool:
    """
    Initialise WeChat's friend and group lists via the HTTP client.

    The original notes say this should be called once a day to refresh
    caches.
    """
    outcome = await self.http_client.wechat_init()
    return outcome
# ==================== 登录信息 ====================
async def get_login_info(self) -> Optional[Dict]:
    """
    Fetch the current login info and refresh the cached wxid / nickname.

    The identity is read from the "contact" entry of the response (or the
    response itself when that key is absent). Fields may be plain values
    or {"String": ...} wrappers.

    Returns:
        The raw response of the HTTP client's `get_self_info`.
    """
    def _text(field) -> str:
        if isinstance(field, dict):
            return field.get("String", "")
        return str(field) if field else ""

    info = await self.http_client.get_self_info()
    if not info:
        return info

    contact = info.get("contact", info)
    if isinstance(contact, dict):
        self._wxid = _text(contact.get("userName", {}))
        self._nickname = _text(contact.get("nickName", {}))
    self._login_info = info
    logger.info(f"获取登录信息成功: wxid={self._wxid}, nickname={self._nickname}")
    return info
async def get_user_info_in_chatroom(self, chatroom_id: str, user_wxid: str, max_retries: int = 1) -> Optional[Dict]:
    """
    Get detailed info about a user inside a group chat.

    The member cache is consulted first. Otherwise the member list is
    fetched up to `max_retries + 1` times, pausing 0.5 s between attempts,
    and searched for a matching "wxid" or "userName".

    Args:
        chatroom_id: Group chat ID.
        user_wxid: User wxid.
        max_retries: Number of retries after the first attempt.

    Returns:
        The member dict, or None when not found.
    """
    cached = self.get_cached_member_info(chatroom_id, user_wxid)
    if cached:
        return cached

    for attempt in range(max_retries + 1):
        try:
            members = await self.get_chatroom_members(chatroom_id)
            match = next(
                (
                    m for m in members
                    if m.get("wxid") == user_wxid or m.get("userName") == user_wxid
                ),
                None,
            )
            if match is not None:
                return match
        except Exception as exc:
            logger.debug(f"获取群内用户信息失败: {exc}")
        # Wait before the next attempt, whether this one failed or found nothing.
        if attempt < max_retries:
            await asyncio.sleep(0.5)
    return None
# ==================== 下载功能 ====================
def _build_media_cache_key(
self,
media_type: str,
message: Optional[Dict] = None,
msg_id: Optional[int] = None,
file_id: str = "",
aes_key: str = ""
) -> str:
"""构建媒体缓存 key用于复用下载结果"""
media_type = (media_type or "").lower()
key_parts = [media_type]
if msg_id is None and message:
raw = message.get("_raw", message)
msg_id = (
raw.get("newMsgId")
or message.get("MsgId")
or raw.get("msgId")
or message.get("msgId")
)
if msg_id is not None and str(msg_id):
key_parts.append(f"msg:{msg_id}")
elif file_id and aes_key:
key_parts.append(f"cdn:{file_id}:{aes_key}")
else:
return ""
key_source = "|".join(key_parts)
key_hash = hashlib.sha1(key_source.encode("utf-8")).hexdigest()
return f"{media_type}_{key_hash}"
def _media_cache_path(self, cache_key: str) -> Path:
"""获取缓存文件路径"""
return self._media_cache_dir / f"{cache_key}.bin"
def _is_media_cache_valid(self, cache_path: Path) -> bool:
"""检查缓存是否有效"""
try:
if not cache_path.exists():
return False
if cache_path.stat().st_size <= 0:
return False
age = time.time() - cache_path.stat().st_mtime
if age > self._media_cache_ttl:
cache_path.unlink(missing_ok=True)
return False
return True
except Exception:
return False
def _copy_cached_media(self, cache_path: Path, save_path: str) -> Optional[str]:
"""从缓存复制媒体到目标路径"""
try:
target_path = Path(save_path)
target_path.parent.mkdir(parents=True, exist_ok=True)
if target_path.exists():
target_path.unlink()
shutil.copy2(cache_path, target_path)
os.utime(cache_path, None)
return str(target_path)
except Exception as e:
logger.warning(f"媒体缓存复制失败: {e}")
return None
def _store_media_cache(self, cache_key: str, source_path: str) -> None:
"""写入媒体缓存"""
try:
source = Path(source_path)
if not source.exists() or source.stat().st_size <= 0:
return
cache_path = self._media_cache_path(cache_key)
tmp_path = cache_path.with_suffix(".tmp")
tmp_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, tmp_path)
tmp_path.replace(cache_path)
os.utime(cache_path, None)
except Exception as e:
logger.warning(f"写入媒体缓存失败: {e}")
async def download_wechat_media(
    self,
    media_type: str,
    save_path: str,
    message: Optional[Dict] = None,
    msg_id: Optional[int] = None,
    total_len: int = 0,
    to_user: str = "",
    from_user: str = "",
    file_id: str = "",
    aes_key: str = "",
    prefer_original: bool = True,
    timeout: float = 60.0
) -> Optional[str]:
    """
    Unified entry point for downloading message media (image / video).

    When a cache key can be built, the download is serialised per key:
    a valid cache file is copied to `save_path`; otherwise the media is
    downloaded and the result stored in the cache. Without a cache key
    the download runs directly.

    Args:
        media_type: Media type (image/video).
        save_path: Target path.
        message: Raw message data.
        msg_id: Message ID (svrid).
        total_len: Total file size.
        to_user: Recipient wxid.
        from_user: Sender wxid.
        file_id: CDN file ID.
        aes_key: AES key.
        prefer_original: Whether to prefer the original image.
        timeout: Timeout of a single download, in seconds.

    Returns:
        The result of the cache copy or of the direct download.
    """
    media_type = (media_type or "").lower()
    cache_key = self._build_media_cache_key(media_type, message, msg_id, file_id, aes_key)

    # The same arguments are used on both the cached and uncached route.
    direct_args = dict(
        media_type=media_type,
        save_path=save_path,
        message=message,
        msg_id=msg_id,
        total_len=total_len,
        to_user=to_user,
        from_user=from_user,
        file_id=file_id,
        aes_key=aes_key,
        prefer_original=prefer_original,
        timeout=timeout,
    )

    if not cache_key:
        return await self._download_wechat_media_direct(**direct_args)

    key_lock = self._media_cache_locks.setdefault(cache_key, asyncio.Lock())
    async with key_lock:
        cache_file = self._media_cache_path(cache_key)
        if self._is_media_cache_valid(cache_file):
            copied = self._copy_cached_media(cache_file, save_path)
            if copied:
                logger.debug(f"媒体缓存命中: {cache_key}")
                return copied

        downloaded = await self._download_wechat_media_direct(**direct_args)
        # The string "expired" is a result that must not be cached.
        if downloaded and downloaded != "expired":
            self._store_media_cache(cache_key, downloaded)
        return downloaded
async def _download_wechat_media_direct(
self,
media_type: str,
save_path: str,
message: Optional[Dict] = None,
msg_id: Optional[int] = None,
total_len: int = 0,
to_user: str = "",
from_user: str = "",
file_id: str = "",
aes_key: str = "",
prefer_original: bool = True,
timeout: float = 60.0
) -> Optional[str]:
"""实际执行微信媒体下载(不含缓存逻辑)"""
if media_type in {"image", "img", "pic", "picture", "photo"}:
if message:
return await self.download_image(message, save_path)
if msg_id is not None:
return await self.download_image_by_id(msg_id, total_len, save_path, to_user, from_user)
if file_id and aes_key:
return await self.download_image_by_cdn(
file_id=file_id,
aes_key=aes_key,
save_path=save_path,
prefer_original=prefer_original,
timeout=timeout
)
logger.error("download_wechat_media 缺少图片下载参数")
return None
if media_type in {"video", "vid"}:
if message:
return await self.download_video(message, save_path)
if msg_id is not None:
return await self.download_video_by_id(msg_id, total_len, save_path)
logger.error("download_wechat_media 缺少视频下载参数")
return None
logger.error(f"download_wechat_media 不支持的媒体类型: {media_type}")
return None
async def download_image(
    self,
    message: Dict,
    save_path: str
) -> Optional[str]:
    """
    Download the image of a message through the CDN download call.

    The CDN file id and AES key are read from the <img> element of the
    message's "Content" XML. The original image is tried first; if that
    fails and thumbnail parameters are present, the thumbnail is tried.
    After each download call the file is polled (up to 20 times, 0.5 s
    apart) until it exists and is non-empty.

    Args:
        message: Raw message data; must carry the XML in "Content".
        save_path: Target path.

    Returns:
        The saved path, or None on failure.
    """
    async def _wait_until_written(path) -> bool:
        # The download call may return before the file is fully on disk.
        for _ in range(20):
            if os.path.exists(path) and os.path.getsize(path) > 0:
                return True
            await asyncio.sleep(0.5)
        return False

    content = message.get("Content", "")
    if not content:
        logger.error("图片消息缺少 Content 字段")
        return None

    try:
        import xml.etree.ElementTree as ET

        root = ET.fromstring(content)
        img = root.find(".//img")
        if img is None:
            logger.error("无法从 XML 中找到 img 标签")
            return None

        # Log every attribute to help debugging.
        logger.debug(f"图片XML属性: {dict(img.attrib)}")

        # The file id can live in any of several attributes.
        fileid = (img.get("cdnbigimgurl") or
                  img.get("cdnmidimgurl") or
                  img.get("cdnhdimgurl") or
                  img.get("fileid") or "")
        aeskey = img.get("aeskey", "")

        # Thumbnail parameters; the thumbnail key falls back to the main key.
        thumb_fileid = img.get("cdnthumburl", "")
        thumb_aeskey = img.get("cdnthumbaeskey", "") or img.get("aeskey", "")

        if not fileid or not aeskey:
            # Report which parameter is present ("有") and which is missing ("无").
            logger.error(
                f"缺少必要参数: fileid={'有' if fileid else '无'}, aeskey={'有' if aeskey else '无'}"
            )
            logger.error(f"可用属性: {list(img.attrib.keys())}")
            return None

        logger.info(f"提取CDN参数: fileid={fileid[:50]}..., aeskey={aeskey[:20]}...")
        if thumb_fileid:
            logger.debug(f"缩略图参数: thumb_fileid={thumb_fileid[:50]}..., thumb_aeskey={thumb_aeskey[:20]}...")

        # Original image first.
        logger.debug("优先尝试下载原图")
        result = await self.http_client.cdn_download_image(
            fileid=fileid,
            aeskey=aeskey,
            save_path=save_path,
            img_type=1  # 1 = original image
        )
        if result and await _wait_until_written(result):
            logger.info(f"原图下载成功: {result}, size={os.path.getsize(result)}")
            return result

        logger.debug("原图下载失败,尝试缩略图")

        # Fall back to the thumbnail.
        if thumb_fileid and thumb_aeskey:
            result = await self.http_client.cdn_download_image(
                fileid=thumb_fileid,
                aeskey=thumb_aeskey,
                save_path=save_path,
                img_type=2,  # 2 = thumbnail
                timeout=30.0
            )
            if result and await _wait_until_written(result):
                logger.info(f"缩略图下载成功: {result}, size={os.path.getsize(result)}")
                return result

        logger.error("图片下载失败:原图和缩略图均无法下载")
    except Exception as e:
        logger.error(f"下载图片失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
    return None
async def download_image_by_cdn(
    self,
    file_id: str,
    aes_key: str,
    save_path: str,
    prefer_original: bool = True,
    timeout: float = 60.0
) -> Optional[str]:
    """
    Download an image directly from its CDN parameters.

    Image types 1 (original) and 2 (thumbnail) are tried in the order
    chosen by `prefer_original`. After each download call the file is
    polled (up to 20 times, 0.5 s apart) until it exists and is non-empty.

    Args:
        file_id: CDN file ID (e.g. cdnbigimgurl).
        aes_key: AES key.
        save_path: Target path.
        prefer_original: Try the original image before the thumbnail.
        timeout: Timeout of a single download, in seconds.

    Returns:
        The saved path, or None on failure.
    """
    if not (file_id and aes_key):
        logger.error("缺少必要参数: file_id 或 aes_key 为空")
        return None

    type_order = (1, 2) if prefer_original else (2, 1)
    for img_type in type_order:
        downloaded = await self.http_client.cdn_download_image(
            fileid=file_id,
            aeskey=aes_key,
            save_path=save_path,
            img_type=img_type,
            timeout=timeout
        )
        if not downloaded:
            continue
        polls_left = 20
        while polls_left > 0:
            if os.path.exists(downloaded) and os.path.getsize(downloaded) > 0:
                return downloaded
            await asyncio.sleep(0.5)
            polls_left -= 1

    logger.error("图片下载失败:原图和缩略图均无法下载")
    return None
async def download_video(
    self,
    message: Dict,
    save_path: str
) -> Optional[str]:
    """
    Download the video of a message.

    The message ids are read from the "_raw" sub-dict when present,
    otherwise from the message itself. The total length is read from the
    "length" attribute of the <videomsg> element in the "Content" XML.
    Missing, null or unparsable values are treated as 0.

    Args:
        message: Raw message data.
        save_path: Target path.

    Returns:
        The result of the HTTP client's `download_video`.
    """
    def _as_int(value) -> int:
        # Null or malformed ids fall back to 0 instead of raising.
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    raw_data = message.get("_raw") or message
    msg_id = _as_int(raw_data.get("msgId", 0))
    new_msg_id = _as_int(raw_data.get("newMsgId", 0))

    total_len = 0
    content = message.get("Content", "")
    if content:
        import xml.etree.ElementTree as ET

        try:
            video = ET.fromstring(content).find(".//videomsg")
        except (ET.ParseError, TypeError, ValueError):
            # Content that is not well-formed XML simply yields no length.
            video = None
        if video is not None:
            total_len = _as_int(video.get("length", 0))

    if not total_len:
        logger.warning("无法获取视频长度,尝试使用默认值")

    return await self.http_client.download_video(
        msg_id=msg_id,
        new_msg_id=new_msg_id,
        total_len=total_len,
        save_path=save_path
    )
async def download_video_by_id(
    self,
    msg_id: int,
    total_len: int,
    save_path: str
) -> Optional[str]:
    """
    Download a video by message ID (used for quoted messages).

    Args:
        msg_id: Message ID (svrid); also passed as the new message id.
        total_len: Total length of the video.
        save_path: Target path.

    Returns:
        The result of the HTTP client's `download_video`.
    """
    request = {
        "msg_id": msg_id,
        "new_msg_id": msg_id,
        "total_len": total_len,
        "save_path": save_path,
    }
    return await self.http_client.download_video(**request)
async def download_image_by_id(
    self,
    msg_id: int,
    total_len: int,
    save_path: str,
    to_user: str = "",
    from_user: str = ""
) -> Optional[str]:
    """
    Download an image by message ID (used for quoted messages).

    Args:
        msg_id: Message ID (svrid).
        total_len: Total size of the image.
        save_path: Target path.
        to_user: Recipient wxid (optional).
        from_user: Sender wxid (optional).

    Returns:
        The result of the HTTP client's `download_image`.
    """
    request = {
        "to_user": to_user,
        "from_user": from_user,
        "msg_id": msg_id,
        "total_len": total_len,
        "save_path": save_path,
    }
    return await self.http_client.download_image(**request)