测试其他线程问题。
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from flask import Blueprint, render_template, jsonify, request, current_app
|
||||
from .auth import login_required
|
||||
from loguru import logger
|
||||
@@ -7,59 +8,49 @@ from loguru import logger
|
||||
# 创建联系人管理蓝图
|
||||
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
|
||||
|
||||
# 创建线程池
|
||||
message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="message_sender_")
|
||||
|
||||
# 创建共享的事件循环
|
||||
shared_loop = None
|
||||
loop_lock = threading.Lock()
|
||||
|
||||
def get_or_create_loop():
|
||||
"""获取或创建共享的事件循环"""
|
||||
global shared_loop
|
||||
with loop_lock:
|
||||
if shared_loop is None:
|
||||
shared_loop = asyncio.new_event_loop()
|
||||
# 在新线程中运行事件循环
|
||||
def run_loop():
|
||||
asyncio.set_event_loop(shared_loop)
|
||||
shared_loop.run_forever()
|
||||
|
||||
loop_thread = threading.Thread(target=run_loop, daemon=True)
|
||||
loop_thread.start()
|
||||
return shared_loop
|
||||
|
||||
def send_message_in_thread(func, *args, **kwargs):
|
||||
"""在独立线程中发送消息"""
|
||||
|
||||
"""使用共享事件循环发送消息"""
|
||||
def run():
|
||||
loop = None
|
||||
try:
|
||||
# 创建新的事件循环
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop = get_or_create_loop()
|
||||
# 创建异步任务
|
||||
async def send():
|
||||
try:
|
||||
await func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"发送消息失败: {e}")
|
||||
finally:
|
||||
# 发送完成后停止事件循环
|
||||
loop.stop()
|
||||
|
||||
# 创建并运行任务
|
||||
asyncio.run_coroutine_threadsafe(send(), loop)
|
||||
|
||||
# 运行事件循环直到停止
|
||||
loop.run_forever()
|
||||
|
||||
# 在共享事件循环中运行任务
|
||||
future = asyncio.run_coroutine_threadsafe(send(), loop)
|
||||
# 等待任务完成,设置超时时间
|
||||
future.result(timeout=10)
|
||||
except Exception as e:
|
||||
logger.error(f"线程执行失败: {e}")
|
||||
finally:
|
||||
# 确保清理资源
|
||||
if loop is not None:
|
||||
try:
|
||||
# 取消所有待处理的任务
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
logger.error(f"消息发送任务执行失败: {e}")
|
||||
|
||||
# 运行事件循环直到所有任务都被取消
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
|
||||
# 停止事件循环
|
||||
loop.stop()
|
||||
|
||||
# 关闭事件循环
|
||||
loop.close()
|
||||
except Exception as e:
|
||||
logger.error(f"清理资源失败: {e}")
|
||||
|
||||
# 创建并启动线程
|
||||
thread = threading.Thread(target=run)
|
||||
thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
|
||||
thread.start()
|
||||
# 使用线程池提交任务
|
||||
message_thread_pool.submit(run)
|
||||
|
||||
|
||||
# 联系人管理页面
|
||||
|
||||
Reference in New Issue
Block a user