312 lines
10 KiB
Python
312 lines
10 KiB
Python
"""
|
||
HTTP 回调服务器模块
|
||
|
||
接收微信 Hook 推送的消息回调
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
from typing import Callable, List, Optional, Dict, Any
|
||
from loguru import logger
|
||
|
||
try:
|
||
from aiohttp import web
|
||
AIOHTTP_AVAILABLE = True
|
||
except ImportError:
|
||
AIOHTTP_AVAILABLE = False
|
||
logger.warning("aiohttp 未安装,HTTP 回调服务器将不可用")
|
||
|
||
|
||
class CallbackServer:
|
||
"""
|
||
HTTP 回调服务器
|
||
|
||
接收微信 Hook 推送的消息
|
||
"""
|
||
|
||
def __init__(self, host: str = "0.0.0.0", port: int = 9999):
|
||
"""
|
||
初始化回调服务器
|
||
|
||
Args:
|
||
host: 监听地址
|
||
port: 监听端口
|
||
"""
|
||
self.host = host
|
||
self.port = port
|
||
self._app: Optional[web.Application] = None
|
||
self._runner: Optional[web.AppRunner] = None
|
||
self._site: Optional[web.TCPSite] = None
|
||
self._message_handlers: List[Callable] = []
|
||
self._running = False
|
||
|
||
def add_message_handler(self, handler: Callable):
|
||
"""
|
||
添加消息处理器
|
||
|
||
Args:
|
||
handler: 消息处理函数,签名为 async def handler(message_type: str, data: dict)
|
||
"""
|
||
if handler not in self._message_handlers:
|
||
self._message_handlers.append(handler)
|
||
logger.debug(f"注册消息处理器: {handler.__name__}")
|
||
|
||
def remove_message_handler(self, handler: Callable):
|
||
"""
|
||
移除消息处理器
|
||
|
||
Args:
|
||
handler: 要移除的处理函数
|
||
"""
|
||
if handler in self._message_handlers:
|
||
self._message_handlers.remove(handler)
|
||
logger.debug(f"移除消息处理器: {handler.__name__}")
|
||
|
||
async def _handle_callback(self, request: web.Request) -> web.Response:
|
||
"""
|
||
处理回调请求
|
||
|
||
Args:
|
||
request: HTTP 请求
|
||
|
||
Returns:
|
||
HTTP 响应
|
||
"""
|
||
try:
|
||
# 读取原始请求体(用于完整日志)
|
||
raw_body = await request.text()
|
||
# logger.debug(f"[回调原始请求] {raw_body}")
|
||
|
||
# 解析 JSON 数据
|
||
data = json.loads(raw_body) if raw_body else {}
|
||
|
||
# 判断消息类型
|
||
message_type = self._detect_message_type(data)
|
||
|
||
# 记录原始消息(用于调试)
|
||
msg_type_code = str(data.get("msgType", ""))
|
||
event_type = data.get("event_type")
|
||
event_type_str = str(event_type) if event_type is not None else ""
|
||
logger.info(f"[回调] type={message_type}, msgType={msg_type_code}, messageType={data.get('messageType', '')}")
|
||
|
||
# 如果是系统消息、群信息变化事件或特殊消息,记录原始数据
|
||
if msg_type_code in ("10000", "10002") or event_type_str == "1010" or message_type not in ["private_message", "group_message"]:
|
||
logger.info(f"[回调原始数据] {json.dumps(data, ensure_ascii=False, indent=2)}")
|
||
logger.info(f"[回调详情] 完整数据: {data}")
|
||
else:
|
||
from_user = data.get("fromUserName", {})
|
||
if isinstance(from_user, dict):
|
||
from_wxid = from_user.get("String", "")
|
||
else:
|
||
from_wxid = str(from_user)
|
||
logger.debug(f"[回调简要] from={from_wxid}, msgId={data.get('msgId', '')}, newMsgId={data.get('newMsgId', '')}")
|
||
|
||
# 调用所有处理器
|
||
for handler in self._message_handlers:
|
||
try:
|
||
await handler(message_type, data)
|
||
except Exception as e:
|
||
logger.error(f"消息处理器异常: {handler.__name__} -> {e}")
|
||
|
||
return web.json_response({"code": 0, "msg": "success"})
|
||
|
||
except json.JSONDecodeError:
|
||
logger.error("回调数据 JSON 解析失败")
|
||
return web.json_response({"code": -1, "msg": "invalid json"}, status=400)
|
||
except Exception as e:
|
||
logger.error(f"处理回调异常: {e}")
|
||
return web.json_response({"code": -1, "msg": str(e)}, status=500)
|
||
|
||
def _detect_message_type(self, data: dict) -> str:
|
||
"""
|
||
检测消息类型
|
||
|
||
Args:
|
||
data: 消息数据
|
||
|
||
Returns:
|
||
消息类型字符串
|
||
"""
|
||
# 优先检查 event_type(新接口的事件通知)
|
||
event_type = data.get("event_type")
|
||
if event_type:
|
||
# 事件类型映射
|
||
event_type_map = {
|
||
1008: "chatroom_member_add", # 群成员新增
|
||
1009: "chatroom_member_remove", # 群成员删除
|
||
1010: "chatroom_info_change", # 群信息变化(猜测)
|
||
1012: "chatroom_member_nickname_change", # 群成员昵称修改
|
||
}
|
||
event_name = event_type_map.get(event_type)
|
||
if event_name:
|
||
logger.info(f"[事件识别] event_type={event_type} -> {event_name}")
|
||
return event_name
|
||
|
||
# 根据消息字段判断类型
|
||
message_type_field = data.get("messageType", "")
|
||
|
||
if message_type_field == "私聊消息":
|
||
return "private_message"
|
||
elif message_type_field == "群聊消息":
|
||
return "group_message"
|
||
elif "snsObject" in data:
|
||
return "moments_message"
|
||
|
||
# 根据 fromUserName 判断
|
||
from_user = data.get("fromUserName", {})
|
||
if isinstance(from_user, dict):
|
||
from_wxid = from_user.get("String", "")
|
||
else:
|
||
from_wxid = str(from_user)
|
||
|
||
if from_wxid.endswith("@chatroom"):
|
||
return "group_message"
|
||
|
||
# 默认私聊消息
|
||
return "private_message"
|
||
|
||
async def _health_check(self, request: web.Request) -> web.Response:
|
||
"""健康检查端点"""
|
||
return web.json_response({"status": "ok", "server": "callback_server"})
|
||
|
||
async def start(self):
|
||
"""启动回调服务器"""
|
||
if not AIOHTTP_AVAILABLE:
|
||
logger.error("aiohttp 未安装,无法启动回调服务器")
|
||
return False
|
||
|
||
if self._running:
|
||
logger.warning("回调服务器已在运行")
|
||
return True
|
||
|
||
try:
|
||
self._app = web.Application()
|
||
|
||
# 注册路由(支持多种路径)
|
||
self._app.router.add_route("*", "/", self._handle_callback)
|
||
self._app.router.add_route("*", "/callback", self._handle_callback)
|
||
self._app.router.add_route("*", "/vxapi", self._handle_callback) # Hook 默认路径
|
||
self._app.router.add_route("*", "/api/recvMsg", self._handle_callback) # 新协议路径
|
||
self._app.router.add_get("/health", self._health_check)
|
||
|
||
# 启动服务器
|
||
self._runner = web.AppRunner(self._app)
|
||
await self._runner.setup()
|
||
|
||
self._site = web.TCPSite(self._runner, self.host, self.port)
|
||
await self._site.start()
|
||
|
||
self._running = True
|
||
logger.success(f"回调服务器已启动: http://{self.host}:{self.port}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"启动回调服务器失败: {e}")
|
||
return False
|
||
|
||
async def stop(self):
|
||
"""停止回调服务器"""
|
||
if not self._running:
|
||
return
|
||
|
||
try:
|
||
if self._site:
|
||
await self._site.stop()
|
||
|
||
if self._runner:
|
||
await self._runner.cleanup()
|
||
|
||
self._running = False
|
||
self._app = None
|
||
self._runner = None
|
||
self._site = None
|
||
|
||
logger.info("回调服务器已停止")
|
||
|
||
except Exception as e:
|
||
logger.error(f"停止回调服务器失败: {e}")
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
"""是否正在运行"""
|
||
return self._running
|
||
|
||
|
||
class MessageNormalizer:
|
||
"""
|
||
消息格式标准化器
|
||
|
||
将新协议的消息格式转换为内部统一格式
|
||
"""
|
||
|
||
# 微信消息类型映射
|
||
MSG_TYPE_MAP = {
|
||
"1": "text",
|
||
"3": "image",
|
||
"34": "voice",
|
||
"43": "video",
|
||
"47": "emoji",
|
||
"48": "location",
|
||
"49": "link", # 也可能是小程序、文件等
|
||
"42": "card",
|
||
"10000": "system",
|
||
"10002": "revoke",
|
||
}
|
||
|
||
@classmethod
|
||
def normalize(cls, message_type: str, data: dict) -> dict:
|
||
"""
|
||
标准化消息格式
|
||
|
||
Args:
|
||
message_type: 消息类型 (private_message/group_message)
|
||
data: 原始消息数据
|
||
|
||
Returns:
|
||
标准化的消息字典
|
||
"""
|
||
from .message_types import normalize_from_callback
|
||
return normalize_from_callback(message_type, data)
|
||
|
||
@classmethod
|
||
def _extract_string(cls, value) -> str:
|
||
"""
|
||
提取字符串值
|
||
|
||
Args:
|
||
value: 可能是 dict 或 str
|
||
|
||
Returns:
|
||
字符串值
|
||
"""
|
||
if isinstance(value, dict):
|
||
return value.get("String", "")
|
||
return str(value) if value else ""
|
||
|
||
@classmethod
|
||
def _get_internal_type(cls, msg_type_code: str, message_type: str) -> int:
|
||
"""
|
||
获取内部消息类型码
|
||
|
||
Args:
|
||
msg_type_code: 微信消息类型码
|
||
message_type: 消息来源类型
|
||
|
||
Returns:
|
||
内部消息类型码
|
||
"""
|
||
# 映射到内部类型码(与旧协议兼容)
|
||
type_map = {
|
||
"1": 11046, # 文本
|
||
"3": 11047, # 图片
|
||
"34": 11048, # 语音
|
||
"43": 11051, # 视频
|
||
"47": 11052, # 表情
|
||
"48": 11053, # 位置
|
||
"49": 11054, # 链接/小程序/文件
|
||
"42": 11055, # 名片
|
||
"10000": 11058, # 系统消息
|
||
"10002": 11057, # 撤回消息
|
||
}
|
||
return type_map.get(msg_type_code, 11046)
|