diff --git a/admin/dashboard/blueprints/message_push.py b/admin/dashboard/blueprints/message_push.py new file mode 100644 index 0000000..312a549 --- /dev/null +++ b/admin/dashboard/blueprints/message_push.py @@ -0,0 +1,220 @@ +from flask import Blueprint, render_template, jsonify, request, current_app +from .auth import login_required +from loguru import logger +import json +import uuid +from datetime import datetime + +# 创建消息推送管理蓝图 +message_push_bp = Blueprint('message_push', __name__, url_prefix='/message_push') + +# 消息推送管理页面 +@message_push_bp.route('/') +@login_required +def message_push_management(): + """消息推送管理页面""" + return render_template('message_push_management.html') + +# API路由 +@message_push_bp.route('/api/tasks', methods=['GET']) +@login_required +def api_tasks_list(): + """获取任务列表API""" + try: + # 获取查询参数 + status = request.args.get('status') + start_time = request.args.get('start_time') + end_time = request.args.get('end_time') + page = int(request.args.get('page', 1)) + limit = int(request.args.get('limit', 20)) + + # 获取任务列表 + db = current_app.dashboard_server.task_db + tasks, total = db.get_tasks_list(status, start_time, end_time, page, limit) + + return jsonify({ + "success": True, + "data": { + "tasks": tasks, + "total": total, + "page": page, + "limit": limit + } + }) + except Exception as e: + logger.error(f"获取任务列表失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks', methods=['POST']) +@login_required +def api_create_task(): + """创建任务API""" + try: + data = request.json + if not data: + return jsonify({"success": False, "error": "无效的请求数据"}), 400 + + # 生成任务ID + data['task_id'] = str(uuid.uuid4()) + data['creator_id'] = request.user.get('id') + + # 创建任务 + db = current_app.dashboard_server.task_db + task = db.create_task(data) + if not task: + return jsonify({"success": False, "error": "创建任务失败"}), 500 + + return jsonify({ + "success": True, + "data": { + "task": task + } + }) + except Exception as e: + logger.error(f"创建任务失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks/', methods=['PUT']) +@login_required +def api_update_task(task_id): + """更新任务API""" + try: + data = request.json + if not data: + return jsonify({"success": False, "error": "无效的请求数据"}), 400 + + # 获取任务 + db = current_app.dashboard_server.task_db + task = db.get_task(task_id) + if not task: + return jsonify({"success": False, "error": "任务不存在"}), 404 + + # 更新任务 + if not db.update_task(task_id, data): + return jsonify({"success": False, "error": "更新任务失败"}), 500 + + # 获取更新后的任务 + updated_task = db.get_task(task_id) + return jsonify({ + "success": True, + "data": { + "task": updated_task + } + }) + except Exception as e: + logger.error(f"更新任务失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks/', methods=['DELETE']) +@login_required +def api_delete_task(task_id): + """删除任务API""" + try: + # 获取任务 + db = current_app.dashboard_server.task_db + task = db.get_task(task_id) + if not task: + return jsonify({"success": False, "error": "任务不存在"}), 404 + + # 删除任务 + if not db.delete_task(task_id): + return jsonify({"success": False, "error": "删除任务失败"}), 500 + + return jsonify({ + "success": True, + "message": "任务已删除" + }) + except Exception as e: + logger.error(f"删除任务失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks//pause', methods=['POST']) +@login_required +def api_pause_task(task_id): + """暂停任务API""" + try: + # 获取任务 + db = current_app.dashboard_server.task_db + task = db.get_task(task_id) + if not task: + return jsonify({"success": False, "error": "任务不存在"}), 404 + + # 暂停任务 + if not db.update_task(task_id, {'status': 'paused'}): + return jsonify({"success": False, "error": "暂停任务失败"}), 500 + + return jsonify({ + "success": True, + "message": "任务已暂停" + }) + except Exception as e: + logger.error(f"暂停任务失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks//resume', methods=['POST']) +@login_required +def api_resume_task(task_id): + """恢复任务API""" + try: + # 获取任务 + db = current_app.dashboard_server.task_db + task = db.get_task(task_id) + if not task: + return jsonify({"success": False, "error": "任务不存在"}), 404 + + # 恢复任务 + if not db.update_task(task_id, {'status': 'scheduled'}): + return jsonify({"success": False, "error": "恢复任务失败"}), 500 + + return jsonify({ + "success": True, + "message": "任务已恢复" + }) + except Exception as e: + logger.error(f"恢复任务失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks//preview', methods=['POST']) +@login_required +def api_preview_task(task_id): + """预览任务API""" + try: + # 获取任务 + db = current_app.dashboard_server.task_db + task = db.get_task(task_id) + if not task: + return jsonify({"success": False, "error": "任务不存在"}), 404 + + # 发送预览 + message_push = current_app.dashboard_server.message_push_task.message_push + if not message_push.send_preview(task, [request.user.get('id')]): + return jsonify({"success": False, "error": "发送预览失败"}), 500 + + return jsonify({ + "success": True, + "message": "预览已发送" + }) + except Exception as e: + logger.error(f"发送预览失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +@message_push_bp.route('/api/tasks//logs', methods=['GET']) +@login_required +def api_task_logs(task_id): + """获取任务日志API""" + try: + # 获取查询参数 + page = int(request.args.get('page', 1)) + limit = int(request.args.get('limit', 20)) + + # 查询日志 + db = current_app.dashboard_server.task_db + logs_data = db.get_task_logs_with_pagination(task_id, page, limit) + + return jsonify({ + "success": True, + "data": logs_data + }) + except Exception as e: + logger.error(f"获取任务日志失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 \ No newline at end of file diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index 6562b71..71f1d27 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -5,14 +5,15 @@ import os import sys import threading + +import toml +from flask import Flask, send_from_directory from loguru import logger from db.contacts_db import ContactsDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator -from flask import Flask, send_from_directory -import toml - +from db.task_db import TaskDBOperator from wechat_ipad import WechatAPIClient # 添加项目根目录到系统路径,确保可以导入项目模块 @@ -42,6 +43,7 @@ class DashboardServer: self.stats_db = StatsDBOperator(self.db_manager) self.message_storage = MessageStorageDB(self.db_manager) self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) + self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) # 获取联系人管理器实例 self.contact_manager = robot_instance.contact_manager self.plugin_manager = robot_instance.plugin_manager diff --git a/admin/dashboard/templates/message_push_management.html b/admin/dashboard/templates/message_push_management.html new file mode 100644 index 0000000..546623e --- /dev/null +++ b/admin/dashboard/templates/message_push_management.html @@ -0,0 +1,439 @@ +{% extends "base.html" %} + +{% block title %}消息推送管理{% endblock %} + +{% block content %} +
+
+
+
+
+

消息推送管理

+
+ +
+
+
+ +
+
+ +
+
+ +
+
+ +
+
+ + +
+
+ + + + + + + + + + + + + + + + +
任务ID任务名称状态计划时间创建时间操作
+ + +
+
+
+ 显示 0 到 0 条,共 0 条记录 +
+
+
+
+ +
+
+
+
+
+
+
+
+ + + + + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/db/task_db.py b/db/task_db.py new file mode 100644 index 0000000..85ac11a --- /dev/null +++ b/db/task_db.py @@ -0,0 +1,491 @@ +from typing import List, Dict, Optional, Tuple + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + +from loguru import logger +import json + + +class TaskDBOperator(BaseDBOperator): + """消息推送任务数据库操作类""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + + def init_tables(self) -> bool: + """初始化数据库表""" + try: + # 创建任务表 + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_push_tasks ( + task_id VARCHAR(36) PRIMARY KEY, + name VARCHAR(50) NOT NULL, + schedule_type ENUM('once', 'recurring') NOT NULL, + schedule_time DATETIME NOT NULL, + recurring_interval ENUM('daily', 'weekly', 'monthly') DEFAULT NULL, + recurring_end DATETIME DEFAULT NULL, + content_text TEXT(500), + content_image VARCHAR(255), + content_link VARCHAR(255), + content_miniprogram JSON, + groups JSON, + priority ENUM('high', 'medium', 'low') DEFAULT 'medium', + status ENUM('draft', 'scheduled', 'running', 'completed', 'failed', 'paused') DEFAULT 'draft', + creator_id VARCHAR(50) NOT NULL, + preview_recipients JSON, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + """) + + # 创建任务日志表 + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_push_task_logs ( + log_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + action ENUM('create', 'update', 'delete', 'pause', 'resume') NOT NULL, + user_id VARCHAR(50) NOT NULL, + changes JSON, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_push_tasks(task_id) + ) + """) + + # 创建预览表 + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_push_previews ( + preview_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + content JSON NOT NULL, + recipients JSON NOT NULL, + validation JSON, + status ENUM('sent', 'confirmed', 'modified') DEFAULT 'sent', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_push_tasks(task_id) + ) + """) + + # 创建反馈表 + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_push_feedback ( + feedback_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + user_id VARCHAR(50) NOT NULL, + content TEXT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_push_tasks(task_id) + ) + """) + + return True + except Exception as e: + logger.error(f"初始化数据库表失败: {e}") + return False + + def create_task(self, task_data: Dict) -> Optional[Dict]: + """创建新任务 + + Args: + task_data: 任务数据 + + Returns: + 创建的任务数据,失败返回None + """ + try: + sql = """ + INSERT INTO t_push_tasks ( + task_id, name, schedule_type, schedule_time, recurring_interval, + recurring_end, content_text, content_image, content_link, + content_miniprogram, groups, priority, status, creator_id, + preview_recipients + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + """ + params = ( + task_data['task_id'], + task_data['name'], + task_data['schedule_type'], + task_data['schedule_time'], + task_data.get('recurring_interval'), + task_data.get('recurring_end'), + task_data.get('content_text'), + task_data.get('content_image'), + task_data.get('content_link'), + task_data.get('content_miniprogram'), + task_data.get('groups'), + task_data.get('priority', 'medium'), + task_data.get('status', 'draft'), + task_data['creator_id'], + task_data.get('preview_recipients') + ) + + if self.execute_update(sql, params): + return self.get_task(task_data['task_id']) + return None + except Exception as e: + logger.error(f"创建任务失败: {e}") + return None + + def get_task(self, task_id: str) -> Optional[Dict]: + """获取任务 + + Args: + task_id: 任务ID + + Returns: + 任务数据,不存在返回None + """ + sql = "SELECT * FROM t_push_tasks WHERE task_id = %s" + return self.execute_query(sql, (task_id,), fetch_one=True) + + def update_task(self, task_id: str, updates: Dict) -> bool: + """更新任务 + + Args: + task_id: 任务ID + updates: 更新的字段和值 + + Returns: + 是否更新成功 + """ + try: + fields = [] + values = [] + for key, value in updates.items(): + if key in ['groups', 'content_miniprogram', 'preview_recipients']: + fields.append(f"{key} = %s") + values.append(value) + else: + fields.append(f"{key} = %s") + values.append(value) + values.append(task_id) + + sql = f""" + UPDATE t_push_tasks + SET {', '.join(fields)} + WHERE task_id = %s + """ + return self.execute_update(sql, values) + except Exception as e: + logger.error(f"更新任务失败: {e}") + return False + + def delete_task(self, task_id: str) -> bool: + """删除任务 + + Args: + task_id: 任务ID + + Returns: + 是否删除成功 + """ + sql = "DELETE FROM t_push_tasks WHERE task_id = %s" + return self.execute_update(sql, (task_id,)) + + def get_scheduled_tasks(self) -> List[Dict]: + """获取待执行的任务 + + Returns: + 待执行的任务列表 + """ + sql = """ + SELECT * FROM t_push_tasks + WHERE status = 'scheduled' + AND schedule_time <= NOW() + """ + return self.execute_query(sql) + + def log_task_action(self, log_data: Dict) -> bool: + """记录任务操作日志 + + Args: + log_data: 日志数据 + + Returns: + 是否记录成功 + """ + try: + sql = """ + INSERT INTO t_push_task_logs ( + log_id, task_id, action, user_id, changes + ) VALUES ( + %s, %s, %s, %s, %s + ) + """ + params = ( + log_data['log_id'], + log_data['task_id'], + log_data['action'], + log_data['user_id'], + log_data['changes'] + ) + return self.execute_update(sql, params) + except Exception as e: + logger.error(f"记录任务操作日志失败: {e}") + return False + + def get_task_logs(self, task_id: str, page: int = 1, limit: int = 20) -> Dict: + """获取任务日志 + + Args: + task_id: 任务ID + page: 页码 + limit: 每页数量 + + Returns: + 包含日志列表和总数的字典 + """ + try: + # 获取总数 + count_sql = """ + SELECT COUNT(*) as total + FROM t_push_task_logs + WHERE task_id = %s + """ + count_result = self.execute_query(count_sql, (task_id,), fetch_one=True) + total = count_result['total'] if count_result else 0 + + # 获取日志列表 + sql = """ + SELECT * + FROM t_push_task_logs + WHERE task_id = %s + ORDER BY timestamp DESC + LIMIT %s OFFSET %s + """ + offset = (page - 1) * limit + logs = self.execute_query(sql, (task_id, limit, offset)) + + return { + 'total': total, + 'logs': logs + } + except Exception as e: + logger.error(f"获取任务日志失败: {e}") + return {'total': 0, 'logs': []} + + def create_preview(self, preview_data: Dict) -> Optional[Dict]: + """创建预览记录 + + Args: + preview_data: 预览数据 + + Returns: + 创建的预览记录,失败返回None + """ + try: + sql = """ + INSERT INTO t_push_previews ( + preview_id, task_id, content, recipients, validation + ) VALUES ( + %s, %s, %s, %s, %s + ) + """ + params = ( + preview_data['preview_id'], + preview_data['task_id'], + preview_data['content'], + preview_data['recipients'], + preview_data.get('validation') + ) + + if self.execute_update(sql, params): + return self.get_preview(preview_data['preview_id']) + return None + except Exception as e: + logger.error(f"创建预览记录失败: {e}") + return None + + def get_preview(self, preview_id: str) -> Optional[Dict]: + """获取预览记录 + + Args: + preview_id: 预览ID + + Returns: + 预览记录,不存在返回None + """ + sql = "SELECT * FROM t_push_previews WHERE preview_id = %s" + return self.execute_query(sql, (preview_id,), fetch_one=True) + + def update_preview_status(self, preview_id: str, status: str) -> bool: + """更新预览状态 + + Args: + preview_id: 预览ID + status: 新状态 + + Returns: + 是否更新成功 + """ + sql = """ + UPDATE t_push_previews + SET status = %s + WHERE preview_id = %s + """ + return self.execute_update(sql, (status, preview_id)) + + def create_feedback(self, feedback_data: Dict) -> bool: + """创建反馈记录 + + Args: + feedback_data: 反馈数据 + + Returns: + 是否创建成功 + """ + try: + sql = """ + INSERT INTO t_push_feedback ( + feedback_id, task_id, user_id, content + ) VALUES ( + %s, %s, %s, %s + ) + """ + params = ( + feedback_data['feedback_id'], + feedback_data['task_id'], + feedback_data['user_id'], + feedback_data['content'] + ) + return self.execute_update(sql, params) + except Exception as e: + logger.error(f"创建反馈记录失败: {e}") + return False + + def get_task_feedback(self, task_id: str, start_time: str = None, end_time: str = None) -> List[Dict]: + """获取任务反馈 + + Args: + task_id: 任务ID + start_time: 开始时间 + end_time: 结束时间 + + Returns: + 反馈列表 + """ + try: + sql = """ + SELECT * + FROM t_push_feedback + WHERE task_id = %s + """ + params = [task_id] + + if start_time: + sql += " AND timestamp >= %s" + params.append(start_time) + if end_time: + sql += " AND timestamp <= %s" + params.append(end_time) + + sql += " ORDER BY timestamp DESC" + return self.execute_query(sql, tuple(params)) + except Exception as e: + logger.error(f"获取任务反馈失败: {e}") + return [] + + def get_tasks_list(self, status: str = None, start_time: str = None, end_time: str = None, page: int = 1, + limit: int = 20) -> Tuple[List[Dict], int]: + """获取任务列表 + + Args: + status: 任务状态 + start_time: 开始时间 + end_time: 结束时间 + page: 页码 + limit: 每页数量 + + Returns: + (任务列表, 总数) + """ + try: + # 构建查询条件 + conditions = [] + params = [] + if status: + conditions.append("status = %s") + params.append(status) + if start_time: + conditions.append("schedule_time >= %s") + params.append(start_time) + if end_time: + conditions.append("schedule_time <= %s") + params.append(end_time) + + # 构建SQL + sql = "SELECT * FROM t_push_tasks" + if conditions: + sql += " WHERE " + " AND ".join(conditions) + sql += " ORDER BY created_at DESC LIMIT %s OFFSET %s" + params.extend([limit, (page - 1) * limit]) + + # 查询总数 + count_sql = f"SELECT COUNT(*) FROM t_push_tasks" + if conditions: + count_sql += " WHERE " + " AND ".join(conditions) + count_result = self.execute_query(count_sql, params[:-2], fetch_one=True) + total = count_result['COUNT(*)'] if count_result else 0 + + # 查询数据 + tasks = self.execute_query(sql, params) + + # 处理JSON字段 + for task in tasks: + for field in ['groups', 'content_miniprogram', 'preview_recipients']: + if task.get(field): + task[field] = json.loads(task[field]) + + return tasks, total + except Exception as e: + logger.error(f"获取任务列表失败: {e}") + return [], 0 + + def get_task_logs_with_pagination(self, task_id: str, page: int = 1, limit: int = 20) -> Dict: + """获取任务日志(带分页) + + Args: + task_id: 任务ID + page: 页码 + limit: 每页数量 + + Returns: + 包含日志列表和分页信息的字典 + """ + try: + # 查询总数 + count_sql = """ + SELECT COUNT(*) as total + FROM t_push_task_logs + WHERE task_id = %s + """ + count_result = self.execute_query(count_sql, (task_id,), fetch_one=True) + total = count_result['total'] if count_result else 0 + + # 查询日志列表 + sql = """ + SELECT * + FROM t_push_task_logs + WHERE task_id = %s + ORDER BY timestamp DESC + LIMIT %s OFFSET %s + """ + offset = (page - 1) * limit + logs = self.execute_query(sql, (task_id, limit, offset)) + + # 处理JSON字段 + for log in logs: + if log.get('changes'): + log['changes'] = json.loads(log['changes']) + + return { + 'logs': logs, + 'total': total, + 'page': page, + 'limit': limit + } + except Exception as e: + logger.error(f"获取任务日志失败: {e}") + return {'logs': [], 'total': 0, 'page': page, 'limit': limit} diff --git a/plugins/message_push_task/PRD.md b/plugins/message_push_task/PRD.md new file mode 100644 index 0000000..eb07225 --- /dev/null +++ b/plugins/message_push_task/PRD.md @@ -0,0 +1,565 @@ +# 产品需求文档(PRD):定时推送功能(更新) + +## 1. 文档信息 +- **文档名称**:定时推送功能产品需求文档 +- **版本**:1.7 +- **作者**:Grok(产品经理助手) +- **创建日期**:2025年6月9日 +- **更新日期**:2025年6月9日 +- **状态**:更新稿 + +## 2. 背景与目标 +### 2.1 背景 +目标产品为基于微信平台的运营工具,服务于微信群管理者(如教育机构、社区运营者、商家等),用于向微信群用户推送消息。人工定时推送耗时耗力,管理复杂。本功能通过管理后台提供自动化定时推送和任务管理,预览结果通过微信服务通知发送给指定人员,减少操作负担,提升效率。 + +### 2.2 目标 +- **主要目标**:通过管理后台实现高效任务管理(创建、编辑、删除、批量操作),通过微信服务通知发送预览结果,确保推送内容准确。 +- **次要目标**: + - 提供直观的任务管理界面,简化多群管理。 + - 确保预览结果通过微信可靠送达,减少错误推送。 + - 提升群用户互动率,增加群活跃度。 + +### 2.3 成功指标(KPI) +- 群管理者手动推送时间减少80%. +- 定时推送消息送达率≥98%. +- 预览结果发送成功率≥95%. +- 批量操作使用率≥50%. +- API响应时间<500ms(95%请求)。 +- 管理后台满意度评分≥4.5/5。 + +## 3. 用户画像 +- **目标用户**:微信群管理者(教育机构老师、社区管理员、电商运营者),管理多个微信群,需定期推送消息。 +- **用户痛点**: + - 多群任务管理复杂,缺乏批量操作。 + - 推送内容易出错,需可靠预览机制。 +- **用户需求**: + - 集中化任务管理,支持批量操作。 + - 通过微信接收准确的预览结果,确认内容。 + - 可靠的API支持,确保功能稳定。 + +## 4. 功能需求 +### 4.1 核心功能 +#### 4.1.1 管理后台 - 任务管理 +- **任务创建**: + - 配置项: + - **任务名称**:≤50字,例:“周一课程提醒”。 + - **推送时间**:单次(例:2025-06-10 18:00 PDT)或周期性(例:每周一 18:00 PDT,持续N周或永久)。 + - **推送内容**:文本(≤500字)、图片(≤2MB,JPEG/PNG)、链接(支持小程序链接)、小程序卡片。 + - **目标群**:多选(≤100个群),支持搜索和全选/反选。 + - **优先级**:高/中/低,高优先级任务优先调度。 + - **预览接收人**:指定微信用户(通过微信ID),接收预览结果。 + - 支持“保存为草稿”。 +- **任务列表**: + - 显示字段:任务ID、名称、时间、目标群、状态(草稿/待执行/进行中/已完成/失败/暂停)、创建者、预览接收人、操作(编辑/删除/暂停/恢复/复制)。 + - 支持筛选(状态/时间/群/创建者/接收人)、排序(时间/状态/优先级)、分页(每页20条)、导出(CSV)。 +- **批量操作**:批量编辑(时间/目标群/接收人)、删除、暂停/恢复、复制。 +- **任务编辑**:修改未执行/周期性任务,记录修改日志(30天)。 +- **任务删除/暂停**:删除需二次确认,暂停支持恢复时间设置。 +- **任务状态监控**:实时更新状态,失败任务显示错误原因(例:“群聊不存在”)。 + +#### 4.1.2 预览功能 +- **微信预览**: + - 在任务创建/编辑时,点击“发送预览”,系统通过微信服务通知将推送内容发送给指定接收人。 + - 预览内容: + - 文本:支持字体、换行、表情符号。 + - 图片:压缩至微信规范(≤2MB)。 + - 链接/小程序卡片:显示标题、缩略图、可点击。 + - 预览消息模拟微信群聊效果,附带任务名称和确认提示(例:“请确认内容,回复‘确认’或‘修改’”)。 +- **预览校验**: + - 自动校验: + - 文本:≤500字,含表情符号。 + - 图片:≤2MB,JPEG/PNG。 + - 链接:有效URL,优先支持小程序链接。 + - 校验失败时,管理后台显示警告(例:“文本超长,请精简”),禁止发送预览。 +- **预览确认**: + - 接收人通过微信回复“确认”或“修改”。 + - 确认后,任务标记为“预览通过”,允许提交。 + - 修改请求返回管理后台,提示重新编辑。 +- **预览模板**: + - 提供模板(例:课程提醒、活动通知),一键应用。 + - 支持自定义模板并保存,供复用。 + +#### 4.1.3 消息发送 +- 通过微信API批量发送消息(≤100群),失败重试3次。 +- 记录送达时间、状态。 + +#### 4.1.4 通知与反馈 +- **管理者通知**: + - 推送前1小时提醒指定接收人(微信服务通知)。 + - 推送完成后,通知创建者和接收人(成功/失败,含错误原因)。 +- **用户反馈**: + - 推送消息可嵌入反馈链接(小程序表单)。 + - 反馈汇总至后台,生成报表(互动率、常见意见)。 + +### 4.2 非功能性需求 +- **性能**: + - 任务列表加载<2秒,预览发送<3秒,API响应<500ms。 + - 推送时间偏差<±1分钟。 +- **可靠性**:送达率≥98%,预览发送成功率≥95%,后台uptime 99.9%. +- **安全性与合规**: + - 遵守微信API规范和《个人信息保护法》。 + - 数据使用AES-256加密。 +- **平台支持**: + - Web:HTML + ElementUI(Vue.js 2.x),兼容Chrome、Safari、Edge. + - 微信小程序:iOS 15+/Android 10+. +- **多语言支持**:默认中文,预留英文。 + +## 5. 用户流程 +1. 管理者登录管理后台(Web/小程序)。 +2. 进入“定时推送”,点击“新建任务”。 +3. 配置任务(名称、时间、内容、群、预览接收人)。 +4. 点击“发送预览”,系统校验内容并通过微信服务通知发送预览。 +5. 接收人通过微信回复“确认”或“修改”。 +6. 确认后提交任务,推送前1小时收到提醒。 +7. 系统通过微信API发送消息,记录结果。 +8. 管理者查看/编辑/批量操作任务。 + +## 6. 界面 Design (Wireframe) +- **Web管理后台(HTML + ElementUI)**: + - **任务列表**: + - ElementUI Table组件,字段:任务ID、名称、时间、群、状态、创建者、接收人、操作。 + - 顶部:Select/Input(筛选)、Search(搜索)、Button(导出)。 + - 底部:Pagination组件。 + - **任务创建/编辑**: + - Form组件:Input(名称)、DatePicker(时间)、TextArea(内容)、Upload(图片)、Select(群、接收人)。 + - 底部:Button(发送预览、保存草稿、提交、取消)。 + - **任务详情**:Descriptions组件显示详情,Table组件显示日志,Button(编辑/删除/暂停/复制)。 +- **小程序**: + - **任务列表**:卡片式,滑动查看,顶部筛选/搜索。 + - **任务创建/编辑**:分步表单(时间→内容→群→接收人→发送预览)。 +- **预览**:通过微信服务通知发送,无前端界面。 + +## 7. 技术实现 +- **前端**: + - **Web**:HTML + ElementUI(Vue.js 2.x),通过CDN加载ElementUI。 + - **小程序**:微信小程序框架(WXML + WXSS + JavaScript)。 +- **后端**: + - **框架**:Python + Flask。 + - **数据库**:MySQL(任务、日志存储),Redis(调度、缓存)。 + - **定时任务**:Python `asyncio.gather` 实现并发任务调度,结合MySQL存储任务状态。 + - **预览实现**:Flask调用微信服务通知API,生成模拟群聊格式。 + - **依赖库**: + - Flask:Web框架。 + - PyMySQL:MySQL连接。 + - redis-py:Redis连接。 + - aiohttp:异步HTTP请求(微信API调用)。 + - python-jwt:JWT认证。 +- **微信API**: + - 消息推送:`cgi-bin/message/send`。 + - 服务通知:小程序服务通知接口。 +- **监控**: + - 日志:Flask-Logging记录任务执行和预览发送,存储30天。 + - 告警:失败通过企业微信通知开发团队。 + +### 7.1 数据库 Schema (MySQL) +- **t_tasks** 表: + ```sql + CREATE TABLE t_tasks ( + task_id VARCHAR(36) PRIMARY KEY, + name VARCHAR(50) NOT NULL, + schedule_type ENUM('once', 'recurring') NOT NULL, + schedule_time DATETIME NOT NULL, + recurring_interval ENUM('daily', 'weekly', 'monthly') DEFAULT NULL, + recurring_end DATETIME DEFAULT NULL, + content_text TEXT(500), + content_image VARCHAR(255), + content_link VARCHAR(255), + content_miniprogram JSON, + groups JSON, + priority ENUM('high', 'medium', 'low') DEFAULT 'medium', + status ENUM('draft', 'scheduled', 'running', 'completed', 'failed', 'paused') DEFAULT 'draft', + creator_id VARCHAR(50) NOT NULL, + preview_recipients JSON, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ); + ``` +- **t_task_logs** 表: + ```sql + CREATE TABLE t_task_logs ( + log_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + action ENUM('create', 'update', 'delete', 'pause', 'resume') NOT NULL, + user_id VARCHAR(50) NOT NULL, + changes JSON, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_tasks(task_id) + ); + ``` +- **t_previews** 表: + ```sql + CREATE TABLE t_previews ( + preview_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + content JSON NOT NULL, + recipients JSON NOT NULL, + validation JSON, + status ENUM('sent', 'confirmed', 'modified') DEFAULT 'sent', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_tasks(task_id) + ); + ``` +- **t_feedback** 表: + ```sql + CREATE TABLE t_feedback ( + feedback_id VARCHAR(36) PRIMARY KEY, + task_id VARCHAR(36) NOT NULL, + user_id VARCHAR(50) NOT NULL, + content TEXT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (task_id) REFERENCES t_tasks(task_id) + ); + ``` + +### 7.2 定时任务实现 +- 使用 `asyncio.gather` 实现并发任务调度: + - 启动时从MySQL加载待执行任务(`status='scheduled'`)。 + - 每个任务分配协程,检查时间是否到达。 + - 使用 `asyncio.sleep` 控制调度间隔(例:每分钟检查)。 + - 并发调用微信API发送消息,记录结果。 +- Redis缓存任务状态,减少MySQL压力。 +- 示例伪代码: + ```python + import asyncio + import pymysql + import redis + import aiohttp + + async def schedule_tasks(): + conn = pymysql.connect(...) # MySQL连接 + redis_client = redis.Redis(...) # Redis连接 + async with aiohttp.ClientSession() as session: + while True: + with conn.cursor() as cursor: + cursor.execute("SELECT task_id, schedule_time, content, groups FROM t_tasks WHERE status = 'scheduled' AND schedule_time <= NOW()") + tasks = cursor.fetchall() + coroutines = [execute_task(task, session) for task in tasks] + await asyncio.gather(*coroutines) + await asyncio.sleep(60) # 每分钟检查 + + async def execute_task(task, session): + # 调用微信API发送消息 + async with session.post('https://api.weixin.qq.com/cgi-bin/message/send', json=task['content']) as resp: + result = await resp.json() + # 更新任务状态 + with conn.cursor() as cursor: + cursor.execute("UPDATE t_tasks SET status = %s WHERE task_id = %s", (result['status'], task['task_id'])) + conn.commit() + ``` + +## 8. 接口文档 +### 8.1 概述 +RESTful API,JSON格式,HTTPS协议,使用JWT认证(`Authorization`头)。基于Flask实现,遵守微信API规范。 + +### 8.2 认证 +- **登录**:`/api/auth/login` + - **Method**: POST + - **Request**: + ```json + { + "wechatId": "string", + "password": "string" + } + ``` + - **Response**: + - 200: `{ "token": "string", "user": { "id": "string" } }` + - 401: `{ "error": "Invalid credentials" }` + - **Description**: 验证用户身份,返回JWT。 + +### 8.3 任务管理 +- **创建任务**:`/api/tasks` + - **Method**: POST + - **Request**: + ```json + { + "name": "string", + "schedule": { + "type": "once/recurring", + "time": "2025-06-10T18:00:00Z", + "recurringInterval": "weekly", + "recurringEnd": "2025-12-31T23:59:59Z" + }, + "content": { + "text": "string", + "image": "string", + "link": "string", + "miniprogram": { "appid": "string", "pagePath": "string" } + }, + "groups": ["string"], + "priority": "high/medium/low", + "status": "draft/scheduled", + "previewRecipients": ["string"] + } + ``` + - **Response**: + - 201: `{ "taskId": "string", "message": "Task created" }` + - 400: `{ "error": "Invalid input" }` + - **Description**: 创建任务,包含预览接收人。 + +- **获取任务列表**:`/api/tasks` + - **Method**: GET + - **Query Parameters**: + - `status`: string + - `startTime`: string (ISO 8601) + - `endTime`: string (ISO 8601) + - `groupId`: string + - `creatorId`: string + - `recipientId`: string + - `page`: number (default: 1) + - `limit`: number (default: 20) + - **Response**: + - 200: + ```json + { + "tasks": [ + { + "taskId": "string", + "name": "string", + "schedule": { ... }, + "content": { ... }, + "groups": ["string"], + "priority": "high/medium/low", + "status": "draft/scheduled/running/completed/failed/paused", + "creatorId": "string", + "previewRecipients": ["string"], + "createdAt": "2025-06-09T18:00:00Z", + "updatedAt": "2025-06-09T18:00:00Z" + } + ], + "total": number, + "page": number, + "limit": number + } + ``` + - 400: `{ "error": "Invalid query" }` + - **Description**: 获取任务列表,支持筛选。 + +- **更新任务**:`/api/tasks/` + - **Method**: PUT + - **Request**: 同创建任务,支持部分更新。 + - **Response**: + - 200: `{ "taskId": "string", "message": "Task updated" }` + - 404: `{ "error": "Task not found" }` + - **Description**: 编辑任务,记录修改日志。 + +- **删除任务**:`/api/tasks/` + - **Method**: DELETE + - **Response**: + - 200: `{ "message": "Task deleted" }` + - 404: `{ "error": "Task not found" }` + - **Description**: 删除草稿或暂停任务。 + +- **批量操作**:`/api/tasks/batch` + - **Method**: POST + - **Request**: + ```json + { + "taskIds": ["string"], + "action": "delete/pause/resume/copy", + "updates": { ... } + } + ``` + - **Response**: + - 200: `{ "message": "Batch operation completed", "failedTasks": [{ "taskId": "string", "error": "string" }] }` + - 400: `{ "error": "Invalid request" }` + - **Description**: 批量操作任务。 + +- **获取任务日志**:`/api/tasks//logs` + - **Method**: GET + - **Response**: + - 200: + ```json + [ + { + "logId": "string", + "taskId": "string", + "action": "create/update/delete/pause/resume", + "userId": "string", + "timestamp": "2025-06-09T18:00:00Z", + "changes": { ... } + } + ] + ``` + - 404: `{ "error": "Task not found" }` + - **Description**: 获取任务修改日志。 + +### 8.4 预览功能 +- **发送预览**:`/api/preview` + - **Method**: POST + - **Request**: + ```json + { + "taskId": "string", + "content": { + "text": "string", + "image": "string", + "link": "string", + "miniprogram": { "appid": "string", "pagePath": "string" } + }, + "recipients": ["string"] + } + ``` + - **Response**: + - 200: + ```json + { + "previewId": "string", + "validation": { + "isValid": boolean, + "errors": ["string"] + } + } + ``` + - 400: `{ "error": "Invalid content" }` + - 429: `{ "error": "Rate limit exceeded" }` + - **Description**: 校验内容并通过微信服务通知发送预览。 + +- **确认预览**:`/api/preview/confirm` + - **Method**: POST + - **Request**: + ```json + { + "previewId": "string", + "response": "confirm/modify", + "comment": "string" + } + ``` + - **Response**: + - 200: `{ "message": "Preview confirmed" }` + - 404: `{ "error": "Preview not found" }` + - **Description**: 接收人回复确认或修改。 + +- **获取预览模板**:`/api/preview/templates` + - **Method**: GET + - **Response**: + - 200: + ```json + [ + { + "templateId": "string", + "name": "string", + "content": { ... } + } + ] + ``` + - **Description**: 获取模板列表。 + +- **保存预览模板**:`/api/preview/templates` + - **Method**: POST + - **Request**: + ```json + { + "name": "string", + "content": { ... } + } + ``` + - **Response**: + - 201: `{ "templateId": "string", "message": "Template saved" }` + - 400: `{ "error": "Invalid template" }` + - **Description**: 保存自定义模板。 + +### 8.5 消息推送 +- **触发推送**:`/api/push` + - **Method**: POST + - **Request**: + ```json + { + "taskId": "string" + } + ``` + - **Response**: + - 200: `{ "message": "Push started", "pushId": "string" }` + - 404: `{ "error": "Task not found" }` + - **Description**: 手动触发推送(测试用)。 + +- **推送状态**:`/api/push/` + - **Method**: GET + - **Response**: + - 200: + ```json + { + "pushId": "string", + "taskId": "string", + "status": "running/completed/failed", + "sentGroups": ["string"], + "failedGroups": [{ "groupId": "string", "error": "string" }], + "timestamp": "2025-06-09T18:00:00Z" + } + ``` + - 404: `{ "error": "Push not found" }` + - **Description**: 查询推送状态。 + +### 8.6 通知与反馈 +- **发送通知**:`/api/notifications` + - **Method**: POST + - **Request**: + ```json + { + "userId": "string", + "type": "reminder/result/preview", + "taskId": "string", + "content": { "message": "string", "status": "success/failed", "error": "string" } + } + ``` + - **Response**: + - 200: `{ "message": "Notification sent" }` + - 400: `{ "error": "Invalid notification" }` + - **Description**: 发送预览、提醒或结果通知。 + +- **获取反馈**:`/api/feedback` + - **Method**: GET + - **Query Parameters**: + - `taskId`: string + - `startTime`: string (ISO 8601) + - `endTime`: string (ISO 8601) + - **Response**: + - 200: + ```json + { + "feedback": [ + { + "feedbackId": "string", + "taskId": "string", + "userId": "string", + "content": "string", + "timestamp": "2025-06-09T18:00:00Z" + } + ], + "total": number + } + ``` + - **Description**: 获取群用户反馈。 + +## 9. 风险与挑战 +- **风险**: + - 微信API限制影响推送或预览发送。 + - 高并发任务可能导致数据库瓶颈(无消息队列)。 + - 预览消息与实际推送不一致。 +- **应对措施**: + - 申请高额度API,限制并发任务数量(≤100群)。 + - 优化MySQL查询,使用索引,Redis缓存任务状态。 + - 预览消息遵循微信格式,增加测试用例。 + +## 10. 时间计划 +- **需求分析**:2025年6月10日 - 6月15日 +- **设计与原型**:2025年6月16日 - 6月25日 +- **开发**:2025年6月26日 - 8月10日 +- **测试与优化**:2025年8月11日 - 8月20日 +- **上线**:2025年8月25日 + +## 11. 依赖与资源 +- **依赖**: + - 微信公众平台账号(消息推送、服务通知权限)。 + - 开发团队熟悉微信API、Vue.js、Python、Flask、MySQL。 + - 依赖库:Flask, PyMySQL, redis-py, aiohttp, python-jwt. +- **资源**: + - 1名产品经理 + - 2名前端工程师(Web + 小程序) + - 2名后端工程师 + - 1名UI/UX设计师 + - 1名测试工程师 + +## 12. 后续计划 +- **1.8版本**:支持A/B测试,优化推送效果。 +- **1.9版本**:AI生成推送内容,减少编辑时间。 +- **长期目标**:扩展至企业微信、短信等场景。 \ No newline at end of file diff --git a/plugins/message_push_task/__init__.py b/plugins/message_push_task/__init__.py new file mode 100644 index 0000000..6349b04 --- /dev/null +++ b/plugins/message_push_task/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入MessagePushTask类 +from .main import MessagePushTask + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return MessagePushTask() diff --git a/plugins/message_push_task/config.toml b/plugins/message_push_task/config.toml new file mode 100644 index 0000000..00beb97 --- /dev/null +++ b/plugins/message_push_task/config.toml @@ -0,0 +1,9 @@ +[MessagePush] +enable = true +command = ["推送", "消息推送", "定时推送"] +command-format = """ +📢推送指令: +推送 消息内容 +任务 创建/编辑/删除/暂停/恢复 +确认/修改 预览消息 +""" \ No newline at end of file diff --git a/plugins/message_push_task/main.py b/plugins/message_push_task/main.py new file mode 100644 index 0000000..0ae35f8 --- /dev/null +++ b/plugins/message_push_task/main.py @@ -0,0 +1,279 @@ +from datetime import datetime +from typing import Dict, Any, List, Optional, Tuple + +from loguru import logger + +from base.plugin_common.message_plugin_interface import MessagePluginInterface +from base.plugin_common.plugin_interface import PluginStatus +from db.task_db import TaskDBOperator +from utils.decorator.async_job import async_job +from utils.decorator.plugin_decorators import plugin_stats_decorator +from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager +from wechat_ipad import WechatAPIClient + + +class MessagePushTask(MessagePluginInterface): + """消息推送任务插件""" + + # 功能权限常量 + FEATURE_KEY = "MESSAGE_PUSH" + FEATURE_DESCRIPTION = "📢 消息推送功能 [推送, 消息推送, 定时推送]" + + @property + def name(self) -> str: + return "消息推送任务" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供消息推送功能,支持定时推送和群发消息" + + @property + def author(self) -> str: + return "Trae AI" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + self.feature = self.register_feature() + self.db = None + async_job.every_seconds(5)(self.process_scheduled_tasks) + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logger + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.event_system = context.get("event_system") + self.db_manager = context.get("db_manager") + + # 初始化组件 + self.db = TaskDBOperator(self.db_manager) + + # 初始化数据库表 + if not self.db.init_tables(): + self.LOG.error("初始化数据库表失败") + return False + + # 加载配置 + self._commands = self._config.get("MessagePush", {}).get("command", ["推送", "消息推送"]) + self.command_format = self._config.get("MessagePush", {}).get("command-format", "推送 消息内容") + self.enable = self._config.get("MessagePush", {}).get("enable", True) + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + + return command in self._commands + + @plugin_stats_decorator(plugin_name="消息推送任务") + async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.debug(f"插件执行: {self.name}:{content}") + command = content.split(" ")[0] + sender = message.get("sender") + roomid = message.get("roomid", "") + gbm: GroupBotManager = message.get("gbm") + bot: WechatAPIClient = message.get("bot") + + # 检查命令格式 + if len(content.split(" ")) == 1: + await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", + sender) + return False, "命令格式错误" + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: + return False, "没有权限" + + async def process_scheduled_tasks(self): + """处理定时任务""" + try: + # 获取待执行的任务 + tasks = self.db.get_scheduled_tasks() + if not tasks: + return + + for task in tasks: + try: + self.LOG.info(f"开始处理定时任务: {task['task_id']}") + + # 更新任务状态为执行中 + self.db.update_task(task['task_id'], {'status': 'running'}) + + # 获取任务内容 + content_text = task.get('content_text') + content_image = task.get('content_image') + content_link = task.get('content_link') + content_miniprogram = task.get('content_miniprogram') + groups = task.get('groups', []) + + # 记录任务开始执行 + self.db.log_task_action({ + 'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}", + 'task_id': task['task_id'], + 'action': 'start', + 'user_id': task['creator_id'], + 'changes': {'status': 'running'} + }) + + # 发送消息到目标群组 + bot = self.event_system.get_bot() + success_count = 0 + fail_count = 0 + + for group_id in groups: + try: + # 发送文本消息 + if content_text: + await bot.send_text_message(group_id, content_text) + + # 发送图片消息 + if content_image: + await bot.send_image_message(group_id, content_image) + + # 发送链接消息 + if content_link: + await bot.send_link_message(group_id, content_link) + + # 发送小程序消息 + if content_miniprogram: + await bot.send_miniprogram_message( + group_id, + content_miniprogram.get('title'), + content_miniprogram.get('appid'), + content_miniprogram.get('pagepath'), + content_miniprogram.get('thumb_url') + ) + + success_count += 1 + + except Exception as e: + self.LOG.error(f"发送消息到群组 {group_id} 失败: {e}") + fail_count += 1 + + # 更新任务状态 + if fail_count == 0: + status = 'completed' + elif success_count == 0: + status = 'failed' + else: + status = 'partially_completed' + + self.db.update_task(task['task_id'], { + 'status': status, + 'success_count': success_count, + 'fail_count': fail_count + }) + + # 记录任务完成 + self.db.log_task_action({ + 'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}", + 'task_id': task['task_id'], + 'action': 'complete', + 'user_id': task['creator_id'], + 'changes': { + 'status': status, + 'success_count': success_count, + 'fail_count': fail_count + } + }) + + # 如果是重复任务,更新下次执行时间 + if task['schedule_type'] == 'recurring': + next_time = self._calculate_next_schedule_time( + task['schedule_time'], + task['recurring_interval'], + task['recurring_end'] + ) + if next_time: + self.db.update_task(task['task_id'], { + 'schedule_time': next_time, + 'status': 'scheduled' + }) + + self.LOG.info(f"定时任务 {task['task_id']} 处理完成,状态: {status}") + + except Exception as e: + self.LOG.error(f"处理定时任务 {task['task_id']} 失败: {e}") + # 更新任务状态为失败 + self.db.update_task(task['task_id'], {'status': 'failed'}) + # 记录错误日志 + self.db.log_task_action({ + 'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}", + 'task_id': task['task_id'], + 'action': 'error', + 'user_id': task['creator_id'], + 'changes': {'error': str(e)} + }) + + except Exception as e: + self.LOG.error(f"处理定时任务出错: {e}") + + def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None) -> \ + Optional[datetime]: + """计算下次执行时间 + + Args: + current_time: 当前执行时间 + interval: 重复间隔(daily/weekly/monthly) + end_time: 结束时间 + + Returns: + 下次执行时间,如果已超过结束时间则返回None + """ + try: + if not end_time or current_time < end_time: + if interval == 'daily': + next_time = current_time.replace(day=current_time.day + 1) + elif interval == 'weekly': + next_time = current_time.replace(day=current_time.day + 7) + elif interval == 'monthly': + # 处理月份边界情况 + if current_time.month == 12: + next_time = current_time.replace(year=current_time.year + 1, month=1) + else: + next_time = current_time.replace(month=current_time.month + 1) + else: + return None + + # 检查是否超过结束时间 + if end_time and next_time > end_time: + return None + + return next_time + return None + except Exception as e: + self.LOG.error(f"计算下次执行时间失败: {e}") + return None