尝试进行总结优化
This commit is contained in:
@@ -260,11 +260,11 @@ class MessageStorageDB(BaseDBOperator):
|
||||
|
||||
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
|
||||
"""获取最近N分钟内未处理图片的消息(image_path IS NULL)
|
||||
|
||||
|
||||
Args:
|
||||
minutes_ago: 查询最近多少分钟的消息,默认10分钟
|
||||
limit: 每次最多处理多少条,默认50条
|
||||
|
||||
|
||||
Returns:
|
||||
包含消息ID、群ID、消息XML等信息的列表
|
||||
"""
|
||||
@@ -281,3 +281,64 @@ class MessageStorageDB(BaseDBOperator):
|
||||
"""
|
||||
params = (minutes_ago, limit)
|
||||
return self.execute_query(sql, params) or []
|
||||
|
||||
def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None,
|
||||
min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
|
||||
"""按日期范围获取消息(支持按天总结)
|
||||
|
||||
Args:
|
||||
group_id: 群组ID
|
||||
start_date: 开始日期,格式 YYYY-MM-DD
|
||||
end_date: 结束日期,格式 YYYY-MM-DD,如果为None则使用start_date当天
|
||||
min_content_length: 最小内容长度
|
||||
max_results: 最多返回多少条消息,防止数据过多(默认5000条,足够总结使用)
|
||||
|
||||
Returns:
|
||||
消息列表
|
||||
"""
|
||||
if end_date is None:
|
||||
end_date = start_date
|
||||
|
||||
sql = """
|
||||
SELECT timestamp, sender, content, message_type
|
||||
FROM messages
|
||||
WHERE DATE(timestamp) >= %s
|
||||
AND DATE(timestamp) <= %s
|
||||
AND group_id = %s
|
||||
AND message_type IN (1, 49)
|
||||
AND LENGTH(content) > %s
|
||||
AND CHAR_LENGTH(content) < 300
|
||||
AND content NOT LIKE '/%'
|
||||
ORDER BY timestamp ASC
|
||||
LIMIT %s
|
||||
"""
|
||||
params = (start_date, end_date, group_id, min_content_length, max_results)
|
||||
return self.execute_query(sql, params) or []
|
||||
|
||||
def get_messages_for_summary(self, group_id: str, hours_ago: int = 8,
|
||||
min_messages: int = 50,
|
||||
max_hours: int = 48,
|
||||
max_results: int = 5000) -> List[Dict]:
|
||||
"""智能获取用于总结的消息(自动调整时间范围)
|
||||
|
||||
Args:
|
||||
group_id: 群组ID
|
||||
hours_ago: 默认查询最近多少小时
|
||||
min_messages: 最少需要多少条消息,如果不足会扩大时间范围
|
||||
max_hours: 最大查询多少小时内的消息
|
||||
max_results: 最多返回多少条消息(默认5000条,确保有足够数据)
|
||||
|
||||
Returns:
|
||||
消息列表
|
||||
"""
|
||||
# 先尝试默认时间范围
|
||||
messages = self.get_recent_messages(group_id, hours_ago=hours_ago)
|
||||
|
||||
# 如果消息不足,逐步扩大时间范围
|
||||
current_hours = hours_ago
|
||||
while len(messages) < min_messages and current_hours < max_hours:
|
||||
current_hours += 8 # 每次增加8小时
|
||||
messages = self.get_recent_messages(group_id, hours_ago=current_hours)
|
||||
|
||||
# 限制最大返回数量(5000条足以覆盖1-2天的活跃群聊)
|
||||
return messages[:max_results] if messages else []
|
||||
|
||||
@@ -4,13 +4,20 @@ from datetime import datetime
|
||||
|
||||
def compress_chat_data(chat_data_str, time_threshold=5):
|
||||
"""
|
||||
压缩聊天数据,减少 token 使用,格式为时间,发信人,内容。
|
||||
压缩聊天数据,减少 token 使用。
|
||||
|
||||
支持两种格式:
|
||||
1. 旧格式:时间,发信人,内容(例如:2025-01-06 08:30,张三,大家好)
|
||||
2. 新格式(优化后):
|
||||
【时间】
|
||||
发信人:内容
|
||||
内容(续)
|
||||
|
||||
:param chat_data_str: 原始聊天记录的长字符串
|
||||
:param time_threshold: 同一发信人连续发言间隔小于该值(秒),则合并
|
||||
:param time_threshold: 同一发信人连续发言间隔小于该值(秒),则合并(仅对旧格式有效)
|
||||
:return: 压缩后的聊天数据的长字符串
|
||||
"""
|
||||
# 如果字符串长度超过30000,则去除前面的聊天记录
|
||||
# 如果字符串长度超过40000,则去除前面的聊天记录(保留最新的)
|
||||
if len(chat_data_str) > 40000:
|
||||
lines = chat_data_str.splitlines()
|
||||
total_length = 0
|
||||
@@ -27,6 +34,20 @@ def compress_chat_data(chat_data_str, time_threshold=5):
|
||||
# 只保留后面的聊天记录
|
||||
chat_data_str = '\n'.join(lines[cut_index:])
|
||||
|
||||
# 检测格式类型
|
||||
has_new_format = '【' in chat_data_str and '】' in chat_data_str
|
||||
|
||||
if has_new_format:
|
||||
# 新格式:已经是压缩格式,直接返回
|
||||
# 只需要确保不超过字符限制(上面已经处理)
|
||||
return chat_data_str
|
||||
else:
|
||||
# 旧格式:需要压缩处理
|
||||
return _compress_old_format(chat_data_str, time_threshold)
|
||||
|
||||
|
||||
def _compress_old_format(chat_data_str, time_threshold):
|
||||
"""压缩旧格式的聊天数据(时间,发信人,内容)"""
|
||||
# 解析原始聊天数据为列表
|
||||
chat_data = []
|
||||
for line in chat_data_str.splitlines():
|
||||
|
||||
@@ -472,36 +472,142 @@ class MessageStorage:
|
||||
last_summary_time = (current_time - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
|
||||
elif time_diff > timedelta(days=1):
|
||||
# 大于 24 小时,取 10 小时前
|
||||
last_summary_time = (current_time - timedelta(hours=10)).strftime('%Y-%m-%d %H:%M:%S')
|
||||
last_summary_time = (current_time - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# 更新 Redis 存储的当前时间
|
||||
redis_conn.set(key, current_date)
|
||||
|
||||
# 使用 MessageStorageDB 类获取最近消息
|
||||
hours_ago = int(
|
||||
(current_time - datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')).total_seconds() / 3600) + 1
|
||||
messages = self.message_db.get_recent_messages(group_id, hours_ago=hours_ago)
|
||||
# 构建最终的结果字符串
|
||||
result = []
|
||||
for msg in messages:
|
||||
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[
|
||||
'message_type']
|
||||
try:
|
||||
if message_type == 49: # 应用消息类型
|
||||
# 其他类型的应用消息,解析 XML 提取标题
|
||||
root = ET.fromstring(content)
|
||||
title_elem = root.find('.//title')
|
||||
if title_elem is not None:
|
||||
content = title_elem.text
|
||||
except Exception as e:
|
||||
logger.error(f"解析消息类型49出错: {e}")
|
||||
# 使用智能查询方法(自动调整时间范围,确保有足够的消息)
|
||||
messages = self.message_db.get_messages_for_summary(
|
||||
group_id,
|
||||
hours_ago=8, # 默认8小时
|
||||
min_messages=50, # 最少需要50条消息
|
||||
max_hours=48, # 最多查询48小时
|
||||
max_results=5000 # 最多返回5000条(之前是500)
|
||||
)
|
||||
|
||||
sender_name = all_contacts.get(sender, sender) # 获取发送者的名字,若找不到则使用原 ID
|
||||
result.append(f"{timestamp},{sender_name},{content}")
|
||||
# 使用优化后的格式化方法
|
||||
result_str = self._format_messages_optimized(messages, all_contacts)
|
||||
logger.info(f"获取到 {len(messages)} 条消息,格式化后长度: {len(result_str)}")
|
||||
|
||||
result_str = "\n".join(result) # 将结果拼接为最终字符串
|
||||
return result_str
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取消息出错: {e}")
|
||||
return ""
|
||||
|
||||
def _format_messages_optimized(self, messages: list, all_contacts: dict) -> str:
|
||||
"""优化的消息格式化方法,减少冗余
|
||||
|
||||
格式示例:
|
||||
【08:00-09:00】
|
||||
张三:消息1
|
||||
消息2
|
||||
李四:消息3
|
||||
"""
|
||||
if not messages:
|
||||
return ""
|
||||
|
||||
from collections import defaultdict
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
# 按时间分组(每30分钟一组)
|
||||
time_groups = defaultdict(lambda: defaultdict(list))
|
||||
current_hour = None
|
||||
|
||||
for msg in messages:
|
||||
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg['message_type']
|
||||
|
||||
# 处理应用消息
|
||||
try:
|
||||
if message_type == 49:
|
||||
root = ET.fromstring(content)
|
||||
title_elem = root.find('.//title')
|
||||
if title_elem is not None:
|
||||
content = title_elem.text
|
||||
except Exception as e:
|
||||
logger.error(f"解析消息类型49出错: {e}")
|
||||
|
||||
# 解析时间并按30分钟分组
|
||||
try:
|
||||
dt = datetime.strptime(str(timestamp), '%Y-%m-%d %H:%M:%S')
|
||||
time_key = dt.strftime('%Y-%m-%d %H:%M') # 按小时分组
|
||||
|
||||
# 获取发送者名称
|
||||
sender_name = all_contacts.get(sender, sender)
|
||||
|
||||
# 添加到分组
|
||||
time_groups[time_key][sender_name].append(content)
|
||||
except Exception as e:
|
||||
logger.warning(f"解析时间戳失败: {timestamp}, 错误: {e}")
|
||||
continue
|
||||
|
||||
# 构建结果字符串
|
||||
result_lines = []
|
||||
|
||||
for time_key in sorted(time_groups.keys()):
|
||||
# 提取时分部分
|
||||
time_display = time_key.split(' ')[1] # 只显示 HH:MM
|
||||
|
||||
# 获取该时间段的所有发言
|
||||
senders = time_groups[time_key]
|
||||
|
||||
# 添加时间标题
|
||||
result_lines.append(f"\n【{time_display}】")
|
||||
|
||||
# 按发送者组织消息
|
||||
for sender_name, contents in senders.items():
|
||||
# 如果一个人有多条消息,缩进显示
|
||||
for idx, content in enumerate(contents):
|
||||
if idx == 0:
|
||||
# 第一条消息显示发送者名
|
||||
result_lines.append(f"{sender_name}:{content}")
|
||||
else:
|
||||
# 后续消息缩进
|
||||
result_lines.append(f" {content}")
|
||||
|
||||
return "\n".join(result_lines)
|
||||
|
||||
def get_messages_by_date(self, group_id: str, all_contacts: dict, days: int = 1) -> str:
|
||||
"""按天获取消息(用于按天总结)
|
||||
|
||||
Args:
|
||||
group_id: 群组ID
|
||||
all_contacts: 联系人字典
|
||||
days: 获取最近几天的消息,默认1天(昨天+今天)
|
||||
|
||||
Returns:
|
||||
格式化后的消息字符串
|
||||
"""
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
|
||||
# 计算日期范围
|
||||
if days == 1:
|
||||
# 昨天全天 + 今天到目前为止
|
||||
yesterday = (current_time - timedelta(days=1)).strftime('%Y-%m-%d')
|
||||
today = current_time.strftime('%Y-%m-%d')
|
||||
start_date = yesterday
|
||||
end_date = today
|
||||
else:
|
||||
# 获取最近N天
|
||||
start_date = (current_time - timedelta(days=days)).strftime('%Y-%m-%d')
|
||||
end_date = current_time.strftime('%Y-%m-%d')
|
||||
|
||||
# 使用新的按日期查询方法
|
||||
messages = self.message_db.get_messages_by_date_range(
|
||||
group_id,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
max_results=5000 # 增加到5000条
|
||||
)
|
||||
|
||||
# 使用优化后的格式化方法
|
||||
result_str = self._format_messages_optimized(messages, all_contacts)
|
||||
logger.info(f"按天查询获取到 {len(messages)} 条消息({start_date} 至 {end_date})")
|
||||
|
||||
return result_str
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"按天获取消息出错: {e}")
|
||||
return ""
|
||||
|
||||
Reference in New Issue
Block a user