加入图片下载逻辑。

This commit is contained in:
liuwei
2025-04-01 16:43:54 +08:00
parent 57947bde07
commit c95b5b8a9d
4 changed files with 229 additions and 45 deletions

View File

@@ -2,8 +2,11 @@ from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
import logging
import concurrent.futures # 添加线程池支持
import os
import hashlib
import shutil
from wcferry import WxMsg
from wcferry import WxMsg, Wcf
from db.connection import DBConnectionManager
from db.message_storage import MessageStorageDB
@@ -20,7 +23,7 @@ logger = logging.getLogger("MessageStorage")
class MessageStorage:
def __init__(self):
def __init__(self, wcf: Wcf = None):
# 获取数据库连接管理器的单例
self.db_manager = DBConnectionManager.get_instance()
self.message_db = MessageStorageDB(self.db_manager)
@@ -32,6 +35,18 @@ class MessageStorage:
# 用于跟踪异步任务的列表
self.pending_tasks = []
# 保存WCF实例用于图片处理
self.wcf = wcf
# 图片处理相关初始化
self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) # 专用于图片处理的线程池
self.image_tasks = []
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static", "images")
# 确保图片存储目录存在
if not os.path.exists(self.image_dir):
os.makedirs(self.image_dir, exist_ok=True)
logger.info(f"图片存储目录: {self.image_dir}")
def process_message(self, message: WxMsg):
# 示例message字符串
current_date = datetime.now().strftime('%Y-%m-%d')
@@ -55,7 +70,7 @@ class MessageStorage:
self.pending_tasks.append(future)
# 清理已完成的任务
self._cleanup_completed_tasks()
def _archive_message_task(self, msg: WxMsg):
"""实际执行消息存档的任务函数"""
try:
@@ -65,7 +80,8 @@ class MessageStorage:
'success': result,
'roomid': msg.roomid,
'sender': msg.sender,
'content': msg.content # 添加消息内容
'content': msg.content, # 添加消息内容
'message_id': msg.id # 添加消息ID
}
except Exception as e:
logger.error(f"存档消息出错: {e}")
@@ -74,9 +90,133 @@ class MessageStorage:
'roomid': msg.roomid,
'sender': msg.sender,
'content': msg.content, # 添加消息内容
'message_id': msg.id, # 添加消息ID
'error': str(e)
}
def process_image(self, msg: WxMsg):
"""异步处理图片消息,与消息存档分离"""
if msg.type != 3 or not self.wcf: # 不是图片消息或没有WCF实例
return False
# 提交任务到图片处理线程池
future = self.image_executor.submit(self._process_image_task, msg)
# 添加回调函数
future.add_done_callback(self._process_image_callback)
# 将任务添加到待处理列表
self.image_tasks.append(future)
# 清理已完成的任务
self._cleanup_completed_tasks()
return True
def _process_image_task(self, msg: WxMsg):
"""实际执行图片处理的任务函数"""
try:
# 从msg.extra中获取本地图片路径
local_image_path = msg.extra
if not local_image_path or not os.path.exists(local_image_path):
return {
'success': False,
'message_id': msg.id,
'roomid': msg.roomid,
'sender': msg.sender,
'error': "图片本地路径不存在"
}
# 生成目标文件名
filename = self._generate_image_filename(local_image_path)
# 构建完整的目标文件路径
target_path = os.path.join(self.image_dir, filename)
# 使用绝对路径而不是相对路径
relative_path = target_path
# 检查目标文件是否已存在
if os.path.exists(target_path):
logger.info(f"图片已存在,跳过复制: {msg.id} -> {target_path}")
# 更新数据库中的图片路径
self.message_db.update_message_image_path(msg.id, relative_path)
return {
'success': True,
'message_id': msg.id,
'roomid': msg.roomid,
'sender': msg.sender,
'file_path': target_path,
'original_path': local_image_path,
'skipped': True
}
# 复制图片到静态目录
shutil.copy2(local_image_path, target_path)
logger.info(f"图片处理成功: {msg.id} -> {target_path}")
# 更新数据库中的图片路径
self.message_db.update_message_image_path(msg.id, relative_path)
return {
'success': True,
'message_id': msg.id,
'roomid': msg.roomid,
'sender': msg.sender,
'file_path': target_path,
'original_path': local_image_path
}
except Exception as e:
logger.error(f"图片处理出错: {msg.id}, 错误: {e}")
return {
'success': False,
'message_id': msg.id,
'roomid': msg.roomid,
'sender': msg.sender,
'error': str(e)
}
def _generate_image_filename(self, original_path):
"""
使用图片内容的哈希值生成唯一的文件名
Args:
original_path: 原始图片路径
Returns:
生成的文件名
"""
try:
# 读取图片内容
with open(original_path, 'rb') as f:
image_content = f.read()
# 使用图片内容的哈希值生成唯一文件名
hash_obj = hashlib.md5(image_content)
file_ext = os.path.splitext(original_path)[-1] if '.' in original_path else '.jpg'
if not file_ext or len(file_ext) > 5:
file_ext = '.jpg' # 默认使用jpg扩展名
return f"{hash_obj.hexdigest()}{file_ext}"
except Exception as e:
# 如果读取图片内容失败,回退到使用路径生成哈希值
logger.warning(f"读取图片内容失败,使用路径生成哈希值: {e}")
hash_obj = hashlib.md5(original_path.encode())
file_ext = os.path.splitext(original_path)[-1] if '.' in original_path else '.jpg'
if not file_ext or len(file_ext) > 5:
file_ext = '.jpg' # 默认使用jpg扩展名
return f"{hash_obj.hexdigest()}{file_ext}"
def _process_image_callback(self, future):
"""处理异步图片处理任务完成后的回调"""
try:
result = future.result()
if result['success']:
skipped_info = " (已存在)" if result.get('skipped') else ""
logger.info(f"图片处理成功{skipped_info}: {result['roomid']}:{result['sender']}:{result['message_id']}")
else:
error_msg = result.get('error', '未知错误')
logger.error(
f"图片处理失败: {result.get('roomid', '')}:{result.get('sender', '')}:{result.get('message_id', '')} - {error_msg}")
except Exception as e:
logger.error(f"处理图片回调时出错: {e}")
def _archive_callback(self, future):
"""处理异步存档任务完成后的回调"""
try:
@@ -89,18 +229,31 @@ class MessageStorage:
logger.error(f"消息存档失败: {result['roomid']}:{result['sender']} - {error_msg}")
except Exception as e:
logger.error(f"处理存档回调时出错: {e}")
def _cleanup_completed_tasks(self):
"""清理已完成的任务,防止内存泄漏"""
# 过滤出已完成的任务
completed_tasks = [task for task in self.pending_tasks if task.done()]
# 从待处理列表中移除已完成的任务
for task in completed_tasks:
self.pending_tasks.remove(task)
# 只有当任务数量超过阈值时才进行清理,减少频繁操作
if len(self.pending_tasks) > 20:
# 过滤出已完成的任务
completed_tasks = [task for task in self.pending_tasks if task.done()]
# 从待处理列表中移除已完成的任务
for task in completed_tasks:
self.pending_tasks.remove(task)
# 如果待处理任务过多,记录警告日志
if len(self.pending_tasks) > 100:
logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}")
# 如果待处理任务过多,记录警告日志
if len(self.pending_tasks) > 100:
logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}")
# 只有当图片任务数量超过阈值时才进行清理
if len(self.image_tasks) > 10:
# 清理已完成的图片处理任务
completed_image_tasks = [task for task in self.image_tasks if task.done()]
for task in completed_image_tasks:
self.image_tasks.remove(task)
# 如果待处理任务过多,记录警告日志
if len(self.image_tasks) > 50:
logger.warning(f"待处理的图片处理任务数量过多: {len(self.image_tasks)}")
def write_to_db(self):
"""从Redis读取发言统计数据并写入数据库"""
@@ -145,13 +298,13 @@ class MessageStorage:
# 格式化输出字符串添加emoji和美化格式
ranking_str = f"🏆 {yesterday} 发言排行榜 🏆\n"
# 为不同名次添加不同的奖杯和样式
for rank, result in enumerate(results, start=1):
username = result['wx_id']
speech_count = result['speech_count']
display_name = allContacts.get(username, username)
# 根据排名添加不同的emoji
if rank == 1:
ranking_str += f"🥇🐲 {rank}.{display_name}: {speech_count}次 🔥\n"
@@ -163,7 +316,7 @@ class MessageStorage:
ranking_str += f"🌟 {rank}.{display_name}: {speech_count}\n"
else:
ranking_str += f"👍 {rank}.{display_name}: {speech_count}\n"
logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名")
return ranking_str
@@ -214,7 +367,8 @@ class MessageStorage:
# 构建最终的结果字符串
result = []
for msg in messages:
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg['message_type']
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[
'message_type']
try:
if message_type == 49: # 应用消息类型
# 检查是否为引用消息