优化群媒体落盘去重与链接复用

This commit is contained in:
liuwei
2026-05-06 11:04:43 +08:00
parent 3dd4300608
commit 831a61e7ea

View File

@@ -1,6 +1,7 @@
import asyncio import asyncio
import time import time
import html import html
import hashlib
from datetime import datetime, timedelta from datetime import datetime, timedelta
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import concurrent.futures # 添加线程池支持 import concurrent.futures # 添加线程池支持
@@ -79,6 +80,14 @@ class MessageStorage:
# 确保图片存储目录存在 # 确保图片存储目录存在
if not os.path.exists(self.image_dir): if not os.path.exists(self.image_dir):
os.makedirs(self.image_dir, exist_ok=True) os.makedirs(self.image_dir, exist_ok=True)
# 媒体实体文件统一放到共享目录,群目录只放软链接/硬链接视图:
# 1. 这样相同图片在多个群出现时,磁盘上真正的数据只保留一份;
# 2. 群目录仍然保留“按群浏览”的可读性,方便后续文件管理;
# 3. 若运行环境不支持软链接,会自动回退到硬链接,再不行就直接使用共享路径。
self.shared_media_dir = os.path.join(self.image_dir, "_shared")
self.group_media_dir = os.path.join(self.image_dir, "_by_group")
os.makedirs(self.shared_media_dir, exist_ok=True)
os.makedirs(self.group_media_dir, exist_ok=True)
logger.debug(f"图片存储目录: {self.image_dir}") logger.debug(f"图片存储目录: {self.image_dir}")
def _extract_media_info(self, xml_content: str, message_type: str) -> Dict[str, str]: def _extract_media_info(self, xml_content: str, message_type: str) -> Dict[str, str]:
@@ -115,6 +124,102 @@ class MessageStorage:
return "webp" return "webp"
return "bin" return "bin"
@staticmethod
def _sanitize_storage_segment(value: str) -> str:
"""把群 ID / 会话 ID 转成适合目录名的安全片段。"""
raw = str(value or "").strip()
if not raw:
return "unknown"
# Windows/Linux 都可能用到这里,统一替换掉路径敏感字符,避免群 ID 直接落目录时报错。
sanitized = re.sub(r'[^0-9A-Za-z_.@-]+', '_', raw)
return sanitized[:120] or "unknown"
@staticmethod
def _compute_content_digest(data: bytes) -> str:
"""为没有 md5 的媒体生成稳定内容哈希,跨群重复内容也能命中同一实体文件。"""
return hashlib.sha1(data or b"").hexdigest()
def _build_shared_media_file_name(
self,
message_type: str,
message_id: str,
extension: str,
media_md5: str,
data: bytes,
) -> str:
"""构造共享实体文件名,优先复用协议层 md5缺失时退化到内容哈希。"""
normalized_extension = str(extension or "bin").strip().lstrip(".") or "bin"
normalized_md5 = str(media_md5 or "").strip().lower()
if normalized_md5:
return f"{normalized_md5}.{normalized_extension}"
# 旧逻辑在拿不到 md5 时直接用 message_id 命名,跨群重复内容仍会多落一份。
# 这里改成按内容哈希命名,即便来自不同群、不同消息 ID也会复用同一实体文件。
content_digest = self._compute_content_digest(data)
return f"sha1_{content_digest}.{normalized_extension}"
def _static_abs_to_web_path(self, absolute_path: str) -> str:
"""把 static/images 下的绝对路径转换为浏览器可访问路径。"""
normalized_abs = os.path.abspath(str(absolute_path or ""))
relative_path = os.path.relpath(normalized_abs, self.image_dir).replace("\\", "/")
return f"/static/images/{relative_path}"
def _web_path_to_static_abs(self, web_path: str) -> str:
"""把 /static/images/... 反解回本地绝对路径。"""
text = str(web_path or "").strip()
if text.startswith("/static/images/"):
suffix = text[len("/static/images/"):].replace("/", os.sep)
return os.path.abspath(os.path.join(self.image_dir, suffix))
return os.path.abspath(text) if text else ""
def _ensure_group_media_link(self, room_id: str, file_name: str, shared_abs_path: str) -> str:
    """Create a per-group link to a shared media file and return its web path.

    Strategy:
        1. A relative symbolic link is tried first.
        2. If that fails, a hard link is tried.
        3. If both fail, or anything else goes wrong (including creating the
           group directory), the web path of the shared file is returned so
           the caller is never interrupted by a linking problem.

    Args:
        room_id: Group / conversation identifier; sanitized before use as a
            directory name.
        file_name: Name of the link inside the group directory.
        shared_abs_path: Absolute path of the shared entity file.

    Returns:
        Web path of the group link on success, otherwise the web path of the
        shared file.
    """
    try:
        safe_room_id = self._sanitize_storage_segment(room_id)
        group_dir = os.path.join(self.group_media_dir, safe_room_id)
        # Kept inside the try block: a directory that cannot be created is
        # one more linking failure and must fall back like the others.
        os.makedirs(group_dir, exist_ok=True)
        link_abs_path = os.path.join(group_dir, file_name)

        if os.path.lexists(link_abs_path):
            # Reuse an entry that already refers to the shared file.
            # samefile() compares the underlying files, so it recognises
            # hard links as well as symbolic links; comparing resolved
            # paths alone never matches a hard link.
            try:
                already_linked = os.path.samefile(link_abs_path, shared_abs_path)
            except OSError:
                # e.g. a dangling symlink: fall back to comparing resolved paths.
                already_linked = (
                    os.path.realpath(link_abs_path) == os.path.realpath(shared_abs_path)
                )
            if already_linked:
                return self._static_abs_to_web_path(link_abs_path)
            os.remove(link_abs_path)

        relative_target = os.path.relpath(shared_abs_path, group_dir)
        try:
            os.symlink(relative_target, link_abs_path)
            return self._static_abs_to_web_path(link_abs_path)
        except Exception as symlink_error:
            logger.debug(f"创建媒体软链接失败,准备回退硬链接: path={link_abs_path}, error={symlink_error}")

        try:
            os.link(shared_abs_path, link_abs_path)
            return self._static_abs_to_web_path(link_abs_path)
        except Exception as hardlink_error:
            logger.debug(f"创建媒体硬链接失败,回退共享路径: path={link_abs_path}, error={hardlink_error}")

        return self._static_abs_to_web_path(shared_abs_path)
    except Exception as link_error:
        logger.warning(f"创建群媒体链接视图失败,回退共享路径: room_id={room_id}, error={link_error}")
        return self._static_abs_to_web_path(shared_abs_path)
def _resolve_shared_media_abs_path(self, existing_web_path: str) -> str:
    """Resolve a stored image path to the real path of its backing file.

    Args:
        existing_web_path: Image path as previously stored for a message.

    Returns:
        The symlink-resolved absolute path of the file, or ``""`` when no
        matching file exists. The stored path is tried first; if it does not
        point at a file, a file with the same base name inside
        ``self.shared_media_dir`` is tried.
    """
    # isfile() rather than exists(): a stored path that resolves to a
    # directory (for example just the "/static/images/" prefix) is not a
    # media file and must not be handed on as a link target.
    existing_abs = self._web_path_to_static_abs(existing_web_path)
    if existing_abs and os.path.isfile(existing_abs):
        return os.path.realpath(existing_abs)

    file_name = os.path.basename(str(existing_web_path or "").strip())
    if file_name:
        fallback_abs = os.path.join(self.shared_media_dir, file_name)
        if os.path.isfile(fallback_abs):
            return os.path.realpath(fallback_abs)
    return ""
async def _download_direct_binary(self, url: str) -> bytes: async def _download_direct_binary(self, url: str) -> bytes:
headers = { headers = {
"User-Agent": "Mozilla/5.0", "User-Agent": "Mozilla/5.0",
@@ -265,6 +370,10 @@ class MessageStorage:
if media_md5: if media_md5:
existing = self.message_db.get_media_message_by_md5(media_md5, current_message_id=message_id) existing = self.message_db.get_media_message_by_md5(media_md5, current_message_id=message_id)
if existing and existing.get("image_path"): if existing and existing.get("image_path"):
shared_abs_path = self._resolve_shared_media_abs_path(existing.get("image_path"))
if shared_abs_path:
linked_path = self._ensure_group_media_link(group_id or "unknown", os.path.basename(shared_abs_path), shared_abs_path)
else:
linked_path = existing.get("image_path") linked_path = existing.get("image_path")
success = self.message_db.update_message_image_file_path(message_id, linked_path) success = self.message_db.update_message_image_file_path(message_id, linked_path)
if success and message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}: if success and message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}:
@@ -294,16 +403,15 @@ class MessageStorage:
} }
room_id = group_id or "unknown" room_id = group_id or "unknown"
shared_dir = os.path.join(self.image_dir, "_shared")
os.makedirs(shared_dir, exist_ok=True)
extension = self._detect_image_extension(data) extension = self._detect_image_extension(data)
if media_md5: file_name = self._build_shared_media_file_name(
file_name = f"{media_md5}.{extension}" message_type=message_type,
else: message_id=str(message_id),
file_name = f"{message_type}_{message_id}.{extension}" extension=extension,
file_path = os.path.join(shared_dir, file_name) media_md5=media_md5,
web_path = f"/static/images/_shared/{file_name}" data=data,
)
file_path = os.path.join(self.shared_media_dir, file_name)
skipped = False skipped = False
if os.path.isfile(file_path): if os.path.isfile(file_path):
@@ -313,6 +421,10 @@ class MessageStorage:
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
f.write(data) f.write(data)
# 共享目录负责“真实数据只存一份”,群目录负责“按群查看时有可读入口”。
# 这样同一媒体反复出现在不同群里时,不会再复制实体文件。
web_path = self._ensure_group_media_link(room_id, file_name, file_path)
success = self.message_db.update_message_image_file_path(message_id, web_path) success = self.message_db.update_message_image_file_path(message_id, web_path)
if success: if success:
if message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}: if message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}: