110 lines
3.6 KiB
Python
110 lines
3.6 KiB
Python
# -*- coding: utf-8 -*-
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
|
||
from wcferry import WxMsg
|
||
|
||
from db.base import BaseDBOperator
|
||
from db.connection import DBConnectionManager
|
||
|
||
|
||
class MessageStorageDB(BaseDBOperator):
    """Database operations for archiving chat messages and speech statistics.

    All statements are executed through ``execute_update`` / ``execute_query``
    provided by ``BaseDBOperator``; values are always passed as bound
    parameters, never interpolated into the SQL text.
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Create the operator on top of the given connection manager.

        Args:
            db_manager: Connection manager forwarded to ``BaseDBOperator``.
        """
        super().__init__(db_manager)

    def archive_message(self, msg: WxMsg) -> bool:
        """Insert one received message into the ``messages`` table.

        The row is stamped with the local time at which this method runs,
        not with a time taken from the message itself.

        Args:
            msg: The message to archive. Its ``roomid``, ``sender``,
                ``content``, ``type``, ``extra``, ``id``, ``xml`` and
                ``thumb`` attributes are stored.

        Returns:
            Whatever ``execute_update`` reports for the INSERT.
        """
        now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sql = """
            INSERT INTO messages (group_id, timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        params = (
            msg.roomid,
            now_time,
            msg.sender,
            msg.content,
            msg.type,
            msg.extra,
            msg.id,
            msg.xml,
            msg.thumb,
        )
        return self.execute_update(sql, params)

    def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
        """Fetch recent messages of a group, skipping short, long and '/' messages.

        Args:
            group_id: Group whose messages are wanted.
            hours_ago: Size of the look-back window in hours.
            min_content_length: Rows are kept only when ``LENGTH(content)``
                is strictly greater than this value.

        Returns:
            Rows with ``timestamp``, ``sender``, ``content`` and
            ``message_type``; an empty list when the query yields nothing.
        """
        # NOTE(review): 1 and 49 are presumably the text and app/link message
        # type codes -- confirm against the message source.
        # NOTE(review): the lower bound uses LENGTH (bytes) while the upper
        # bound uses CHAR_LENGTH (characters); kept as-is, confirm intent.
        sql = """
            SELECT timestamp, sender, content, message_type
            FROM messages
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            AND message_type in (1, 49)
            AND group_id = %s
            AND length(content) > %s
            AND CHAR_LENGTH(content) < 300
            AND content NOT LIKE %s
        """
        # The LIKE pattern is bound as a parameter instead of being written
        # into the SQL text: a bare '%' inside a statement that also carries
        # %s placeholders breaks drivers that bind via Python %-formatting.
        params = (hours_ago, group_id, min_content_length, "/%")
        return self.execute_query(sql, params) or []

    def get_message_count_by_date(self, date: str) -> List[Dict]:
        """Count archived messages per (group, sender) for one calendar date.

        Args:
            date: Date compared against ``DATE(timestamp)``.

        Returns:
            Rows with ``group_id``, ``sender`` and ``count``; an empty list
            when the query yields nothing.
        """
        sql = """
            SELECT group_id, sender, COUNT(*) as count
            FROM messages
            WHERE DATE(timestamp) = %s
            GROUP BY group_id, sender
        """
        return self.execute_query(sql, (date,)) or []

    def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]:
        """Return the top speakers of a group on a date from ``speech_counts``.

        Args:
            date: Date to rank.
            group_id: Group to rank.
            limit: Maximum number of rows returned.

        Returns:
            Rows with ``wx_id`` and ``speech_count`` ordered by count,
            highest first; an empty list when the query yields nothing.
        """
        # NOTE(review): `count` is selected without an aggregate while
        # grouping by wx_id; this presumably relies on one row per
        # (group_id, wx_id, date) -- confirm the table's unique key.
        sql = """
            SELECT wx_id, count AS speech_count
            FROM speech_counts
            WHERE date = %s
            AND group_id = %s
            GROUP BY wx_id
            ORDER BY count DESC
            LIMIT %s
        """
        params = (date, group_id, limit)
        return self.execute_query(sql, params) or []

    def insert_speech_count(self, group_id: str, wx_id: str, date: str, count: int) -> bool:
        """Insert a speech-count row, overwriting the count on a duplicate key.

        Args:
            group_id: Group ID.
            wx_id: WeChat ID of the speaker.
            date: Date in ``YYYY-MM-DD`` format.
            count: Number of messages to record.

        Returns:
            Whatever ``execute_update`` reports for the statement.
        """
        sql = """
            INSERT INTO speech_counts (group_id, wx_id, date, count)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE count = VALUES(count)
        """
        params = (group_id, wx_id, date, count)
        return self.execute_update(sql, params)

    def get_message_trend(self, group_id: str, days: int = 7) -> List[Dict]:
        """Return per-day message counts of a group for the recent period.

        Args:
            group_id: Group ID.
            days: How many days to look back from the current date
                (default 7). The window starts at ``CURDATE() - days``, so
                the current day is included in addition to those days.

        Returns:
            Rows with ``date`` and ``message_count`` ordered by date; an
            empty list when the query yields nothing.
        """
        sql = """
            SELECT
                DATE(timestamp) as date,
                COUNT(*) as message_count
            FROM messages
            WHERE group_id = %s
            AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY DATE(timestamp)
            ORDER BY date
        """
        return self.execute_query(sql, (group_id, days)) or []