Files
abot/db/message_storage.py
2026-05-01 12:45:40 +08:00

818 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from datetime import datetime
import json
from typing import Dict, List, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from wechat_ipad.models.message import WxMessage
class MessageStorageDB(BaseDBOperator):
"""消息存储相关数据库操作"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
def archive_message(self, msg: WxMessage) -> bool:
"""存档消息
说明:
1. 结构化字段content/message_type/message_xml 等)继续保留,便于高频查询;
2. 新增 raw_payload 字段,落盘 API 原始消息的完整序列化内容,便于后续排障与二次特征提取;
3. 为兼容尚未升级表结构的老环境,先尝试新 SQL失败后自动回退旧 SQL。
"""
now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 先准备好可复用字段,避免在两套 SQL 中重复拼接业务字段。
base_params = (
msg.roomid,
now_time,
msg.sender,
str(msg.content.clean_content),
msg.msg_type.value,
str(msg.content.xml_content),
msg.msg_id,
msg.msg_source,
"",
)
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
raw_payload = self._serialize_raw_payload(msg)
sql_with_raw_payload = """
INSERT INTO messages (
group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, raw_payload, mentioned_user_ids, message_thumb
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# 为了降低主链路延迟,这里不做@解析mentioned_user_ids 先置空,后续由定时任务批处理回填。
params_with_raw_payload = (*base_params[:8], raw_payload, None, base_params[8])
archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload)
if archived:
return True
# 兼容旧表结构:如果线上还没执行 ALTER TABLE加列前仍可继续正常归档。
sql_legacy = """
INSERT INTO messages (
group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
return self.execute_update(sql_legacy, base_params)
def _serialize_raw_payload(self, msg: WxMessage) -> str:
"""将消息对象尽量完整地序列化为 JSON 字符串。
序列化优先级(从高到低):
1. pydantic v2: model_dump_json
2. pydantic v1: json
3. 自定义方法: to_json
4. 通用兜底: __dict__
说明:
- 不做脱敏,按用户要求保存完整信息;
- 使用 ensure_ascii=False 保留中文原文,便于后续排查。
"""
try:
if hasattr(msg, "model_dump_json") and callable(getattr(msg, "model_dump_json")):
return msg.model_dump_json(exclude_none=False)
if hasattr(msg, "json") and callable(getattr(msg, "json")):
return msg.json(ensure_ascii=False)
if hasattr(msg, "to_json") and callable(getattr(msg, "to_json")):
raw_text = msg.to_json()
return raw_text if isinstance(raw_text, str) else json.dumps(raw_text, ensure_ascii=False, default=str)
return json.dumps(getattr(msg, "__dict__", {"repr": str(msg)}), ensure_ascii=False, default=str)
except Exception:
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
return str(msg)
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
"""获取最近的消息"""
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
AND message_type in (1, 49)
AND group_id = %s
AND length(content) > %s
AND CHAR_LENGTH(content) < 300
AND content NOT LIKE '/%'
ORDER BY timestamp ASC
"""
params = (hours_ago, group_id, min_content_length)
return self.execute_query(sql, params) or []
def get_latest_image_message(self, group_id: str, before_timestamp: str = "", hours_ago: int = 8) -> Optional[Dict]:
"""获取指定群最近一条已落盘图片消息"""
sql = """
SELECT timestamp, sender, content, message_type, image_path
FROM messages
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
AND group_id = %s
AND message_type = 3
AND image_path IS NOT NULL
AND image_path <> ''
"""
params: List = [hours_ago, group_id]
if before_timestamp:
sql += " AND timestamp <= %s"
params.append(before_timestamp)
sql += " ORDER BY timestamp DESC LIMIT 1"
return self.execute_query(sql, tuple(params), fetch_one=True)
def get_message_by_message_id(self, message_id: int | str) -> Optional[Dict]:
"""根据 message_id 获取单条消息"""
sql = """
SELECT id, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE message_id = %s
ORDER BY id DESC
LIMIT 1
"""
return self.execute_query(sql, (message_id,), fetch_one=True)
def get_image_message_by_md5(self, md5: str) -> Optional[Dict]:
"""根据图片消息 attachment_url 中的 md5 反查原图消息"""
sql = """
SELECT id, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE message_type = 3
AND attachment_url IS NOT NULL
AND attachment_url <> ''
AND attachment_url LIKE %s
ORDER BY id DESC
LIMIT 1
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
def get_media_message_by_md5(self, md5: str, current_message_id: int | str | None = None) -> Optional[Dict]:
"""根据 md5 查找已落盘的图片/表情消息,用于去重复用本地文件"""
sql = """
SELECT id, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE attachment_url IS NOT NULL
AND attachment_url <> ''
AND attachment_url LIKE %s
AND image_path IS NOT NULL
AND image_path <> ''
"""
params: List = [f'%md5="{md5}"%']
if current_message_id is not None:
sql += " AND message_id <> %s"
params.append(current_message_id)
sql += " ORDER BY id DESC LIMIT 1"
return self.execute_query(sql, tuple(params), fetch_one=True)
def get_member_recent_messages(self, group_id: str, wxid: str, days: int = 30,
limit: int = 200, include_today: bool = True) -> List[Dict]:
"""获取指定群成员近期消息"""
if include_today:
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
ORDER BY timestamp DESC
LIMIT %s
"""
params = (days, group_id, wxid, limit)
else:
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE timestamp >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
AND timestamp < CURDATE()
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
ORDER BY timestamp DESC
LIMIT %s
"""
params = (days, group_id, wxid, limit)
results = self.execute_query(sql, params) or []
return list(reversed(results))
def get_member_messages_since(self, group_id: str, wxid: str, since_time, limit: int = 200) -> List[Dict]:
"""获取指定时间之后的成员消息"""
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE timestamp > %s
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
ORDER BY timestamp ASC
LIMIT %s
"""
if isinstance(since_time, datetime):
since_time = since_time.strftime("%Y-%m-%d %H:%M:%S")
return self.execute_query(sql, (since_time, group_id, wxid, limit)) or []
def get_member_active_dates(self, group_id: str, wxid: str, days: int = 365) -> List[Dict]:
"""获取成员在指定时间窗口内的活跃日期列表"""
sql = """
SELECT
DATE(timestamp) AS message_date,
COUNT(*) AS msg_count,
MIN(timestamp) AS first_message_time,
MAX(timestamp) AS last_message_time
FROM messages
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
GROUP BY DATE(timestamp)
ORDER BY message_date ASC
"""
rows = self.execute_query(sql, (days, group_id, wxid)) or []
for row in rows:
for key in ("message_date", "first_message_time", "last_message_time"):
value = row.get(key)
if isinstance(value, datetime):
row[key] = value.strftime("%Y-%m-%d %H:%M:%S") if key != "message_date" else value.strftime("%Y-%m-%d")
elif value:
row[key] = str(value)
return rows
def get_member_messages_on_date(self, group_id: str, wxid: str, target_date: str, limit: int = 120) -> List[Dict]:
"""获取成员在某一天的消息"""
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE DATE(timestamp) = %s
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
ORDER BY timestamp ASC
LIMIT %s
"""
return self.execute_query(sql, (target_date, group_id, wxid, limit)) or []
def get_member_messages_for_group_date(self, group_id: str, target_date: str, limit: int = 5000) -> List[Dict]:
"""获取群在某一天的全部文本消息"""
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE DATE(timestamp) = %s
AND group_id = %s
AND sender IS NOT NULL
AND sender <> ''
AND message_type IN (1, 49)
AND CHAR_LENGTH(content) BETWEEN 2 AND 500
AND content NOT LIKE '/%%'
ORDER BY timestamp ASC
LIMIT %s
"""
return self.execute_query(sql, (target_date, group_id, limit)) or []
def get_recent_group_chat_messages(self, group_id: str, limit: int = 20) -> List[Dict]:
"""获取群聊最近消息"""
sql = """
SELECT timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE group_id = %s
ORDER BY timestamp DESC
LIMIT %s
"""
results = self.execute_query(sql, (group_id, limit)) or []
return list(reversed(results))
def get_recent_personal_messages(self, wxid: str, limit: int = 20) -> List[Dict]:
"""获取私聊最近归档消息
说明:
当前消息表没有可靠的 to_user 会话维度,这里只返回目标联系人发来的、
且未归属到群聊的消息,用于通讯录内的“尽力模式”历史预览。
"""
sql = """
SELECT timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE (group_id IS NULL OR group_id = '')
AND sender = %s
ORDER BY timestamp DESC
LIMIT %s
"""
results = self.execute_query(sql, (wxid, limit)) or []
return list(reversed(results))
def get_message_count_by_date(self, date: str) -> List[Dict]:
"""获取指定日期的消息统计"""
sql = """
SELECT group_id, sender, COUNT(*) as count
FROM messages
WHERE DATE(timestamp) = %s
GROUP BY group_id, sender
"""
return self.execute_query(sql, (date,)) or []
def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]:
"""获取指定日期和群组的发言排名"""
sql = """
SELECT wx_id, count AS speech_count
FROM speech_counts
WHERE date = %s
AND group_id = %s
GROUP BY wx_id
ORDER BY count DESC
LIMIT %s
"""
params = (date, group_id, limit)
results = self.execute_query(sql, params)
return results or []
def insert_speech_count(self, group_id: str, wx_id: str, date: str, count: int) -> bool:
"""插入发言统计数据
Args:
group_id: 群组ID
wx_id: 微信ID
date: 日期,格式为 YYYY-MM-DD
count: 发言计数
Returns:
是否成功插入
"""
sql = """
INSERT INTO speech_counts (group_id, wx_id, date, count)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE count = VALUES(count)
"""
params = (group_id, wx_id, date, count)
return self.execute_update(sql, params)
def get_message_trend(self, group_id: str, days: int = 7) -> List[Dict]:
"""获取指定群组的消息趋势数据
Args:
group_id: 群组ID
days: 获取最近几天的数据默认7天
Returns:
包含日期和消息数量的列表
"""
sql = """
SELECT
DATE(timestamp) as date,
COUNT(*) as message_count
FROM messages
WHERE group_id = %s
AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY DATE(timestamp)
ORDER BY date
"""
return self.execute_query(sql, (group_id, days)) or []
def get_group_member_message_ranking(self, group_id: str, start_time: datetime,
end_time: datetime, limit: int = 10) -> List[Dict]:
"""获取群成员发言排行"""
sql = """
SELECT
sender,
COUNT(*) AS message_count,
MAX(timestamp) AS last_message_time
FROM messages
WHERE timestamp >= %s
AND timestamp <= %s
AND group_id = %s
AND sender IS NOT NULL
AND sender <> ''
GROUP BY sender
ORDER BY message_count DESC, last_message_time DESC
LIMIT %s
"""
params = (
start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id,
limit,
)
rows = self.execute_query(sql, params) or []
for row in rows:
dt = row.get("last_message_time")
if isinstance(dt, datetime):
row["last_message_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
return rows
def get_group_hourly_distribution(self, group_id: str, days: int = 30) -> List[Dict]:
"""获取群消息小时分布"""
sql = """
SELECT
HOUR(timestamp) AS hour_slot,
COUNT(*) AS message_count
FROM messages
WHERE group_id = %s
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
GROUP BY HOUR(timestamp)
ORDER BY hour_slot
"""
rows = self.execute_query(sql, (group_id, days)) or []
return [
{
"hour": int(row.get("hour_slot") or 0),
"message_count": int(row.get("message_count") or 0),
}
for row in rows
]
def get_group_last_message(self, group_id: str) -> Optional[Dict]:
"""获取群最后一条消息信息"""
sql = """
SELECT sender, content, message_type, timestamp
FROM messages
WHERE group_id = %s
ORDER BY timestamp DESC
LIMIT 1
"""
row = self.execute_query(sql, (group_id,), fetch_one=True)
if row and isinstance(row.get("timestamp"), datetime):
row["timestamp"] = row["timestamp"].strftime("%Y-%m-%d %H:%M:%S")
return row
def get_messages_by_filter(self, group_id=None, start_date=None, end_date=None,
search_text=None, page=1, page_size=20) -> Dict:
"""按条件筛选消息并支持分页和模糊搜索
Args:
group_id: 群组ID可选
start_date: 开始日期格式为YYYY-MM-DD可选
end_date: 结束日期格式为YYYY-MM-DD可选
search_text: 搜索文本,可选,用于模糊搜索消息内容
page: 页码从1开始
page_size: 每页记录数
Returns:
包含消息列表和总记录数的字典
"""
# 构建基础SQL查询
sql_count = "SELECT COUNT(*) as total FROM messages WHERE 1=1 "
sql_data = """
SELECT id, group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb, image_path
FROM messages
WHERE 1=1
"""
# 构建参数列表
params = []
# 添加筛选条件
if group_id:
sql_count += " AND group_id = %s "
sql_data += " AND group_id = %s "
params.append(group_id)
if start_date:
sql_count += " AND DATE(timestamp) >= %s "
sql_data += " AND DATE(timestamp) >= %s "
params.append(start_date)
if end_date:
sql_count += " AND DATE(timestamp) <= %s "
sql_data += " AND DATE(timestamp) <= %s "
params.append(end_date)
if search_text:
sql_count += " AND content LIKE %s "
sql_data += " AND content LIKE %s "
params.append(f"%{search_text}%")
# 添加排序和分页
sql_data += " ORDER BY timestamp DESC "
sql_data += " LIMIT %s OFFSET %s "
# 计算分页参数
offset = (page - 1) * page_size
data_params = params.copy()
data_params.extend([page_size, offset])
# 执行查询
count_result = self.execute_query(sql_count, params)
total = count_result[0]['total'] if count_result else 0
messages = self.execute_query(sql_data, data_params) or []
return {
'total': total,
'page': page,
'page_size': page_size,
'total_pages': (total + page_size - 1) // page_size,
'messages': messages
}
def update_message_image_path(self, message_id, image_base64str):
"""
更新消息的图片路径
Args:
message_id: 消息ID
image_base64str: 图片base64内容
Returns:
bool: 更新成功返回True否则返回False
"""
try:
# 构建SQL语句
sql = """
UPDATE messages
SET message_thumb = %s
WHERE message_id = %s
"""
params = (image_base64str, message_id)
# 执行更新操作
result = self.execute_update(sql, params)
return result
except Exception as e:
# 使用已有的日志记录方式
print(f"更新消息图片路径出错: {e}")
return False
def update_message_image_file_path(self, message_id, image_path):
try:
sql = """
UPDATE messages
SET image_path = %s
WHERE message_id = %s
AND image_path IS NULL
"""
params = (image_path, message_id)
result = self.execute_update(sql, params)
return result
except Exception as e:
print(f"更新消息图片文件路径出错: {e}")
return False
def get_hourly_message_trend(self, group_id: str = None, days: int = 1) -> List[Dict]:
"""获取指定群组的按小时消息趋势数据
Args:
group_id: 群组ID如果为None则获取所有群组的数据
days: 获取最近几天的数据默认1天
Returns:
包含小时和消息数量的列表
"""
sql = """
SELECT
DATE_FORMAT(timestamp, '%Y-%m-%d %H:00') as hour_slot,
COUNT(*) as message_count
FROM messages
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
"""
params = [days]
# 如果指定了群组ID则添加群组筛选条件
if group_id:
sql += "AND group_id = %s "
params.append(group_id)
# 按小时分组并排序
sql += """
GROUP BY hour_slot
ORDER BY hour_slot
"""
return self.execute_query(sql, tuple(params)) or []
def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50,
group_ids: Optional[List[str]] = None) -> List[Dict]:
"""获取最近N分钟内未处理图片/表情消息image_path IS NULL
Args:
minutes_ago: 查询最近多少分钟的消息默认10分钟
limit: 每次最多处理多少条默认50条
group_ids: 限制只查询指定群组,传空列表则直接返回空
Returns:
包含消息ID、群ID、消息XML等信息的列表
"""
if group_ids is not None and not group_ids:
return []
sql = """
SELECT message_id, group_id, sender, message_type, message_xml, timestamp, attachment_url
FROM messages
WHERE message_type IN ('3', '47', '1048625', '1090519089')
AND image_path IS NULL
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
AND attachment_url IS NOT NULL
AND attachment_url != ''
ORDER BY timestamp ASC
LIMIT %s
"""
params: List = [minutes_ago]
if group_ids is not None:
placeholders = ", ".join(["%s"] * len(group_ids))
sql = sql.replace("ORDER BY timestamp ASC", f"AND group_id IN ({placeholders})\n ORDER BY timestamp ASC")
params.extend(group_ids)
params.append(limit)
return self.execute_query(sql, params) or []
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
"""兼容旧方法名,内部复用统一媒体待处理查询"""
return self.get_pending_media_messages(minutes_ago, limit)
def get_recent_emoji_assets(self, limit: int = 200) -> List[Dict]:
"""获取近期表情消息记录(用于提取 md5 + len 发送参数)"""
sql = """
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
FROM messages
WHERE message_type IN ('47', '1048625', '1090519089')
AND attachment_url IS NOT NULL
AND attachment_url <> ''
ORDER BY timestamp DESC
LIMIT %s
"""
return self.execute_query(sql, (limit,)) or []
def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
"""根据表情 md5 精确查找最近一条表情消息。
说明:
1. 后台聊天发送表情时,前端偶尔只能拿到 md5拿不到 total_length
2. wechat_ipad 的 SendEmoji 接口要求同时提供 Md5 和 TotalLen
3. 这里直接从历史消息表里按 md5 反查最近一条原始记录,给发送接口补全长度。
"""
sql = """
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
FROM messages
WHERE message_type IN ('47', '1048625', '1090519089')
AND attachment_url IS NOT NULL
AND attachment_url <> ''
AND attachment_url LIKE %s
ORDER BY timestamp DESC
LIMIT 1
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None,
min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
"""按日期范围获取消息(支持按天总结)
Args:
group_id: 群组ID
start_date: 开始日期,格式 YYYY-MM-DD
end_date: 结束日期,格式 YYYY-MM-DD如果为None则使用start_date当天
min_content_length: 最小内容长度
max_results: 最多返回多少条消息防止数据过多默认5000条足够总结使用
Returns:
消息列表
"""
if end_date is None:
end_date = start_date
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE DATE(timestamp) >= %s
AND DATE(timestamp) <= %s
AND group_id = %s
AND message_type IN (1, 49)
AND LENGTH(content) > %s
AND CHAR_LENGTH(content) < 300
AND content NOT LIKE '/%'
ORDER BY timestamp ASC
LIMIT %s
"""
params = (start_date, end_date, group_id, min_content_length, max_results)
return self.execute_query(sql, params) or []
def get_messages_for_summary(self, group_id: str, hours_ago: int = 8,
min_messages: int = 50,
max_hours: int = 48,
max_results: int = 5000) -> List[Dict]:
"""智能获取用于总结的消息(自动调整时间范围)
Args:
group_id: 群组ID
hours_ago: 默认查询最近多少小时
min_messages: 最少需要多少条消息,如果不足会扩大时间范围
max_hours: 最大查询多少小时内的消息
max_results: 最多返回多少条消息默认5000条确保有足够数据
Returns:
消息列表
"""
# 先尝试默认时间范围
messages = self.get_recent_messages(group_id, hours_ago=hours_ago)
# 如果消息不足,逐步扩大时间范围
current_hours = hours_ago
while len(messages) < min_messages and current_hours < max_hours:
current_hours += 8 # 每次增加8小时
messages = self.get_recent_messages(group_id, hours_ago=current_hours)
# 限制最大返回数量5000条足以覆盖1-2天的活跃群聊
return messages[:max_results] if messages else []
def get_messages_by_date_range(self, group_id: str, start_time: datetime, end_time: datetime) -> List[Dict]:
"""获取指定时间范围内的消息
Args:
group_id: 群组ID
start_time: 开始时间
end_time: 结束时间
Returns:
消息列表
"""
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
WHERE timestamp >= %s
AND timestamp <= %s
AND message_type in (1, 49)
AND group_id = %s
AND length(content) > 6
AND CHAR_LENGTH(content) < 300
AND content NOT LIKE '/%'
ORDER BY timestamp ASC
"""
params = (start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id)
return self.execute_query(sql, params) or []
def count_messages_by_date_range(self, group_id: str, start_time: datetime, end_time: datetime) -> int:
"""统计指定时间范围内的消息数量
Args:
group_id: 群组ID
start_time: 开始时间
end_time: 结束时间
Returns:
消息数量
"""
sql = """
SELECT COUNT(*) as count
FROM messages
WHERE timestamp >= %s
AND timestamp <= %s
AND message_type in (1, 49)
AND group_id = %s
AND length(content) > 6
AND CHAR_LENGTH(content) < 300
AND content NOT LIKE '/%'
"""
params = (start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id)
result = self.execute_query(sql, params)
return result[0]['count'] if result else 0
def get_message_stats_by_date_range(self, group_id: str, start_time: datetime, end_time: datetime) -> Dict:
"""统计指定时间范围内的群消息概览"""
sql = """
SELECT
COUNT(*) AS total_count,
COUNT(DISTINCT sender) AS participant_count,
SUM(CASE WHEN message_type = 1 THEN 1 ELSE 0 END) AS text_count,
SUM(CASE WHEN message_type = 3 THEN 1 ELSE 0 END) AS image_count,
SUM(CASE WHEN message_type IN (43, 62) THEN 1 ELSE 0 END) AS video_count,
SUM(CASE WHEN message_type = 49 THEN 1 ELSE 0 END) AS link_count,
SUM(CASE WHEN message_type IN (47, 1048625, 1090519089) THEN 1 ELSE 0 END) AS emoji_count
FROM messages
WHERE timestamp >= %s
AND timestamp <= %s
AND group_id = %s
AND sender IS NOT NULL
AND sender <> ''
"""
params = (
start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id,
)
result = self.execute_query(sql, params, fetch_one=True) or {}
return {
"total_count": int(result.get("total_count") or 0),
"participant_count": int(result.get("participant_count") or 0),
"text_count": int(result.get("text_count") or 0),
"image_count": int(result.get("image_count") or 0),
"video_count": int(result.get("video_count") or 0),
"link_count": int(result.get("link_count") or 0),
"emoji_count": int(result.get("emoji_count") or 0),
}