Files
abot/utils/markdown_to_image.py
2026-05-01 12:45:35 +08:00

1029 lines
41 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import time
from pathlib import Path
import shutil
from typing import Optional, Tuple
import threading
from concurrent.futures import Future as ConcurrentFuture
import psutil
from playwright.async_api import async_playwright
import os
import asyncio
import re
from loguru import logger
try:
import markdown
except ImportError:
markdown = None
# Substrings that mark a rendered block as summary metadata (group name, time,
# date, members, messages, statistics, summary, source, generated, record).
# NOTE(review): the first entry is an empty string, and "" is a substring of
# every text, so a plain `keyword in text` scan over this list always matches.
# It looks like a character was lost from this literal — confirm the intended
# first keyword.
META_KEYWORDS = ["", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]
STAT_PILL_CLASSES = {
"": "total",
"人数": "people",
"文本": "text",
"图片": "image",
"视频": "video",
"链接": "link",
"表情": "emoji",
}
def _extract_stats_pills_from_markdown(md_content: str) -> str:
text = str(md_content or "")
pattern = re.compile(
r"(^##\s+群概览\s*\n)([^\n]+)(?=\n(?:\n|##\s|###\s|$))",
re.M,
)
def replace(match):
stats_line = match.group(2).strip()
parts = [part.strip() for part in stats_line.split("·") if part.strip()]
pills = []
for part in parts:
item_match = re.match(r"(?:\*\*)?([^*\s]+)(?:\*\*)?\s+(\d+)", part)
if not item_match:
continue
label = item_match.group(1).strip()
value = item_match.group(2).strip()
kind = STAT_PILL_CLASSES.get(label, "default")
pills.append(
f'<span class="stats-pill stats-pill-{kind}"><span class="stats-pill-label">{label}</span><span class="stats-pill-value">{value}</span></span>'
)
if not pills:
return match.group(0)
return match.group(1) + f'<div class="stats-pills">{"".join(pills)}</div>'
return pattern.sub(replace, text, count=1)
def _simple_markdown_to_html(md_content: str) -> str:
lines = str(md_content or "").splitlines()
html_parts = []
in_ul = False
paragraph_lines = []
def flush_paragraph():
nonlocal paragraph_lines
if paragraph_lines:
text = " ".join(item.strip() for item in paragraph_lines if item.strip())
if text:
html_parts.append(f"<p>{text}</p>")
paragraph_lines = []
def close_ul():
nonlocal in_ul
if in_ul:
html_parts.append("</ul>")
in_ul = False
for raw_line in lines:
line = raw_line.rstrip()
stripped = line.strip()
if not stripped:
flush_paragraph()
close_ul()
continue
if stripped.startswith("# "):
flush_paragraph()
close_ul()
html_parts.append(f"<h1>{stripped[2:].strip()}</h1>")
continue
if stripped.startswith("## "):
flush_paragraph()
close_ul()
html_parts.append(f"<h2>{stripped[3:].strip()}</h2>")
continue
if stripped.startswith("### "):
flush_paragraph()
close_ul()
html_parts.append(f"<h3>{stripped[4:].strip()}</h3>")
continue
if stripped.startswith("<div ") and stripped.endswith("</div>"):
flush_paragraph()
close_ul()
html_parts.append(stripped)
continue
if stripped.startswith("- "):
flush_paragraph()
if not in_ul:
html_parts.append("<ul>")
in_ul = True
html_parts.append(f"<li>{stripped[2:].strip()}</li>")
continue
close_ul()
paragraph_lines.append(stripped)
flush_paragraph()
close_ul()
return "\n".join(html_parts)
async def safe_close_browser(browser, timeout: float = 4.0) -> None:
    """Keep-alive first: deliberately do not close the browser.

    Design notes:
    1. The browser process is required to stay resident, so actively closing
       it is disabled here.
    2. A caller that still invokes this function only produces a log line.
    3. The signature is kept so existing call sites keep working unchanged.

    Args:
        browser: Browser handle; a falsy value makes the call a silent no-op.
        timeout: Not used by this implementation.
    """
    if browser:
        logger.info("[md2img] 保活模式:跳过 safe_close_browser不主动关闭浏览器进程")
def _clean_text(html: str) -> str:
return re.sub(r'\s+', ' ', re.sub(r'<.*?>', ' ', html)).strip()
def _looks_like_meta(html: str) -> bool:
    """Decide whether an HTML block looks like summary metadata.

    A block counts as metadata when its visible text contains one of
    ``META_KEYWORDS`` or is at most 80 characters long. Blocks without any
    visible text are never metadata.

    Args:
        html: HTML fragment of a single block.

    Returns:
        True when the block should be treated as metadata.
    """
    clean = _clean_text(html)
    if not clean:
        return False
    # Skip empty keywords: "" is a substring of every string, so a single
    # empty entry would classify every block as metadata and make the length
    # rule below unreachable.
    if any(keyword and keyword in clean for keyword in META_KEYWORDS):
        return True
    return len(clean) <= 80
def _split_hero(html_body: str):
    """Split rendered HTML into hero title, hero metadata and remaining body.

    The first ``<h1>`` supplies the title (a default title is used when there
    is none) and is removed from the body. Then up to four leading
    ``<p>``/``<blockquote>``/``<ul>``/``<ol>`` blocks are moved into the hero
    metadata for as long as ``_looks_like_meta`` accepts them.

    Returns:
        Tuple ``(hero_title, hero_meta, remain, hero_enabled)`` where
        ``hero_enabled`` is True when a title or any metadata block was found.
    """
    heading = re.search(r'<h1>(.*?)</h1>', html_body, re.S | re.I)
    if heading:
        title = _clean_text(heading.group(1))
        # Cut out exactly the first <h1>...</h1>, i.e. the span just matched.
        body = html_body[:heading.start()] + html_body[heading.end():]
    else:
        title = "聊天总结"
        body = html_body
    body = body.strip()

    leading_block = re.compile(r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?</(?:p|blockquote|ul|ol)>)', re.S | re.I)
    collected = []
    while len(collected) < 4:
        found = leading_block.match(body)
        if found is None or not _looks_like_meta(found.group(1)):
            break
        collected.append(found.group(1).strip())
        body = body[found.end():].strip()

    return title, ''.join(collected), body, bool(heading or collected)
async def md_str_to_html_content(md_content):
    """Render Markdown text into a complete, styled HTML document.

    Steps: convert the overview statistics line into pills, render the
    Markdown (with the ``markdown`` package when it is importable, otherwise
    with the built-in minimal converter), split off the hero section, and wrap
    everything in the page template with the embedded stylesheet.

    Args:
        md_content: Markdown source text.

    Returns:
        The full HTML document as a string.
    """
    prepared_md = _extract_stats_pills_from_markdown(md_content)
    if markdown is None:
        # Fallback path when the optional dependency is missing.
        html_body = _simple_markdown_to_html(prepared_md)
    else:
        html_body = markdown.markdown(prepared_md, extensions=['extra', 'codehilite'])
    hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)
    # Embedded stylesheet for the whole card.
    css = """
<style>
:root {
--bg1: #0f172a;
--bg2: #172554;
--paper: #ffffff;
--text: #233044;
--muted: #c7d2e3;
--muted-2: rgba(235, 241, 255, 0.82);
--primary: #8b7cff;
--primary-soft: rgba(109,94,252,0.10);
--accent: #22c3b5;
--line: rgba(148,163,184,0.18);
--code-bg: #0f172a;
--code-fg: #e2e8f0;
--quote-bg: rgba(20,184,166,0.08);
--shadow: 0 20px 45px rgba(80, 84, 125, 0.10);
}
* { box-sizing: border-box; }
html, body { margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
color: var(--text);
font-size: 16px;
line-height: 1.78;
background:
radial-gradient(circle at top left, rgba(126, 93, 255, 0.14) 0%, transparent 28%),
radial-gradient(circle at top right, rgba(34, 195, 181, 0.12) 0%, transparent 24%),
linear-gradient(180deg, #eef4fb 0%, #e8f0f8 100%);
padding: 26px;
}
.wrap {
max-width: 820px;
margin: 0 auto;
background: rgba(255,255,255,0.97);
border: 1px solid rgba(255,255,255,0.7);
border-radius: 28px;
box-shadow: var(--shadow);
overflow: hidden;
}
.hero {
position: relative;
padding: 30px 34px 24px;
background:
radial-gradient(circle at 18% 18%, rgba(255,255,255,0.10) 0%, transparent 18%),
radial-gradient(circle at 85% 22%, rgba(255,255,255,0.12) 0%, transparent 20%),
linear-gradient(135deg, #1e1b4b 0%, #1d4ed8 52%, #0f766e 100%);
border-bottom: 1px solid rgba(255,255,255,0.08);
}
.hero::before {
content: "";
position: absolute;
inset: 0;
background:
linear-gradient(125deg, rgba(255,255,255,0.05) 0%, transparent 38%),
linear-gradient(300deg, rgba(255,255,255,0.04) 0%, transparent 30%);
pointer-events: none;
}
.hero::after {
content: "";
position: absolute;
right: -40px;
top: -36px;
width: 200px;
height: 200px;
border-radius: 50%;
border: 1px solid rgba(255,255,255,0.12);
box-shadow: 0 0 0 24px rgba(255,255,255,0.04), 0 0 0 56px rgba(255,255,255,0.025);
opacity: 0.9;
pointer-events: none;
}
.hero-badge {
position: relative;
display: inline-block;
padding: 6px 12px;
border-radius: 999px;
font-size: 12px;
color: #f8faff;
background: rgba(255,255,255,0.12);
border: 1px solid rgba(255,255,255,0.18);
margin-bottom: 14px;
letter-spacing: .06em;
}
.hero-title {
position: relative;
margin: 0;
font-size: 2.05em;
line-height: 1.28;
font-weight: 800;
color: #ffffff;
text-align: center;
letter-spacing: -0.02em;
text-shadow: 0 2px 10px rgba(0,0,0,0.12);
}
.hero-meta {
position: relative;
margin: 12px auto 0;
max-width: 660px;
text-align: center;
color: var(--muted-2);
font-size: 0.84em;
line-height: 1.72;
}
.hero-meta p, .hero-meta blockquote, .hero-meta ul, .hero-meta ol {
margin: 4px 0;
color: inherit;
background: transparent;
border: none;
padding: 0;
}
.hero-meta ul, .hero-meta ol { list-style: none; padding-left: 0; }
.content { padding: 24px 34px 34px; }
h1, h2, h3, h4, h5, h6 {
color: var(--text);
margin-top: 24px;
margin-bottom: 14px;
font-weight: 700;
line-height: 1.35;
letter-spacing: -0.01em;
}
.content.hero-active h1:first-of-type { display: none; }
h2 {
font-size: 1.42em;
margin-top: 30px;
padding: 10px 14px;
background: linear-gradient(90deg, var(--primary-soft), rgba(255,255,255,0));
border-left: 4px solid var(--primary);
border-radius: 12px;
}
h3 {
font-size: 1.15em;
margin-top: 24px;
color: #30435f;
padding-left: 12px;
border-left: 3px solid rgba(20,184,166,0.55);
}
p { margin: 14px 0; color: #334155; line-height: 1.88; }
ul, ol { padding-left: 26px; margin: 14px 0 18px; }
li { margin: 8px 0; color: #334155; }
li::marker { color: var(--primary); }
strong { color: #1e293b; font-weight: 700; }
em { color: #5b6b84; }
code {
background: rgba(109,94,252,0.08);
color: #5b3df5;
padding: 2px 8px;
border-radius: 8px;
font-size: 0.92em;
border: 1px solid rgba(109,94,252,0.10);
}
pre {
background: var(--code-bg);
color: var(--code-fg);
padding: 16px 18px;
border-radius: 16px;
overflow-x: auto;
border: 1px solid rgba(255,255,255,0.06);
box-shadow: inset 0 1px 0 rgba(255,255,255,0.03);
}
pre code { background: transparent; color: inherit; border: none; padding: 0; }
table {
border-collapse: separate;
border-spacing: 0;
width: 100%;
margin: 20px 0;
background: rgba(255,255,255,0.96);
border: 1px solid rgba(148,163,184,0.16);
border-radius: 16px;
overflow: hidden;
box-shadow: 0 8px 24px rgba(15,23,42,0.05);
}
th, td { padding: 12px 14px; text-align: left; border-bottom: 1px solid rgba(148,163,184,0.12); }
tr:last-child td { border-bottom: none; }
th { background: linear-gradient(180deg, rgba(109,94,252,0.10), rgba(109,94,252,0.04)); color: #334155; font-weight: 700; }
blockquote {
margin: 18px 0;
padding: 14px 18px;
background: var(--quote-bg);
border: 1px solid rgba(20,184,166,0.16);
border-left: 5px solid var(--accent);
border-radius: 14px;
color: #355468;
}
.stats-pills {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin: 12px 0 8px;
}
.stats-pill {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 7px 12px;
border-radius: 999px;
font-size: 0.92em;
line-height: 1;
border: 1px solid rgba(148,163,184,0.16);
background: linear-gradient(180deg, rgba(255,255,255,0.96), rgba(248,250,252,0.92));
color: #334155;
box-shadow: 0 8px 18px rgba(15,23,42,0.05);
}
.stats-pill-label {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 4px 8px;
border-radius: 999px;
font-size: 0.82em;
font-weight: 700;
color: #ffffff;
background: linear-gradient(135deg, #64748b, #475569);
}
.stats-pill-value {
font-weight: 800;
color: #1e293b;
min-width: 20px;
}
.stats-pill-total .stats-pill-label { background: linear-gradient(135deg, #3b82f6, #1d4ed8); }
.stats-pill-people .stats-pill-label { background: linear-gradient(135deg, #0f766e, #14b8a6); }
.stats-pill-text .stats-pill-label { background: linear-gradient(135deg, #8b5cf6, #7c3aed); }
.stats-pill-image .stats-pill-label { background: linear-gradient(135deg, #ec4899, #db2777); }
.stats-pill-video .stats-pill-label { background: linear-gradient(135deg, #f97316, #ea580c); }
.stats-pill-link .stats-pill-label { background: linear-gradient(135deg, #22c55e, #16a34a); }
.stats-pill-emoji .stats-pill-label { background: linear-gradient(135deg, #eab308, #ca8a04); }
hr { border: none; height: 1px; background: linear-gradient(90deg, transparent, rgba(148,163,184,0.35), transparent); margin: 26px 0; }
a { color: var(--primary); text-decoration: none; border-bottom: 1px dashed rgba(109,94,252,0.35); }
.signature { margin-top: 34px; text-align: right; color: #73849c; font-size: 0.95em; font-style: italic; }
</style>
"""
    if hero_enabled:
        # With a hero, the title and metadata blocks have already been moved
        # out of the body, so only the remainder is rendered below it.
        content_class = 'content hero-active'
        body_html = remain_html
        hero_html = f'''
<div class="hero">
<div class="hero-badge">AI 群聊总结</div>
<h1 class="hero-title">{hero_title}</h1>
<div class="hero-meta">{hero_meta}</div>
</div>'''
    else:
        content_class = 'content'
        body_html = html_body
        hero_html = ''
    return f'''<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{css}
</head>
<body>
<div class="wrap">{hero_html}
<div class="{content_class}">
{body_html}
</div>
</div>
</body>
</html>'''
def check_chromium_installed(path):
    """Return True when *path* is an existing regular file that is executable."""
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
def _collect_browser_candidates() -> list[Tuple[str, str]]:
    """Collect installed Chromium/Chrome executables in launch-preference order.

    On Windows a fixed list of Chrome install locations is probed, followed by
    the current user's ``%LOCALAPPDATA%`` install as an additional fallback.
    On other systems the well-known binaries on ``PATH`` come first, then the
    Chromium builds in ``~/.cache/ms-playwright``, newest revision first.

    Returns:
        List of ``(source, path)`` tuples where ``source`` is ``"system"`` or
        ``"playwright-cache"``. Only existing, executable files are included
        and each path appears at most once.
    """
    candidates = []
    seen = set()

    def add(source, path):
        # normcase keeps de-duplication case-insensitive on Windows.
        key = os.path.normcase(path)
        if key in seen or not check_chromium_installed(path):
            return
        seen.add(key)
        candidates.append((source, path))

    if os.name == 'nt':
        possible_chrome_paths = [
            r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
        ]
        # Per-user install of whoever runs the process. Appended last so the
        # order of the previously probed locations is unchanged.
        local_app_data = os.environ.get("LOCALAPPDATA")
        if local_app_data:
            possible_chrome_paths.append(
                os.path.join(local_app_data, "Google", "Chrome", "Application", "chrome.exe")
            )
        for path in possible_chrome_paths:
            add("system", path)
    else:
        import glob

        for bin_name in ("google-chrome", "google-chrome-stable", "chromium", "chromium-browser"):
            bin_path = shutil.which(bin_name)
            if bin_path:
                add("system", bin_path)

        user_home = os.path.expanduser("~")
        glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome")

        def revision_key(path):
            # Directory layout is .../chromium-<revision>/chrome-linux/chrome.
            # Sort on the numeric revision; a plain string sort would rank
            # "chromium-999" above "chromium-1000".
            build_dir = os.path.basename(os.path.dirname(os.path.dirname(path)))
            revision = build_dir.rpartition("-")[2]
            return (int(revision) if revision.isdigit() else -1, path)

        for path in sorted(glob.glob(glob_pattern), key=revision_key, reverse=True):
            add("playwright-cache", path)
    return candidates
class _PersistentBrowser:
"""常驻浏览器管理器。
目标:避免每次截图都冷启动 Chromium降低失败率并提升速度。
"""
def __init__(self):
self._playwright = None
self._browser = None
self._lock = asyncio.Lock()
self._launch_args = ["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"]
self._last_launch_source = "unknown"
# 记录当前常驻浏览器所属事件循环,避免跨 loop 复用导致的句柄异常。
self._owner_loop_id: Optional[int] = None
# 保活心跳任务:定期探测浏览器连通性,异常时自动重建。
self._heartbeat_task: Optional[asyncio.Task] = None
# 心跳断连计数:避免单次抖动就触发重建。
self._disconnect_streak = 0
# 截图进行中标记:心跳期间若业务在跑,跳过本轮探测以避免误判。
self._capture_in_progress = False
async def _launch_browser(self):
if self._playwright is None:
self._playwright = await async_playwright().start()
for source, browser_path in _collect_browser_candidates():
try:
logger.debug(f"Launch chromium with {source}: {browser_path}")
browser = await self._playwright.chromium.launch(
executable_path=browser_path,
args=self._launch_args,
timeout=20000,
)
self._last_launch_source = f"{source}:{browser_path}"
return browser
except Exception as e:
logger.warning(f"Launch chromium failed with {source}: {browser_path}, error={e}")
logger.debug("Launch chromium with bundled browser")
browser = await self._playwright.chromium.launch(args=self._launch_args)
self._last_launch_source = "bundled"
return browser
async def ensure_browser(self):
current_loop_id = id(asyncio.get_running_loop())
if self._owner_loop_id is not None and self._owner_loop_id != current_loop_id:
# 发生跨事件循环访问时,主动丢弃旧句柄并在新 loop 重建。
# 注意:旧 loop 中的进程资源可能已被 runtime 回收,这里不再尝试跨 loop 强关,避免引入新死锁点。
logger.warning(
f"[md2img] 检测到跨事件循环复用,准备重建常驻浏览器: "
f"owner_loop={self._owner_loop_id}, current_loop={current_loop_id}"
)
self._browser = None
self._playwright = None
self._owner_loop_id = None
if self._browser and self._browser.is_connected():
return self._browser
async with self._lock:
if self._browser and self._browser.is_connected():
return self._browser
# 浏览器失联时先做一次清理,避免残留句柄影响重建。
if self._browser:
try:
await safe_close_browser(self._browser)
except Exception:
pass
self._browser = None
self._browser = await self._launch_browser()
self._owner_loop_id = current_loop_id
browser_pid = getattr(getattr(self._browser, "process", None), "pid", None)
logger.info(
f"[md2img] 常驻浏览器就绪: source={self._last_launch_source}, "
f"loop={self._owner_loop_id}, pid={browser_pid}"
)
self._ensure_heartbeat_task()
return self._browser
async def restart_browser(self, reason: str = "unknown"):
async with self._lock:
if self._browser:
try:
await safe_close_browser(self._browser)
except Exception:
pass
self._browser = None
self._browser = await self._launch_browser()
self._owner_loop_id = id(asyncio.get_running_loop())
self._disconnect_streak = 0
browser_pid = getattr(getattr(self._browser, "process", None), "pid", None)
logger.info(
f"[md2img] 常驻浏览器已重建: source={self._last_launch_source}, "
f"loop={self._owner_loop_id}, pid={browser_pid}, reason={reason}"
)
self._ensure_heartbeat_task()
return self._browser
async def _is_browser_alive(
self,
browser,
timeout_seconds: float = 3.0,
retry_count: int = 1,
retry_interval_seconds: float = 0.25,
) -> bool:
"""探测浏览器是否仍可用。
说明:
1. 单次探测失败可能只是瞬时抖动(例如驱动短暂繁忙);
2. 因此这里支持短重试,避免把“短暂不可用”误判为“已断连”;
3. 仅当多次探测都失败时,才认为浏览器真正失活。
"""
if not browser:
return False
probe_times = max(1, int(retry_count or 1))
timeout = max(float(timeout_seconds or 0), 0.8)
sleep_interval = max(float(retry_interval_seconds or 0), 0.05)
for idx in range(probe_times):
# 每次探测前都先看连接态,避免对明显失联句柄继续调用 API。
if not browser.is_connected():
if idx < probe_times - 1:
await asyncio.sleep(sleep_interval)
continue
return False
try:
await asyncio.wait_for(browser.version(), timeout=timeout)
return True
except Exception:
if idx < probe_times - 1:
await asyncio.sleep(sleep_interval)
continue
return False
return False
async def _heartbeat_loop(self):
"""周期性探测浏览器可用性,断连后自动重建。"""
while True:
try:
await asyncio.sleep(20)
# 没有浏览器实例时只保持心跳存活,不主动创建,避免空闲时不必要消耗。
if not self._browser:
self._disconnect_streak = 0
continue
if self._capture_in_progress:
# 截图期间跳过探测,避免与业务并发导致误判。
continue
if self._browser and self._browser.is_connected():
self._disconnect_streak = 0
continue
self._disconnect_streak += 1
if self._disconnect_streak >= 3:
logger.warning(
f"[md2img] 心跳探测连续{self._disconnect_streak}次发现浏览器断连,准备自动重建"
)
await self.restart_browser(reason="heartbeat_disconnected")
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(f"[md2img] 心跳探测异常: {e}")
def _ensure_heartbeat_task(self):
"""确保保活任务已启动(幂等)。"""
if self._heartbeat_task and not self._heartbeat_task.done():
return
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop(), name="md2img:heartbeat")
async def screenshot(
self,
html_content: str,
output_image: str,
viewport_width: int = 780,
viewport_height: int = 960,
device_scale_factor: float = 1.2,
):
browser = await self.ensure_browser()
async def _capture_with_browser(active_browser):
self._capture_in_progress = True
context = None
try:
# 说明:
# 1. 允许调用方按业务场景传入截图宽度(例如卡片模板可用更窄视口);
# 2. 默认值保持历史行为,确保未改造调用方不受影响;
# 3. 这里做最小边界保护,避免传入异常值导致 Playwright 抛错。
safe_width = max(320, int(viewport_width or 780))
safe_height = max(320, int(viewport_height or 960))
safe_scale = float(device_scale_factor or 1.2)
if safe_scale < 1.0:
safe_scale = 1.0
if safe_scale > 3.0:
safe_scale = 3.0
context = await active_browser.new_context(
viewport={"width": safe_width, "height": safe_height},
device_scale_factor=safe_scale,
)
page = await context.new_page()
logger.debug("Set page content")
await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)
logger.debug("Wait for fonts ready")
await page.evaluate("document.fonts.ready")
await asyncio.sleep(0.2)
logger.debug(f"Take screenshot: output={output_image}")
await page.screenshot(path=output_image, full_page=True, timeout=15000, animations="disabled")
if not os.path.exists(output_image):
raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
finally:
try:
if context:
await context.close()
except Exception:
pass
self._capture_in_progress = False
try:
await _capture_with_browser(browser)
# 截图完成后立刻做一次可用性探测。
# 在部分系统环境中,浏览器可能在任务完成后出现短暂抖动。
# 这里优先“保活语义”:不在截图收尾阶段立刻触发重建,避免误判时把本来可用的浏览器关掉。
# 这里使用“短重试探测”过滤瞬时抖动,避免误判触发不必要重建。
if not await self._is_browser_alive(
browser,
timeout_seconds=2.5,
retry_count=3,
retry_interval_seconds=0.35,
):
# 说明:
# 1. 仅做告警与“断连计数”推进,不立即调用 restart_browser
# 2. 后续由心跳任务统一判定是否需要重建(连续多次断连才重建);
# 3. 这样可以最大化避免“截图后立刻重建”导致的保活抖动。
logger.warning("[md2img] 截图后探测到浏览器可能断连,暂不立即重建,等待心跳进一步确认")
self._disconnect_streak = max(self._disconnect_streak, 2)
except Exception as e:
# 首次失败后重建一次浏览器再重试,提升抗偶发故障能力。
logger.warning(f"[md2img] 常驻浏览器截图失败,准备重建后重试: {e}")
browser = await self.restart_browser(reason="capture_exception_retry")
await _capture_with_browser(browser)
# Process-wide browser manager; created on first use by _get_browser_manager().
_BROWSER_MANAGER: Optional[_PersistentBrowser] = None
# Process-wide md2img runtime; created on first use by _get_md2img_runtime().
_MD2IMG_RUNTIME = None
# Guards the first creation of _MD2IMG_RUNTIME against concurrent callers.
_MD2IMG_RUNTIME_LOCK = threading.Lock()
class _Md2ImgRuntime:
    """Dedicated runtime for Markdown-to-image rendering.

    Purpose:
    1. Own a single event loop in a separate thread; all browser operations
       run on that loop.
    2. Rule out "warm-up on loop A, screenshot on loop B" reuse problems.
    3. Give message handlers and scheduled jobs one stable execution context.
    """

    def __init__(self):
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._lock = threading.Lock()
        self._ready = threading.Event()
        # True while a start is in flight, so concurrent ensure_started()
        # calls do not create a second thread.
        self._starting = False

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """The runtime's event loop, or None before the first start."""
        return self._loop

    def _is_running(self) -> bool:
        """True when the runtime thread is alive and its loop is running."""
        return bool(
            self._thread
            and self._thread.is_alive()
            and self._loop
            and self._loop.is_running()
        )

    def _thread_main(self):
        """Thread entry point: create the event loop and run it forever."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop
        logger.info(f"[md2img] 专用运行时已启动: thread={threading.current_thread().name}, loop={id(loop)}")
        # Signal readiness from inside the running loop. Setting the event
        # before run_forever() leaves a window in which the loop is published
        # but is_running() is still False; a concurrent ensure_started() could
        # then decide the runtime is down and spawn a second thread.
        loop.call_soon(self._ready.set)
        loop.run_forever()

    def ensure_started(self):
        """Make sure the runtime thread and loop are running (idempotent).

        Raises:
            RuntimeError: When the loop is not up within 10 seconds.
        """
        if self._is_running():
            return
        with self._lock:
            if self._is_running():
                return
            # If another caller is already starting the runtime, just fall
            # through and wait for it below.
            if not self._starting:
                self._starting = True
                self._ready.clear()
                self._thread = threading.Thread(target=self._thread_main, name="md2img-runtime", daemon=True)
                self._thread.start()
        # Wait outside the lock so other callers are not blocked meanwhile.
        if not self._ready.wait(timeout=10):
            with self._lock:
                self._starting = False
            raise RuntimeError("md2img 专用运行时启动超时")
        with self._lock:
            self._starting = False

    def submit(self, coro) -> ConcurrentFuture:
        """Schedule *coro* on the runtime loop and return its concurrent future.

        Raises:
            RuntimeError: When the runtime loop is not available.
        """
        self.ensure_started()
        if not self._loop:
            raise RuntimeError("md2img 运行时事件循环未就绪")
        return asyncio.run_coroutine_threadsafe(coro, self._loop)
def _get_md2img_runtime() -> _Md2ImgRuntime:
    """Return the process-wide runtime, creating it on first use.

    The first creation happens under a lock so concurrent first callers cannot
    end up with several runtime instances (and several runtime threads).
    """
    global _MD2IMG_RUNTIME
    existing = _MD2IMG_RUNTIME
    if existing is not None:
        return existing
    with _MD2IMG_RUNTIME_LOCK:
        # Re-check under the lock: another thread may have created it already.
        if _MD2IMG_RUNTIME is None:
            _MD2IMG_RUNTIME = _Md2ImgRuntime()
        return _MD2IMG_RUNTIME
def get_md2img_health_snapshot(ensure_runtime: bool = False) -> dict:
    """Collect a synchronous health snapshot of the md2img runtime.

    Args:
        ensure_runtime: Whether to start the runtime before sampling.
            - False: observe only; never starts the thread.
            - True: start the md2img runtime first, then report. Suited to a
              manual "refresh and start" action in an admin page.

    Returns:
        dict: ``timestamp`` plus two sections, ``runtime`` (thread and loop
        state) and ``browser`` (connection state, owner loop, launch source,
        pid, pid liveness and the error text if sampling failed).
    """
    runtime = _get_md2img_runtime()
    if ensure_runtime:
        # Explicit start, so the caller can perform a cold-start check.
        runtime.ensure_started()

    thread_obj = getattr(runtime, "_thread", None)
    loop_obj = getattr(runtime, "_loop", None)
    runtime_info = {
        "started": bool(thread_obj is not None),
        "thread_alive": bool(thread_obj.is_alive()) if thread_obj else False,
        "thread_name": thread_obj.name if thread_obj else "",
        "loop_running": bool(loop_obj.is_running()) if loop_obj else False,
        "loop_id": id(loop_obj) if loop_obj else None,
    }

    # Defaults describe "no browser manager yet"; fields are filled in order,
    # so a failure part-way keeps whatever was already sampled.
    browser_info = {
        "connected": False,
        "owner_loop_id": None,
        "launch_source": "",
        "pid": None,
        "pid_alive": None,
        "error": "",
    }
    manager = _BROWSER_MANAGER
    if manager is not None:
        try:
            handle = getattr(manager, "_browser", None)
            browser_info["connected"] = bool(handle and handle.is_connected())
            browser_info["owner_loop_id"] = getattr(manager, "_owner_loop_id", None)
            browser_info["launch_source"] = str(getattr(manager, "_last_launch_source", "") or "")
            pid = getattr(getattr(handle, "process", None), "pid", None) if handle else None
            browser_info["pid"] = pid
            # Confirm through psutil that the process still exists, instead of
            # trusting a possibly stale PID.
            browser_info["pid_alive"] = psutil.pid_exists(int(pid)) if pid else None
        except Exception as e:
            browser_info["error"] = str(e)

    return {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "runtime": runtime_info,
        "browser": browser_info,
    }
async def _run_in_md2img_runtime(coro, timeout_seconds: Optional[int] = None):
    """Run *coro* on the md2img loop and await its result from the caller's loop.

    Args:
        coro: Coroutine object to execute.
        timeout_seconds: Optional overall timeout; values below 1 are raised
            to 1 second.

    Returns:
        Whatever *coro* returns.

    Raises:
        RuntimeError: When the runtime cannot be started.
        asyncio.TimeoutError: When *timeout_seconds* elapses first.
    """
    runtime = _get_md2img_runtime()
    try:
        runtime.ensure_started()
    except BaseException:
        # The coroutine will never be scheduled; close it so it is not left
        # behind un-awaited.
        close = getattr(coro, "close", None)
        if callable(close):
            close()
        raise
    target_loop = runtime.loop
    current_loop = asyncio.get_running_loop()
    timeout = None if timeout_seconds is None else max(1, int(timeout_seconds))

    # Already on the dedicated loop: run directly, no thread hop needed.
    if target_loop is current_loop:
        if timeout is not None:
            return await asyncio.wait_for(coro, timeout=timeout)
        return await coro

    future = runtime.submit(coro)
    awaitable_future = asyncio.wrap_future(future)
    if timeout is not None:
        return await asyncio.wait_for(awaitable_future, timeout=timeout)
    # The wrapped future must be awaited here. Returning it un-awaited would
    # hand callers the Future itself, so they would treat the work as finished
    # while the screenshot is still running in the background.
    return await awaitable_future
def _get_browser_manager() -> _PersistentBrowser:
    """Return the process-wide browser manager, creating it on first use."""
    global _BROWSER_MANAGER
    manager = _BROWSER_MANAGER
    if manager is None:
        manager = _BROWSER_MANAGER = _PersistentBrowser()
    return manager
async def warmup_md2img_browser(timeout_seconds: int = 45) -> bool:
    """Warm up the Markdown-to-image browser (async).

    Purpose:
    1. Finish the browser cold start right after service start-up, so the
       first screenshot job waits less and fails less often.
    2. No real screenshot is taken; this only makes sure the resident browser
       is available.

    Args:
        timeout_seconds: Budget for the browser start (at least 10s is used);
            the hop onto the runtime loop gets 5 extra seconds.

    Returns:
        True on success, False when anything fails (the error is logged).
    """
    async def _warmup_impl():
        manager = _get_browser_manager()
        await asyncio.wait_for(manager.ensure_browser(), timeout=max(10, int(timeout_seconds)))
        browser = manager._browser
        browser_pid = None
        if browser:
            browser_pid = getattr(getattr(browser, "process", None), "pid", None)
        logger.info(f"[md2img] 浏览器预热完成: runtime_loop={id(asyncio.get_running_loop())}, pid={browser_pid}")
        return True

    try:
        logger.info(f"[md2img] 开始浏览器预热: caller_loop={id(asyncio.get_running_loop())}, timeout={int(timeout_seconds)}s")
        outer_budget = max(10, int(timeout_seconds) + 5)
        await _run_in_md2img_runtime(_warmup_impl(), timeout_seconds=outer_budget)
    except Exception as e:
        logger.error(f"[md2img] 浏览器预热失败: {e}")
        return False
    return True
def warmup_md2img_browser_sync(timeout_seconds: int = 45) -> bool:
    """Synchronous wrapper around the browser warm-up, for use from a thread.

    Returns:
        The warm-up result, or False when running it raised.
    """
    try:
        outcome = asyncio.run(warmup_md2img_browser(timeout_seconds=timeout_seconds))
    except Exception as e:
        logger.error(f"[md2img] 同步预热执行失败: {e}")
        return False
    return outcome
async def html_to_image(
    html_content,
    output_image,
    viewport_width: int = 780,
    viewport_height: int = 960,
    device_scale_factor: float = 1.2,
):
    """Render HTML into an image file.

    Notes:
    1. The actual capture always runs on the dedicated md2img event loop.
    2. Whatever thread or loop the caller is on, the same resident browser is
       reused.

    Args:
        html_content: Complete HTML document to render.
        output_image: Path of the image file to write.
        viewport_width: Viewport width passed to the capture.
        viewport_height: Viewport height passed to the capture.
        device_scale_factor: Pixel ratio passed to the capture.
    """
    capture_options = {
        "viewport_width": viewport_width,
        "viewport_height": viewport_height,
        "device_scale_factor": device_scale_factor,
    }

    async def _html_to_image_impl():
        # Resolved on the runtime loop, where the browser lives.
        await _get_browser_manager().screenshot(html_content, output_image, **capture_options)

    await _run_in_md2img_runtime(_html_to_image_impl())
async def _await_with_progress(coro, timeout_seconds: int, stage_name: str, progress_interval_seconds: int = 10):
"""等待协程并周期输出进度,避免长时间无日志看起来像假死。"""
task = asyncio.create_task(coro)
start_ts = time.monotonic()
next_progress_at = progress_interval_seconds
try:
while True:
done, _ = await asyncio.wait({task}, timeout=1.0)
if done:
return task.result()
elapsed = int(time.monotonic() - start_ts)
if elapsed >= next_progress_at:
logger.info(f"[md2img] 阶段进行中: {stage_name}, elapsed={elapsed}s/{timeout_seconds}s")
next_progress_at += progress_interval_seconds
if elapsed >= timeout_seconds:
task.cancel()
raise asyncio.TimeoutError(f"[md2img] 阶段超时: {stage_name}, timeout={timeout_seconds}s")
finally:
if not task.done():
task.cancel()
async def convert_md_str_to_image(
    md_content: str,
    output_image: str,
    max_retries: int = 2,
    render_timeout_seconds: int = 90,
    html_timeout_seconds: int = 30,
) -> str:
    """Render Markdown text into an image file, with retries.

    The image is written to ``<cwd>/temp/md2image/<output_image>``. Each
    attempt runs two stages, Markdown to HTML and HTML to screenshot, each
    under its own timeout, and rejects results smaller than 1024 bytes.

    Args:
        md_content: Markdown source; must not be empty.
        output_image: File name of the image inside the temp directory.
        max_retries: Total number of attempts.
        render_timeout_seconds: Screenshot stage timeout (at least 10s is used).
        html_timeout_seconds: HTML stage timeout (at least 5s is used).

    Returns:
        Absolute path of the generated image.

    Raises:
        ValueError: When *md_content* is empty.
        RuntimeError: When every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    if not md_content:
        raise ValueError("Markdown content cannot be empty")
    project_root = os.getcwd()
    temp_dir = Path(project_root) / "temp" / "md2image"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_image_path = temp_dir / output_image
    last_error = None
    for attempt in range(max_retries):
        attempt_no = attempt + 1
        try:
            logger.debug(
                f"尝试第 {attempt_no}/{max_retries} 次生成图片 "
                f"(html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            # Remove a stale file so a failed render cannot be mistaken for success.
            if output_image_path.exists():
                os.remove(str(output_image_path))
            stage_start = time.monotonic()
            # Stage 1: Markdown -> HTML. The timeout guards against
            # pathological input blocking for a long time.
            logger.info(f"[md2img] 开始阶段: markdown_to_html, attempt={attempt_no}/{max_retries}")
            full_html = await _await_with_progress(
                md_str_to_html_content(md_content),
                timeout_seconds=max(5, int(html_timeout_seconds)),
                stage_name="markdown_to_html",
            )
            logger.debug(f"{attempt_no} 次 HTML 生成耗时: {time.monotonic() - stage_start:.2f}s")
            # Stage 2: Playwright render and screenshot. The timeout guards
            # against a hung browser process.
            stage_start = time.monotonic()
            logger.info(f"[md2img] 开始阶段: html_to_image, attempt={attempt_no}/{max_retries}")
            await _await_with_progress(
                html_to_image(full_html, str(output_image_path)),
                timeout_seconds=max(10, int(render_timeout_seconds)),
                stage_name="html_to_image",
            )
            logger.debug(f"{attempt_no} 次截图耗时: {time.monotonic() - stage_start:.2f}s")
            image_size = os.path.getsize(str(output_image_path))
            if image_size < 1024:
                raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
            logger.info(f"图片成功生成:{output_image_path}")
            return str(output_image_path.resolve())
        except asyncio.TimeoutError as e:
            last_error = RuntimeError(
                f"图片生成超时(attempt={attempt_no}/{max_retries}, "
                f"html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            # Keep the original timeout reachable for debugging.
            last_error.__cause__ = e
            logger.warning(str(last_error))
        except Exception as e:
            last_error = e
            logger.warning(f"{attempt_no} 次尝试失败: {e}")
        if attempt < max_retries - 1:
            await asyncio.sleep(1.5)
    # Chain the last failure so its traceback is not lost.
    raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}") from last_error