加入功能,自动撤回,将异常信息发送后,5秒撤回

This commit is contained in:
liuwei
2025-04-30 17:43:28 +08:00
parent 4321b2a1e5
commit 135d0292de
8 changed files with 181 additions and 11 deletions

0
utils/__init__.py Normal file
View File

0
utils/revoke/__init__.py Normal file
View File

View File

@@ -0,0 +1,47 @@
from typing import Optional
from utils.revoke.message_revoke_manager import MessageRevokeManager
from wechat_ipad import WechatAPIClient
class MessageAutoRevoke:
"""消息自动撤回工具类"""
def __init__(self, client: WechatAPIClient, default_delay: float = 90.0):
"""
初始化消息自动撤回工具
Args:
client: WechatAPIClient实例
default_delay: 默认撤回延时(秒)
"""
self.client = client
self.revoke_manager = MessageRevokeManager(default_delay)
# 设置撤回回调函数
self.revoke_manager.set_revoke_callback(self.client.revoke_message)
def add_message_to_revoke(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int,
delay: Optional[float] = None) -> None:
"""
添加消息到撤回队列
Args:
wxid: 接收人wxid
client_msg_id: 消息ID
create_time: 消息创建时间
new_msg_id: 新消息ID
delay: 延迟撤回时间如果为None则使用默认值
"""
self.revoke_manager.add_message_to_revoke(wxid, client_msg_id, create_time, new_msg_id, delay)
def set_default_delay(self, delay: float) -> None:
"""
设置默认撤回延时
Args:
delay: 默认撤回延时(秒)
"""
self.revoke_manager.set_default_delay(delay)
def stop(self) -> None:
"""停止消息撤回任务"""
self.revoke_manager.stop()

View File

@@ -0,0 +1,118 @@
import asyncio
import time
from typing import Dict, List, Tuple, Optional, Callable
from loguru import logger
class MessageRevokeManager:
"""消息撤回管理器,用于管理需要延时撤回的消息"""
def __init__(self, default_delay: float = 90.0):
"""
初始化消息撤回管理器
Args:
default_delay: 默认撤回延时时间默认为90秒
"""
self.revoke_queue: List[Tuple[float, str, int, int, int]] = [] # (撤回时间, wxid, client_msg_id, create_time, new_msg_id)
self.default_delay = default_delay
self.running = False
self.task = None
self.logging = logger
self.revoke_callback = None
def add_message_to_revoke(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int,
delay: Optional[float] = None) -> None:
"""
添加消息到撤回队列
Args:
wxid: 接收人wxid
client_msg_id: 消息ID
create_time: 消息创建时间
new_msg_id: 新消息ID
delay: 延迟撤回时间如果为None则使用默认值
"""
if delay is None:
delay = self.default_delay
revoke_time = time.time() + delay
self.revoke_queue.append((revoke_time, wxid, client_msg_id, create_time, new_msg_id))
self.revoke_queue.sort(key=lambda x: x[0]) # 按撤回时间排序
# 如果撤回任务未运行,则启动
if not self.running:
self.start()
def start(self) -> None:
"""启动消息撤回任务"""
if self.running:
return
self.running = True
self.task = asyncio.create_task(self._process_revoke_queue())
self.logging.info("消息自动撤回任务已启动")
def stop(self) -> None:
"""停止消息撤回任务"""
if not self.running:
return
self.running = False
if self.task:
self.task.cancel()
self.logging.info("消息自动撤回任务已停止")
def set_revoke_callback(self, callback: Callable) -> None:
"""
设置撤回回调函数
Args:
callback: 撤回消息的回调函数接受wxid, client_msg_id, create_time, new_msg_id参数
"""
self.revoke_callback = callback
def set_default_delay(self, delay: float) -> None:
"""
设置默认撤回延时
Args:
delay: 默认撤回延时(秒)
"""
self.default_delay = delay
async def _process_revoke_queue(self) -> None:
"""处理撤回队列的异步任务"""
try:
while self.running:
now = time.time()
# 检查是否有需要撤回的消息
while self.revoke_queue and self.revoke_queue[0][0] <= now:
revoke_time, wxid, client_msg_id, create_time, new_msg_id = self.revoke_queue.pop(0)
try:
# 通过回调函数调用实际的撤回方法
if self.revoke_callback and callable(self.revoke_callback):
await self.revoke_callback(wxid, client_msg_id, create_time, new_msg_id)
self.logging.info(f"已自动撤回消息: wxid={wxid}, msg_id={client_msg_id}")
except Exception as e:
self.logging.error(f"自动撤回消息失败: {e}")
# 如果队列为空,等待一段时间
if not self.revoke_queue:
self.running = False
self.logging.info("撤回队列为空,撤回任务已停止")
break
# 计算下一个需要撤回的消息的等待时间
next_revoke_time = self.revoke_queue[0][0]
wait_time = max(0.1, next_revoke_time - time.time())
# 等待到下一个撤回时间
await asyncio.sleep(wait_time)
except asyncio.CancelledError:
self.logging.info("消息撤回任务被取消")
except Exception as e:
self.logging.error(f"消息撤回任务异常: {e}")
self.running = False

View File

@@ -142,7 +142,7 @@ class GroupBotManager:
@staticmethod
def handle_command(group_id, command_str):
"""统一处理群功能指令"""
print(f"PermissionStatus handle_command command_str: {command_str}")
# print(f"PermissionStatus handle_command command_str: {command_str}")
# 命令解析
command_parts = command_str.strip().split("-")