diff --git a/main.py b/main.py index b3e76cd..283db47 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import threading from admin.GlancesMonitor import GlancesMonitor from utils.decorator.async_job import async_job -from utils.markdown_to_image import warmup_md2img_browser_sync +from utils.markdown_to_image import warmup_md2img_browser from configuration import Config from robot import Robot @@ -107,19 +107,19 @@ def main(): except Exception as e: robot.LOG.error(f"GlancesMonitor服务器启动失败: {e}") - # 启动后异步预热 Markdown 转图浏览器,降低首个转图任务冷启动失败概率。 + # 启动后在“调度器同一事件循环”中预热 Markdown 转图浏览器。 + # 这样可确保预热得到的常驻浏览器与后续截图任务复用同一 loop,避免跨 loop 句柄失效。 try: - def _warmup_md2img(): - ok = warmup_md2img_browser_sync(timeout_seconds=60) + async def _warmup_md2img(): + ok = await warmup_md2img_browser(timeout_seconds=60) if ok: - robot.LOG.info("Markdown 转图浏览器预热成功") + robot.LOG.info("Markdown 转图浏览器预热成功(调度器事件循环)") else: robot.LOG.warning("Markdown 转图浏览器预热失败,运行期将按需重试") - warmup_thread = threading.Thread(target=_warmup_md2img, daemon=True) - warmup_thread.start() + async_job.add_startup_job(_warmup_md2img, name="md2img_warmup") except Exception as e: - robot.LOG.error(f"启动 Markdown 转图预热线程失败: {e}") + robot.LOG.error(f"注册 Markdown 转图预热任务失败: {e}") robot.LOG.info(f"=" * 50) asyncio.run(async_job.run_all()) diff --git a/utils/decorator/async_job.py b/utils/decorator/async_job.py index eea4351..1f12d2b 100644 --- a/utils/decorator/async_job.py +++ b/utils/decorator/async_job.py @@ -19,6 +19,9 @@ class AsyncJob: def __init__(self): self._jobs: Dict[str, Dict[str, Any]] = {} self._running_tasks: Dict[str, asyncio.Task] = {} + # 启动钩子任务:在调度器事件循环就绪后仅执行一次。 + # 典型场景:浏览器预热、外部连接预热等需要“与调度器同一事件循环”执行的初始化逻辑。 + self._startup_jobs: List[Dict[str, Any]] = [] self._running = False self._loop: Optional[asyncio.AbstractEventLoop] = None self._stop_event: Optional[asyncio.Event] = None @@ -428,6 +431,33 @@ class AsyncJob: job_key=job_key, ) + def add_startup_job(self, func: Callable, name: Optional[str] = None): + """注册调度器启动钩子。 + + 关键语义: + 1. 只在 `run_all` 对应的事件循环中执行; + 2. 每次调度器启动最多执行一次; + 3. 支持同步函数和异步函数。 + """ + if not callable(func): + raise ValueError("startup job 必须是可调用对象") + display_name = str(name or getattr(func, "__name__", "") or "startup_job").strip() + with self._lock: + self._startup_jobs.append( + { + "name": display_name, + "func": func, + "done": False, + } + ) + + async def _run_startup_job(self, startup_job: Dict[str, Any]): + """执行单个启动钩子,并吞掉异常,避免影响主调度循环。""" + func = startup_job.get("func") + result = func() + if inspect.isawaitable(result): + await result + def set_job_enabled(self, job_id: str, enabled: bool) -> Tuple[bool, str]: with self._lock: job = self._jobs.get(job_id) @@ -575,10 +605,25 @@ class AsyncJob: self._loop = asyncio.get_running_loop() self._stop_event = asyncio.Event() job_ids = list(self._jobs.keys()) + startup_jobs = list(self._startup_jobs) for job_id in job_ids: self._start_job_in_loop(job_id) + # 启动钩子采用“并发后台执行”策略,避免阻塞调度循环。 + # 失败不会中断 run_all,由各钩子自身负责记录日志。 + for startup_job in startup_jobs: + if startup_job.get("done"): + continue + + async def _runner(job_entry=startup_job): + try: + await self._run_startup_job(job_entry) + finally: + job_entry["done"] = True + + asyncio.create_task(_runner(), name=f"async_job:startup:{startup_job.get('name', 'job')}") + await self._stop_event.wait() def stop_all(self): diff --git a/utils/markdown_to_image.py b/utils/markdown_to_image.py index 5b71127..2cf9ec8 100644 --- a/utils/markdown_to_image.py +++ b/utils/markdown_to_image.py @@ -517,6 +517,8 @@ class _PersistentBrowser: self._lock = asyncio.Lock() self._launch_args = ["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"] self._last_launch_source = "unknown" + # 记录当前常驻浏览器所属事件循环,避免跨 loop 复用导致的句柄异常。 + self._owner_loop_id: Optional[int] = None async def _launch_browser(self): if self._playwright is None: @@ -541,6 +543,18 @@ class _PersistentBrowser: return browser async def ensure_browser(self): + current_loop_id = id(asyncio.get_running_loop()) + if self._owner_loop_id is not None and self._owner_loop_id != current_loop_id: + # 发生跨事件循环访问时,主动丢弃旧句柄并在新 loop 重建。 + # 注意:旧 loop 中的进程资源可能已被 runtime 回收,这里不再尝试跨 loop 强关,避免引入新死锁点。 + logger.warning( + f"[md2img] 检测到跨事件循环复用,准备重建常驻浏览器: " + f"owner_loop={self._owner_loop_id}, current_loop={current_loop_id}" + ) + self._browser = None + self._playwright = None + self._owner_loop_id = None + if self._browser and self._browser.is_connected(): return self._browser async with self._lock: @@ -554,7 +568,12 @@ class _PersistentBrowser: pass self._browser = None self._browser = await self._launch_browser() - logger.info(f"[md2img] 常驻浏览器就绪: source={self._last_launch_source}") + self._owner_loop_id = current_loop_id + browser_pid = getattr(getattr(self._browser, "process", None), "pid", None) + logger.info( + f"[md2img] 常驻浏览器就绪: source={self._last_launch_source}, " + f"loop={self._owner_loop_id}, pid={browser_pid}" + ) return self._browser async def restart_browser(self): @@ -566,7 +585,12 @@ class _PersistentBrowser: pass self._browser = None self._browser = await self._launch_browser() - logger.info(f"[md2img] 常驻浏览器已重建: source={self._last_launch_source}") + self._owner_loop_id = id(asyncio.get_running_loop()) + browser_pid = getattr(getattr(self._browser, "process", None), "pid", None) + logger.info( + f"[md2img] 常驻浏览器已重建: source={self._last_launch_source}, " + f"loop={self._owner_loop_id}, pid={browser_pid}" + ) return self._browser async def screenshot(self, html_content: str, output_image: str): @@ -618,9 +642,13 @@ async def warmup_md2img_browser(timeout_seconds: int = 45) -> bool: 2. 不执行实际业务截图,仅确保常驻浏览器已可用。 """ try: + current_loop_id = id(asyncio.get_running_loop()) + logger.info(f"[md2img] 开始浏览器预热: loop={current_loop_id}, timeout={int(timeout_seconds)}s") manager = _get_browser_manager() await asyncio.wait_for(manager.ensure_browser(), timeout=max(10, int(timeout_seconds))) - logger.info("[md2img] 浏览器预热完成") + browser = manager._browser + browser_pid = getattr(getattr(browser, "process", None), "pid", None) if browser else None + logger.info(f"[md2img] 浏览器预热完成: loop={current_loop_id}, pid={browser_pid}") return True except Exception as e: logger.error(f"[md2img] 浏览器预热失败: {e}")