From 2a20e49893431ce1b6511faf4a2d5d646597e481 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 10 Jun 2025 12:24:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E6=8E=A8=E9=80=81=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/message_push.py | 92 ++++++++++++++++++++-- 1 file changed, 86 insertions(+), 6 deletions(-) diff --git a/admin/dashboard/blueprints/message_push.py b/admin/dashboard/blueprints/message_push.py index acc6b00..dc324ad 100644 --- a/admin/dashboard/blueprints/message_push.py +++ b/admin/dashboard/blueprints/message_push.py @@ -1,3 +1,7 @@ +import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor + from flask import Blueprint, render_template, jsonify, request, current_app, session from .auth import login_required from loguru import logger @@ -8,6 +12,56 @@ from datetime import datetime # 创建消息推送管理蓝图 message_push_bp = Blueprint('message_push', __name__, url_prefix='/message_push') +# 创建线程池 +message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="message_sender_") + +# 创建共享的事件循环 +shared_loop = None +loop_lock = threading.Lock() + + +def get_or_create_loop(): + """获取或创建共享的事件循环""" + global shared_loop + with loop_lock: + if shared_loop is None: + shared_loop = asyncio.new_event_loop() + + # 在新线程中运行事件循环 + def run_loop(): + asyncio.set_event_loop(shared_loop) + shared_loop.run_forever() + + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + return shared_loop + + +def send_message_in_thread(func, *args, **kwargs): + """使用共享事件循环发送消息""" + + def run(): + try: + loop = get_or_create_loop() + + # 创建异步任务 + async def send(): + try: + await func(*args, **kwargs) + except Exception as e: + logger.error(f"发送消息失败: {e}") + + # 在共享事件循环中运行任务 + future = asyncio.run_coroutine_threadsafe(send(), loop) + # 等待任务完成,设置超时时间 + future.result(timeout=10) + except Exception as e: + logger.error(f"消息发送任务执行失败: {e}") + + # 使用线程池提交任务 + message_thread_pool.submit(run) + + # 消息推送管理页面 @message_push_bp.route('/') @login_required @@ -55,9 +109,7 @@ def api_create_task(): return jsonify({"success": False, "error": "无效的请求数据"}), 400 # 获取用户名 - logger.debug(f"Session before getting username: {dict(session)}") username = session.get('username') - logger.debug(f"Username from session: {username}") if not username: return jsonify({"success": False, "error": "未登录或会话已过期"}), 401 @@ -192,10 +244,38 @@ def api_preview_task(task_id): if not task: return jsonify({"success": False, "error": "任务不存在"}), 404 - # 发送预览 - message_push = current_app.dashboard_server.message_push_task.message_push - if not message_push.send_preview(task, [session.get('user_id')]): # 从session中获取用户ID - return jsonify({"success": False, "error": "发送预览失败"}), 500 + # 获取机器人实例 + server = current_app.dashboard_server + if not server or not server.client: + return jsonify({"success": False, "error": "机器人未初始化"}), 500 + + preview_user = session.get('user_id') + if not preview_user: + return jsonify({"success": False, "error": "未登录或会话已过期"}), 401 + + # 发送文本消息 + if task.get('content_text'): + send_message_in_thread(server.client.send_text_message, preview_user, task['content_text']) + + # 发送图片消息 + if task.get('content_image'): + send_message_in_thread(server.client.send_image_message, preview_user, task['content_image']) + + # 发送链接消息 + if task.get('content_link'): + send_message_in_thread(server.client.send_link_message, preview_user, task['content_link']) + + # 发送小程序消息 + if task.get('content_miniprogram'): + miniprogram = task['content_miniprogram'] + send_message_in_thread( + server.client.send_miniprogram_message, + preview_user, + miniprogram.get('title'), + miniprogram.get('appid'), + miniprogram.get('pagepath'), + miniprogram.get('thumb_url') + ) return jsonify({ "success": True,