367 lines
16 KiB
Python
367 lines
16 KiB
Python
from datetime import datetime, timedelta
|
||
import xml.etree.ElementTree as ET
|
||
import logging
|
||
import concurrent.futures # 添加线程池支持
|
||
import os
|
||
from wcferry import WxMsg, Wcf
|
||
|
||
from db.connection import DBConnectionManager
|
||
from db.message_storage import MessageStorageDB
|
||
# 导入积分系统
|
||
from db.points_db import PointsDBOperator, PointSource
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger("MessageStorage")
|
||
|
||
|
||
class MessageStorage:
|
||
|
||
def __init__(self, wcf: Wcf = None):
|
||
# 获取数据库连接管理器的单例
|
||
self.db_manager = DBConnectionManager.get_instance()
|
||
self.message_db = MessageStorageDB(self.db_manager)
|
||
|
||
self.points_db = PointsDBOperator(self.db_manager)
|
||
# 初始化本地缓存字典,使用 group_id 作为键
|
||
self.local_membercounts = {}
|
||
self.local_members = {}
|
||
# 创建线程池,用于异步存储消息
|
||
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
|
||
# 用于跟踪异步任务的列表
|
||
self.pending_tasks = []
|
||
|
||
# 保存WCF实例,用于图片处理
|
||
self.wcf = wcf
|
||
|
||
# 图片处理相关初始化
|
||
self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) # 专用于图片处理的线程池
|
||
self.image_tasks = []
|
||
# 修改为项目根目录下的 static/images
|
||
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images")
|
||
# 确保图片存储目录存在
|
||
if not os.path.exists(self.image_dir):
|
||
os.makedirs(self.image_dir, exist_ok=True)
|
||
logger.info(f"图片存储目录: {self.image_dir}")
|
||
|
||
def process_message(self, message: WxMsg):
|
||
# 示例message字符串
|
||
current_date = datetime.now().strftime('%Y-%m-%d')
|
||
# 生成Redis key
|
||
key = f"{message.roomid}:{message.sender}:{current_date}:count"
|
||
# 获取 Redis 连接
|
||
redis_conn = self.db_manager.get_redis_connection()
|
||
# 使用Redis哈希(或字符串)增加发言次数
|
||
redis_conn.hincrby(key, 'count', 1) # 这里使用哈希,但也可以考虑用字符串的INCR操作
|
||
# 设置时效为48小时
|
||
redis_conn.expire(key, 86400 * 2)
|
||
# 或者使用字符串:r.incr(key) # 如果只存储一个整数值,字符串类型可能更简单
|
||
|
||
def archive_message(self, msg: WxMsg):
|
||
"""异步存档消息,防止堵塞主线程"""
|
||
# 提交任务到线程池
|
||
future = self.executor.submit(self._archive_message_task, msg)
|
||
# 可选:添加回调函数处理完成后的操作
|
||
future.add_done_callback(self._archive_callback)
|
||
# 将任务添加到待处理列表
|
||
self.pending_tasks.append(future)
|
||
# 清理已完成的任务
|
||
self._cleanup_completed_tasks()
|
||
|
||
def _archive_message_task(self, msg: WxMsg):
|
||
"""实际执行消息存档的任务函数"""
|
||
try:
|
||
# 使用 MessageStorageDB 类存档消息
|
||
result = self.message_db.archive_message(msg)
|
||
return {
|
||
'success': result,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'content': msg.content, # 添加消息内容
|
||
'message_id': msg.id # 添加消息ID
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"存档消息出错: {e}")
|
||
return {
|
||
'success': False,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'content': msg.content, # 添加消息内容
|
||
'message_id': msg.id, # 添加消息ID
|
||
'error': str(e)
|
||
}
|
||
|
||
def process_image(self, msg: WxMsg):
|
||
"""异步处理图片消息,与消息存档分离"""
|
||
if msg.type != 3 or not self.wcf: # 不是图片消息或没有WCF实例
|
||
return False
|
||
|
||
# 提交任务到图片处理线程池
|
||
future = self.image_executor.submit(self._process_image_task, msg)
|
||
# 添加回调函数
|
||
future.add_done_callback(self._process_image_callback)
|
||
# 将任务添加到待处理列表
|
||
self.image_tasks.append(future)
|
||
# 清理已完成的任务
|
||
self._cleanup_completed_tasks()
|
||
return True
|
||
|
||
def _process_image_task(self, msg: WxMsg):
|
||
"""实际执行图片处理的任务函数"""
|
||
try:
|
||
# 使用wcf下载图片,确保图片存在
|
||
if self.wcf and msg.id:
|
||
# 创建按群ID或个人wxid分割的目录
|
||
target_dir = os.path.join(self.image_dir, msg.roomid if msg.roomid else msg.sender)
|
||
# 确保目标目录存在
|
||
if not os.path.exists(target_dir):
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
|
||
# 尝试使用wcf下载图片到分组后的目录
|
||
download_path = self.wcf.download_image(msg.id, msg.extra, target_dir)
|
||
if download_path:
|
||
logger.info(f"使用wcf下载图片成功: {msg.id} -> {download_path}")
|
||
|
||
# 直接使用下载后的路径更新数据库
|
||
self.message_db.update_message_image_path(msg.id, download_path)
|
||
|
||
return {
|
||
'success': True,
|
||
'message_id': msg.id,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'file_path': download_path
|
||
}
|
||
else:
|
||
return {
|
||
'success': False,
|
||
'message_id': msg.id,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'error': "图片下载失败"
|
||
}
|
||
else:
|
||
return {
|
||
'success': False,
|
||
'message_id': msg.id,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'error': "WCF实例不存在或消息ID无效"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"图片处理出错: {msg.id}, 错误: {e}")
|
||
return {
|
||
'success': False,
|
||
'message_id': msg.id,
|
||
'roomid': msg.roomid,
|
||
'sender': msg.sender,
|
||
'error': str(e)
|
||
}
|
||
|
||
def _process_image_callback(self, future):
|
||
"""处理异步图片处理任务完成后的回调"""
|
||
try:
|
||
result = future.result()
|
||
if result['success']:
|
||
skipped_info = " (已存在)" if result.get('skipped') else ""
|
||
logger.info(f"图片处理成功{skipped_info}: {result['roomid']}:{result['sender']}:{result['message_id']}")
|
||
else:
|
||
error_msg = result.get('error', '未知错误')
|
||
logger.error(
|
||
f"图片处理失败: {result.get('roomid', '')}:{result.get('sender', '')}:{result.get('message_id', '')} - {error_msg}")
|
||
except Exception as e:
|
||
logger.error(f"处理图片回调时出错: {e}")
|
||
|
||
def _archive_callback(self, future):
|
||
"""处理异步存档任务完成后的回调"""
|
||
try:
|
||
result = future.result()
|
||
if result['success']:
|
||
# 修改日志输出,包含消息内容
|
||
compressed = result['content'].replace('\n', '').replace('\r', '')
|
||
logger.info(f"archive_success: {result['roomid']}:{result['sender']}: {compressed}")
|
||
else:
|
||
error_msg = result.get('error', '未知错误')
|
||
logger.error(f"archive_fail: {result['roomid']}:{result['sender']} - {error_msg}")
|
||
except Exception as e:
|
||
logger.error(f"处理存档回调时出错: {e}")
|
||
|
||
def _cleanup_completed_tasks(self):
|
||
"""清理已完成的任务,防止内存泄漏"""
|
||
# 只有当任务数量超过阈值时才进行清理,减少频繁操作
|
||
if len(self.pending_tasks) > 20:
|
||
# 过滤出已完成的任务
|
||
completed_tasks = [task for task in self.pending_tasks if task.done()]
|
||
# 从待处理列表中移除已完成的任务
|
||
for task in completed_tasks:
|
||
self.pending_tasks.remove(task)
|
||
|
||
# 如果待处理任务过多,记录警告日志
|
||
if len(self.pending_tasks) > 100:
|
||
logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}")
|
||
|
||
# 只有当图片任务数量超过阈值时才进行清理
|
||
if len(self.image_tasks) > 10:
|
||
# 清理已完成的图片处理任务
|
||
completed_image_tasks = [task for task in self.image_tasks if task.done()]
|
||
for task in completed_image_tasks:
|
||
self.image_tasks.remove(task)
|
||
|
||
# 如果待处理任务过多,记录警告日志
|
||
if len(self.image_tasks) > 50:
|
||
logger.warning(f"待处理的图片处理任务数量过多: {len(self.image_tasks)}")
|
||
|
||
def write_to_db(self):
|
||
"""从Redis读取发言统计数据并写入数据库"""
|
||
# 获取Redis连接
|
||
redis_conn = self.db_manager.get_redis_connection()
|
||
|
||
# 获取当前日期的前一天
|
||
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||
|
||
# 遍历Redis中所有与昨天日期相关的key,并写入数据库
|
||
for key_item in redis_conn.keys(f"*:*:{yesterday}:count"):
|
||
# 检查key是否为字节类型,如果是则解码
|
||
key = key_item.decode('utf-8') if isinstance(key_item, bytes) else key_item
|
||
parts = key.split(':')
|
||
group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday
|
||
|
||
# 获取计数值
|
||
count_bytes = redis_conn.hget(key, 'count')
|
||
count = int(count_bytes) if isinstance(count_bytes, bytes) else int(count_bytes) if count_bytes else 0
|
||
|
||
# 使用MessageStorageDB插入数据
|
||
try:
|
||
result = self.message_db.insert_speech_count(group_id, wx_id, yesterday, count)
|
||
if result:
|
||
logging.info(f"成功写入发言统计: {group_id}, {wx_id}, {yesterday}, {count}")
|
||
else:
|
||
logging.error(f"写入发言统计失败: {group_id}, {wx_id}, {yesterday}, {count}")
|
||
except Exception as e:
|
||
logging.error(f"写入发言统计出错: {e}")
|
||
|
||
def generate_and_send_ranking(self, groupId, allContacts: dict):
|
||
"""生成并发送群聊发言排名,并根据排名发放积分奖励"""
|
||
try:
|
||
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||
|
||
# 使用数据库操作类获取排名数据
|
||
results = self.message_db.get_speech_ranking(yesterday, groupId, limit=20)
|
||
|
||
if not results:
|
||
logging.info(f"没有找到 {yesterday} 的群聊 {groupId} 发言记录")
|
||
return f"📊 {yesterday} 没有发言记录"
|
||
|
||
# 格式化输出字符串,添加emoji和美化格式
|
||
ranking_str = f"🏆 {yesterday} 发言排行榜 🏆\n"
|
||
|
||
# 为不同名次添加不同的奖杯和样式,并发放积分
|
||
for rank, result in enumerate(results, start=1):
|
||
username = result['wx_id']
|
||
speech_count = result['speech_count']
|
||
display_name = allContacts.get(username, username)
|
||
|
||
# 根据排名发放不同数量的积分
|
||
reward_points = 0
|
||
if rank == 1:
|
||
reward_points = 50
|
||
ranking_str += f"🥇🐲 {rank}.{display_name}: {speech_count}次 🔥 +{reward_points}积分\n"
|
||
elif rank == 2:
|
||
reward_points = 30
|
||
ranking_str += f"🥈 {rank}.{display_name}: {speech_count}次 ✨ +{reward_points}积分\n"
|
||
elif rank == 3:
|
||
reward_points = 20
|
||
ranking_str += f"🥉 {rank}.{display_name}: {speech_count}次 👏 +{reward_points}积分\n"
|
||
elif rank <= 10:
|
||
reward_points = 10
|
||
ranking_str += f"🌟 {rank}.{display_name}: {speech_count}次 +{reward_points}积分\n"
|
||
else:
|
||
reward_points = 5
|
||
ranking_str += f"👍 {rank}.{display_name}: {speech_count}次 +{reward_points}积分\n"
|
||
|
||
# 发放积分奖励
|
||
if reward_points > 0:
|
||
success, _ = self.points_db.add_points(
|
||
username,
|
||
groupId,
|
||
reward_points,
|
||
PointSource.OTHER,
|
||
f"{yesterday}发言排行第{rank}名奖励"
|
||
)
|
||
if not success:
|
||
logging.error(f"发放积分失败: {username}, {groupId}, {reward_points}")
|
||
|
||
logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名并发放积分")
|
||
return ranking_str
|
||
|
||
except Exception as e:
|
||
logging.error(f"生成发言排名出错: {e}")
|
||
return f"❌ 生成发言排名出错: {e}"
|
||
|
||
def get_messages(self, group_id, all_contacts: dict):
|
||
try:
|
||
# 获取 Redis 连接
|
||
redis_conn = self.db_manager.get_redis_connection()
|
||
|
||
# 获取 redis 中的上次总结时间,本次从上次开始算,若没有,则从 8 小时之前开始计算
|
||
key = f"{group_id}:summary_time"
|
||
last_summary_time = redis_conn.get(key)
|
||
logger.info(f"上次总结时间: {last_summary_time}")
|
||
|
||
current_time = datetime.now()
|
||
current_date = current_time.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
if not last_summary_time:
|
||
# 获取当前时间并计算 8 小时前的时间
|
||
eight_hours_ago = current_time - timedelta(hours=8)
|
||
last_summary_time = eight_hours_ago.strftime('%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
# 如果 Redis 返回值为字节类型,转换为字符串
|
||
if isinstance(last_summary_time, bytes):
|
||
last_summary_time = last_summary_time.decode('utf-8')
|
||
|
||
# 检查 redis 中的时间与当前时间差是否小于 3 小时
|
||
last_summary_time_obj = datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')
|
||
time_diff = current_time - last_summary_time_obj
|
||
|
||
if time_diff < timedelta(hours=3):
|
||
# 小于 3 小时,取 8 小时前
|
||
last_summary_time = (current_time - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
|
||
elif time_diff > timedelta(days=1):
|
||
# 大于 24 小时,取 10 小时前
|
||
last_summary_time = (current_time - timedelta(hours=10)).strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
# 更新 Redis 存储的当前时间
|
||
redis_conn.set(key, current_date)
|
||
|
||
# 使用 MessageStorageDB 类获取最近消息
|
||
hours_ago = int(
|
||
(current_time - datetime.strptime(last_summary_time, '%Y-%m-%d %H:%M:%S')).total_seconds() / 3600) + 1
|
||
messages = self.message_db.get_recent_messages(group_id, hours_ago=hours_ago)
|
||
# 构建最终的结果字符串
|
||
result = []
|
||
for msg in messages:
|
||
timestamp, sender, content, message_type = msg['timestamp'], msg['sender'], msg['content'], msg[
|
||
'message_type']
|
||
try:
|
||
if message_type == 49: # 应用消息类型
|
||
# 其他类型的应用消息,解析 XML 提取标题
|
||
root = ET.fromstring(content)
|
||
title_elem = root.find('.//title')
|
||
if title_elem is not None:
|
||
content = title_elem.text
|
||
except Exception as e:
|
||
logger.error(f"解析消息类型49出错: {e}")
|
||
|
||
sender_name = all_contacts.get(sender, sender) # 获取发送者的名字,若找不到则使用原 ID
|
||
result.append(f"{timestamp},{sender_name},{content}")
|
||
|
||
result_str = "\n".join(result) # 将结果拼接为最终字符串
|
||
return result_str
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取消息出错: {e}")
|
||
return ""
|