From ada1b656e0790460503c47342b5e6593be4b7914 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 13 Apr 2026 12:06:58 +0800 Subject: [PATCH] feat: dedupe and schedule emoji media downloads --- admin/dashboard/blueprints/contacts.py | 3 + db/message_storage.py | 35 +++- main.py | 2 +- utils/wechat/message_to_db.py | 217 ++++++++++++++++--------- 4 files changed, 172 insertions(+), 85 deletions(-) diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index ea67ad2..a353780 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -118,6 +118,9 @@ def _normalize_recent_message(server, raw_message: dict, chat_type: str, target_ if message_type == "3": display_type = "image" display_content = content or "[图片]" + elif message_type in {"47", "1048625", "1090519089"}: + display_type = "image" if media_url else "text" + display_content = content or "[表情]" elif message_type == "34": display_type = "voice" display_content = content or "[语音]" diff --git a/db/message_storage.py b/db/message_storage.py index 911d8b1..c71aa13 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -88,6 +88,25 @@ class MessageStorageDB(BaseDBOperator): """ return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True) + def get_media_message_by_md5(self, md5: str, current_message_id: int | str | None = None) -> Optional[Dict]: + """根据 md5 查找已落盘的图片/表情消息,用于去重复用本地文件""" + sql = """ + SELECT id, group_id, timestamp, sender, content, message_type, + attachment_url, message_id, message_xml, message_thumb, image_path + FROM messages + WHERE attachment_url IS NOT NULL + AND attachment_url <> '' + AND attachment_url LIKE %s + AND image_path IS NOT NULL + AND image_path <> '' + """ + params: List = [f'%md5="{md5}"%'] + if current_message_id is not None: + sql += " AND message_id <> %s" + params.append(current_message_id) + sql += " ORDER BY id DESC LIMIT 1" + return self.execute_query(sql, tuple(params), fetch_one=True) + def get_member_recent_messages(self, group_id: str, wxid: str, days: int = 30, limit: int = 200, include_today: bool = True) -> List[Dict]: """获取指定群成员近期消息""" @@ -513,8 +532,8 @@ class MessageStorageDB(BaseDBOperator): return self.execute_query(sql, tuple(params)) or [] - def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: - """获取最近N分钟内未处理图片的消息(image_path IS NULL) + def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: + """获取最近N分钟内未处理图片/表情消息(image_path IS NULL) Args: minutes_ago: 查询最近多少分钟的消息,默认10分钟 @@ -524,19 +543,23 @@ class MessageStorageDB(BaseDBOperator): 包含消息ID、群ID、消息XML等信息的列表 """ sql = """ - SELECT message_id, group_id, message_xml, timestamp,attachment_url + SELECT message_id, group_id, sender, message_type, message_xml, timestamp, attachment_url FROM messages - WHERE message_type = '3' + WHERE message_type IN ('3', '47', '1048625', '1090519089') AND image_path IS NULL AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) - AND message_xml IS NOT NULL - AND message_xml != '' + AND attachment_url IS NOT NULL + AND attachment_url != '' ORDER BY timestamp ASC LIMIT %s """ params = (minutes_ago, limit) return self.execute_query(sql, params) or [] + def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: + """兼容旧方法名,内部复用统一媒体待处理查询""" + return self.get_pending_media_messages(minutes_ago, limit) + def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None, min_content_length: int = 6, max_results: int = 5000) -> List[Dict]: """按日期范围获取消息(支持按天总结) diff --git a/main.py b/main.py index a3a89bf..f4298a8 100644 --- a/main.py +++ b/main.py @@ -144,7 +144,7 @@ def jobs(robot: Robot): await manager.update_image_cache() logger.info("图片缓存更新完成") - # ✅ 每2分钟处理一次待下载的图片消息(串行处理,避免数据库锁竞争) + # ✅ 每5分钟处理一次待下载的图片/表情消息(串行处理,避免数据库锁竞争) @async_job.every_minutes(5) async def process_pending_images_job(): if hasattr(robot, 'message_storage') and robot.message_storage: diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index e5045fb..a5c5ee8 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -1,15 +1,17 @@ import asyncio import time +import html from datetime import datetime, timedelta import xml.etree.ElementTree as ET import concurrent.futures # 添加线程池支持 import os import base64 import imghdr +import aiohttp import re from threading import Lock -from typing import Dict +from typing import Dict, Optional from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator @@ -60,6 +62,12 @@ class MessageStorage: # 正则(替代 XML 解析) self._aeskey_re = re.compile(r'aeskey="(.*?)"') self._cdn_re = re.compile(r'cdnthumburl="(.*?)"') + self._cdn_mid_re = re.compile(r'cdnmidimgurl="(.*?)"') + self._cdn_big_re = re.compile(r'cdnbigimgurl="(.*?)"') + self._emoji_cdn_re = re.compile(r'cdnurl="(.*?)"') + self._emoji_encrypt_re = re.compile(r'encrypturl="(.*?)"') + self._emoji_extern_re = re.compile(r'externurl="(.*?)"') + self._md5_re = re.compile(r'md5="(.*?)"') # 修改为项目根目录下的 static/images self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images") @@ -68,6 +76,75 @@ class MessageStorage: os.makedirs(self.image_dir, exist_ok=True) logger.debug(f"图片存储目录: {self.image_dir}") + def _extract_media_info(self, xml_content: str, message_type: str) -> Dict[str, str]: + md5_match = self._md5_re.search(xml_content or "") + aeskey_match = self._aeskey_re.search(xml_content or "") + + urls = [] + if str(message_type) == str(MessageType.IMAGE.value): + for pattern in (self._cdn_mid_re, self._cdn_big_re, self._cdn_re): + match = pattern.search(xml_content or "") + if match: + urls.append(html.unescape(match.group(1))) + else: + for pattern in (self._emoji_cdn_re, self._emoji_encrypt_re, self._emoji_extern_re): + match = pattern.search(xml_content or "") + if match: + urls.append(html.unescape(match.group(1))) + + return { + "md5": md5_match.group(1) if md5_match else "", + "aeskey": aeskey_match.group(1) if aeskey_match else "", + "primary_url": urls[0] if urls else "", + "all_urls": urls + } + + def _detect_image_extension(self, data: bytes) -> str: + kind = imghdr.what(None, h=data) + if kind == "jpeg": + return "jpg" + if kind: + return kind + + if data.startswith(b"RIFF") and b"WEBP" in data[:16]: + return "webp" + return "bin" + + async def _download_direct_binary(self, url: str) -> bytes: + headers = { + "User-Agent": "Mozilla/5.0", + "Referer": "http://weixin.qq.com/" + } + async with aiohttp.ClientSession(headers=headers) as session: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: + response.raise_for_status() + return await response.read() + + async def _download_media_bytes(self, message_type: str, media_info: Dict[str, str]) -> bytes: + message_type = str(message_type) + primary_url = media_info.get("primary_url", "") + aeskey = media_info.get("aeskey", "") + + if message_type == str(MessageType.IMAGE.value) and self.client and aeskey and primary_url: + try: + base64_str = await self.client.download_image(aeskey=aeskey, cdnmidimgurl=primary_url) + if base64_str: + return base64.b64decode(base64_str) + except Exception as e: + logger.warning(f"客户端下载图片失败,准备降级直连: {e}") + + for url in media_info.get("all_urls", []): + if not url: + continue + try: + data = await self._download_direct_binary(url) + if data: + return data + except Exception as e: + logger.warning(f"直连下载媒体失败 url={url[:80]}... err={e}") + + raise RuntimeError("未能下载媒体内容") + def process_message(self, message: WxMessage): # 示例message字符串 current_date = datetime.now().strftime('%Y-%m-%d') @@ -140,8 +217,10 @@ class MessageStorage: message_id = db_record.get('message_id') group_id = db_record.get('group_id', 'unknown') xml_content = db_record.get('attachment_url', '') + message_type = str(db_record.get('message_type', '')) + sender = db_record.get('sender', '') - if not self.client or not message_id or not xml_content: + if not message_id or not xml_content: return { 'success': False, 'message_id': message_id, @@ -149,94 +228,80 @@ class MessageStorage: } try: - # ===== 1. 正则提取参数(替代 XML)===== - aeskey_match = self._aeskey_re.search(xml_content) - cdn_match = self._cdn_re.search(xml_content) - - if not aeskey_match or not cdn_match: + media_info = self._extract_media_info(xml_content, message_type) + if not media_info.get("primary_url"): return { 'success': False, 'message_id': message_id, - 'error': "XML 中未找到图片参数" + 'error': "XML 中未找到媒体下载参数" } - - aeskey = aeskey_match.group(1) - cdnthumburl = cdn_match.group(1) + media_md5 = media_info.get("md5", "") + if media_md5: + existing = self.message_db.get_media_message_by_md5(media_md5, current_message_id=message_id) + if existing and existing.get("image_path"): + linked_path = existing.get("image_path") + success = self.message_db.update_message_image_file_path(message_id, linked_path) + return { + "success": bool(success), + "message_id": message_id, + "roomid": group_id or "unknown", + "sender": sender, + "file_path": linked_path, + "skipped": bool(success), + "linked": True + } - # ===== 2. 下载图片(异步方式,直接 await)===== - try: - base64_str = await self.client.download_image( - aeskey=aeskey, - cdnmidimgurl=cdnthumburl - ) - except Exception as e: - logger.error(f"图片下载失败 message_id={message_id}: {e}") + data = await self._download_media_bytes(message_type, media_info) + if not data: return { 'success': False, 'message_id': message_id, - 'error': f"图片下载失败: {str(e)}" + 'error': "媒体下载失败:返回为空" } - if not base64_str: - return { - 'success': False, - 'message_id': message_id, - 'error': "图片下载失败:返回为空" - } - - # ===== 3. base64 解码 ===== - try: - data = base64.b64decode(base64_str) - except Exception as e: - logger.error(f"图片解码失败 message_id={message_id}: {e}") - return { - 'success': False, - 'message_id': message_id, - 'error': f"图片解码失败: {str(e)}" - } - - # ===== 4. 构建路径 ===== room_id = group_id or "unknown" - group_dir = os.path.join(self.image_dir, room_id) - os.makedirs(group_dir, exist_ok=True) + shared_dir = os.path.join(self.image_dir, "_shared") + os.makedirs(shared_dir, exist_ok=True) - # 微信图片默认 jpg - file_name = f"{message_id}.jpg" - file_path = os.path.join(group_dir, file_name) + extension = self._detect_image_extension(data) + if media_md5: + file_name = f"{media_md5}.{extension}" + else: + file_name = f"{message_type}_{message_id}.{extension}" + file_path = os.path.join(shared_dir, file_name) + web_path = f"/static/images/_shared/{file_name}" - # ===== 5. 写文件 ===== skipped = False if os.path.isfile(file_path): skipped = True - logger.debug(f"图片文件已存在,跳过保存: {room_id}-{file_name}") + logger.debug(f"媒体文件已存在,跳过保存: {file_name}") else: with open(file_path, "wb") as f: f.write(data) - - # ===== 6. 更新数据库(串行更新,避免锁竞争)===== - if not skipped: - web_path = f"/static/images/{room_id}/{file_name}" - success = self.message_db.update_message_image_file_path(message_id, web_path) - if success: - logger.debug(f"图片处理成功: message_id={message_id}, path={web_path}") - else: - logger.warning(f"图片路径更新失败: message_id={message_id}") - return { - 'success': False, - 'message_id': message_id, - 'error': "数据库更新失败" - } + + success = self.message_db.update_message_image_file_path(message_id, web_path) + if success: + logger.debug(f"媒体处理成功: message_id={message_id}, path={web_path}") + else: + logger.warning(f"媒体路径更新失败: message_id={message_id}") + return { + 'success': False, + 'message_id': message_id, + 'error': "数据库更新失败" + } return { "success": True, "message_id": message_id, "roomid": room_id, - "file_path": f"/static/images/{room_id}/{file_name}" if not skipped else None, - "skipped": skipped + "sender": sender, + "file_path": web_path, + "skipped": skipped, + "linked": False } except Exception as e: - logger.exception(f"处理图片出错 message_id={message_id}") + logger.exception(f"处理媒体出错 message_id={message_id}") return { 'success': False, 'message_id': message_id, @@ -244,25 +309,21 @@ class MessageStorage: } async def process_pending_images(self, minutes_ago: int = 10, batch_size: int = 20): - """定时任务:批量处理未下载的图片消息(串行处理,避免锁竞争) + """定时任务:批量处理未下载的图片/表情消息(串行处理,避免锁竞争) Args: minutes_ago: 处理最近多少分钟的消息,默认10分钟 batch_size: 每次处理多少条,默认20条 """ - if not self.client: - logger.warning("微信客户端未初始化,跳过图片处理") - return - try: - # 查询未处理的图片消息 - pending_messages = self.message_db.get_pending_image_messages(minutes_ago, batch_size) + # 查询未处理的图片/表情消息 + pending_messages = self.message_db.get_pending_media_messages(minutes_ago, batch_size) if not pending_messages: - logger.debug(f"未发现待处理的图片消息(最近{minutes_ago}分钟)") + logger.debug(f"未发现待处理的媒体消息(最近{minutes_ago}分钟)") return - logger.info(f"开始处理 {len(pending_messages)} 条待处理图片消息") + logger.info(f"开始处理 {len(pending_messages)} 条待处理媒体消息") success_count = 0 fail_count = 0 @@ -274,12 +335,12 @@ class MessageStorage: else: fail_count += 1 error = result.get('error', '未知错误') - logger.warning(f"图片处理失败 message_id={result.get('message_id')}: {error}") + logger.warning(f"媒体处理失败 message_id={result.get('message_id')}: {error}") - logger.info(f"图片处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}") + logger.info(f"媒体处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}") except Exception as e: - logger.exception(f"定时处理图片任务出错: {e}") + logger.exception(f"定时处理媒体任务出错: {e}") def _process_image_done(self, future): """任务完成统一回调(极轻量)""" @@ -295,14 +356,14 @@ class MessageStorage: def _process_image_callback(self, result): if result['success']: - skipped_info = " (已存在)" if result.get('skipped') else "" + skipped_info = " (复用链接)" if result.get('linked') else (" (已存在)" if result.get('skipped') else "") logger.info( - f"图片处理成功{skipped_info}: " + f"媒体处理成功{skipped_info}: " f"{result['roomid']}:{result['sender']}:{result['message_id']}" ) else: logger.error( - f"图片处理失败: " + f"媒体处理失败: " f"{result.get('roomid', '')}:" f"{result.get('sender', '')}:" f"{result.get('message_id', '')} - "