""" 全局操作锁 用于在关键操作期间暂停消息处理(例如全量成员同步) """ import asyncio from loguru import logger class OperationLock: """全局暂停/恢复控制""" _paused: bool = False _reason: str = "" _pause_event: asyncio.Event = asyncio.Event() _pause_event.set() @classmethod def pause(cls, reason: str = "") -> None: if cls._paused: return cls._paused = True cls._reason = reason or "" cls._pause_event.clear() logger.warning(f"[OperationLock] 消息处理已暂停: {cls._reason}") @classmethod def resume(cls) -> None: if not cls._paused: return cls._paused = False cls._reason = "" cls._pause_event.set() logger.warning("[OperationLock] 消息处理已恢复") @classmethod def is_paused(cls) -> bool: return cls._paused @classmethod def reason(cls) -> str: return cls._reason @classmethod async def wait_if_paused(cls) -> None: if cls._paused: await cls._pause_event.wait()