Files
abot/utils/revoke/message_revoke_manager.py

118 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import time
from typing import Dict, List, Tuple, Optional, Callable
from loguru import logger
class MessageRevokeManager:
"""消息撤回管理器,用于管理需要延时撤回的消息"""
def __init__(self, default_delay: float = 90.0):
"""
初始化消息撤回管理器
Args:
default_delay: 默认撤回延时时间默认为90秒
"""
self.revoke_queue: List[Tuple[float, str, int, int, int]] = [] # (撤回时间, wxid, client_msg_id, create_time, new_msg_id)
self.default_delay = default_delay
self.running = False
self.task = None
self.logging = logger
self.revoke_callback = None
def add_message_to_revoke(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int,
delay: Optional[float] = None) -> None:
"""
添加消息到撤回队列
Args:
wxid: 接收人wxid
client_msg_id: 消息ID
create_time: 消息创建时间
new_msg_id: 新消息ID
delay: 延迟撤回时间如果为None则使用默认值
"""
if delay is None:
delay = self.default_delay
revoke_time = time.time() + delay
self.revoke_queue.append((revoke_time, wxid, client_msg_id, create_time, new_msg_id))
self.revoke_queue.sort(key=lambda x: x[0]) # 按撤回时间排序
# 如果撤回任务未运行,则启动
if not self.running:
self.start()
def start(self) -> None:
"""启动消息撤回任务"""
if self.running:
return
self.running = True
self.task = asyncio.create_task(self._process_revoke_queue())
self.logging.info("消息自动撤回任务已启动")
def stop(self) -> None:
"""停止消息撤回任务"""
if not self.running:
return
self.running = False
if self.task:
self.task.cancel()
self.logging.info("消息自动撤回任务已停止")
def set_revoke_callback(self, callback: Callable) -> None:
"""
设置撤回回调函数
Args:
callback: 撤回消息的回调函数接受wxid, client_msg_id, create_time, new_msg_id参数
"""
self.revoke_callback = callback
def set_default_delay(self, delay: float) -> None:
"""
设置默认撤回延时
Args:
delay: 默认撤回延时(秒)
"""
self.default_delay = delay
async def _process_revoke_queue(self) -> None:
"""处理撤回队列的异步任务"""
try:
while self.running:
now = time.time()
# 检查是否有需要撤回的消息
while self.revoke_queue and self.revoke_queue[0][0] <= now:
revoke_time, wxid, client_msg_id, create_time, new_msg_id = self.revoke_queue.pop(0)
try:
# 通过回调函数调用实际的撤回方法
if self.revoke_callback and callable(self.revoke_callback):
await self.revoke_callback(wxid, client_msg_id, create_time, new_msg_id)
self.logging.info(f"已自动撤回消息: wxid={wxid}, msg_id={client_msg_id}")
except Exception as e:
self.logging.error(f"自动撤回消息失败: {e}")
# 如果队列为空,等待一段时间
if not self.revoke_queue:
self.running = False
self.logging.info("撤回队列为空,撤回任务已停止")
break
# 计算下一个需要撤回的消息的等待时间
next_revoke_time = self.revoke_queue[0][0]
wait_time = max(0.1, next_revoke_time - time.time())
# 等待到下一个撤回时间
await asyncio.sleep(wait_time)
except asyncio.CancelledError:
self.logging.info("消息撤回任务被取消")
except Exception as e:
self.logging.error(f"消息撤回任务异常: {e}")
self.running = False