import calendar
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.task_db import TaskDBOperator
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL


class MessagePushTask(MessagePluginInterface):
    """Message-push task plugin.

    Matches push commands in incoming messages and runs the scheduled push
    tasks stored in the task database, sending each task's text, image and
    link content to every group listed on the task.
    """

    # Feature-permission constants.
    FEATURE_KEY = "MESSAGE_PUSH"
    FEATURE_DESCRIPTION = "📢 消息推送功能 [推送, 消息推送, 定时推送]"

    @property
    def name(self) -> str:
        return "消息推送任务"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "提供消息推送功能,支持定时推送和群发消息"

    @property
    def author(self) -> str:
        return "Trae AI"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""  # No prefix: commands are matched directly.

    @property
    def commands(self) -> List[str]:
        # Populated by initialize(); not available before that call.
        return self._commands

    def __init__(self):
        super().__init__()
        self.feature = self.register_feature()
        self.db = None

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialise the plugin.

        Creates the task DB operator, ensures its tables exist, registers
        ``process_scheduled_tasks`` with ``async_job.every_seconds(5)`` and
        loads the ``MessagePush`` section of the plugin configuration.

        Args:
            context: Plugin context; only the ``event_system`` entry is read.

        Returns:
            True on success, False if the database tables could not be
            initialised.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Keep the context objects this plugin needs.
        self.event_system = context.get("event_system")
        self.db_manager = DBConnectionManager.get_instance()

        # Build components and make sure the tables exist.
        self.db = TaskDBOperator(self.db_manager)
        if not self.db.init_tables():
            self.LOG.error("初始化数据库表失败")
            return False

        async_job.every_seconds(5)(self.process_scheduled_tasks)

        # Load configuration.
        push_config = self._config.get("MessagePush", {})
        self._commands = push_config.get("command", ["推送", "消息推送"])
        self.command_format = push_config.get("command-format", "推送 消息内容")
        self.enable = push_config.get("enable", True)

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the plugin is enabled and the first
        space-separated word of the message content is a configured command.
        """
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        command = content.split(" ")[0]
        return command in self._commands

    @plugin_stats_decorator(plugin_name="消息推送任务")
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Validate a push command and its group permission.

        Args:
            message: Incoming message dict; the keys ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot`` are read.

        Returns:
            ``(False, reason)`` when the command has no argument or the group
            has the feature disabled.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.debug(f"插件执行: {self.name}:{content}")

        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # A bare command without any argument is a format error.
        if len(content.split(" ")) == 1:
            await bot.send_text_message(
                (roomid if roomid else sender),
                f"❌命令格式错误!\n{self.command_format}",
                sender,
            )
            return False, "命令格式错误"

        # Permission check (group chats only).
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # NOTE(review): no action follows the checks, so a valid command falls
        # through and returns None despite the Tuple annotation. The intended
        # success value is not visible here; confirm and add an explicit return.

    async def process_scheduled_tasks(self) -> None:
        """Run every due scheduled task.

        Each task is executed in isolation: if one task raises, it is marked
        ``failed`` and logged, and the remaining tasks still run. Errors
        outside a single task (e.g. fetching the task list) are logged and
        swallowed so the periodic job keeps running.
        """
        try:
            tasks = self.db.get_scheduled_tasks()
            if not tasks:
                return

            for task in tasks:
                try:
                    await self._execute_task(task)
                except Exception as e:
                    self.LOG.error(f"处理定时任务 {task['task_id']} 失败: {e}")
                    # Mark the task failed and record why.
                    self.db.update_task(task['task_id'], {'status': 'failed'})
                    self._log_status_change(task, {'status': 'failed', 'error': str(e)})
        except Exception as e:
            self.LOG.error(f"处理定时任务出错: {e}")

    async def _execute_task(self, task: Dict[str, Any]) -> None:
        """Send one task to all of its groups and persist the outcome."""
        task_id = task['task_id']
        self.LOG.info(f"开始处理定时任务: {task_id}")

        # Mark as running before any message is sent.
        self.db.update_task(task_id, {'status': 'running'})

        content_text = task.get('content_text')
        content_image = task.get('content_image')
        content_link = task.get('content_link')
        groups = self._parse_groups(task.get('groups', []))

        self._log_status_change(task, {'status': 'running'})

        # self.bot is not assigned anywhere in this class; getattr turns a
        # missing attribute into the intended error instead of AttributeError.
        bot = getattr(self, 'bot', None)
        if not bot:
            raise Exception("机器人实例未初始化")

        success_count = 0
        fail_count = 0
        for group_id in groups:
            try:
                await self._send_to_group(bot, group_id, content_text, content_image, content_link)
                success_count += 1
            except Exception as e:
                # One failing group must not stop delivery to the others.
                self.LOG.error(f"发送消息到群组 {group_id} 失败: {e}")
                fail_count += 1

        if fail_count == 0:
            status = 'completed'
        elif success_count == 0:
            status = 'failed'
        else:
            status = 'partially_completed'

        self.db.update_task(task_id, {'status': status})
        self._log_status_change(task, {'status': status})

        # Recurring tasks are re-armed with their next execution time.
        if task['schedule_type'] == 'recurring':
            next_time = self._calculate_next_schedule_time(
                task['schedule_time'],
                task['recurring_interval'],
                task['recurring_end'],
            )
            if next_time:
                self.db.update_task(task_id, {
                    'schedule_time': next_time,
                    'status': 'scheduled',
                })

        self.LOG.info(f"定时任务 {task_id} 处理完成,状态: {status}")

    @staticmethod
    async def _send_to_group(bot, group_id, content_text, content_image, content_link) -> None:
        """Send the text, image and link parts of a task to one group."""
        if content_text:
            await bot.send_text_message(group_id, content_text)

        if content_image:
            await bot.send_image_message(group_id, Path(content_image))

        if content_link:
            # content_link is a JSON object with title/des/url/thumburl.
            link_data = json.loads(content_link)
            xml_content = LINK_XML_NORMAL.format(
                title=link_data.get('title', ''),
                des=link_data.get('des', ''),
                url=link_data.get('url', ''),
                thumburl=link_data.get('thumburl', ''),
            )
            await bot.send_link_xml_message(xml_content, group_id)

        # TODO: mini-program messages (task['content_miniprogram']) are not
        # sent yet. Previously sketched call:
        # await bot.send_miniprogram_message(
        #     group_id,
        #     content_miniprogram.get('title'),
        #     content_miniprogram.get('appid'),
        #     content_miniprogram.get('pagepath'),
        #     content_miniprogram.get('thumb_url')
        # )

    @staticmethod
    def _parse_groups(raw_groups: Any) -> List[Any]:
        """Return the task's target groups as a list.

        Accepts an already-decoded list/tuple or a JSON-encoded array.

        Raises:
            Exception: "无发送清单" when the value cannot be decoded or does
                not decode to a list.
        """
        if isinstance(raw_groups, (list, tuple)):
            return list(raw_groups)
        try:
            groups = json.loads(raw_groups)
        except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
            raise Exception("无发送清单") from exc
        if not isinstance(groups, list):
            raise Exception("无发送清单")
        return groups

    @staticmethod
    def _new_log_id() -> str:
        """Build a log id from the current time.

        Microseconds are included because several log rows for one task are
        written within the same second.
        NOTE(review): assumes the log_id column can hold 24 characters.
        """
        return f"log_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"

    def _log_status_change(self, task: Dict[str, Any], changes: Dict[str, Any]) -> None:
        """Write one task-log row describing a status change."""
        self.db.log_task_action({
            'log_id': self._new_log_id(),
            'task_id': task['task_id'],
            'action': 'update',  # Reuses an existing action type.
            'user_id': task['creator_id'],
            'changes': changes,
        })

    @staticmethod
    def _add_one_month(moment: datetime) -> datetime:
        """Return ``moment`` moved to the next month, clamping the day to the
        last day of that month (e.g. Jan 31 -> Feb 28/29)."""
        year = moment.year + moment.month // 12
        month = moment.month % 12 + 1
        last_day = calendar.monthrange(year, month)[1]
        return moment.replace(year=year, month=month, day=min(moment.day, last_day))

    def _calculate_next_schedule_time(self, current_time: datetime, interval: str,
                                      end_time: Optional[datetime] = None) -> Optional[datetime]:
        """Compute the next execution time of a recurring task.

        Args:
            current_time: The execution time that has just been processed.
            interval: Recurrence interval: ``daily``, ``weekly`` or ``monthly``.
            end_time: Optional end of the recurrence.

        Returns:
            The next execution time, or None when the interval is unknown,
            the recurrence has ended, the next time would exceed ``end_time``
            or the computation fails.
        """
        try:
            if end_time and current_time >= end_time:
                return None

            # timedelta arithmetic rolls over month/year boundaries;
            # replace(day=day + n) raised ValueError there.
            if interval == 'daily':
                next_time = current_time + timedelta(days=1)
            elif interval == 'weekly':
                next_time = current_time + timedelta(days=7)
            elif interval == 'monthly':
                next_time = self._add_one_month(current_time)
            else:
                return None

            # Do not schedule past the end of the recurrence.
            if end_time and next_time > end_time:
                return None
            return next_time
        except Exception as e:
            self.LOG.error(f"计算下次执行时间失败: {e}")
            return None