# Listing metadata: 613 lines, 21 KiB, Python
import asyncio
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import os
|
||
import json
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from flask import Blueprint, render_template, jsonify, request, current_app, session
|
||
from pathlib import Path
|
||
from werkzeug.utils import secure_filename
|
||
|
||
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL
|
||
from .auth import login_required
|
||
from loguru import logger
|
||
|
||
# Blueprint grouping every message-push management route under /message_push.
message_push_bp = Blueprint(
    'message_push',
    __name__,
    url_prefix='/message_push',
)

# Worker pool that message-sending jobs are submitted to.
message_thread_pool = ThreadPoolExecutor(
    thread_name_prefix="message_sender_",
    max_workers=10,
)

# Lock guarding the lazy creation of the shared asyncio event loop.
loop_lock = threading.Lock()

# Shared event loop; remains None until get_or_create_loop() creates it.
shared_loop = None
|
||
|
||
# Upload extensions accepted per media category (lower-case, without the dot).
ALLOWED_EXTENSIONS = dict(
    image={'png', 'jpg', 'jpeg', 'gif'},
    voice={'mp3', 'wav'},
    video={'mp4'},
)
|
||
|
||
|
||
def get_or_create_loop():
    """Return the module-wide asyncio event loop, creating it on first use.

    The first call builds a new event loop and starts a daemon thread that
    runs it with ``run_forever``; later calls return the same loop object.
    The check-and-create step is performed while holding ``loop_lock``.
    """
    global shared_loop

    def _serve():
        # Bind the shared loop to this background thread, then keep it running.
        asyncio.set_event_loop(shared_loop)
        shared_loop.run_forever()

    with loop_lock:
        if shared_loop is not None:
            return shared_loop

        shared_loop = asyncio.new_event_loop()
        runner = threading.Thread(target=_serve, daemon=True)
        runner.start()
        return shared_loop
|
||
|
||
|
||
def send_message_in_thread(func, *args, **kwargs):
    """Run an async send function on the shared event loop via the thread pool.

    Args:
        func: Awaitable-returning callable, invoked as ``func(*args, **kwargs)``.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        None. The pool future is discarded; failures are logged, not raised.
    """

    async def _guarded_send():
        # Errors raised by the send itself are logged inside the coroutine.
        try:
            await func(*args, **kwargs)
        except Exception as exc:
            logger.error(f"发送消息失败: {exc}")

    def _worker():
        try:
            event_loop = get_or_create_loop()
            pending = asyncio.run_coroutine_threadsafe(_guarded_send(), event_loop)
            # Block this pool thread until the send finishes, for at most 30 seconds.
            pending.result(timeout=30)
        except Exception as exc:
            logger.error(f"消息发送任务执行失败: {exc}")

    # Hand the blocking wait to the pool so the caller returns immediately.
    message_thread_pool.submit(_worker)
|
||
|
||
|
||
def allowed_file(filename, file_type='image'):
    """Tell whether ``filename`` has an extension permitted for ``file_type``.

    Args:
        filename: Name of the uploaded file.
        file_type: Key into ``ALLOWED_EXTENSIONS``; unknown keys allow nothing.

    Returns:
        True when the text after the last dot, lower-cased, is in the allowed
        set for ``file_type``; False otherwise (including names without a dot).
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    permitted = ALLOWED_EXTENSIONS.get(file_type, set())
    return extension in permitted
|
||
|
||
|
||
# 消息推送管理页面
|
||
@message_push_bp.route('/')
@login_required
def message_push_management():
    """Render the message-push management page."""
    page = render_template('message_push_management.html')
    return page
|
||
|
||
|
||
# API路由
|
||
@message_push_bp.route('/api/tasks', methods=['GET'])
@login_required
def api_tasks_list():
    """Return one page of tasks as JSON.

    Query parameters:
        status, start_time, end_time: passed unchanged to the task database.
        page: page number, default 1.
        limit: page size, default 20.

    Returns:
        ``{"success": True, "data": {"tasks", "total", "page", "limit"}}`` on
        success, or ``{"success": False, "error": ...}`` with HTTP 500 when
        anything raises (including a non-integer ``page``/``limit``).
    """
    try:
        status = request.args.get('status')
        start_time = request.args.get('start_time')
        end_time = request.args.get('end_time')
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 20))

        db = current_app.dashboard_server.task_db
        tasks, total = db.get_tasks_list(status, start_time, end_time, page, limit)

        # Convert fields that cannot be JSON-serialized as-is.
        serialized_tasks = []
        for task in tasks:
            task_dict = dict(task)
            recurring_time = task_dict.get('recurring_time')
            # Check the type rather than truthiness: timedelta(0), i.e. a
            # 00:00 schedule, is falsy and would otherwise be left as a raw
            # timedelta. Non-timedelta values are passed through untouched
            # instead of crashing on ``.seconds``.
            if isinstance(recurring_time, timedelta):
                hours, remainder = divmod(recurring_time.seconds, 3600)
                task_dict['recurring_time'] = f"{hours:02d}:{remainder // 60:02d}"
            serialized_tasks.append(task_dict)

        return jsonify({
            "success": True,
            "data": {
                "tasks": serialized_tasks,
                "total": total,
                "page": page,
                "limit": limit,
            },
        })
    except Exception as e:
        logger.error(f"获取任务列表失败: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks', methods=['POST'])
@login_required
def api_create_task():
    """Create a task from the JSON request body.

    The body must be a non-empty JSON object. ``task_id`` is always replaced
    with a fresh UUID and ``creator_id`` with the session's username. Date
    fields sent in HTTP-date form (``Mon, 01 Jan 2024 00:00:00 GMT``) are
    rewritten to ``YYYY-MM-DD HH:MM:SS``; values in any other form are left
    unchanged.

    Returns:
        ``{"success": True, "data": {"task": ...}}`` on success; otherwise
        ``{"success": False, "error": ...}`` with HTTP 400 (bad body),
        401 (no username in session) or 500 (creation failed / exception).
    """
    http_date_format = '%a, %d %b %Y %H:%M:%S GMT'
    db_date_format = '%Y-%m-%d %H:%M:%S'
    date_fields = ('created_at', 'updated_at', 'schedule_time', 'recurring_end')

    try:
        # silent=True makes a malformed or non-JSON body yield None, so it is
        # answered with the 400 below instead of raising into the 500 handler.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "error": "无效的请求数据"}), 400

        username = session.get('username')
        if not username:
            return jsonify({"success": False, "error": "未登录或会话已过期"}), 401

        # Server-generated identity; client-supplied values are overwritten.
        data['task_id'] = str(uuid.uuid4())
        data['creator_id'] = username

        # Normalize HTTP-date strings. Empty/None values are skipped, and
        # values that are not HTTP-date strings are kept as sent.
        for field in date_fields:
            raw_value = data.get(field)
            if not raw_value:
                continue
            try:
                parsed = datetime.strptime(raw_value, http_date_format)
            except (ValueError, TypeError):
                continue
            data[field] = parsed.strftime(db_date_format)

        db = current_app.dashboard_server.task_db
        task = db.create_task(data)
        if not task:
            return jsonify({"success": False, "error": "创建任务失败"}), 500

        # Stringify timedelta/datetime values before building the response.
        for key, value in task.items():
            if isinstance(value, timedelta):
                task[key] = str(value)
            elif isinstance(value, datetime):
                task[key] = value.strftime(db_date_format)

        return jsonify({
            "success": True,
            "data": {
                "task": task,
            },
        })
    except Exception as e:
        logger.exception(f"创建任务失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>', methods=['PUT'])
@login_required
def update_task(task_id):
    """Update a draft or paused task with the fields in the JSON body.

    ``content_miniprogram`` (dict), ``groups`` (list) and
    ``preview_recipients`` (list) are stored as JSON strings. Date fields in
    HTTP-date form are rewritten to ``YYYY-MM-DD HH:MM:SS``; other forms are
    left unchanged. An ``update`` entry is written to the task action log.

    Args:
        task_id: Identifier of the task to update.

    Returns:
        ``{"success": bool, "message": str}``. No explicit status code is
        set, so failures are also delivered with the default HTTP status;
        callers must inspect ``success``.
    """
    http_date_format = '%a, %d %b %Y %H:%M:%S GMT'
    db_date_format = '%Y-%m-%d %H:%M:%S'
    date_fields = ('created_at', 'updated_at', 'schedule_time', 'recurring_end')
    json_fields = (
        ('content_miniprogram', dict),
        ('groups', list),
        ('preview_recipients', list),
    )

    try:
        # silent=True: a malformed body is reported as invalid request data.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '无效的请求数据'
            })

        db = current_app.dashboard_server.task_db
        task = db.get_task(task_id)
        if not task:
            return jsonify({
                'success': False,
                'message': '任务不存在'
            })

        # Only tasks that are not live may be edited.
        if task['status'] not in ['draft', 'paused']:
            return jsonify({
                'success': False,
                'message': '只能编辑草稿或已暂停状态的任务'
            })

        # Structured fields are persisted as JSON strings.
        for field, container_type in json_fields:
            if isinstance(data.get(field), container_type):
                data[field] = json.dumps(data[field])

        # Normalize HTTP-date strings. Empty/None values are skipped, and
        # values that are not HTTP-date strings are kept as sent.
        for field in date_fields:
            raw_value = data.get(field)
            if not raw_value:
                continue
            try:
                parsed = datetime.strptime(raw_value, http_date_format)
            except (ValueError, TypeError):
                continue
            data[field] = parsed.strftime(db_date_format)

        db.update_task(task_id, data)

        db.log_task_action({
            'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'task_id': task_id,
            'action': 'update',
            # Falls back to 'admin' when the session carries no username.
            'user_id': session.get('username', 'admin'),
            'changes': data
        })

        return jsonify({
            'success': True,
            'message': '更新成功'
        })
    except Exception as e:
        # Log with traceback so the failure is not visible to the client only.
        logger.exception(f"更新任务失败: {e}")
        return jsonify({
            'success': False,
            'message': f'更新失败: {str(e)}'
        })
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>', methods=['DELETE'])
@login_required
def api_delete_task(task_id):
    """Delete the task identified by ``task_id``.

    Returns:
        A success JSON payload, or an error payload with HTTP 404 when the
        task does not exist and HTTP 500 when deletion fails or raises.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        # Refuse to delete something that is not there.
        if not task_db.get_task(task_id):
            return jsonify({"success": False, "error": "任务不存在"}), 404

        removed = task_db.delete_task(task_id)
        if removed:
            return jsonify({
                "success": True,
                "message": "任务已删除"
            })
        return jsonify({"success": False, "error": "删除任务失败"}), 500
    except Exception as exc:
        logger.error(f"删除任务失败: {exc}")
        return jsonify({"success": False, "error": str(exc)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>/pause', methods=['POST'])
@login_required
def api_pause_task(task_id):
    """Set the status of the task identified by ``task_id`` to ``paused``.

    Returns:
        A success JSON payload, or an error payload with HTTP 404 when the
        task does not exist and HTTP 500 when the update fails or raises.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        if not task_db.get_task(task_id):
            return jsonify({"success": False, "error": "任务不存在"}), 404

        paused_ok = task_db.update_task(task_id, {'status': 'paused'})
        if paused_ok:
            return jsonify({
                "success": True,
                "message": "任务已暂停"
            })
        return jsonify({"success": False, "error": "暂停任务失败"}), 500
    except Exception as exc:
        logger.error(f"暂停任务失败: {exc}")
        return jsonify({"success": False, "error": str(exc)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>/resume', methods=['POST'])
@login_required
def api_resume_task(task_id):
    """Set the status of the task identified by ``task_id`` to ``scheduled``.

    Returns:
        A success JSON payload, or an error payload with HTTP 404 when the
        task does not exist and HTTP 500 when the update fails or raises.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        if not task_db.get_task(task_id):
            return jsonify({"success": False, "error": "任务不存在"}), 404

        resumed_ok = task_db.update_task(task_id, {'status': 'scheduled'})
        if resumed_ok:
            return jsonify({
                "success": True,
                "message": "任务已恢复"
            })
        return jsonify({"success": False, "error": "恢复任务失败"}), 500
    except Exception as exc:
        logger.error(f"恢复任务失败: {exc}")
        return jsonify({"success": False, "error": str(exc)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>/preview', methods=['POST'])
@login_required
def api_preview_task(task_id):
    """Send a task's content to its preview recipients.

    For every recipient, each populated content field (text, image, voice,
    video, link) is handed to ``send_message_in_thread``. Sends are queued,
    not awaited, so a success response means the preview was dispatched, not
    that delivery succeeded.

    Args:
        task_id: Identifier of the task to preview.

    Returns:
        A success JSON payload, or an error payload with HTTP 404 (unknown
        task), 500 (bot client missing / exception), 401 (no username in
        session) or 400 (recipients missing or malformed).
    """
    try:
        db = current_app.dashboard_server.task_db
        task = db.get_task(task_id)
        if not task:
            return jsonify({"success": False, "error": "任务不存在"}), 404

        server = current_app.dashboard_server
        if not server or not server.client:
            return jsonify({"success": False, "error": "机器人未初始化"}), 500

        if not session.get('username'):
            return jsonify({"success": False, "error": "未登录或会话已过期"}), 401

        # The stored value may be missing or NULL; treat both as "no recipients"
        # instead of letting json.loads(None) raise.
        raw_recipients = task.get("preview_recipients") or "[]"
        if isinstance(raw_recipients, (list, tuple)):
            preview_recipients = list(raw_recipients)
        else:
            try:
                preview_recipients = json.loads(raw_recipients)
            except (TypeError, ValueError):
                return jsonify({"success": False, "error": "预览接收者格式错误"}), 400
            # A bare JSON string/object would otherwise be iterated piecewise.
            if not isinstance(preview_recipients, list):
                return jsonify({"success": False, "error": "预览接收者格式错误"}), 400

        if not preview_recipients:
            return jsonify({"success": False, "error": "未设置预览接收者"}), 400

        client = server.client
        for recipient in preview_recipients:
            # A failure for one recipient must not stop the others.
            try:
                if task.get('content_text'):
                    send_message_in_thread(client.send_text_message, recipient, task['content_text'])

                if task.get('content_image'):
                    send_message_in_thread(client.send_image_message, recipient, Path(task['content_image']))

                if task.get('content_voice'):
                    voice_path = Path(task['content_voice'])
                    # Anything that is not .wav is sent as mp3.
                    voice_type = 'wav' if voice_path.suffix.lower() == '.wav' else 'mp3'
                    send_message_in_thread(client.send_voice_message, recipient, voice_path, voice_type)

                if task.get('content_video'):
                    send_message_in_thread(client.send_video_message, recipient, Path(task['content_video']))

                if task.get('content_link'):
                    try:
                        link_data = json.loads(task['content_link'])
                    except json.JSONDecodeError:
                        logger.error(f"解析链接内容失败: {task['content_link']}")
                        continue

                    # Fill the link XML template from the stored JSON fields.
                    xml_content = LINK_XML_NORMAL.format(
                        title=link_data.get('title', ''),
                        des=link_data.get('des', ''),
                        url=link_data.get('url', ''),
                        thumburl=link_data.get('thumburl', ''),
                    )
                    # Note the argument order: XML first, then the recipient.
                    send_message_in_thread(
                        client.send_link_xml_message,
                        xml_content,
                        recipient
                    )

                # NOTE: mini-program content (content_miniprogram) is not sent
                # in previews.
            except Exception as e:
                logger.error(f"发送预览消息到 {recipient} 失败: {e}")
                continue

        return jsonify({
            "success": True,
            "message": "预览已发送"
        })
    except Exception as e:
        logger.error(f"发送预览失败: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>/logs', methods=['GET'])
@login_required
def api_task_logs(task_id):
    """Return one page of log entries for the task identified by ``task_id``.

    Query parameters ``page`` (default 1) and ``limit`` (default 20) are
    converted to integers and passed to the task database.

    Returns:
        ``{"success": True, "data": ...}`` with whatever the database
        returns, or an error payload with HTTP 500 when anything raises.
    """
    try:
        query = request.args
        page_number = int(query.get('page', 1))
        page_size = int(query.get('limit', 20))

        task_db = current_app.dashboard_server.task_db
        payload = {
            "success": True,
            "data": task_db.get_task_logs_with_pagination(task_id, page_number, page_size)
        }
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"获取任务日志失败: {exc}")
        return jsonify({"success": False, "error": str(exc)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_statistics():
    """Return task counts: overall, per status, and for today's date.

    Returns:
        ``{"success": True, "data": {"total", "scheduled", "paused",
        "completed", "failed", "today"}}``, or an error payload with
        HTTP 500 when anything raises.
    """
    try:
        task_db = current_app.dashboard_server.task_db

        counts = {"total": task_db.get_tasks_count()}
        # One count per tracked status, queried in a fixed order.
        for state in ('scheduled', 'paused', 'completed', 'failed'):
            counts[state] = task_db.get_tasks_count_by_status(state)

        # Today's count is looked up by the local date as YYYY-MM-DD.
        today_key = datetime.now().strftime('%Y-%m-%d')
        counts["today"] = task_db.get_tasks_count_by_date(today_key)

        return jsonify({"success": True, "data": counts})
    except Exception as exc:
        logger.error(f"获取任务统计信息失败: {exc}")
        return jsonify({"success": False, "error": str(exc)}), 500
|
||
|
||
|
||
@message_push_bp.route('/api/upload', methods=['POST'])
@login_required
def upload_file():
    """Store an uploaded media file and return its path on the server.

    The multipart field must be named ``file``. The media category is taken
    from the declared MIME type (``audio/*`` -> voice, ``video/*`` -> video,
    anything else -> image) and the file name's extension must be allowed
    for that category. Files are saved under ``<app root>/static/uploads``
    with a random hex prefix.

    Login is required, like every other route of this blueprint.

    Returns:
        ``{"success": True, "data": {"url": <absolute file path>}}`` on
        success, otherwise ``{"success": False, "message": ...}``. No
        explicit status code is set on failures.
    """
    uploaded = request.files.get('file')
    if uploaded is None:
        return jsonify({
            'success': False,
            'message': '没有文件'
        })

    if not uploaded.filename:
        return jsonify({
            'success': False,
            'message': '没有选择文件'
        })

    # ``mimetype`` is always a string, whereas ``content_type`` may be None
    # when the part carries no Content-Type header.
    mimetype = uploaded.mimetype or ''
    if mimetype.startswith('audio/'):
        file_type = 'voice'
    elif mimetype.startswith('video/'):
        file_type = 'video'
    else:
        file_type = 'image'  # default category

    if not allowed_file(uploaded.filename, file_type):
        return jsonify({
            'success': False,
            'message': '不支持的文件类型'
        })

    # Sanitize the client-supplied name, then prefix it to make it unique.
    safe_name = secure_filename(uploaded.filename)
    unique_filename = f"{uuid.uuid4().hex}.{safe_name}"

    upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
    os.makedirs(upload_folder, exist_ok=True)

    file_path = os.path.join(upload_folder, unique_filename)
    uploaded.save(file_path)

    return jsonify({
        'success': True,
        'data': {
            'url': file_path  # absolute path on the server, not a web URL
        }
    })
|
||
|
||
|
||
@message_push_bp.route('/api/tasks/<task_id>/audit', methods=['POST'])
@login_required
def audit_task(task_id):
    """Approve a draft task by moving it to the ``scheduled`` status.

    On success an ``update`` entry describing the audit is written to the
    task action log.

    Args:
        task_id: Identifier of the task to approve.

    Returns:
        ``{"success": bool, "message": str}``. No explicit status code is
        set, so callers must inspect ``success``.
    """
    try:
        db = current_app.dashboard_server.task_db
        task = db.get_task(task_id)
        if not task:
            return jsonify({
                'success': False,
                'message': '任务不存在'
            })

        # Only drafts can be approved.
        if task['status'] != 'draft':
            return jsonify({
                'success': False,
                'message': '只能审核草稿状态的任务'
            })

        # A falsy result means the update failed (the pause/resume endpoints
        # treat it the same way); do not log or report success in that case.
        if not db.update_task(task_id, {'status': 'scheduled'}):
            return jsonify({
                'success': False,
                'message': '审核失败: 更新任务状态失败'
            })

        db.log_task_action({
            'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'task_id': task_id,
            'action': 'update',
            # Falls back to 'admin' when the session carries no username.
            'user_id': session.get('username', 'admin'),
            'changes': {'status': 'scheduled', 'action': 'audit'}
        })

        return jsonify({
            'success': True,
            'message': '审核成功'
        })
    except Exception as e:
        # Log with traceback so the failure is not visible to the client only.
        logger.exception(f"审核任务失败: {e}")
        return jsonify({
            'success': False,
            'message': f'审核失败: {str(e)}'
        })
|