Files
abot/admin/dashboard/blueprints/message_push.py
2025-06-10 15:40:09 +08:00

528 lines
17 KiB
Python

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
import os
import json
import uuid
from datetime import datetime
from flask import Blueprint, render_template, jsonify, request, current_app, session
from pathlib import Path
from werkzeug.utils import secure_filename
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL
from .auth import login_required
from loguru import logger
# 创建消息推送管理蓝图
message_push_bp = Blueprint('message_push', __name__, url_prefix='/message_push')
# 创建线程池
message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="message_sender_")
# 创建共享的事件循环
shared_loop = None
loop_lock = threading.Lock()
# 允许的图片文件扩展名
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
def get_or_create_loop():
"""获取或创建共享的事件循环"""
global shared_loop
with loop_lock:
if shared_loop is None:
shared_loop = asyncio.new_event_loop()
# 在新线程中运行事件循环
def run_loop():
asyncio.set_event_loop(shared_loop)
shared_loop.run_forever()
loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()
return shared_loop
def send_message_in_thread(func, *args, **kwargs):
"""使用共享事件循环发送消息"""
def run():
try:
loop = get_or_create_loop()
# 创建异步任务
async def send():
try:
await func(*args, **kwargs)
except Exception as e:
logger.error(f"发送消息失败: {e}")
# 在共享事件循环中运行任务
future = asyncio.run_coroutine_threadsafe(send(), loop)
# 等待任务完成,设置超时时间
future.result(timeout=10)
except Exception as e:
logger.error(f"消息发送任务执行失败: {e}")
# 使用线程池提交任务
message_thread_pool.submit(run)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# 消息推送管理页面
@message_push_bp.route('/')
@login_required
def message_push_management():
"""消息推送管理页面"""
return render_template('message_push_management.html')
# API路由
@message_push_bp.route('/api/tasks', methods=['GET'])
@login_required
def api_tasks_list():
"""获取任务列表API"""
try:
# 获取查询参数
status = request.args.get('status')
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
# 获取任务列表
db = current_app.dashboard_server.task_db
tasks, total = db.get_tasks_list(status, start_time, end_time, page, limit)
return jsonify({
"success": True,
"data": {
"tasks": tasks,
"total": total,
"page": page,
"limit": limit
}
})
except Exception as e:
logger.error(f"获取任务列表失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks', methods=['POST'])
@login_required
def api_create_task():
"""创建任务API"""
try:
data = request.json
if not data:
return jsonify({"success": False, "error": "无效的请求数据"}), 400
# 获取用户名
username = session.get('username')
if not username:
return jsonify({"success": False, "error": "未登录或会话已过期"}), 401
# 生成任务ID
data['task_id'] = str(uuid.uuid4())
data['creator_id'] = username
# 创建任务
db = current_app.dashboard_server.task_db
task = db.create_task(data)
if not task:
return jsonify({"success": False, "error": "创建任务失败"}), 500
return jsonify({
"success": True,
"data": {
"task": task
}
})
except Exception as e:
logger.error(f"创建任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>', methods=['PUT'])
@login_required
def update_task(task_id):
"""更新任务"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '无效的请求数据'
})
# 获取任务信息
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({
'success': False,
'message': '任务不存在'
})
# 检查任务状态
if task['status'] not in ['draft', 'paused']:
return jsonify({
'success': False,
'message': '只能编辑草稿或已暂停状态的任务'
})
# 确保 content_miniprogram 是 JSON 字符串
if 'content_miniprogram' in data and isinstance(data['content_miniprogram'], dict):
data['content_miniprogram'] = json.dumps(data['content_miniprogram'])
# 确保 groups 是 JSON 字符串
if 'groups' in data and isinstance(data['groups'], list):
data['groups'] = json.dumps(data['groups'])
# 确保 preview_recipients 是 JSON 字符串
if 'preview_recipients' in data and isinstance(data['preview_recipients'], list):
data['preview_recipients'] = json.dumps(data['preview_recipients'])
# 转换时间格式
if 'created_at' in data:
try:
date = datetime.strptime(data['created_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['created_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'updated_at' in data:
try:
date = datetime.strptime(data['updated_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['updated_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
# 更新任务
db.update_task(task_id, data)
# 记录操作日志
db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task_id,
'action': 'update',
'user_id': session.get('username', 'admin'), # 使用 admin 作为默认值
'changes': data
})
return jsonify({
'success': True,
'message': '更新成功'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'更新失败: {str(e)}'
})
@message_push_bp.route('/api/tasks/<task_id>', methods=['DELETE'])
@login_required
def api_delete_task(task_id):
"""删除任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 删除任务
if not db.delete_task(task_id):
return jsonify({"success": False, "error": "删除任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已删除"
})
except Exception as e:
logger.error(f"删除任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/pause', methods=['POST'])
@login_required
def api_pause_task(task_id):
"""暂停任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 暂停任务
if not db.update_task(task_id, {'status': 'paused'}):
return jsonify({"success": False, "error": "暂停任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已暂停"
})
except Exception as e:
logger.error(f"暂停任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/resume', methods=['POST'])
@login_required
def api_resume_task(task_id):
"""恢复任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 恢复任务
if not db.update_task(task_id, {'status': 'scheduled'}):
return jsonify({"success": False, "error": "恢复任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已恢复"
})
except Exception as e:
logger.error(f"恢复任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/preview', methods=['POST'])
@login_required
def api_preview_task(task_id):
"""预览任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 获取机器人实例
server = current_app.dashboard_server
if not server or not server.client:
return jsonify({"success": False, "error": "机器人未初始化"}), 500
# 获取用户名
preview_user = session.get('username')
if not preview_user:
return jsonify({"success": False, "error": "未登录或会话已过期"}), 401
# 获取预览接收者并解析JSON
preview_recipients_str = task.get("preview_recipients", "[]")
try:
preview_recipients = json.loads(preview_recipients_str)
except json.JSONDecodeError:
return jsonify({"success": False, "error": "预览接收者格式错误"}), 400
if not preview_recipients:
return jsonify({"success": False, "error": "未设置预览接收者"}), 400
# 为每个接收者发送预览消息
for recipient in preview_recipients:
try:
# 发送文本消息
if task.get('content_text'):
send_message_in_thread(server.client.send_text_message, recipient, task['content_text'])
# 发送图片消息
if task.get('content_image'):
send_message_in_thread(server.client.send_image_message, recipient, Path(task['content_image']))
# 发送链接消息
if task.get('content_link'):
try:
link_data = json.loads(task['content_link'])
# content_link json 读取内容
xml_content = f"{LINK_XML_NORMAL}".format(title=link_data.get('title', ''),
des=link_data.get('des', ''),
url=link_data.get('url', ''),
thumburl=link_data.get('thumburl', '')
)
send_message_in_thread(
server.client.send_link_xml_message,
xml_content,
recipient
)
except json.JSONDecodeError:
logger.error(f"解析链接内容失败: {task['content_link']}")
continue
# # 发送小程序消息
# if task.get('content_miniprogram'):
# miniprogram = task['content_miniprogram']
# send_message_in_thread(
# server.client.send_miniprogram_message,
# recipient,
# miniprogram.get('title'),
# miniprogram.get('appid'),
# miniprogram.get('pagepath'),
# miniprogram.get('thumb_url')
# )
except Exception as e:
logger.error(f"发送预览消息到 {recipient} 失败: {e}")
continue
return jsonify({
"success": True,
"message": "预览已发送"
})
except Exception as e:
logger.error(f"发送预览失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/logs', methods=['GET'])
@login_required
def api_task_logs(task_id):
"""获取任务日志API"""
try:
# 获取查询参数
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
# 查询日志
db = current_app.dashboard_server.task_db
logs_data = db.get_task_logs_with_pagination(task_id, page, limit)
return jsonify({
"success": True,
"data": logs_data
})
except Exception as e:
logger.error(f"获取任务日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_statistics():
"""获取任务统计信息API"""
try:
# 获取任务数据库实例
db = current_app.dashboard_server.task_db
# 获取各种状态的任务数量
total = db.get_tasks_count()
scheduled = db.get_tasks_count_by_status('scheduled')
paused = db.get_tasks_count_by_status('paused')
completed = db.get_tasks_count_by_status('completed')
failed = db.get_tasks_count_by_status('failed')
# 获取今日任务数量
today = db.get_tasks_count_by_date(datetime.now().strftime('%Y-%m-%d'))
return jsonify({
"success": True,
"data": {
"total": total,
"scheduled": scheduled,
"paused": paused,
"completed": completed,
"failed": failed,
"today": today
}
})
except Exception as e:
logger.error(f"获取任务统计信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/upload', methods=['POST'])
def upload_file():
"""处理图片上传"""
if 'file' not in request.files:
return jsonify({
'success': False,
'message': '没有文件'
})
file = request.files['file']
if file.filename == '':
return jsonify({
'success': False,
'message': '没有选择文件'
})
if file and allowed_file(file.filename):
# 生成安全的文件名
filename = secure_filename(file.filename)
# 生成唯一文件名
unique_filename = f"{uuid.uuid4().hex}_{filename}"
# 确保上传目录存在
upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
os.makedirs(upload_folder, exist_ok=True)
# 保存文件
file_path = os.path.join(upload_folder, unique_filename)
file.save(file_path)
# 返回文件的绝对路径
return jsonify({
'success': True,
'data': {
'url': file_path # 返回绝对路径
}
})
return jsonify({
'success': False,
'message': '不支持的文件类型'
})
@message_push_bp.route('/api/tasks/<task_id>/audit', methods=['POST'])
@login_required
def audit_task(task_id):
"""审核任务"""
try:
# 获取任务信息
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({
'success': False,
'message': '任务不存在'
})
# 检查任务状态
if task['status'] != 'draft':
return jsonify({
'success': False,
'message': '只能审核草稿状态的任务'
})
# 更新任务状态为已排期
db.update_task(task_id, {'status': 'scheduled'})
# 记录操作日志
db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task_id,
'action': 'update',
'user_id': session.get('user_id'),
'changes': {'status': 'scheduled', 'action': 'audit'}
})
return jsonify({
'success': True,
'message': '审核成功'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'审核失败: {str(e)}'
})