""" HTTP 客户端模块 用于与新版 Hook API 进行 HTTP 通信 """ import asyncio import time from urllib.parse import urlparse import httpx from typing import Optional, Dict, Any, List from loguru import logger class HttpClient: """ HTTP API 客户端 封装所有与 Hook API 的 HTTP 通信 """ def __init__(self, base_url: str = "http://127.0.0.1:8888", timeout: float = 30.0): """ 初始化 HTTP 客户端 Args: base_url: API 基础 URL timeout: 请求超时时间(秒) """ self.base_url = base_url.rstrip("/") self.timeout = timeout self._client: Optional[httpx.AsyncClient] = None self._last_hook_probe_ts = 0.0 self._last_hook_probe_ok: Optional[bool] = None self._last_hook_probe_error: str = "" # 全局串行:所有 Hook API 只允许一个请求在飞行中。 self._hook_request_semaphore = asyncio.Semaphore(1) self._hook_request_delay = 0.4 # 发送消息专用信号量(串行发送,避免风控) self._send_semaphore = asyncio.Semaphore(1) self._send_delay = 0.5 # 发送间隔 async def _get_client(self) -> httpx.AsyncClient: """获取或创建 HTTP 客户端""" if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient( base_url=self.base_url, timeout=httpx.Timeout(self.timeout), headers={"Content-Type": "application/json"}, trust_env=False ) return self._client async def close(self): """关闭 HTTP 客户端""" if self._client and not self._client.is_closed: await self._client.aclose() self._client = None async def _probe_hook_port(self) -> bool: """探测 Hook 端口是否可连接(用于定位连接失败原因)""" now = time.time() if now - self._last_hook_probe_ts < 2.0: return self._last_hook_probe_ok is True self._last_hook_probe_ts = now parsed = urlparse(self.base_url) host = parsed.hostname or "127.0.0.1" port = parsed.port or (443 if parsed.scheme == "https" else 80) try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=0.8 ) writer.close() await writer.wait_closed() self._last_hook_probe_ok = True self._last_hook_probe_error = "" return True except Exception as e: self._last_hook_probe_ok = False self._last_hook_probe_error = str(e) return False async def _request( self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None, **kwargs ) -> Optional[Dict[str, Any]]: """ 发送 HTTP 请求 Args: method: HTTP 方法 (GET, POST, etc.) endpoint: API 端点 data: 请求数据 Returns: 响应数据字典,失败返回 None """ if self._hook_request_semaphore.locked(): logger.debug("Hook API 排队中,等待串行执行") async with self._hook_request_semaphore: max_retries = 2 for attempt in range(max_retries + 1): try: if self._hook_request_delay > 0: await asyncio.sleep(self._hook_request_delay) client = await self._get_client() full_url = f"{self.base_url}{endpoint}" logger.debug(f"[HTTP] {method} {full_url} data={data}") if method.upper() == "GET": response = await client.get(endpoint, params=data, **kwargs) elif method.upper() == "POST": response = await client.post(endpoint, json=data, **kwargs) elif method.upper() == "PUT": response = await client.put(endpoint, json=data, **kwargs) else: logger.error(f"不支持的 HTTP 方法: {method}") return None logger.debug(f"[HTTP] 响应状态: {response.status_code}") response.raise_for_status() result = response.json() # 群成员列表响应太长,只记录摘要 if isinstance(result, dict) and 'chatRoomMember' in result.get('newChatroomData', {}): member_count = len(result['newChatroomData']['chatRoomMember']) logger.debug(f"[HTTP] 响应内容: 群成员列表 (共 {member_count} 人)") else: logger.debug(f"[HTTP] 响应内容: {result}") return result except httpx.ConnectError as e: if attempt < max_retries: wait = 0.2 * (attempt + 1) logger.warning(f"HTTP 连接失败: {endpoint} -> {e}, {wait:.1f}s 后重试") await asyncio.sleep(wait) continue hook_ok = await self._probe_hook_port() logger.error( f"HTTP 请求失败: {endpoint} -> {e} | " f"hook_port_open={hook_ok} base_url={self.base_url} " f"probe_error={self._last_hook_probe_error}" ) return None except httpx.TimeoutException: logger.error(f"HTTP 请求超时: {endpoint}") return None except httpx.HTTPStatusError as e: logger.error(f"HTTP 状态错误: {endpoint} -> {e.response.status_code}") return None except Exception as e: logger.error(f"HTTP 请求失败: {endpoint} -> {e}") return None async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, **kwargs) -> Optional[Dict[str, Any]]: """发送 POST 请求""" return await self._request("POST", endpoint, data, **kwargs) async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs) -> Optional[Dict[str, Any]]: """发送 GET 请求""" return await self._request("GET", endpoint, params, **kwargs) # ==================== 消息发送 API ==================== async def send_text(self, wxid: str, msg: str) -> bool: """ 发送文本消息 Args: wxid: 接收者 wxid msg: 文本内容 Returns: 是否发送成功 """ async with self._send_semaphore: if self._send_delay > 0: await asyncio.sleep(self._send_delay) data = {"wxid": wxid, "msg": msg} logger.debug(f"[DEBUG] 发送文本请求: wxid={wxid}, msg长度={len(msg)}") result = await self.post("/api/send_text_msg", data) logger.info(f"[DEBUG] 发送文本 API 响应: {result}") if result is None: logger.error(f"发送文本失败: {wxid}, API 返回 None (可能连接失败)") return False # 检查多种成功响应格式 if result.get("code") == 1: logger.info(f"发送文本成功: {wxid}") return True # 某些 API 使用 baseResponse.ret == 0 表示成功 base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: logger.info(f"发送文本成功 (baseResponse): {wxid}") return True # 检查 Success 字段 if result.get("Success") is True: logger.info(f"发送文本成功 (Success): {wxid}") return True logger.error(f"发送文本失败: {wxid}, 响应: {result}") return False async def send_image(self, wxid: str, image_path: str, timeout: float = 120.0) -> bool: """ 发送图片消息 Args: wxid: 接收者 wxid image_path: 图片文件路径 timeout: 超时时间(秒) Returns: 是否发送成功 """ async with self._send_semaphore: if self._send_delay > 0: await asyncio.sleep(self._send_delay) data = {"wxid": wxid, "image_path": image_path} result = await self.post("/api/send_image_msg", data, timeout=httpx.Timeout(timeout)) if result is None: logger.error(f"发送图片失败: {wxid}, API 返回 None (可能连接失败)") return False if result.get("code") == 1: logger.info(f"发送图片成功: {wxid}") return True base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: logger.info(f"发送图片成功 (baseResponse): {wxid}") return True if result.get("Success") is True or result.get("errCode") == 1: logger.info(f"发送图片成功 (Success/errCode): {wxid}") return True logger.error(f"发送图片失败: {wxid}, 响应: {result}") return False async def send_file(self, wxid: str, file_path: str, timeout: float = 120.0) -> bool: """ 发送文件消息 Args: wxid: 接收者 wxid file_path: 文件路径 timeout: 超时时间(秒) Returns: 是否发送成功 """ data = {"wxid": wxid, "full_path": file_path} result = await self.post("/api/send_file_msg", data, timeout=httpx.Timeout(timeout)) if result is None: logger.error(f"发送文件失败: {wxid}, API 返回 None (可能连接失败)") return False if result.get("code") == 1: logger.info(f"发送文件成功: {wxid}") return True base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: logger.info(f"发送文件成功 (baseResponse): {wxid}") return True if result.get("Success") is True or result.get("errCode") == 1: logger.info(f"发送文件成功 (Success/errCode): {wxid}") return True logger.error(f"发送文件失败: {wxid}, 响应: {result}") return False async def send_at_text(self, room_id: str, msg: str, wxids: str) -> bool: """ 发送 @ 消息 Args: room_id: 群聊 ID msg: 消息内容 wxids: 要 @ 的 wxid,多个用逗号分隔,notify@all 表示 @所有人 Returns: 是否发送成功 """ data = {"room_id": room_id, "msg": msg, "wxids": wxids} result = await self.post("/api/send_at_text", data) if result and result.get("code") == 1: logger.info(f"发送 @ 消息成功: {room_id}") return True logger.error(f"发送 @ 消息失败: {room_id}") return False async def send_card(self, to_wxid: str, card_wxid: str) -> bool: """ 发送名片消息 Args: to_wxid: 接收者 wxid card_wxid: 名片的 wxid Returns: 是否发送成功 """ data = {"towxid": to_wxid, "fromwxid": card_wxid} result = await self.post("/api/send_card_msg", data) if result: base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: logger.info(f"发送名片成功: {to_wxid}") return True logger.error(f"发送名片失败: {to_wxid}") return False async def send_voice(self, wxid: str, voice_path: str) -> bool: """ 发送语音消息 Args: wxid: 接收者 wxid voice_path: 语音文件路径(silk) Returns: 是否发送成功 """ def _is_success(resp: Optional[Dict[str, Any]]) -> bool: if not resp: return False if resp.get("code") == 1 or resp.get("Success") is True: return True base_response = resp.get("baseResponse", {}) if base_response.get("ret") == 0: return True return False # 新接口: /api/send_voice (toWxid, silkPath) data_new = {"toWxid": wxid, "silkPath": voice_path} result = await self.post("/api/send_voice", data_new) if _is_success(result): logger.info(f"发送语音成功: {wxid}") return True logger.error(f"发送语音失败: {wxid}, 响应: {result}") return False async def send_xml(self, wxid: str, xml: str) -> bool: """ 发送 XML 消息(旧协议,已弃用) Args: wxid: 接收者 wxid xml: XML 内容 Returns: 是否发送成功 """ data = {"wxid": wxid, "xml": xml} result = await self.post("/api/send_xml_msg", data) if result and result.get("code") == 1: logger.info(f"发送 XML 成功: {wxid}") return True logger.error(f"发送 XML 失败: {wxid}") return False async def send_app_msg(self, wxid: str, content: str, msg_type: str = "5") -> bool: """ 发送卡片/XML消息(新协议) Args: wxid: 接收者 wxid content: appmsg XML 内容(不含外层 标签) msg_type: 消息类型,如 "5" 为链接卡片,"19" 为聊天记录等 Returns: 是否发送成功 """ data = {"wxid": wxid, "content": content, "type": msg_type} result = await self.post("/api/send_app_msg", data) if result is None: logger.error(f"发送卡片消息失败: {wxid}, API 返回 None") return False # 检查多种成功响应格式 if result.get("code") == 1: logger.info(f"发送卡片消息成功: {wxid}") return True base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: logger.info(f"发送卡片消息成功 (baseResponse): {wxid}") return True if result.get("Success") is True: logger.info(f"发送卡片消息成功 (Success): {wxid}") return True logger.error(f"发送卡片消息失败: {wxid}, 响应: {result}") return False async def revoke_message(self, new_msg_id: str) -> bool: """ 撤回消息 Args: new_msg_id: 消息 ID (newMsgId) Returns: 是否撤回成功 """ data = {"newMsgId": new_msg_id} result = await self.post("/api/revoke_msg", data) if result: logger.info(f"撤回消息成功: {new_msg_id}") return True logger.error(f"撤回消息失败: {new_msg_id}") return False # ==================== 好友管理 API ==================== async def get_friend_list(self) -> List[Dict]: """ 获取好友列表 Returns: 好友列表 """ result = await self.post("/api/get_frien_lists") if result and "data" in result: friends = result.get("data", []) logger.info(f"获取好友列表成功,共 {len(friends)} 个好友") return friends # 新接口兜底:先执行初始化再尝试获取 logger.warning("获取好友列表失败,尝试执行微信初始化后重试") await self.wechat_init() result = await self.post("/api/get_frien_lists") if result and "data" in result: friends = result.get("data", []) logger.info(f"获取好友列表成功(初始化后),共 {len(friends)} 个好友") return friends # 兜底:触发全量更新好友列表接口 logger.warning("获取好友列表仍失败,尝试更新好友列表接口") result = await self.post("/api/update_all_friend") if result and "data" in result: friends = result.get("data", []) logger.info(f"获取好友列表成功(更新后),共 {len(friends)} 个好友") return friends logger.error("获取好友列表失败") return [] async def get_friend_info(self, wxid: str) -> Optional[Dict]: """ 获取好友资料(网络获取) Args: wxid: 好友 wxid Returns: 好友资料 """ data = {"wxid": wxid} result = await self.post("/api/get_contact_profile", data) if result: logger.info(f"获取好友资料成功: {wxid}") return result logger.error(f"获取好友资料失败: {wxid}") return None async def get_friend_info_cache(self, wxid: str) -> Optional[Dict]: """ 快速获取好友资料(缓存) Args: wxid: 好友 wxid Returns: 好友资料 """ data = {"wxid": wxid} result = await self.post("/api/get_contact_profile_cache", data) if result: return result return None async def add_friend(self, wxid: str, verify_msg: str = "", scene: int = 3) -> bool: """ 添加好友 Args: wxid: 要添加的 wxid verify_msg: 验证消息 scene: 添加场景 Returns: 是否发送成功 """ data = {"wxid": wxid, "verify_msg": verify_msg, "scene": scene} result = await self.post("/api/add_friend", data) if result: logger.info(f"发送好友请求成功: {wxid}") return True logger.error(f"发送好友请求失败: {wxid}") return False async def accept_friend(self, v3: str, v4: str, scene: int) -> bool: """ 同意好友请求 Args: v3: 好友请求的 v3 参数 v4: 好友请求的 v4 参数 scene: 场景值 Returns: 是否成功 """ data = {"v3": v3, "v4": v4, "scene": scene} result = await self.post("/api/accept_friend", data) if result: logger.info("同意好友请求成功") return True logger.error("同意好友请求失败") return False async def delete_friend(self, wxid: str) -> bool: """ 删除好友 Args: wxid: 要删除的好友 wxid Returns: 是否成功 """ data = {"wxid": wxid} result = await self.post("/api/delete_friend", data) if result: logger.info(f"删除好友成功: {wxid}") return True logger.error(f"删除好友失败: {wxid}") return False async def set_friend_remark(self, wxid: str, remark: str) -> bool: """ 修改好友备注 Args: wxid: 好友 wxid remark: 新备注 Returns: 是否成功 """ data = {"wxid": wxid, "remark": remark} result = await self.post("/api/set_friend_remark", data) if result: logger.info(f"修改备注成功: {wxid} -> {remark}") return True logger.error(f"修改备注失败: {wxid}") return False async def get_db_handle(self) -> List[Dict]: """ 获取数据库句柄列表(新接口) Returns: 数据库句柄列表 """ result = await self.post("/api/get_db_handle") if result and isinstance(result.get("data"), list): return result.get("data", []) return [] async def sqlite_exec(self, db_name: str, sql_fmt: str) -> List[Dict]: """ 执行 SQLite 查询(新接口) Args: db_name: 数据库名(如 contact.db) sql_fmt: SQL 语句 Returns: 结果行列表,失败返回空列表 """ data = {"db_name": db_name, "sql_fmt": sql_fmt} result = await self.post("/api/sqlite3_exec", data) if isinstance(result, list): return result if isinstance(result, dict): rows = result.get("data") if isinstance(rows, list): return rows logger.error(f"执行数据库查询失败: db={db_name}") return [] # ==================== 群聊管理 API ==================== async def get_chatroom_members(self, room_id: str) -> List[Dict]: """ 获取群成员列表 Args: room_id: 群聊 ID Returns: 群成员列表 """ data = {"room_id": room_id} result = await self.post("/api/get_room_members", data) if result: base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: chatroom_data = result.get("newChatroomData", {}) members = chatroom_data.get("chatRoomMember", []) logger.info(f"获取群成员成功: {room_id}, 成员数: {len(members)}") return members logger.error(f"获取群成员失败: {room_id}") return [] async def get_chatroom_info(self, room_id: str) -> Optional[Dict]: """ 获取群信息 Args: room_id: 群聊 ID Returns: 群信息字典 """ data = {"room_id": room_id} result = await self.post("/api/get_room_members", data) if result: base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: return result return None async def get_group_member_contact(self, room_id: str, member_wxid: str) -> Optional[Dict]: """ 查询群成员联系人信息(更详细) Args: room_id: 群聊 ID member_wxid: 成员 wxid Returns: 成员联系人信息 """ data = {"roomId": room_id, "wxid": member_wxid} result = await self.post("/api/get_group_member_contact", data) if result: base_response = result.get("baseResponse", {}) if base_response.get("ret") == 0: contact_list = result.get("contactList", []) if contact_list: return contact_list[0] return None async def create_chatroom(self, wxid_list: List[str]) -> Optional[str]: """ 创建群聊 Args: wxid_list: 成员 wxid 列表(至少2人) Returns: 新群聊的 chatroom_id """ data = {"wxid_list": ",".join(wxid_list)} result = await self.post("/api/create_chat_room", data) if result: logger.info("创建群聊成功") return result.get("chatroomUserName") logger.error("创建群聊失败") return None async def invite_to_chatroom(self, room_id: str, wxid_list: List[str]) -> bool: """ 邀请进群 Args: room_id: 群聊 ID wxid_list: 要邀请的 wxid 列表 Returns: 是否成功 """ data = {"room_id": room_id, "wxid_list": ",".join(wxid_list)} result = await self.post("/api/invite_member_to_chat_room", data) if result: logger.info(f"邀请进群成功: {room_id}") return True logger.error(f"邀请进群失败: {room_id}") return False async def remove_chatroom_member(self, room_id: str, wxid_list: List[str]) -> bool: """ 踢出群成员 Args: room_id: 群聊 ID wxid_list: 要踢出的 wxid 列表 Returns: 是否成功 """ data = {"room_id": room_id, "wxid_list": ",".join(wxid_list)} result = await self.post("/api/del_member_from_chat_room", data) if result: logger.info(f"踢出群成员成功: {room_id}") return True logger.error(f"踢出群成员失败: {room_id}") return False async def quit_chatroom(self, room_id: str) -> bool: """ 退出群聊 Args: room_id: 群聊 ID Returns: 是否成功 """ data = {"room_id": room_id} result = await self.post("/api/quit_and_del_chat_room", data) if result: logger.info(f"退出群聊成功: {room_id}") return True logger.error(f"退出群聊失败: {room_id}") return False async def set_chatroom_announcement(self, room_id: str, announcement: str) -> bool: """ 修改群公告 Args: room_id: 群聊 ID announcement: 群公告内容 Returns: 是否成功 """ data = {"roomId": room_id, "announcement": announcement} result = await self.post("/api/set_room_announcement_pb", data) if result: logger.info(f"修改群公告成功: {room_id}") return True logger.error(f"修改群公告失败: {room_id}") return False # ==================== 下载 API ==================== async def cdn_download_image( self, fileid: str, aeskey: str, save_path: str, img_type: int = 1, timeout: float = 60.0 ) -> Optional[str]: """ CDN 下载图片(新接口) Args: fileid: 文件ID aeskey: AES密钥 save_path: 保存路径 img_type: 图片类型 (1=原图, 2=缩略图) timeout: 超时时间(秒),默认60秒 Returns: 保存路径,失败返回 None """ data = { "fileid": fileid, "asekey": aeskey, # 注意:API参数名是 asekey 不是 aeskey "imgType": img_type, "out": save_path } if self._hook_request_semaphore.locked(): logger.debug("Hook API 排队中,等待串行执行") async with self._hook_request_semaphore: # CDN 下载需要更长的超时时间 import httpx try: max_retries = 2 for attempt in range(max_retries + 1): try: client = await self._get_client() logger.debug(f"[HTTP] POST /api/cdn_download (timeout={timeout}s)") response = await client.post( "/api/cdn_download", json=data, timeout=httpx.Timeout(timeout) ) response.raise_for_status() result = response.json() logger.debug(f"CDN下载图片 API 响应: {result}") if result and result.get("errCode") == 1: logger.info(f"CDN下载图片成功: {save_path}") return save_path logger.error(f"CDN下载图片失败, 响应: {result}") return None except httpx.ConnectError as e: if attempt < max_retries: wait = 0.2 * (attempt + 1) logger.warning(f"CDN下载连接失败: {e}, {wait:.1f}s 后重试") await asyncio.sleep(wait) continue logger.error(f"CDN下载图片异常: {e}") return None except httpx.TimeoutException: logger.error(f"CDN下载图片超时 (>{timeout}s): {save_path}") return None except Exception as e: logger.error(f"CDN下载图片异常: {e}") return None async def download_image( self, to_user: str, from_user: str, msg_id: int, total_len: int, save_path: str, start_pos: int = 0, data_len: int = 0, compress_type: int = 0 ) -> Optional[str]: """ 下载图片 Args: to_user: 接收者 wxid from_user: 发送者 wxid msg_id: 消息 ID total_len: 图片总大小 save_path: 保存路径 start_pos: 起始位置 data_len: 数据长度 compress_type: 压缩类型 Returns: 保存路径,失败返回 None """ data = { "to_user": to_user, "from_user": from_user, "MsgId": msg_id, "total_len": total_len, "data_len": data_len or total_len, "start_pos": start_pos, "compress_type": compress_type, "path": save_path } result = await self.post("/api/download_img", data) logger.debug(f"下载图片 API 响应: {result}") if result and result.get("status") == "success": logger.info(f"下载图片成功: {save_path}") return result.get("path") # 检查是否文件过期 if result and result.get("status") == "server_error": server_resp = result.get("serverResp", {}) base_resp = server_resp.get("baseResponse", {}) err_msg = base_resp.get("errMsg", {}) if isinstance(err_msg, dict): err_msg = err_msg.get("String", "") logger.warning(f"下载图片服务器错误: {err_msg}") if "Expired" in str(err_msg): logger.warning(f"图片已过期无法下载: {msg_id}") return "expired" logger.error(f"下载图片失败: {msg_id}, 响应: {result}") return None async def download_video( self, msg_id: int, new_msg_id: int, total_len: int, save_path: str ) -> Optional[str]: """ 下载视频 Args: msg_id: 消息 ID (MsgId) new_msg_id: 新消息 ID (NewMsgId) total_len: 视频总长度 save_path: 保存路径 Returns: 保存路径,失败返回 None """ data = { "MsgId": msg_id, "NewMsgId": new_msg_id, "total_len": total_len, "path": save_path } result = await self.post("/api/download_video", data) if result and result.get("status") == "success": logger.info(f"下载视频成功: {save_path}") return save_path logger.error(f"下载视频失败: {msg_id}") return None # ==================== 初始化 API ==================== async def wechat_init(self) -> bool: """ 微信初始化好友列表、群列表 每天需要调用一次,用于刷新好友和群聊缓存 Returns: 是否成功 """ result = await self.post("/api/wechat_init") if result: logger.info("微信初始化成功(好友列表、群列表)") return True logger.error("微信初始化失败") return False # ==================== 个人信息 API ==================== async def get_self_info(self) -> Optional[Dict]: """ 获取自己的信息(缓存) Returns: 个人信息 """ result = await self.post("/api/get_self_info") if result: return result return None async def set_nickname(self, nickname: str) -> bool: """ 修改自己的昵称 Args: nickname: 新昵称 Returns: 是否成功 """ data = {"nickname": nickname} result = await self.post("/api/set_nickname", data) if result: logger.info(f"修改昵称成功: {nickname}") return True logger.error("修改昵称失败") return False