116 lines
4.9 KiB
Python
116 lines
4.9 KiB
Python
import pymysql
|
||
from datetime import datetime, timedelta
|
||
import redis
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from wcferry import WxMsg
|
||
|
||
from message_summary.message_summary_4o import message_summary
|
||
|
||
import mysql.connector.pooling
|
||
|
||
|
||
class MessageStorage:
    """Persist chat messages in MySQL and fetch recent ones for summarising.

    Database connections come from ``db_pool``; the per-group "last summary
    time" checkpoint is kept in Redis under the key ``"<group_id>:summary_time"``.
    """

    # Timestamp layout used both for the ``messages.timestamp`` values written
    # by ``archive_message`` and for the checkpoint string stored in Redis.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, db_pool: "mysql.connector.pooling.MySQLConnectionPool",
                 redis_pool: "redis.ConnectionPool"):
        """Keep references to the shared MySQL and Redis connection pools."""
        self.redis_pool = redis_pool
        self.db_pool = db_pool
        # Local cache dictionaries, keyed by group_id.
        self.local_membercounts = {}
        self.local_members = {}

    def _get_redis_connection(self):
        """Return a Redis client backed by the shared connection pool."""
        # NOTE(review): redis-py appears to ignore ``decode_responses`` when an
        # explicit ``connection_pool`` is passed, so ``get`` may still return
        # bytes unless the pool itself was built with decode_responses=True.
        # ``get_messages`` therefore accepts both str and bytes.
        return redis.Redis(connection_pool=self.redis_pool, decode_responses=True)

    def archive_message(self, msg: "WxMsg"):
        """Insert one message into the ``messages`` table.

        The row is stamped with the local time of archiving. Failures are
        printed and rolled back rather than raised, so archiving stays
        best-effort for the caller. The pooled connection is always closed.
        """
        connection = self.db_pool.get_connection()

        try:
            now_time = datetime.now().strftime(self._TIME_FORMAT)
            with connection.cursor() as cursor:
                sql = """
                    INSERT INTO messages (group_id,timestamp, sender, content, message_type, attachment_url,message_id,message_xml,message_thumb)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    msg.roomid, now_time, msg.sender, msg.content, msg.type,
                    msg.extra, msg.id, msg.xml, msg.thumb))

            connection.commit()
            print(f"Archived:{now_time}:{msg.roomid}:{msg.sender}: {msg.content}")

        except Exception as e:
            print(f"Error archiving message: {e}")
            connection.rollback()

        finally:
            connection.close()

    @classmethod
    def _resolve_window_start(cls, stored, now):
        """Pick the lower timestamp bound (as a string) for the message query.

        Rules, with ``stored`` being the checkpoint read from Redis:
        * missing or unparsable -> 8 hours before ``now``
        * less than 3 hours old -> 3 hours before ``now``
        * more than 1 day old   -> 6 hours before ``now``
        * otherwise             -> ``stored`` itself
        """
        default_start = (now - timedelta(hours=8)).strftime(cls._TIME_FORMAT)
        if not stored:
            return default_start

        # Redis hands back bytes when the pool does not decode responses.
        if isinstance(stored, bytes):
            stored = stored.decode("utf-8")

        try:
            stored_at = datetime.strptime(stored, cls._TIME_FORMAT)
        except ValueError as e:
            # A corrupt checkpoint should not block summarising altogether.
            print(f"invalid last_summary_time {stored!r}: {e}")
            return default_start

        elapsed = now - stored_at
        if elapsed < timedelta(hours=3):
            return (now - timedelta(hours=3)).strftime(cls._TIME_FORMAT)
        if elapsed > timedelta(days=1):
            return (now - timedelta(hours=6)).strftime(cls._TIME_FORMAT)
        return stored

    @staticmethod
    def _extract_quote_title(content):
        """Return the text of the first ``<title>`` element in an XML payload.

        Best-effort: on any parsing problem the error is printed and the
        original ``content`` is returned unchanged.
        """
        try:
            return ET.fromstring(content).find('.//title').text
        except Exception as e:
            print(f"message_type 49 error: {e}")
            return content

    def get_messages(self, group_id, all_contacts: dict):
        """Return recent messages of a group as newline-separated text.

        The time window starts at the checkpoint stored in Redis, adjusted by
        ``_resolve_window_start``. After the rows have been fetched
        successfully, the checkpoint is replaced with the current time.

        Args:
            group_id: Value matched against ``messages.group_id``.
            all_contacts: Mapping from sender id to display name; senders not
                present in the mapping are shown by their raw id.

        Returns:
            One ``"timestamp,sender_name,content"`` line per message, joined
            with ``"\\n"``. For message type 49 the content is replaced by the
            ``<title>`` text of its XML payload when that can be extracted.
        """
        key = f"{group_id}:summary_time"
        redis_client = self._get_redis_connection()

        last_summary_time = redis_client.get(key)
        if isinstance(last_summary_time, bytes):
            last_summary_time = last_summary_time.decode("utf-8")
        print(f"last_summary_time:{last_summary_time}")

        current_time = datetime.now()
        current_date = current_time.strftime(self._TIME_FORMAT)
        window_start = self._resolve_window_start(last_summary_time, current_time)

        # Types 1 and 49 are selected; 49 is included because it is a message
        # that quotes another one, and its content is XML that needs parsing.
        query = """
            SELECT timestamp, sender, content,message_type
            FROM messages
            WHERE timestamp >= %s AND message_type in(1,49) AND group_id = %s
            AND length(content)>6
            AND content NOT LIKE '/%'
        """

        # Same acquire / try / finally-close pattern as archive_message, and
        # the pooled connection is only held for the duration of the query.
        connection = self.db_pool.get_connection()
        try:
            with connection.cursor() as cursor:
                cursor.execute(query, (window_start, group_id))
                rows = cursor.fetchall()
        finally:
            connection.close()

        # Advance the checkpoint only once the rows are safely in hand, so a
        # failed query does not cause the next run to skip messages.
        redis_client.set(key, current_date)

        result = []
        for timestamp, sender, content, message_type in rows:
            # The column may come back as int or str depending on the schema;
            # compare on the string form so both are recognised.
            if str(message_type) == "49":
                content = self._extract_quote_title(content)
            # Fall back to the raw sender id when no display name is known.
            sender_name = all_contacts.get(sender, sender)
            result.append(f"{timestamp},{sender_name},{content}")

        result_str = "\n".join(result)
        print(result_str)
        return result_str
|