去除一些无用功能,以前的UI这些。
This commit is contained in:
@@ -1,63 +1,118 @@
|
||||
import pymysql
|
||||
from datetime import datetime, timedelta
|
||||
import redis
|
||||
import xml.etree.ElementTree as ET
|
||||
import logging
|
||||
|
||||
from wcferry import WxMsg
|
||||
|
||||
from message_summary.message_summary_4o import message_summary
|
||||
from db.connection import DBConnectionManager
|
||||
from db.message_storage import MessageStorageDB
|
||||
|
||||
import mysql.connector.pooling
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger("MessageStorage")
|
||||
|
||||
|
||||
class MessageStorage:
|
||||
|
||||
def __init__(self, db_pool: mysql.connector.pooling.MySQLConnectionPool, redis_pool: redis.ConnectionPool):
|
||||
self.redis_pool = redis_pool
|
||||
self.db_pool = db_pool
|
||||
def __init__(self):
|
||||
# 获取数据库连接管理器的单例
|
||||
self.db_manager = DBConnectionManager()
|
||||
self.message_db = MessageStorageDB(self.db_manager)
|
||||
# 初始化本地缓存字典,使用 group_id 作为键
|
||||
self.local_membercounts = {}
|
||||
self.local_members = {}
|
||||
|
||||
def _get_redis_connection(self):
|
||||
"""从连接池获取 Redis 连接"""
|
||||
return redis.Redis(connection_pool=self.redis_pool, decode_responses=True)
|
||||
def process_message(self, message: WxMsg):
|
||||
# 示例message字符串
|
||||
current_date = datetime.now().strftime('%Y-%m-%d')
|
||||
# 生成Redis key
|
||||
key = f"{message.roomid}:{message.sender}:{current_date}:count"
|
||||
# 获取 Redis 连接
|
||||
redis_conn = self.db_manager.get_redis_connection()
|
||||
# 使用Redis哈希(或字符串)增加发言次数
|
||||
redis_conn.hincrby(key, 'count', 1) # 这里使用哈希,但也可以考虑用字符串的INCR操作
|
||||
# 设置时效为48小时
|
||||
redis_conn.expire(key, 86400 * 2)
|
||||
# 或者使用字符串:r.incr(key) # 如果只存储一个整数值,字符串类型可能更简单
|
||||
|
||||
def archive_message(self, msg: WxMsg):
|
||||
# 连接到数据库
|
||||
connection = self.db_pool.get_connection()
|
||||
|
||||
try:
|
||||
now_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
|
||||
with connection.cursor() as cursor:
|
||||
# 插入消息信息
|
||||
sql = """
|
||||
INSERT INTO messages (group_id,timestamp, sender, content, message_type, attachment_url,message_id,message_xml,message_thumb)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
"""
|
||||
cursor.execute(sql, (
|
||||
msg.roomid, now_time, msg.sender, msg.content, msg.type, msg.extra, msg.id, msg.xml, msg.thumb))
|
||||
# 使用 MessageStorageDB 类存档消息
|
||||
result = self.message_db.archive_message(msg)
|
||||
if result:
|
||||
logger.info(f"消息存档成功: {msg.roomid}:{msg.sender}: {msg.content}")
|
||||
else:
|
||||
logger.error(f"消息存档失败: {msg.roomid}:{msg.sender}")
|
||||
except Exception as e:
|
||||
logger.error(f"存档消息出错: {e}")
|
||||
|
||||
# 提交事务
|
||||
connection.commit()
|
||||
print(f"Archived:{now_time}:{msg.roomid}:{msg.sender}: {msg.content}")
|
||||
def write_to_db(self):
|
||||
"""从Redis读取发言统计数据并写入数据库"""
|
||||
# 获取Redis连接
|
||||
redis_conn = self.db_manager.get_redis_connection()
|
||||
|
||||
# 获取当前日期的前一天
|
||||
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||||
|
||||
# 遍历Redis中所有与昨天日期相关的key,并写入数据库
|
||||
for key_bytes in redis_conn.keys(f"*:*:{yesterday}:count"):
|
||||
key = key_bytes.decode('utf-8')
|
||||
parts = key.split(':')
|
||||
group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday
|
||||
|
||||
# 获取计数值
|
||||
count_bytes = redis_conn.hget(key, 'count')
|
||||
count = int(count_bytes) if isinstance(count_bytes, bytes) else 0
|
||||
|
||||
# 使用MessageStorageDB插入数据
|
||||
try:
|
||||
result = self.message_db.insert_speech_count(group_id, wx_id, yesterday, count)
|
||||
if result:
|
||||
logging.info(f"成功写入发言统计: {group_id}, {wx_id}, {yesterday}, {count}")
|
||||
else:
|
||||
logging.error(f"写入发言统计失败: {group_id}, {wx_id}, {yesterday}, {count}")
|
||||
except Exception as e:
|
||||
logging.error(f"写入发言统计出错: {e}")
|
||||
|
||||
def generate_and_send_ranking(self, groupId, allContacts: dict):
|
||||
"""生成并发送群聊发言排名"""
|
||||
try:
|
||||
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||||
|
||||
# 使用数据库操作类获取排名数据
|
||||
results = self.message_db.get_speech_ranking(yesterday, groupId, limit=20)
|
||||
|
||||
if not results:
|
||||
logging.info(f"没有找到 {yesterday} 的群聊 {groupId} 发言记录")
|
||||
return f"{yesterday} 没有发言记录"
|
||||
|
||||
# 格式化输出字符串
|
||||
ranking_str = yesterday + "发言数量前20的用户排名:\n"
|
||||
for rank, result in enumerate(results, start=1):
|
||||
username = result['wx_id']
|
||||
speech_count = result['speech_count']
|
||||
ranking_str += f"{rank}. {allContacts.get(username, username)}: {speech_count} 次发言\n"
|
||||
|
||||
logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名")
|
||||
return ranking_str
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error archiving message: {e}")
|
||||
connection.rollback()
|
||||
|
||||
finally:
|
||||
# 关闭连接
|
||||
connection.close()
|
||||
logging.error(f"生成发言排名出错: {e}")
|
||||
return f"生成发言排名出错: {e}"
|
||||
|
||||
def get_messages(self, group_id, all_contacts: dict):
|
||||
# 连接到数据库
|
||||
with self.db_pool.get_connection() as connection:
|
||||
try:
|
||||
# 获取 Redis 连接
|
||||
redis_conn = self.db_manager.get_redis_connection()
|
||||
|
||||
# 获取 redis 中的上次总结时间,本次从上次开始算,若没有,则从 8 小时之前开始计算
|
||||
key = f"{group_id}:summary_time"
|
||||
last_summary_time = self._get_redis_connection().get(key)
|
||||
print(f"last_summary_time:{last_summary_time}")
|
||||
# 如果 Redis 返回值为字节类型,转换为字符串
|
||||
last_summary_time = redis_conn.get(key)
|
||||
logger.info(f"上次总结时间: {last_summary_time}")
|
||||
|
||||
current_time = datetime.now()
|
||||
current_date = current_time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
@@ -67,6 +122,10 @@ class MessageStorage:
|
||||
eight_hours_ago = current_time - timedelta(hours=8)
|
||||
last_summary_time = eight_hours_ago.strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
# 如果 Redis 返回值为字节类型,转换为字符串
|
||||
if isinstance(last_summary_time, bytes):
|
||||
last_summary_time = last_summary_time.decode('utf-8')
|
||||
|
||||
# 检查 redis 中的时间与当前时间差是否小于 3 小时
|
||||
last_summary_time_obj = datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')
|
||||
time_diff = current_time - last_summary_time_obj
|
||||
@@ -79,37 +138,33 @@ class MessageStorage:
|
||||
last_summary_time = (current_time - timedelta(hours=10)).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# 更新 Redis 存储的当前时间
|
||||
self._get_redis_connection().set(key, current_date)
|
||||
redis_conn.set(key, current_date)
|
||||
|
||||
with connection.cursor() as cursor:
|
||||
# 执行查询,获取最近 8 小时的消息
|
||||
query = """
|
||||
SELECT timestamp, sender, content,message_type
|
||||
FROM messages
|
||||
WHERE timestamp >= %s AND message_type in(1,49) AND group_id = %s
|
||||
AND length(content)>6
|
||||
AND content NOT LIKE '/%'
|
||||
"""
|
||||
cursor.execute(query, (last_summary_time, group_id))
|
||||
# 使用 MessageStorageDB 类获取最近消息
|
||||
hours_ago = int(
|
||||
(current_time - datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')).total_seconds() / 3600) + 1
|
||||
messages = self.message_db.get_recent_messages(group_id, hours_ago=hours_ago)
|
||||
|
||||
# 构建最终的结果字符串
|
||||
# message_type 需要加入49类型,因为49是引用之后的发言。但是49是xml ,需要将content进行xml解析
|
||||
# 构建最终的结果字符串
|
||||
result = []
|
||||
for msg in messages:
|
||||
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[
|
||||
'message_type']
|
||||
try:
|
||||
if message_type == 49: # 注意这里是整数类型
|
||||
# 解析 XML 字符串
|
||||
root = ET.fromstring(content)
|
||||
# 提取 title 内容
|
||||
content = root.find('.//title').text
|
||||
except Exception as e:
|
||||
logger.error(f"解析消息类型49出错: {e}")
|
||||
|
||||
result = []
|
||||
for row in cursor.fetchall():
|
||||
timestamp, sender, content, message_type = row
|
||||
try:
|
||||
if message_type == "49":
|
||||
# 解析 XML 字符串
|
||||
root = ET.fromstring(content)
|
||||
# 提取 title 内容
|
||||
content = root.find('.//title').text
|
||||
sender_name = all_contacts.get(sender, sender) # 获取发送者的名字,若找不到则使用原 ID
|
||||
result.append(f"{timestamp},{sender_name},{content}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"message_type 49 error: {e}")
|
||||
sender_name = all_contacts.get(sender, sender) # 获取发送者的名字,若找不到则使用原 ID
|
||||
result.append(f"{timestamp},{sender_name},{content}")
|
||||
result_str = "\n".join(result) # 将结果拼接为最终字符串
|
||||
return result_str
|
||||
|
||||
result_str = "\n".join(result) # 将结果拼接为最终字符串
|
||||
# print(result_str)
|
||||
return result_str
|
||||
except Exception as e:
|
||||
logger.error(f"获取消息出错: {e}")
|
||||
return ""
|
||||
|
||||
Reference in New Issue
Block a user