Files
WechatHookBot/utils/hookbot.py
2025-12-23 16:46:41 +08:00

266 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HookBot - 机器人核心类
处理消息路由和事件分发
"""
import asyncio
import tomllib
import time
from typing import Dict, Any
from loguru import logger
from WechatHook import WechatHookClient, MESSAGE_TYPE_MAP, normalize_message
from utils.event_manager import EventManager
class HookBot:
"""
HookBot 核心类
负责消息处理、路由和事件分发
"""
def __init__(self, client: WechatHookClient):
"""
初始化 HookBot
Args:
client: WechatHookClient 实例
"""
self.client = client
self.wxid = None
self.nickname = None
# 读取配置
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_config = main_config.get("Bot", {})
preset_wxid = bot_config.get("wxid") or bot_config.get("bot_wxid")
preset_nickname = bot_config.get("nickname") or bot_config.get("bot_nickname")
if preset_wxid:
self.wxid = preset_wxid
logger.info(f"使用配置中的机器人 wxid: {self.wxid}")
if preset_nickname:
self.nickname = preset_nickname
logger.info(f"使用配置中的机器人昵称: {self.nickname}")
self.ignore_mode = bot_config.get("ignore-mode", "None")
self.whitelist = bot_config.get("whitelist", [])
self.blacklist = bot_config.get("blacklist", [])
# 性能配置
perf_config = main_config.get("Performance", {})
self.log_sampling_rate = perf_config.get("log_sampling_rate", 1.0)
# 消息去重(部分环境会重复回调同一条消息,导致插件回复两次)
self._dedup_ttl_seconds = perf_config.get("dedup_ttl_seconds", 30)
self._dedup_max_size = perf_config.get("dedup_max_size", 5000)
self._dedup_lock = asyncio.Lock()
self._recent_message_keys: Dict[str, float] = {}
# 消息计数和统计
self.message_count = 0
self.filtered_count = 0
self.processed_count = 0
logger.info("HookBot 初始化完成")
def _extract_msg_id(self, data: Dict[str, Any]) -> str:
"""从原始回调数据中提取消息ID用于去重"""
for k in ("msgid", "msg_id", "MsgId", "id"):
v = data.get(k)
if v:
return str(v)
return ""
async def _is_duplicate_message(self, msg_type: int, data: Dict[str, Any]) -> bool:
"""判断该条消息是否为短时间内重复回调。"""
msg_id = self._extract_msg_id(data)
if not msg_id:
# 没有稳定 msgid 时不做去重,避免误伤(同一秒内同内容可能是用户真实重复发送)
return False
key = f"msgid:{msg_id}"
now = time.time()
ttl = max(float(self._dedup_ttl_seconds or 0), 0.0)
if ttl <= 0:
return False
async with self._dedup_lock:
last_seen = self._recent_message_keys.get(key)
if last_seen is not None and (now - last_seen) < ttl:
return True
# 记录/刷新
self._recent_message_keys.pop(key, None)
self._recent_message_keys[key] = now
# 清理过期 key按插入顺序从旧到新
cutoff = now - ttl
while self._recent_message_keys:
first_key = next(iter(self._recent_message_keys))
if self._recent_message_keys.get(first_key, now) >= cutoff:
break
self._recent_message_keys.pop(first_key, None)
# 限制大小,避免长期运行内存增长
max_size = int(self._dedup_max_size or 0)
if max_size > 0:
while len(self._recent_message_keys) > max_size and self._recent_message_keys:
first_key = next(iter(self._recent_message_keys))
self._recent_message_keys.pop(first_key, None)
return False
def update_profile(self, wxid: str, nickname: str):
"""
更新机器人信息
Args:
wxid: 机器人 wxid
nickname: 机器人昵称
"""
self.wxid = wxid
self.nickname = nickname
logger.info(f"机器人信息: wxid={wxid}, nickname={nickname}")
async def process_message(self, msg_type: int, data: dict):
"""
处理接收到的消息
Args:
msg_type: 消息类型
data: 消息数据
"""
# 过滤 API 响应消息
# - 11032: 获取群成员信息响应
# - 11174/11230: 协议/上传等 API 回调
if msg_type in [11032, 11174, 11230]:
return
# 去重:同一条消息重复回调时不再重复触发事件(避免“同一句话回复两次”)
try:
if await self._is_duplicate_message(msg_type, data):
logger.debug(f"[HookBot] 重复消息已丢弃: type={msg_type}, msgid={self._extract_msg_id(data) or 'N/A'}")
return
except Exception as e:
# 去重失败不影响主流程
logger.debug(f"[HookBot] 消息去重检查失败: {e}")
# 消息计数
self.message_count += 1
# 日志采样 - 只记录部分消息以减少日志量
should_log = self._should_log_message(msg_type)
if should_log:
logger.debug(f"处理消息: type={msg_type}")
# 重要事件始终记录
if msg_type in [11098, 11099, 11058]: # 群成员变动、系统消息
logger.info(f"重要事件: type={msg_type}")
# 获取事件类型
event_type = MESSAGE_TYPE_MAP.get(msg_type)
if should_log and event_type:
logger.info(f"[HookBot] 消息类型映射: {msg_type} -> {event_type}")
if not event_type:
# 记录未知消息类型的详细信息,帮助调试
content_preview = str(data.get('raw_msg', data.get('msg', '')))[:200]
logger.warning(f"未映射的消息类型: {msg_type}, wx_type: {data.get('wx_type')}, 内容预览: {content_preview}")
return
# 格式转换
try:
message = normalize_message(msg_type, data)
except Exception as e:
logger.error(f"格式转换失败: {e}")
return
# 过滤消息
if not self._check_filter(message):
self.filtered_count += 1
if should_log:
logger.debug(f"消息被过滤: {message.get('FromWxid')}")
return
self.processed_count += 1
# 采样记录处理的消息
if should_log:
content = message.get('Content', '')
if len(content) > 50:
content = content[:50] + "..."
logger.info(f"处理消息: type={event_type}, from={message.get('FromWxid')}, content={content}")
# 触发事件
try:
await EventManager.emit(event_type, self.client, message)
except Exception as e:
logger.error(f"事件处理失败: {e}")
def _should_log_message(self, msg_type: int) -> bool:
"""判断是否应该记录此消息的日志"""
# 重要消息类型始终记录
important_types = {
11058, 11098, 11099, 11025, # 系统消息、群成员变动、登录信息
11051, 11047, 11052, 11055 # 视频、图片、表情、文件消息
}
if msg_type in important_types:
return True
# 其他消息按采样率记录
import random
return random.random() < self.log_sampling_rate
def _check_filter(self, message: Dict[str, Any]) -> bool:
"""
检查消息是否通过过滤
Args:
message: 消息字典
Returns:
是否通过过滤
"""
from_wxid = message.get("FromWxid", "")
sender_wxid = message.get("SenderWxid", "")
msg_type = message.get("MsgType", 0)
# 系统消息type=11058不过滤因为包含重要的群聊事件信息
if msg_type == 11058:
return True
# 过滤机器人自己发送的消息,避免无限循环
if self.wxid and (from_wxid == self.wxid or sender_wxid == self.wxid):
return False
# None 模式:处理所有消息
if self.ignore_mode == "None":
return True
# Whitelist 模式:仅处理白名单
if self.ignore_mode == "Whitelist":
return from_wxid in self.whitelist or sender_wxid in self.whitelist
# Blacklist 模式:屏蔽黑名单
if self.ignore_mode == "Blacklist":
return from_wxid not in self.blacklist and sender_wxid not in self.blacklist
return True
def get_stats(self) -> dict:
"""获取消息处理统计信息"""
return {
"total_messages": self.message_count,
"filtered_messages": self.filtered_count,
"processed_messages": self.processed_count,
"filter_rate": self.filtered_count / max(self.message_count, 1),
"process_rate": self.processed_count / max(self.message_count, 1)
}