113 lines
4.0 KiB
Python
113 lines
4.0 KiB
Python
import re
|
||
from datetime import datetime
|
||
|
||
|
||
def compress_chat_data(chat_data_str, time_threshold=5, max_chars=40000):
    """Compress chat history text to reduce token usage.

    Two input formats are supported:

    1. Old format, one message per line as ``time,sender,content``
       (for example ``2025-01-06 08:30:00,张三,大家好``).
    2. New (already optimized) format::

           【time】
           sender:content
           content (continued)

    Input longer than ``max_chars`` is first trimmed so that only the most
    recent lines remain. New-format text is then returned unchanged, while
    old-format text is handed to ``_compress_old_format``.

    :param chat_data_str: Raw chat history as one long string.
    :param time_threshold: Consecutive messages from the same sender that are
        at most this many seconds apart are merged (old format only).
    :param max_chars: Maximum number of characters of history to keep; the
        oldest lines are dropped first. Must be at least 1.
    :return: The compressed chat history as one long string.
    :raises ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be at least 1")
    if not chat_data_str:
        return ""

    lines = chat_data_str.splitlines()

    # Detect the format on the full input, before any trimming, and only
    # from lines that consist entirely of a 【...】 header.  Merely containing
    # both bracket characters is not enough: old-format messages such as
    # "【通知】..." would otherwise be mistaken for the new format.
    has_new_format = any(
        stripped.startswith('【') and stripped.endswith('】')
        for stripped in map(str.strip, lines)
    )

    if len(chat_data_str) > max_chars:
        kept_length = 0
        cut_index = 0

        # Walk backwards from the newest line to find how many lines fit.
        for i in range(len(lines) - 1, -1, -1):
            kept_length += len(lines[i]) + 1  # +1 accounts for the newline
            if kept_length > max_chars:
                cut_index = i + 1  # keep everything after this index
                break

        if cut_index >= len(lines):
            # The newest line alone exceeds the budget; keep its tail rather
            # than discarding the whole history.
            chat_data_str = lines[-1][-max_chars:]
        else:
            chat_data_str = '\n'.join(lines[cut_index:])

    if has_new_format:
        # Already in the compact format; the length cap above is all we need.
        return chat_data_str

    return _compress_old_format(chat_data_str, time_threshold)
|
||
|
||
|
||
def _compress_old_format(chat_data_str, time_threshold):
|
||
"""压缩旧格式的聊天数据(时间,发信人,内容)"""
|
||
# 解析原始聊天数据为列表
|
||
chat_data = []
|
||
for line in chat_data_str.splitlines():
|
||
# 跳过空行
|
||
if not line.strip():
|
||
continue
|
||
|
||
# 分割每一行,确保有3部分(时间, 发信人, 内容)
|
||
parts = line.split(',', 2)
|
||
if len(parts) < 3:
|
||
continue # 如果没有完整的三部分,跳过该行
|
||
|
||
timestamp, sender, content = parts
|
||
chat_data.append((timestamp, sender, content))
|
||
|
||
# 其余代码保持不变
|
||
if not chat_data:
|
||
return "" # 如果没有有效数据,返回空字符串
|
||
|
||
compressed_data = []
|
||
current_date = None
|
||
prev_time_obj = None
|
||
prev_sender = None
|
||
prev_content = None
|
||
|
||
for timestamp, sender, content in chat_data:
|
||
try:
|
||
time_obj = datetime.strptime(timestamp.strip(), "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 去除无意义的空格
|
||
content = re.sub(r"\s+", " ", content).strip()
|
||
if not content:
|
||
continue
|
||
|
||
# 检查日期是否变化(不包含年份)
|
||
date_str = time_obj.strftime('%m-%d')
|
||
|
||
# 如果是第一条消息或日期变化,添加日期标记
|
||
if current_date is None or date_str != current_date:
|
||
current_date = date_str
|
||
compressed_data.append(f"{date_str}")
|
||
|
||
# 检查是否需要合并消息
|
||
if (prev_sender == sender and prev_time_obj is not None and
|
||
(time_obj - prev_time_obj).total_seconds() <= time_threshold):
|
||
# 合并消息,更新最后一条消息
|
||
compressed_data[-1] = f"{prev_time_obj.strftime('%H:%M:%S')},{sender},{prev_content} {content}"
|
||
else:
|
||
# 添加新消息
|
||
compressed_data.append(f"{time_obj.strftime('%H:%M:%S')},{sender},{content}")
|
||
|
||
# 更新前一条消息的信息
|
||
prev_time_obj = time_obj
|
||
prev_sender = sender
|
||
prev_content = content
|
||
|
||
except ValueError as e:
|
||
# 处理日期格式错误
|
||
print(f"日期格式错误: {timestamp}, 错误: {e}")
|
||
continue
|
||
|
||
# 返回压缩后的数据长字符串
|
||
return '\n'.join(compressed_data) |