Files
abot/message_storage/message_to_db.py
2025-03-05 09:01:08 +08:00

113 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pymysql
from datetime import datetime, timedelta
import redis
import xml.etree.ElementTree as ET
from message_summary.message_summary_4o import message_summary
import mysql.connector.pooling
class MessageStorage:
    """Archive chat messages in MySQL and read them back for summarization.

    Database connections are borrowed from ``db_pool`` and Redis connections
    from ``redis_pool``. Redis is only read here: the key
    ``"<group_id>:summary_time"`` is expected to hold the time of the last
    summary formatted as ``YYYY-MM-DD HH:MM:SS``.
    """

    # Format of every timestamp string exchanged with Redis and MySQL here.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        db_pool: "mysql.connector.pooling.MySQLConnectionPool",
        redis_pool: "redis.ConnectionPool",
    ):
        """Remember the two connection pools and create empty local caches.

        Args:
            db_pool: Pool that hands out MySQL connections.
            redis_pool: Pool used to build Redis clients.
        """
        self.redis_pool = redis_pool
        self.db_pool = db_pool
        # Local cache dictionaries keyed by group_id.
        self.local_membercounts = {}
        self.local_members = {}

    def _get_redis_connection(self):
        """Return a Redis client backed by the shared connection pool."""
        return redis.Redis(connection_pool=self.redis_pool)

    def archive_message(
        self,
        group_id,
        timestamp_str,
        sender,
        content,
        message_type,
        attachment_url=None,
    ):
        """Insert one message row into the ``messages`` table.

        Errors raised while inserting or committing are printed and the
        transaction is rolled back instead of being propagated; the pooled
        connection is always closed afterwards.

        Args:
            group_id: Identifier of the group the message belongs to.
            timestamp_str: Message time, stored in the ``timestamp`` column.
            sender: Identifier of the sender.
            content: Message body.
            message_type: Message type code.
            attachment_url: Optional attachment location; ``None`` if absent.
        """
        connection = self.db_pool.get_connection()
        try:
            with connection.cursor() as cursor:
                sql = """
                    INSERT INTO messages (group_id,timestamp, sender, content, message_type, attachment_url)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """
                cursor.execute(
                    sql,
                    (group_id, timestamp_str, sender, content, message_type, attachment_url),
                )
            connection.commit()
            print(f"Archived{timestamp_str}:{group_id}:{sender}: {content}")
        except Exception as e:
            # Best-effort archiving: report the failure and undo the insert.
            print(f"Error archiving message: {e}")
            connection.rollback()
        finally:
            connection.close()

    def get_messages(self, group_id, all_contacts: dict):
        """Return the group's recent messages as one newline-joined string.

        Rows of ``message_type`` 1 or 49 whose ``timestamp`` is at or after
        the computed window start are selected. Each row is rendered as
        ``"<timestamp>,<sender name>,<content>"``. The result is also printed.

        This method only reads the stored summary time; it does not write a
        new one back to Redis.

        Args:
            group_id: Identifier of the group to read.
            all_contacts: Mapping from sender id to display name. Senders
                missing from the mapping are rendered by their raw id.

        Returns:
            The rendered rows joined with ``"\\n"`` (empty string if none).
        """
        window_start = self._summary_window_start(group_id)
        query = """
            SELECT timestamp, sender, content,message_type
            FROM messages
            WHERE timestamp >= %s AND message_type in(1,49) AND group_id = %s
        """
        # Borrow the pooled connection only for the query itself, not while
        # talking to Redis or formatting rows.
        with self.db_pool.get_connection() as connection:
            with connection.cursor() as cursor:
                cursor.execute(query, (window_start, group_id))
                rows = cursor.fetchall()

        lines = []
        for timestamp, sender, content, message_type in rows:
            # Type 49 is a reply that quotes another message; its content is
            # XML, so the visible text has to be pulled out of <title>.
            # Compare as text so both int and str column values match.
            if str(message_type) == "49":
                content = self._extract_quote_title(content)
            sender_name = all_contacts.get(sender, sender)
            lines.append(f"{timestamp},{sender_name},{content}")

        result_str = "\n".join(lines)
        print(result_str)
        return result_str

    def _summary_window_start(self, group_id):
        """Compute the earliest timestamp (as a string) to include.

        * No stored summary time, or one that cannot be parsed: 8 hours ago.
        * Stored time less than 3 hours or more than 1 day old: today 00:00.
        * Otherwise: the stored summary time itself.
        """
        key = f"{group_id}:summary_time"
        stored = self._get_redis_connection().get(key)
        # Redis returns bytes unless the client decodes responses itself.
        if isinstance(stored, bytes):
            stored = stored.decode('utf-8')

        now = datetime.now()
        last_summary = None
        if stored:
            try:
                last_summary = datetime.strptime(stored, self._TIME_FORMAT)
            except ValueError as e:
                # Treat a malformed value like a missing one.
                print(f"Invalid summary time in {key}: {e}")

        if last_summary is None:
            return (now - timedelta(hours=8)).strftime(self._TIME_FORMAT)

        elapsed = now - last_summary
        if elapsed < timedelta(hours=3) or elapsed > timedelta(days=1):
            midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
            return midnight.strftime(self._TIME_FORMAT)
        return stored

    @staticmethod
    def _extract_quote_title(content):
        """Return the text of the first ``<title>`` descendant in XML content.

        Best-effort: if ``content`` cannot be parsed or has no ``<title>``,
        the problem is printed and ``content`` is returned unchanged.
        """
        try:
            title = ET.fromstring(content).findtext('.//title')
        except (ET.ParseError, TypeError, ValueError) as e:
            print(f"message_type 49 error: {e}")
            return content
        if title is None:
            print("message_type 49 error: no <title> element found")
            return content
        return title