Files
abot/admin/dashboard/blueprints/message_push.py
2025-06-12 13:34:49 +08:00

613 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
import os
import json
import uuid
from datetime import datetime, timedelta
from flask import Blueprint, render_template, jsonify, request, current_app, session
from pathlib import Path
from werkzeug.utils import secure_filename
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL
from .auth import login_required
from loguru import logger
# 创建消息推送管理蓝图
message_push_bp = Blueprint('message_push', __name__, url_prefix='/message_push')
# 创建线程池
message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="message_sender_")
# 创建共享的事件循环
shared_loop = None
loop_lock = threading.Lock()
# 允许的文件扩展名
ALLOWED_EXTENSIONS = {
'image': {'png', 'jpg', 'jpeg', 'gif'},
'voice': {'mp3', 'wav'},
'video': {'mp4'}
}
def get_or_create_loop():
"""获取或创建共享的事件循环"""
global shared_loop
with loop_lock:
if shared_loop is None:
shared_loop = asyncio.new_event_loop()
# 在新线程中运行事件循环
def run_loop():
asyncio.set_event_loop(shared_loop)
shared_loop.run_forever()
loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()
return shared_loop
def send_message_in_thread(func, *args, **kwargs):
"""使用共享事件循环发送消息"""
def run():
try:
loop = get_or_create_loop()
# 创建异步任务
async def send():
try:
await func(*args, **kwargs)
except Exception as e:
logger.error(f"发送消息失败: {e}")
# 在共享事件循环中运行任务
future = asyncio.run_coroutine_threadsafe(send(), loop)
# 等待任务完成,设置超时时间
future.result(timeout=30)
except Exception as e:
logger.error(f"消息发送任务执行失败: {e}")
# 使用线程池提交任务
message_thread_pool.submit(run)
def allowed_file(filename, file_type='image'):
"""检查文件类型是否允许上传"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS.get(file_type, set())
# 消息推送管理页面
@message_push_bp.route('/')
@login_required
def message_push_management():
"""消息推送管理页面"""
return render_template('message_push_management.html')
# API路由
@message_push_bp.route('/api/tasks', methods=['GET'])
@login_required
def api_tasks_list():
"""获取任务列表API"""
try:
# 获取查询参数
status = request.args.get('status')
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
# 获取任务列表
db = current_app.dashboard_server.task_db
tasks, total = db.get_tasks_list(status, start_time, end_time, page, limit)
# 处理任务数据确保所有字段都可以JSON序列化
serialized_tasks = []
for task in tasks:
task_dict = dict(task)
# 处理recurring_time
if task_dict.get('recurring_time'):
hours = task_dict['recurring_time'].seconds // 3600
minutes = (task_dict['recurring_time'].seconds % 3600) // 60
task_dict['recurring_time'] = f"{hours:02d}:{minutes:02d}"
serialized_tasks.append(task_dict)
return jsonify({
"success": True,
"data": {
"tasks": serialized_tasks,
"total": total,
"page": page,
"limit": limit
}
})
except Exception as e:
logger.error(f"获取任务列表失败: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks', methods=['POST'])
@login_required
def api_create_task():
"""创建任务API"""
try:
data = request.json
if not data:
return jsonify({"success": False, "error": "无效的请求数据"}), 400
# 获取用户名
username = session.get('username')
if not username:
return jsonify({"success": False, "error": "未登录或会话已过期"}), 401
# 生成任务ID
data['task_id'] = str(uuid.uuid4())
data['creator_id'] = username
# 转换时间格式
if 'created_at' in data:
try:
date = datetime.strptime(data['created_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['created_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'updated_at' in data:
try:
date = datetime.strptime(data['updated_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['updated_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'schedule_time' in data:
try:
date = datetime.strptime(data['schedule_time'], '%a, %d %b %Y %H:%M:%S GMT')
data['schedule_time'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'recurring_end' in data and data['recurring_end']:
try:
date = datetime.strptime(data['recurring_end'], '%a, %d %b %Y %H:%M:%S GMT')
data['recurring_end'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
# 创建任务
db = current_app.dashboard_server.task_db
task = db.create_task(data)
if not task:
return jsonify({"success": False, "error": "创建任务失败"}), 500
# 处理返回数据中的 timedelta 类型
if task:
for key, value in task.items():
if isinstance(value, timedelta):
task[key] = str(value)
elif isinstance(value, datetime):
task[key] = value.strftime('%Y-%m-%d %H:%M:%S')
return jsonify({
"success": True,
"data": {
"task": task
}
})
except Exception as e:
logger.exception(f"创建任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>', methods=['PUT'])
@login_required
def update_task(task_id):
"""更新任务"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '无效的请求数据'
})
# 获取任务信息
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({
'success': False,
'message': '任务不存在'
})
# 检查任务状态
if task['status'] not in ['draft', 'paused']:
return jsonify({
'success': False,
'message': '只能编辑草稿或已暂停状态的任务'
})
# 确保 content_miniprogram 是 JSON 字符串
if 'content_miniprogram' in data and isinstance(data['content_miniprogram'], dict):
data['content_miniprogram'] = json.dumps(data['content_miniprogram'])
# 确保 groups 是 JSON 字符串
if 'groups' in data and isinstance(data['groups'], list):
data['groups'] = json.dumps(data['groups'])
# 确保 preview_recipients 是 JSON 字符串
if 'preview_recipients' in data and isinstance(data['preview_recipients'], list):
data['preview_recipients'] = json.dumps(data['preview_recipients'])
# 转换时间格式
if 'created_at' in data:
try:
date = datetime.strptime(data['created_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['created_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'updated_at' in data:
try:
date = datetime.strptime(data['updated_at'], '%a, %d %b %Y %H:%M:%S GMT')
data['updated_at'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'schedule_time' in data:
try:
date = datetime.strptime(data['schedule_time'], '%a, %d %b %Y %H:%M:%S GMT')
data['schedule_time'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
if 'recurring_end' in data and data['recurring_end']:
try:
date = datetime.strptime(data['recurring_end'], '%a, %d %b %Y %H:%M:%S GMT')
data['recurring_end'] = date.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
# 更新任务
db.update_task(task_id, data)
# 记录操作日志
db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task_id,
'action': 'update',
'user_id': session.get('username', 'admin'), # 使用 admin 作为默认值
'changes': data
})
return jsonify({
'success': True,
'message': '更新成功'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'更新失败: {str(e)}'
})
@message_push_bp.route('/api/tasks/<task_id>', methods=['DELETE'])
@login_required
def api_delete_task(task_id):
"""删除任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 删除任务
if not db.delete_task(task_id):
return jsonify({"success": False, "error": "删除任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已删除"
})
except Exception as e:
logger.error(f"删除任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/pause', methods=['POST'])
@login_required
def api_pause_task(task_id):
"""暂停任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 暂停任务
if not db.update_task(task_id, {'status': 'paused'}):
return jsonify({"success": False, "error": "暂停任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已暂停"
})
except Exception as e:
logger.error(f"暂停任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/resume', methods=['POST'])
@login_required
def api_resume_task(task_id):
"""恢复任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 恢复任务
if not db.update_task(task_id, {'status': 'scheduled'}):
return jsonify({"success": False, "error": "恢复任务失败"}), 500
return jsonify({
"success": True,
"message": "任务已恢复"
})
except Exception as e:
logger.error(f"恢复任务失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/preview', methods=['POST'])
@login_required
def api_preview_task(task_id):
"""预览任务API"""
try:
# 获取任务
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({"success": False, "error": "任务不存在"}), 404
# 获取机器人实例
server = current_app.dashboard_server
if not server or not server.client:
return jsonify({"success": False, "error": "机器人未初始化"}), 500
# 获取用户名
preview_user = session.get('username')
if not preview_user:
return jsonify({"success": False, "error": "未登录或会话已过期"}), 401
# 获取预览接收者并解析JSON
preview_recipients_str = task.get("preview_recipients", "[]")
try:
preview_recipients = json.loads(preview_recipients_str)
except json.JSONDecodeError:
return jsonify({"success": False, "error": "预览接收者格式错误"}), 400
if not preview_recipients:
return jsonify({"success": False, "error": "未设置预览接收者"}), 400
# 为每个接收者发送预览消息
for recipient in preview_recipients:
try:
# 发送文本消息
if task.get('content_text'):
send_message_in_thread(server.client.send_text_message, recipient, task['content_text'])
# 发送图片消息
if task.get('content_image'):
send_message_in_thread(server.client.send_image_message, recipient, Path(task['content_image']))
# 发送语音消息
if task.get('content_voice'):
voice_path = Path(task['content_voice'])
# 根据文件扩展名确定类型
voice_type = 'wav' if voice_path.suffix.lower() == '.wav' else 'mp3'
send_message_in_thread(server.client.send_voice_message, recipient, voice_path, voice_type)
# 发送视频消息
if task.get('content_video'):
send_message_in_thread(server.client.send_video_message, recipient, Path(task['content_video']))
# 发送链接消息
if task.get('content_link'):
try:
link_data = json.loads(task['content_link'])
# content_link json 读取内容
xml_content = f"{LINK_XML_NORMAL}".format(title=link_data.get('title', ''),
des=link_data.get('des', ''),
url=link_data.get('url', ''),
thumburl=link_data.get('thumburl', '')
)
send_message_in_thread(
server.client.send_link_xml_message,
xml_content,
recipient
)
except json.JSONDecodeError:
logger.error(f"解析链接内容失败: {task['content_link']}")
continue
# # 发送小程序消息
# if task.get('content_miniprogram'):
# miniprogram = task['content_miniprogram']
# send_message_in_thread(
# server.client.send_miniprogram_message,
# recipient,
# miniprogram.get('title'),
# miniprogram.get('appid'),
# miniprogram.get('pagepath'),
# miniprogram.get('thumb_url')
# )
except Exception as e:
logger.error(f"发送预览消息到 {recipient} 失败: {e}")
continue
return jsonify({
"success": True,
"message": "预览已发送"
})
except Exception as e:
logger.error(f"发送预览失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/tasks/<task_id>/logs', methods=['GET'])
@login_required
def api_task_logs(task_id):
"""获取任务日志API"""
try:
# 获取查询参数
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
# 查询日志
db = current_app.dashboard_server.task_db
logs_data = db.get_task_logs_with_pagination(task_id, page, limit)
return jsonify({
"success": True,
"data": logs_data
})
except Exception as e:
logger.error(f"获取任务日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_statistics():
"""获取任务统计信息API"""
try:
# 获取任务数据库实例
db = current_app.dashboard_server.task_db
# 获取各种状态的任务数量
total = db.get_tasks_count()
scheduled = db.get_tasks_count_by_status('scheduled')
paused = db.get_tasks_count_by_status('paused')
completed = db.get_tasks_count_by_status('completed')
failed = db.get_tasks_count_by_status('failed')
# 获取今日任务数量
today = db.get_tasks_count_by_date(datetime.now().strftime('%Y-%m-%d'))
return jsonify({
"success": True,
"data": {
"total": total,
"scheduled": scheduled,
"paused": paused,
"completed": completed,
"failed": failed,
"today": today
}
})
except Exception as e:
logger.error(f"获取任务统计信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@message_push_bp.route('/api/upload', methods=['POST'])
def upload_file():
"""处理文件上传"""
if 'file' not in request.files:
return jsonify({
'success': False,
'message': '没有文件'
})
file = request.files['file']
if file.filename == '':
return jsonify({
'success': False,
'message': '没有选择文件'
})
# 根据文件类型检查
file_type = 'image' # 默认类型
if file.content_type.startswith('audio/'):
file_type = 'voice'
elif file.content_type.startswith('video/'):
file_type = 'video'
if file and allowed_file(file.filename, file_type):
# 生成安全的文件名
filename = secure_filename(file.filename)
# 生成唯一文件名
unique_filename = f"{uuid.uuid4().hex}.{filename}"
# 确保上传目录存在
upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
os.makedirs(upload_folder, exist_ok=True)
# 保存文件
file_path = os.path.join(upload_folder, unique_filename)
file.save(file_path)
# 返回文件的绝对路径
return jsonify({
'success': True,
'data': {
'url': file_path # 返回绝对路径
}
})
return jsonify({
'success': False,
'message': '不支持的文件类型'
})
@message_push_bp.route('/api/tasks/<task_id>/audit', methods=['POST'])
@login_required
def audit_task(task_id):
"""审核任务"""
try:
# 获取任务信息
db = current_app.dashboard_server.task_db
task = db.get_task(task_id)
if not task:
return jsonify({
'success': False,
'message': '任务不存在'
})
# 检查任务状态
if task['status'] != 'draft':
return jsonify({
'success': False,
'message': '只能审核草稿状态的任务'
})
# 更新任务状态为已排期
db.update_task(task_id, {'status': 'scheduled'})
# 记录操作日志
db.log_task_action({
'log_id': f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}",
'task_id': task_id,
'action': 'update',
'user_id': session.get('username', 'admin'),
'changes': {'status': 'scheduled', 'action': 'audit'}
})
return jsonify({
'success': True,
'message': '审核成功'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'审核失败: {str(e)}'
})