366 lines
12 KiB
Python
366 lines
12 KiB
Python
import asyncio
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from flask import Blueprint, render_template, jsonify, request, current_app, session
|
|
from .auth import login_required
|
|
from loguru import logger
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
# Blueprint that groups the message-push management routes under /message_push.
message_push_bp = Blueprint(
    'message_push',
    __name__,
    url_prefix='/message_push',
)

# Thread pool on which message-sending jobs are submitted.
message_thread_pool = ThreadPoolExecutor(
    max_workers=10,
    thread_name_prefix="message_sender_",
)

# Shared event loop; starts out unset and is created on first use by
# get_or_create_loop(), which takes loop_lock while doing so.
shared_loop = None
loop_lock = threading.Lock()
|
|
|
|
|
|
def get_or_create_loop():
    """Return the shared event loop, creating and starting it when needed.

    On first use (or when the previously shared loop has been closed) a new
    event loop is created and started with ``run_forever()`` on a daemon
    thread. Creation happens while holding ``loop_lock``, so concurrent
    callers all receive the same loop.

    Returns:
        The event loop stored in the module-level ``shared_loop``.
    """
    global shared_loop
    with loop_lock:
        # A closed loop can never run another coroutine, so replace it
        # instead of handing it out forever.
        if shared_loop is None or shared_loop.is_closed():
            loop = asyncio.new_event_loop()

            def run_loop():
                # Bind the thread to the loop it was started for (a local),
                # not to whatever the global happens to reference later.
                asyncio.set_event_loop(loop)
                loop.run_forever()

            loop_thread = threading.Thread(target=run_loop, daemon=True)
            loop_thread.start()
            shared_loop = loop
        return shared_loop
|
|
|
|
|
|
def send_message_in_thread(func, *args, **kwargs):
    """Run an async send function on the shared event loop via the thread pool.

    A job is submitted to ``message_thread_pool``. The job schedules
    ``func(*args, **kwargs)`` on the loop returned by ``get_or_create_loop()``
    and waits up to 10 seconds for it. Failures are logged, never raised to
    the caller.

    Args:
        func: Callable returning an awaitable; invoked as
            ``func(*args, **kwargs)`` on the shared loop.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The ``concurrent.futures.Future`` of the submitted pool job. It
        completes once the send finished, failed, or timed out. Callers may
        ignore it (earlier versions returned ``None``).
    """
    # Imported locally: before Python 3.11 this is a distinct class from the
    # builtin TimeoutError, and the module only imports ThreadPoolExecutor.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    timeout_seconds = 10

    async def send():
        try:
            await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"发送消息失败: {e}")

    def run():
        future = None
        try:
            loop = get_or_create_loop()

            # Hand the coroutine to the shared loop and block this worker
            # thread until it finishes or the timeout elapses.
            future = asyncio.run_coroutine_threadsafe(send(), loop)
            future.result(timeout=timeout_seconds)
        except FutureTimeoutError:
            # Without the cancel the coroutine would keep running on the
            # shared loop after this worker has given up on it.
            future.cancel()
            logger.error(f"消息发送任务执行失败: 发送超时({timeout_seconds}秒)，任务已取消")
        except Exception as e:
            logger.error(f"消息发送任务执行失败: {e}")

    # Submit to the pool so the calling thread is not blocked.
    return message_thread_pool.submit(run)
|
|
|
|
|
|
# Message push management page
@message_push_bp.route('/')
@login_required
def message_push_management():
    """Render the message-push management page template."""
    template_name = 'message_push_management.html'
    return render_template(template_name)
|
|
|
|
|
|
# API routes
@message_push_bp.route('/api/tasks', methods=['GET'])
@login_required
def api_tasks_list():
    """Return one page of tasks as JSON.

    Query parameters:
        status, start_time, end_time: Optional filters, passed through
            unchanged to ``task_db.get_tasks_list``.
        page: Page number; defaults to 1 when missing or not an integer,
            and is raised to 1 when smaller.
        limit: Page size; defaults to 20 when missing or not an integer,
            and is raised to 1 when smaller.

    Returns:
        ``{"success": True, "data": {"tasks", "total", "page", "limit"}}`` on
        success, or ``{"success": False, "error": ...}`` with status 500 when
        an exception is raised.
    """
    try:
        # Filters
        status = request.args.get('status')
        start_time = request.args.get('start_time')
        end_time = request.args.get('end_time')

        # With type=int a malformed value falls back to the default instead
        # of raising ValueError (which used to surface as HTTP 500). The
        # max() keeps zero/negative values away from the database layer.
        page = max(request.args.get('page', 1, type=int), 1)
        limit = max(request.args.get('limit', 20, type=int), 1)

        # Fetch the requested page
        db = current_app.dashboard_server.task_db
        tasks, total = db.get_tasks_list(status, start_time, end_time, page, limit)

        return jsonify({
            "success": True,
            "data": {
                "tasks": tasks,
                "total": total,
                "page": page,
                "limit": limit
            }
        })
    except Exception as e:
        logger.error(f"获取任务列表失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks', methods=['POST'])
@login_required
def api_create_task():
    """Create a task from the JSON request body.

    The body must be a non-empty JSON object. ``task_id`` (a fresh UUID4
    string) and ``creator_id`` (the session's ``username``) are always set by
    the server and override any values supplied by the client.

    Returns:
        ``{"success": True, "data": {"task": ...}}`` on success;
        400 when the body is missing, not valid JSON, or not a JSON object;
        401 when the session has no ``username``;
        500 when ``create_task`` returns a falsy value or an exception is
        raised.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so a bad request is answered with 400 rather than being
        # caught below and reported as 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "error": "无效的请求数据"}), 400

        # Resolve the creator from the session
        username = session.get('username')
        if not username:
            return jsonify({"success": False, "error": "未登录或会话已过期"}), 401

        # Build the record on a copy so the cached request payload is not
        # mutated; server-generated fields win over client-supplied ones.
        task_data = {
            **data,
            'task_id': str(uuid.uuid4()),
            'creator_id': username,
        }

        # Persist the task
        db = current_app.dashboard_server.task_db
        task = db.create_task(task_data)
        if not task:
            return jsonify({"success": False, "error": "创建任务失败"}), 500

        return jsonify({
            "success": True,
            "data": {
                "task": task
            }
        })
    except Exception as e:
        logger.error(f"创建任务失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks/<task_id>', methods=['PUT'])
@login_required
def api_update_task(task_id):
    """Update an existing task with the fields in the JSON request body.

    Args:
        task_id: Task identifier taken from the URL.

    Returns:
        ``{"success": True, "data": {"task": ...}}`` holding the task as
        re-read after the update;
        400 when the body is missing, not valid JSON, or not a JSON object;
        404 when the task does not exist;
        500 when ``update_task`` returns a falsy value or an exception is
        raised.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so a bad request is answered with 400 rather than 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "error": "无效的请求数据"}), 400

        # Make sure the task exists
        db = current_app.dashboard_server.task_db
        task = db.get_task(task_id)
        if not task:
            return jsonify({"success": False, "error": "任务不存在"}), 404

        # NOTE(review): the body is forwarded as-is, so a client can include
        # keys such as 'task_id' or 'creator_id'. Confirm that update_task
        # ignores or rejects fields that must stay immutable.
        if not db.update_task(task_id, data):
            return jsonify({"success": False, "error": "更新任务失败"}), 500

        # Re-read so the response reflects the stored state
        updated_task = db.get_task(task_id)
        return jsonify({
            "success": True,
            "data": {
                "task": updated_task
            }
        })
    except Exception as e:
        logger.error(f"更新任务失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks/<task_id>', methods=['DELETE'])
@login_required
def api_delete_task(task_id):
    """Delete the task identified by ``task_id``.

    Responds with 404 when the task is not found, with 500 when
    ``delete_task`` reports failure or an exception is raised, and with a
    success message otherwise.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        # The task has to exist before it can be removed.
        existing = task_db.get_task(task_id)
        if not existing:
            not_found = {"success": False, "error": "任务不存在"}
            return jsonify(not_found), 404

        deleted = task_db.delete_task(task_id)
        if not deleted:
            not_deleted = {"success": False, "error": "删除任务失败"}
            return jsonify(not_deleted), 500

        result = {"success": True, "message": "任务已删除"}
        return jsonify(result)
    except Exception as e:
        logger.error(f"删除任务失败: {e}")
        failure = {"success": False, "error": str(e)}
        return jsonify(failure), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks/<task_id>/pause', methods=['POST'])
@login_required
def api_pause_task(task_id):
    """Set the status of the task identified by ``task_id`` to ``'paused'``.

    Responds with 404 when the task is not found, with 500 when
    ``update_task`` reports failure or an exception is raised, and with a
    success message otherwise.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        # Only an existing task can be paused.
        existing = task_db.get_task(task_id)
        if not existing:
            not_found = {"success": False, "error": "任务不存在"}
            return jsonify(not_found), 404

        status_change = {'status': 'paused'}
        paused = task_db.update_task(task_id, status_change)
        if not paused:
            not_paused = {"success": False, "error": "暂停任务失败"}
            return jsonify(not_paused), 500

        result = {"success": True, "message": "任务已暂停"}
        return jsonify(result)
    except Exception as e:
        logger.error(f"暂停任务失败: {e}")
        failure = {"success": False, "error": str(e)}
        return jsonify(failure), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks/<task_id>/resume', methods=['POST'])
@login_required
def api_resume_task(task_id):
    """Set the status of the task identified by ``task_id`` to ``'scheduled'``.

    Responds with 404 when the task is not found, with 500 when
    ``update_task`` reports failure or an exception is raised, and with a
    success message otherwise.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        # Only an existing task can be resumed.
        existing = task_db.get_task(task_id)
        if not existing:
            not_found = {"success": False, "error": "任务不存在"}
            return jsonify(not_found), 404

        status_change = {'status': 'scheduled'}
        resumed = task_db.update_task(task_id, status_change)
        if not resumed:
            not_resumed = {"success": False, "error": "恢复任务失败"}
            return jsonify(not_resumed), 500

        result = {"success": True, "message": "任务已恢复"}
        return jsonify(result)
    except Exception as e:
        logger.error(f"恢复任务失败: {e}")
        failure = {"success": False, "error": str(e)}
        return jsonify(failure), 500
|
|
|
|
|
|
def _parse_preview_recipients(raw):
    """Turn a stored ``preview_recipients`` value into a list of recipients.

    Args:
        raw: Value read from the task record: ``None``, a JSON-encoded
            string, or an already decoded list/tuple.

    Returns:
        A list of recipients (possibly empty), or ``None`` when the value
        cannot be interpreted as a recipient list.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            return None
    if raw is None:
        return []
    if isinstance(raw, str):
        # A single JSON string is one recipient, not a sequence of characters.
        return [raw]
    if isinstance(raw, (list, tuple)):
        return list(raw)
    return None


@message_push_bp.route('/api/tasks/<task_id>/preview', methods=['POST'])
@login_required
def api_preview_task(task_id):
    """Queue preview messages of a task for its preview recipients.

    For every recipient the task's ``content_text``, ``content_image`` and
    ``content_link`` (each only when set) are handed to
    ``send_message_in_thread``. Sending happens in the background, so a
    success response means the sends were queued, not that they were
    delivered.

    Args:
        task_id: Task identifier taken from the URL.

    Returns:
        ``{"success": True, "message": ...}`` once the sends are queued;
        404 when the task does not exist;
        500 when the bot client is not initialised or an exception is raised;
        401 when the session has no ``username``;
        400 when ``preview_recipients`` is malformed or empty.
    """
    try:
        # Look up the task
        db = current_app.dashboard_server.task_db
        task = db.get_task(task_id)
        if not task:
            return jsonify({"success": False, "error": "任务不存在"}), 404

        # The bot client is needed to send anything
        server = current_app.dashboard_server
        if not server or not server.client:
            return jsonify({"success": False, "error": "机器人未初始化"}), 500

        # A preview requires a logged-in session user
        if not session.get('username'):
            return jsonify({"success": False, "error": "未登录或会话已过期"}), 401

        # Decode and validate the recipient list
        preview_recipients = _parse_preview_recipients(
            task.get("preview_recipients", "[]")
        )
        if preview_recipients is None:
            return jsonify({"success": False, "error": "预览接收者格式错误"}), 400
        if not preview_recipients:
            return jsonify({"success": False, "error": "未设置预览接收者"}), 400

        client = server.client
        # (task field, client method name) pairs, sent in this order.
        # Mini-program content (content_miniprogram) is not previewed.
        channels = (
            ('content_text', 'send_text_message'),
            ('content_image', 'send_image_message'),
            ('content_link', 'send_link_message'),
        )

        for recipient in preview_recipients:
            try:
                for field, method_name in channels:
                    content = task.get(field)
                    if content:
                        # The method is looked up only when its content is
                        # present, as the original per-type checks did.
                        send_message_in_thread(
                            getattr(client, method_name), recipient, content
                        )
            except Exception as e:
                # One failing recipient must not stop the remaining ones.
                logger.error(f"发送预览消息到 {recipient} 失败: {e}")
                continue

        return jsonify({
            "success": True,
            "message": "预览已发送"
        })
    except Exception as e:
        logger.error(f"发送预览失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@message_push_bp.route('/api/tasks/<task_id>/logs', methods=['GET'])
@login_required
def api_task_logs(task_id):
    """Return one page of log entries for a task as JSON.

    Args:
        task_id: Task identifier taken from the URL.

    Query parameters:
        page: Page number; defaults to 1 when missing or not an integer,
            and is raised to 1 when smaller.
        limit: Page size; defaults to 20 when missing or not an integer,
            and is raised to 1 when smaller.

    Returns:
        ``{"success": True, "data": ...}`` where ``data`` is whatever
        ``task_db.get_task_logs_with_pagination`` returns, or
        ``{"success": False, "error": ...}`` with status 500 when an
        exception is raised.
    """
    try:
        # With type=int a malformed value falls back to the default instead
        # of raising ValueError (which used to surface as HTTP 500). The
        # max() keeps zero/negative values away from the database layer.
        page = max(request.args.get('page', 1, type=int), 1)
        limit = max(request.args.get('limit', 20, type=int), 1)

        # Query the logs
        db = current_app.dashboard_server.task_db
        logs_data = db.get_task_logs_with_pagination(task_id, page, limit)

        return jsonify({
            "success": True,
            "data": logs_data
        })
    except Exception as e:
        logger.error(f"获取任务日志失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@message_push_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_statistics():
    """Return task counts as JSON.

    The payload holds the overall count, one count per status
    (``scheduled``, ``paused``, ``completed``, ``failed``) and the count for
    today's date formatted as ``YYYY-MM-DD``. Responds with 500 when an
    exception is raised.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        # Overall count first, then one count per status, then today's count.
        stats = {"total": task_db.get_tasks_count()}
        for status_name in ('scheduled', 'paused', 'completed', 'failed'):
            stats[status_name] = task_db.get_tasks_count_by_status(status_name)

        today_str = datetime.now().strftime('%Y-%m-%d')
        stats["today"] = task_db.get_tasks_count_by_date(today_str)

        result = {"success": True, "data": stats}
        return jsonify(result)
    except Exception as e:
        logger.error(f"获取任务统计信息失败: {e}")
        failure = {"success": False, "error": str(e)}
        return jsonify(failure), 500
|