diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index dbe11bd..e47deaa 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -14,19 +14,34 @@ contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') message_executor = ThreadPoolExecutor(max_workers=10) message_queue = Queue() +async def process_message(func, *args, **kwargs): + """异步处理单个消息""" + try: + return await func(*args, **kwargs) + except Exception as e: + logger.error(f"处理消息失败: {e}") + raise + def process_message_queue(): """后台处理消息队列的函数""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + while True: try: task = message_queue.get() if task is None: break func, args, kwargs = task - loop = get_or_create_eventloop() - loop.run_until_complete(func(*args, **kwargs)) + # 创建新的协程任务 + future = asyncio.run_coroutine_threadsafe( + process_message(func, *args, **kwargs), + loop + ) + # 不等待结果,让任务在后台运行 + future.add_done_callback(lambda f: message_queue.task_done()) except Exception as e: logger.error(f"处理消息队列任务失败: {e}") - finally: message_queue.task_done() # 启动消息处理线程 @@ -218,16 +233,6 @@ def api_contacts_update(): return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500 -def get_or_create_eventloop(): - """获取或创建事件循环""" - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop - - @contacts_bp.route('/api/send_message', methods=['POST']) @login_required def api_send_message():