From 32edc900768f7e73b7eb1bda7fd4a5f882bcf361 Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 30 May 2025 09:08:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=BA=BF=E7=A8=8B=E5=8F=91?= =?UTF-8?q?=E9=80=81=E3=80=82=E9=98=B2=E6=AD=A2=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/contacts.py | 94 +++++++------------------- 1 file changed, 25 insertions(+), 69 deletions(-) diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index e47deaa..ebc51a9 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -1,8 +1,5 @@ import asyncio import threading -from queue import Queue -from concurrent.futures import ThreadPoolExecutor - from flask import Blueprint, render_template, jsonify, request, current_app from .auth import login_required from loguru import logger @@ -10,43 +7,22 @@ from loguru import logger # 创建联系人管理蓝图 contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') -# 创建线程池 -message_executor = ThreadPoolExecutor(max_workers=10) -message_queue = Queue() - -async def process_message(func, *args, **kwargs): - """异步处理单个消息""" - try: - return await func(*args, **kwargs) - except Exception as e: - logger.error(f"处理消息失败: {e}") - raise - -def process_message_queue(): - """后台处理消息队列的函数""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - while True: +def send_message_in_thread(func, *args, **kwargs): + """在独立线程中发送消息""" + def run(): try: - task = message_queue.get() - if task is None: - break - func, args, kwargs = task - # 创建新的协程任务 - future = asyncio.run_coroutine_threadsafe( - process_message(func, *args, **kwargs), - loop - ) - # 不等待结果,让任务在后台运行 - future.add_done_callback(lambda f: message_queue.task_done()) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(func(*args, **kwargs)) except Exception as e: - logger.error(f"处理消息队列任务失败: {e}") - message_queue.task_done() - -# 启动消息处理线程 -message_thread = threading.Thread(target=process_message_queue, daemon=True) -message_thread.start() + logger.error(f"发送消息失败: {e}") + finally: + loop.close() + + # 创建并启动线程 + thread = threading.Thread(target=run) + thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束 + thread.start() # 联系人管理页面 @contacts_bp.route('/') @@ -259,30 +235,22 @@ def api_send_message(): if not server or not server.client: return jsonify({'success': False, 'message': '机器人未初始化'}) - # 根据消息类型将任务加入队列 + # 根据消息类型发送消息 if msg_type == 'text': - message_queue.put(( - server.client.send_text_message, - (wxid, content), - {} - )) + send_message_in_thread(server.client.send_text_message, wxid, content) return jsonify({ 'success': True, - 'message': '消息已加入发送队列' + 'message': '消息发送中' }) elif msg_type == 'image': if 'file' not in request.files: return jsonify({'success': False, 'message': '未上传文件'}) file = request.files['file'] - message_queue.put(( - server.client.send_image_message, - (wxid, file.read()), - {} - )) + send_message_in_thread(server.client.send_image_message, wxid, file.read()) return jsonify({ 'success': True, - 'message': '消息已加入发送队列' + 'message': '消息发送中' }) elif msg_type == 'voice': @@ -298,42 +266,30 @@ def api_send_message(): 'success': False, 'message': '不支持的音频格式' }) - message_queue.put(( - server.client.send_voice_message, - (wxid, file.read()), - {'format': format_str} - )) + send_message_in_thread(server.client.send_voice_message, wxid, file.read(), format=format_str) return jsonify({ 'success': True, - 'message': '消息已加入发送队列' + 'message': '消息发送中' }) elif msg_type == 'video': if 'file' not in request.files: return jsonify({'success': False, 'message': '未上传文件'}) file = request.files['file'] - message_queue.put(( - server.client.send_video_message, - (wxid, file.read()), - {} - )) + send_message_in_thread(server.client.send_video_message, wxid, file.read()) return jsonify({ 'success': True, - 'message': '消息已加入发送队列' + 'message': '消息发送中' }) elif msg_type == 'link': url = content.get('url') title = content.get('title', '') description = content.get('description', '') - message_queue.put(( - server.client.send_link_message, - (wxid, url, title, description), - {} - )) + send_message_in_thread(server.client.send_link_message, wxid, url, title, description) return jsonify({ 'success': True, - 'message': '消息已加入发送队列' + 'message': '消息发送中' }) else: