diff --git a/.env.example b/.env.example index d07dad5..cd98adc 100644 --- a/.env.example +++ b/.env.example @@ -36,6 +36,8 @@ WECHAT_SERVER_URL=http://127.0.0.1:8059/ WECHAT_SERVER_IP=127.0.0.1 WECHAT_SERVER_PORT=8059 WECHAT_SERVER_TYPE=legacy_855 +# 当使用 server_864 时,这里必须填写服务端分配的固定 key。 +WECHAT_SERVER_KEY= # 以下三项可留空,首次登录后会自动写入本地状态缓存文件。 WECHAT_WXID= WECHAT_DEVICE_NAME= diff --git a/config.example.yaml b/config.example.yaml index e02c7e5..775990d 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -53,6 +53,8 @@ wechat_ipad: server_ip: "${WECHAT_SERVER_IP:127.0.0.1}" server_port: "${WECHAT_SERVER_PORT:8059}" server_type: "${WECHAT_SERVER_TYPE:legacy_855}" + # 当 server_type=server_864 时必须提供固定 key;855/859 可留空。 + server_key: "${WECHAT_SERVER_KEY:}" wxid: "${WECHAT_WXID:}" device_name: "${WECHAT_DEVICE_NAME:}" device_id: "${WECHAT_DEVICE_ID:}" diff --git a/config.yaml b/config.yaml index e02c7e5..775990d 100644 --- a/config.yaml +++ b/config.yaml @@ -53,6 +53,8 @@ wechat_ipad: server_ip: "${WECHAT_SERVER_IP:127.0.0.1}" server_port: "${WECHAT_SERVER_PORT:8059}" server_type: "${WECHAT_SERVER_TYPE:legacy_855}" + # 当 server_type=server_864 时必须提供固定 key;855/859 可留空。 + server_key: "${WECHAT_SERVER_KEY:}" wxid: "${WECHAT_WXID:}" device_name: "${WECHAT_DEVICE_NAME:}" device_id: "${WECHAT_DEVICE_ID:}" diff --git a/configuration.py b/configuration.py index 4ef6b1a..bd1c9ed 100644 --- a/configuration.py +++ b/configuration.py @@ -214,6 +214,11 @@ class Config(object): wechat_ipad_config["server_type"] = str( wechat_ipad_config.get("server_type", "legacy_855") or "legacy_855" ).strip() + # 864 风格 server 的鉴权核心是固定 `key`: + # 1. 它不是像 855 那样主要依赖运行时 `wxid`; + # 2. 因此这里把 `server_key` 也纳入统一配置归一化,确保 `.env` 成为唯一静态维护入口; + # 3. 留空仍允许通过校验阶段给出明确提示,而不是在 provider 启动后才报模糊错误。 + wechat_ipad_config["server_key"] = str(wechat_ipad_config.get("server_key", "") or "").strip() wechat_ipad_config["wxid"] = str(wechat_ipad_config.get("wxid", "") or "").strip() wechat_ipad_config["device_name"] = str(wechat_ipad_config.get("device_name", "") or "").strip() wechat_ipad_config["device_id"] = str(wechat_ipad_config.get("device_id", "") or "").strip() @@ -343,6 +348,8 @@ class Config(object): server_url = str(wechat_ipad_config.get("server_url", "") or "").strip() server_ip = str(wechat_ipad_config.get("server_ip", "") or "").strip() server_port = wechat_ipad_config.get("server_port", 0) + server_type = str(wechat_ipad_config.get("server_type", "") or "").strip().lower() + server_key = str(wechat_ipad_config.get("server_key", "") or "").strip() if not server_url: self._append_issue( @@ -368,6 +375,18 @@ class Config(object): "wechat_ipad server_port 未配置,机器人无法连接 wechat_ipad server。", ) + # 864 provider 明确依赖静态 `server_key`: + # 1. 它用于服务端 license / 实例身份校验; + # 2. 即使扫码成功,也不能替代这份静态鉴权参数; + # 3. 因此这里在启动前直接报错,避免上线后才在登录页反复拿不到二维码。 + if server_type in {"864", "server_864"} and not server_key: + self._append_issue( + report["errors"], + "missing_wechat_server_key", + "wechat_ipad.server_key", + "server_864 模式必须配置 wechat_ipad.server_key(建议通过 .env 的 WECHAT_SERVER_KEY 注入)。", + ) + def _validate_llm_config(self, report: dict) -> None: """检查 LLM 配置的完整性与路由一致性。""" llm_config = self.llm or {} diff --git a/docs/wechat_ipad多版本Server适配路线图.md b/docs/wechat_ipad多版本Server适配路线图.md index 0a45e4a..69b5341 100644 --- a/docs/wechat_ipad多版本Server适配路线图.md +++ b/docs/wechat_ipad多版本Server适配路线图.md @@ -26,19 +26,24 @@ - 已补上 `Legacy855WechatClient` 的显式初始化入口,避免 provider 多继承构造链不稳定 - 已删除历史 `wechat_ipad/client/` 目录,避免后续误回退到旧实现 - 已为 855 登录流程补充 Dashboard 首页二维码引导态,支持未登录时自动弹窗、倒计时与最近二维码记录展示 +- 已新增 `providers/server_864/` 独立目录,用于承接 864 风格 server +- 已为 864 接入补充 `wechat_ipad.server_key` 统一配置项,支持通过 `.env` 的 `WECHAT_SERVER_KEY` 注入 +- 已在 [wechat_ipad/gateway.py](/d:/learn/abot/wechat_ipad/gateway.py:1) 中注册 `server_864 / 864` 别名 +- 已实现 864 第一版登录、初始化等待、HTTP 消息轮询、联系人、群信息、资料与朋友圈基础接口 当前尚未完成的关键项: - 855 provider 仍需完成一轮“当前项目实际依赖接口”的可上线回归验证 - 855 provider 仍需继续梳理“项目真实使用到的接口覆盖面”,确认是否还有遗漏能力未纳入 provider 目录 -- 864 provider 尚未开始接入,当前统一接口仍主要围绕 855 第一阶段目标进行验证 +- 864 provider 仍需完成真机联调,尤其是消息字段归一化、视频发送、名片发送等增强能力 因此,当前状态可以定义为: - “接入入口已收口” - “855 运行时主链路已迁入 provider” - “未登录场景已有 Dashboard 可视化登录引导” -- “尚未达到 855 可直接替换现网上线的最终状态” +- “864 第一版 provider 已落地,但还需要真实 server 回归验证” +- “尚未达到 855 / 864 都可直接无差异替换现网上线的最终状态” ## 2. 当前问题概览 @@ -70,8 +75,8 @@ - 更偏向 `key + body` 的请求风格 - 登录接口命名和流程明显不同 -- 消息同步接口不再与 855 完全一致 -- 在线状态、保活和消息同步的职责边界可能由 server 侧承担更多 +- 消息同步同时支持 WS 与 HTTP 轮询 +- 在线状态、保活和消息同步的职责边界更多由 server 侧承担 结论: @@ -425,6 +430,20 @@ wechat_ipad/ - 864 可以先跑主链路 - 高级能力后续再补 +当前已完成的第一版范围: + +- 已接入固定 `server_key` 配置 +- 已实现二维码登录轮询与初始化等待 +- 已实现 HTTP 轮询消息同步 +- 已实现联系人、群资料、当前账号资料、朋友圈基础接口 +- 已保留与 855 尽量一致的对外方法名,便于 `Robot` 无感切换 + +当前仍待补强的范围: + +- 真实 864 server 环境下的字段回包核对 +- 视频发送、名片发送等低频接口 +- 如后续需要更低延迟,可再补 WS 同步消息 runtime + ### 阶段三:扩展 864 与后续 provider 的高级能力 后续再逐步统一: diff --git a/robot.py b/robot.py index 0aa8496..193fbe6 100644 --- a/robot.py +++ b/robot.py @@ -359,7 +359,7 @@ class Robot: # 静态字段优先级:`.env/config.yaml` > 历史文件。 # 这样每个人只要改 `.env` 就能切换自己的 server,不需要再同步别处。 - for field_name in ("server_url", "server_ip", "server_port", "server_type"): + for field_name in ("server_url", "server_ip", "server_port", "server_type", "server_key"): if not str(merged_config.get(field_name, "") or "").strip(): legacy_value = legacy_config.get(field_name) if legacy_value not in (None, ""): @@ -409,6 +409,8 @@ class Robot: normalized_server_type = str(server_type or "legacy_855").strip().lower() if normalized_server_type in {"855", "859", "legacy_855"}: return "legacy_855" + if normalized_server_type in {"864", "server_864"}: + return "server_864" return normalized_server_type or "legacy_855" def _load_toml_config_if_exists(self, file_path: str) -> dict: diff --git a/wechat_ipad/config.toml.example b/wechat_ipad/config.toml.example index 26609ff..6eaa5b7 100644 --- a/wechat_ipad/config.toml.example +++ b/wechat_ipad/config.toml.example @@ -2,6 +2,8 @@ server_url = "http://127.0.0.1:8059/" server_ip = "127.0.0.1" server_port = "8059" server_type = "legacy_855" +# 864 风格 server 需要额外配置固定 key,855/859 可留空。 +server_key = "" # 登录成功后的动态状态默认不再写回本文件, # 而是写到 `wechat_ipad/providers//runtime_state.toml`。 wxid = "" diff --git a/wechat_ipad/gateway.py b/wechat_ipad/gateway.py index c587a88..f6505db 100644 --- a/wechat_ipad/gateway.py +++ b/wechat_ipad/gateway.py @@ -1,6 +1,7 @@ from typing import Any, Dict, Type from wechat_ipad.providers.legacy_855 import Legacy855WechatClient +from wechat_ipad.providers.server_864 import Server864WechatClient class WechatGateway: @@ -9,13 +10,15 @@ class WechatGateway: 当前策略: 1. Gateway 只负责选择 Provider,并把调用透传出去; 2. 不在这里承载协议差异或运行时细节,避免再次形成新的“大中台”; - 3. 第一阶段默认只完整支持 `legacy_855`,后续接入 864 时在映射表中扩展即可。 + 3. 不同 provider 内部各自维护自己的运行模型,Gateway 只负责路由,不承载版本差异。 """ - _PROVIDER_MAP: Dict[str, Type[Legacy855WechatClient]] = { + _PROVIDER_MAP: Dict[str, Type[Legacy855WechatClient | Server864WechatClient]] = { "legacy_855": Legacy855WechatClient, "855": Legacy855WechatClient, "859": Legacy855WechatClient, + "server_864": Server864WechatClient, + "864": Server864WechatClient, } _LOCAL_ATTRS = {"server_type", "provider"} diff --git a/wechat_ipad/providers/server_864/__init__.py b/wechat_ipad/providers/server_864/__init__.py new file mode 100644 index 0000000..a1a292d --- /dev/null +++ b/wechat_ipad/providers/server_864/__init__.py @@ -0,0 +1,3 @@ +from wechat_ipad.providers.server_864.provider import Server864WechatClient + +__all__ = ["Server864WechatClient"] diff --git a/wechat_ipad/providers/server_864/base.py b/wechat_ipad/providers/server_864/base.py new file mode 100644 index 0000000..667f5ff --- /dev/null +++ b/wechat_ipad/providers/server_864/base.py @@ -0,0 +1,119 @@ +import time +from typing import Any + +import aiohttp + +from wechat_ipad.errors import UserLoggedOut + + +class Server864APIClientBase: + """864 provider 的基础 HTTP 访问封装。 + + 设计说明: + 1. 864 与 855 一样仍然是 HTTP 驱动,但核心鉴权从 `wxid` 切到了固定 `key`; + 2. 这里把请求拼装、错误转换、常见返回结构解析集中收口,避免每个 mixin 重复写样板代码; + 3. 对外仍尽量返回项目当前可直接消费的 dict / list,保持切换成本低。 + """ + + def __init__(self, ip: str, port: int, server_key: str = "", **kwargs): + del kwargs + self.ip = ip + self.port = port + self.server_key = str(server_key or "").strip() + self.wxid = "" + self.nickname = "" + self.alias = "" + self.phone = "" + self.signature = "" + + # 864 的发送接口与 855 一样,很多业务链路仍会依赖这些消息回执字段: + # 1. 但 864 有些接口返回的是 protobuf JSON,而不是统一 msg id 三元组; + # 2. 因此这里准备一份本地递增 client id,在响应缺字段时作为兼容兜底; + # 3. 这样可以保证上层至少拿到稳定结构,而不会因为个别 server 少字段直接崩掉。 + self._fallback_client_msg_id = int(time.time() * 1000) + + super().__init__() + + @property + def base_url(self) -> str: + return f"http://{self.ip}:{self.port}" + + def _ensure_server_key(self) -> str: + """确保 864 固定鉴权 key 已配置。""" + if not self.server_key: + raise ValueError("server_864 缺少 server_key,请在 .env 中配置 WECHAT_SERVER_KEY") + return self.server_key + + async def _request_payload( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + timeout: int = 20, + ) -> dict[str, Any]: + """向 864 server 发送请求,并保留原始 payload 便于上层按需解析。""" + merged_params = dict(params or {}) + merged_params["key"] = self._ensure_server_key() + request_timeout = aiohttp.ClientTimeout(total=timeout) + async with aiohttp.ClientSession(timeout=request_timeout) as session: + async with session.request( + method.upper(), + f"{self.base_url}{path}", + params=merged_params, + json=json_body, + ) as response: + payload = await response.json(content_type=None) + return self._validate_payload(payload) + + async def _request_data( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + timeout: int = 20, + ) -> Any: + """获取成功返回中的 Data 字段。""" + payload = await self._request_payload( + method, + path, + params=params, + json_body=json_body, + timeout=timeout, + ) + return payload.get("Data") + + def _validate_payload(self, payload: Any) -> dict[str, Any]: + """把 864 的 DTO 返回统一转换成 Python 异常或 dict。""" + if not isinstance(payload, dict): + raise ValueError(f"server_864 返回了无法识别的响应结构: {payload!r}") + + code = payload.get("Code") + if code == 200: + return payload + + message = str(payload.get("Text") or payload.get("Message") or "server_864 请求失败").strip() + lowered_message = message.lower() + if any(keyword in lowered_message for keyword in ("重新登录", "已退出登录", "离线", "账号需要重新登录")): + raise UserLoggedOut(message) + raise Exception(message) + + def _next_fallback_message_ids(self) -> tuple[int, int, int]: + """生成一组兼容旧调用面的消息回执兜底值。""" + self._fallback_client_msg_id += 1 + client_msg_id = self._fallback_client_msg_id + create_time = int(time.time()) + return client_msg_id, create_time, 0 + + @staticmethod + def _pick_first(data: Any, *keys: str) -> Any: + """从 dict 中按优先级取第一个存在的字段。""" + if not isinstance(data, dict): + return None + for key in keys: + if key in data and data.get(key) is not None: + return data.get(key) + return None diff --git a/wechat_ipad/providers/server_864/friend_circle.py b/wechat_ipad/providers/server_864/friend_circle.py new file mode 100644 index 0000000..7041831 --- /dev/null +++ b/wechat_ipad/providers/server_864/friend_circle.py @@ -0,0 +1,122 @@ +import base64 +import os +from typing import Union + +import aiofiles + +from wechat_ipad.models.friend_circle_info import build_friend_circle_xml +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class FriendCircleMixin(Server864APIClientBase): + """864 朋友圈相关接口。""" + + async def get_friend_circle_list(self, max_id: int = 0, first_page_md5: str = "") -> dict: + data = await self._request_data( + "post", + "/sns/SendSnsTimeLine", + json_body={"UserName": "", "FirstPageMD5": first_page_md5, "MaxID": int(max_id)}, + timeout=30, + ) + return dict(data or {}) + + async def get_friend_circle_detail(self, towxid: str, max_id: int = 0, first_page_md5: str = "") -> dict: + data = await self._request_data( + "post", + "/sns/SendSnsUserPage", + json_body={"UserName": towxid, "FirstPageMD5": first_page_md5, "MaxID": int(max_id)}, + timeout=30, + ) + return dict(data or {}) + + async def get_friend_circle_id_detail(self, object_id: Union[str, int], towxid: str = "") -> dict: + del towxid + data = await self._request_data( + "post", + "/sns/SendSnsObjectDetailById", + json_body={"Id": str(object_id), "BlackList": [], "LocationVal": 0, "Location": {}}, + timeout=30, + ) + return dict(data or {}) + + async def publish_friend_circle( + self, + content: str, + media_items: list[dict] | None = None, + blacklist: str = "", + with_user_list: str = "", + ) -> dict: + payload = { + "ContentStyle": 1 if media_items else 0, + "ContentUrl": "", + "Description": "", + "Privacy": 0, + "Content": content, + "MediaList": list(media_items or []), + "WithUserList": [item for item in str(with_user_list or "").split(",") if item], + "GroupUserList": [], + "BlackList": [item for item in str(blacklist or "").split(",") if item], + "LocationInfo": None, + } + data = await self._request_data("post", "/sns/SendFriendCircle", json_body=payload, timeout=60) + return dict(data or {}) + + async def friend_circle_comment( + self, + object_id: str, + content: str = "", + type: int = 2, + reply_comment_id: int = 0, + ) -> dict: + payload = { + "SnsCommentList": [{ + "OpType": int(type), + "ItemID": str(object_id), + "ToUserName": "", + "Content": content, + "CreateTime": 0, + "ReplyCommentID": int(reply_comment_id), + "ReplyItem": {"UserName": "", "NickName": "", "OpType": 0, "Source": 0}, + }], + "Tx": False, + } + data = await self._request_data("post", "/sns/SendSnsComment", json_body=payload, timeout=30) + return {"result": data} + + async def friend_circle_operation(self, object_id: str, type: int, comment_id: int = 0) -> dict: + payload = { + "SnsObjectOpList": [{ + "SnsObjID": str(object_id), + "OpType": int(type), + "DataLen": 0, + "Data": [], + "Ext": int(comment_id), + }] + } + data = await self._request_data("post", "/sns/SendSnsObjectOp", json_body=payload, timeout=30) + return {"result": data} + + async def upload_friend_circle_media(self, media: Union[str, bytes, os.PathLike]) -> dict: + if isinstance(media, str): + media_base64 = media.split(",", 1)[1] if "," in media else media + elif isinstance(media, bytes): + media_base64 = base64.b64encode(media).decode() + elif isinstance(media, os.PathLike): + async with aiofiles.open(media, "rb") as f: + media_base64 = base64.b64encode(await f.read()).decode() + else: + raise ValueError("media should be str, bytes, or path") + + data = await self._request_data( + "post", + "/sns/UploadFriendCircleImage", + json_body={"ImageDataList": [media_base64], "VideoDataList": []}, + timeout=60, + ) + return {"result": data} + + async def publish_friend_circle_by_xml(self, content: str, media_items: list[dict] | None = None) -> dict: + """为后续扩展保留 XML 版本朋友圈发送入口。""" + xml_content = build_friend_circle_xml(self.wxid, content, media_items=media_items) + data = await self._request_data("post", "/sns/SendFriendCircleByXMl", json_body=xml_content, timeout=60) + return {"result": data} diff --git a/wechat_ipad/providers/server_864/friends.py b/wechat_ipad/providers/server_864/friends.py new file mode 100644 index 0000000..b63c54b --- /dev/null +++ b/wechat_ipad/providers/server_864/friends.py @@ -0,0 +1,84 @@ +from typing import Union + +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class FriendMixin(Server864APIClientBase): + """864 联系人与通讯录相关接口。""" + + async def get_contact(self, wxid: Union[str, list[str]]) -> Union[dict, list[dict]]: + """获取联系人详情,兼容旧调用面。""" + detail_list = await self.get_contract_detail(wxid) + if isinstance(wxid, str): + return detail_list[0] if detail_list else {} + return detail_list + + async def get_contract_detail(self, wxid: Union[str, list[str]], chatroom: str = "") -> list: + """批量获取联系人详情。""" + user_names = [wxid] if isinstance(wxid, str) else list(wxid or []) + room_wxid_list = [chatroom] if chatroom else [] + data = await self._request_data( + "post", + "/friend/GetContactDetail", + json_body={"UserNames": user_names, "RoomWxIDList": room_wxid_list}, + timeout=30, + ) + + if isinstance(data, list): + return data + if not isinstance(data, dict): + return [] + + contact_list = ( + data.get("ContactList") + or data.get("contactList") + or data.get("MemberList") + or data.get("memberList") + or data.get("UserList") + or data.get("userList") + or [] + ) + return list(contact_list or []) + + async def get_contract_list(self, wx_seq: int = 0, chatroom_seq: int = 0) -> list: + """获取通讯录用户名列表。""" + data = await self._request_data( + "post", + "/friend/GetContactList", + json_body={ + "CurrentWxcontactSeq": int(wx_seq), + "CurrentChatRoomContactSeq": int(chatroom_seq), + }, + timeout=30, + ) + if isinstance(data, list): + return data + if not isinstance(data, dict): + return [] + return list( + data.get("ContactUsernameList") + or data.get("contactUsernameList") + or data.get("UserNames") + or data.get("userNames") + or [] + ) + + async def get_nickname(self, wxid: Union[str, list[str]]) -> Union[str, list[str]]: + """根据联系人详情返回昵称。""" + data = await self.get_contract_detail(wxid) + if isinstance(wxid, str): + if not data: + return "" + nickname = data[0].get("NickName") + if isinstance(nickname, dict): + return nickname.get("string", "") + return nickname or "" + + result = [] + for contact in data: + nickname = contact.get("NickName") + if isinstance(nickname, dict): + result.append(nickname.get("string", "")) + else: + result.append(nickname or "") + return result diff --git a/wechat_ipad/providers/server_864/group.py b/wechat_ipad/providers/server_864/group.py new file mode 100644 index 0000000..1d5b799 --- /dev/null +++ b/wechat_ipad/providers/server_864/group.py @@ -0,0 +1,90 @@ +from typing import Any, Union + +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class ChatroomMixin(Server864APIClientBase): + """864 群聊相关接口。""" + + async def get_chatroom_info(self, chatroom: str) -> dict: + """获取单个群详情。""" + data = await self._request_data( + "post", + "/group/GetChatRoomInfo", + json_body={"ChatRoomWxIdList": [chatroom]}, + timeout=30, + ) + if isinstance(data, list): + return dict(data[0] or {}) if data else {} + if not isinstance(data, dict): + return {} + + contact_list = data.get("ContactList") or data.get("contactList") or [] + if contact_list: + return dict(contact_list[0] or {}) + return dict(data) + + async def get_chatroom_member_list(self, chatroom: str) -> list[dict]: + """获取群成员列表。""" + data = await self._request_data( + "post", + "/group/GetChatroomMemberDetail", + json_body={"ChatRoomName": chatroom}, + timeout=30, + ) + if isinstance(data, list): + return list(data) + if not isinstance(data, dict): + return [] + return list( + data.get("ChatRoomMember") + or data.get("chatRoomMember") + or (data.get("NewChatroomData") or {}).get("ChatRoomMember", []) + or (data.get("newChatroomData") or {}).get("chatRoomMember", []) + or data.get("MemberList") + or [] + ) + + async def get_chatroom_announce(self, chatroom: str) -> dict: + """获取群公告详情。""" + data = await self._request_data( + "post", + "/group/GetChatRoomInfoDetail", + json_body={"ChatRoomName": chatroom}, + timeout=30, + ) + return dict(data or {}) + + async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]: + """获取群二维码。""" + data = await self._request_data( + "post", + "/group/GetChatroomQrCode", + json_body={"ChatRoomName": chatroom}, + timeout=30, + ) + return { + "base64": str(self._pick_first(data, "QrCode", "qrcode", "QrCodeBase64", "base64") or ""), + "description": str(self._pick_first(data, "Desc", "desc", "Description", "description") or ""), + } + + async def get_chatroom_nickname(self, wxid: Union[str, list[str]], chatroom: str) -> Union[str, list[str]]: + """获取成员在群内的展示昵称。""" + members = await self.get_chatroom_member_list(chatroom) + member_map = {} + for member in members: + member_wxid = str(member.get("UserName", "") or "") + nickname = member.get("DisplayName") or member.get("NickName") or member_wxid + member_map[member_wxid] = nickname + + if isinstance(wxid, str): + return str(member_map.get(wxid, "")) + return [str(member_map.get(single_wxid, "")) for single_wxid in wxid] + + async def get_chatroom_member_detail(self, wxid: str, chatroom: str) -> dict: + """获取单个群成员详情。""" + members = await self.get_chatroom_member_list(chatroom) + for member in members: + if str(member.get("UserName", "") or "") == wxid: + return dict(member) + return {} diff --git a/wechat_ipad/providers/server_864/login.py b/wechat_ipad/providers/server_864/login.py new file mode 100644 index 0000000..fda8843 --- /dev/null +++ b/wechat_ipad/providers/server_864/login.py @@ -0,0 +1,84 @@ +import qrcode + +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class LoginMixin(Server864APIClientBase): + """864 登录相关接口。""" + + async def get_qr_code( + self, + device_name: str = "", + device_id: str = "", + proxy=None, + print_qr: bool = False, + ) -> tuple[str, str]: + """获取 864 登录二维码。 + + 说明: + 1. 864 不依赖 855 的 `device_name/device_id` 入参,但保留参数签名以兼容上层调用; + 2. `proxy` 当前仅保留兼容占位,后续如需补实际代理登录,可直接映射到 swagger 的 Proxy 字段; + 3. 返回值继续保持 `(uuid, url)`,方便 Dashboard 与运行时共用同一套二维码展示逻辑。 + """ + del device_name, device_id + proxy_value = "" + if proxy is not None: + proxy_value = getattr(proxy, "proxy", "") or "" + + data = await self._request_data( + "post", + "/login/GetLoginQrCodeNew", + json_body={"Proxy": proxy_value, "Check": False}, + timeout=30, + ) + uuid = self._pick_first(data, "UUID", "Uuid", "uuid") or "" + qr_url = ( + self._pick_first(data, "QrUrl", "QRUrl", "qrUrl") + or self._pick_first(self._pick_first(data, "Qrcode", "QrCode", "qrcode") or {}, "Src", "src") + or "" + ) + + if print_qr and uuid: + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) + qr.add_data(f"http://weixin.qq.com/x/{uuid}") + qr.make(fit=True) + qr.print_ascii() + + return str(uuid), str(qr_url) + + async def check_login_status(self) -> tuple[bool, dict]: + """检查当前二维码登录状态。""" + data = await self._request_data("get", "/login/CheckLoginStatus", timeout=20) + normalized = dict(data or {}) + state = int(normalized.get("state", 0) or 0) + login_state = str(normalized.get("loginState", "") or "").strip().lower() + return state == 2 or login_state == "online", normalized + + async def get_init_status(self) -> bool: + """检查 server 侧初始化是否完成。""" + data = await self._request_data("get", "/login/GetInItStatus", timeout=15) + return bool(data) + + async def awaken_login(self, wxid: str = "") -> dict: + """触发 864 的唤醒登录。""" + del wxid + return await self._request_data("post", "/login/WakeUpLogin", timeout=30) + + async def get_login_status(self, auto_login: bool = True) -> dict: + """获取 864 在线状态。""" + return await self._request_data( + "get", + "/login/GetLoginStatus", + params={"autoLogin": str(bool(auto_login)).lower()}, + timeout=20, + ) + + async def log_out(self) -> bool: + """退出当前 864 登录态。""" + await self._request_data("get", "/login/LogOutRequest", timeout=15) + return True diff --git a/wechat_ipad/providers/server_864/message.py b/wechat_ipad/providers/server_864/message.py new file mode 100644 index 0000000..8d47931 --- /dev/null +++ b/wechat_ipad/providers/server_864/message.py @@ -0,0 +1,284 @@ +import asyncio +import base64 +import os +import time +from asyncio import Future, Queue, sleep +from typing import Union + +import aiofiles + +from utils.trace_context import format_trace_prefix +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class MessageMixin(Server864APIClientBase): + """864 消息发送与消息同步接口。""" + + def __init__(self): + # 这里不能再走 `super().__init__()`: + # 1. provider 显式构造时已经先初始化过 `Server864APIClientBase`; + # 2. 若继续沿 MRO 调 `super()`,会再次命中基类构造并要求重复传 ip/port; + # 3. 因此消息模块只初始化自己独有的发送队列状态即可。 + self._message_queue = Queue() + self._is_processing = False + + async def _process_message_queue(self): + """串行处理消息发送,保持现有项目的节流语义。""" + if self._is_processing: + return + + self._is_processing = True + while True: + if self._message_queue.empty(): + self._is_processing = False + break + + func, args, kwargs, future = await self._message_queue.get() + try: + result = await func(*args, **kwargs) + future.set_result(result) + except Exception as e: + future.set_exception(e) + finally: + self._message_queue.task_done() + await sleep(1) + + async def _queue_message(self, func, *args, **kwargs): + """将发送动作排入本地发送队列。""" + future = Future() + await self._message_queue.put((func, args, kwargs, future)) + if not self._is_processing: + asyncio.create_task(self._process_message_queue()) + return await future + + @staticmethod + def _normalize_base64_payload(data: Union[str, bytes, os.PathLike]) -> str: + """把图片/语音等输入统一转成 base64 字符串。""" + if isinstance(data, str): + return data.split(",", 1)[1] if "," in data else data + if isinstance(data, bytes): + return base64.b64encode(data).decode() + raise ValueError("需要先读取文件路径后再处理") + + async def _read_base64_payload(self, data: Union[str, bytes, os.PathLike]) -> str: + """兼容路径/字节/字符串三种输入。""" + if isinstance(data, os.PathLike): + async with aiofiles.open(data, "rb") as f: + return base64.b64encode(await f.read()).decode() + return self._normalize_base64_payload(data) + + def _extract_send_message_result(self, data, *, fallback_target: str = "") -> tuple[int, int, int]: + """把 864 的发送回执尽量归一化成旧项目习惯的三元组。""" + if isinstance(data, list) and data: + item = dict(data[0] or {}) + elif isinstance(data, dict): + item = dict(data) + else: + item = {} + + resp = item.get("resp") or {} + base_response = resp.get("BaseResponse") or resp.get("baseResponse") or {} + + client_msg_id = ( + item.get("ClientMsgId") + or item.get("clientMsgId") + or resp.get("ClientMsgId") + or resp.get("clientMsgId") + or resp.get("MsgId") + or resp.get("msgId") + ) + create_time = item.get("CreateTime") or item.get("createTime") or int(time.time()) + new_msg_id = ( + item.get("NewMsgId") + or item.get("newMsgId") + or resp.get("NewMsgId") + or resp.get("newMsgId") + or resp.get("MsgSvrId") + or resp.get("msgSvrId") + ) + + if client_msg_id is None and base_response.get("Ret") == 0: + return self._next_fallback_message_ids() + + if client_msg_id is None: + client_msg_id, fallback_create_time, fallback_new_msg_id = self._next_fallback_message_ids() + return client_msg_id, fallback_create_time, fallback_new_msg_id + + return int(client_msg_id), int(create_time or 0), int(new_msg_id or 0) + + async def send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]: + return await self._queue_message(self._send_text_message, wxid, content, at) + + async def _send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]: + at_list = [item for item in (at if isinstance(at, list) else str(at or "").split(",")) if item] + data = await self._request_data( + "post", + "/message/SendTextMessage", + json_body={"MsgItem": [{ + "ToUserName": wxid, + "TextContent": content, + "ImageContent": "", + "MsgType": 1, + "AtWxIDList": at_list, + }]}, + timeout=20, + ) + return self._extract_send_message_result(data, fallback_target=wxid) + + async def send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]: + return await self._queue_message(self._send_image_message, wxid, image) + + async def _send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]: + image_base64 = await self._read_base64_payload(image) + data = await self._request_data( + "post", + "/message/SendImageMessage", + json_body={"MsgItem": [{ + "ToUserName": wxid, + "TextContent": "", + "ImageContent": image_base64, + "MsgType": 2, + "AtWxIDList": [], + }]}, + timeout=60, + ) + return self._extract_send_message_result(data, fallback_target=wxid) + + async def send_voice_message( + self, + wxid: str, + voice: Union[str, bytes, os.PathLike], + format: str = "silk", + ) -> tuple[int, int, int]: + return await self._queue_message(self._send_voice_message, wxid, voice, format) + + async def _send_voice_message( + self, + wxid: str, + voice: Union[str, bytes, os.PathLike], + format: str = "silk", + ) -> tuple[int, int, int]: + voice_base64 = await self._read_base64_payload(voice) + data = await self._request_data( + "post", + "/message/UploadVoiceRequest", + json_body={ + "ToUserName": wxid, + "VoiceData": voice_base64, + "VoiceSecond": 1, + "VoiceFormat": 4 if str(format).lower() in {"silk", "wav", "mp3"} else 0, + }, + timeout=60, + ) + return self._extract_send_message_result(data, fallback_target=wxid) + + async def send_link_xml_message(self, xml: str, towxid: str) -> tuple[int, int, int]: + return await self._queue_message(self._send_link_xml_message, xml, towxid) + + async def _send_link_xml_message(self, xml: str, towxid: str) -> tuple[int, int, int]: + data = await self._request_data( + "post", + "/message/SendAppMessage", + json_body={"AppList": [{ + "ToUserName": towxid, + "ContentXML": xml, + "ContentType": 5, + }]}, + timeout=20, + ) + return self._extract_send_message_result(data, fallback_target=towxid) + + async def send_link_message( + self, + wxid: str, + url: str, + title: str = "", + description: str = "", + thumb_url: str = "", + ) -> tuple[int, int, int]: + xml = ( + f"{title}{description}" + f"{url}{thumb_url}" + ) + return await self.send_link_xml_message(xml, wxid) + + async def send_emoji_message(self, wxid: str, md5: str, total_length: int): + return await self._queue_message(self._send_emoji_message, wxid, md5, total_length) + + async def _send_emoji_message(self, wxid: str, md5: str, total_length: int): + return await self._request_data( + "post", + "/message/SendEmojiMessage", + json_body={"EmojiList": [{ + "ToUserName": wxid, + "EmojiMd5": md5, + "EmojiSize": int(total_length), + }]}, + timeout=20, + ) + + async def revoke_message(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int) -> bool: + del create_time + await self._request_data( + "post", + "/message/RevokeMsgNew", + json_body={ + "NewMsgId": str(new_msg_id), + "ClientMsgId": int(client_msg_id), + "CreateTime": int(time.time()), + "ToUserName": wxid, + }, + timeout=20, + ) + return True + + async def send_video_message(self, wxid: str, video: Union[str, bytes, os.PathLike], image=None): + """864 首版暂未做视频主动发送,先明确抛错避免静默失败。""" + del wxid, video, image + raise NotImplementedError("server_864 第一版暂未实现 send_video_message,可后续补 CDN 视频发送适配") + + async def send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = ""): + del wxid, card_wxid, card_nickname, card_alias + raise NotImplementedError("server_864 第一版暂未实现 send_card_message") + + async def send_app_message(self, wxid: str, xml: str, type: int): + data = await self._request_data( + "post", + "/message/SendAppMessage", + json_body={"AppList": [{ + "ToUserName": wxid, + "ContentXML": xml, + "ContentType": int(type), + }]}, + timeout=20, + ) + return self._extract_send_message_result(data, fallback_target=wxid) + + async def sync_message(self, count: int = 0) -> dict: + """轮询 864 的 HTTP 同步消息接口,并归一化成 `AddMsgs` 结构。""" + data = await self._request_data( + "post", + "/message/HttpSyncMsg", + json_body={"Count": int(count)}, + timeout=25, + ) + + if isinstance(data, dict) and "AddMsgs" in data: + return data + + if isinstance(data, list): + return {"AddMsgs": data, "raw": data} + + if isinstance(data, dict): + add_msgs = ( + data.get("AddMsgs") + or data.get("Msgs") + or data.get("MsgList") + or data.get("msgList") + or data.get("Messages") + or data.get("messages") + or [] + ) + return {"AddMsgs": list(add_msgs or []), "raw": data} + + return {"AddMsgs": [], "raw": data} diff --git a/wechat_ipad/providers/server_864/provider.py b/wechat_ipad/providers/server_864/provider.py new file mode 100644 index 0000000..a4cc111 --- /dev/null +++ b/wechat_ipad/providers/server_864/provider.py @@ -0,0 +1,38 @@ +from wechat_ipad.provider_base import WechatProviderBase +from wechat_ipad.providers.server_864.base import Server864APIClientBase +from wechat_ipad.providers.server_864.friend_circle import FriendCircleMixin +from wechat_ipad.providers.server_864.friends import FriendMixin +from wechat_ipad.providers.server_864.group import ChatroomMixin +from wechat_ipad.providers.server_864.login import LoginMixin +from wechat_ipad.providers.server_864.message import MessageMixin +from wechat_ipad.providers.server_864.runtime import Server864RuntimeMixin +from wechat_ipad.providers.server_864.user import UserMixin + + +class Server864WechatClient( + LoginMixin, + MessageMixin, + FriendCircleMixin, + FriendMixin, + ChatroomMixin, + UserMixin, + Server864RuntimeMixin, + WechatProviderBase, +): + """864 风格 server 的独立 provider。 + + 说明: + 1. 这里保持与 855 provider 类似的模块拆分方式,方便未来继续并行维护多个 server; + 2. 但内部实现完全独立,不在 855 provider 里堆 `if server_type == 864` 分支; + 3. 第一版优先覆盖当前项目真实主链路与常用管理能力,后续再按需要补更多高级接口。 + """ + + provider_name = "server_864" + server_type = "server_864" + + def __init__(self, ip: str, port: int, **kwargs): + """初始化 864 provider。""" + server_key = str(kwargs.pop("server_key", "") or "").strip() + Server864APIClientBase.__init__(self, ip, port, server_key=server_key, **kwargs) + MessageMixin.__init__(self) + self._init_runtime_state() diff --git a/wechat_ipad/providers/server_864/runtime.py b/wechat_ipad/providers/server_864/runtime.py new file mode 100644 index 0000000..5dc6c41 --- /dev/null +++ b/wechat_ipad/providers/server_864/runtime.py @@ -0,0 +1,246 @@ +import asyncio +import os +import time +from typing import Any, Awaitable, Callable + +import toml + + +AsyncCallback = Callable[..., Awaitable[None]] + + +class Server864RuntimeMixin: + """864 provider 的运行时编排。 + + 设计说明: + 1. 864 的差异重点在于“固定 key + server 侧维护更多登录状态”; + 2. 因此这里不再照搬 855 的心跳/长心跳双循环,而是采用“登录确认 + 初始化等待 + HTTP 消息轮询”; + 3. 这样能先把现有 Robot 主链路无感切到 864,后续若要补 WS 监听也只需在本目录内演进。 + """ + + def _init_runtime_state(self) -> None: + self._runtime_running = False + + def stop_runtime(self) -> None: + self._runtime_running = False + + def is_runtime_running(self) -> bool: + return bool(getattr(self, "_runtime_running", False)) + + async def run_runtime( + self, + *, + ipad_config: dict, + state_path: str, + logger, + on_login_ready: AsyncCallback, + on_history_message: AsyncCallback, + on_message: AsyncCallback, + on_idle_payload: AsyncCallback | None = None, + on_logout: AsyncCallback | None = None, + on_runtime_state_change: AsyncCallback | None = None, + on_login_qr_update: AsyncCallback | None = None, + on_login_qr_cleared: AsyncCallback | None = None, + ) -> None: + """启动 864 provider 的运行时主循环。""" + del on_history_message + server_key = str(ipad_config.get("server_key", "") or "").strip() + if not server_key: + raise ValueError("server_864 启动失败:缺少 server_key,请在 .env 中配置 WECHAT_SERVER_KEY") + self.server_key = server_key + + await self._ensure_login( + ipad_config=ipad_config, + state_path=state_path, + logger=logger, + on_login_qr_update=on_login_qr_update, + on_login_qr_cleared=on_login_qr_cleared, + ) + + await on_login_ready(self.get_login_identity()) + logger.info("server_864 登录成功") + + await self._set_runtime_running(True, on_runtime_state_change=on_runtime_state_change, logger=logger) + + try: + logger.info("开始处理 server_864 消息轮询") + while self.is_runtime_running(): + try: + data_temp = await self.sync_message() + except Exception as e: + logger.error(f"server_864 获取新消息失败: {e}") + await self._safe_callback(on_logout, str(e), logger=logger, callback_name="on_logout") + await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger) + break + + data = data_temp.get("AddMsgs") or [] + if data: + for message in data: + await self._safe_callback(on_message, message, logger=logger, callback_name="on_message") + elif on_idle_payload: + await self._safe_callback( + on_idle_payload, + data_temp, + logger=logger, + callback_name="on_idle_payload", + ) + + await asyncio.sleep(2) + finally: + await self._set_runtime_running(False, on_runtime_state_change=on_runtime_state_change, logger=logger) + + async def _ensure_login( + self, + *, + ipad_config: dict, + state_path: str, + logger, + on_login_qr_update: AsyncCallback | None = None, + on_login_qr_cleared: AsyncCallback | None = None, + ) -> None: + """确保 864 已完成登录。""" + if await self.is_logged_in(): + await self._refresh_identity_from_profile(logger=logger) + await self._safe_callback( + on_login_qr_cleared, + {"status": "logged_in", "status_text": "已检测到现有登录态"}, + logger=logger, + callback_name="on_login_qr_cleared", + ) + return + + uuid, url = await self.get_qr_code(print_qr=True) + scan_url = f"http://weixin.qq.com/x/{uuid}" if uuid else "" + await self._safe_callback( + on_login_qr_update, + { + "uuid": uuid, + "url": url, + "scan_url": scan_url, + "expires_in": None, + "status": "waiting", + "status_text": "等待扫码登录", + "login_source": "fresh_qr", + }, + logger=logger, + callback_name="on_login_qr_update", + ) + + while True: + is_logged_in, login_status = await self.check_login_status() + if is_logged_in: + await self._safe_callback( + on_login_qr_cleared, + {"status": "confirmed", "status_text": "扫码登录成功", "uuid": uuid}, + logger=logger, + callback_name="on_login_qr_cleared", + ) + break + + await self._safe_callback( + on_login_qr_update, + { + "uuid": uuid, + "url": url, + "scan_url": scan_url, + "expires_in": None, + "status": "waiting", + "status_text": str(login_status.get("msg") or login_status.get("loginState") or "等待扫码登录"), + "login_source": "fresh_qr", + }, + logger=logger, + callback_name="on_login_qr_update", + ) + await asyncio.sleep(5) + + await self._wait_init_ready(logger=logger) + await self._refresh_identity_from_profile(logger=logger) + ipad_config["wxid"] = self.wxid + ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self._save_runtime_state( + state_path=state_path, + state_payload={"wxid": self.wxid, "login_time": ipad_config["login_time"]}, + logger=logger, + ) + + async def _wait_init_ready(self, *, logger) -> None: + """等待 864 server 侧初始化完成。""" + for _ in range(30): + try: + if await self.get_init_status(): + return + except Exception as e: + logger.warning(f"server_864 检查初始化状态失败: {e}") + await asyncio.sleep(2) + + async def _refresh_identity_from_profile(self, *, logger) -> None: + """从 864 的资料接口刷新当前登录身份。""" + profile = await self.get_profile() + self.wxid = str( + profile.get("UserName") + or profile.get("userName") + or profile.get("Wxid") + or profile.get("wxid") + or self.wxid + ) + nickname = profile.get("NickName") or profile.get("nickName") or profile.get("Nickname") or "" + if isinstance(nickname, dict): + nickname = nickname.get("string", "") + alias = profile.get("Alias") or profile.get("alias") or "" + phone = profile.get("Mobile") or profile.get("mobile") or profile.get("BindMobile") or "" + if isinstance(phone, dict): + phone = phone.get("string", "") + signature = profile.get("Signature") or profile.get("signature") or "" + + self.nickname = str(nickname or "") + self.alias = str(alias or "") + self.phone = str(phone or "") + self.signature = str(signature or "") + logger.info( + f"server_864 登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}" + ) + + @staticmethod + def _save_runtime_state(*, state_path: str, state_payload: dict[str, Any], logger) -> None: + """保存 864 provider 的本地登录缓存。""" + try: + normalized_path = str(state_path or "").strip() + if not normalized_path: + return + state_dir = os.path.dirname(normalized_path) + if state_dir: + os.makedirs(state_dir, exist_ok=True) + with open(normalized_path, "w", encoding="utf-8") as f: + toml.dump(state_payload, f) + except Exception as e: + logger.warning(f"写入 server_864 本地状态失败: path={state_path}, error={e}") + + async def _set_runtime_running(self, running: bool, *, on_runtime_state_change: AsyncCallback | None, logger) -> None: + self._runtime_running = running + if on_runtime_state_change: + await self._safe_callback( + on_runtime_state_change, + running, + logger=logger, + callback_name="on_runtime_state_change", + ) + + async def _safe_callback(self, callback: AsyncCallback | None, *args: Any, logger, callback_name: str) -> None: + if callback is None: + return + try: + await callback(*args) + except Exception as e: + logger.exception(f"执行回调失败: {callback_name}, error: {e}") + + def get_login_identity(self) -> dict[str, Any]: + """返回统一登录身份结构。""" + return { + "wxid": self.wxid, + "nickname": self.nickname, + "alias": self.alias, + "phone": self.phone, + "signature": getattr(self, "signature", ""), + "device_name": "", + "device_id": "", + } diff --git a/wechat_ipad/providers/server_864/user.py b/wechat_ipad/providers/server_864/user.py new file mode 100644 index 0000000..c71ff8b --- /dev/null +++ b/wechat_ipad/providers/server_864/user.py @@ -0,0 +1,46 @@ +from loguru import logger + +from wechat_ipad.providers.server_864.base import Server864APIClientBase + + +class UserMixin(Server864APIClientBase): + """864 用户资料相关接口。""" + + async def get_profile(self, wxid: str = None) -> dict: + """获取当前登录账号的资料。""" + del wxid + data = await self._request_data("get", "/user/GetProfile", timeout=20) + return dict(data or {}) + + async def get_profile_info_ext(self, wxid: str = None) -> dict: + """获取扩展资料。 + + 说明: + 1. 864 当前公开接口只直接暴露“当前账号资料”; + 2. 为了兼容项目现有调用面,这里先从主资料中补常见字段; + 3. 若后续发现 864 某个版本提供了专门扩展接口,再局部替换即可。 + """ + del wxid + profile = await self.get_profile() + return dict(profile.get("userInfoExt", {}) or profile.get("UserInfoExt", {}) or profile) + + async def get_my_qrcode(self, style: int = 8) -> str: + """获取当前登录账号个人二维码。""" + data = await self._request_data( + "post", + "/user/GetMyQrCode", + json_body={"Style": int(style), "Recover": False}, + timeout=20, + ) + qrcode_info = self._pick_first(data, "qrcode", "Qrcode", "QrCode") or {} + return str(self._pick_first(qrcode_info, "buffer", "Buffer", "src", "Src") or "") + + async def is_logged_in(self, wxid: str = None) -> bool: + """检查 864 当前账号是否在线。""" + del wxid + try: + await self.get_profile() + return True + except Exception as e: + logger.error("server_864 is_logged_in:{}", e) + return False