""" WechatHookClient - 微信 Hook API 客户端 封装所有微信操作的高级 API(HTTP 协议版本) """ import asyncio import hashlib import os import shutil import time from pathlib import Path from typing import List, Dict, Optional from loguru import logger from xml.sax.saxutils import escape as xml_escape from .http_client import HttpClient class WechatHookClient: """ 微信 Hook API 客户端 提供统一的异步 API 接口(HTTP 协议) """ def __init__(self, base_url: str = "http://127.0.0.1:8888", timeout: float = 30.0): """ 初始化客户端 Args: base_url: Hook API 的基础 URL timeout: 请求超时时间 """ self.base_url = base_url self.http_client = HttpClient(base_url=base_url, timeout=timeout) # 登录信息缓存 self._login_info: Optional[Dict] = None self._wxid: str = "" self._nickname: str = "" # 群成员缓存 {chatroom_id: {wxid: member_info}} self._chatroom_members_cache: Dict[str, Dict[str, Dict]] = {} self._db_tables_cache: Dict[str, List[str]] = {} self._db_table_columns_cache: Dict[str, Dict[str, List[str]]] = {} self._db_chatroom_list_cache: Dict[str, List[str]] = {} self._db_names_cache: List[str] = [] # 媒体下载缓存(微信图片/视频) base_dir = Path(__file__).resolve().parent.parent self._media_cache_dir = base_dir / "temp" / "wechat_media_cache" self._media_cache_dir.mkdir(parents=True, exist_ok=True) self._media_cache_ttl = 3600 self._media_cache_locks: Dict[str, asyncio.Lock] = {} logger.info(f"WechatHookClient 初始化: base_url={base_url}") async def close(self): """关闭客户端""" await self.http_client.close() logger.info("WechatHookClient 已关闭") @property def wxid(self) -> str: """获取当前登录的 wxid""" return self._wxid @property def nickname(self) -> str: """获取当前登录的昵称""" return self._nickname def update_profile(self, wxid: str, nickname: str): """ 更新登录信息 Args: wxid: 微信 ID nickname: 昵称 """ self._wxid = wxid self._nickname = nickname logger.info(f"更新登录信息: wxid={wxid}, nickname={nickname}") def update_chatroom_members_cache(self, chatroom_id: str, members: List[Dict]): """ 更新群成员缓存(从消息回调中提取) Args: chatroom_id: 群聊 ID members: 群成员列表 """ if not chatroom_id or not members: return if chatroom_id not in self._chatroom_members_cache: self._chatroom_members_cache[chatroom_id] = {} for member in members: wxid = member.get("userName", "") if wxid: self._chatroom_members_cache[chatroom_id][wxid] = member def get_cached_member_info(self, chatroom_id: str, user_wxid: str) -> Optional[Dict]: """从缓存获取群成员信息""" if chatroom_id in self._chatroom_members_cache: return self._chatroom_members_cache[chatroom_id].get(user_wxid) return None async def _log_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""): """记录机器人发送的消息到 MessageLogger""" try: from utils.message_hook import log_bot_message await log_bot_message(to_wxid, content, msg_type, media_url) except Exception as e: logger.debug(f"记录机器人消息失败(可忽略): {e}") async def send_message(self, to_wxid: str, msg_type: str, content: str) -> bool: """ 统一发送消息入口(文本/图片/视频/文件/XML) Args: to_wxid: 接收者 wxid msg_type: 消息类型(text/image/video/file/xml) content: 文本内容或文件路径或 XML Returns: 是否发送成功 """ return await self._send_message(to_wxid, msg_type, content) async def _send_message(self, to_wxid: str, msg_type: str, content: str) -> bool: """统一发送消息实现,集中日志与类型处理""" msg_type = (msg_type or "").lower() if msg_type == "text": result = await self.http_client.send_text(to_wxid, content) if result: await self._log_bot_message(to_wxid, content, "text") return result if msg_type == "image": result = await self.http_client.send_image(to_wxid, content) if result: filename = os.path.basename(content) await self._log_bot_message(to_wxid, f"[图片] {filename}", "image", content) return result if msg_type == "video": # 新协议可能使用文件发送接口发送视频 result = await self.http_client.send_file(to_wxid, content) if result: filename = os.path.basename(content) await self._log_bot_message(to_wxid, f"[视频] {filename}", "video", content) return result if msg_type == "file": result = await self.http_client.send_file(to_wxid, content) if result: filename = os.path.basename(content) await self._log_bot_message(to_wxid, f"[文件] {filename}", "file", content) return result if msg_type == "xml": return await self._send_xml_message(to_wxid, content) logger.error(f"不支持的消息类型: {msg_type}") return False async def _send_xml_message(self, to_wxid: str, xml: str) -> bool: """发送 XML 消息(如聊天记录、音乐卡片等)""" # 尝试提取 appmsg 内容和 type,使用新协议 try: import xml.etree.ElementTree as ET # 尝试解析 XML root = ET.fromstring(xml) # 查找 appmsg 元素 appmsg = root.find(".//appmsg") if appmsg is None and root.tag == "appmsg": appmsg = root if appmsg is not None: # 提取 type type_elem = appmsg.find("type") msg_type = type_elem.text if type_elem is not None else "5" # 将 appmsg 转换回字符串 appmsg_content = ET.tostring(appmsg, encoding="unicode") result = await self.http_client.send_app_msg(to_wxid, appmsg_content, msg_type) if result: await self._log_bot_message(to_wxid, "[XML消息]", "xml") return result except Exception as e: logger.warning(f"解析 XML 失败,尝试旧协议: {e}") # 回退到旧协议 result = await self.http_client.send_xml(to_wxid, xml) if result: await self._log_bot_message(to_wxid, "[XML消息]", "xml") return result # ==================== 消息发送 ==================== async def send_text(self, to_wxid: str, content: str) -> bool: """ 发送文本消息 Args: to_wxid: 接收者 wxid content: 文本内容 Returns: 是否发送成功 """ return await self._send_message(to_wxid, "text", content) async def send_image(self, to_wxid: str, image_path: str) -> bool: """ 发送图片消息 Args: to_wxid: 接收者 wxid image_path: 图片文件路径 Returns: 是否发送成功 """ return await self._send_message(to_wxid, "image", image_path) async def send_file(self, to_wxid: str, file_path: str) -> bool: """ 发送文件消息 Args: to_wxid: 接收者 wxid file_path: 文件路径 Returns: 是否发送成功 """ return await self._send_message(to_wxid, "file", file_path) async def send_media(self, to_wxid: str, file_path: str, media_type: str = "") -> bool: """ 统一发送媒体消息(图片/视频/文件) Args: to_wxid: 接收者 wxid file_path: 文件路径 media_type: 媒体类型(image/video/file,可选) Returns: 是否发送成功 """ media_type = (media_type or "").lower() if media_type in {"image", "img", "pic", "picture", "photo"}: return await self._send_message(to_wxid, "image", file_path) if media_type in {"video", "vid"}: return await self._send_message(to_wxid, "video", file_path) if media_type in {"file", "doc", "document"}: return await self._send_message(to_wxid, "file", file_path) ext = os.path.splitext(file_path)[1].lower() if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}: return await self._send_message(to_wxid, "image", file_path) if ext in {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".webm"}: return await self._send_message(to_wxid, "video", file_path) return await self._send_message(to_wxid, "file", file_path) async def send_video(self, to_wxid: str, video_path: str) -> bool: """ 发送视频消息 Args: to_wxid: 接收者 wxid video_path: 视频文件路径 Returns: 是否发送成功 """ return await self._send_message(to_wxid, "video", video_path) async def send_at_message( self, chatroom_id: str, content: str, at_list: List[str] ) -> bool: """ 发送群聊 @ 消息 Args: chatroom_id: 群聊 ID content: 消息内容 at_list: 要 @ 的 wxid 列表,["notify@all"] 表示 @所有人 Returns: 是否发送成功 """ # 将列表转换为逗号分隔的字符串 wxids = ",".join(at_list) return await self.http_client.send_at_text(chatroom_id, content, wxids) async def send_card( self, to_wxid: str, card_wxid: str, card_nickname: str = "" ) -> bool: """ 发送名片消息 Args: to_wxid: 接收者 wxid card_wxid: 名片的 wxid card_nickname: 名片昵称(新协议不需要) Returns: 是否发送成功 """ return await self.http_client.send_card(to_wxid, card_wxid) async def send_link( self, to_wxid: str, title: str, desc: str, url: str, thumb_url: str = "" ) -> bool: """ 发送链接消息 Args: to_wxid: 接收者 wxid title: 链接标题 desc: 链接描述 url: 链接地址 thumb_url: 缩略图 URL Returns: 是否发送成功 """ # 默认缩略图 if not thumb_url: thumb_url = "https://www.functen.cn/static/img/709a3f34713ef07b09d524bee2df69d6.DY.webp" safe_title = xml_escape(str(title or ""), {'"': """, "'": "'"}) safe_desc = xml_escape(str(desc or ""), {'"': """, "'": "'"}) safe_url = xml_escape(str(url or ""), {'"': """, "'": "'"}) safe_thumb_url = xml_escape(str(thumb_url or ""), {'"': """, "'": "'"}) # 构建 appmsg XML 内容(新协议不需要外层 标签) appmsg_content = f''' {safe_title} {safe_desc} 5 {safe_url} {safe_thumb_url} ''' # 使用新协议发送卡片消息,type=5 为链接卡片 return await self.http_client.send_app_msg(to_wxid, appmsg_content, "5") async def send_link_card( self, to_wxid: str, title: str, desc: str, url: str, image_url: str = "" ) -> bool: """ 发送链接卡片 Args: to_wxid: 接收者 wxid title: 卡片标题 desc: 卡片描述 url: 链接地址 image_url: 卡片图片 URL Returns: 是否发送成功 """ return await self.send_link(to_wxid, title, desc, url, image_url) async def revoke_message(self, msg_id: str) -> bool: """ 撤回消息 Args: msg_id: 消息 ID (newMsgId) Returns: 是否撤回成功 """ return await self.http_client.revoke_message(msg_id) async def send_xml(self, to_wxid: str, xml: str) -> bool: """ 发送 XML 消息(如聊天记录、音乐卡片等) Args: to_wxid: 接收者 wxid xml: XML 内容(可以是完整的 ... 或只有 ...) Returns: 是否发送成功 """ return await self._send_message(to_wxid, "xml", xml) async def get_group_member_contact(self, room_id: str, member_wxid: str) -> Optional[Dict]: """ 获取群成员的详细联系人信息(包含头像等) Args: room_id: 群聊 ID member_wxid: 成员 wxid Returns: 成员详细信息,失败返回 None """ return await self.http_client.get_group_member_contact(room_id, member_wxid) # ==================== CDN 相关(新协议简化) ==================== async def cdn_init(self) -> bool: """ 初始化 CDN 环境 新协议不需要单独初始化 CDN Returns: 始终返回 True """ logger.info("新协议无需初始化 CDN") return True async def cdn_download(self, file_id: str, aes_key: str, save_path: str, file_type: int = 2) -> bool: """ CDN 下载(兼容接口) 新协议使用 download_image/download_video Returns: 是否下载成功 """ logger.warning("cdn_download 在新协议中不可用,请使用 download_image/download_video") return False async def cdn_upload(self, file_path: str, file_type: int = 1) -> Optional[Dict]: """ CDN 上传(兼容接口) 新协议直接发送文件,无需先上传 Returns: None """ logger.warning("cdn_upload 在新协议中不需要,直接使用 send_image/send_file") return None async def send_cdn_image(self, to_wxid: str, file_path: str) -> bool: """ 发送图片(兼容接口) Args: to_wxid: 接收者 wxid file_path: 图片文件路径 Returns: 是否发送成功 """ return await self.send_image(to_wxid, file_path) # ==================== 好友管理 ==================== async def get_friend_list(self) -> List[Dict]: """ 获取好友列表 Returns: 好友列表 """ return await self.http_client.get_friend_list() async def get_friend_info(self, wxid: str) -> Optional[Dict]: """ 获取好友信息 Args: wxid: 好友 wxid Returns: 好友信息字典 """ return await self.http_client.get_friend_info(wxid) async def add_friend( self, wxid: str, verify_msg: str = "", scene: int = 3 ) -> bool: """ 添加好友 Args: wxid: 要添加的 wxid verify_msg: 验证消息 scene: 添加场景(3=搜索,15=名片) Returns: 是否发送成功 """ return await self.http_client.add_friend(wxid, verify_msg, scene) async def accept_friend(self, v3: str, v4: str, scene: int) -> bool: """ 同意好友请求 Args: v3: 好友请求的 v3 参数 v4: 好友请求的 v4 参数 scene: 场景值 Returns: 是否成功 """ return await self.http_client.accept_friend(v3, v4, scene) async def delete_friend(self, wxid: str) -> bool: """ 删除好友 Args: wxid: 要删除的好友 wxid Returns: 是否成功 """ return await self.http_client.delete_friend(wxid) async def set_friend_remark(self, wxid: str, remark: str) -> bool: """ 修改好友备注 Args: wxid: 好友 wxid remark: 新备注 Returns: 是否成功 """ return await self.http_client.set_friend_remark(wxid, remark) # ==================== 群聊管理 ==================== async def get_chatroom_list(self, force_refresh: bool = False) -> List[Dict]: """ 获取群聊列表 Returns: 群聊列表 """ # 新协议可能需要从好友列表中筛选 friends = await self.get_friend_list() chatrooms = [] room_ids = set() for friend in friends: contact = friend.get("contact", {}) username = contact.get("userName", {}) if isinstance(username, dict): wxid = username.get("String", "") else: wxid = str(username) if wxid.endswith("@chatroom"): chatrooms.append(friend) room_ids.add(wxid) # 从本地数据库补全未保存到通讯录的群聊 db_rooms = await self._get_chatroom_list_from_db(force_refresh=force_refresh) for room_id in db_rooms: if room_id not in room_ids: chatrooms.append(room_id) room_ids.add(room_id) return chatrooms async def _get_chatroom_list_from_db(self, force_refresh: bool = False) -> List[str]: """从本地 contact.db 尝试获取群聊列表(兜底)""" if not force_refresh and "contact.db" in self._db_chatroom_list_cache: cached = self._db_chatroom_list_cache.get("contact.db", []) if cached: return cached[:] candidates = set() queries = [ ("contact.db", "select chatroomname as room_id from chat_room_info_detail"), ("contact.db", "select username as room_id from chat_room_info_detail"), ("contact.db", "select chatroomname, username from chat_room_info_detail"), ("contact.db", "select username as room_id from rcontact where username like '%@chatroom'"), ("contact.db", "select username as room_id from chatroom"), ("contact.db", "select chatroomname as room_id from chatroom"), ] for db_name, sql_fmt in queries: rows = await self.http_client.sqlite_exec(db_name, sql_fmt) if not rows: continue for row in rows: room_id = "" if isinstance(row, str): room_id = row elif isinstance(row, dict): for key in ("room_id", "chatroomname", "username", "userName", "UserName"): value = row.get(key) if isinstance(value, dict): value = value.get("String", "") or value.get("string", "") if isinstance(value, str) and value: room_id = value break if room_id and room_id.endswith("@chatroom"): candidates.add(room_id) if not candidates: candidates.update(await self._scan_chatrooms_from_db("contact.db")) if force_refresh: db_names = await self._get_db_names() for db_name in db_names: if db_name == "contact.db": continue candidates.update(await self._scan_chatrooms_from_db(db_name)) result = list(candidates) if result: self._db_chatroom_list_cache["contact.db"] = result[:] return result async def _scan_chatrooms_from_db(self, db_name: str) -> List[str]: """扫描数据库表,尝试找出所有群聊 ID""" results = set() tables = await self._list_db_tables(db_name) for table in tables: table_lower = table.lower() if "chatroom" not in table_lower and "room" not in table_lower and "contact" not in table_lower: continue columns = await self._get_table_columns(db_name, table) for col in columns: col_lower = col.lower() if ( "chatroom" in col_lower or col_lower in {"username", "user_name", "roomid", "room_id", "chatroomname"} ): sql = f'select distinct "{col}" as room_id from "{table}" where "{col}" like \'%@chatroom\'' rows = await self.http_client.sqlite_exec(db_name, sql) for row in rows: room_id = "" if isinstance(row, dict): room_id = row.get("room_id", "") or row.get(col, "") elif isinstance(row, str): room_id = row if isinstance(room_id, dict): room_id = room_id.get("String", "") or room_id.get("string", "") if room_id and isinstance(room_id, str) and room_id.endswith("@chatroom"): results.add(room_id) return list(results) async def _list_db_tables(self, db_name: str) -> List[str]: """获取数据库表名列表""" if db_name in self._db_tables_cache: cached = self._db_tables_cache.get(db_name, []) if cached: return cached[:] rows = await self.http_client.sqlite_exec(db_name, "select name from sqlite_master where type='table'") tables = [] for row in rows: name = "" if isinstance(row, dict): name = row.get("name", "") elif isinstance(row, str): name = row if name: tables.append(name) if tables: self._db_tables_cache[db_name] = tables[:] return tables async def _get_db_names(self, force_refresh: bool = False) -> List[str]: """获取可用数据库名称列表""" if self._db_names_cache and not force_refresh: return self._db_names_cache[:] handles = await self.http_client.get_db_handle() names = [] for item in handles: if isinstance(item, dict): name = item.get("name", "") if name: names.append(name) if names: self._db_names_cache = names[:] return names async def _get_table_columns(self, db_name: str, table: str) -> List[str]: """获取表字段列表""" if db_name in self._db_table_columns_cache: cached = self._db_table_columns_cache[db_name].get(table) if cached: return cached[:] sql = f'pragma table_info("{table}")' rows = await self.http_client.sqlite_exec(db_name, sql) columns = [] for row in rows: name = "" if isinstance(row, dict): name = row.get("name", "") elif isinstance(row, str): name = row if name: columns.append(name) self._db_table_columns_cache.setdefault(db_name, {})[table] = columns[:] return columns async def get_chatroom_members(self, chatroom_id: str) -> List[Dict]: """ 获取群成员列表 Args: chatroom_id: 群聊 ID Returns: 群成员列表 """ info = await self.http_client.get_chatroom_info(chatroom_id) detail_map = {} total_count = 0 result = [] if info: new_data = info.get("newChatroomData", {}) for m in new_data.get("chatRoomMember", []) or []: wxid = m.get("userName", "") or m.get("wxid", "") if wxid: detail_map[wxid] = m all_list = info.get("allMemberUserNameList") or [] if all_list: for entry in all_list: if isinstance(entry, dict): wxid = entry.get("String", "") or entry.get("string", "") else: wxid = str(entry) if not wxid: continue detail = detail_map.get(wxid, {}) result.append({ "wxid": wxid, "nickname": detail.get("nickName", ""), "display_name": detail.get("displayName", ""), "avatar": detail.get("bigHeadImgUrl", ""), }) total_count = int(info.get("allMemberCount") or 0) if result and (not total_count or total_count == len(result)): return result # 兜底:使用原有接口 members = await self.http_client.get_chatroom_members(chatroom_id) for m in members: wxid = m.get("userName", "") if not wxid: continue if any(item.get("wxid") == wxid for item in result): continue result.append({ "wxid": wxid, "nickname": m.get("nickName", ""), "display_name": m.get("displayName", ""), "avatar": m.get("bigHeadImgUrl", ""), }) if result and (not total_count or total_count == len(result)): return result # 再兜底:尝试从本地数据库补全群成员 db_wxids = await self._get_chatroom_members_from_db(chatroom_id) if db_wxids: existing = {m.get("wxid") for m in result if m.get("wxid")} for wxid in db_wxids: if wxid in existing: continue detail = detail_map.get(wxid, {}) result.append({ "wxid": wxid, "nickname": detail.get("nickName", ""), "display_name": detail.get("displayName", ""), "avatar": detail.get("bigHeadImgUrl", ""), }) return result async def _get_chatroom_members_from_db(self, chatroom_id: str) -> List[str]: """从本地数据库尝试获取群成员 wxid 列表""" if not chatroom_id or not chatroom_id.endswith("@chatroom"): return [] room_id = chatroom_id.replace("'", "''") # 常见表/字段兜底 list_queries = [ ("contact.db", "chatroom", "chatroomname", "memberlist"), ("contact.db", "chatroom", "chatroomname", "member_list"), ("contact.db", "chat_room_info_detail", "chatroomname", "memberlist"), ("contact.db", "chat_room_info_detail", "username", "memberlist"), ("contact.db", "rcontact", "username", "memberlist"), ] row_queries = [ ("contact.db", "chatroom_member", "chatroomname", "membername"), ("contact.db", "chatroom_member", "chatroomname", "username"), ("contact.db", "chatroom_member", "roomid", "username"), ("contact.db", "chatroom_member", "room_id", "username"), ] members = await self._try_member_list_queries(list_queries, room_id) if members: return members members = await self._try_member_row_queries(row_queries, room_id) if members: return members # 扫描数据库表兜底 members = await self._scan_chatroom_members_from_db("contact.db", room_id) if members: return members db_names = await self._get_db_names() for db_name in db_names: if db_name == "contact.db": continue members = await self._scan_chatroom_members_from_db(db_name, room_id) if members: return members return [] def _parse_member_list_value(self, value: str) -> List[str]: if not value: return [] if not isinstance(value, str): return [] separators = [",", ";", "|", "\n", "\t", " "] for sep in separators: if sep in value: parts = [p.strip() for p in value.split(sep)] return [p for p in parts if p and not p.endswith("@chatroom")] return [value] if value and not value.endswith("@chatroom") else [] async def _try_member_list_queries(self, queries, room_id: str) -> List[str]: members = [] for db_name, table, room_col, list_col in queries: sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\'' rows = await self.http_client.sqlite_exec(db_name, sql) for row in rows: value = "" if isinstance(row, dict): value = row.get("member_list", "") or row.get(list_col, "") elif isinstance(row, str): value = row members.extend(self._parse_member_list_value(value)) if members: return list(dict.fromkeys(members)) return [] async def _try_member_row_queries(self, queries, room_id: str) -> List[str]: members = [] for db_name, table, room_col, member_col in queries: sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\'' rows = await self.http_client.sqlite_exec(db_name, sql) for row in rows: value = "" if isinstance(row, dict): value = row.get("member_id", "") or row.get(member_col, "") elif isinstance(row, str): value = row if isinstance(value, dict): value = value.get("String", "") or value.get("string", "") if isinstance(value, str) and value and not value.endswith("@chatroom"): members.append(value) if members: return list(dict.fromkeys(members)) return [] async def _scan_chatroom_members_from_db(self, db_name: str, room_id: str) -> List[str]: members = [] tables = await self._list_db_tables(db_name) for table in tables: table_lower = table.lower() if "chatroom" not in table_lower and "room" not in table_lower: continue columns = await self._get_table_columns(db_name, table) if not columns: continue room_cols = [c for c in columns if "chatroom" in c.lower() or c.lower() in {"roomid", "room_id", "chatroomname"}] list_cols = [c for c in columns if "member" in c.lower() and "list" in c.lower()] member_cols = [ c for c in columns if ("member" in c.lower() or c.lower() in {"username", "user_name", "wxid"}) and "list" not in c.lower() ] for room_col in room_cols: for list_col in list_cols: sql = f'select "{list_col}" as member_list from "{table}" where "{room_col}" = \'{room_id}\'' rows = await self.http_client.sqlite_exec(db_name, sql) for row in rows: value = "" if isinstance(row, dict): value = row.get("member_list", "") or row.get(list_col, "") elif isinstance(row, str): value = row members.extend(self._parse_member_list_value(value)) if members: return list(dict.fromkeys(members)) for member_col in member_cols: if member_col == room_col: continue sql = f'select "{member_col}" as member_id from "{table}" where "{room_col}" = \'{room_id}\'' rows = await self.http_client.sqlite_exec(db_name, sql) for row in rows: value = "" if isinstance(row, dict): value = row.get("member_id", "") or row.get(member_col, "") elif isinstance(row, str): value = row if isinstance(value, dict): value = value.get("String", "") or value.get("string", "") if isinstance(value, str) and value and not value.endswith("@chatroom"): members.append(value) if members: return list(dict.fromkeys(members)) return list(dict.fromkeys(members)) async def get_chatroom_info(self, chatroom_id: str) -> Optional[Dict]: """ 获取群信息 Args: chatroom_id: 群聊 ID Returns: 群信息字典 """ return await self.http_client.get_chatroom_info(chatroom_id) async def create_chatroom(self, member_list: List[str]) -> Optional[str]: """ 创建群聊 Args: member_list: 成员 wxid 列表(至少2人) Returns: 新群聊的 chatroom_id """ return await self.http_client.create_chatroom(member_list) async def invite_to_chatroom( self, chatroom_id: str, wxid_list: List[str] ) -> bool: """ 邀请进群 Args: chatroom_id: 群聊 ID wxid_list: 要邀请的 wxid 列表 Returns: 是否成功 """ return await self.http_client.invite_to_chatroom(chatroom_id, wxid_list) async def remove_chatroom_member( self, chatroom_id: str, wxid_list: List[str] ) -> bool: """ 踢出群成员 Args: chatroom_id: 群聊 ID wxid_list: 要踢出的 wxid 列表 Returns: 是否成功 """ return await self.http_client.remove_chatroom_member(chatroom_id, wxid_list) async def quit_chatroom(self, chatroom_id: str) -> bool: """ 退出群聊 Args: chatroom_id: 群聊 ID Returns: 是否成功 """ return await self.http_client.quit_chatroom(chatroom_id) async def set_chatroom_name(self, chatroom_id: str, name: str) -> bool: """ 修改群名称 Args: chatroom_id: 群聊 ID name: 新群名称 Returns: 是否成功 """ # 新协议可能需要使用不同的 API logger.warning("set_chatroom_name 在新协议中可能不可用") return False async def set_chatroom_announcement( self, chatroom_id: str, announcement: str ) -> bool: """ 修改群公告 Args: chatroom_id: 群聊 ID announcement: 群公告内容 Returns: 是否成功 """ return await self.http_client.set_chatroom_announcement(chatroom_id, announcement) async def set_my_chatroom_nickname( self, chatroom_id: str, nickname: str ) -> bool: """ 修改我的群昵称 Args: chatroom_id: 群聊 ID nickname: 新昵称 Returns: 是否成功 """ # 新协议可能需要使用不同的 API logger.warning("set_my_chatroom_nickname 在新协议中可能不可用") return False # ==================== 初始化 ==================== async def wechat_init(self) -> bool: """ 微信初始化好友列表、群列表 每天需要调用一次,用于刷新缓存 """ return await self.http_client.wechat_init() # ==================== 登录信息 ==================== async def get_login_info(self) -> Optional[Dict]: """ 获取当前登录信息 Returns: 登录信息字典 """ result = await self.http_client.get_self_info() if result: # 尝试提取 wxid 和 nickname contact = result.get("contact", result) if isinstance(contact, dict): username = contact.get("userName", {}) nickname = contact.get("nickName", {}) if isinstance(username, dict): self._wxid = username.get("String", "") else: self._wxid = str(username) if username else "" if isinstance(nickname, dict): self._nickname = nickname.get("String", "") else: self._nickname = str(nickname) if nickname else "" self._login_info = result logger.info(f"获取登录信息成功: wxid={self._wxid}, nickname={self._nickname}") return result async def get_user_info_in_chatroom(self, chatroom_id: str, user_wxid: str, max_retries: int = 1) -> Optional[Dict]: """ 获取群内用户详细信息 Args: chatroom_id: 群聊 ID user_wxid: 用户 wxid max_retries: 最大重试次数 Returns: 用户详细信息字典 """ # 1. 优先从缓存获取(消息回调中已缓存) cached = self.get_cached_member_info(chatroom_id, user_wxid) if cached: return cached # 2. 尝试 API(可能返回 502) for attempt in range(max_retries + 1): try: # 从群成员列表中查找 members = await self.get_chatroom_members(chatroom_id) for member in members: if member.get("wxid") == user_wxid or member.get("userName") == user_wxid: return member if attempt < max_retries: await asyncio.sleep(0.5) except Exception as e: logger.debug(f"获取群内用户信息失败: {e}") if attempt < max_retries: await asyncio.sleep(0.5) return None # ==================== 下载功能 ==================== def _build_media_cache_key( self, media_type: str, message: Optional[Dict] = None, msg_id: Optional[int] = None, file_id: str = "", aes_key: str = "" ) -> str: """构建媒体缓存 key(用于复用下载结果)""" media_type = (media_type or "").lower() key_parts = [media_type] if msg_id is None and message: raw = message.get("_raw", message) msg_id = ( raw.get("newMsgId") or message.get("MsgId") or raw.get("msgId") or message.get("msgId") ) if msg_id is not None and str(msg_id): key_parts.append(f"msg:{msg_id}") elif file_id and aes_key: key_parts.append(f"cdn:{file_id}:{aes_key}") else: return "" key_source = "|".join(key_parts) key_hash = hashlib.sha1(key_source.encode("utf-8")).hexdigest() return f"{media_type}_{key_hash}" def _media_cache_path(self, cache_key: str) -> Path: """获取缓存文件路径""" return self._media_cache_dir / f"{cache_key}.bin" def _is_media_cache_valid(self, cache_path: Path) -> bool: """检查缓存是否有效""" try: if not cache_path.exists(): return False if cache_path.stat().st_size <= 0: return False age = time.time() - cache_path.stat().st_mtime if age > self._media_cache_ttl: cache_path.unlink(missing_ok=True) return False return True except Exception: return False def _copy_cached_media(self, cache_path: Path, save_path: str) -> Optional[str]: """从缓存复制媒体到目标路径""" try: target_path = Path(save_path) target_path.parent.mkdir(parents=True, exist_ok=True) if target_path.exists(): target_path.unlink() shutil.copy2(cache_path, target_path) os.utime(cache_path, None) return str(target_path) except Exception as e: logger.warning(f"媒体缓存复制失败: {e}") return None def _store_media_cache(self, cache_key: str, source_path: str) -> None: """写入媒体缓存""" try: source = Path(source_path) if not source.exists() or source.stat().st_size <= 0: return cache_path = self._media_cache_path(cache_key) tmp_path = cache_path.with_suffix(".tmp") tmp_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source, tmp_path) tmp_path.replace(cache_path) os.utime(cache_path, None) except Exception as e: logger.warning(f"写入媒体缓存失败: {e}") async def download_wechat_media( self, media_type: str, save_path: str, message: Optional[Dict] = None, msg_id: Optional[int] = None, total_len: int = 0, to_user: str = "", from_user: str = "", file_id: str = "", aes_key: str = "", prefer_original: bool = True, timeout: float = 60.0 ) -> Optional[str]: """ 统一微信消息媒体下载入口(图片/视频) Args: media_type: 媒体类型(image/video) save_path: 保存路径 message: 原始消息数据(优先使用) msg_id: 消息 ID (svrid) total_len: 文件总大小 to_user: 接收者 wxid(图片引用消息) from_user: 发送者 wxid(图片引用消息) file_id: CDN 文件ID(图片) aes_key: AES 密钥(图片) prefer_original: 是否优先下载原图 timeout: 单次下载超时(秒) Returns: 保存路径,失败返回 None """ media_type = (media_type or "").lower() cache_key = self._build_media_cache_key(media_type, message, msg_id, file_id, aes_key) if cache_key: lock = self._media_cache_locks.setdefault(cache_key, asyncio.Lock()) async with lock: cache_path = self._media_cache_path(cache_key) if self._is_media_cache_valid(cache_path): cached = self._copy_cached_media(cache_path, save_path) if cached: logger.debug(f"媒体缓存命中: {cache_key}") return cached result = await self._download_wechat_media_direct( media_type=media_type, save_path=save_path, message=message, msg_id=msg_id, total_len=total_len, to_user=to_user, from_user=from_user, file_id=file_id, aes_key=aes_key, prefer_original=prefer_original, timeout=timeout ) if result and result != "expired": self._store_media_cache(cache_key, result) return result return await self._download_wechat_media_direct( media_type=media_type, save_path=save_path, message=message, msg_id=msg_id, total_len=total_len, to_user=to_user, from_user=from_user, file_id=file_id, aes_key=aes_key, prefer_original=prefer_original, timeout=timeout ) async def _download_wechat_media_direct( self, media_type: str, save_path: str, message: Optional[Dict] = None, msg_id: Optional[int] = None, total_len: int = 0, to_user: str = "", from_user: str = "", file_id: str = "", aes_key: str = "", prefer_original: bool = True, timeout: float = 60.0 ) -> Optional[str]: """实际执行微信媒体下载(不含缓存逻辑)""" if media_type in {"image", "img", "pic", "picture", "photo"}: if message: return await self.download_image(message, save_path) if msg_id is not None: return await self.download_image_by_id(msg_id, total_len, save_path, to_user, from_user) if file_id and aes_key: return await self.download_image_by_cdn( file_id=file_id, aes_key=aes_key, save_path=save_path, prefer_original=prefer_original, timeout=timeout ) logger.error("download_wechat_media 缺少图片下载参数") return None if media_type in {"video", "vid"}: if message: return await self.download_video(message, save_path) if msg_id is not None: return await self.download_video_by_id(msg_id, total_len, save_path) logger.error("download_wechat_media 缺少视频下载参数") return None logger.error(f"download_wechat_media 不支持的媒体类型: {media_type}") return None async def download_image( self, message: Dict, save_path: str ) -> Optional[str]: """ 下载图片(使用新CDN接口) Args: message: 原始消息数据 save_path: 保存路径 Returns: 保存路径,失败返回 None """ # 从 XML 中提取 fileid 和 aeskey content = message.get("Content", "") if not content: logger.error("图片消息缺少 Content 字段") return None try: import xml.etree.ElementTree as ET root = ET.fromstring(content) img = root.find(".//img") if img is None: logger.error("无法从 XML 中找到 img 标签") return None # 打印所有可用的属性,帮助调试 logger.debug(f"图片XML属性: {dict(img.attrib)}") # 提取 fileid 和 aeskey # 注意:fileid 可能在不同的字段中 fileid = (img.get("cdnbigimgurl") or img.get("cdnmidimgurl") or img.get("cdnhdimgurl") or img.get("fileid") or "") aeskey = img.get("aeskey", "") # 提取缩略图专用参数 thumb_fileid = img.get("cdnthumburl", "") thumb_aeskey = img.get("cdnthumbaeskey", "") or img.get("aeskey", "") # 降级使用原图key if not fileid or not aeskey: logger.error(f"缺少必要参数: fileid={'有' if fileid else '无'}, aeskey={'有' if aeskey else '无'}") logger.error(f"可用属性: {list(img.attrib.keys())}") return None logger.info(f"提取CDN参数: fileid={fileid[:50]}..., aeskey={aeskey[:20]}...") if thumb_fileid: logger.debug(f"缩略图参数: thumb_fileid={thumb_fileid[:50]}..., thumb_aeskey={thumb_aeskey[:20]}...") # 优先下载原图 logger.debug("优先尝试下载原图") result = await self.http_client.cdn_download_image( fileid=fileid, aeskey=aeskey, save_path=save_path, img_type=1 # 1=原图 ) if result: import os for i in range(20): if os.path.exists(result) and os.path.getsize(result) > 0: logger.info(f"原图下载成功: {result}, size={os.path.getsize(result)}") return result await asyncio.sleep(0.5) logger.debug("原图下载失败,尝试缩略图") # 降级到缩略图 if thumb_fileid and thumb_aeskey: result = await self.http_client.cdn_download_image( fileid=thumb_fileid, aeskey=thumb_aeskey, save_path=save_path, img_type=2, # 2=缩略图 timeout=30.0 ) if result: import os for i in range(20): if os.path.exists(result) and os.path.getsize(result) > 0: logger.info(f"缩略图下载成功: {result}, size={os.path.getsize(result)}") return result await asyncio.sleep(0.5) # 如果原图和缩略图都失败,记录错误 logger.error("图片下载失败:原图和缩略图均无法下载") except Exception as e: logger.error(f"下载图片失败: {e}") import traceback logger.error(traceback.format_exc()) return None async def download_image_by_cdn( self, file_id: str, aes_key: str, save_path: str, prefer_original: bool = True, timeout: float = 60.0 ) -> Optional[str]: """ 通过 CDN 参数下载图片(新协议) Args: file_id: CDN 文件ID(如 cdnbigimgurl) aes_key: AES 密钥 save_path: 保存路径 prefer_original: 是否优先下载原图 timeout: 单次下载超时(秒) Returns: 保存路径,失败返回 None """ if not file_id or not aes_key: logger.error("缺少必要参数: file_id 或 aes_key 为空") return None img_types = [1, 2] if prefer_original else [2, 1] for img_type in img_types: result = await self.http_client.cdn_download_image( fileid=file_id, aeskey=aes_key, save_path=save_path, img_type=img_type, timeout=timeout ) if result: for _ in range(20): if os.path.exists(result) and os.path.getsize(result) > 0: return result await asyncio.sleep(0.5) logger.error("图片下载失败:原图和缩略图均无法下载") return None async def download_video( self, message: Dict, save_path: str ) -> Optional[str]: """ 下载视频 Args: message: 原始消息数据 save_path: 保存路径 Returns: 保存路径,失败返回 None """ raw_data = message.get("_raw", message) # 获取消息 ID msg_id = int(raw_data.get("msgId", 0)) new_msg_id = int(raw_data.get("newMsgId", 0)) # 从 XML 内容中提取 total_len total_len = 0 content = message.get("Content", "") if content: try: import xml.etree.ElementTree as ET root = ET.fromstring(content) video = root.find(".//videomsg") if video is not None: total_len = int(video.get("length", 0)) except Exception: pass if not total_len: logger.warning("无法获取视频长度,尝试使用默认值") return await self.http_client.download_video( msg_id=msg_id, new_msg_id=new_msg_id, total_len=total_len, save_path=save_path ) async def download_video_by_id( self, msg_id: int, total_len: int, save_path: str ) -> Optional[str]: """ 通过消息ID下载视频(用于引用消息场景) Args: msg_id: 消息 ID (svrid) total_len: 视频总长度 save_path: 保存路径 Returns: 保存路径,失败返回 None """ # 对于引用消息,new_msg_id 可以使用 msg_id return await self.http_client.download_video( msg_id=msg_id, new_msg_id=msg_id, total_len=total_len, save_path=save_path ) async def download_image_by_id( self, msg_id: int, total_len: int, save_path: str, to_user: str = "", from_user: str = "" ) -> Optional[str]: """ 通过消息ID下载图片(用于引用消息场景) Args: msg_id: 消息 ID (svrid) total_len: 图片总大小 save_path: 保存路径 to_user: 接收者 wxid(可选) from_user: 发送者 wxid(可选) Returns: 保存路径,失败返回 None """ return await self.http_client.download_image( to_user=to_user, from_user=from_user, msg_id=msg_id, total_len=total_len, save_path=save_path )