818 lines
31 KiB
Python
818 lines
31 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
from datetime import datetime
|
||
import json
|
||
from typing import Dict, List, Optional
|
||
|
||
from db.base import BaseDBOperator
|
||
from db.connection import DBConnectionManager
|
||
from wechat_ipad.models.message import WxMessage
|
||
|
||
|
||
class MessageStorageDB(BaseDBOperator):
|
||
"""消息存储相关数据库操作"""
|
||
|
||
def __init__(self, db_manager: DBConnectionManager):
    """Create the message-storage operator.

    Args:
        db_manager: Connection manager, forwarded unchanged to the
            ``BaseDBOperator`` initializer.
    """
    super().__init__(db_manager)
|
||
|
||
def archive_message(self, msg: WxMessage) -> bool:
    """Persist one incoming message into the ``messages`` table.

    Two INSERT statements are tried in order:

    1. The current schema: the structured columns plus ``raw_payload``
       (the serialized message, kept for troubleshooting and later feature
       extraction) and ``mentioned_user_ids``.
    2. The legacy schema without those two columns. It is used whenever the
       first statement reports failure, so databases that have not yet had
       the new columns added keep archiving.

    Args:
        msg: Message to archive. ``roomid``, ``sender``, ``content``,
            ``msg_type``, ``msg_id`` and ``msg_source`` are read from it.

    Returns:
        ``True`` if the first insert succeeded, otherwise the result of the
        legacy insert.
    """
    archived_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    body = msg.content

    # Values shared by both statements, in column order:
    # group_id, timestamp, sender, content, message_type,
    # attachment_url, message_id, message_xml.
    shared = (
        msg.roomid,
        archived_at,
        msg.sender,
        str(body.clean_content),
        msg.msg_type.value,
        str(body.xml_content),
        msg.msg_id,
        msg.msg_source,
    )
    thumb = ""  # message_thumb starts empty in both variants

    payload = self._serialize_raw_payload(msg)

    insert_current = """
        INSERT INTO messages (
            group_id, timestamp, sender, content, message_type,
            attachment_url, message_id, message_xml, raw_payload, mentioned_user_ids, message_thumb
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    # mentioned_user_ids is written as NULL to keep the hot path cheap; per
    # the original author's note it is back-filled later by a batch job.
    if self.execute_update(insert_current, shared + (payload, None, thumb)):
        return True

    insert_legacy = """
        INSERT INTO messages (
            group_id, timestamp, sender, content, message_type,
            attachment_url, message_id, message_xml, message_thumb
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    return self.execute_update(insert_legacy, shared + (thumb,))
|
||
|
||
def _serialize_raw_payload(self, msg: WxMessage) -> str:
|
||
"""将消息对象尽量完整地序列化为 JSON 字符串。
|
||
|
||
序列化优先级(从高到低):
|
||
1. pydantic v2: model_dump_json
|
||
2. pydantic v1: json
|
||
3. 自定义方法: to_json
|
||
4. 通用兜底: __dict__
|
||
|
||
说明:
|
||
- 不做脱敏,按用户要求保存完整信息;
|
||
- 使用 ensure_ascii=False 保留中文原文,便于后续排查。
|
||
"""
|
||
try:
|
||
if hasattr(msg, "model_dump_json") and callable(getattr(msg, "model_dump_json")):
|
||
return msg.model_dump_json(exclude_none=False)
|
||
if hasattr(msg, "json") and callable(getattr(msg, "json")):
|
||
return msg.json(ensure_ascii=False)
|
||
if hasattr(msg, "to_json") and callable(getattr(msg, "to_json")):
|
||
raw_text = msg.to_json()
|
||
return raw_text if isinstance(raw_text, str) else json.dumps(raw_text, ensure_ascii=False, default=str)
|
||
return json.dumps(getattr(msg, "__dict__", {"repr": str(msg)}), ensure_ascii=False, default=str)
|
||
except Exception:
|
||
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
|
||
return str(msg)
|
||
|
||
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
    """Return a group's recent type-1/type-49 messages, oldest first.

    Rows are kept only when the content is longer than
    ``min_content_length`` bytes (``LENGTH``), shorter than 300 characters
    (``CHAR_LENGTH``) and does not start with ``/``.

    Args:
        group_id: Group whose messages are read.
        hours_ago: Size of the look-back window in hours.
        min_content_length: Exclusive lower bound on ``LENGTH(content)``.

    Returns:
        Rows with ``timestamp``, ``sender``, ``content`` and
        ``message_type``; an empty list when the query yields nothing.
    """
    # The LIKE wildcard is written as %% because the statement is executed
    # with parameters: drivers that %-format the query would otherwise choke
    # on a bare %. This matches the other queries in this class.
    sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
          AND message_type in (1, 49)
          AND group_id = %s
          AND length(content) > %s
          AND CHAR_LENGTH(content) < 300
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
    """
    params = (hours_ago, group_id, min_content_length)
    return self.execute_query(sql, params) or []
|
||
|
||
def get_latest_image_message(self, group_id: str, before_timestamp: str = "", hours_ago: int = 8) -> Optional[Dict]:
    """Return the newest type-3 message of a group that has a stored image path.

    Args:
        group_id: Group to search.
        before_timestamp: Optional inclusive upper bound on ``timestamp``;
            an empty value disables the bound.
        hours_ago: Size of the look-back window in hours.

    Returns:
        The row from ``execute_query(..., fetch_one=True)`` with
        ``timestamp``, ``sender``, ``content``, ``message_type`` and
        ``image_path``.
    """
    base = """
        SELECT timestamp, sender, content, message_type, image_path
        FROM messages
        WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
          AND group_id = %s
          AND message_type = 3
          AND image_path IS NOT NULL
          AND image_path <> ''
    """
    if before_timestamp:
        upper_bound = " AND timestamp <= %s"
        args = (hours_ago, group_id, before_timestamp)
    else:
        upper_bound = ""
        args = (hours_ago, group_id)

    query = base + upper_bound + " ORDER BY timestamp DESC LIMIT 1"
    return self.execute_query(query, args, fetch_one=True)
|
||
|
||
def get_message_by_message_id(self, message_id: int | str) -> Optional[Dict]:
|
||
"""根据 message_id 获取单条消息"""
|
||
sql = """
|
||
SELECT id, group_id, timestamp, sender, content, message_type,
|
||
attachment_url, message_id, message_xml, message_thumb, image_path
|
||
FROM messages
|
||
WHERE message_id = %s
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
"""
|
||
return self.execute_query(sql, (message_id,), fetch_one=True)
|
||
|
||
def get_image_message_by_md5(self, md5: str) -> Optional[Dict]:
    """Find the newest type-3 message whose ``attachment_url`` mentions an md5.

    The match is a substring search for ``md5="<md5>"`` inside
    ``attachment_url``.

    Args:
        md5: Digest text to search for.

    Returns:
        The row from ``execute_query(..., fetch_one=True)``; the highest
        ``id`` wins when several rows match.
    """
    like_pattern = '%md5="{}"%'.format(md5)
    query = """
        SELECT id, group_id, timestamp, sender, content, message_type,
               attachment_url, message_id, message_xml, message_thumb, image_path
        FROM messages
        WHERE message_type = 3
          AND attachment_url IS NOT NULL
          AND attachment_url <> ''
          AND attachment_url LIKE %s
        ORDER BY id DESC
        LIMIT 1
    """
    return self.execute_query(query, (like_pattern,), fetch_one=True)
|
||
|
||
def get_media_message_by_md5(self, md5: str, current_message_id: int | str | None = None) -> Optional[Dict]:
|
||
"""根据 md5 查找已落盘的图片/表情消息,用于去重复用本地文件"""
|
||
sql = """
|
||
SELECT id, group_id, timestamp, sender, content, message_type,
|
||
attachment_url, message_id, message_xml, message_thumb, image_path
|
||
FROM messages
|
||
WHERE attachment_url IS NOT NULL
|
||
AND attachment_url <> ''
|
||
AND attachment_url LIKE %s
|
||
AND image_path IS NOT NULL
|
||
AND image_path <> ''
|
||
"""
|
||
params: List = [f'%md5="{md5}"%']
|
||
if current_message_id is not None:
|
||
sql += " AND message_id <> %s"
|
||
params.append(current_message_id)
|
||
sql += " ORDER BY id DESC LIMIT 1"
|
||
return self.execute_query(sql, tuple(params), fetch_one=True)
|
||
|
||
def get_member_recent_messages(self, group_id: str, wxid: str, days: int = 30,
                               limit: int = 200, include_today: bool = True) -> List[Dict]:
    """Return a member's most recent type-1/type-49 messages, oldest first.

    The newest ``limit`` rows inside the window are selected and then
    reversed, so the caller receives them in chronological order. Content
    must be 2-500 characters long and must not start with ``/``.

    Args:
        group_id: Group to search.
        wxid: Sender id to match.
        days: Size of the look-back window in days.
        limit: Maximum number of rows.
        include_today: ``True`` counts the window back from ``NOW()``;
            ``False`` counts back from ``CURDATE()`` and drops rows from
            today.

    Returns:
        Rows with ``timestamp``, ``sender``, ``content`` and ``message_type``.
    """
    if include_today:
        window = "timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)"
    else:
        window = (
            "timestamp >= DATE_SUB(CURDATE(), INTERVAL %s DAY)\n"
            "          AND timestamp < CURDATE()"
        )

    sql = (
        """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE """
        + window
        + """
          AND group_id = %s
          AND sender = %s
          AND message_type IN (1, 49)
          AND CHAR_LENGTH(content) BETWEEN 2 AND 500
          AND content NOT LIKE '/%%'
        ORDER BY timestamp DESC
        LIMIT %s
    """
    )
    newest_first = self.execute_query(sql, (days, group_id, wxid, limit)) or []
    return list(reversed(newest_first))
|
||
|
||
def get_member_messages_since(self, group_id: str, wxid: str, since_time, limit: int = 200) -> List[Dict]:
    """Return a member's type-1/type-49 messages strictly after a point in time.

    Args:
        group_id: Group to search.
        wxid: Sender id to match.
        since_time: Exclusive lower bound. A ``datetime`` is rendered as
            ``YYYY-MM-DD HH:MM:SS``; any other value is passed to the query
            unchanged.
        limit: Maximum number of rows.

    Returns:
        Rows in ascending ``timestamp`` order; an empty list when the query
        yields nothing.
    """
    if isinstance(since_time, datetime):
        cutoff = since_time.strftime("%Y-%m-%d %H:%M:%S")
    else:
        cutoff = since_time

    sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE timestamp > %s
          AND group_id = %s
          AND sender = %s
          AND message_type IN (1, 49)
          AND CHAR_LENGTH(content) BETWEEN 2 AND 500
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
        LIMIT %s
    """
    return self.execute_query(sql, (cutoff, group_id, wxid, limit)) or []
|
||
|
||
def get_member_active_dates(self, group_id: str, wxid: str, days: int = 365) -> List[Dict]:
    """List the days on which a member posted, with per-day message statistics.

    Only type-1/type-49 messages whose content is 2-500 characters long and
    does not start with ``/`` are counted.

    Args:
        group_id: Group to search.
        wxid: Sender id to match.
        days: Size of the look-back window in days.

    Returns:
        One row per day, ascending, with ``message_date``, ``msg_count``,
        ``first_message_time`` and ``last_message_time``. The three temporal
        fields are converted to strings in place; empty values are left as
        they are.
    """
    sql = """
        SELECT
            DATE(timestamp) AS message_date,
            COUNT(*) AS msg_count,
            MIN(timestamp) AS first_message_time,
            MAX(timestamp) AS last_message_time
        FROM messages
        WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
          AND group_id = %s
          AND sender = %s
          AND message_type IN (1, 49)
          AND CHAR_LENGTH(content) BETWEEN 2 AND 500
          AND content NOT LIKE '/%%'
        GROUP BY DATE(timestamp)
        ORDER BY message_date ASC
    """
    daily = self.execute_query(sql, (days, group_id, wxid)) or []

    # Format used when a field arrives as a datetime; other truthy values
    # fall back to str().
    datetime_formats = {
        "message_date": "%Y-%m-%d",
        "first_message_time": "%Y-%m-%d %H:%M:%S",
        "last_message_time": "%Y-%m-%d %H:%M:%S",
    }
    for entry in daily:
        for field, fmt in datetime_formats.items():
            raw = entry.get(field)
            if isinstance(raw, datetime):
                entry[field] = raw.strftime(fmt)
            elif raw:
                entry[field] = str(raw)
    return daily
|
||
|
||
def get_member_messages_on_date(self, group_id: str, wxid: str, target_date: str, limit: int = 120) -> List[Dict]:
    """Return a member's type-1/type-49 messages posted on one calendar day.

    Args:
        group_id: Group to search.
        wxid: Sender id to match.
        target_date: Day compared against ``DATE(timestamp)``.
        limit: Maximum number of rows.

    Returns:
        Rows in ascending ``timestamp`` order; an empty list when the query
        yields nothing.
    """
    day_sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE DATE(timestamp) = %s
          AND group_id = %s
          AND sender = %s
          AND message_type IN (1, 49)
          AND CHAR_LENGTH(content) BETWEEN 2 AND 500
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
        LIMIT %s
    """
    found = self.execute_query(day_sql, (target_date, group_id, wxid, limit))
    return found or []
|
||
|
||
def get_member_messages_for_group_date(self, group_id: str, target_date: str, limit: int = 5000) -> List[Dict]:
    """Return every type-1/type-49 message of a group for one calendar day.

    Rows without a sender are skipped; content must be 2-500 characters long
    and must not start with ``/``.

    Args:
        group_id: Group to search.
        target_date: Day compared against ``DATE(timestamp)``.
        limit: Maximum number of rows.

    Returns:
        Rows in ascending ``timestamp`` order; an empty list when the query
        yields nothing.
    """
    day_sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE DATE(timestamp) = %s
          AND group_id = %s
          AND sender IS NOT NULL
          AND sender <> ''
          AND message_type IN (1, 49)
          AND CHAR_LENGTH(content) BETWEEN 2 AND 500
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
        LIMIT %s
    """
    found = self.execute_query(day_sql, (target_date, group_id, limit))
    return found or []
|
||
|
||
def get_recent_group_chat_messages(self, group_id: str, limit: int = 20) -> List[Dict]:
    """Return the last ``limit`` messages of a group, oldest first.

    No message-type or content filter is applied. The newest rows are
    selected and then reversed into chronological order.

    Args:
        group_id: Group to read.
        limit: Maximum number of rows.

    Returns:
        Rows including the media columns (``attachment_url``,
        ``message_xml``, ``message_thumb``, ``image_path``).
    """
    sql = """
        SELECT timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb, image_path
        FROM messages
        WHERE group_id = %s
        ORDER BY timestamp DESC
        LIMIT %s
    """
    newest_first = self.execute_query(sql, (group_id, limit)) or []
    return list(reversed(newest_first))
|
||
|
||
def get_recent_personal_messages(self, wxid: str, limit: int = 20) -> List[Dict]:
    """Return recently archived direct messages sent by one contact, oldest first.

    The ``messages`` table has no reliable recipient/conversation column, so
    this is a best-effort preview: it returns only rows sent by ``wxid``
    whose ``group_id`` is NULL or empty.

    Args:
        wxid: Sender id to match.
        limit: Maximum number of rows.

    Returns:
        The newest ``limit`` rows, reversed into chronological order.
    """
    sql = """
        SELECT timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb, image_path
        FROM messages
        WHERE (group_id IS NULL OR group_id = '')
          AND sender = %s
        ORDER BY timestamp DESC
        LIMIT %s
    """
    newest_first = self.execute_query(sql, (wxid, limit)) or []
    return list(reversed(newest_first))
|
||
|
||
def get_message_count_by_date(self, date: str) -> List[Dict]:
    """Count messages per (group, sender) pair for one calendar day.

    Args:
        date: Day compared against ``DATE(timestamp)``.

    Returns:
        Rows with ``group_id``, ``sender`` and ``count``; an empty list when
        the query yields nothing.
    """
    counting_sql = """
        SELECT group_id, sender, COUNT(*) as count
        FROM messages
        WHERE DATE(timestamp) = %s
        GROUP BY group_id, sender
    """
    tallies = self.execute_query(counting_sql, (date,))
    return tallies or []
|
||
|
||
def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]:
    """Return the top speakers of a group for one day from ``speech_counts``.

    This reads the pre-aggregated ``speech_counts`` table, not ``messages``.

    Args:
        date: Day to rank.
        group_id: Group to rank.
        limit: Maximum number of rows.

    Returns:
        Rows with ``wx_id`` and ``speech_count``, highest count first; an
        empty list when the query yields nothing.
    """
    ranking_sql = """
        SELECT wx_id, count AS speech_count
        FROM speech_counts
        WHERE date = %s
          AND group_id = %s
        GROUP BY wx_id
        ORDER BY count DESC
        LIMIT %s
    """
    ranked = self.execute_query(ranking_sql, (date, group_id, limit))
    return ranked or []
|
||
|
||
def insert_speech_count(self, group_id: str, wx_id: str, date: str, count: int) -> bool:
    """Insert or overwrite one row of the ``speech_counts`` table.

    The statement uses ``ON DUPLICATE KEY UPDATE``, so when the row collides
    with an existing key its ``count`` is replaced by the new value.

    Args:
        group_id: Group id.
        wx_id: Member id.
        date: Day in ``YYYY-MM-DD`` form.
        count: Number of messages to store.

    Returns:
        The result of ``execute_update``.
    """
    upsert_sql = """
        INSERT INTO speech_counts (group_id, wx_id, date, count)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE count = VALUES(count)
    """
    return self.execute_update(upsert_sql, (group_id, wx_id, date, count))
|
||
|
||
def get_message_trend(self, group_id: str, days: int = 7) -> List[Dict]:
    """Return the daily message count of a group.

    The window starts ``days`` days before ``CURDATE()``, i.e. at midnight.

    Args:
        group_id: Group to analyse.
        days: Number of days to look back.

    Returns:
        Rows with ``date`` and ``message_count`` in ascending date order; an
        empty list when the query yields nothing.
    """
    trend_sql = """
        SELECT
            DATE(timestamp) as date,
            COUNT(*) as message_count
        FROM messages
        WHERE group_id = %s
          AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
        GROUP BY DATE(timestamp)
        ORDER BY date
    """
    per_day = self.execute_query(trend_sql, (group_id, days))
    return per_day or []
|
||
|
||
def get_group_member_message_ranking(self, group_id: str, start_time: datetime,
                                     end_time: datetime, limit: int = 10) -> List[Dict]:
    """Rank the senders of a group by message count inside a time range.

    Both bounds are inclusive. Rows without a sender are ignored, and ties
    on the count are broken by the more recent last message.

    Args:
        group_id: Group to analyse.
        start_time: Start of the range.
        end_time: End of the range.
        limit: Maximum number of senders.

    Returns:
        Rows with ``sender``, ``message_count`` and ``last_message_time``;
        a datetime ``last_message_time`` is converted in place to
        ``YYYY-MM-DD HH:MM:SS``.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    sql = """
        SELECT
            sender,
            COUNT(*) AS message_count,
            MAX(timestamp) AS last_message_time
        FROM messages
        WHERE timestamp >= %s
          AND timestamp <= %s
          AND group_id = %s
          AND sender IS NOT NULL
          AND sender <> ''
        GROUP BY sender
        ORDER BY message_count DESC, last_message_time DESC
        LIMIT %s
    """
    args = (
        start_time.strftime(stamp_format),
        end_time.strftime(stamp_format),
        group_id,
        limit,
    )
    ranking = self.execute_query(sql, args) or []
    for entry in ranking:
        last_seen = entry.get("last_message_time")
        if isinstance(last_seen, datetime):
            entry["last_message_time"] = last_seen.strftime("%Y-%m-%d %H:%M:%S")
    return ranking
|
||
|
||
def get_group_hourly_distribution(self, group_id: str, days: int = 30) -> List[Dict]:
    """Return how a group's messages are spread over the hours of the day.

    Args:
        group_id: Group to analyse.
        days: Size of the look-back window in days.

    Returns:
        One ``{"hour": int, "message_count": int}`` dict per hour that has
        messages, ordered by hour. Missing or empty values become ``0``.
    """
    sql = """
        SELECT
            HOUR(timestamp) AS hour_slot,
            COUNT(*) AS message_count
        FROM messages
        WHERE group_id = %s
          AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
        GROUP BY HOUR(timestamp)
        ORDER BY hour_slot
    """
    buckets = self.execute_query(sql, (group_id, days)) or []

    def _as_int(bucket, key):
        return int(bucket.get(key) or 0)

    return [
        {"hour": _as_int(bucket, "hour_slot"), "message_count": _as_int(bucket, "message_count")}
        for bucket in buckets
    ]
|
||
|
||
def get_group_last_message(self, group_id: str) -> Optional[Dict]:
    """Return the most recent message of a group.

    Args:
        group_id: Group to read.

    Returns:
        The row from ``execute_query(..., fetch_one=True)`` with ``sender``,
        ``content``, ``message_type`` and ``timestamp``. A datetime
        ``timestamp`` is converted in place to ``YYYY-MM-DD HH:MM:SS``.
    """
    sql = """
        SELECT sender, content, message_type, timestamp
        FROM messages
        WHERE group_id = %s
        ORDER BY timestamp DESC
        LIMIT 1
    """
    latest = self.execute_query(sql, (group_id,), fetch_one=True)
    if not latest:
        return latest

    stamp = latest.get("timestamp")
    if isinstance(stamp, datetime):
        latest["timestamp"] = stamp.strftime("%Y-%m-%d %H:%M:%S")
    return latest
|
||
|
||
def get_messages_by_filter(self, group_id=None, start_date=None, end_date=None,
                           search_text=None, page=1, page_size=20) -> Dict:
    """Return one page of messages matching optional filters.

    Every filter is skipped when its value is empty.

    Args:
        group_id: Restrict to this group.
        start_date: Inclusive first day (``YYYY-MM-DD``).
        end_date: Inclusive last day (``YYYY-MM-DD``).
        search_text: Substring to look for in ``content`` (SQL ``LIKE``;
            ``%`` and ``_`` inside the text keep their wildcard meaning).
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        A dict with ``total``, ``page``, ``page_size``, ``total_pages`` and
        ``messages``. ``page`` and ``page_size`` carry the values actually
        used after clamping.
    """
    # Clamp paging inputs: page < 1 would produce a negative OFFSET and
    # page_size < 1 would divide by zero when computing total_pages.
    page = max(int(page), 1)
    page_size = max(int(page_size), 1)

    candidate_filters = (
        ("group_id = %s", group_id),
        ("DATE(timestamp) >= %s", start_date),
        ("DATE(timestamp) <= %s", end_date),
        ("content LIKE %s", f"%{search_text}%" if search_text else None),
    )
    where_extra = ""
    params: List = []
    for clause, value in candidate_filters:
        if value:
            where_extra += f" AND {clause} "
            params.append(value)

    sql_count = "SELECT COUNT(*) as total FROM messages WHERE 1=1 " + where_extra
    # The id tie-breaker keeps pages stable when several rows share a
    # timestamp; without it a row can repeat or vanish between pages.
    sql_data = (
        """
        SELECT id, group_id, timestamp, sender, content, message_type,
               attachment_url, message_id, message_xml, message_thumb, image_path
        FROM messages
        WHERE 1=1
        """
        + where_extra
        + " ORDER BY timestamp DESC, id DESC "
        + " LIMIT %s OFFSET %s "
    )

    offset = (page - 1) * page_size
    data_params = params + [page_size, offset]

    count_result = self.execute_query(sql_count, params)
    total = count_result[0]['total'] if count_result else 0

    messages = self.execute_query(sql_data, data_params) or []

    return {
        'total': total,
        'page': page,
        'page_size': page_size,
        'total_pages': (total + page_size - 1) // page_size,
        'messages': messages,
    }
|
||
|
||
def update_message_image_path(self, message_id, image_base64str):
    """Store a base64 image payload in a message's ``message_thumb`` column.

    Despite the method name, no path column is touched: the statement sets
    ``message_thumb`` on every row whose ``message_id`` matches.

    Args:
        message_id: Value matched against the ``message_id`` column.
        image_base64str: Base64 image content to store.

    Returns:
        The result of ``execute_update``, or ``False`` if it raises (the
        error is printed).
    """
    statement = """
        UPDATE messages
        SET message_thumb = %s
        WHERE message_id = %s
    """
    try:
        return self.execute_update(statement, (image_base64str, message_id))
    except Exception as e:
        print(f"更新消息图片路径出错: {e}")
        return False
|
||
|
||
def update_message_image_file_path(self, message_id, image_path):
    """Record the local file path of a message's image, if none is set yet.

    The update only touches rows whose ``image_path`` is still NULL, so an
    existing path is never overwritten.

    Args:
        message_id: Value matched against the ``message_id`` column.
        image_path: Path to store.

    Returns:
        The result of ``execute_update``, or ``False`` if it raises (the
        error is printed).
    """
    statement = """
        UPDATE messages
        SET image_path = %s
        WHERE message_id = %s
          AND image_path IS NULL
    """
    try:
        return self.execute_update(statement, (image_path, message_id))
    except Exception as e:
        print(f"更新消息图片文件路径出错: {e}")
        return False
|
||
|
||
|
||
def get_hourly_message_trend(self, group_id: Optional[str] = None, days: int = 1) -> List[Dict]:
    """Return message counts bucketed by hour.

    Args:
        group_id: Restrict to this group; an empty value covers all groups.
        days: Size of the look-back window in days.

    Returns:
        Rows with ``hour_slot`` (formatted ``YYYY-MM-DD HH:00``) and
        ``message_count`` in ascending order; an empty list when the query
        yields nothing.
    """
    # The DATE_FORMAT pattern is passed as a bound parameter instead of being
    # written into the SQL text. The statement is executed with parameters,
    # and a literal '%Y-%m-%d %H:00' would be read as Python %-directives by
    # drivers that %-format the query.
    hour_pattern = "%Y-%m-%d %H:00"
    sql = """
        SELECT
            DATE_FORMAT(timestamp, %s) as hour_slot,
            COUNT(*) as message_count
        FROM messages
        WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
    """
    params: List = [hour_pattern, days]

    # Optional restriction to a single group.
    if group_id:
        sql += " AND group_id = %s "
        params.append(group_id)

    sql += """
        GROUP BY hour_slot
        ORDER BY hour_slot
    """
    return self.execute_query(sql, tuple(params)) or []
|
||
|
||
def get_pending_media_messages(self, minutes_ago: int = 10, limit: int = 50,
                               group_ids: Optional[List[str]] = None) -> List[Dict]:
    """Return recent media messages whose file has not been stored yet.

    A row is pending when its ``message_type`` is one of '3', '47',
    '1048625' or '1090519089', its ``image_path`` is NULL and its
    ``attachment_url`` is non-empty.

    Args:
        minutes_ago: Size of the look-back window in minutes.
        limit: Maximum number of rows per call.
        group_ids: ``None`` searches all groups; a non-empty list restricts
            the search to those groups; an empty list short-circuits to
            ``[]`` without querying.

    Returns:
        Rows with ``message_id``, ``group_id``, ``sender``,
        ``message_type``, ``message_xml``, ``timestamp`` and
        ``attachment_url``, oldest first.
    """
    restrict_groups = group_ids is not None
    if restrict_groups and not group_ids:
        return []

    sql = """
        SELECT message_id, group_id, sender, message_type, message_xml, timestamp, attachment_url
        FROM messages
        WHERE message_type IN ('3', '47', '1048625', '1090519089')
          AND image_path IS NULL
          AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
          AND attachment_url IS NOT NULL
          AND attachment_url != ''
    """
    params: List = [minutes_ago]

    if restrict_groups:
        # One placeholder per group id keeps the IN list parameterized.
        placeholders = ", ".join("%s" for _ in group_ids)
        sql += f"          AND group_id IN ({placeholders})\n"
        params.extend(group_ids)

    sql += """
        ORDER BY timestamp ASC
        LIMIT %s
    """
    params.append(limit)
    return self.execute_query(sql, params) or []
|
||
|
||
def get_pending_image_messages(self, minutes_ago: int = 10, limit: int = 50) -> List[Dict]:
    """Backward-compatible alias for ``get_pending_media_messages``.

    Args:
        minutes_ago: Size of the look-back window in minutes.
        limit: Maximum number of rows.

    Returns:
        Whatever ``get_pending_media_messages`` returns with no group
        restriction.
    """
    return self.get_pending_media_messages(minutes_ago=minutes_ago, limit=limit)
|
||
|
||
def get_recent_emoji_assets(self, limit: int = 200) -> List[Dict]:
    """Return the newest emoji message records.

    Callers use the rows to extract the md5 and length needed for sending
    an emoji. Message types '47', '1048625' and '1090519089' with a
    non-empty ``attachment_url`` are considered.

    Args:
        limit: Maximum number of rows.

    Returns:
        Rows ordered newest first; an empty list when the query yields
        nothing.
    """
    emoji_sql = """
        SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
        FROM messages
        WHERE message_type IN ('47', '1048625', '1090519089')
          AND attachment_url IS NOT NULL
          AND attachment_url <> ''
        ORDER BY timestamp DESC
        LIMIT %s
    """
    found = self.execute_query(emoji_sql, (limit,))
    return found or []
|
||
|
||
def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
    """Find the most recent emoji message that carries a given md5.

    Background, per the original author's notes:

    1. When an emoji is sent from the admin chat, the front end sometimes
       knows only the md5 and not the total length.
    2. The wechat_ipad ``SendEmoji`` interface requires both ``Md5`` and
       ``TotalLen``.
    3. This lookup fetches the latest archived record for the md5 so the
       caller can fill in the missing length.

    Args:
        md5: Digest searched for as ``md5="<md5>"`` inside
            ``attachment_url``.

    Returns:
        The row from ``execute_query(..., fetch_one=True)``.
    """
    like_pattern = '%md5="{}"%'.format(md5)
    query = """
        SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
        FROM messages
        WHERE message_type IN ('47', '1048625', '1090519089')
          AND attachment_url IS NOT NULL
          AND attachment_url <> ''
          AND attachment_url LIKE %s
        ORDER BY timestamp DESC
        LIMIT 1
    """
    return self.execute_query(query, (like_pattern,), fetch_one=True)
|
||
|
||
def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None,
                               min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
    """Return a group's type-1/type-49 messages for a range of calendar days.

    NOTE(review): this class defines ``get_messages_by_date_range`` a second
    time further down. In Python the later definition replaces this one, so
    this body is unreachable unless one of the two is renamed.

    Args:
        group_id: Group to read.
        start_date: Inclusive first day, ``YYYY-MM-DD``.
        end_date: Inclusive last day, ``YYYY-MM-DD``; defaults to
            ``start_date``.
        min_content_length: Exclusive lower bound on ``LENGTH(content)``.
        max_results: Maximum number of rows.

    Returns:
        Rows in ascending ``timestamp`` order; an empty list when the query
        yields nothing.
    """
    if end_date is None:
        end_date = start_date

    # The LIKE wildcard is written as %% because the statement is executed
    # with parameters; this matches the other queries in this class.
    sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE DATE(timestamp) >= %s
          AND DATE(timestamp) <= %s
          AND group_id = %s
          AND message_type IN (1, 49)
          AND LENGTH(content) > %s
          AND CHAR_LENGTH(content) < 300
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
        LIMIT %s
    """
    params = (start_date, end_date, group_id, min_content_length, max_results)
    return self.execute_query(sql, params) or []
|
||
|
||
def get_messages_for_summary(self, group_id: str, hours_ago: int = 8,
                             min_messages: int = 50,
                             max_hours: int = 48,
                             max_results: int = 5000) -> List[Dict]:
    """Collect messages for a summary, widening the time window if needed.

    The search starts with the last ``hours_ago`` hours. While fewer than
    ``min_messages`` rows come back, the window grows by 8 hours per attempt
    and stops at ``max_hours``.

    Args:
        group_id: Group to read.
        hours_ago: Initial look-back window in hours.
        min_messages: Number of messages considered enough.
        max_hours: Upper bound for the look-back window.
        max_results: Maximum number of rows returned.

    Returns:
        At most ``max_results`` rows from ``get_recent_messages``.
    """
    step_hours = 8
    window = hours_ago
    messages = self.get_recent_messages(group_id, hours_ago=window)

    while len(messages) < min_messages and window < max_hours:
        # Clamp to max_hours: a plain "+= 8" can overshoot the limit, e.g.
        # 44 -> 52 when max_hours is 48.
        window = min(window + step_hours, max_hours)
        messages = self.get_recent_messages(group_id, hours_ago=window)

    return messages[:max_results] if messages else []
|
||
|
||
def get_messages_by_date_range(self, group_id: str, start_time, end_time=None,
                               min_content_length: int = 6,
                               max_results: Optional[int] = None) -> List[Dict]:
    """Return a group's type-1/type-49 messages inside a time range.

    This is the definition that takes effect: it is the later of two methods
    with this name in the class, and the earlier one took ``YYYY-MM-DD``
    strings. To keep both kinds of caller working, the bounds may be
    ``datetime`` objects or strings, and the earlier variant's optional
    parameters are accepted in the same positions.

    Args:
        group_id: Group to read.
        start_time: Inclusive lower bound. A ``datetime`` is rendered as
            ``YYYY-MM-DD HH:MM:SS``; a 10-character date string is widened
            to ``00:00:00`` of that day; other strings are used as given.
        end_time: Inclusive upper bound, same rules, except that a date-only
            string is widened to ``23:59:59``. Defaults to ``start_time``.
        min_content_length: Exclusive lower bound on ``LENGTH(content)``.
        max_results: Optional row limit; ``None`` means no ``LIMIT`` clause.

    Returns:
        Rows in ascending ``timestamp`` order; an empty list when the query
        yields nothing.
    """
    if end_time is None:
        end_time = start_time

    def _bound(value, day_edge):
        """Render one range bound as text for the query."""
        if isinstance(value, datetime):
            return value.strftime('%Y-%m-%d %H:%M:%S')
        text = str(value).strip()
        # A bare YYYY-MM-DD is widened to cover the whole day.
        return f"{text} {day_edge}" if len(text) == 10 else text

    # The LIKE wildcard is written as %% because the statement is executed
    # with parameters; this matches the other queries in this class.
    sql = """
        SELECT timestamp, sender, content, message_type
        FROM messages
        WHERE timestamp >= %s
          AND timestamp <= %s
          AND message_type in (1, 49)
          AND group_id = %s
          AND length(content) > %s
          AND CHAR_LENGTH(content) < 300
          AND content NOT LIKE '/%%'
        ORDER BY timestamp ASC
    """
    params: List = [
        _bound(start_time, "00:00:00"),
        _bound(end_time, "23:59:59"),
        group_id,
        min_content_length,
    ]
    if max_results is not None:
        sql += " LIMIT %s"
        params.append(max_results)

    return self.execute_query(sql, tuple(params)) or []
|
||
|
||
def count_messages_by_date_range(self, group_id: str, start_time: datetime, end_time: datetime) -> int:
    """Count a group's type-1/type-49 messages inside a time range.

    The content filters are: more than 6 bytes (``LENGTH``), fewer than 300
    characters (``CHAR_LENGTH``), and not starting with ``/``.

    Args:
        group_id: Group to read.
        start_time: Inclusive lower bound.
        end_time: Inclusive upper bound.

    Returns:
        The number of matching rows, or ``0`` when the query yields nothing.
    """
    # The LIKE wildcard is written as %% because the statement is executed
    # with parameters; this matches the other queries in this class.
    sql = """
        SELECT COUNT(*) as count
        FROM messages
        WHERE timestamp >= %s
          AND timestamp <= %s
          AND message_type in (1, 49)
          AND group_id = %s
          AND length(content) > 6
          AND CHAR_LENGTH(content) < 300
          AND content NOT LIKE '/%%'
    """
    params = (
        start_time.strftime('%Y-%m-%d %H:%M:%S'),
        end_time.strftime('%Y-%m-%d %H:%M:%S'),
        group_id,
    )
    result = self.execute_query(sql, params)
    return result[0]['count'] if result else 0
|
||
|
||
def get_message_stats_by_date_range(self, group_id: str, start_time: datetime, end_time: datetime) -> Dict:
    """Summarise a group's message activity inside a time range.

    Rows without a sender are ignored. Message types are bucketed as text
    (1), image (3), video (43, 62), link (49) and emoji (47, 1048625,
    1090519089).

    Args:
        group_id: Group to analyse.
        start_time: Inclusive lower bound.
        end_time: Inclusive upper bound.

    Returns:
        A dict of ints with the keys ``total_count``, ``participant_count``,
        ``text_count``, ``image_count``, ``video_count``, ``link_count`` and
        ``emoji_count``; missing or empty values become ``0``.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    sql = """
        SELECT
            COUNT(*) AS total_count,
            COUNT(DISTINCT sender) AS participant_count,
            SUM(CASE WHEN message_type = 1 THEN 1 ELSE 0 END) AS text_count,
            SUM(CASE WHEN message_type = 3 THEN 1 ELSE 0 END) AS image_count,
            SUM(CASE WHEN message_type IN (43, 62) THEN 1 ELSE 0 END) AS video_count,
            SUM(CASE WHEN message_type = 49 THEN 1 ELSE 0 END) AS link_count,
            SUM(CASE WHEN message_type IN (47, 1048625, 1090519089) THEN 1 ELSE 0 END) AS emoji_count
        FROM messages
        WHERE timestamp >= %s
          AND timestamp <= %s
          AND group_id = %s
          AND sender IS NOT NULL
          AND sender <> ''
    """
    args = (
        start_time.strftime(stamp_format),
        end_time.strftime(stamp_format),
        group_id,
    )
    summary = self.execute_query(sql, args, fetch_one=True) or {}

    counters = (
        "total_count",
        "participant_count",
        "text_count",
        "image_count",
        "video_count",
        "link_count",
        "emoji_count",
    )
    # The aggregates may come back as None (no rows) or as non-int numerics,
    # so every counter is normalised through int().
    return {name: int(summary.get(name) or 0) for name in counters}
|