feat: dedupe and schedule emoji media downloads

This commit is contained in:
liuwei
2026-04-13 12:06:58 +08:00
parent 9698f9577f
commit ada1b656e0
4 changed files with 172 additions and 85 deletions

View File

@@ -118,6 +118,9 @@ def _normalize_recent_message(server, raw_message: dict, chat_type: str, target_
if message_type == "3":
display_type = "image"
display_content = content or "[图片]"
elif message_type in {"47", "1048625", "1090519089"}:
display_type = "image" if media_url else "text"
display_content = content or "[表情]"
elif message_type == "34":
display_type = "voice"
display_content = content or "[语音]"

View File

@@ -88,6 +88,25 @@ class MessageStorageDB(BaseDBOperator):
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
def get_media_message_by_md5(self, md5: str, current_message_id: int | str | None = None) -> Optional[Dict]:
"""根据 md5 查找已落盘的图片/表情消息,用于去重复用本地文件"""
sql = """
SELECT id, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE attachment_url IS NOT NULL
AND attachment_url <> ''
AND attachment_url LIKE %s
AND image_path IS NOT NULL
AND image_path <> ''
"""
params: List = [f'%md5="{md5}"%']
if current_message_id is not None:
sql += " AND message_id <> %s"
params.append(current_message_id)
sql += " ORDER BY id DESC LIMIT 1"
return self.execute_query(sql, tuple(params), fetch_one=True)
def get_member_recent_messages(self, group_id: str, wxid: str, days: int = 30,
limit: int = 200, include_today: bool = True) -> List[Dict]:
"""获取指定群成员近期消息"""
@@ -513,8 +532,8 @@ class MessageStorageDB(BaseDBOperator):
return self.execute_query(sql, tuple(params)) or []
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
"""获取最近N分钟内未处理图片消息image_path IS NULL
def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
"""获取最近N分钟内未处理图片/表情消息image_path IS NULL
Args:
minutes_ago: 查询最近多少分钟的消息默认10分钟
@@ -524,19 +543,23 @@ class MessageStorageDB(BaseDBOperator):
包含消息ID、群ID、消息XML等信息的列表
"""
sql = """
SELECT message_id, group_id, message_xml, timestamp,attachment_url
SELECT message_id, group_id, sender, message_type, message_xml, timestamp, attachment_url
FROM messages
WHERE message_type = '3'
WHERE message_type IN ('3', '47', '1048625', '1090519089')
AND image_path IS NULL
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
AND message_xml IS NOT NULL
AND message_xml != ''
AND attachment_url IS NOT NULL
AND attachment_url != ''
ORDER BY timestamp ASC
LIMIT %s
"""
params = (minutes_ago, limit)
return self.execute_query(sql, params) or []
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
"""兼容旧方法名,内部复用统一媒体待处理查询"""
return self.get_pending_media_messages(minutes_ago, limit)
def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None,
min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
"""按日期范围获取消息(支持按天总结)

View File

@@ -144,7 +144,7 @@ def jobs(robot: Robot):
await manager.update_image_cache()
logger.info("图片缓存更新完成")
# ✅ 每2分钟处理一次待下载的图片消息(串行处理,避免数据库锁竞争)
# ✅ 每5分钟处理一次待下载的图片/表情消息(串行处理,避免数据库锁竞争)
@async_job.every_minutes(5)
async def process_pending_images_job():
if hasattr(robot, 'message_storage') and robot.message_storage:

View File

@@ -1,15 +1,17 @@
import asyncio
import time
import html
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
import concurrent.futures # 添加线程池支持
import os
import base64
import imghdr
import aiohttp
import re
from threading import Lock
from typing import Dict
from typing import Dict, Optional
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
@@ -60,6 +62,12 @@ class MessageStorage:
# 正则(替代 XML 解析)
self._aeskey_re = re.compile(r'aeskey="(.*?)"')
self._cdn_re = re.compile(r'cdnthumburl="(.*?)"')
self._cdn_mid_re = re.compile(r'cdnmidimgurl="(.*?)"')
self._cdn_big_re = re.compile(r'cdnbigimgurl="(.*?)"')
self._emoji_cdn_re = re.compile(r'cdnurl="(.*?)"')
self._emoji_encrypt_re = re.compile(r'encrypturl="(.*?)"')
self._emoji_extern_re = re.compile(r'externurl="(.*?)"')
self._md5_re = re.compile(r'md5="(.*?)"')
# 修改为项目根目录下的 static/images
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images")
@@ -68,6 +76,75 @@ class MessageStorage:
os.makedirs(self.image_dir, exist_ok=True)
logger.debug(f"图片存储目录: {self.image_dir}")
def _extract_media_info(self, xml_content: str, message_type: str) -> Dict[str, str]:
md5_match = self._md5_re.search(xml_content or "")
aeskey_match = self._aeskey_re.search(xml_content or "")
urls = []
if str(message_type) == str(MessageType.IMAGE.value):
for pattern in (self._cdn_mid_re, self._cdn_big_re, self._cdn_re):
match = pattern.search(xml_content or "")
if match:
urls.append(html.unescape(match.group(1)))
else:
for pattern in (self._emoji_cdn_re, self._emoji_encrypt_re, self._emoji_extern_re):
match = pattern.search(xml_content or "")
if match:
urls.append(html.unescape(match.group(1)))
return {
"md5": md5_match.group(1) if md5_match else "",
"aeskey": aeskey_match.group(1) if aeskey_match else "",
"primary_url": urls[0] if urls else "",
"all_urls": urls
}
def _detect_image_extension(self, data: bytes) -> str:
kind = imghdr.what(None, h=data)
if kind == "jpeg":
return "jpg"
if kind:
return kind
if data.startswith(b"RIFF") and b"WEBP" in data[:16]:
return "webp"
return "bin"
async def _download_direct_binary(self, url: str) -> bytes:
headers = {
"User-Agent": "Mozilla/5.0",
"Referer": "http://weixin.qq.com/"
}
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
response.raise_for_status()
return await response.read()
async def _download_media_bytes(self, message_type: str, media_info: Dict[str, str]) -> bytes:
message_type = str(message_type)
primary_url = media_info.get("primary_url", "")
aeskey = media_info.get("aeskey", "")
if message_type == str(MessageType.IMAGE.value) and self.client and aeskey and primary_url:
try:
base64_str = await self.client.download_image(aeskey=aeskey, cdnmidimgurl=primary_url)
if base64_str:
return base64.b64decode(base64_str)
except Exception as e:
logger.warning(f"客户端下载图片失败,准备降级直连: {e}")
for url in media_info.get("all_urls", []):
if not url:
continue
try:
data = await self._download_direct_binary(url)
if data:
return data
except Exception as e:
logger.warning(f"直连下载媒体失败 url={url[:80]}... err={e}")
raise RuntimeError("未能下载媒体内容")
def process_message(self, message: WxMessage):
# 示例message字符串
current_date = datetime.now().strftime('%Y-%m-%d')
@@ -140,8 +217,10 @@ class MessageStorage:
message_id = db_record.get('message_id')
group_id = db_record.get('group_id', 'unknown')
xml_content = db_record.get('attachment_url', '')
message_type = str(db_record.get('message_type', ''))
sender = db_record.get('sender', '')
if not self.client or not message_id or not xml_content:
if not message_id or not xml_content:
return {
'success': False,
'message_id': message_id,
@@ -149,94 +228,80 @@ class MessageStorage:
}
try:
# ===== 1. 正则提取参数(替代 XML=====
aeskey_match = self._aeskey_re.search(xml_content)
cdn_match = self._cdn_re.search(xml_content)
if not aeskey_match or not cdn_match:
media_info = self._extract_media_info(xml_content, message_type)
if not media_info.get("primary_url"):
return {
'success': False,
'message_id': message_id,
'error': "XML 中未找到图片参数"
'error': "XML 中未找到媒体下载参数"
}
aeskey = aeskey_match.group(1)
cdnthumburl = cdn_match.group(1)
media_md5 = media_info.get("md5", "")
if media_md5:
existing = self.message_db.get_media_message_by_md5(media_md5, current_message_id=message_id)
if existing and existing.get("image_path"):
linked_path = existing.get("image_path")
success = self.message_db.update_message_image_file_path(message_id, linked_path)
return {
"success": bool(success),
"message_id": message_id,
"roomid": group_id or "unknown",
"sender": sender,
"file_path": linked_path,
"skipped": bool(success),
"linked": True
}
# ===== 2. 下载图片(异步方式,直接 await=====
try:
base64_str = await self.client.download_image(
aeskey=aeskey,
cdnmidimgurl=cdnthumburl
)
except Exception as e:
logger.error(f"图片下载失败 message_id={message_id}: {e}")
data = await self._download_media_bytes(message_type, media_info)
if not data:
return {
'success': False,
'message_id': message_id,
'error': f"图片下载失败: {str(e)}"
'error': "媒体下载失败:返回为空"
}
if not base64_str:
return {
'success': False,
'message_id': message_id,
'error': "图片下载失败:返回为空"
}
# ===== 3. base64 解码 =====
try:
data = base64.b64decode(base64_str)
except Exception as e:
logger.error(f"图片解码失败 message_id={message_id}: {e}")
return {
'success': False,
'message_id': message_id,
'error': f"图片解码失败: {str(e)}"
}
# ===== 4. 构建路径 =====
room_id = group_id or "unknown"
group_dir = os.path.join(self.image_dir, room_id)
os.makedirs(group_dir, exist_ok=True)
shared_dir = os.path.join(self.image_dir, "_shared")
os.makedirs(shared_dir, exist_ok=True)
# 微信图片默认 jpg
file_name = f"{message_id}.jpg"
file_path = os.path.join(group_dir, file_name)
extension = self._detect_image_extension(data)
if media_md5:
file_name = f"{media_md5}.{extension}"
else:
file_name = f"{message_type}_{message_id}.{extension}"
file_path = os.path.join(shared_dir, file_name)
web_path = f"/static/images/_shared/{file_name}"
# ===== 5. 写文件 =====
skipped = False
if os.path.isfile(file_path):
skipped = True
logger.debug(f"图片文件已存在,跳过保存: {room_id}-{file_name}")
logger.debug(f"媒体文件已存在,跳过保存: {file_name}")
else:
with open(file_path, "wb") as f:
f.write(data)
# ===== 6. 更新数据库(串行更新,避免锁竞争)=====
if not skipped:
web_path = f"/static/images/{room_id}/{file_name}"
success = self.message_db.update_message_image_file_path(message_id, web_path)
if success:
logger.debug(f"图片处理成功: message_id={message_id}, path={web_path}")
else:
logger.warning(f"图片路径更新失败: message_id={message_id}")
return {
'success': False,
'message_id': message_id,
'error': "数据库更新失败"
}
success = self.message_db.update_message_image_file_path(message_id, web_path)
if success:
logger.debug(f"媒体处理成功: message_id={message_id}, path={web_path}")
else:
logger.warning(f"媒体路径更新失败: message_id={message_id}")
return {
'success': False,
'message_id': message_id,
'error': "数据库更新失败"
}
return {
"success": True,
"message_id": message_id,
"roomid": room_id,
"file_path": f"/static/images/{room_id}/{file_name}" if not skipped else None,
"skipped": skipped
"sender": sender,
"file_path": web_path,
"skipped": skipped,
"linked": False
}
except Exception as e:
logger.exception(f"处理图片出错 message_id={message_id}")
logger.exception(f"处理媒体出错 message_id={message_id}")
return {
'success': False,
'message_id': message_id,
@@ -244,25 +309,21 @@ class MessageStorage:
}
async def process_pending_images(self, minutes_ago: int = 10, batch_size: int = 20):
"""定时任务:批量处理未下载的图片消息(串行处理,避免锁竞争)
"""定时任务:批量处理未下载的图片/表情消息(串行处理,避免锁竞争)
Args:
minutes_ago: 处理最近多少分钟的消息默认10分钟
batch_size: 每次处理多少条默认20条
"""
if not self.client:
logger.warning("微信客户端未初始化,跳过图片处理")
return
try:
# 查询未处理的图片消息
pending_messages = self.message_db.get_pending_image_messages(minutes_ago, batch_size)
# 查询未处理的图片/表情消息
pending_messages = self.message_db.get_pending_media_messages(minutes_ago, batch_size)
if not pending_messages:
logger.debug(f"未发现待处理的图片消息(最近{minutes_ago}分钟)")
logger.debug(f"未发现待处理的媒体消息(最近{minutes_ago}分钟)")
return
logger.info(f"开始处理 {len(pending_messages)} 条待处理图片消息")
logger.info(f"开始处理 {len(pending_messages)} 条待处理媒体消息")
success_count = 0
fail_count = 0
@@ -274,12 +335,12 @@ class MessageStorage:
else:
fail_count += 1
error = result.get('error', '未知错误')
logger.warning(f"图片处理失败 message_id={result.get('message_id')}: {error}")
logger.warning(f"媒体处理失败 message_id={result.get('message_id')}: {error}")
logger.info(f"图片处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}")
logger.info(f"媒体处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}")
except Exception as e:
logger.exception(f"定时处理图片任务出错: {e}")
logger.exception(f"定时处理媒体任务出错: {e}")
def _process_image_done(self, future):
"""任务完成统一回调(极轻量)"""
@@ -295,14 +356,14 @@ class MessageStorage:
def _process_image_callback(self, result):
if result['success']:
skipped_info = " (已存在)" if result.get('skipped') else ""
skipped_info = " (复用链接)" if result.get('linked') else (" (已存在)" if result.get('skipped') else "")
logger.info(
f"图片处理成功{skipped_info}: "
f"媒体处理成功{skipped_info}: "
f"{result['roomid']}:{result['sender']}:{result['message_id']}"
)
else:
logger.error(
f"图片处理失败: "
f"媒体处理失败: "
f"{result.get('roomid', '')}:"
f"{result.get('sender', '')}:"
f"{result.get('message_id', '')} - "