Files
abot/utils/wechat/message_to_db.py
2025-04-28 11:44:56 +08:00

380 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
import logging
import concurrent.futures # 添加线程池支持
import os
from gewechat_client import GewechatClient
from db.connection import DBConnectionManager
from db.message_storage import MessageStorageDB
# 导入积分系统
from db.points_db import PointsDBOperator, PointSource
from gewechat.call_back_message.message import WxMessage
# Configure logging for the process at import time (INFO level, timestamped
# records) and create the logger shared by everything in this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("MessageStorage")
class MessageStorage:
def __init__(self, client: GewechatClient = None):
    """Set up database helpers, worker pools and the image directory.

    Args:
        client: Gewechat client used when processing image messages. It is
            stored as-is on ``self.client`` and may be None.
    """
    # Both database helpers share the connection manager obtained from the
    # manager's ``get_instance`` accessor.
    manager = DBConnectionManager.get_instance()
    self.db_manager = manager
    self.message_db = MessageStorageDB(manager)
    self.points_db = PointsDBOperator(manager)

    # Local caches, keyed by group id.
    self.local_membercounts = {}
    self.local_members = {}

    # Worker pool and bookkeeping list for asynchronous message archiving.
    self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    self.pending_tasks = []

    self.client = client

    # Separate worker pool and bookkeeping list reserved for image handling.
    self.image_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    self.image_tasks = []

    # Images go to <three directory levels above this file>/static/images.
    root_dir = __file__
    for _ in range(3):
        root_dir = os.path.dirname(root_dir)
    self.image_dir = os.path.join(root_dir, "static", "images")
    if not os.path.exists(self.image_dir):
        os.makedirs(self.image_dir, exist_ok=True)
    logger.info(f"图片存储目录: {self.image_dir}")
def process_message(self, message: WxMessage):
    """Increment today's speech counter for the message's sender in Redis.

    The counter is the ``count`` field of a hash stored under
    ``<roomid>:<sender>:<YYYY-MM-DD>:count``. Every call also resets the
    key's time-to-live to 48 hours.

    Args:
        message: Incoming message; only ``roomid`` and ``sender`` are read.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    counter_key = f"{message.roomid}:{message.sender}:{today}:count"
    ttl_seconds = 86400 * 2

    conn = self.db_manager.get_redis_connection()
    # A hash is used here; a plain string key with INCR would work as well
    # since only one integer is stored.
    conn.hincrby(counter_key, 'count', 1)
    conn.expire(counter_key, ttl_seconds)
def archive_message(self, msg: WxMessage):
    """Queue *msg* for archiving on the worker pool so the caller is not blocked.

    The resulting future gets ``_archive_callback`` attached, is recorded in
    ``self.pending_tasks``, and finished futures are then pruned.
    """
    task = self.executor.submit(self._archive_message_task, msg)
    # The callback only reports the outcome; it does not touch pending_tasks.
    task.add_done_callback(self._archive_callback)
    self.pending_tasks.append(task)
    self._cleanup_completed_tasks()
def _archive_message_task(self, msg: WxMessage):
    """Archive one message through ``self.message_db`` and describe the outcome.

    This is the function submitted to the archive worker pool. Message
    fields are read defensively *before* the storage call, so a message
    without usable content can no longer make the error handler itself
    raise and hide the original failure.

    Args:
        msg: The message to archive.

    Returns:
        dict with ``success`` (the value returned by the storage layer, or
        False on error), ``roomid``, ``sender``, ``content`` (raw text, ``''``
        when unavailable), ``message_id`` and, on failure only, ``error``.
    """
    raw_content = getattr(getattr(msg, 'content', None), 'raw_content', None)
    outcome = {
        'success': False,
        'roomid': getattr(msg, 'roomid', None),
        'sender': getattr(msg, 'sender', None),
        # The done-callback calls str methods on this value, so never hand
        # it None.
        'content': '' if raw_content is None else raw_content,
        'message_id': getattr(msg, 'msg_id', None),
    }
    try:
        outcome['success'] = self.message_db.archive_message(msg)
    except Exception as e:
        # Worker-thread boundary: report the failure instead of raising.
        logger.error(f"存档消息出错: {e}")
        outcome['error'] = str(e)
    return outcome
def process_image(self, msg: WxMessage):
    """Schedule asynchronous handling of an image message.

    Image handling runs on its own pool, separate from message archiving.

    Returns:
        False when ``msg.msg_type`` is not 3 (image) or no client is
        configured; True once the task has been queued.
    """
    is_image = msg.msg_type == 3
    if not (is_image and self.client):
        return False

    task = self.image_executor.submit(self._process_image_task, msg)
    task.add_done_callback(self._process_image_callback)
    self.image_tasks.append(task)
    # Prune finished futures so the bookkeeping lists stay bounded.
    self._cleanup_completed_tasks()
    return True
def _process_image_task(self, msg: WxMessage):
    """Request the image from the Gewechat client and record its server path.

    This is the function submitted to the image worker pool. It calls
    ``self.client.download_image`` and, when the response carries a
    non-empty ``data.fileUrl``, stores that value for the message via
    ``self.message_db.update_message_image_path``.

    Args:
        msg: The image message; ``msg_id``, ``roomid``, ``sender`` and
            ``content.xml_content`` are read.

    Returns:
        dict with ``success``, ``message_id``, ``roomid``, ``sender`` and
        either ``file_path`` (on success) or ``error`` (on failure). A dict
        is returned on every path, including unexpected exceptions.
    """
    def outcome(success, **extra):
        # Single place that defines the result shape used by the callback.
        return {
            'success': success,
            'message_id': msg.msg_id,
            'roomid': msg.roomid,
            'sender': msg.sender,
            **extra,
        }

    try:
        if not (self.client and msg.msg_id):
            return outcome(False, error="实例不存在或消息ID无效")

        # One directory per group id (or per sender for direct chats).
        # NOTE(review): nothing in this method writes into the directory;
        # only the server-side URL is recorded. Kept for compatibility.
        target_dir = os.path.join(self.image_dir, msg.roomid if msg.roomid else msg.sender)
        os.makedirs(target_dir, exist_ok=True)

        # Expected response shape, per the sample kept with this code:
        # {"ret": 200, "msg": "...", "data": {"fileUrl": "/download/..."}}
        response = self.client.download_image(msg.msg_id, msg.content.xml_content, 1)
        data = response.get('data') if response else None
        file_url = data.get('fileUrl') if data else None
        if not file_url:
            return outcome(False, error="图片下载失败")

        logger.info(f"记录gewe服务端图片路径成功: {msg.msg_id} -> {file_url}")
        # The image itself stays on the server; only its path is persisted
        # so it can be fetched later if needed.
        self.message_db.update_message_image_path(msg.msg_id, file_url)
        return outcome(True, file_path=file_url)
    except Exception as e:
        # Worker-thread boundary: report the failure instead of raising.
        logger.error(f"图片处理出错: {msg.msg_id}, 错误: {e}")
        return outcome(False, error=str(e))
def _process_image_callback(self, future):
    """Log the outcome of a finished image-processing future.

    Never raises: anything that goes wrong while reading the result is
    logged as an error instead.
    """
    try:
        outcome = future.result()
        if not outcome['success']:
            reason = outcome.get('error', '未知错误')
            logger.error(
                f"图片处理失败: {outcome.get('roomid', '')}:{outcome.get('sender', '')}:{outcome.get('message_id', '')} - {reason}")
            return
        # An optional 'skipped' flag marks images that were already present.
        suffix = " (已存在)" if outcome.get('skipped') else ""
        logger.info(f"图片处理成功{suffix}: {outcome['roomid']}:{outcome['sender']}:{outcome['message_id']}")
    except Exception as e:
        logger.error(f"处理图片回调时出错: {e}")
def _archive_callback(self, future):
    """Log the outcome of a finished archive future.

    Never raises: anything that goes wrong while reading the result is
    logged as an error instead.
    """
    try:
        outcome = future.result()
        if not outcome['success']:
            reason = outcome.get('error', '未知错误')
            logger.error(f"archive_fail: {outcome['roomid']}:{outcome['sender']} - {reason}")
            return
        # Strip line breaks so each archived message occupies one log line.
        one_line = outcome['content'].replace('\n', '')
        one_line = one_line.replace('\r', '')
        logger.info(f"archive_success: {outcome['roomid']}:{outcome['sender']}: {one_line}")
    except Exception as e:
        logger.error(f"处理存档回调时出错: {e}")
def _cleanup_completed_tasks(self):
    """Drop finished futures from the bookkeeping lists so they stay bounded.

    Pruning is skipped while a list is short (20 archive futures or fewer,
    10 image futures or fewer) to avoid doing work on every message. After
    pruning, a warning is logged if more than 100 archive futures or more
    than 50 image futures are still outstanding.
    """
    if len(self.pending_tasks) > 20:
        # Filter in one pass and assign through a slice so the list object
        # itself is kept; repeated list.remove() calls were quadratic.
        self.pending_tasks[:] = [task for task in self.pending_tasks if not task.done()]
    if len(self.pending_tasks) > 100:
        logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}")

    if len(self.image_tasks) > 10:
        self.image_tasks[:] = [task for task in self.image_tasks if not task.done()]
    if len(self.image_tasks) > 50:
        logger.warning(f"待处理的图片处理任务数量过多: {len(self.image_tasks)}")
def write_to_db(self):
    """Copy yesterday's per-user speech counters from Redis into the database.

    Looks up every key matching ``*:*:<yesterday>:count``, reads the
    ``count`` field of each hash and passes it to
    ``self.message_db.insert_speech_count``. A key that cannot be read,
    parsed or stored is logged and skipped, so one bad entry no longer
    aborts the rest of the batch.
    """
    redis_conn = self.db_manager.get_redis_connection()
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    # NOTE(review): KEYS scans the whole keyspace in one blocking call; if
    # this connection is a redis-py client, scan_iter(match=...) would be
    # the incremental alternative. Confirm before switching.
    for raw_key in redis_conn.keys(f"*:*:{yesterday}:count"):
        # Keys come back as bytes unless the client decodes responses.
        key = raw_key.decode('utf-8') if isinstance(raw_key, bytes) else raw_key
        try:
            # Split from the right: the last three fields have a fixed
            # meaning, so a ':' inside the group id cannot shift them.
            group_id, wx_id, _date, _suffix = key.rsplit(':', 3)
            raw_count = redis_conn.hget(key, 'count')
            count = int(raw_count) if raw_count else 0
            stored = self.message_db.insert_speech_count(group_id, wx_id, yesterday, count)
        except Exception as e:
            logger.error(f"写入发言统计出错: {e}")
            continue

        if stored:
            logger.info(f"成功写入发言统计: {group_id}, {wx_id}, {yesterday}, {count}")
        else:
            logger.error(f"写入发言统计失败: {group_id}, {wx_id}, {yesterday}, {count}")
def generate_and_send_ranking(self, groupId, allContacts: dict):
    """Build yesterday's speech ranking for a group and award points by rank.

    Args:
        groupId: Group whose ranking is generated.
        allContacts: Mapping from user id to display name; ids that are not
            in the mapping are shown as-is.

    Returns:
        The formatted ranking text, a "no records" notice when the storage
        layer returns nothing, or an error notice if anything raises.
    """
    # Rewards and decorations for the podium places; ranks 4-10 and 11+ fall
    # back to the two generic tiers below.
    podium = {
        1: (50, "🥇🐲", " 🔥"),
        2: (30, "🥈", " ✨"),
        3: (20, "🥉", " 👏"),
    }
    try:
        yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        rows = self.message_db.get_speech_ranking(yesterday, groupId, limit=20)
        if not rows:
            logging.info(f"没有找到 {yesterday} 的群聊 {groupId} 发言记录")
            return f"📊 {yesterday} 没有发言记录"

        pieces = [f"🏆 {yesterday} 发言排行榜 🏆\n"]
        for rank, row in enumerate(rows, start=1):
            username = row['wx_id']
            speech_count = row['speech_count']
            display_name = allContacts.get(username, username)

            if rank in podium:
                reward_points, badge, flair = podium[rank]
            elif rank <= 10:
                reward_points, badge, flair = 10, "🌟", ""
            else:
                reward_points, badge, flair = 5, "👍", ""
            pieces.append(f"{badge} {rank}.{display_name}: {speech_count}次{flair} +{reward_points}积分\n")

            # Every tier carries a positive reward, so points are always granted.
            success, _ = self.points_db.add_points(
                username,
                groupId,
                reward_points,
                PointSource.OTHER,
                f"{yesterday}发言排行第{rank}名奖励"
            )
            if not success:
                logging.error(f"发放积分失败: {username}, {groupId}, {reward_points}")

        logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名并发放积分")
        return "".join(pieces)
    except Exception as e:
        logging.error(f"生成发言排名出错: {e}")
        return f"❌ 生成发言排名出错: {e}"
def get_messages(self, group_id, all_contacts: dict):
    """Return the group's messages since the last summary, one per line.

    The start of the window comes from the ``<group_id>:summary_time`` value
    in Redis:

    * no stored value, or one that cannot be parsed -> 8 hours ago;
    * stored value less than 3 hours old -> 8 hours ago;
    * stored value more than 24 hours old -> 10 hours ago;
    * otherwise -> the stored time itself.

    The stored value is then replaced with the current time. Previously an
    unparseable stored value raised before that overwrite, so every later
    call failed as well.

    Args:
        group_id: Group whose messages are collected.
        all_contacts: Mapping from sender id to display name; ids that are
            not in the mapping are shown as-is.

    Returns:
        ``timestamp,sender,content`` lines joined by newlines, or ``""`` if
        anything fails.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    try:
        redis_conn = self.db_manager.get_redis_connection()
        key = f"{group_id}:summary_time"
        stored = redis_conn.get(key)
        logger.info(f"上次总结时间: {stored}")

        now = datetime.now()
        window_start = now - timedelta(hours=8)
        if stored:
            try:
                # The value is bytes unless the client decodes responses.
                if isinstance(stored, bytes):
                    stored = stored.decode('utf-8')
                previous = datetime.strptime(stored, time_format)
            except ValueError:
                # Covers bad timestamps and undecodable bytes alike.
                logger.warning(f"无法解析上次总结时间: {stored!r}，改用默认时间窗口")
            else:
                elapsed = now - previous
                if elapsed > timedelta(days=1):
                    window_start = now - timedelta(hours=10)
                elif elapsed >= timedelta(hours=3):
                    window_start = previous
                # Under 3 hours: keep the 8-hour default window.

        redis_conn.set(key, now.strftime(time_format))

        # Round the window up to whole hours for the storage query.
        hours_ago = int((now - window_start).total_seconds() / 3600) + 1
        messages = self.message_db.get_recent_messages(group_id, hours_ago=hours_ago)

        lines = []
        for msg in messages:
            timestamp, sender, content = msg['timestamp'], msg['sender'], msg['content']
            if msg['message_type'] == 49:  # app message: content is XML
                try:
                    # NOTE(review): xml.etree does not guard against
                    # malicious XML (e.g. entity expansion); message content
                    # originates from chat users.
                    title = ET.fromstring(content).findtext('.//title')
                except Exception as e:
                    logger.error(f"解析消息类型49出错: {e}")
                else:
                    # findtext yields '' for an empty <title>, so the
                    # literal text "None" can no longer reach the output.
                    if title is not None:
                        content = title
            sender_name = all_contacts.get(sender, sender)
            lines.append(f"{timestamp},{sender_name},{content}")
        return "\n".join(lines)
    except Exception as e:
        logger.error(f"获取消息出错: {e}")
        return ""