Files
abot/plugins/message_push_task/main.py
2026-02-26 09:17:56 +08:00

478 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import json
import os
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from pathlib import Path
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.task_db import TaskDBOperator
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL
class MessagePushTask(MessagePluginInterface):
"""消息推送任务插件"""
# 功能权限常量
FEATURE_KEY = "MESSAGE_PUSH"
FEATURE_DESCRIPTION = "📢 消息推送功能 [推送, 消息推送, 定时推送]"
@property
def name(self) -> str:
return "消息推送任务"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供消息推送功能,支持定时推送、群发消息、语音消息和视频消息"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.db = None
self._task_execution_records = {} # 用于记录任务执行状态
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
# 初始化组件
self.db = TaskDBOperator(self.db_manager)
# 初始化数据库表
if not self.db.init_tables():
self.LOG.error("初始化数据库表失败")
return False
async_job.every_seconds(5)(self.process_scheduled_tasks)
# 加载配置
self._commands = self._config.get("MessagePush", {}).get("command", ["推送", "消息推送"])
self.command_format = self._config.get("MessagePush", {}).get("command-format", "推送 消息内容")
self.enable = self._config.get("MessagePush", {}).get("enable", True)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="消息推送任务")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}",
sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
return False, None
def _get_execution_key(self, task_id: str, hour: int, minute: int) -> str:
"""生成任务执行记录的唯一键"""
now = datetime.now()
return f"{task_id}_{now.date()}_{hour}_{minute}"
async def process_scheduled_tasks(self):
"""处理定时任务"""
try:
# 获取待执行的任务
tasks = self.db.get_scheduled_tasks()
if not tasks:
return
for task in tasks:
try:
self.LOG.debug(f"开始处理定时任务: {task['task_id']}")
# 检查任务是否应该执行
now = datetime.now()
schedule_time = task['schedule_time']
# 对于重复任务,检查是否到达执行时间点
if task['schedule_type'] == 'recurring':
recurring_time = task.get('recurring_time')
if recurring_time:
try:
# 处理 timedelta 对象
if isinstance(recurring_time, timedelta):
total_seconds = int(recurring_time.total_seconds())
hour = total_seconds // 3600
minute = (total_seconds % 3600) // 60
second = total_seconds % 60
else:
# 处理字符串格式
time_parts = str(recurring_time).split(':')
if len(time_parts) == 3:
hour, minute, second = map(int, time_parts)
else:
hour, minute = map(int, time_parts)
second = 0
# 检查当前时间是否在目标时间的前后5秒内
target_time = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
time_diff = abs((now - target_time).total_seconds())
if time_diff > 5: # 允许5秒的误差
continue
# 检查是否已经在这个时间点执行过
execution_key = self._get_execution_key(task['task_id'], hour, minute)
if execution_key in self._task_execution_records:
self.LOG.debug(f"任务 {task['task_id']} 今天已经执行过,跳过")
continue
# 记录执行状态
self._task_execution_records[execution_key] = now
except (ValueError, AttributeError) as e:
self.LOG.error(f"无效的执行时间格式: {recurring_time}, 错误: {e}")
continue
# 更新任务状态为执行中
self.db.update_task(task['task_id'], {'status': 'running'})
# 获取任务内容
content_text = task.get('content_text')
content_image = task.get('content_image')
content_link = task.get('content_link')
content_miniprogram = task.get('content_miniprogram')
content_voice = task.get('content_voice') # 语音消息
content_video = task.get('content_video') # 视频消息
# 记录任务开始执行
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update',
'user_id': task['creator_id'],
'changes': {'status': 'running'}
})
# 发送消息到目标群组
if not self.bot:
raise Exception("机器人实例未初始化")
# 发送链接消息,提前进行数据整理,上传缩略图
if content_link:
# content_link json 读取内容
link_data = json.loads(content_link)
thumburl_path = link_data.get('thumburl', '')
thumburl_wx = ""
if thumburl_path and os.path.exists(thumburl_path):
# Read local image file
with open(thumburl_path, 'rb') as image_file:
# Convert image to base64
base64_string = base64.b64encode(image_file.read()).decode('utf-8')
# Call upload method with base64 string
thumburl_wx = await self.bot.friend_circle_upload(base64=base64_string)
else:
print(f"Image file not found at: {thumburl_path}")
success_count = 0
fail_count = 0
groups = task.get('groups', [])
try:
groups = json.loads(groups)
except json.JSONDecodeError:
raise Exception("无发送清单")
for group_id in groups:
group_sent = False
for attempt in range(3):
try:
if content_text:
await self.bot.send_text_message(group_id, content_text)
if content_image:
await self.bot.send_image_message(group_id, Path(content_image))
if content_link:
link_data = json.loads(content_link)
xml_content = f"{LINK_XML_NORMAL}".format(
title=link_data.get('title', ''),
des=link_data.get('des', ''),
url=link_data.get('url', ''),
thumburl=thumburl_wx
)
await self.bot.send_link_xml_message(xml_content, group_id)
if content_voice:
voice_path = Path(task['content_voice'])
voice_type = 'wav' if voice_path.suffix.lower() == '.wav' else 'mp3'
await self.bot.send_voice_message(group_id, Path(content_voice), voice_type)
if content_video:
await self.bot.send_video_message(group_id, Path(content_video))
group_sent = True
break
except Exception as e:
self.LOG.error(f"发送消息到群组 {group_id}{attempt + 1}次失败: {e}")
if group_sent:
success_count += 1
else:
fail_count += 1
# 更新任务状态
if task['schedule_type'] == 'recurring':
recurring_end = task.get('recurring_end')
if recurring_end and now > recurring_end:
status = 'completed'
else:
status = 'scheduled'
else:
# 非重复任务的状态处理
if fail_count == 0:
status = 'completed'
else:
status = 'failed'
# 如果是重复任务且未超过结束时间,先计算下次执行时间
next_time = None
if task['schedule_type'] == 'recurring' and status != 'completed':
next_time = self._calculate_next_schedule_time(
task['schedule_time'],
task['recurring_interval'],
task['recurring_end'],
task['recurring_time'],
task['weekly_days'],
task['monthly_day']
)
if next_time:
self.LOG.info(f"计算任务 {task['task_id']} 的下次执行时间为: {next_time}")
else:
# 如果没有下次执行时间,标记为已完成
status = 'completed'
self.LOG.info(f"任务 {task['task_id']} 没有下次执行时间,标记为已完成")
# 更新任务状态和下次执行时间
update_data = {'status': status}
if next_time:
update_data['schedule_time'] = next_time
self.db.update_task(task['task_id'], update_data)
# 记录任务完成
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update',
'user_id': task['creator_id'],
'changes': update_data
})
self.LOG.info(f"定时任务 {task['task_id']} 处理完成,状态: {status}, 下次执行时间: {next_time}")
except Exception as e:
self.LOG.error(f"处理定时任务 {task['task_id']} 失败: {e}")
# 更新任务状态为失败
self.db.update_task(task['task_id'], {'status': 'failed'})
# 记录错误日志
self.db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task['task_id'],
'action': 'update',
'user_id': task['creator_id'],
'changes': {'status': 'failed', 'error': str(e)}
})
except Exception as e:
self.LOG.error(f"处理定时任务出错: {e}")
def _calculate_next_schedule_time(self, schedule_time: datetime, recurring_interval: str,
recurring_end: datetime, recurring_time: str,
weekly_days: str = None, monthly_day: int = None) -> Optional[datetime]:
"""计算下次执行时间"""
try:
# 解析执行时间
if recurring_time:
try:
# 处理 timedelta 对象
if isinstance(recurring_time, timedelta):
total_seconds = int(recurring_time.total_seconds())
hour = total_seconds // 3600
minute = (total_seconds % 3600) // 60
second = total_seconds % 60
else:
# 处理字符串格式
time_parts = str(recurring_time).split(':')
if len(time_parts) == 3:
hour, minute, second = map(int, time_parts)
else:
hour, minute = map(int, time_parts)
second = 0
except (ValueError, AttributeError) as e:
self.LOG.error(f"无效的执行时间格式: {recurring_time}, 错误: {e}")
return None
else:
hour, minute = schedule_time.hour, schedule_time.minute
second = 0
# 获取当前时间
now = datetime.now()
# 如果已经超过结束时间返回None
if recurring_end and now > recurring_end:
return None
# 根据重复间隔计算下次执行时间
if recurring_interval == 'daily':
# 每天执行
next_time = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
if next_time <= now:
next_time = next_time + timedelta(days=1)
elif recurring_interval == 'weekly':
# 每周执行
try:
if isinstance(weekly_days, str):
weekly_days = json.loads(weekly_days)
if not weekly_days:
return None
# 获取当前是周几0-60是周一
current_weekday = now.weekday()
# 将数据库中的周几1-7转换为代码中的周几0-6
weekly_days = [int(day) - 1 for day in weekly_days]
weekly_days.sort()
# 找到下一个执行日
next_weekday = None
for day in weekly_days:
if day > current_weekday:
next_weekday = day
break
# 如果本周没有下一个执行日,取下周的第一个执行日
if next_weekday is None:
next_weekday = weekly_days[0]
# 直接计算到下周的指定日期
days_ahead = (7 - current_weekday) + next_weekday
else:
days_ahead = next_weekday - current_weekday
# 计算下次执行时间
next_time = now.replace(hour=hour, minute=minute, second=second, microsecond=0) + timedelta(
days=days_ahead)
# 如果计算出的时间小于等于当前时间,说明计算有误,需要再加一周
if next_time <= now:
next_time = next_time + timedelta(days=7)
self.LOG.info(
f"每周任务计算:当前周几={current_weekday}, 下次执行周几={next_weekday}, 天数差={days_ahead}, 下次执行时间={next_time}")
except (json.JSONDecodeError, ValueError, IndexError) as e:
self.LOG.error(f"处理每周执行日失败: {e}")
return None
elif recurring_interval == 'monthly':
# 每月执行
try:
if not monthly_day:
return None
# 获取当前日期
current_day = now.day
current_month = now.month
current_year = now.year
# 如果当前日期已经过了执行日,移到下个月
if current_day >= monthly_day:
if current_month == 12:
next_month = 1
next_year = current_year + 1
else:
next_month = current_month + 1
next_year = current_year
else:
next_month = current_month
next_year = current_year
# 处理无效日期如2月30日
try:
next_time = datetime(next_year, next_month, monthly_day, hour, minute, second)
except ValueError:
# 如果日期无效,使用该月的最后一天
if next_month == 12:
last_day = 31
else:
last_day = (datetime(next_year, next_month + 1, 1) - timedelta(days=1)).day
next_time = datetime(next_year, next_month, last_day, hour, minute, second)
except Exception as e:
self.LOG.error(f"处理每月执行日失败: {e}")
return None
else:
return None
# 检查是否超过结束时间
if recurring_end and next_time > recurring_end:
return None
return next_time
except Exception as e:
self.LOG.error(f"计算下次执行时间失败: {e}")
return None