1.添加手动添加群组的能力
2.添加异步入库的能力
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from datetime import datetime, timedelta
|
||||
import xml.etree.ElementTree as ET
|
||||
import logging
|
||||
import concurrent.futures # 添加线程池支持
|
||||
|
||||
from wcferry import WxMsg
|
||||
|
||||
@@ -24,6 +25,10 @@ class MessageStorage:
|
||||
# 初始化本地缓存字典,使用 group_id 作为键
|
||||
self.local_membercounts = {}
|
||||
self.local_members = {}
|
||||
# 创建线程池,用于异步存储消息
|
||||
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
|
||||
# 用于跟踪异步任务的列表
|
||||
self.pending_tasks = []
|
||||
|
||||
def process_message(self, message: WxMsg):
|
||||
# 示例message字符串
|
||||
@@ -39,15 +44,58 @@ class MessageStorage:
|
||||
# 或者使用字符串:r.incr(key) # 如果只存储一个整数值,字符串类型可能更简单
|
||||
|
||||
def archive_message(self, msg: WxMsg):
|
||||
"""异步存档消息,防止堵塞主线程"""
|
||||
# 提交任务到线程池
|
||||
future = self.executor.submit(self._archive_message_task, msg)
|
||||
# 可选:添加回调函数处理完成后的操作
|
||||
future.add_done_callback(self._archive_callback)
|
||||
# 将任务添加到待处理列表
|
||||
self.pending_tasks.append(future)
|
||||
# 清理已完成的任务
|
||||
self._cleanup_completed_tasks()
|
||||
|
||||
def _archive_message_task(self, msg: WxMsg):
|
||||
"""实际执行消息存档的任务函数"""
|
||||
try:
|
||||
# 使用 MessageStorageDB 类存档消息
|
||||
result = self.message_db.archive_message(msg)
|
||||
if result:
|
||||
logger.info(f"消息存档成功: {msg.roomid}:{msg.sender}: {msg.content}")
|
||||
else:
|
||||
logger.error(f"消息存档失败: {msg.roomid}:{msg.sender}")
|
||||
return {
|
||||
'success': result,
|
||||
'roomid': msg.roomid,
|
||||
'sender': msg.sender
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"存档消息出错: {e}")
|
||||
return {
|
||||
'success': False,
|
||||
'roomid': msg.roomid,
|
||||
'sender': msg.sender,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def _archive_callback(self, future):
|
||||
"""处理异步存档任务完成后的回调"""
|
||||
try:
|
||||
result = future.result()
|
||||
if result['success']:
|
||||
logger.info(f"消息存档成功: {result['roomid']}:{result['sender']}")
|
||||
else:
|
||||
error_msg = result.get('error', '未知错误')
|
||||
logger.error(f"消息存档失败: {result['roomid']}:{result['sender']} - {error_msg}")
|
||||
except Exception as e:
|
||||
logger.error(f"处理存档回调时出错: {e}")
|
||||
|
||||
def _cleanup_completed_tasks(self):
|
||||
"""清理已完成的任务,防止内存泄漏"""
|
||||
# 过滤出已完成的任务
|
||||
completed_tasks = [task for task in self.pending_tasks if task.done()]
|
||||
# 从待处理列表中移除已完成的任务
|
||||
for task in completed_tasks:
|
||||
self.pending_tasks.remove(task)
|
||||
|
||||
# 如果待处理任务过多,记录警告日志
|
||||
if len(self.pending_tasks) > 100:
|
||||
logger.warning(f"待处理的存档任务数量过多: {len(self.pending_tasks)}")
|
||||
|
||||
def write_to_db(self):
|
||||
"""从Redis读取发言统计数据并写入数据库"""
|
||||
|
||||
Reference in New Issue
Block a user