Files
abot/utils/markdown_to_image.py
liuwei f90c0720b3 实现转图浏览器常驻复用并支持失败自动重建
变更项:1) 新增常驻浏览器管理器,避免每次截图都冷启动 Chromium。2) 截图流程改为复用同一 Browser,每次仅创建并关闭 context/page,提升稳定性与性能。3) 增加浏览器失联检测与自动重建机制,截图失败后重建一次并重试。4) 保留多候选浏览器启动策略(system/playwright-cache/bundled),并输出准确来源日志。5) 补充中文注释,明确常驻设计目标与故障恢复逻辑。
2026-04-17 09:26:23 +08:00

703 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import time
from pathlib import Path
import shutil
from typing import Optional, Tuple
import psutil
from playwright.async_api import async_playwright
import os
import asyncio
import re
from loguru import logger
try:
import markdown
except ImportError:
markdown = None
# Substrings that mark a leading HTML block as summary metadata (group name,
# time, date, members, ...). They are tested with `keyword in text`.
# NOTE(review): the first entry is an empty string. An empty string is a
# substring of every string, so a plain `in` test matches it unconditionally.
# It looks like a character was lost here - TODO confirm the intended keyword.
META_KEYWORDS = ["", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]
# Maps a statistic label to the suffix used in the "stats-pill-<suffix>" CSS
# class; labels that are not listed are looked up with a "default" fallback.
# NOTE(review): the "" key is presumably a label whose text was lost (it maps
# to the "total" style) - TODO confirm the intended label.
STAT_PILL_CLASSES = {
"": "total",
"人数": "people",
"文本": "text",
"图片": "image",
"视频": "video",
"链接": "link",
"表情": "emoji",
}
def _extract_stats_pills_from_markdown(md_content: str) -> str:
    """Turn the stats line under the "## 群概览" heading into pill markup.

    The line directly below the heading is split on "·"; every piece shaped
    like ``label number`` (the label may be wrapped in ``**``) becomes one
    ``<span class="stats-pill ...">`` element. Only the first matching heading
    is processed, and the text is returned unchanged when no pill can be built.

    Args:
        md_content: Markdown source; falsy values are treated as "".

    Returns:
        The Markdown text with the stats line replaced by a
        ``<div class="stats-pills">`` block, or the original text.
    """
    overview_re = re.compile(
        r"(^##\s+群概览\s*\n)([^\n]+)(?=\n(?:\n|##\s|###\s|$))",
        re.M,
    )
    entry_re = re.compile(r"(?:\*\*)?([^*\s]+)(?:\*\*)?\s+(\d+)")

    def render(found):
        heading = found.group(1)
        spans = []
        for chunk in found.group(2).strip().split("·"):
            # Blank chunks simply fail the entry pattern and are skipped.
            entry = entry_re.match(chunk.strip())
            if entry is None:
                continue
            label = entry.group(1).strip()
            value = entry.group(2).strip()
            kind = STAT_PILL_CLASSES.get(label, "default")
            spans.append(
                f'<span class="stats-pill stats-pill-{kind}"><span class="stats-pill-label">{label}</span><span class="stats-pill-value">{value}</span></span>'
            )
        if spans:
            return heading + f'<div class="stats-pills">{"".join(spans)}</div>'
        # Nothing recognisable on the line: leave the match untouched.
        return found.group(0)

    return overview_re.sub(render, str(md_content or ""), count=1)
def _simple_markdown_to_html(md_content: str) -> str:
    """Convert a small Markdown subset to HTML without third-party packages.

    Supported constructs: ``#``/``##``/``###`` headings, ``- `` bullet items,
    single-line ``<div ...>...</div>`` passthrough, and paragraphs (adjacent
    plain lines are joined with a space). Text is inserted as-is, without
    HTML escaping.

    Args:
        md_content: Markdown source; falsy values are treated as "".

    Returns:
        The generated HTML fragments joined with newlines.
    """
    heading_tags = (("# ", "h1"), ("## ", "h2"), ("### ", "h3"))
    out = []
    pending = []  # plain lines of the paragraph currently being collected
    list_open = False

    def end_paragraph():
        joined = " ".join(piece.strip() for piece in pending if piece.strip())
        if joined:
            out.append(f"<p>{joined}</p>")
        pending.clear()

    def end_list():
        nonlocal list_open
        if list_open:
            out.append("</ul>")
        list_open = False

    for source_line in str(md_content or "").splitlines():
        text = source_line.strip()
        if not text:
            # A blank line terminates both the paragraph and the list.
            end_paragraph()
            end_list()
            continue

        for prefix, tag in heading_tags:
            if text.startswith(prefix):
                end_paragraph()
                end_list()
                out.append(f"<{tag}>{text[len(prefix):].strip()}</{tag}>")
                break
        else:
            if text.startswith("<div ") and text.endswith("</div>"):
                # Pre-rendered block (e.g. the stats pills): emit verbatim.
                end_paragraph()
                end_list()
                out.append(text)
            elif text.startswith("- "):
                end_paragraph()
                if not list_open:
                    out.append("<ul>")
                    list_open = True
                out.append(f"<li>{text[2:].strip()}</li>")
            else:
                end_list()
                pending.append(text)

    end_paragraph()
    end_list()
    return "\n".join(out)
async def safe_close_browser(browser, timeout: float = 4.0) -> None:
    """Close *browser* gracefully, falling back to killing its process tree.

    Steps:
      1. Close every page and context, each bounded by a timeout; errors are
         ignored because this is best-effort cleanup.
      2. Await ``browser.close()`` bounded by *timeout*; return on success.
      3. If that fails and the browser object exposes a ``process`` with a
         ``pid``, terminate that process and its children via psutil, then
         kill whatever is still alive.

    Args:
        browser: Browser object to close; a falsy value makes this a no-op.
        timeout: Seconds allowed for each context close and for
            ``browser.close()``.
    """
    if not browser:
        return

    for context in browser.contexts[:]:
        for page in context.pages[:]:
            try:
                await asyncio.wait_for(page.close(), timeout=1.5)
            except Exception:
                pass  # best effort: the context close below is the real cleanup
        try:
            await asyncio.wait_for(context.close(), timeout=timeout)
        except Exception:
            pass  # best effort: browser.close() below still runs

    try:
        await asyncio.wait_for(browser.close(), timeout=timeout)
        logger.debug("browser closed gracefully")
        return
    except Exception as e:  # includes asyncio.TimeoutError
        logger.warning(f"browser.close failed: {e}")

    # The browser object may not expose a `process` attribute at all; look it
    # up defensively so the fallback is skipped instead of raising
    # AttributeError.
    process = getattr(browser, "process", None)
    pid = getattr(process, "pid", None)
    if not pid:
        return

    ignorable = (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess)
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
        tree = [parent] + children

        # Polite phase: children first, then the parent.
        for proc in children + [parent]:
            try:
                proc.terminate()
            except ignorable:
                pass
        try:
            _gone, alive = psutil.wait_procs(tree, timeout=2)
        except Exception:
            alive = tree

        # Forceful phase for whatever ignored terminate().
        for proc in alive:
            try:
                proc.kill()
            except ignorable:
                pass
        try:
            _gone, alive = psutil.wait_procs(tree, timeout=3)
        except Exception:
            # Outcome unknown: report the tree as possibly alive rather than
            # claiming it was terminated.
            alive = tree

        if alive:
            logger.warning(f"process still alive after kill: {[p.pid for p in alive]}")
        else:
            logger.debug("process tree terminated")
    except Exception as e:  # includes psutil.NoSuchProcess
        logger.warning(f"force kill failed: {e}")
def _clean_text(html: str) -> str:
    """Strip tags from *html* and collapse whitespace runs to single spaces.

    Each ``<...>`` tag is replaced by a space (so adjacent text does not fuse),
    then consecutive whitespace is squeezed and the ends are trimmed.
    """
    without_tags = re.sub(r'<.*?>', ' ', html)
    squeezed = re.sub(r'\s+', ' ', without_tags)
    return squeezed.strip()
def _looks_like_meta(html: str) -> bool:
    """Decide whether an HTML block reads like summary metadata.

    A block qualifies when its tag-stripped text is non-empty and either
    contains one of ``META_KEYWORDS`` or is at most 80 characters long.

    Args:
        html: HTML fragment of a single block.

    Returns:
        True when the block should be treated as metadata, else False.
    """
    clean = _clean_text(html)
    if not clean:
        return False
    # Skip empty keywords: "" is a substring of every string and would make
    # this test succeed for any text, turning the length rule below into dead
    # code.
    if any(keyword and keyword in clean for keyword in META_KEYWORDS):
        return True
    return len(clean) <= 80
def _split_hero(html_body: str):
    """Split rendered HTML into hero title, hero metadata and remaining body.

    The first ``<h1>`` supplies the title (default "聊天总结") and is removed
    from the body. Up to four leading ``p``/``blockquote``/``ul``/``ol``
    blocks that ``_looks_like_meta`` accepts are then moved into the metadata.

    Args:
        html_body: HTML produced from the Markdown source.

    Returns:
        ``(hero_title, hero_meta, remain, hero_enabled)``. ``hero_enabled`` is
        True when a title or at least one metadata block was found.
    """
    heading = re.search(r'<h1>(.*?)</h1>', html_body, re.S | re.I)
    if heading:
        title = _clean_text(heading.group(1))
        # Cut out exactly the first <h1>...</h1> occurrence.
        rest = html_body[:heading.start()] + html_body[heading.end():]
    else:
        title = "聊天总结"
        rest = html_body
    rest = rest.strip()

    leading_block = re.compile(
        r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?</(?:p|blockquote|ul|ol)>)',
        re.S | re.I,
    )
    collected = []
    while len(collected) < 4:
        hit = leading_block.match(rest)
        if hit is None or not _looks_like_meta(hit.group(1)):
            break
        collected.append(hit.group(1).strip())
        rest = rest[hit.end():].strip()

    return title, ''.join(collected), rest, bool(heading or collected)
async def md_str_to_html_content(md_content) -> str:
    """Render Markdown text into a complete, self-styled HTML document.

    The document consists of an inline stylesheet, an optional "hero" banner
    (title plus leading metadata blocks) and the converted Markdown body.
    The function is declared ``async`` but its body contains no ``await``.

    Args:
        md_content: Markdown source text.

    Returns:
        The full ``<html>...</html>`` document as a string.
    """
    # Rewrite the stats line under the "## 群概览" heading into pill markup
    # before the Markdown conversion runs.
    md_content = _extract_stats_pills_from_markdown(md_content)
    # Use the `markdown` package when its import succeeded; otherwise fall
    # back to the built-in minimal converter.
    if markdown is not None:
        html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
    else:
        html_body = _simple_markdown_to_html(md_content)
    # Separate the first <h1> and the leading metadata blocks from the body so
    # they can be shown in the hero banner.
    hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)
    # Inline stylesheet embedded in <head>.
    css = """
<style>
:root {
--bg1: #0f172a;
--bg2: #172554;
--paper: #ffffff;
--text: #233044;
--muted: #c7d2e3;
--muted-2: rgba(235, 241, 255, 0.82);
--primary: #8b7cff;
--primary-soft: rgba(109,94,252,0.10);
--accent: #22c3b5;
--line: rgba(148,163,184,0.18);
--code-bg: #0f172a;
--code-fg: #e2e8f0;
--quote-bg: rgba(20,184,166,0.08);
--shadow: 0 20px 45px rgba(80, 84, 125, 0.10);
}
* { box-sizing: border-box; }
html, body { margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
color: var(--text);
font-size: 16px;
line-height: 1.78;
background:
radial-gradient(circle at top left, rgba(126, 93, 255, 0.14) 0%, transparent 28%),
radial-gradient(circle at top right, rgba(34, 195, 181, 0.12) 0%, transparent 24%),
linear-gradient(180deg, #eef4fb 0%, #e8f0f8 100%);
padding: 26px;
}
.wrap {
max-width: 820px;
margin: 0 auto;
background: rgba(255,255,255,0.97);
border: 1px solid rgba(255,255,255,0.7);
border-radius: 28px;
box-shadow: var(--shadow);
overflow: hidden;
}
.hero {
position: relative;
padding: 30px 34px 24px;
background:
radial-gradient(circle at 18% 18%, rgba(255,255,255,0.10) 0%, transparent 18%),
radial-gradient(circle at 85% 22%, rgba(255,255,255,0.12) 0%, transparent 20%),
linear-gradient(135deg, #1e1b4b 0%, #1d4ed8 52%, #0f766e 100%);
border-bottom: 1px solid rgba(255,255,255,0.08);
}
.hero::before {
content: "";
position: absolute;
inset: 0;
background:
linear-gradient(125deg, rgba(255,255,255,0.05) 0%, transparent 38%),
linear-gradient(300deg, rgba(255,255,255,0.04) 0%, transparent 30%);
pointer-events: none;
}
.hero::after {
content: "";
position: absolute;
right: -40px;
top: -36px;
width: 200px;
height: 200px;
border-radius: 50%;
border: 1px solid rgba(255,255,255,0.12);
box-shadow: 0 0 0 24px rgba(255,255,255,0.04), 0 0 0 56px rgba(255,255,255,0.025);
opacity: 0.9;
pointer-events: none;
}
.hero-badge {
position: relative;
display: inline-block;
padding: 6px 12px;
border-radius: 999px;
font-size: 12px;
color: #f8faff;
background: rgba(255,255,255,0.12);
border: 1px solid rgba(255,255,255,0.18);
margin-bottom: 14px;
letter-spacing: .06em;
}
.hero-title {
position: relative;
margin: 0;
font-size: 2.05em;
line-height: 1.28;
font-weight: 800;
color: #ffffff;
text-align: center;
letter-spacing: -0.02em;
text-shadow: 0 2px 10px rgba(0,0,0,0.12);
}
.hero-meta {
position: relative;
margin: 12px auto 0;
max-width: 660px;
text-align: center;
color: var(--muted-2);
font-size: 0.84em;
line-height: 1.72;
}
.hero-meta p, .hero-meta blockquote, .hero-meta ul, .hero-meta ol {
margin: 4px 0;
color: inherit;
background: transparent;
border: none;
padding: 0;
}
.hero-meta ul, .hero-meta ol { list-style: none; padding-left: 0; }
.content { padding: 24px 34px 34px; }
h1, h2, h3, h4, h5, h6 {
color: var(--text);
margin-top: 24px;
margin-bottom: 14px;
font-weight: 700;
line-height: 1.35;
letter-spacing: -0.01em;
}
.content.hero-active h1:first-of-type { display: none; }
h2 {
font-size: 1.42em;
margin-top: 30px;
padding: 10px 14px;
background: linear-gradient(90deg, var(--primary-soft), rgba(255,255,255,0));
border-left: 4px solid var(--primary);
border-radius: 12px;
}
h3 {
font-size: 1.15em;
margin-top: 24px;
color: #30435f;
padding-left: 12px;
border-left: 3px solid rgba(20,184,166,0.55);
}
p { margin: 14px 0; color: #334155; line-height: 1.88; }
ul, ol { padding-left: 26px; margin: 14px 0 18px; }
li { margin: 8px 0; color: #334155; }
li::marker { color: var(--primary); }
strong { color: #1e293b; font-weight: 700; }
em { color: #5b6b84; }
code {
background: rgba(109,94,252,0.08);
color: #5b3df5;
padding: 2px 8px;
border-radius: 8px;
font-size: 0.92em;
border: 1px solid rgba(109,94,252,0.10);
}
pre {
background: var(--code-bg);
color: var(--code-fg);
padding: 16px 18px;
border-radius: 16px;
overflow-x: auto;
border: 1px solid rgba(255,255,255,0.06);
box-shadow: inset 0 1px 0 rgba(255,255,255,0.03);
}
pre code { background: transparent; color: inherit; border: none; padding: 0; }
table {
border-collapse: separate;
border-spacing: 0;
width: 100%;
margin: 20px 0;
background: rgba(255,255,255,0.96);
border: 1px solid rgba(148,163,184,0.16);
border-radius: 16px;
overflow: hidden;
box-shadow: 0 8px 24px rgba(15,23,42,0.05);
}
th, td { padding: 12px 14px; text-align: left; border-bottom: 1px solid rgba(148,163,184,0.12); }
tr:last-child td { border-bottom: none; }
th { background: linear-gradient(180deg, rgba(109,94,252,0.10), rgba(109,94,252,0.04)); color: #334155; font-weight: 700; }
blockquote {
margin: 18px 0;
padding: 14px 18px;
background: var(--quote-bg);
border: 1px solid rgba(20,184,166,0.16);
border-left: 5px solid var(--accent);
border-radius: 14px;
color: #355468;
}
.stats-pills {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin: 12px 0 8px;
}
.stats-pill {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 7px 12px;
border-radius: 999px;
font-size: 0.92em;
line-height: 1;
border: 1px solid rgba(148,163,184,0.16);
background: linear-gradient(180deg, rgba(255,255,255,0.96), rgba(248,250,252,0.92));
color: #334155;
box-shadow: 0 8px 18px rgba(15,23,42,0.05);
}
.stats-pill-label {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 4px 8px;
border-radius: 999px;
font-size: 0.82em;
font-weight: 700;
color: #ffffff;
background: linear-gradient(135deg, #64748b, #475569);
}
.stats-pill-value {
font-weight: 800;
color: #1e293b;
min-width: 20px;
}
.stats-pill-total .stats-pill-label { background: linear-gradient(135deg, #3b82f6, #1d4ed8); }
.stats-pill-people .stats-pill-label { background: linear-gradient(135deg, #0f766e, #14b8a6); }
.stats-pill-text .stats-pill-label { background: linear-gradient(135deg, #8b5cf6, #7c3aed); }
.stats-pill-image .stats-pill-label { background: linear-gradient(135deg, #ec4899, #db2777); }
.stats-pill-video .stats-pill-label { background: linear-gradient(135deg, #f97316, #ea580c); }
.stats-pill-link .stats-pill-label { background: linear-gradient(135deg, #22c55e, #16a34a); }
.stats-pill-emoji .stats-pill-label { background: linear-gradient(135deg, #eab308, #ca8a04); }
hr { border: none; height: 1px; background: linear-gradient(90deg, transparent, rgba(148,163,184,0.35), transparent); margin: 26px 0; }
a { color: var(--primary); text-decoration: none; border-bottom: 1px dashed rgba(109,94,252,0.35); }
.signature { margin-top: 34px; text-align: right; color: #73849c; font-size: 0.95em; font-style: italic; }
</style>
"""
    hero_html = ''
    # The extra "hero-active" class lets the stylesheet hide the first <h1>
    # inside the content area while the banner is shown.
    content_class = 'content hero-active' if hero_enabled else 'content'
    if hero_enabled:
        # Banner markup: fixed badge, extracted title, extracted metadata.
        hero_html = f'''
<div class="hero">
<div class="hero-badge">AI 群聊总结</div>
<h1 class="hero-title">{hero_title}</h1>
<div class="hero-meta">{hero_meta}</div>
</div>'''
    # With a banner, only the remainder (title and metadata removed) goes into
    # the content area; without one, the untouched converted body is used.
    full_html = f'''<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{css}
</head>
<body>
<div class="wrap">{hero_html}
<div class="{content_class}">
{remain_html if hero_enabled else html_body}
</div>
</div>
</body>
</html>'''
    return full_html
def check_chromium_installed(path):
    """Return True when *path* is an existing regular file that is executable."""
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
def _collect_browser_candidates() -> list[Tuple[str, str]]:
    """List Chromium/Chrome executables that can be tried, in priority order.

    On Windows the per-user and machine-wide Chrome install locations are
    probed. They are derived from the ``LOCALAPPDATA`` / ``PROGRAMFILES`` /
    ``PROGRAMFILES(X86)`` environment variables, and the previously
    hard-coded paths are kept for backward compatibility. On other systems,
    well-known binary names are resolved through ``PATH`` first, followed by
    Chromium builds found in the ``~/.cache/ms-playwright`` directory (paths
    sorted in reverse order).

    Returns:
        ``(source, path)`` tuples where ``source`` is ``"system"`` or
        ``"playwright-cache"``. Only existing executable files are listed and
        each path appears once.
    """
    import glob

    found: list[Tuple[str, str]] = []
    seen = set()

    def add(source: str, path) -> None:
        if not path:
            return
        key = os.path.normcase(os.path.normpath(path))
        if key in seen:
            return
        if check_chromium_installed(path):
            seen.add(key)
            found.append((source, path))

    if os.name == 'nt':
        chrome_tail = os.path.join("Google", "Chrome", "Application", "chrome.exe")

        def from_env(variable):
            base = os.environ.get(variable)
            return os.path.join(base, chrome_tail) if base else None

        windows_paths = [
            # Per-user install of whoever runs the process.
            from_env("LOCALAPPDATA"),
            # Legacy hard-coded locations, kept so existing setups still work.
            r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
            # Machine-wide installs on systems with non-default folders.
            from_env("PROGRAMFILES"),
            from_env("PROGRAMFILES(X86)"),
        ]
        for path in windows_paths:
            add("system", path)
    else:
        for bin_name in ("google-chrome", "google-chrome-stable", "chromium", "chromium-browser"):
            add("system", shutil.which(bin_name))

        glob_pattern = os.path.join(
            os.path.expanduser("~"), ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome"
        )
        for path in sorted(glob.glob(glob_pattern), reverse=True):
            add("playwright-cache", path)

    return found
class _PersistentBrowser:
    """Long-lived browser manager.

    Goal: avoid cold-starting Chromium for every screenshot, which lowers the
    failure rate and improves speed. One browser is reused across captures;
    each capture only creates and closes its own context/page. A disconnected
    browser or a failed capture triggers a rebuild.
    """

    def __init__(self):
        self._playwright = None  # started lazily by _launch_browser
        self._browser = None  # shared browser, None until the first launch
        self._lock = asyncio.Lock()  # serialises launch and rebuild
        self._launch_args = ["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"]
        self._last_launch_source = "unknown"  # "<source>:<path>" or "bundled"; used in logs

    async def _launch_browser(self):
        """Launch Chromium, trying every candidate executable before the bundled one.

        Returns:
            The launched browser. ``_last_launch_source`` records which
            candidate succeeded.

        Raises:
            Whatever the final bundled launch raises when every candidate failed.
        """
        if self._playwright is None:
            self._playwright = await async_playwright().start()
        for source, browser_path in _collect_browser_candidates():
            try:
                logger.debug(f"Launch chromium with {source}: {browser_path}")
                browser = await self._playwright.chromium.launch(
                    executable_path=browser_path,
                    args=self._launch_args,
                    timeout=20000,
                )
                self._last_launch_source = f"{source}:{browser_path}"
                return browser
            except Exception as e:
                logger.warning(f"Launch chromium failed with {source}: {browser_path}, error={e}")
        logger.debug("Launch chromium with bundled browser")
        browser = await self._playwright.chromium.launch(args=self._launch_args)
        self._last_launch_source = "bundled"
        return browser

    async def _discard_browser(self):
        """Best-effort close of the current browser; always forgets it."""
        stale, self._browser = self._browser, None
        if not stale:
            return
        try:
            await safe_close_browser(stale)
        except Exception as e:
            # Cleanup must never block the rebuild that follows.
            logger.debug(f"[md2img] discard browser failed: {e}")

    async def ensure_browser(self):
        """Return a connected browser, launching one when necessary."""
        browser = self._browser
        if browser and browser.is_connected():
            return browser
        async with self._lock:
            # Re-check under the lock: another task may have launched meanwhile.
            browser = self._browser
            if browser and browser.is_connected():
                return browser
            # Clean up a disconnected browser first so leftover handles do not
            # interfere with the rebuild.
            await self._discard_browser()
            self._browser = await self._launch_browser()
            logger.info(f"[md2img] 常驻浏览器就绪: source={self._last_launch_source}")
            return self._browser

    async def restart_browser(self, failed_browser=None):
        """Rebuild the shared browser and return the new instance.

        Args:
            failed_browser: Optional browser the caller just failed on. When
                given and the shared browser has already been replaced by a
                different, connected instance, that instance is returned
                without another rebuild. This prevents concurrent failing
                captures from killing the browser another task just rebuilt.
                With the default ``None`` the browser is always rebuilt.
        """
        async with self._lock:
            current = self._browser
            if (
                failed_browser is not None
                and current is not None
                and current is not failed_browser
                and current.is_connected()
            ):
                return current
            await self._discard_browser()
            self._browser = await self._launch_browser()
            logger.info(f"[md2img] 常驻浏览器已重建: source={self._last_launch_source}")
            return self._browser

    async def _capture(self, browser, html_content: str, output_image: str):
        """Render *html_content* in a fresh context and save a full-page screenshot."""
        context = await browser.new_context(viewport={"width": 780, "height": 960}, device_scale_factor=1.2)
        try:
            page = await context.new_page()
            logger.debug("Set page content")
            await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)
            logger.debug("Wait for fonts ready")
            await page.evaluate("document.fonts.ready")
            await asyncio.sleep(0.2)
            logger.debug(f"Take screenshot: output={output_image}")
            await page.screenshot(path=output_image, full_page=True, timeout=15000, animations="disabled")
            if not os.path.exists(output_image):
                raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
        finally:
            try:
                await context.close()
            except Exception:
                pass  # a failing context close must not mask the capture outcome

    async def screenshot(self, html_content: str, output_image: str):
        """Capture *html_content* to *output_image*, retrying once on failure.

        Raises:
            The exception of the second attempt when the retry fails as well.
        """
        browser = await self.ensure_browser()
        try:
            await self._capture(browser, html_content, output_image)
        except Exception as e:
            # After the first failure, rebuild the browser once and retry to
            # ride out sporadic faults.
            logger.warning(f"[md2img] 常驻浏览器截图失败,准备重建后重试: {e}")
            browser = await self.restart_browser(failed_browser=browser)
            await self._capture(browser, html_content, output_image)
# Module-level manager instance, created on first use by _get_browser_manager.
_BROWSER_MANAGER: Optional[_PersistentBrowser] = None


def _get_browser_manager() -> _PersistentBrowser:
    """Return the module-level browser manager, creating it on first call."""
    global _BROWSER_MANAGER
    manager = _BROWSER_MANAGER
    if manager is None:
        manager = _PersistentBrowser()
        _BROWSER_MANAGER = manager
    return manager
async def html_to_image(html_content, output_image):
    """Render *html_content* to the image file *output_image*.

    Delegates to the shared browser manager's ``screenshot`` method.
    """
    await _get_browser_manager().screenshot(html_content, output_image)
async def _await_with_progress(coro, timeout_seconds: int, stage_name: str, progress_interval_seconds: int = 10):
    """Await *coro*, logging progress periodically so long waits do not look hung.

    The coroutine runs as a task that is polled once per second. Elapsed time
    is truncated to whole seconds before it is compared against the progress
    interval and the timeout.

    Args:
        coro: Coroutine to run.
        timeout_seconds: Whole seconds after which the task is cancelled.
        stage_name: Label used in log and error messages.
        progress_interval_seconds: Seconds between progress log lines.

    Returns:
        The coroutine's result.

    Raises:
        asyncio.TimeoutError: When the timeout elapses first.
        Exception: Whatever the coroutine itself raises.
    """
    runner = asyncio.create_task(coro)
    began = time.monotonic()
    report_due = progress_interval_seconds
    try:
        while True:
            finished, _pending = await asyncio.wait({runner}, timeout=1.0)
            if finished:
                return runner.result()

            waited = int(time.monotonic() - began)
            if waited >= report_due:
                logger.info(f"[md2img] 阶段进行中: {stage_name}, elapsed={waited}s/{timeout_seconds}s")
                report_due += progress_interval_seconds
            if waited < timeout_seconds:
                continue

            runner.cancel()
            raise asyncio.TimeoutError(f"[md2img] 阶段超时: {stage_name}, timeout={timeout_seconds}s")
    finally:
        # Covers cancellation of the caller as well as the timeout path.
        if not runner.done():
            runner.cancel()
async def convert_md_str_to_image(
    md_content: str,
    output_image: str,
    max_retries: int = 2,
    render_timeout_seconds: int = 90,
    html_timeout_seconds: int = 30,
) -> str:
    """Render Markdown text to an image file and return its absolute path.

    The image is written to ``<cwd>/temp/md2image/<output_image>``. Each
    attempt runs two stages, Markdown -> HTML and HTML -> screenshot, each
    under its own timeout. A result smaller than 1024 bytes counts as a
    failure.

    Args:
        md_content: Markdown source; must be non-empty.
        output_image: File name of the image inside the temp directory.
        max_retries: Total number of attempts.
        render_timeout_seconds: Screenshot stage timeout (at least 10 s is used).
        html_timeout_seconds: HTML stage timeout (at least 5 s is used).

    Returns:
        Resolved path of the generated image.

    Raises:
        ValueError: When *md_content* is empty.
        RuntimeError: When every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    if not md_content:
        raise ValueError("Markdown content cannot be empty")

    temp_dir = Path(os.getcwd()) / "temp" / "md2image"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_image_path = temp_dir / output_image

    last_error = None
    for attempt in range(max_retries):
        attempt_no = attempt + 1
        try:
            logger.debug(
                f"尝试第 {attempt_no}/{max_retries} 次生成图片 "
                f"(html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            # Remove any stale output so a leftover file cannot pass as a
            # fresh result; unlink(missing_ok=True) avoids the check-then-remove race.
            output_image_path.unlink(missing_ok=True)

            stage_start = time.monotonic()
            # Stage 1: Markdown -> HTML, run under a timeout.
            # NOTE(review): md_str_to_html_content contains no await, so this
            # timeout cannot interrupt it while it runs - confirm whether that
            # is acceptable.
            logger.info(f"[md2img] 开始阶段: markdown_to_html, attempt={attempt_no}/{max_retries}")
            full_html = await _await_with_progress(
                md_str_to_html_content(md_content),
                timeout_seconds=max(5, int(html_timeout_seconds)),
                stage_name="markdown_to_html",
            )
            logger.debug(f"{attempt_no} 次 HTML 生成耗时: {time.monotonic() - stage_start:.2f}s")

            # Stage 2: Playwright render + screenshot. The timeout guards
            # against a browser process that hangs.
            stage_start = time.monotonic()
            logger.info(f"[md2img] 开始阶段: html_to_image, attempt={attempt_no}/{max_retries}")
            await _await_with_progress(
                html_to_image(full_html, str(output_image_path)),
                timeout_seconds=max(10, int(render_timeout_seconds)),
                stage_name="html_to_image",
            )
            logger.debug(f"{attempt_no} 次截图耗时: {time.monotonic() - stage_start:.2f}s")

            image_size = os.path.getsize(str(output_image_path))
            if image_size < 1024:
                raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
            logger.info(f"图片成功生成:{output_image_path}")
            return str(output_image_path.resolve())
        except asyncio.TimeoutError as e:
            last_error = RuntimeError(
                f"图片生成超时(attempt={attempt_no}/{max_retries}, "
                f"html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            # Keep the stage-level timeout (which names the stage) reachable.
            last_error.__cause__ = e
            logger.warning(str(last_error))
        except Exception as e:
            last_error = e
            logger.warning(f"{attempt_no} 次尝试失败: {e}")

        # Short pause between attempts, skipped after the last one.
        if attempt < max_retries - 1:
            await asyncio.sleep(1.5)

    # Chain the last failure so its traceback is preserved for debugging.
    raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}") from last_error