Files
abot/plugins/message_push_task/main.py
2025-06-10 13:01:08 +08:00

280 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.task_db import TaskDBOperator
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class MessagePushTask(MessagePluginInterface):
"""消息推送任务插件"""
# 功能权限常量
FEATURE_KEY = "MESSAGE_PUSH"
FEATURE_DESCRIPTION = "📢 消息推送功能 [推送, 消息推送, 定时推送]"
@property
def name(self) -> str:
return "消息推送任务"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供消息推送功能,支持定时推送和群发消息"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.db = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
# 初始化组件
self.db = TaskDBOperator(self.db_manager)
# 初始化数据库表
if not self.db.init_tables():
self.LOG.error("初始化数据库表失败")
return False
async_job.every_seconds(5)(self.process_scheduled_tasks)
# 加载配置
self._commands = self._config.get("MessagePush", {}).get("command", ["推送", "消息推送"])
self.command_format = self._config.get("MessagePush", {}).get("command-format", "推送 消息内容")
self.enable = self._config.get("MessagePush", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="消息推送任务")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}",
sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
async def process_scheduled_tasks(self):
"""处理定时任务"""
try:
# 获取待执行的任务
tasks = self.db.get_scheduled_tasks()
if not tasks:
return
for task in tasks:
try:
self.LOG.info(f"开始处理定时任务: {task['task_id']}")
# 更新任务状态为执行中
self.db.update_task(task['task_id'], {'status': 'running'})
# 获取任务内容
content_text = task.get('content_text')
content_image = task.get('content_image')
content_link = task.get('content_link')
content_miniprogram = task.get('content_miniprogram')
groups = task.get('groups', [])
try:
groups = json.loads(groups)
except json.JSONDecodeError:
raise Exception("无发送清单")
# 记录任务开始执行
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': 'running'}
})
# 发送消息到目标群组
if not self.bot:
raise Exception("机器人实例未初始化")
success_count = 0
fail_count = 0
for group_id in groups:
try:
# 发送文本消息
if content_text:
await self.bot.send_text_message(group_id, content_text)
# 发送图片消息
if content_image:
await self.bot.send_image_message(group_id, content_image)
# 发送链接消息
if content_link:
await self.bot.send_link_message(group_id, content_link)
# # 发送小程序消息
# if content_miniprogram:
# await self.bot.send_miniprogram_message(
# group_id,
# content_miniprogram.get('title'),
# content_miniprogram.get('appid'),
# content_miniprogram.get('pagepath'),
# content_miniprogram.get('thumb_url')
# )
success_count += 1
except Exception as e:
self.LOG.error(f"发送消息到群组 {group_id} 失败: {e}")
fail_count += 1
# 更新任务状态
if fail_count == 0:
status = 'completed'
elif success_count == 0:
status = 'failed'
else:
status = 'partially_completed'
self.db.update_task(task['task_id'], {'status': status})
# 记录任务完成
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': status}
})
# 如果是重复任务,更新下次执行时间
if task['schedule_type'] == 'recurring':
next_time = self._calculate_next_schedule_time(
task['schedule_time'],
task['recurring_interval'],
task['recurring_end']
)
if next_time:
self.db.update_task(task['task_id'], {
'schedule_time': next_time,
'status': 'scheduled'
})
self.LOG.info(f"定时任务 {task['task_id']} 处理完成,状态: {status}")
except Exception as e:
self.LOG.error(f"处理定时任务 {task['task_id']} 失败: {e}")
# 更新任务状态为失败
self.db.update_task(task['task_id'], {'status': 'failed'})
# 记录错误日志
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': 'failed', 'error': str(e)}
})
except Exception as e:
self.LOG.error(f"处理定时任务出错: {e}")
def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None) -> \
Optional[datetime]:
"""计算下次执行时间
Args:
current_time: 当前执行时间
interval: 重复间隔daily/weekly/monthly
end_time: 结束时间
Returns:
下次执行时间如果已超过结束时间则返回None
"""
try:
if not end_time or current_time < end_time:
if interval == 'daily':
next_time = current_time.replace(day=current_time.day + 1)
elif interval == 'weekly':
next_time = current_time.replace(day=current_time.day + 7)
elif interval == 'monthly':
# 处理月份边界情况
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1)
else:
next_time = current_time.replace(month=current_time.month + 1)
else:
return None
# 检查是否超过结束时间
if end_time and next_time > end_time:
return None
return next_time
return None
except Exception as e:
self.LOG.error(f"计算下次执行时间失败: {e}")
return None