测试线程发送。防止阻塞

This commit is contained in:
liuwei
2025-05-30 09:08:43 +08:00
parent 8ee0d94629
commit 32edc90076

View File

@@ -1,8 +1,5 @@
import asyncio import asyncio
import threading import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from flask import Blueprint, render_template, jsonify, request, current_app from flask import Blueprint, render_template, jsonify, request, current_app
from .auth import login_required from .auth import login_required
from loguru import logger from loguru import logger
@@ -10,43 +7,22 @@ from loguru import logger
# 创建联系人管理蓝图 # 创建联系人管理蓝图
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
# 创建线程池 def send_message_in_thread(func, *args, **kwargs):
message_executor = ThreadPoolExecutor(max_workers=10) """在独立线程中发送消息"""
message_queue = Queue() def run():
async def process_message(func, *args, **kwargs):
"""异步处理单个消息"""
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"处理消息失败: {e}")
raise
def process_message_queue():
"""后台处理消息队列的函数"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
try: try:
task = message_queue.get() loop = asyncio.new_event_loop()
if task is None: asyncio.set_event_loop(loop)
break loop.run_until_complete(func(*args, **kwargs))
func, args, kwargs = task
# 创建新的协程任务
future = asyncio.run_coroutine_threadsafe(
process_message(func, *args, **kwargs),
loop
)
# 不等待结果,让任务在后台运行
future.add_done_callback(lambda f: message_queue.task_done())
except Exception as e: except Exception as e:
logger.error(f"处理消息队列任务失败: {e}") logger.error(f"发送消息失败: {e}")
message_queue.task_done() finally:
loop.close()
# 启动消息处理线程
message_thread = threading.Thread(target=process_message_queue, daemon=True) # 创建并启动线程
message_thread.start() thread = threading.Thread(target=run)
thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
thread.start()
# 联系人管理页面 # 联系人管理页面
@contacts_bp.route('/') @contacts_bp.route('/')
@@ -259,30 +235,22 @@ def api_send_message():
if not server or not server.client: if not server or not server.client:
return jsonify({'success': False, 'message': '机器人未初始化'}) return jsonify({'success': False, 'message': '机器人未初始化'})
# 根据消息类型将任务加入队列 # 根据消息类型发送消息
if msg_type == 'text': if msg_type == 'text':
message_queue.put(( send_message_in_thread(server.client.send_text_message, wxid, content)
server.client.send_text_message,
(wxid, content),
{}
))
return jsonify({ return jsonify({
'success': True, 'success': True,
'message': '消息已加入发送队列' 'message': '消息发送中'
}) })
elif msg_type == 'image': elif msg_type == 'image':
if 'file' not in request.files: if 'file' not in request.files:
return jsonify({'success': False, 'message': '未上传文件'}) return jsonify({'success': False, 'message': '未上传文件'})
file = request.files['file'] file = request.files['file']
message_queue.put(( send_message_in_thread(server.client.send_image_message, wxid, file.read())
server.client.send_image_message,
(wxid, file.read()),
{}
))
return jsonify({ return jsonify({
'success': True, 'success': True,
'message': '消息已加入发送队列' 'message': '消息发送中'
}) })
elif msg_type == 'voice': elif msg_type == 'voice':
@@ -298,42 +266,30 @@ def api_send_message():
'success': False, 'success': False,
'message': '不支持的音频格式' 'message': '不支持的音频格式'
}) })
message_queue.put(( send_message_in_thread(server.client.send_voice_message, wxid, file.read(), format=format_str)
server.client.send_voice_message,
(wxid, file.read()),
{'format': format_str}
))
return jsonify({ return jsonify({
'success': True, 'success': True,
'message': '消息已加入发送队列' 'message': '消息发送中'
}) })
elif msg_type == 'video': elif msg_type == 'video':
if 'file' not in request.files: if 'file' not in request.files:
return jsonify({'success': False, 'message': '未上传文件'}) return jsonify({'success': False, 'message': '未上传文件'})
file = request.files['file'] file = request.files['file']
message_queue.put(( send_message_in_thread(server.client.send_video_message, wxid, file.read())
server.client.send_video_message,
(wxid, file.read()),
{}
))
return jsonify({ return jsonify({
'success': True, 'success': True,
'message': '消息已加入发送队列' 'message': '消息发送中'
}) })
elif msg_type == 'link': elif msg_type == 'link':
url = content.get('url') url = content.get('url')
title = content.get('title', '') title = content.get('title', '')
description = content.get('description', '') description = content.get('description', '')
message_queue.put(( send_message_in_thread(server.client.send_link_message, wxid, url, title, description)
server.client.send_link_message,
(wxid, url, title, description),
{}
))
return jsonify({ return jsonify({
'success': True, 'success': True,
'message': '消息已加入发送队列' 'message': '消息发送中'
}) })
else: else: