from typing import Any, Dict, Type from wechat_ipad.providers.legacy_855 import Legacy855WechatClient from wechat_ipad.providers.server_864 import Server864WechatClient class WechatGateway: """按 server_type 选择具体 Provider 的薄网关。 当前策略: 1. Gateway 只负责选择 Provider,并把调用透传出去; 2. 不在这里承载协议差异或运行时细节,避免再次形成新的“大中台”; 3. 不同 provider 内部各自维护自己的运行模型,Gateway 只负责路由,不承载版本差异。 """ _PROVIDER_MAP: Dict[str, Type[Legacy855WechatClient | Server864WechatClient]] = { "legacy_855": Legacy855WechatClient, "855": Legacy855WechatClient, "859": Legacy855WechatClient, "server_864": Server864WechatClient, "864": Server864WechatClient, } _LOCAL_ATTRS = {"server_type", "provider"} def __init__(self, ip: str, port: int, server_type: str = "legacy_855", **kwargs: Any): normalized_server_type = str(server_type or "legacy_855").strip().lower() provider_cls = self._PROVIDER_MAP.get(normalized_server_type) if provider_cls is None: raise ValueError(f"不支持的 wechat provider 类型: {server_type}") object.__setattr__(self, "server_type", normalized_server_type) object.__setattr__(self, "provider", provider_cls(ip=ip, port=port, **kwargs)) def __getattr__(self, item: str) -> Any: """将未显式实现的属性/方法透传给具体 Provider。""" return getattr(self.provider, item) def __setattr__(self, key: str, value: Any) -> None: """将运行期动态属性写入透传给 Provider,保持旧调用面的兼容性。""" if key in self._LOCAL_ATTRS or "provider" not in self.__dict__: object.__setattr__(self, key, value) return setattr(self.provider, key, value)