Files
2025-12-05 18:06:13 +08:00

748 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
消息记录插件
将所有消息存储到MySQL数据库
"""
import asyncio
import tomllib
from pathlib import Path
from datetime import datetime
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import (
on_text_message,
on_image_message,
on_voice_message,
on_video_message,
on_file_message,
on_emoji_message
)
from utils.redis_cache import RedisCache, get_cache
import pymysql
from WechatHook import WechatHookClient
from minio import Minio
from minio.error import S3Error
import uuid
import aiohttp
import re
import xml.etree.ElementTree as ET
class MessageLogger(PluginBase):
"""消息记录插件"""
description = "消息记录插件 - 将消息存储到MySQL"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
self.db_config = None
self.redis_cache = None # Redis 缓存实例
# 创建独立的日志记录器
self._setup_logger()
def _setup_logger(self):
"""设置独立的日志记录器"""
# 创建日志目录
log_dir = Path(__file__).parent.parent.parent / "logs"
log_dir.mkdir(exist_ok=True)
# 添加独立的日志文件处理器(使用 filter 来过滤)
log_file = log_dir / "message_logger.log"
# 为这个插件添加一个独立的日志处理器
self.logger_id = logger.add(
log_file,
rotation="10 MB",
retention="7 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
encoding="utf-8",
filter=lambda record: record["extra"].get("name") == "MessageLogger"
)
# 创建带标记的 logger
self.log = logger.bind(name="MessageLogger")
async def async_init(self):
"""异步初始化"""
try:
self.log.info("=" * 50)
self.log.info("MessageLogger 插件开始初始化...")
self.log.info("=" * 50)
config_path = Path(__file__).parent / "config.toml"
if not config_path.exists():
self.log.error(f"MessageLogger 配置文件不存在: {config_path}")
return
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
self.db_config = self.config["database"]
# 初始化 Redis 缓存
redis_config = self.config.get("redis", {})
if redis_config.get("enabled", False):
self.log.info("正在初始化 Redis 缓存...")
self.redis_cache = RedisCache(redis_config)
if self.redis_cache.enabled:
self.log.success(f"Redis 缓存初始化成功TTL={redis_config.get('ttl', 3600)}")
else:
self.log.warning("Redis 缓存初始化失败,将不使用缓存")
self.redis_cache = None
else:
self.log.info("Redis 缓存未启用")
# 初始化 MinIO 客户端
self.minio_client = Minio(
"115.190.113.141:19000",
access_key="admin",
secret_key="80012029Lz",
secure=False
)
self.minio_bucket = "wechat"
# 设置全局实例,供其他地方调用
MessageLogger._instance = self
self.log.info(f"MessageLogger 全局实例已设置: {self}")
# 测试数据库连接
try:
with self.get_db_connection() as conn:
self.log.info("MessageLogger 数据库连接测试成功")
except Exception as e:
self.log.error(f"MessageLogger 数据库连接测试失败: {e}")
self.log.success("=" * 50)
self.log.success("MessageLogger 插件初始化完成!")
self.log.success("=" * 50)
except Exception as e:
self.log.error(f"MessageLogger 插件初始化失败: {e}")
import traceback
self.log.error(f"详细错误: {traceback.format_exc()}")
self.config = None
@classmethod
def get_instance(cls):
"""获取MessageLogger实例"""
instance = getattr(cls, '_instance', None)
if instance is None:
logger.warning("MessageLogger 全局实例为空,可能插件未正确初始化")
return instance
def get_db_connection(self):
"""获取数据库连接"""
return pymysql.connect(
host=self.db_config["host"],
port=self.db_config["port"],
user=self.db_config["user"],
password=self.db_config["password"],
database=self.db_config["database"],
charset=self.db_config["charset"],
autocommit=True
)
def extract_image_info(self, raw_msg: str) -> tuple:
"""从图片消息中提取 CDN URL 和 AES Key"""
try:
root = ET.fromstring(raw_msg)
img = root.find(".//img")
if img is not None:
cdnurl = img.get("cdnbigimgurl", "") or img.get("cdnmidimgurl", "")
aeskey = img.get("aeskey", "")
return (cdnurl, aeskey)
except Exception as e:
self.log.error(f"提取图片信息失败: {e}")
return ("", "")
def extract_video_info(self, raw_msg: str) -> tuple:
"""从视频消息中提取 CDN URL 和 AES Key"""
try:
root = ET.fromstring(raw_msg)
video = root.find(".//videomsg")
if video is not None:
cdnurl = video.get("cdnvideourl", "")
aeskey = video.get("aeskey", "")
# 如果主要的CDN信息为空尝试获取原始视频信息
if not cdnurl or not aeskey:
cdnrawvideourl = video.get("cdnrawvideourl", "")
cdnrawvideoaeskey = video.get("cdnrawvideoaeskey", "")
if cdnrawvideourl and cdnrawvideoaeskey:
self.log.info(f"使用原始视频CDN信息: url={cdnrawvideourl[:50]}..., aeskey={cdnrawvideoaeskey[:20]}...")
return (cdnrawvideourl, cdnrawvideoaeskey)
if cdnurl and aeskey:
self.log.info(f"提取视频CDN信息成功: url={cdnurl[:50]}..., aeskey={aeskey[:20]}...")
return (cdnurl, aeskey)
else:
self.log.warning(f"视频CDN信息不完整: cdnurl={'' if cdnurl else ''}, aeskey={'' if aeskey else ''}")
except Exception as e:
self.log.error(f"提取视频信息失败: {e}")
return ("", "")
def extract_cdn_url(self, raw_msg: str) -> str:
"""从消息中提取 CDN URL表情包等"""
try:
match = re.search(r'cdnurl="([^"]+)"', raw_msg)
if match:
url = match.group(1).replace("&", "&")
return url
except Exception as e:
self.log.error(f"提取 CDN URL 失败: {e}")
return ""
def extract_file_info(self, raw_msg: str) -> tuple:
"""从文件消息中提取文件信息"""
try:
root = ET.fromstring(raw_msg)
appmsg = root.find(".//appmsg")
if appmsg is not None:
# 提取文件基本信息
title = appmsg.find("title")
filename = title.text if title is not None else ""
# 提取文件附件信息
appattach = appmsg.find("appattach")
if appattach is not None:
cdnattachurl = appattach.find("cdnattachurl")
aeskey = appattach.find("aeskey")
fileext = appattach.find("fileext")
totallen = appattach.find("totallen")
cdn_url = cdnattachurl.text if cdnattachurl is not None else ""
aes_key = aeskey.text if aeskey is not None else ""
file_ext = fileext.text if fileext is not None else ""
file_size = totallen.text if totallen is not None else "0"
self.log.info(f"提取文件信息: 文件名={filename}, 大小={file_size}字节, 扩展名={file_ext}")
return (filename, cdn_url, aes_key, file_ext, file_size)
except Exception as e:
self.log.error(f"提取文件信息失败: {e}")
return ("", "", "", "", "0")
async def download_image_and_upload(self, bot, cdnurl: str, aeskey: str) -> str:
"""下载图片并上传到 MinIO同时缓存 base64 供其他插件使用"""
try:
temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.jpg"
success = await bot.cdn_download(cdnurl, aeskey, str(temp_file), file_type=2)
if not success:
success = await bot.cdn_download(cdnurl, aeskey, str(temp_file), file_type=1)
# 等待文件下载完成
import asyncio
import base64
for _ in range(50):
if temp_file.exists() and temp_file.stat().st_size > 0:
break
await asyncio.sleep(0.1)
if temp_file.exists() and temp_file.stat().st_size > 0:
# 读取文件并缓存 base64供 AIChat 等插件使用)
with open(temp_file, "rb") as f:
image_data = f.read()
base64_data = f"data:image/jpeg;base64,{base64.b64encode(image_data).decode()}"
# 缓存到 Redis5分钟过期
redis_cache = get_cache()
if redis_cache and redis_cache.enabled:
media_key = RedisCache.generate_media_key(cdnurl, aeskey)
if media_key:
redis_cache.cache_media(media_key, base64_data, "image", ttl=300)
self.log.debug(f"图片已缓存到 Redis: {media_key[:20]}...")
media_url = await self.upload_file_to_minio(str(temp_file), "images")
temp_file.unlink()
return media_url
else:
self.log.error("图片下载超时或失败")
return ""
except Exception as e:
self.log.error(f"下载图片并上传失败: {e}")
return ""
async def download_video_and_upload(self, bot, cdnurl: str, aeskey: str) -> str:
"""下载视频并上传到 MinIO"""
try:
temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.mp4"
# file_type=4 是视频
success = await bot.cdn_download(cdnurl, aeskey, str(temp_file), file_type=4)
# 等待文件下载完成(视频较大,等待时间更长)
import asyncio
for _ in range(100):
if temp_file.exists() and temp_file.stat().st_size > 0:
break
await asyncio.sleep(0.1)
if temp_file.exists() and temp_file.stat().st_size > 0:
media_url = await self.upload_file_to_minio(str(temp_file), "videos")
temp_file.unlink()
return media_url
else:
self.log.error("视频下载超时或失败")
return ""
except Exception as e:
self.log.error(f"下载视频并上传失败: {e}")
return ""
async def download_file_and_upload(self, bot, cdnurl: str, aeskey: str, filename: str, file_ext: str) -> str:
"""下载文件并上传到 MinIO"""
try:
# 生成临时文件名,保持原始扩展名
if not filename.endswith(f".{file_ext}") and file_ext:
temp_filename = f"temp_{uuid.uuid4().hex}.{file_ext}"
else:
temp_filename = f"temp_{uuid.uuid4().hex}_{filename}"
temp_file = Path(__file__).parent / temp_filename
# file_type=5 是文件
self.log.info(f"开始下载文件: {filename}")
success = await bot.cdn_download(cdnurl, aeskey, str(temp_file), file_type=5)
# 等待文件下载完成
import asyncio
downloaded_file = None
# 等待更长时间,并检查可能的文件路径
for i in range(100): # 增加等待时间到10秒
# 检查原始路径
if temp_file.exists() and temp_file.stat().st_size > 0:
downloaded_file = temp_file
break
# 检查可能的其他路径CDN下载可能会修改文件名
parent_dir = temp_file.parent
temp_pattern = temp_file.stem
for possible_file in parent_dir.glob(f"*{temp_pattern}*"):
if possible_file.exists() and possible_file.stat().st_size > 0:
downloaded_file = possible_file
self.log.info(f"找到下载的文件: {possible_file}")
break
if downloaded_file:
break
await asyncio.sleep(0.1)
if downloaded_file and downloaded_file.exists() and downloaded_file.stat().st_size > 0:
self.log.info(f"文件下载成功开始上传到MinIO: {downloaded_file}")
media_url = await self.upload_file_to_minio(str(downloaded_file), "files", filename)
downloaded_file.unlink()
return media_url
else:
self.log.error(f"文件下载超时或失败: {filename}")
# 清理可能存在的临时文件
parent_dir = temp_file.parent
temp_pattern = temp_file.stem
for possible_file in parent_dir.glob(f"*{temp_pattern}*"):
if possible_file.exists():
self.log.info(f"清理临时文件: {possible_file}")
possible_file.unlink()
return ""
except Exception as e:
self.log.error(f"下载文件并上传失败: {e}")
return ""
async def download_and_upload(self, url: str, file_type: str, ext: str) -> str:
"""下载文件并上传到 MinIO同时缓存 base64 供其他插件使用"""
try:
import base64
# 下载文件
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.read()
# 缓存表情包 base64供 AIChat 等插件使用)
if file_type == "emojis" and data:
redis_cache = get_cache()
if redis_cache and redis_cache.enabled:
base64_data = f"data:image/gif;base64,{base64.b64encode(data).decode()}"
media_key = RedisCache.generate_media_key(cdnurl=url)
if media_key:
redis_cache.cache_media(media_key, base64_data, "emoji", ttl=300)
self.log.debug(f"表情包已缓存到 Redis: {media_key[:20]}...")
# 保存到临时文件
temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}{ext}"
temp_file.write_bytes(data)
# 上传
media_url = await self.upload_file_to_minio(str(temp_file), file_type)
# 删除临时文件
temp_file.unlink()
return media_url
except Exception as e:
self.log.error(f"下载并上传文件失败: {e}")
return ""
async def upload_file_to_minio(self, local_file: str, file_type: str, original_filename: str = "") -> str:
"""上传文件到 MinIO"""
try:
# 生成唯一文件名
file_ext = Path(local_file).suffix
unique_id = uuid.uuid4().hex
if original_filename:
# 使用原始文件名(去掉扩展名)+ 唯一ID + 扩展名
original_name = Path(original_filename).stem
# 清理文件名中的特殊字符
import re
original_name = re.sub(r'[^\w\-_\.]', '_', original_name)
filename = f"{original_name}_{unique_id}{file_ext}"
else:
filename = f"{unique_id}{file_ext}"
object_name = f"{file_type}/{datetime.now().strftime('%Y%m%d')}/{filename}"
# 上传文件
import asyncio
await asyncio.to_thread(
self.minio_client.fput_object,
self.minio_bucket,
object_name,
local_file
)
# 返回访问 URL
url = f"http://115.190.113.141:19000/{self.minio_bucket}/{object_name}"
self.log.debug(f"文件上传成功: {url}")
return url
except S3Error as e:
self.log.error(f"上传文件到 MinIO 失败: {e}")
return ""
async def save_message(self, message: dict, msg_type: str, bot: WechatHookClient):
"""保存消息到数据库"""
if not self.config or not self.config["behavior"]["enabled"]:
return
try:
sender_wxid = message.get("SenderWxid", "")
from_wxid = message.get("FromWxid", "")
is_group = message.get("IsGroup", False)
content = message.get("Content", "")
create_time = message.get("CreateTime", 0)
# 转换时间戳
if create_time:
msg_time = datetime.fromtimestamp(create_time)
else:
msg_time = datetime.now()
# 获取昵称和头像
nickname = ""
avatar_url = ""
if is_group and self.config["behavior"]["fetch_avatar"]:
cache_hit = False
# 1. 先尝试从 Redis 缓存获取
if self.redis_cache and self.redis_cache.enabled:
cached_info = self.redis_cache.get_user_basic_info(from_wxid, sender_wxid)
if cached_info:
nickname = cached_info.get("nickname", "")
avatar_url = cached_info.get("avatar_url", "")
if nickname and avatar_url:
cache_hit = True
self.log.debug(f"[缓存命中] {sender_wxid}: {nickname}")
# 2. 缓存未命中,调用 API 获取
if not cache_hit:
try:
self.log.info(f"[缓存未命中] 调用API获取用户信息: {sender_wxid}")
user_info = await bot.get_user_info_in_chatroom(from_wxid, sender_wxid)
if user_info:
# 处理不同的数据结构
if isinstance(user_info.get("nickName"), dict):
nickname = user_info.get("nickName", {}).get("string", "")
else:
nickname = user_info.get("nickName", "")
avatar_url = user_info.get("bigHeadImgUrl", "")
self.log.info(f"API获取成功: nickname={nickname}, avatar_url={avatar_url[:50] if avatar_url else ''}...")
# 3. 将用户信息存入 Redis 缓存
if self.redis_cache and self.redis_cache.enabled and nickname:
self.redis_cache.set_user_info(from_wxid, sender_wxid, user_info)
self.log.debug(f"[已缓存] {sender_wxid}: {nickname}")
else:
self.log.warning(f"用户信息为空: {sender_wxid}")
except Exception as e:
self.log.error(f"获取用户信息失败: {e}")
# 4. 如果仍然没有获取到,从历史记录中查找
if not nickname or not avatar_url:
self.log.info(f"尝试从历史记录获取用户信息: {sender_wxid}")
try:
with self.get_db_connection() as conn:
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
sql = """
SELECT nickname, avatar_url
FROM messages
WHERE sender_wxid = %s AND nickname != '' AND avatar_url != ''
ORDER BY create_time DESC
LIMIT 1
"""
cursor.execute(sql, (sender_wxid,))
result = cursor.fetchone()
if result:
if not nickname:
nickname = result.get("nickname", "")
if not avatar_url:
avatar_url = result.get("avatar_url", "")
self.log.success(f"从历史记录获取成功: nickname={nickname}, avatar_url={avatar_url[:50] if avatar_url else '(空)'}...")
except Exception as e:
self.log.error(f"从历史记录获取用户信息失败: {e}")
elif not is_group and self.config["behavior"]["fetch_avatar"]:
# 私聊消息,尝试获取联系人信息
try:
self.log.info(f"尝试获取私聊用户信息: {sender_wxid}")
# 这里可以添加获取私聊用户信息的逻辑
# user_info = await bot.get_contact_info(sender_wxid)
except Exception as e:
self.log.error(f"获取私聊用户信息失败: {e}")
# 群组ID如果是群聊
group_id = from_wxid if is_group else None
# 处理媒体文件上传
media_url = ""
# 表情包消息 - 从 CDN 下载
if msg_type == "emoji":
cdn_url = self.extract_cdn_url(content)
if cdn_url and cdn_url.startswith("http"):
media_url = await self.download_and_upload(cdn_url, "emojis", ".gif")
# 图片消息 - 使用 CDN 下载 API
elif msg_type == "image":
cdnurl, aeskey = self.extract_image_info(content)
if cdnurl and aeskey:
media_url = await self.download_image_and_upload(bot, cdnurl, aeskey)
# 视频消息 - 使用 CDN 下载 API
elif msg_type == "video":
self.log.info(f"处理视频消息: from={from_wxid}, sender={sender_wxid}")
cdnurl, aeskey = self.extract_video_info(content)
if cdnurl and aeskey:
self.log.info(f"开始下载并上传视频: {cdnurl[:50]}...")
media_url = await self.download_video_and_upload(bot, cdnurl, aeskey)
if media_url:
self.log.success(f"视频上传成功: {media_url}")
else:
self.log.error("视频上传失败")
elif message.get("Video"):
self.log.info("使用消息中的视频数据")
video_data = message["Video"]
temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.mp4"
temp_file.write_bytes(video_data)
media_url = await self.upload_file_to_minio(str(temp_file), "videos")
temp_file.unlink()
else:
self.log.warning("视频消息中没有找到可用的CDN信息或视频数据")
# 语音消息
elif msg_type == "voice":
voice_data = message.get("ImgBuf", {}).get("buffer")
if voice_data:
temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.silk"
temp_file.write_bytes(voice_data)
media_url = await self.upload_file_to_minio(str(temp_file), "voices")
temp_file.unlink()
# 文件消息 - 使用 CDN 下载 API
elif msg_type == "file":
self.log.info(f"处理文件消息: from={from_wxid}, sender={sender_wxid}")
filename, cdnurl, aeskey, file_ext, file_size = self.extract_file_info(content)
if cdnurl and aeskey and filename:
self.log.info(f"开始下载并上传文件: {filename} ({file_size}字节)")
media_url = await self.download_file_and_upload(bot, cdnurl, aeskey, filename, file_ext)
if media_url:
self.log.success(f"文件上传成功: {media_url}")
else:
self.log.error("文件上传失败")
elif message.get("File"):
self.log.info("使用消息中的文件数据")
file_data = message["File"]
filename = message.get("Filename", "file")
temp_file = Path(__file__).parent / f"temp_{filename}"
temp_file.write_bytes(file_data)
media_url = await self.upload_file_to_minio(str(temp_file), "files", filename)
temp_file.unlink()
else:
self.log.warning("文件消息中没有找到可用的CDN信息或文件数据")
# 保存到数据库
with self.get_db_connection() as conn:
with conn.cursor() as cursor:
sql = """
INSERT INTO messages
(sender_wxid, nickname, avatar_url, content, msg_type,
is_group, group_id, media_url, create_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
sender_wxid,
nickname,
avatar_url,
content,
msg_type,
int(is_group),
group_id,
media_url,
msg_time
))
self.log.debug(f"消息已保存: {sender_wxid} - {content[:20]}...")
except Exception as e:
self.log.error(f"保存消息失败: {e}")
async def save_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""):
"""保存机器人自身发送的消息"""
if not self.config or not self.config["behavior"]["enabled"]:
return
if not self.config["behavior"].get("log_bot_messages", True):
return
try:
# 获取机器人信息
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_config = main_config.get("Bot", {})
bot_wxid = bot_config.get("wxid", "bot")
bot_nickname = bot_config.get("nickname", "机器人")
bot_avatar_url = ""
# 判断是否是群聊(需要先定义,后面会用到)
is_group = to_wxid.endswith("@chatroom")
group_id = to_wxid if is_group else None
# 获取机器人头像(如果启用了头像获取功能)
if self.config["behavior"]["fetch_avatar"]:
try:
self.log.info(f"尝试获取机器人头像: {bot_wxid}")
# 检查是否有缓存的机器人头像
if not hasattr(self, '_bot_avatar_cache'):
self._bot_avatar_cache = {}
if bot_wxid in self._bot_avatar_cache:
bot_avatar_url = self._bot_avatar_cache[bot_wxid]
self.log.info(f"使用缓存的机器人头像: {bot_avatar_url[:50] if bot_avatar_url else '(空)'}...")
else:
# 尝试自动获取机器人头像
bot_avatar_url = ""
# 方法1: 优先使用配置中的头像URL如果有的话
config_avatar = self.config["behavior"].get("bot_avatar_url", "")
if config_avatar:
bot_avatar_url = config_avatar
self.log.info(f"使用配置的机器人头像: {bot_avatar_url}")
else:
# 方法2: 由于API限制机器人无法通过get_user_info_in_chatroom获取自己的头像
# 我们需要使用其他方法
self.log.info("API无法获取机器人自己的头像建议在配置中设置bot_avatar_url")
# 可以尝试从主配置获取
main_avatar = bot_config.get("avatar_url", "")
if main_avatar:
bot_avatar_url = main_avatar
self.log.info(f"从主配置获取机器人头像: {bot_avatar_url}")
# 缓存头像URL即使为空也缓存避免重复尝试
self._bot_avatar_cache[bot_wxid] = bot_avatar_url
self.log.info(f"最终机器人头像URL: {bot_avatar_url if bot_avatar_url else '(空)'}")
except Exception as e:
self.log.warning(f"获取机器人头像失败: {e}")
bot_avatar_url = ""
# 保存到数据库
with self.get_db_connection() as conn:
with conn.cursor() as cursor:
sql = """
INSERT INTO messages
(sender_wxid, nickname, avatar_url, content, msg_type,
is_group, group_id, media_url, create_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
bot_wxid,
bot_nickname,
bot_avatar_url, # 使用获取到的机器人头像URL
content,
msg_type,
int(is_group),
group_id,
media_url,
datetime.now()
))
self.log.debug(f"机器人消息已保存: {bot_wxid} -> {to_wxid} - {content[:20]}...")
except Exception as e:
self.log.error(f"保存机器人消息失败: {e}")
@on_text_message(priority=10)
async def handle_text(self, bot: WechatHookClient, message: dict):
"""处理文本消息"""
self.log.info(f"[MessageLogger] 收到文本消息: {message.get('Content', '')[:20]}")
if self.config and self.config["behavior"]["log_text"]:
asyncio.create_task(self.save_message(message, "text", bot))
return True
@on_image_message(priority=10)
async def handle_image(self, bot: WechatHookClient, message: dict):
"""处理图片消息"""
self.log.info(f"[MessageLogger] 收到图片消息: {message.get('FromWxid')}")
if self.config and self.config["behavior"]["log_image"]:
asyncio.create_task(self.save_message(message, "image", bot))
return True
@on_voice_message(priority=10)
async def handle_voice(self, bot: WechatHookClient, message: dict):
"""处理语音消息"""
if self.config and self.config["behavior"]["log_voice"]:
asyncio.create_task(self.save_message(message, "voice", bot))
return True
@on_video_message(priority=10)
async def handle_video(self, bot: WechatHookClient, message: dict):
"""处理视频消息"""
self.log.info(f"MessageLogger 收到视频消息: {message.get('FromWxid')}")
if self.config and self.config["behavior"]["log_video"]:
asyncio.create_task(self.save_message(message, "video", bot))
return True
@on_file_message(priority=10)
async def handle_file(self, bot: WechatHookClient, message: dict):
"""处理文件消息"""
if self.config and self.config["behavior"]["log_file"]:
asyncio.create_task(self.save_message(message, "file", bot))
return True
@on_emoji_message(priority=10)
async def handle_emoji(self, bot: WechatHookClient, message: dict):
"""处理表情包消息"""
if self.config and self.config["behavior"]["log_emoji"]:
asyncio.create_task(self.save_message(message, "emoji", bot))
return True