Files
abot/utils/markdown_to_image.py
liuwei 43c334354f 增强转图浏览器启动健壮性并修正日志来源标识
变更项:1) html_to_image 改为候选浏览器逐个尝试启动,避免单一路径失败导致整体异常。2) Linux 增加系统浏览器路径探测(google-chrome/chromium)并保留 Playwright 缓存路径作为候选。3) 修正启动日志来源标识,区分 system 与 playwright-cache,避免误判。4) 所有候选失败时自动回退到 bundled 浏览器,提高转图成功率。
2026-04-17 09:18:02 +08:00

633 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import time
from pathlib import Path
import shutil
import psutil
from playwright.async_api import async_playwright
import os
import asyncio
import re
from loguru import logger
try:
import markdown
except ImportError:
markdown = None
# Substrings that mark a rendered HTML block as header metadata; consumed by
# _looks_like_meta with a substring test.
# NOTE(review): the first entry is an empty string. An empty string is a
# substring of every string, so a plain `in` test against it always succeeds.
# Presumably a character was lost here (encoding / copy issue) - confirm.
META_KEYWORDS = ["", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]
# Maps a stats label found on the overview line to the CSS class suffix used
# for its pill (`stats-pill-<suffix>`); labels not listed fall back to
# "default" in _extract_stats_pills_from_markdown.
# NOTE(review): the "" key cannot be produced by the label regex in
# _extract_stats_pills_from_markdown (it requires at least one character), so
# the "total" style looks unreachable - presumably the label text was lost;
# confirm the intended key.
STAT_PILL_CLASSES = {
    "": "total",
    "人数": "people",
    "文本": "text",
    "图片": "image",
    "视频": "video",
    "链接": "link",
    "表情": "emoji",
}
def _extract_stats_pills_from_markdown(md_content: str) -> str:
text = str(md_content or "")
pattern = re.compile(
r"(^##\s+群概览\s*\n)([^\n]+)(?=\n(?:\n|##\s|###\s|$))",
re.M,
)
def replace(match):
stats_line = match.group(2).strip()
parts = [part.strip() for part in stats_line.split("·") if part.strip()]
pills = []
for part in parts:
item_match = re.match(r"(?:\*\*)?([^*\s]+)(?:\*\*)?\s+(\d+)", part)
if not item_match:
continue
label = item_match.group(1).strip()
value = item_match.group(2).strip()
kind = STAT_PILL_CLASSES.get(label, "default")
pills.append(
f'<span class="stats-pill stats-pill-{kind}"><span class="stats-pill-label">{label}</span><span class="stats-pill-value">{value}</span></span>'
)
if not pills:
return match.group(0)
return match.group(1) + f'<div class="stats-pills">{"".join(pills)}</div>'
return pattern.sub(replace, text, count=1)
def _simple_markdown_to_html(md_content: str) -> str:
lines = str(md_content or "").splitlines()
html_parts = []
in_ul = False
paragraph_lines = []
def flush_paragraph():
nonlocal paragraph_lines
if paragraph_lines:
text = " ".join(item.strip() for item in paragraph_lines if item.strip())
if text:
html_parts.append(f"<p>{text}</p>")
paragraph_lines = []
def close_ul():
nonlocal in_ul
if in_ul:
html_parts.append("</ul>")
in_ul = False
for raw_line in lines:
line = raw_line.rstrip()
stripped = line.strip()
if not stripped:
flush_paragraph()
close_ul()
continue
if stripped.startswith("# "):
flush_paragraph()
close_ul()
html_parts.append(f"<h1>{stripped[2:].strip()}</h1>")
continue
if stripped.startswith("## "):
flush_paragraph()
close_ul()
html_parts.append(f"<h2>{stripped[3:].strip()}</h2>")
continue
if stripped.startswith("### "):
flush_paragraph()
close_ul()
html_parts.append(f"<h3>{stripped[4:].strip()}</h3>")
continue
if stripped.startswith("<div ") and stripped.endswith("</div>"):
flush_paragraph()
close_ul()
html_parts.append(stripped)
continue
if stripped.startswith("- "):
flush_paragraph()
if not in_ul:
html_parts.append("<ul>")
in_ul = True
html_parts.append(f"<li>{stripped[2:].strip()}</li>")
continue
close_ul()
paragraph_lines.append(stripped)
flush_paragraph()
close_ul()
return "\n".join(html_parts)
def _terminate_process_tree(pid: int) -> None:
    """Terminate, then kill if needed, process *pid* and all its descendants."""
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
        # Ask children first, then the parent, to exit politely.
        for proc in children:
            try:
                proc.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
        try:
            parent.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
        try:
            gone, alive = psutil.wait_procs([parent] + children, timeout=2)
        except Exception:
            # Could not determine state: treat everything as still alive.
            gone, alive = [], [parent] + children
        # Anything that ignored terminate() gets killed.
        for proc in alive:
            try:
                proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
        try:
            gone, alive = psutil.wait_procs([parent] + children, timeout=3)
        except Exception:
            alive = []
        if alive:
            logger.warning(f"process still alive after kill: {[p.pid for p in alive]}")
        else:
            logger.debug("process tree terminated")
    except Exception as e:
        # Includes psutil.NoSuchProcess when the process is already gone.
        logger.warning(f"force kill failed: {e}")


async def safe_close_browser(browser, timeout: float = 4.0) -> None:
    """Close a browser best-effort, force-killing its process tree if needed.

    Pages and contexts are closed first (failures are logged and ignored),
    then ``browser.close()`` is awaited. If that fails or times out and the
    browser object exposes a process handle with a pid, the process tree is
    terminated via psutil.

    Args:
        browser: The browser object to close; falsy values are a no-op.
        timeout: Seconds allowed for each context close and for the final
            browser close. Page closes use a fixed 1.5 s limit.

    Returns:
        None. Failures during cleanup are logged rather than raised.
    """
    if not browser:
        return
    # Iterate over copies: closing may mutate the underlying collections.
    for context in browser.contexts[:]:
        for page in context.pages[:]:
            try:
                await asyncio.wait_for(page.close(), timeout=1.5)
            except Exception as e:
                logger.debug(f"page.close failed (ignored): {e}")
        try:
            await asyncio.wait_for(context.close(), timeout=timeout)
        except Exception as e:
            logger.debug(f"context.close failed (ignored): {e}")
    try:
        await asyncio.wait_for(browser.close(), timeout=timeout)
        logger.debug("browser closed gracefully")
        return
    except Exception as e:
        logger.warning(f"browser.close failed: {e}")
    # Not every browser object exposes a `process` attribute (the Playwright
    # Python Browser API does not document one), so look it up defensively
    # instead of raising AttributeError out of a cleanup path.
    process = getattr(browser, "process", None)
    pid = getattr(process, "pid", None) if process else None
    if not pid:
        logger.warning("browser process handle unavailable, skip force kill")
        return
    _terminate_process_tree(pid)
def _clean_text(html: str) -> str:
return re.sub(r'\s+', ' ', re.sub(r'<.*?>', ' ', html)).strip()
def _looks_like_meta(html: str) -> bool:
    """Heuristically decide whether an HTML block is header metadata.

    A block counts as metadata when its visible text contains one of
    ``META_KEYWORDS`` or is at most 80 characters long. Blocks with no
    visible text are never metadata.

    Args:
        html: HTML fragment to classify.

    Returns:
        True when the block looks like metadata, otherwise False.
    """
    clean = _clean_text(html)
    if not clean:
        return False
    # Skip empty keywords: "" is a substring of every string and would make
    # every non-empty block match, leaving the length check below unreachable.
    if any(keyword and keyword in clean for keyword in META_KEYWORDS):
        return True
    return len(clean) <= 80
def _split_hero(html_body: str):
title_match = re.search(r'<h1>(.*?)</h1>', html_body, re.S | re.I)
hero_title = _clean_text(title_match.group(1)) if title_match else "聊天总结"
remain = re.sub(r'<h1>.*?</h1>', '', html_body, count=1, flags=re.S | re.I).strip()
block_pattern = re.compile(r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?</(?:p|blockquote|ul|ol)>)', re.S | re.I)
meta_blocks = []
for _ in range(4):
m = block_pattern.match(remain)
if not m:
break
block = m.group(1)
if not _looks_like_meta(block):
break
meta_blocks.append(block.strip())
remain = remain[m.end():].strip()
hero_meta = ''.join(meta_blocks)
hero_enabled = bool(title_match or meta_blocks)
return hero_title, hero_meta, remain, hero_enabled
async def md_str_to_html_content(md_content):
    """Render Markdown text into a complete, styled HTML document string.

    Steps: the overview stats line is converted to pill markup, the Markdown
    is rendered with the `markdown` package when it is importable (otherwise
    with the built-in `_simple_markdown_to_html` fallback), the result is
    split into hero title / hero metadata / remaining body, and everything is
    wrapped in an HTML page with inline CSS.

    The function is declared `async` but contains no `await`; it runs to
    completion synchronously once awaited.

    Args:
        md_content: Markdown source text.

    Returns:
        The full HTML document as a string.
    """
    # Turn the stats line under the overview heading into pill markup first,
    # so it passes through the Markdown renderer as raw HTML.
    md_content = _extract_stats_pills_from_markdown(md_content)
    if markdown is not None:
        html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
    else:
        # `markdown` could not be imported: use the minimal built-in renderer.
        html_body = _simple_markdown_to_html(md_content)
    hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)
    # Inline stylesheet for the whole page (card layout, hero banner,
    # typography, tables, code blocks and the stats pills).
    css = """
<style>
:root {
--bg1: #0f172a;
--bg2: #172554;
--paper: #ffffff;
--text: #233044;
--muted: #c7d2e3;
--muted-2: rgba(235, 241, 255, 0.82);
--primary: #8b7cff;
--primary-soft: rgba(109,94,252,0.10);
--accent: #22c3b5;
--line: rgba(148,163,184,0.18);
--code-bg: #0f172a;
--code-fg: #e2e8f0;
--quote-bg: rgba(20,184,166,0.08);
--shadow: 0 20px 45px rgba(80, 84, 125, 0.10);
}
* { box-sizing: border-box; }
html, body { margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
color: var(--text);
font-size: 16px;
line-height: 1.78;
background:
radial-gradient(circle at top left, rgba(126, 93, 255, 0.14) 0%, transparent 28%),
radial-gradient(circle at top right, rgba(34, 195, 181, 0.12) 0%, transparent 24%),
linear-gradient(180deg, #eef4fb 0%, #e8f0f8 100%);
padding: 26px;
}
.wrap {
max-width: 820px;
margin: 0 auto;
background: rgba(255,255,255,0.97);
border: 1px solid rgba(255,255,255,0.7);
border-radius: 28px;
box-shadow: var(--shadow);
overflow: hidden;
}
.hero {
position: relative;
padding: 30px 34px 24px;
background:
radial-gradient(circle at 18% 18%, rgba(255,255,255,0.10) 0%, transparent 18%),
radial-gradient(circle at 85% 22%, rgba(255,255,255,0.12) 0%, transparent 20%),
linear-gradient(135deg, #1e1b4b 0%, #1d4ed8 52%, #0f766e 100%);
border-bottom: 1px solid rgba(255,255,255,0.08);
}
.hero::before {
content: "";
position: absolute;
inset: 0;
background:
linear-gradient(125deg, rgba(255,255,255,0.05) 0%, transparent 38%),
linear-gradient(300deg, rgba(255,255,255,0.04) 0%, transparent 30%);
pointer-events: none;
}
.hero::after {
content: "";
position: absolute;
right: -40px;
top: -36px;
width: 200px;
height: 200px;
border-radius: 50%;
border: 1px solid rgba(255,255,255,0.12);
box-shadow: 0 0 0 24px rgba(255,255,255,0.04), 0 0 0 56px rgba(255,255,255,0.025);
opacity: 0.9;
pointer-events: none;
}
.hero-badge {
position: relative;
display: inline-block;
padding: 6px 12px;
border-radius: 999px;
font-size: 12px;
color: #f8faff;
background: rgba(255,255,255,0.12);
border: 1px solid rgba(255,255,255,0.18);
margin-bottom: 14px;
letter-spacing: .06em;
}
.hero-title {
position: relative;
margin: 0;
font-size: 2.05em;
line-height: 1.28;
font-weight: 800;
color: #ffffff;
text-align: center;
letter-spacing: -0.02em;
text-shadow: 0 2px 10px rgba(0,0,0,0.12);
}
.hero-meta {
position: relative;
margin: 12px auto 0;
max-width: 660px;
text-align: center;
color: var(--muted-2);
font-size: 0.84em;
line-height: 1.72;
}
.hero-meta p, .hero-meta blockquote, .hero-meta ul, .hero-meta ol {
margin: 4px 0;
color: inherit;
background: transparent;
border: none;
padding: 0;
}
.hero-meta ul, .hero-meta ol { list-style: none; padding-left: 0; }
.content { padding: 24px 34px 34px; }
h1, h2, h3, h4, h5, h6 {
color: var(--text);
margin-top: 24px;
margin-bottom: 14px;
font-weight: 700;
line-height: 1.35;
letter-spacing: -0.01em;
}
.content.hero-active h1:first-of-type { display: none; }
h2 {
font-size: 1.42em;
margin-top: 30px;
padding: 10px 14px;
background: linear-gradient(90deg, var(--primary-soft), rgba(255,255,255,0));
border-left: 4px solid var(--primary);
border-radius: 12px;
}
h3 {
font-size: 1.15em;
margin-top: 24px;
color: #30435f;
padding-left: 12px;
border-left: 3px solid rgba(20,184,166,0.55);
}
p { margin: 14px 0; color: #334155; line-height: 1.88; }
ul, ol { padding-left: 26px; margin: 14px 0 18px; }
li { margin: 8px 0; color: #334155; }
li::marker { color: var(--primary); }
strong { color: #1e293b; font-weight: 700; }
em { color: #5b6b84; }
code {
background: rgba(109,94,252,0.08);
color: #5b3df5;
padding: 2px 8px;
border-radius: 8px;
font-size: 0.92em;
border: 1px solid rgba(109,94,252,0.10);
}
pre {
background: var(--code-bg);
color: var(--code-fg);
padding: 16px 18px;
border-radius: 16px;
overflow-x: auto;
border: 1px solid rgba(255,255,255,0.06);
box-shadow: inset 0 1px 0 rgba(255,255,255,0.03);
}
pre code { background: transparent; color: inherit; border: none; padding: 0; }
table {
border-collapse: separate;
border-spacing: 0;
width: 100%;
margin: 20px 0;
background: rgba(255,255,255,0.96);
border: 1px solid rgba(148,163,184,0.16);
border-radius: 16px;
overflow: hidden;
box-shadow: 0 8px 24px rgba(15,23,42,0.05);
}
th, td { padding: 12px 14px; text-align: left; border-bottom: 1px solid rgba(148,163,184,0.12); }
tr:last-child td { border-bottom: none; }
th { background: linear-gradient(180deg, rgba(109,94,252,0.10), rgba(109,94,252,0.04)); color: #334155; font-weight: 700; }
blockquote {
margin: 18px 0;
padding: 14px 18px;
background: var(--quote-bg);
border: 1px solid rgba(20,184,166,0.16);
border-left: 5px solid var(--accent);
border-radius: 14px;
color: #355468;
}
.stats-pills {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin: 12px 0 8px;
}
.stats-pill {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 7px 12px;
border-radius: 999px;
font-size: 0.92em;
line-height: 1;
border: 1px solid rgba(148,163,184,0.16);
background: linear-gradient(180deg, rgba(255,255,255,0.96), rgba(248,250,252,0.92));
color: #334155;
box-shadow: 0 8px 18px rgba(15,23,42,0.05);
}
.stats-pill-label {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 4px 8px;
border-radius: 999px;
font-size: 0.82em;
font-weight: 700;
color: #ffffff;
background: linear-gradient(135deg, #64748b, #475569);
}
.stats-pill-value {
font-weight: 800;
color: #1e293b;
min-width: 20px;
}
.stats-pill-total .stats-pill-label { background: linear-gradient(135deg, #3b82f6, #1d4ed8); }
.stats-pill-people .stats-pill-label { background: linear-gradient(135deg, #0f766e, #14b8a6); }
.stats-pill-text .stats-pill-label { background: linear-gradient(135deg, #8b5cf6, #7c3aed); }
.stats-pill-image .stats-pill-label { background: linear-gradient(135deg, #ec4899, #db2777); }
.stats-pill-video .stats-pill-label { background: linear-gradient(135deg, #f97316, #ea580c); }
.stats-pill-link .stats-pill-label { background: linear-gradient(135deg, #22c55e, #16a34a); }
.stats-pill-emoji .stats-pill-label { background: linear-gradient(135deg, #eab308, #ca8a04); }
hr { border: none; height: 1px; background: linear-gradient(90deg, transparent, rgba(148,163,184,0.35), transparent); margin: 26px 0; }
a { color: var(--primary); text-decoration: none; border-bottom: 1px dashed rgba(109,94,252,0.35); }
.signature { margin-top: 34px; text-align: right; color: #73849c; font-size: 0.95em; font-style: italic; }
</style>
"""
    hero_html = ''
    # The `hero-active` class hides the first <h1> inside the content area
    # (see the CSS rule above), because the hero banner shows the title.
    content_class = 'content hero-active' if hero_enabled else 'content'
    if hero_enabled:
        # hero_title and hero_meta are interpolated as-is: both come from the
        # already-rendered HTML produced above.
        hero_html = f'''
<div class="hero">
<div class="hero-badge">AI 群聊总结</div>
<h1 class="hero-title">{hero_title}</h1>
<div class="hero-meta">{hero_meta}</div>
</div>'''
    # With a hero banner only the remaining body is rendered below it;
    # without one the full rendered HTML is used unchanged.
    full_html = f'''<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{css}
</head>
<body>
<div class="wrap">{hero_html}
<div class="{content_class}">
{remain_html if hero_enabled else html_body}
</div>
</div>
</body>
</html>'''
    return full_html
def check_chromium_installed(path):
    """Return True when *path* is an existing regular file that is executable."""
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
def _discover_browser_candidates():
    """Return ``(source, path)`` pairs for each distinct usable Chrome/Chromium binary."""
    import glob  # function-local import, as in the original implementation

    raw_candidates = []
    if os.name == 'nt':
        # Windows: derive install locations from the environment so any user
        # profile works, then keep the previously hard-coded paths as fallback.
        install_suffix = os.path.join("Google", "Chrome", "Application", "chrome.exe")
        for env_name in ("LOCALAPPDATA", "PROGRAMFILES", "PROGRAMFILES(X86)"):
            base_dir = os.environ.get(env_name)
            if base_dir:
                raw_candidates.append(("system", os.path.join(base_dir, install_suffix)))
        legacy_paths = [
            r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"
        ]
        raw_candidates.extend(("system", path) for path in legacy_paths)
    else:
        # Non-Windows: system executables on PATH first, then browsers in the
        # Playwright cache directory.
        for bin_name in ("google-chrome", "google-chrome-stable", "chromium", "chromium-browser"):
            bin_path = shutil.which(bin_name)
            if bin_path:
                raw_candidates.append(("system", bin_path))
        user_home = os.path.expanduser("~")
        glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome")
        for path in sorted(glob.glob(glob_pattern), reverse=True):
            raw_candidates.append(("playwright-cache", path))

    candidates = []
    seen = set()
    for source, path in raw_candidates:
        if not check_chromium_installed(path):
            continue
        # Several names can resolve to the same binary (symlinks, duplicated
        # paths); launching it more than once only wastes time on failure.
        identity = os.path.normcase(os.path.realpath(path))
        if identity in seen:
            continue
        seen.add(identity)
        candidates.append((source, path))
    return candidates


async def html_to_image(html_content, output_image):
    """Render an HTML document to a full-page screenshot with Chromium.

    Every discovered system / Playwright-cache browser is tried in order; if
    none launches, the browser bundled with Playwright is used. The browser is
    always closed via ``safe_close_browser`` once rendering finishes or fails.

    Args:
        html_content: Complete HTML document to render.
        output_image: Path of the image file to write.

    Raises:
        RuntimeError: If the screenshot call returns but the output file does
            not exist.
        Exception: Launch or rendering errors from Playwright are propagated
            unchanged.
    """
    launch_args = ["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"]
    async with async_playwright() as p:
        browser = None
        launch_errors = []
        # Try each candidate in turn so one broken install cannot fail the
        # whole conversion.
        for source, browser_path in _discover_browser_candidates():
            try:
                logger.debug(f"Launch chromium with {source}: {browser_path}")
                browser = await p.chromium.launch(
                    executable_path=browser_path,
                    args=launch_args,
                    timeout=20000,
                )
                break
            except Exception as e:
                launch_errors.append(f"{source}:{browser_path} -> {e}")
                logger.warning(f"Launch chromium failed with {source}: {browser_path}, error={e}")
        # Every candidate failed (or none was found): fall back to the
        # browser bundled with Playwright.
        if not browser:
            logger.debug("Launch chromium with bundled browser")
            try:
                browser = await p.chromium.launch(args=launch_args)
            except Exception:
                # Surface the earlier failures too; otherwise only the bundled
                # launch error would be visible to whoever debugs this.
                if launch_errors:
                    logger.error(
                        "Bundled chromium launch failed after candidate failures: "
                        + "; ".join(launch_errors)
                    )
                raise
        try:
            context = await browser.new_context(viewport={"width": 780, "height": 960}, device_scale_factor=1.2)
            page = await context.new_page()
            logger.debug("Set page content")
            await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)
            logger.debug("Wait for fonts ready")
            await page.evaluate("document.fonts.ready")
            # Short pause after fonts are ready, before capturing.
            await asyncio.sleep(0.2)
            logger.debug(f"Take screenshot: output={output_image}")
            await page.screenshot(path=output_image, full_page=True, timeout=15000, animations="disabled")
            if not os.path.exists(output_image):
                raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
        finally:
            logger.debug("Closing browser")
            await safe_close_browser(browser)
async def _await_with_progress(coro, timeout_seconds: int, stage_name: str, progress_interval_seconds: int = 10):
"""等待协程并周期输出进度,避免长时间无日志看起来像假死。"""
task = asyncio.create_task(coro)
start_ts = time.monotonic()
next_progress_at = progress_interval_seconds
try:
while True:
done, _ = await asyncio.wait({task}, timeout=1.0)
if done:
return task.result()
elapsed = int(time.monotonic() - start_ts)
if elapsed >= next_progress_at:
logger.info(f"[md2img] 阶段进行中: {stage_name}, elapsed={elapsed}s/{timeout_seconds}s")
next_progress_at += progress_interval_seconds
if elapsed >= timeout_seconds:
task.cancel()
raise asyncio.TimeoutError(f"[md2img] 阶段超时: {stage_name}, timeout={timeout_seconds}s")
finally:
if not task.done():
task.cancel()
async def convert_md_str_to_image(
    md_content: str,
    output_image: str,
    max_retries: int = 2,
    render_timeout_seconds: int = 90,
    html_timeout_seconds: int = 30,
) -> str:
    """Render Markdown text to an image file, retrying on failure.

    The image is written below ``<cwd>/temp/md2image``. Each attempt runs two
    stages, both under a timeout: Markdown -> HTML, then HTML -> screenshot.

    Args:
        md_content: Markdown source; must be non-empty.
        output_image: File name (or relative path) of the image inside the
            temp directory.
        max_retries: Total number of attempts. Values below 1 are treated as
            1 so that at least one attempt is always made.
        render_timeout_seconds: Timeout for the screenshot stage (minimum 10).
        html_timeout_seconds: Timeout for the HTML stage (minimum 5).

    Returns:
        The resolved absolute path of the generated image.

    Raises:
        ValueError: If *md_content* is empty.
        RuntimeError: If every attempt fails; the last error is attached as
            the exception cause.
    """
    if not md_content:
        raise ValueError("Markdown content cannot be empty")
    # With zero attempts the loop would never run and the call would fail
    # without having tried anything.
    total_attempts = max(1, int(max_retries))
    project_root = os.getcwd()
    temp_dir = Path(project_root) / "temp" / "md2image"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_image_path = temp_dir / output_image
    # `output_image` may contain sub-directories; make sure they exist.
    output_image_path.parent.mkdir(parents=True, exist_ok=True)
    last_error = None
    for attempt in range(total_attempts):
        attempt_no = attempt + 1
        try:
            logger.debug(
                f"尝试第 {attempt_no}/{total_attempts} 次生成图片 "
                f"(html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            # Remove a stale file so a failed attempt cannot pass as success.
            if output_image_path.exists():
                os.remove(str(output_image_path))
            stage_start = time.monotonic()
            # Stage 1: Markdown -> HTML. The timeout guards against
            # pathological input blocking for a long time.
            logger.info(f"[md2img] 开始阶段: markdown_to_html, attempt={attempt_no}/{total_attempts}")
            full_html = await _await_with_progress(
                md_str_to_html_content(md_content),
                timeout_seconds=max(5, int(html_timeout_seconds)),
                stage_name="markdown_to_html",
            )
            logger.debug(f"{attempt_no} 次 HTML 生成耗时: {time.monotonic() - stage_start:.2f}s")
            # Stage 2: render and screenshot with Playwright. The timeout
            # guards against a hung browser process.
            stage_start = time.monotonic()
            logger.info(f"[md2img] 开始阶段: html_to_image, attempt={attempt_no}/{total_attempts}")
            await _await_with_progress(
                html_to_image(full_html, str(output_image_path)),
                timeout_seconds=max(10, int(render_timeout_seconds)),
                stage_name="html_to_image",
            )
            logger.debug(f"{attempt_no} 次截图耗时: {time.monotonic() - stage_start:.2f}s")
            image_size = os.path.getsize(str(output_image_path))
            # A file under 1 KiB is treated as a broken render.
            if image_size < 1024:
                raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
            logger.info(f"图片成功生成:{output_image_path}")
            return str(output_image_path.resolve())
        except asyncio.TimeoutError:
            last_error = RuntimeError(
                f"图片生成超时(attempt={attempt_no}/{total_attempts}, "
                f"html_timeout={html_timeout_seconds}s, render_timeout={render_timeout_seconds}s)"
            )
            logger.warning(str(last_error))
        except Exception as e:
            last_error = e
            logger.warning(f"{attempt_no} 次尝试失败: {e}")
        # Brief pause before the next attempt; none after the last one.
        if attempt < total_attempts - 1:
            await asyncio.sleep(1.5)
    # Chain the last failure so its traceback stays available to callers.
    raise RuntimeError(f"图片生成失败,已重试 {total_attempts} 次。最后错误: {last_error}") from last_error