From 491c0d16fbb24048e011f4205dc6ed513ef3fde3 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 6 Jan 2026 16:02:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E8=BF=9B=E8=A1=8C=E6=80=BB?= =?UTF-8?q?=E7=BB=93=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/message_storage.py | 65 ++++++++++++++- utils/compress_chat_data.py | 27 +++++- utils/wechat/message_to_db.py | 150 +++++++++++++++++++++++++++++----- 3 files changed, 215 insertions(+), 27 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index 88eda0d..80a7e6f 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -260,11 +260,11 @@ class MessageStorageDB(BaseDBOperator): def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]: """获取最近N分钟内未处理图片的消息(image_path IS NULL) - + Args: minutes_ago: 查询最近多少分钟的消息,默认10分钟 limit: 每次最多处理多少条,默认50条 - + Returns: 包含消息ID、群ID、消息XML等信息的列表 """ @@ -281,3 +281,64 @@ class MessageStorageDB(BaseDBOperator): """ params = (minutes_ago, limit) return self.execute_query(sql, params) or [] + + def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None, + min_content_length: int = 6, max_results: int = 5000) -> List[Dict]: + """按日期范围获取消息(支持按天总结) + + Args: + group_id: 群组ID + start_date: 开始日期,格式 YYYY-MM-DD + end_date: 结束日期,格式 YYYY-MM-DD,如果为None则使用start_date当天 + min_content_length: 最小内容长度 + max_results: 最多返回多少条消息,防止数据过多(默认5000条,足够总结使用) + + Returns: + 消息列表 + """ + if end_date is None: + end_date = start_date + + sql = """ + SELECT timestamp, sender, content, message_type + FROM messages + WHERE DATE(timestamp) >= %s + AND DATE(timestamp) <= %s + AND group_id = %s + AND message_type IN (1, 49) + AND LENGTH(content) > %s + AND CHAR_LENGTH(content) < 300 + AND content NOT LIKE '/%' + ORDER BY timestamp ASC + LIMIT %s + """ + params = (start_date, end_date, group_id, min_content_length, max_results) + return self.execute_query(sql, params) or [] + + def get_messages_for_summary(self, group_id: str, hours_ago: int = 8, + min_messages: int = 50, + max_hours: int = 48, + max_results: int = 5000) -> List[Dict]: + """智能获取用于总结的消息(自动调整时间范围) + + Args: + group_id: 群组ID + hours_ago: 默认查询最近多少小时 + min_messages: 最少需要多少条消息,如果不足会扩大时间范围 + max_hours: 最大查询多少小时内的消息 + max_results: 最多返回多少条消息(默认5000条,确保有足够数据) + + Returns: + 消息列表 + """ + # 先尝试默认时间范围 + messages = self.get_recent_messages(group_id, hours_ago=hours_ago) + + # 如果消息不足,逐步扩大时间范围 + current_hours = hours_ago + while len(messages) < min_messages and current_hours < max_hours: + current_hours += 8 # 每次增加8小时 + messages = self.get_recent_messages(group_id, hours_ago=current_hours) + + # 限制最大返回数量(5000条足以覆盖1-2天的活跃群聊) + return messages[:max_results] if messages else [] diff --git a/utils/compress_chat_data.py b/utils/compress_chat_data.py index e2567f7..cdd53a1 100644 --- a/utils/compress_chat_data.py +++ b/utils/compress_chat_data.py @@ -4,13 +4,20 @@ from datetime import datetime def compress_chat_data(chat_data_str, time_threshold=5): """ - 压缩聊天数据,减少 token 使用,格式为时间,发信人,内容。 + 压缩聊天数据,减少 token 使用。 + + 支持两种格式: + 1. 旧格式:时间,发信人,内容(例如:2025-01-06 08:30,张三,大家好) + 2. 新格式(优化后): + 【时间】 + 发信人:内容 + 内容(续) :param chat_data_str: 原始聊天记录的长字符串 - :param time_threshold: 同一发信人连续发言间隔小于该值(秒),则合并 + :param time_threshold: 同一发信人连续发言间隔小于该值(秒),则合并(仅对旧格式有效) :return: 压缩后的聊天数据的长字符串 """ - # 如果字符串长度超过30000,则去除前面的聊天记录 + # 如果字符串长度超过40000,则去除前面的聊天记录(保留最新的) if len(chat_data_str) > 40000: lines = chat_data_str.splitlines() total_length = 0 @@ -27,6 +34,20 @@ def compress_chat_data(chat_data_str, time_threshold=5): # 只保留后面的聊天记录 chat_data_str = '\n'.join(lines[cut_index:]) + # 检测格式类型 + has_new_format = '【' in chat_data_str and '】' in chat_data_str + + if has_new_format: + # 新格式:已经是压缩格式,直接返回 + # 只需要确保不超过字符限制(上面已经处理) + return chat_data_str + else: + # 旧格式:需要压缩处理 + return _compress_old_format(chat_data_str, time_threshold) + + +def _compress_old_format(chat_data_str, time_threshold): + """压缩旧格式的聊天数据(时间,发信人,内容)""" # 解析原始聊天数据为列表 chat_data = [] for line in chat_data_str.splitlines(): diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 404cbcb..d7e1b21 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -472,36 +472,142 @@ class MessageStorage: last_summary_time = (current_time - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') elif time_diff > timedelta(days=1): # 大于 24 小时,取 10 小时前 - last_summary_time = (current_time - timedelta(hours=10)).strftime('%Y-%m-%d %H:%M:%S') + last_summary_time = (current_time - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S') # 更新 Redis 存储的当前时间 redis_conn.set(key, current_date) - # 使用 MessageStorageDB 类获取最近消息 - hours_ago = int( - (current_time - datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')).total_seconds() / 3600) + 1 - messages = self.message_db.get_recent_messages(group_id, hours_ago=hours_ago) - # 构建最终的结果字符串 - result = [] - for msg in messages: - timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[ - 'message_type'] - try: - if message_type == 49: # 应用消息类型 - # 其他类型的应用消息,解析 XML 提取标题 - root = ET.fromstring(content) - title_elem = root.find('.//title') - if title_elem is not None: - content = title_elem.text - except Exception as e: - logger.error(f"解析消息类型49出错: {e}") + # 使用智能查询方法(自动调整时间范围,确保有足够的消息) + messages = self.message_db.get_messages_for_summary( + group_id, + hours_ago=8, # 默认8小时 + min_messages=50, # 最少需要50条消息 + max_hours=48, # 最多查询48小时 + max_results=5000 # 最多返回5000条(之前是500) + ) - sender_name = all_contacts.get(sender, sender) # 获取发送者的名字,若找不到则使用原 ID - result.append(f"{timestamp},{sender_name},{content}") + # 使用优化后的格式化方法 + result_str = self._format_messages_optimized(messages, all_contacts) + logger.info(f"获取到 {len(messages)} 条消息,格式化后长度: {len(result_str)}") - result_str = "\n".join(result) # 将结果拼接为最终字符串 return result_str except Exception as e: logger.error(f"获取消息出错: {e}") return "" + + def _format_messages_optimized(self, messages: list, all_contacts: dict) -> str: + """优化的消息格式化方法,减少冗余 + + 格式示例: + 【08:00-09:00】 + 张三:消息1 + 消息2 + 李四:消息3 + """ + if not messages: + return "" + + from collections import defaultdict + import xml.etree.ElementTree as ET + + # 按时间分组(每30分钟一组) + time_groups = defaultdict(lambda: defaultdict(list)) + current_hour = None + + for msg in messages: + timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg['message_type'] + + # 处理应用消息 + try: + if message_type == 49: + root = ET.fromstring(content) + title_elem = root.find('.//title') + if title_elem is not None: + content = title_elem.text + except Exception as e: + logger.error(f"解析消息类型49出错: {e}") + + # 解析时间并按30分钟分组 + try: + dt = datetime.strptime(str(timestamp), '%Y-%m-%d %H:%M:%S') + time_key = dt.strftime('%Y-%m-%d %H:%M') # 按小时分组 + + # 获取发送者名称 + sender_name = all_contacts.get(sender, sender) + + # 添加到分组 + time_groups[time_key][sender_name].append(content) + except Exception as e: + logger.warning(f"解析时间戳失败: {timestamp}, 错误: {e}") + continue + + # 构建结果字符串 + result_lines = [] + + for time_key in sorted(time_groups.keys()): + # 提取时分部分 + time_display = time_key.split(' ')[1] # 只显示 HH:MM + + # 获取该时间段的所有发言 + senders = time_groups[time_key] + + # 添加时间标题 + result_lines.append(f"\n【{time_display}】") + + # 按发送者组织消息 + for sender_name, contents in senders.items(): + # 如果一个人有多条消息,缩进显示 + for idx, content in enumerate(contents): + if idx == 0: + # 第一条消息显示发送者名 + result_lines.append(f"{sender_name}:{content}") + else: + # 后续消息缩进 + result_lines.append(f" {content}") + + return "\n".join(result_lines) + + def get_messages_by_date(self, group_id: str, all_contacts: dict, days: int = 1) -> str: + """按天获取消息(用于按天总结) + + Args: + group_id: 群组ID + all_contacts: 联系人字典 + days: 获取最近几天的消息,默认1天(昨天+今天) + + Returns: + 格式化后的消息字符串 + """ + try: + current_time = datetime.now() + + # 计算日期范围 + if days == 1: + # 昨天全天 + 今天到目前为止 + yesterday = (current_time - timedelta(days=1)).strftime('%Y-%m-%d') + today = current_time.strftime('%Y-%m-%d') + start_date = yesterday + end_date = today + else: + # 获取最近N天 + start_date = (current_time - timedelta(days=days)).strftime('%Y-%m-%d') + end_date = current_time.strftime('%Y-%m-%d') + + # 使用新的按日期查询方法 + messages = self.message_db.get_messages_by_date_range( + group_id, + start_date=start_date, + end_date=end_date, + max_results=5000 # 增加到5000条 + ) + + # 使用优化后的格式化方法 + result_str = self._format_messages_optimized(messages, all_contacts) + logger.info(f"按天查询获取到 {len(messages)} 条消息({start_date} 至 {end_date})") + + return result_str + + except Exception as e: + logger.error(f"按天获取消息出错: {e}") + return ""