From 8ee0d94629034db371e0cef2ba3429ed64fe003e Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 30 May 2025 09:05:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=BA=BF=E7=A8=8B=E5=8F=91?= =?UTF-8?q?=E9=80=81=E3=80=82=E9=98=B2=E6=AD=A2=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/contacts.py | 31 +++++++++++++++----------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index dbe11bd..e47deaa 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -14,19 +14,34 @@ contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') message_executor = ThreadPoolExecutor(max_workers=10) message_queue = Queue() +async def process_message(func, *args, **kwargs): + """异步处理单个消息""" + try: + return await func(*args, **kwargs) + except Exception as e: + logger.error(f"处理消息失败: {e}") + raise + def process_message_queue(): """后台处理消息队列的函数""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + while True: try: task = message_queue.get() if task is None: break func, args, kwargs = task - loop = get_or_create_eventloop() - loop.run_until_complete(func(*args, **kwargs)) + # 创建新的协程任务 + future = asyncio.run_coroutine_threadsafe( + process_message(func, *args, **kwargs), + loop + ) + # 不等待结果,让任务在后台运行 + future.add_done_callback(lambda f: message_queue.task_done()) except Exception as e: logger.error(f"处理消息队列任务失败: {e}") - finally: message_queue.task_done() # 启动消息处理线程 @@ -218,16 +233,6 @@ def api_contacts_update(): return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500 -def get_or_create_eventloop(): - """获取或创建事件循环""" - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop - - @contacts_bp.route('/api/send_message', methods=['POST']) @login_required def api_send_message():