Files
abot/utils/compress_chat_data.py
2026-01-06 16:02:00 +08:00

113 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from datetime import datetime
def compress_chat_data(chat_data_str, time_threshold=5, *, max_chars=40000):
    """
    Compress chat data to reduce token usage.

    Two input formats are supported:

    1. Old format, one message per line as ``time,sender,content``
       (e.g. ``2025-01-06 08:30:00,张三,大家好``).
    2. New (already optimized) format::

           【time】
           sender:content
           content (continued)

    :param chat_data_str: the raw chat log as one long string
    :param time_threshold: consecutive messages from the same sender that are
        at most this many seconds apart are merged (old format only)
    :param max_chars: character budget; when the input is longer, the oldest
        lines are dropped and only the most recent ones are kept. If the very
        last line alone exceeds the budget, nothing is kept.
    :return: the compressed chat data as one long string
    """
    # Decide the format on the full input, before truncation: trimming can cut
    # away every 【time】 header, and a header-less new-format tail must not be
    # fed to the old-format parser.
    # A new-format log has at least one line that starts with a 【...】 header.
    # (Testing for an empty-string marker would be true for every input.)
    is_new_format = (
        re.search(r"^[ \t]*【[^】\n]*】", chat_data_str, re.MULTILINE) is not None
    )

    # Over budget: drop the oldest lines, keep the newest.
    if len(chat_data_str) > max_chars:
        lines = chat_data_str.splitlines()
        kept_chars = 0
        first_kept = 0
        # Walk backwards from the newest line to find where the budget runs out.
        for index in range(len(lines) - 1, -1, -1):
            kept_chars += len(lines[index]) + 1  # +1 accounts for the newline
            if kept_chars > max_chars:
                first_kept = index + 1  # keep only the lines after this one
                break
        chat_data_str = '\n'.join(lines[first_kept:])

    if is_new_format:
        # Already in the compact format; only the size limit applies.
        return chat_data_str
    # Old format: needs the actual compression pass.
    return _compress_old_format(chat_data_str, time_threshold)
def _compress_old_format(chat_data_str, time_threshold):
"""压缩旧格式的聊天数据(时间,发信人,内容)"""
# 解析原始聊天数据为列表
chat_data = []
for line in chat_data_str.splitlines():
# 跳过空行
if not line.strip():
continue
# 分割每一行确保有3部分时间, 发信人, 内容)
parts = line.split(',', 2)
if len(parts) < 3:
continue # 如果没有完整的三部分,跳过该行
timestamp, sender, content = parts
chat_data.append((timestamp, sender, content))
# 其余代码保持不变
if not chat_data:
return "" # 如果没有有效数据,返回空字符串
compressed_data = []
current_date = None
prev_time_obj = None
prev_sender = None
prev_content = None
for timestamp, sender, content in chat_data:
try:
time_obj = datetime.strptime(timestamp.strip(), "%Y-%m-%d %H:%M:%S")
# 去除无意义的空格
content = re.sub(r"\s+", " ", content).strip()
if not content:
continue
# 检查日期是否变化(不包含年份)
date_str = time_obj.strftime('%m-%d')
# 如果是第一条消息或日期变化,添加日期标记
if current_date is None or date_str != current_date:
current_date = date_str
compressed_data.append(f"{date_str}")
# 检查是否需要合并消息
if (prev_sender == sender and prev_time_obj is not None and
(time_obj - prev_time_obj).total_seconds() <= time_threshold):
# 合并消息,更新最后一条消息
compressed_data[-1] = f"{prev_time_obj.strftime('%H:%M:%S')},{sender},{prev_content} {content}"
else:
# 添加新消息
compressed_data.append(f"{time_obj.strftime('%H:%M:%S')},{sender},{content}")
# 更新前一条消息的信息
prev_time_obj = time_obj
prev_sender = sender
prev_content = content
except ValueError as e:
# 处理日期格式错误
print(f"日期格式错误: {timestamp}, 错误: {e}")
continue
# 返回压缩后的数据长字符串
return '\n'.join(compressed_data)