Files
abot/plugins/message_push_task/main.py

370 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from pathlib import Path
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.task_db import TaskDBOperator
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL
class MessagePushTask(MessagePluginInterface):
"""消息推送任务插件"""
# 功能权限常量
FEATURE_KEY = "MESSAGE_PUSH"
FEATURE_DESCRIPTION = "📢 消息推送功能 [推送, 消息推送, 定时推送]"
@property
def name(self) -> str:
return "消息推送任务"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供消息推送功能,支持定时推送和群发消息"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.db = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
# 初始化组件
self.db = TaskDBOperator(self.db_manager)
# 初始化数据库表
if not self.db.init_tables():
self.LOG.error("初始化数据库表失败")
return False
async_job.every_seconds(5)(self.process_scheduled_tasks)
# 加载配置
self._commands = self._config.get("MessagePush", {}).get("command", ["推送", "消息推送"])
self.command_format = self._config.get("MessagePush", {}).get("command-format", "推送 消息内容")
self.enable = self._config.get("MessagePush", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="消息推送任务")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}",
sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
async def process_scheduled_tasks(self):
"""处理定时任务"""
try:
# 获取待执行的任务
tasks = self.db.get_scheduled_tasks()
if not tasks:
return
for task in tasks:
try:
self.LOG.info(f"开始处理定时任务: {task['task_id']}")
# 更新任务状态为执行中
self.db.update_task(task['task_id'], {'status': 'running'})
# 获取任务内容
content_text = task.get('content_text')
content_image = task.get('content_image')
content_link = task.get('content_link')
content_miniprogram = task.get('content_miniprogram')
groups = task.get('groups', [])
try:
groups = json.loads(groups)
except json.JSONDecodeError:
raise Exception("无发送清单")
# 记录任务开始执行
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': 'running'}
})
# 发送消息到目标群组
if not self.bot:
raise Exception("机器人实例未初始化")
success_count = 0
fail_count = 0
for group_id in groups:
try:
# 发送文本消息
if content_text:
await self.bot.send_text_message(group_id, content_text)
# 发送图片消息
if content_image:
await self.bot.send_image_message(group_id, Path(content_image))
# 发送链接消息
if content_link:
# content_link json 读取内容
link_data = json.loads(content_link)
xml_content = f"{LINK_XML_NORMAL}".format(title=link_data.get('title', ''),
des=link_data.get('des', ''),
url=link_data.get('url', ''),
thumburl=link_data.get('thumburl', '')
)
await self.bot.send_link_xml_message(xml_content, group_id)
# # 发送小程序消息
# if content_miniprogram:
# await self.bot.send_miniprogram_message(
# group_id,
# content_miniprogram.get('title'),
# content_miniprogram.get('appid'),
# content_miniprogram.get('pagepath'),
# content_miniprogram.get('thumb_url')
# )
success_count += 1
except Exception as e:
self.LOG.error(f"发送消息到群组 {group_id} 失败: {e}")
fail_count += 1
# 更新任务状态
if fail_count == 0:
status = 'completed'
elif success_count == 0:
status = 'failed'
else:
status = 'partially_completed'
self.db.update_task(task['task_id'], {'status': status})
# 记录任务完成
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': status}
})
# 如果是重复任务,更新下次执行时间
if task['schedule_type'] == 'recurring':
next_time = self._calculate_next_schedule_time(
task['schedule_time'],
task['recurring_interval'],
task['recurring_end'],
task['recurring_time'],
task['weekly_days'],
task['monthly_day']
)
if next_time:
self.db.update_task(task['task_id'], {
'schedule_time': next_time,
'status': 'scheduled'
})
self.LOG.info(f"定时任务 {task['task_id']} 处理完成,状态: {status}")
except Exception as e:
self.LOG.error(f"处理定时任务 {task['task_id']} 失败: {e}")
# 更新任务状态为失败
self.db.update_task(task['task_id'], {'status': 'failed'})
# 记录错误日志
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update', # 使用现有的action类型
'user_id': task['creator_id'],
'changes': {'status': 'failed', 'error': str(e)}
})
except Exception as e:
self.LOG.error(f"处理定时任务出错: {e}")
def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None,
recurring_time: str = None, weekly_days: List[int] = None,
monthly_day: int = None) -> Optional[datetime]:
"""计算下次执行时间
Args:
current_time: 当前执行时间
interval: 重复间隔daily/weekly/monthly
end_time: 结束时间
recurring_time: 重复执行时间HH:mm格式
weekly_days: 每周执行日列表0-60表示周日
monthly_day: 每月执行日1-31
Returns:
下次执行时间如果已超过结束时间则返回None
"""
try:
if not end_time or current_time < end_time:
# 解析执行时间
if recurring_time:
try:
hour, minute = map(int, recurring_time.split(':'))
if not (0 <= hour <= 23 and 0 <= minute <= 59):
self.LOG.error(f"无效的执行时间格式: {recurring_time}")
return None
except ValueError:
self.LOG.error(f"执行时间格式错误: {recurring_time}")
return None
else:
hour, minute = current_time.hour, current_time.minute
if interval == 'daily':
# 如果是首次执行,使用当前时间
if current_time == datetime.now():
next_time = current_time
else:
# 否则,设置为明天的同一时间
next_time = current_time + timedelta(days=1)
next_time = next_time.replace(hour=hour, minute=minute)
elif interval == 'weekly' and weekly_days:
# 验证每周执行日
if not all(0 <= day <= 6 for day in weekly_days):
self.LOG.error(f"无效的每周执行日: {weekly_days}")
return None
# 获取当前是周几0-60表示周日
current_weekday = current_time.weekday()
# 找到下一个执行日
next_weekday = None
for day in sorted(weekly_days):
if day > current_weekday:
next_weekday = day
break
if next_weekday is None:
# 如果当前是本周最后一个执行日,则从下周第一个执行日开始
next_weekday = min(weekly_days)
days_ahead = 7 - current_weekday + next_weekday
else:
days_ahead = next_weekday - current_weekday
next_time = current_time + timedelta(days=days_ahead)
next_time = next_time.replace(hour=hour, minute=minute)
elif interval == 'monthly' and monthly_day:
# 验证每月执行日
if not (1 <= monthly_day <= 31):
self.LOG.error(f"无效的每月执行日: {monthly_day}")
return None
# 获取当前日期
current_day = current_time.day
# 计算下一个执行日期
if current_day < monthly_day:
# 如果当前日期小于执行日期,则在本月执行
try:
next_time = current_time.replace(day=monthly_day)
except ValueError:
# 处理无效日期如2月30日
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1, day=1)
else:
next_time = current_time.replace(month=current_time.month + 1, day=1)
else:
# 否则在下个月执行
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1, day=1)
else:
next_time = current_time.replace(month=current_time.month + 1, day=1)
# 尝试设置到指定的日期
try:
next_time = next_time.replace(day=monthly_day)
except ValueError:
# 如果下个月的指定日期无效,则使用下个月的最后一天
if next_time.month == 12:
next_time = next_time.replace(year=next_time.year + 1, month=1, day=1)
else:
next_time = next_time.replace(month=next_time.month + 1, day=1)
next_time = next_time - timedelta(days=1)
next_time = next_time.replace(hour=hour, minute=minute)
else:
return None
# 检查是否超过结束时间
if end_time and next_time > end_time:
return None
return next_time
return None
except Exception as e:
self.LOG.error(f"计算下次执行时间失败: {e}")
return None