diff --git a/db/message_storage.py b/db/message_storage.py index ee683cd..fe115a2 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -11,10 +11,10 @@ from db.connection import DBConnectionManager class MessageStorageDB(BaseDBOperator): """消息存储相关数据库操作""" - + def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) - + def archive_message(self, msg: WxMsg) -> bool: """存档消息""" now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -25,7 +25,7 @@ class MessageStorageDB(BaseDBOperator): params = (msg.roomid, now_time, msg.sender, msg.content, msg.type, msg.extra, msg.id, msg.xml, msg.thumb) result = self.execute_update(sql, params) return result - + def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: """获取最近的消息""" sql = """ @@ -40,7 +40,7 @@ class MessageStorageDB(BaseDBOperator): """ params = (hours_ago, group_id, min_content_length) return self.execute_query(sql, params) or [] - + def get_message_count_by_date(self, date: str) -> List[Dict]: """获取指定日期的消息统计""" sql = """ @@ -50,8 +50,8 @@ class MessageStorageDB(BaseDBOperator): GROUP BY group_id, sender """ return self.execute_query(sql, (date,)) or [] - - + + def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]: """获取指定日期和群组的发言排名""" sql = """ @@ -66,7 +66,7 @@ class MessageStorageDB(BaseDBOperator): params = (date, group_id, limit) results = self.execute_query(sql, params) return results or [] - + def insert_speech_count(self, group_id: str, wx_id: str, date: str, count: int) -> bool: """插入发言统计数据 @@ -86,8 +86,8 @@ class MessageStorageDB(BaseDBOperator): """ params = (group_id, wx_id, date, count) return self.execute_update(sql, params) - - + + def get_message_trend(self, group_id: str, days: int = 7) -> List[Dict]: """获取指定群组的消息趋势数据 @@ -109,8 +109,8 @@ class MessageStorageDB(BaseDBOperator): ORDER BY date """ return self.execute_query(sql, (group_id, days)) or [] - - def get_messages_by_filter(self, group_id=None, start_date=None, end_date=None, + + def get_messages_by_filter(self, group_id=None, start_date=None, end_date=None, search_text=None, page=1, page_size=20) -> Dict: """按条件筛选消息并支持分页和模糊搜索 @@ -133,50 +133,78 @@ class MessageStorageDB(BaseDBOperator): FROM messages WHERE 1=1 """ - + # 构建参数列表 params = [] - + # 添加筛选条件 if group_id: sql_count += " AND group_id = %s " sql_data += " AND group_id = %s " params.append(group_id) - + if start_date: sql_count += " AND DATE(timestamp) >= %s " sql_data += " AND DATE(timestamp) >= %s " params.append(start_date) - + if end_date: sql_count += " AND DATE(timestamp) <= %s " sql_data += " AND DATE(timestamp) <= %s " params.append(end_date) - + if search_text: sql_count += " AND content LIKE %s " sql_data += " AND content LIKE %s " params.append(f"%{search_text}%") - + # 添加排序和分页 sql_data += " ORDER BY timestamp DESC " sql_data += " LIMIT %s OFFSET %s " - + # 计算分页参数 offset = (page - 1) * page_size data_params = params.copy() data_params.extend([page_size, offset]) - + # 执行查询 count_result = self.execute_query(sql_count, params) total = count_result[0]['total'] if count_result else 0 - + messages = self.execute_query(sql_data, data_params) or [] - + return { 'total': total, 'page': page, 'page_size': page_size, 'total_pages': (total + page_size - 1) // page_size, 'messages': messages - } \ No newline at end of file + } + + def update_message_image_path(self, message_id, image_path): + """ + 更新消息的图片路径 + + Args: + message_id: 消息ID + image_path: 图片路径 + + Returns: + bool: 更新成功返回True,否则返回False + """ + try: + # 构建SQL语句 + sql = """ + UPDATE messages + SET image_path = %s + WHERE message_id = %s + """ + params = (image_path, message_id) + + # 执行更新操作 + result = self.execute_update(sql, params) + return result + except Exception as e: + # 使用已有的日志记录方式 + print(f"更新消息图片路径出错: {e}") + return False \ No newline at end of file diff --git a/message_storage/message_to_db.py b/message_storage/message_to_db.py index f21a634..df3220e 100644 --- a/message_storage/message_to_db.py +++ b/message_storage/message_to_db.py @@ -2,8 +2,11 @@ from datetime import datetime, timedelta import xml.etree.ElementTree as ET import logging import concurrent.futures # 添加线程池支持 +import os +import hashlib +import shutil -from wcferry import WxMsg +from wcferry import WxMsg, Wcf from db.connection import DBConnectionManager from db.message_storage import MessageStorageDB @@ -20,7 +23,7 @@ logger = logging.getLogger("MessageStorage") class MessageStorage: - def __init__(self): + def __init__(self, wcf: Wcf = None): # 获取数据库连接管理器的单例 self.db_manager = DBConnectionManager.get_instance() self.message_db = MessageStorageDB(self.db_manager) @@ -32,6 +35,18 @@ class MessageStorage: # 用于跟踪异步任务的列表 self.pending_tasks = [] + # 保存WCF实例,用于图片处理 + self.wcf = wcf + + # 图片处理相关初始化 + self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) # 专用于图片处理的线程池 + self.image_tasks = [] + self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static", "images") + # 确保图片存储目录存在 + if not os.path.exists(self.image_dir): + os.makedirs(self.image_dir, exist_ok=True) + logger.info(f"图片存储目录: {self.image_dir}") + def process_message(self, message: WxMsg): # 示例message字符串 current_date = datetime.now().strftime('%Y-%m-%d') @@ -55,7 +70,7 @@ class MessageStorage: self.pending_tasks.append(future) # 清理已完成的任务 self._cleanup_completed_tasks() - + def _archive_message_task(self, msg: WxMsg): """实际执行消息存档的任务函数""" try: @@ -65,7 +80,8 @@ class MessageStorage: 'success': result, 'roomid': msg.roomid, 'sender': msg.sender, - 'content': msg.content # 添加消息内容 + 'content': msg.content, # 添加消息内容 + 'message_id': msg.id # 添加消息ID } except Exception as e: logger.error(f"存档消息出错: {e}") @@ -74,9 +90,133 @@ class MessageStorage: 'roomid': msg.roomid, 'sender': msg.sender, 'content': msg.content, # 添加消息内容 + 'message_id': msg.id, # 添加消息ID 'error': str(e) } - + + def process_image(self, msg: WxMsg): + """异步处理图片消息,与消息存档分离""" + if msg.type != 3 or not self.wcf: # 不是图片消息或没有WCF实例 + return False + + # 提交任务到图片处理线程池 + future = self.image_executor.submit(self._process_image_task, msg) + # 添加回调函数 + future.add_done_callback(self._process_image_callback) + # 将任务添加到待处理列表 + self.image_tasks.append(future) + # 清理已完成的任务 + self._cleanup_completed_tasks() + return True + + def _process_image_task(self, msg: WxMsg): + """实际执行图片处理的任务函数""" + try: + # 从msg.extra中获取本地图片路径 + local_image_path = msg.extra + + if not local_image_path or not os.path.exists(local_image_path): + return { + 'success': False, + 'message_id': msg.id, + 'roomid': msg.roomid, + 'sender': msg.sender, + 'error': "图片本地路径不存在" + } + + # 生成目标文件名 + filename = self._generate_image_filename(local_image_path) + + # 构建完整的目标文件路径 + target_path = os.path.join(self.image_dir, filename) + # 使用绝对路径而不是相对路径 + relative_path = target_path + + # 检查目标文件是否已存在 + if os.path.exists(target_path): + logger.info(f"图片已存在,跳过复制: {msg.id} -> {target_path}") + # 更新数据库中的图片路径 + self.message_db.update_message_image_path(msg.id, relative_path) + return { + 'success': True, + 'message_id': msg.id, + 'roomid': msg.roomid, + 'sender': msg.sender, + 'file_path': target_path, + 'original_path': local_image_path, + 'skipped': True + } + + # 复制图片到静态目录 + shutil.copy2(local_image_path, target_path) + + logger.info(f"图片处理成功: {msg.id} -> {target_path}") + + # 更新数据库中的图片路径 + self.message_db.update_message_image_path(msg.id, relative_path) + + return { + 'success': True, + 'message_id': msg.id, + 'roomid': msg.roomid, + 'sender': msg.sender, + 'file_path': target_path, + 'original_path': local_image_path + } + except Exception as e: + logger.error(f"图片处理出错: {msg.id}, 错误: {e}") + return { + 'success': False, + 'message_id': msg.id, + 'roomid': msg.roomid, + 'sender': msg.sender, + 'error': str(e) + } + + def _generate_image_filename(self, original_path): + """ + 使用图片内容的哈希值生成唯一的文件名 + + Args: + original_path: 原始图片路径 + + Returns: + 生成的文件名 + """ + try: + # 读取图片内容 + with open(original_path, 'rb') as f: + image_content = f.read() + + # 使用图片内容的哈希值生成唯一文件名 + hash_obj = hashlib.md5(image_content) + file_ext = os.path.splitext(original_path)[-1] if '.' in original_path else '.jpg' + if not file_ext or len(file_ext) > 5: + file_ext = '.jpg' # 默认使用jpg扩展名 + return f"{hash_obj.hexdigest()}{file_ext}" + except Exception as e: + # 如果读取图片内容失败,回退到使用路径生成哈希值 + logger.warning(f"读取图片内容失败,使用路径生成哈希值: {e}") + hash_obj = hashlib.md5(original_path.encode()) + file_ext = os.path.splitext(original_path)[-1] if '.' in original_path else '.jpg' + if not file_ext or len(file_ext) > 5: + file_ext = '.jpg' # 默认使用jpg扩展名 + return f"{hash_obj.hexdigest()}{file_ext}" + + def _process_image_callback(self, future): + """处理异步图片处理任务完成后的回调""" + try: + result = future.result() + if result['success']: + skipped_info = " (已存在)" if result.get('skipped') else "" + logger.info(f"图片处理成功{skipped_info}: {result['roomid']}:{result['sender']}:{result['message_id']}") + else: + error_msg = result.get('error', '未知错误') + logger.error( + f"图片处理失败: {result.get('roomid', '')}:{result.get('sender', '')}:{result.get('message_id', '')} - {error_msg}") + except Exception as e: + logger.error(f"处理图片回调时出错: {e}") + def _archive_callback(self, future): """处理异步存档任务完成后的回调""" try: @@ -89,18 +229,31 @@ class MessageStorage: logger.error(f"消息存档失败: {result['roomid']}:{result['sender']} - {error_msg}") except Exception as e: logger.error(f"处理存档回调时出错: {e}") - + def _cleanup_completed_tasks(self): """清理已完成的任务,防止内存泄漏""" - # 过滤出已完成的任务 - completed_tasks = [task for task in self.pending_tasks if task.done()] - # 从待处理列表中移除已完成的任务 - for task in completed_tasks: - self.pending_tasks.remove(task) + # 只有当任务数量超过阈值时才进行清理,减少频繁操作 + if len(self.pending_tasks) > 20: + # 过滤出已完成的任务 + completed_tasks = [task for task in self.pending_tasks if task.done()] + # 从待处理列表中移除已完成的任务 + for task in completed_tasks: + self.pending_tasks.remove(task) + + # 如果待处理任务过多,记录警告日志 + if len(self.pending_tasks) > 100: + logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}") - # 如果待处理任务过多,记录警告日志 - if len(self.pending_tasks) > 100: - logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}") + # 只有当图片任务数量超过阈值时才进行清理 + if len(self.image_tasks) > 10: + # 清理已完成的图片处理任务 + completed_image_tasks = [task for task in self.image_tasks if task.done()] + for task in completed_image_tasks: + self.image_tasks.remove(task) + + # 如果待处理任务过多,记录警告日志 + if len(self.image_tasks) > 50: + logger.warning(f"待处理的图片处理任务数量过多: {len(self.image_tasks)}") def write_to_db(self): """从Redis读取发言统计数据并写入数据库""" @@ -145,13 +298,13 @@ class MessageStorage: # 格式化输出字符串,添加emoji和美化格式 ranking_str = f"🏆 {yesterday} 发言排行榜 🏆\n" - + # 为不同名次添加不同的奖杯和样式 for rank, result in enumerate(results, start=1): username = result['wx_id'] speech_count = result['speech_count'] display_name = allContacts.get(username, username) - + # 根据排名添加不同的emoji if rank == 1: ranking_str += f"🥇🐲 {rank}.{display_name}: {speech_count}次 🔥\n" @@ -163,7 +316,7 @@ class MessageStorage: ranking_str += f"🌟 {rank}.{display_name}: {speech_count}次\n" else: ranking_str += f"👍 {rank}.{display_name}: {speech_count}次\n" - + logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名") return ranking_str @@ -214,7 +367,8 @@ class MessageStorage: # 构建最终的结果字符串 result = [] for msg in messages: - timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg['message_type'] + timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[ + 'message_type'] try: if message_type == 49: # 应用消息类型 # 检查是否为引用消息 diff --git a/robot.py b/robot.py index 308cf1c..28791ab 100644 --- a/robot.py +++ b/robot.py @@ -59,7 +59,7 @@ class Robot(Job): self.contact_manager = ContactManager.get_instance() self.allContacts = self.get_all_contacts() self.contact_manager.set_contacts(self.allContacts) - + self.LOG.info(f"DB+REDIS 连接池开始初始化") # 使用单例模式获取实例 self.db_manager = DBConnectionManager.get_instance( @@ -105,7 +105,7 @@ class Robot(Job): self.LOG.info("插件系统初始化完成") # 消息存档模块初始化,自动完成入库动作 - self.message_storage = MessageStorage() + self.message_storage = MessageStorage(self.wcf) if ChatType.is_in_chat_types(chat_type): if chat_type == ChatType.TIGER_BOT.value and TigerBot.value_check(self.config.TIGERBOT): self.chat = TigerBot(self.config.TIGERBOT) @@ -238,7 +238,7 @@ class Robot(Job): GroupBotManager.set_group_permission(msg.roomid, Feature.ROBOT, PermissionStatus.ENABLED) except Exception as e: self.LOG.error(f"加入新群,自动添加并开启机器人功能 error: {e}") - + # 发布消息接收事件 self.event_system.publish(EventType.MESSAGE_RECEIVED, {"message": msg}) @@ -258,6 +258,9 @@ class Robot(Job): # 聊天记录入库动作: try: self.message_storage.archive_message(msg) + # 单独处理图片消息 + if msg.type == 3: # 图片消息类型 + self.message_storage.process_image(msg) except Exception as e: self.LOG.error(f"archive_message error: {e}") @@ -413,7 +416,7 @@ class Robot(Job): self.allContacts[msg.sender] = nickName[0] self.contact_manager.update_contact(msg.sender, nickName[0]) self.send_text_msg(f"Hi {nickName[0]},我自动通过了你的好友请求。", msg.sender) - + def send_group_txt_message(self, msg: str, feature: Feature): try: receivers = self.gbm.get_group_list() diff --git a/utils/message_formatter.py b/utils/message_formatter.py index fb75224..40c846b 100644 --- a/utils/message_formatter.py +++ b/utils/message_formatter.py @@ -16,7 +16,6 @@ def format_quote_message(xml_content): try: xml_content = xml_content.replace('<', '<').replace('>', '>') - print(xml_content) # 使用正则表达式直接提取关键信息,避免XML解析问题 title_match = re.search(r'(.*?)', xml_content) main_content = title_match.group(1) if title_match else "[无标题]"