""" HTTP 回调服务器模块 接收微信 Hook 推送的消息回调 """ import asyncio import json from typing import Callable, List, Optional, Dict, Any from loguru import logger try: from aiohttp import web AIOHTTP_AVAILABLE = True except ImportError: AIOHTTP_AVAILABLE = False logger.warning("aiohttp 未安装,HTTP 回调服务器将不可用") class CallbackServer: """ HTTP 回调服务器 接收微信 Hook 推送的消息 """ def __init__(self, host: str = "0.0.0.0", port: int = 9999): """ 初始化回调服务器 Args: host: 监听地址 port: 监听端口 """ self.host = host self.port = port self._app: Optional[web.Application] = None self._runner: Optional[web.AppRunner] = None self._site: Optional[web.TCPSite] = None self._message_handlers: List[Callable] = [] self._running = False def add_message_handler(self, handler: Callable): """ 添加消息处理器 Args: handler: 消息处理函数,签名为 async def handler(message_type: str, data: dict) """ if handler not in self._message_handlers: self._message_handlers.append(handler) logger.debug(f"注册消息处理器: {handler.__name__}") def remove_message_handler(self, handler: Callable): """ 移除消息处理器 Args: handler: 要移除的处理函数 """ if handler in self._message_handlers: self._message_handlers.remove(handler) logger.debug(f"移除消息处理器: {handler.__name__}") async def _handle_callback(self, request: web.Request) -> web.Response: """ 处理回调请求 Args: request: HTTP 请求 Returns: HTTP 响应 """ try: # 读取原始请求体(用于完整日志) raw_body = await request.text() # logger.debug(f"[回调原始请求] {raw_body}") # 解析 JSON 数据 data = json.loads(raw_body) if raw_body else {} # 判断消息类型 message_type = self._detect_message_type(data) # 记录原始消息(用于调试) msg_type_code = str(data.get("msgType", "")) event_type = data.get("event_type") event_type_str = str(event_type) if event_type is not None else "" logger.info(f"[回调] type={message_type}, msgType={msg_type_code}, messageType={data.get('messageType', '')}") # 如果是系统消息、群信息变化事件或特殊消息,记录原始数据 if msg_type_code in ("10000", "10002") or event_type_str == "1010" or message_type not in ["private_message", "group_message"]: logger.info(f"[回调原始数据] {json.dumps(data, ensure_ascii=False, indent=2)}") logger.info(f"[回调详情] 完整数据: {data}") else: from_user = data.get("fromUserName", {}) if isinstance(from_user, dict): from_wxid = from_user.get("String", "") else: from_wxid = str(from_user) logger.debug(f"[回调简要] from={from_wxid}, msgId={data.get('msgId', '')}, newMsgId={data.get('newMsgId', '')}") # 调用所有处理器 for handler in self._message_handlers: try: await handler(message_type, data) except Exception as e: logger.error(f"消息处理器异常: {handler.__name__} -> {e}") return web.json_response({"code": 0, "msg": "success"}) except json.JSONDecodeError: logger.error("回调数据 JSON 解析失败") return web.json_response({"code": -1, "msg": "invalid json"}, status=400) except Exception as e: logger.error(f"处理回调异常: {e}") return web.json_response({"code": -1, "msg": str(e)}, status=500) def _detect_message_type(self, data: dict) -> str: """ 检测消息类型 Args: data: 消息数据 Returns: 消息类型字符串 """ # 优先检查 event_type(新接口的事件通知) event_type = data.get("event_type") if event_type: # 事件类型映射 event_type_map = { 1008: "chatroom_member_add", # 群成员新增 1009: "chatroom_member_remove", # 群成员删除 1010: "chatroom_info_change", # 群信息变化(猜测) 1012: "chatroom_member_nickname_change", # 群成员昵称修改 } event_name = event_type_map.get(event_type) if event_name: logger.info(f"[事件识别] event_type={event_type} -> {event_name}") return event_name # 根据消息字段判断类型 message_type_field = data.get("messageType", "") if message_type_field == "私聊消息": return "private_message" elif message_type_field == "群聊消息": return "group_message" elif "snsObject" in data: return "moments_message" # 根据 fromUserName 判断 from_user = data.get("fromUserName", {}) if isinstance(from_user, dict): from_wxid = from_user.get("String", "") else: from_wxid = str(from_user) if from_wxid.endswith("@chatroom"): return "group_message" # 默认私聊消息 return "private_message" async def _health_check(self, request: web.Request) -> web.Response: """健康检查端点""" return web.json_response({"status": "ok", "server": "callback_server"}) async def start(self): """启动回调服务器""" if not AIOHTTP_AVAILABLE: logger.error("aiohttp 未安装,无法启动回调服务器") return False if self._running: logger.warning("回调服务器已在运行") return True try: self._app = web.Application() # 注册路由(支持多种路径) self._app.router.add_route("*", "/", self._handle_callback) self._app.router.add_route("*", "/callback", self._handle_callback) self._app.router.add_route("*", "/vxapi", self._handle_callback) # Hook 默认路径 self._app.router.add_route("*", "/api/recvMsg", self._handle_callback) # 新协议路径 self._app.router.add_get("/health", self._health_check) # 启动服务器 self._runner = web.AppRunner(self._app) await self._runner.setup() self._site = web.TCPSite(self._runner, self.host, self.port) await self._site.start() self._running = True logger.success(f"回调服务器已启动: http://{self.host}:{self.port}") return True except Exception as e: logger.error(f"启动回调服务器失败: {e}") return False async def stop(self): """停止回调服务器""" if not self._running: return try: if self._site: await self._site.stop() if self._runner: await self._runner.cleanup() self._running = False self._app = None self._runner = None self._site = None logger.info("回调服务器已停止") except Exception as e: logger.error(f"停止回调服务器失败: {e}") @property def is_running(self) -> bool: """是否正在运行""" return self._running class MessageNormalizer: """ 消息格式标准化器 将新协议的消息格式转换为内部统一格式 """ # 微信消息类型映射 MSG_TYPE_MAP = { "1": "text", "3": "image", "34": "voice", "43": "video", "47": "emoji", "48": "location", "49": "link", # 也可能是小程序、文件等 "42": "card", "10000": "system", "10002": "revoke", } @classmethod def normalize(cls, message_type: str, data: dict) -> dict: """ 标准化消息格式 Args: message_type: 消息类型 (private_message/group_message) data: 原始消息数据 Returns: 标准化的消息字典 """ from .message_types import normalize_from_callback return normalize_from_callback(message_type, data) @classmethod def _extract_string(cls, value) -> str: """ 提取字符串值 Args: value: 可能是 dict 或 str Returns: 字符串值 """ if isinstance(value, dict): return value.get("String", "") return str(value) if value else "" @classmethod def _get_internal_type(cls, msg_type_code: str, message_type: str) -> int: """ 获取内部消息类型码 Args: msg_type_code: 微信消息类型码 message_type: 消息来源类型 Returns: 内部消息类型码 """ # 映射到内部类型码(与旧协议兼容) type_map = { "1": 11046, # 文本 "3": 11047, # 图片 "34": 11048, # 语音 "43": 11051, # 视频 "47": 11052, # 表情 "48": 11053, # 位置 "49": 11054, # 链接/小程序/文件 "42": 11055, # 名片 "10000": 11058, # 系统消息 "10002": 11057, # 撤回消息 } return type_map.get(msg_type_code, 11046)