# NOTE: this copy was captured from a repository web viewer (original listing: 981 lines, 44 KiB).
# The viewer warned that the file contains ambiguous Unicode characters (full-width punctuation
# in Chinese strings); some of those characters appear to be missing from this copy.
"""
消息记录插件
将所有消息存储到MySQL数据库
"""
import asyncio
import os
import tomllib
from pathlib import Path
from datetime import datetime
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import (
on_text_message,
on_image_message,
on_voice_message,
on_video_message,
on_link_message,
on_card_message,
on_miniapp_message,
on_file_message,
on_emoji_message,
on_quote_message,
)
from utils.redis_cache import RedisCache, get_cache
from utils.member_info_service import get_member_service
from utils.config_manager import get_config
import aiosqlite
import pymysql
from WechatHook import WechatHookClient
from minio import Minio
from minio.error import S3Error
import uuid
import aiohttp
import re
import xml.etree.ElementTree as ET
class MessageLogger(PluginBase):
    """Message logging plugin: stores every received (and bot-sent) message in MySQL.

    Media attachments (images, videos, voice, emoji, files) are uploaded to MinIO
    when it is configured, and the resulting URL is stored with the message row.
    """

    description = "消息记录插件 - 将消息存储到MySQL"
    author = "ShiHao"
    version = "1.0.0"

    def __init__(self):
        super().__init__()
        self.config = None
        self.db_config = None
        self.redis_cache = None  # RedisCache instance, or None when Redis is disabled
        self.minio_client = None
        self.minio_bucket = ""
        self.minio_public_base_url = ""
        # Strong references to fire-and-forget save tasks. The event loop only keeps
        # weak references to tasks, so an unreferenced task can be collected mid-run.
        self._background_tasks = set()
        # Dedicated log file for this plugin
        self._setup_logger()

    def _setup_logger(self):
        """Add a dedicated loguru file sink and bind ``self.log`` to it.

        Only records carrying ``extra["name"] == "MessageLogger"`` (i.e. emitted via
        ``self.log``) are written to ``logs/message_logger.log``.
        """
        log_dir = Path(__file__).parent.parent.parent / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / "message_logger.log"
        # The handler id is kept so on_unload can remove exactly this sink.
        self.logger_id = logger.add(
            log_file,
            rotation="10 MB",
            retention="7 days",
            level="DEBUG",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
            encoding="utf-8",
            filter=lambda record: record["extra"].get("name") == "MessageLogger",
        )
        self.log = logger.bind(name="MessageLogger")

    async def async_init(self):
        """Load config.toml, set up Redis/MinIO, register the instance, and probe MySQL.

        On any unexpected failure the error is logged and ``self.config`` is reset to
        None, which disables all handlers.
        """
        try:
            self.log.info("=" * 50)
            self.log.info("MessageLogger 插件开始初始化...")
            self.log.info("=" * 50)
            config_path = Path(__file__).parent / "config.toml"
            if not config_path.exists():
                self.log.error(f"MessageLogger 配置文件不存在: {config_path}")
                return
            with open(config_path, "rb") as f:
                self.config = tomllib.load(f)
            self.db_config = self.config["database"]

            # Redis cache (optional)
            redis_config = self.config.get("redis", {})
            if redis_config.get("enabled", False):
                self.log.info("正在初始化 Redis 缓存...")
                self.redis_cache = RedisCache(redis_config)
                if self.redis_cache.enabled:
                    self.log.success(f"Redis 缓存初始化成功TTL={redis_config.get('ttl', 3600)}")
                else:
                    self.log.warning("Redis 缓存初始化失败,将不使用缓存")
                    self.redis_cache = None
            else:
                self.log.info("Redis 缓存未启用")

            # MinIO client (config / environment variables, never hard-coded)
            minio_config = self._build_minio_config()
            if minio_config.get("enabled"):
                self.minio_client = Minio(
                    minio_config["endpoint"],
                    access_key=minio_config["access_key"],
                    secret_key=minio_config["secret_key"],
                    secure=minio_config["secure"],
                )
                self.minio_bucket = minio_config["bucket"]
                self.minio_public_base_url = minio_config["public_base_url"].rstrip("/")
                self.log.success(f"MinIO 初始化成功: endpoint={minio_config['endpoint']}, bucket={self.minio_bucket}")
            else:
                self.minio_client = None
                self.minio_bucket = ""
                self.minio_public_base_url = ""
                self.log.warning("MinIO 未配置或已禁用,媒体消息将不上传,仅记录文本信息")

            # Global instance so other code can reach this plugin via get_instance()
            MessageLogger._instance = self
            self.log.info(f"MessageLogger 全局实例已设置: {self}")

            # A failed DB probe is logged but does not abort initialization.
            try:
                await asyncio.to_thread(self._test_db_connection)
                self.log.info("MessageLogger 数据库连接测试成功")
            except Exception as e:
                self.log.error(f"MessageLogger 数据库连接测试失败: {e}")

            self.log.success("=" * 50)
            self.log.success("MessageLogger 插件初始化完成!")
            self.log.success("=" * 50)
        except Exception as e:
            self.log.error(f"MessageLogger 插件初始化失败: {e}")
            import traceback
            self.log.error(f"详细错误: {traceback.format_exc()}")
            self.config = None

    @classmethod
    def get_instance(cls):
        """Return the instance registered by async_init, or None (with a warning)."""
        instance = getattr(cls, '_instance', None)
        if instance is None:
            logger.warning("MessageLogger 全局实例为空,可能插件未正确初始化")
        return instance

    def _build_minio_config(self) -> dict:
        """Build MinIO settings; environment variables override config.toml values.

        MinIO counts as enabled only when ``[minio].enabled`` is not false AND the
        endpoint, access key, secret key and bucket are all non-empty.
        """
        minio_config = (self.config or {}).get("minio", {})
        secure = bool(minio_config.get("secure", False))
        endpoint = os.getenv("MESSAGE_LOGGER_MINIO_ENDPOINT") or minio_config.get("endpoint", "")
        access_key = os.getenv("MESSAGE_LOGGER_MINIO_ACCESS_KEY") or minio_config.get("access_key", "")
        secret_key = os.getenv("MESSAGE_LOGGER_MINIO_SECRET_KEY") or minio_config.get("secret_key", "")
        bucket = os.getenv("MESSAGE_LOGGER_MINIO_BUCKET") or minio_config.get("bucket", "wechat")
        public_base_url = os.getenv("MESSAGE_LOGGER_MINIO_PUBLIC_BASE_URL") or minio_config.get("public_base_url", "")
        if not public_base_url and endpoint:
            # Fall back to the API endpoint itself as the public URL prefix
            public_base_url = f"{'https' if secure else 'http'}://{endpoint}"
        enabled = bool(minio_config.get("enabled", True)) and all([endpoint, access_key, secret_key, bucket])
        return {
            "enabled": enabled,
            "endpoint": endpoint,
            "access_key": access_key,
            "secret_key": secret_key,
            "bucket": bucket,
            "secure": secure,
            "public_base_url": public_base_url,
        }

    def _test_db_connection(self):
        """Open a MySQL connection and run ``SELECT 1`` (blocking; run in a thread)."""
        with self.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")

    async def on_unload(self):
        """Unregister the global instance and remove this plugin's log sink."""
        if getattr(MessageLogger, "_instance", None) is self:
            MessageLogger._instance = None
        logger_id = getattr(self, "logger_id", None)
        # loguru handler ids are ints and may be 0, so compare against None explicitly
        if logger_id is not None:
            try:
                logger.remove(logger_id)
            except ValueError:
                # Handler was already removed elsewhere; nothing to do.
                pass
        await super().on_unload()

    def get_db_connection(self):
        """Open a new autocommit PyMySQL connection from ``self.db_config``.

        ``port`` defaults to 3306 and ``charset`` to ``utf8mb4`` when absent.
        """
        return pymysql.connect(
            host=self.db_config["host"],
            port=self.db_config.get("port", 3306),
            user=self.db_config["user"],
            password=self.db_config["password"],
            database=self.db_config["database"],
            charset=self.db_config.get("charset", "utf8mb4"),
            autocommit=True,
        )

    def _fetch_latest_profile_from_history(self, sender_wxid: str) -> dict:
        """Return the newest non-empty nickname/avatar_url row for a sender, or {}.

        Blocking DB call; run via asyncio.to_thread.
        """
        with self.get_db_connection() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                sql = """
                SELECT nickname, avatar_url
                FROM messages
                WHERE sender_wxid = %s AND nickname != '' AND avatar_url != ''
                ORDER BY create_time DESC
                LIMIT 1
                """
                cursor.execute(sql, (sender_wxid,))
                return cursor.fetchone() or {}

    def _insert_message_record(self, sender_wxid: str, nickname: str, avatar_url: str,
                               content: str, msg_type: str, is_group: bool,
                               group_id: str, media_url: str, create_time: datetime):
        """Insert one row into ``messages`` (parameterized; blocking, run in a thread)."""
        with self.get_db_connection() as conn:
            with conn.cursor() as cursor:
                sql = """
                INSERT INTO messages
                (sender_wxid, nickname, avatar_url, content, msg_type,
                is_group, group_id, media_url, create_time)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    sender_wxid,
                    nickname,
                    avatar_url,
                    content,
                    msg_type,
                    int(is_group),
                    group_id,
                    media_url,
                    create_time,
                ))

    def _load_bot_profile_from_main_config(self) -> tuple[str, str, str]:
        """Return (wxid, nickname, avatar_url) from the main config's ``Bot`` section."""
        bot_config = get_config().get_section("Bot")
        bot_wxid = bot_config.get("wxid", "bot")
        bot_nickname = bot_config.get("nickname", "机器人")
        bot_avatar_url = bot_config.get("avatar_url", "")
        return bot_wxid, bot_nickname, bot_avatar_url

    def _get_member_sync_db_path(self):
        """Resolve the MemberSync plugin's SQLite path from its config.toml (cached).

        Returns None (also cached) when the config is missing or unreadable.
        """
        if hasattr(self, "_member_sync_db_path"):
            return self._member_sync_db_path
        try:
            config_path = Path(__file__).parent.parent / "MemberSync" / "config.toml"
            if not config_path.exists():
                self.log.warning(f"MemberSync 配置不存在: {config_path}")
                self._member_sync_db_path = None
                return None
            with open(config_path, "rb") as f:
                cfg = tomllib.load(f)
            db_rel = (cfg.get("database") or {}).get("db_path", "data/member_sync.db")
            self._member_sync_db_path = Path(__file__).parent.parent / "MemberSync" / db_rel
        except Exception as e:
            self.log.warning(f"读取 MemberSync 配置失败: {e}")
            self._member_sync_db_path = None
        return self._member_sync_db_path

    async def _update_member_activity(self, chatroom_wxid: str, wxid: str, msg_time: datetime):
        """Update a group member's last-message time and daily/weekly/monthly counters.

        Writes to the MemberSync ``group_members`` table. Each counter resets to 1 when
        its period key changes. Failures are logged as warnings, not raised.
        """
        if not chatroom_wxid or not wxid:
            return
        db_path = self._get_member_sync_db_path()
        if not db_path or not db_path.exists():
            return
        # NOTE(review): this format has no separator between day and hour and is
        # compared as a string (`last_msg_at < ?`). Confirm it matches the format
        # MemberSync itself writes, otherwise the comparison can misorder values.
        last_msg_at = msg_time.strftime("%Y年%m月%d%H时%M分钟")
        daily_key = msg_time.strftime("%Y-%m-%d")
        iso_year, iso_week, _ = msg_time.isocalendar()
        weekly_key = f"{iso_year}-W{iso_week:02d}"
        monthly_key = msg_time.strftime("%Y-%m")
        try:
            async with aiosqlite.connect(db_path) as db:
                await db.execute(
                    """
                    UPDATE group_members
                    SET
                    last_msg_at = CASE
                    WHEN last_msg_at IS NULL OR last_msg_at = '' OR last_msg_at < ?
                    THEN ?
                    ELSE last_msg_at
                    END,
                    daily_key = ?,
                    daily_count = CASE
                    WHEN daily_key = ? THEN COALESCE(daily_count, 0) + 1
                    ELSE 1
                    END,
                    weekly_key = ?,
                    weekly_count = CASE
                    WHEN weekly_key = ? THEN COALESCE(weekly_count, 0) + 1
                    ELSE 1
                    END,
                    monthly_key = ?,
                    monthly_count = CASE
                    WHEN monthly_key = ? THEN COALESCE(monthly_count, 0) + 1
                    ELSE 1
                    END
                    WHERE chatroom_wxid = ? AND wxid = ?
                    """,
                    (
                        last_msg_at,
                        last_msg_at,
                        daily_key,
                        daily_key,
                        weekly_key,
                        weekly_key,
                        monthly_key,
                        monthly_key,
                        chatroom_wxid,
                        wxid,
                    ),
                )
                await db.commit()
        except Exception as e:
            self.log.warning(f"更新 MemberSync 最后发言时间失败: {e}")

    def extract_image_info(self, raw_msg: str) -> tuple:
        """Return (cdn_url, aes_key) from an image message's <img> element, or ("", "")."""
        try:
            root = ET.fromstring(raw_msg)
            img = root.find(".//img")
            if img is not None:
                cdnurl = img.get("cdnbigimgurl", "") or img.get("cdnmidimgurl", "")
                aeskey = img.get("aeskey", "")
                return (cdnurl, aeskey)
        except Exception as e:
            self.log.error(f"提取图片信息失败: {e}")
        return ("", "")

    def extract_video_info(self, raw_msg: str) -> tuple:
        """Return (cdn_url, aes_key) from a <videomsg>, or ("", "").

        Falls back to the raw-video CDN attributes when the primary pair is incomplete.
        """
        try:
            root = ET.fromstring(raw_msg)
            video = root.find(".//videomsg")
            if video is not None:
                cdnurl = video.get("cdnvideourl", "")
                aeskey = video.get("aeskey", "")
                # Primary CDN info incomplete: try the raw-video CDN attributes
                if not cdnurl or not aeskey:
                    cdnrawvideourl = video.get("cdnrawvideourl", "")
                    cdnrawvideoaeskey = video.get("cdnrawvideoaeskey", "")
                    if cdnrawvideourl and cdnrawvideoaeskey:
                        self.log.info(f"使用原始视频CDN信息: url={cdnrawvideourl[:50]}..., aeskey={cdnrawvideoaeskey[:20]}...")
                        return (cdnrawvideourl, cdnrawvideoaeskey)
                if cdnurl and aeskey:
                    self.log.info(f"提取视频CDN信息成功: url={cdnurl[:50]}..., aeskey={aeskey[:20]}...")
                    return (cdnurl, aeskey)
                self.log.warning(f"视频CDN信息不完整: cdnurl={'有' if cdnurl else '无'}, aeskey={'有' if aeskey else '无'}")
        except Exception as e:
            self.log.error(f"提取视频信息失败: {e}")
        return ("", "")

    def extract_cdn_url(self, raw_msg: str) -> str:
        """Return the first ``cdnurl="..."`` (or single-quoted) value, or "" (used for emoji)."""
        try:
            match = re.search(r'cdnurl\s*=\s*"([^"]+)"', raw_msg)
            if not match:
                match = re.search(r"cdnurl\s*=\s*'([^']+)'", raw_msg)
            if match:
                # Attribute values inside XML carry escaped ampersands
                return match.group(1).replace("&amp;", "&")
        except Exception as e:
            self.log.error(f"提取 CDN URL 失败: {e}")
        return ""

    def _summarize_content_for_storage(self, msg_type: str, content: str) -> str:
        """Replace non-text XML payloads with a short readable summary for the content column.

        Plain text, and any content that does not start with ``<msg`` / ``<?xml``, is
        returned unchanged.
        """
        if not isinstance(content, str):
            return str(content) if content is not None else ""
        text = content.strip()
        if not text:
            return ""
        # Only rewrite obvious XML payloads; ordinary text is kept verbatim
        is_xml_payload = text.startswith("<msg") or text.startswith("<?xml")
        if msg_type == "text" or not is_xml_payload:
            return content
        if msg_type == "emoji":
            return "[表情消息]"
        if msg_type == "voice":
            try:
                root = ET.fromstring(text)
                voice_node = root.find(".//voicemsg")
                if voice_node is not None:
                    voice_length = voice_node.get("voicelength", "")
                    if voice_length:
                        return f"[语音消息] 时长={voice_length}ms"
            except Exception:
                # Unparseable voice XML: fall back to the generic summary below
                pass
            return "[语音消息]"
        if msg_type == "image":
            return "[图片消息]"
        if msg_type == "video":
            return "[视频消息]"
        if msg_type == "file":
            filename, _, _, _, _ = self.extract_file_info(text)
            return f"[文件消息] {filename}" if filename else "[文件消息]"
        return f"[{msg_type}消息]"

    def extract_file_info(self, raw_msg: str) -> tuple:
        """Return (filename, cdn_url, aes_key, file_ext, file_size) from an <appmsg>.

        Returns ("", "", "", "", "0") when the message has no <appattach> or cannot be parsed.
        """
        try:
            root = ET.fromstring(raw_msg)
            appmsg = root.find(".//appmsg")
            if appmsg is not None:
                title = appmsg.find("title")
                filename = title.text if title is not None else ""
                appattach = appmsg.find("appattach")
                if appattach is not None:
                    cdnattachurl = appattach.find("cdnattachurl")
                    aeskey = appattach.find("aeskey")
                    fileext = appattach.find("fileext")
                    totallen = appattach.find("totallen")
                    cdn_url = cdnattachurl.text if cdnattachurl is not None else ""
                    aes_key = aeskey.text if aeskey is not None else ""
                    file_ext = fileext.text if fileext is not None else ""
                    file_size = totallen.text if totallen is not None else "0"
                    self.log.info(f"提取文件信息: 文件名={filename}, 大小={file_size}字节, 扩展名={file_ext}")
                    return (filename, cdn_url, aes_key, file_ext, file_size)
        except Exception as e:
            self.log.error(f"提取文件信息失败: {e}")
        return ("", "", "", "", "0")

    def _remove_temp_file(self, path) -> None:
        """Best-effort deletion of a temporary file; accepts None and never raises."""
        if path is None:
            return
        try:
            Path(path).unlink(missing_ok=True)
        except OSError as e:
            self.log.warning(f"清理临时文件失败: {path}: {e}")

    @staticmethod
    def _raw_message(message: dict) -> dict:
        """Return ``message["_raw"]`` when it is a dict, otherwise the message itself."""
        raw = message.get("_raw")
        return raw if isinstance(raw, dict) else message

    async def download_image_and_upload(self, bot, message: dict) -> str:
        """Download an image via the hook client and upload it to MinIO.

        When Redis is enabled, the image is also cached as a base64 data URL under
        ``image:<msgId>`` and ``image:svrid:<newMsgId>`` (ttl=900) for other plugins.
        Returns the MinIO URL, or "" on any failure. Temporary files are always removed.
        """
        import base64

        temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.jpg"
        actual_file = None
        try:
            self.log.info(f"开始下载图片到: {temp_file}")
            result = await bot.download_wechat_media("image", message=message, save_path=str(temp_file))
            self.log.info(f"下载图片返回: result={result}")
            # The client may save to a different path than requested; prefer its answer.
            actual_file = Path(result) if result and result != "expired" else temp_file
            if not (result and actual_file.exists() and actual_file.stat().st_size > 0):
                self.log.error(f"图片下载失败: result={result}, actual_file={actual_file}, exists={actual_file.exists() if actual_file else False}")
                return ""

            self.log.info(f"图片文件已生成: {actual_file}, size={actual_file.stat().st_size}")
            redis_cache = get_cache()
            if redis_cache and redis_cache.enabled:
                # Only build the (potentially large) base64 payload when it will be cached
                base64_data = f"data:image/jpeg;base64,{base64.b64encode(actual_file.read_bytes()).decode()}"
                raw = self._raw_message(message)
                msg_id = raw.get("msgId", "")
                new_msg_id = raw.get("newMsgId", "")
                self.log.info(f"准备缓存图片: msgId={msg_id}, newMsgId={new_msg_id}")
                if msg_id:
                    media_key = f"image:{msg_id}"
                    redis_cache.cache_media(media_key, base64_data, "image", ttl=900)
                    self.log.info(f"图片已缓存到 Redis: {media_key}")
                # newMsgId (svrid) key lets quoted-message handlers find the image
                if new_msg_id:
                    media_key_svrid = f"image:svrid:{new_msg_id}"
                    redis_cache.cache_media(media_key_svrid, base64_data, "image", ttl=900)
                    self.log.info(f"图片已缓存到 Redis (svrid): {media_key_svrid}")

            return await self.upload_file_to_minio(str(actual_file), "images")
        except Exception as e:
            self.log.error(f"下载图片并上传失败: {e}")
            return ""
        finally:
            self._remove_temp_file(temp_file)
            self._remove_temp_file(actual_file)

    async def download_video_and_upload(self, bot, message: dict) -> str:
        """Download a video via the hook client and upload it to MinIO.

        Returns the MinIO URL, or "" on failure. Temporary files are always removed.
        """
        temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.mp4"
        actual_file = None
        try:
            result = await bot.download_wechat_media("video", message=message, save_path=str(temp_file))
            # The client may save to a different path than requested; prefer its answer.
            actual_file = Path(result) if result and result != "expired" else temp_file
            if result and actual_file.exists() and actual_file.stat().st_size > 0:
                return await self.upload_file_to_minio(str(actual_file), "videos")
            self.log.error("视频下载失败")
            return ""
        except Exception as e:
            self.log.error(f"下载视频并上传失败: {e}")
            return ""
        finally:
            self._remove_temp_file(temp_file)
            self._remove_temp_file(actual_file)

    async def download_file_and_upload(self, bot, cdnurl: str, aeskey: str, filename: str, file_ext: str) -> str:
        """Download a file attachment from the CDN and upload it to MinIO.

        Currently a stub: the new hook interface offers no CDN file download, so this
        logs a warning and returns "". The parameters are kept so callers keep working.
        """
        # TODO: implement once the hook client exposes a CDN file-download API. The
        # previous approach (cdn_download(..., file_type=5) and polling for the output
        # file) is no longer available.
        self.log.warning(f"新接口暂不支持文件下载: {filename}")
        return ""

    async def download_and_upload(self, url: str, file_type: str, ext: str) -> str:
        """Fetch ``url`` over HTTP (10 s timeout) and upload the body to MinIO under ``file_type``.

        For emoji (``file_type == "emojis"``) the body is also cached in Redis as a
        base64 GIF data URL (ttl=300). Returns the MinIO URL, or "" on any failure,
        including a non-200 HTTP status.
        """
        import base64

        temp_file = None
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status != 200:
                        self.log.warning(f"下载文件失败: HTTP {resp.status}, url={url[:80]}")
                        return ""
                    data = await resp.read()

            # Cache emoji as base64 for AIChat and other plugins
            if file_type == "emojis" and data:
                redis_cache = get_cache()
                if redis_cache and redis_cache.enabled:
                    base64_data = f"data:image/gif;base64,{base64.b64encode(data).decode()}"
                    media_key = RedisCache.generate_media_key(cdnurl=url)
                    if media_key:
                        redis_cache.cache_media(media_key, base64_data, "emoji", ttl=300)
                        self.log.debug(f"表情包已缓存到 Redis: {media_key[:20]}...")

            temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}{ext}"
            temp_file.write_bytes(data)
            return await self.upload_file_to_minio(str(temp_file), file_type)
        except Exception as e:
            self.log.error(f"下载并上传文件失败: {e}")
            return ""
        finally:
            self._remove_temp_file(temp_file)

    async def upload_file_to_minio(self, local_file: str, file_type: str, original_filename: str = "") -> str:
        """Upload ``local_file`` to ``<bucket>/<file_type>/<YYYYMMDD>/<name>`` and return its URL.

        The object name is ``<sanitized original stem>_<uuid><ext>`` when
        ``original_filename`` is given, otherwise ``<uuid><ext>``. The extension comes
        from ``local_file``. Returns "" when MinIO is disabled or the upload fails for
        any reason; it never raises.
        """
        if not self.minio_client or not self.minio_bucket:
            self.log.warning("MinIO 未启用,跳过媒体上传")
            return ""
        try:
            file_ext = Path(local_file).suffix
            unique_id = uuid.uuid4().hex
            if original_filename:
                # Keep a recognizable but object-key-safe stem from the original name
                original_name = re.sub(r'[^\w\-_\.]', '_', Path(original_filename).stem)
                filename = f"{original_name}_{unique_id}{file_ext}"
            else:
                filename = f"{unique_id}{file_ext}"
            object_name = f"{file_type}/{datetime.now().strftime('%Y%m%d')}/{filename}"

            # The MinIO SDK is blocking; keep it off the event loop
            await asyncio.to_thread(
                self.minio_client.fput_object,
                self.minio_bucket,
                object_name,
                local_file,
            )
        except Exception as e:
            # S3Error as well as network/local I/O errors: callers expect "" on failure
            self.log.error(f"上传文件到 MinIO 失败: {e}")
            return ""

        if self.minio_public_base_url:
            url = f"{self.minio_public_base_url}/{self.minio_bucket}/{object_name}"
        else:
            url = f"/{self.minio_bucket}/{object_name}"
        self.log.debug(f"文件上传成功: {url}")
        return url

    async def _resolve_sender_profile(self, from_wxid: str, sender_wxid: str, is_group: bool) -> tuple[str, str]:
        """Return (nickname, avatar_url) for a sender; ("", "") when unavailable or disabled.

        Group lookup order: MemberSync database, then Redis, then the newest matching
        row in ``messages``. Lookup failures are logged and never abort the save.
        """
        nickname = ""
        avatar_url = ""
        if not self.config["behavior"]["fetch_avatar"]:
            return nickname, avatar_url

        if not is_group:
            # TODO: private-chat contact lookup (e.g. bot.get_contact_info) not implemented
            self.log.info(f"尝试获取私聊用户信息: {sender_wxid}")
            return nickname, avatar_url

        # 1. MemberSync database
        try:
            member_service = get_member_service()
            member_info = await member_service.get_chatroom_member_info(from_wxid, sender_wxid)
            if not member_info:
                member_info = await member_service.get_member_info(sender_wxid)
            if member_info:
                nickname = member_info.get("nickname", "")
                avatar_url = member_info.get("avatar_url", "")
                self.log.debug(f"[MemberSync数据库命中] {sender_wxid}: {nickname}")
        except Exception as e:
            self.log.warning(f"从 MemberSync 获取用户信息失败: {e}")

        # 2. Redis cache
        if (not nickname or not avatar_url) and self.redis_cache and self.redis_cache.enabled:
            try:
                cached_info = self.redis_cache.get_user_basic_info(from_wxid, sender_wxid)
                if cached_info:
                    nickname = cached_info.get("nickname", "") or nickname
                    avatar_url = cached_info.get("avatar_url", "") or avatar_url
                    if nickname and avatar_url:
                        self.log.debug(f"[Redis缓存命中] {sender_wxid}: {nickname}")
            except Exception as e:
                self.log.warning(f"从 Redis 获取用户信息失败: {e}")

        # 3. Most recent stored message from this sender
        if not nickname or not avatar_url:
            self.log.info(f"尝试从历史记录获取用户信息: {sender_wxid}")
            try:
                result = await asyncio.to_thread(self._fetch_latest_profile_from_history, sender_wxid)
                if result:
                    if not nickname:
                        nickname = result.get("nickname", "")
                    if not avatar_url:
                        avatar_url = result.get("avatar_url", "")
                    self.log.success(f"从历史记录获取成功: nickname={nickname}, avatar_url={avatar_url[:50] if avatar_url else '(空)'}...")
            except Exception as e:
                self.log.error(f"从历史记录获取用户信息失败: {e}")

        return nickname, avatar_url

    async def _upload_voice_buffer(self, message: dict) -> str:
        """Upload the voice buffer from ``message["ImgBuf"]["buffer"]`` as .silk; "" if absent or failed."""
        voice_data = (message.get("ImgBuf") or {}).get("buffer")
        if not voice_data:
            return ""
        temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}.silk"
        try:
            temp_file.write_bytes(voice_data)
            return await self.upload_file_to_minio(str(temp_file), "voices")
        except Exception as e:
            self.log.error(f"语音上传失败: {e}")
            return ""
        finally:
            self._remove_temp_file(temp_file)

    async def _upload_file_message(self, bot, message: dict, content: str, from_wxid: str, sender_wxid: str) -> str:
        """Upload a file message's attachment (CDN first, then ``message["File"]``); "" on failure."""
        self.log.info(f"处理文件消息: from={from_wxid}, sender={sender_wxid}")
        filename, cdnurl, aeskey, file_ext, file_size = self.extract_file_info(content)
        if cdnurl and aeskey and filename:
            self.log.info(f"开始下载并上传文件: {filename} ({file_size}字节)")
            media_url = await self.download_file_and_upload(bot, cdnurl, aeskey, filename, file_ext)
            if media_url:
                self.log.success(f"文件上传成功: {media_url}")
            else:
                self.log.error("文件上传失败")
            return media_url

        file_data = message.get("File")
        if not file_data:
            self.log.warning("文件消息中没有找到可用的CDN信息或文件数据")
            return ""

        self.log.info("使用消息中的文件数据")
        filename = message.get("Filename", "file")
        # The sender controls Filename: never use it as a path component (traversal,
        # collisions between concurrent messages). Keep only a sanitized extension so
        # the uploaded object retains its type.
        safe_suffix = re.sub(r"[^\w.]", "_", Path(str(filename)).suffix)
        temp_file = Path(__file__).parent / f"temp_{uuid.uuid4().hex}{safe_suffix}"
        try:
            temp_file.write_bytes(file_data)
            return await self.upload_file_to_minio(str(temp_file), "files", filename)
        except Exception as e:
            self.log.error(f"文件上传失败: {e}")
            return ""
        finally:
            self._remove_temp_file(temp_file)

    async def _upload_message_media(self, bot, message: dict, msg_type: str, content: str,
                                    from_wxid: str, sender_wxid: str) -> str:
        """Upload the media attached to a message of ``msg_type``; "" when none or on failure."""
        if msg_type == "emoji":
            cdn_url = self.extract_cdn_url(content)
            if cdn_url and cdn_url.startswith("http"):
                return await self.download_and_upload(cdn_url, "emojis", ".gif")
            return ""
        if msg_type == "image":
            return await self.download_image_and_upload(bot, message)
        if msg_type == "video":
            self.log.info(f"处理视频消息: from={from_wxid}, sender={sender_wxid}")
            media_url = await self.download_video_and_upload(bot, message)
            if media_url:
                self.log.success(f"视频上传成功: {media_url}")
            else:
                self.log.error("视频下载或上传失败")
            return media_url
        if msg_type == "voice":
            return await self._upload_voice_buffer(message)
        if msg_type == "file":
            return await self._upload_file_message(bot, message, content, from_wxid, sender_wxid)
        return ""

    async def save_message(self, message: dict, msg_type: str, bot: WechatHookClient):
        """Persist one incoming message (with sender profile and media URL) to MySQL.

        For group messages, also updates the sender's MemberSync activity counters.
        Errors are logged; nothing is raised to the caller.
        """
        if not self.config or not self.config["behavior"]["enabled"]:
            return
        try:
            sender_wxid = message.get("SenderWxid", "")
            from_wxid = message.get("FromWxid", "")
            is_group = message.get("IsGroup", False)
            content = message.get("Content", "")
            content_for_storage = self._summarize_content_for_storage(msg_type, content)
            create_time = message.get("CreateTime", 0)
            msg_time = datetime.fromtimestamp(create_time) if create_time else datetime.now()

            nickname, avatar_url = await self._resolve_sender_profile(from_wxid, sender_wxid, is_group)
            group_id = from_wxid if is_group else None
            media_url = await self._upload_message_media(bot, message, msg_type, content, from_wxid, sender_wxid)

            # Blocking DB write runs in the thread pool to keep the event loop free
            await asyncio.to_thread(
                self._insert_message_record,
                sender_wxid,
                nickname,
                avatar_url,
                content_for_storage,
                msg_type,
                is_group,
                group_id,
                media_url,
                msg_time,
            )
            self.log.debug(f"消息已保存: {sender_wxid} - {content_for_storage[:20]}...")

            if is_group:
                await self._update_member_activity(from_wxid, sender_wxid, msg_time)
        except Exception as e:
            self.log.error(f"保存消息失败: {e}")

    async def save_bot_message(self, to_wxid: str, content: str, msg_type: str = "text", media_url: str = ""):
        """Persist a message sent by the bot itself.

        For media types, a local-file ``media_url`` is uploaded to MinIO first and the
        uploaded URL is stored; remote URLs are stored as-is. Errors are logged, not raised.
        """
        if not self.config or not self.config["behavior"]["enabled"]:
            return
        if not self.config["behavior"].get("log_bot_messages", True):
            return
        try:
            bot_wxid, bot_nickname, bot_avatar_url = self._load_bot_profile_from_main_config()
            main_config_avatar = bot_avatar_url
            is_group = to_wxid.endswith("@chatroom")
            group_id = to_wxid if is_group else None

            # Media sent from a local path: upload to MinIO so the stored URL is reachable
            final_media_url = media_url
            if msg_type in {"image", "video", "file", "voice"} and media_url:
                is_remote_url = media_url.startswith("http://") or media_url.startswith("https://")
                if not is_remote_url:
                    local_path = Path(media_url)
                    if local_path.exists() and local_path.is_file():
                        media_type_map = {
                            "image": "images",
                            "video": "videos",
                            "file": "files",
                            "voice": "voices",
                        }
                        upload_type = media_type_map.get(msg_type, "files")
                        uploaded_url = await self.upload_file_to_minio(
                            str(local_path),
                            upload_type,
                            local_path.name,
                        )
                        if uploaded_url:
                            final_media_url = uploaded_url
                            self.log.info(f"机器人{msg_type}消息已上传到 MinIO: {uploaded_url}")
                        else:
                            self.log.warning(f"机器人{msg_type}消息上传 MinIO 失败,保留原 media_url")
                    else:
                        self.log.warning(f"机器人{msg_type}消息本地文件不存在: {media_url}")

            if self.config["behavior"]["fetch_avatar"]:
                try:
                    self.log.info(f"尝试获取机器人头像: {bot_wxid}")
                    if not hasattr(self, '_bot_avatar_cache'):
                        self._bot_avatar_cache = {}
                    if bot_wxid in self._bot_avatar_cache:
                        bot_avatar_url = self._bot_avatar_cache[bot_wxid]
                        self.log.info(f"使用缓存的机器人头像: {bot_avatar_url[:50] if bot_avatar_url else '(空)'}...")
                    else:
                        bot_avatar_url = ""
                        # Preferred source: the plugin's own behavior.bot_avatar_url
                        config_avatar = self.config["behavior"].get("bot_avatar_url", "")
                        if config_avatar:
                            bot_avatar_url = config_avatar
                            self.log.info(f"使用配置的机器人头像: {bot_avatar_url}")
                        else:
                            # The chatroom member API cannot return the bot's own avatar;
                            # fall back to the main config's Bot.avatar_url.
                            self.log.info("API无法获取机器人自己的头像建议在配置中设置bot_avatar_url")
                            if main_config_avatar:
                                bot_avatar_url = main_config_avatar
                                self.log.info(f"从主配置获取机器人头像: {bot_avatar_url}")
                        # Cache even an empty result to avoid repeating the lookup
                        self._bot_avatar_cache[bot_wxid] = bot_avatar_url
                        self.log.info(f"最终机器人头像URL: {bot_avatar_url if bot_avatar_url else '(空)'}")
                except Exception as e:
                    self.log.warning(f"获取机器人头像失败: {e}")
                    bot_avatar_url = ""

            # Blocking DB write runs in the thread pool to keep the event loop free
            await asyncio.to_thread(
                self._insert_message_record,
                bot_wxid,
                bot_nickname,
                bot_avatar_url,
                content,
                msg_type,
                is_group,
                group_id,
                final_media_url,
                datetime.now(),
            )
            self.log.debug(f"机器人消息已保存: {bot_wxid} -> {to_wxid} - {content[:20]}...")
        except Exception as e:
            self.log.error(f"保存机器人消息失败: {e}")

    def _schedule_save(self, message: dict, msg_type: str, bot: WechatHookClient) -> None:
        """Run save_message in the background, holding a strong reference until it finishes."""
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            tasks = self._background_tasks = set()
        task = asyncio.create_task(self.save_message(message, msg_type, bot))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    @on_text_message(priority=10)
    async def handle_text(self, bot: WechatHookClient, message: dict):
        """Log a text message (when behavior.log_text); always lets other handlers run."""
        self.log.info(f"[MessageLogger] 收到文本消息: {message.get('Content', '')[:20]}")
        if self.config and self.config["behavior"]["log_text"]:
            self._schedule_save(message, "text", bot)
        return True

    @on_image_message(priority=10)
    async def handle_image(self, bot: WechatHookClient, message: dict):
        """Log an image message (when behavior.log_image)."""
        self.log.info(f"[MessageLogger] 收到图片消息: {message.get('FromWxid')}")
        if self.config and self.config["behavior"]["log_image"]:
            self._schedule_save(message, "image", bot)
        return True

    @on_voice_message(priority=10)
    async def handle_voice(self, bot: WechatHookClient, message: dict):
        """Log a voice message (when behavior.log_voice)."""
        if self.config and self.config["behavior"]["log_voice"]:
            self._schedule_save(message, "voice", bot)
        return True

    @on_video_message(priority=10)
    async def handle_video(self, bot: WechatHookClient, message: dict):
        """Log a video message (when behavior.log_video)."""
        self.log.info(f"MessageLogger 收到视频消息: {message.get('FromWxid')}")
        if self.config and self.config["behavior"]["log_video"]:
            self._schedule_save(message, "video", bot)
        return True

    @on_link_message(priority=10)
    async def handle_link(self, bot: WechatHookClient, message: dict):
        """Log a link message (gated by behavior.log_text, default on)."""
        if self.config and self.config["behavior"].get("log_text", True):
            self._schedule_save(message, "link", bot)
        return True

    @on_card_message(priority=10)
    async def handle_card(self, bot: WechatHookClient, message: dict):
        """Log a contact-card message (gated by behavior.log_text, default on)."""
        if self.config and self.config["behavior"].get("log_text", True):
            self._schedule_save(message, "card", bot)
        return True

    @on_miniapp_message(priority=10)
    async def handle_miniapp(self, bot: WechatHookClient, message: dict):
        """Log a mini-program message (gated by behavior.log_text, default on)."""
        if self.config and self.config["behavior"].get("log_text", True):
            self._schedule_save(message, "miniapp", bot)
        return True

    @on_quote_message(priority=10)
    async def handle_quote(self, bot: WechatHookClient, message: dict):
        """Log a quote/reply message (gated by behavior.log_text, default on)."""
        if self.config and self.config["behavior"].get("log_text", True):
            self._schedule_save(message, "quote", bot)
        return True

    @on_file_message(priority=10)
    async def handle_file(self, bot: WechatHookClient, message: dict):
        """Log a file message (when behavior.log_file)."""
        if self.config and self.config["behavior"]["log_file"]:
            self._schedule_save(message, "file", bot)
        return True

    @on_emoji_message(priority=10)
    async def handle_emoji(self, bot: WechatHookClient, message: dict):
        """Log an emoji/sticker message (when behavior.log_emoji)."""
        if self.config and self.config["behavior"]["log_emoji"]:
            self._schedule_save(message, "emoji", bot)
        return True