测试线程发送。防止阻塞

This commit is contained in:
liuwei
2025-05-30 09:05:01 +08:00
parent 30a34454e0
commit 8ee0d94629

View File

@@ -14,19 +14,34 @@ contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
message_executor = ThreadPoolExecutor(max_workers=10)
message_queue = Queue()
async def process_message(func, *args, **kwargs):
"""异步处理单个消息"""
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"处理消息失败: {e}")
raise
def process_message_queue():
"""后台处理消息队列的函数"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
try:
task = message_queue.get()
if task is None:
break
func, args, kwargs = task
loop = get_or_create_eventloop()
loop.run_until_complete(func(*args, **kwargs))
# 创建新的协程任务
future = asyncio.run_coroutine_threadsafe(
process_message(func, *args, **kwargs),
loop
)
# 不等待结果,让任务在后台运行
future.add_done_callback(lambda f: message_queue.task_done())
except Exception as e:
logger.error(f"处理消息队列任务失败: {e}")
finally:
message_queue.task_done()
# 启动消息处理线程
@@ -218,16 +233,6 @@ def api_contacts_update():
return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500
def get_or_create_eventloop():
"""获取或创建事件循环"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
@contacts_bp.route('/api/send_message', methods=['POST'])
@login_required
def api_send_message():