Files
abot/utils/markdown_to_image.py

558 lines
20 KiB
Python

import subprocess
import time
from pathlib import Path
import psutil
from playwright.async_api import async_playwright
import os
import asyncio
import re
from loguru import logger
try:
import markdown
except ImportError:
markdown = None
META_KEYWORDS = ["", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]
STAT_PILL_CLASSES = {
"": "total",
"人数": "people",
"文本": "text",
"图片": "image",
"视频": "video",
"链接": "link",
"表情": "emoji",
}
def _extract_stats_pills_from_markdown(md_content: str) -> str:
text = str(md_content or "")
pattern = re.compile(
r"(^##\s+群概览\s*\n)([^\n]+)(?=\n(?:\n|##\s|###\s|$))",
re.M,
)
def replace(match):
stats_line = match.group(2).strip()
parts = [part.strip() for part in stats_line.split("·") if part.strip()]
pills = []
for part in parts:
item_match = re.match(r"(?:\*\*)?([^*\s]+)(?:\*\*)?\s+(\d+)", part)
if not item_match:
continue
label = item_match.group(1).strip()
value = item_match.group(2).strip()
kind = STAT_PILL_CLASSES.get(label, "default")
pills.append(
f'<span class="stats-pill stats-pill-{kind}"><span class="stats-pill-label">{label}</span><span class="stats-pill-value">{value}</span></span>'
)
if not pills:
return match.group(0)
return match.group(1) + f'<div class="stats-pills">{"".join(pills)}</div>'
return pattern.sub(replace, text, count=1)
def _simple_markdown_to_html(md_content: str) -> str:
lines = str(md_content or "").splitlines()
html_parts = []
in_ul = False
paragraph_lines = []
def flush_paragraph():
nonlocal paragraph_lines
if paragraph_lines:
text = " ".join(item.strip() for item in paragraph_lines if item.strip())
if text:
html_parts.append(f"<p>{text}</p>")
paragraph_lines = []
def close_ul():
nonlocal in_ul
if in_ul:
html_parts.append("</ul>")
in_ul = False
for raw_line in lines:
line = raw_line.rstrip()
stripped = line.strip()
if not stripped:
flush_paragraph()
close_ul()
continue
if stripped.startswith("# "):
flush_paragraph()
close_ul()
html_parts.append(f"<h1>{stripped[2:].strip()}</h1>")
continue
if stripped.startswith("## "):
flush_paragraph()
close_ul()
html_parts.append(f"<h2>{stripped[3:].strip()}</h2>")
continue
if stripped.startswith("### "):
flush_paragraph()
close_ul()
html_parts.append(f"<h3>{stripped[4:].strip()}</h3>")
continue
if stripped.startswith("<div ") and stripped.endswith("</div>"):
flush_paragraph()
close_ul()
html_parts.append(stripped)
continue
if stripped.startswith("- "):
flush_paragraph()
if not in_ul:
html_parts.append("<ul>")
in_ul = True
html_parts.append(f"<li>{stripped[2:].strip()}</li>")
continue
close_ul()
paragraph_lines.append(stripped)
flush_paragraph()
close_ul()
return "\n".join(html_parts)
async def safe_close_browser(browser, timeout: float = 4.0) -> None:
if not browser:
return
for context in browser.contexts[:]:
for page in context.pages[:]:
try:
await asyncio.wait_for(page.close(), timeout=1.5)
except Exception:
pass
try:
await asyncio.wait_for(context.close(), timeout=timeout)
except Exception:
pass
try:
await asyncio.wait_for(browser.close(), timeout=timeout)
logger.debug("browser closed gracefully")
return
except (asyncio.TimeoutError, Exception) as e:
logger.warning(f"browser.close failed: {e}")
if browser.process and browser.process.pid:
try:
parent = psutil.Process(browser.process.pid)
children = parent.children(recursive=True)
for proc in children:
try:
proc.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
parent.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
gone, alive = psutil.wait_procs([parent] + children, timeout=2)
except Exception:
gone, alive = [], [parent] + children
for proc in alive:
try:
proc.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
gone, alive = psutil.wait_procs([parent] + children, timeout=3)
except Exception:
alive = []
if alive:
logger.warning(f"process still alive after kill: {[p.pid for p in alive]}")
else:
logger.debug("process tree terminated")
except (psutil.NoSuchProcess, Exception) as e:
logger.warning(f"force kill failed: {e}")
def _clean_text(html: str) -> str:
return re.sub(r'\s+', ' ', re.sub(r'<.*?>', ' ', html)).strip()
def _looks_like_meta(html: str) -> bool:
clean = _clean_text(html)
if not clean:
return False
if any(k in clean for k in META_KEYWORDS):
return True
return len(clean) <= 80
def _split_hero(html_body: str):
title_match = re.search(r'<h1>(.*?)</h1>', html_body, re.S | re.I)
hero_title = _clean_text(title_match.group(1)) if title_match else "聊天总结"
remain = re.sub(r'<h1>.*?</h1>', '', html_body, count=1, flags=re.S | re.I).strip()
block_pattern = re.compile(r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?</(?:p|blockquote|ul|ol)>)', re.S | re.I)
meta_blocks = []
for _ in range(4):
m = block_pattern.match(remain)
if not m:
break
block = m.group(1)
if not _looks_like_meta(block):
break
meta_blocks.append(block.strip())
remain = remain[m.end():].strip()
hero_meta = ''.join(meta_blocks)
hero_enabled = bool(title_match or meta_blocks)
return hero_title, hero_meta, remain, hero_enabled
async def md_str_to_html_content(md_content):
md_content = _extract_stats_pills_from_markdown(md_content)
if markdown is not None:
html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
else:
html_body = _simple_markdown_to_html(md_content)
hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)
css = """
<style>
:root {
--bg1: #0f172a;
--bg2: #172554;
--paper: #ffffff;
--text: #233044;
--muted: #c7d2e3;
--muted-2: rgba(235, 241, 255, 0.82);
--primary: #8b7cff;
--primary-soft: rgba(109,94,252,0.10);
--accent: #22c3b5;
--line: rgba(148,163,184,0.18);
--code-bg: #0f172a;
--code-fg: #e2e8f0;
--quote-bg: rgba(20,184,166,0.08);
--shadow: 0 20px 45px rgba(80, 84, 125, 0.10);
}
* { box-sizing: border-box; }
html, body { margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
color: var(--text);
font-size: 16px;
line-height: 1.78;
background:
radial-gradient(circle at top left, rgba(126, 93, 255, 0.14) 0%, transparent 28%),
radial-gradient(circle at top right, rgba(34, 195, 181, 0.12) 0%, transparent 24%),
linear-gradient(180deg, #eef4fb 0%, #e8f0f8 100%);
padding: 26px;
}
.wrap {
max-width: 820px;
margin: 0 auto;
background: rgba(255,255,255,0.97);
border: 1px solid rgba(255,255,255,0.7);
border-radius: 28px;
box-shadow: var(--shadow);
overflow: hidden;
}
.hero {
position: relative;
padding: 30px 34px 24px;
background:
radial-gradient(circle at 18% 18%, rgba(255,255,255,0.10) 0%, transparent 18%),
radial-gradient(circle at 85% 22%, rgba(255,255,255,0.12) 0%, transparent 20%),
linear-gradient(135deg, #1e1b4b 0%, #1d4ed8 52%, #0f766e 100%);
border-bottom: 1px solid rgba(255,255,255,0.08);
}
.hero::before {
content: "";
position: absolute;
inset: 0;
background:
linear-gradient(125deg, rgba(255,255,255,0.05) 0%, transparent 38%),
linear-gradient(300deg, rgba(255,255,255,0.04) 0%, transparent 30%);
pointer-events: none;
}
.hero::after {
content: "";
position: absolute;
right: -40px;
top: -36px;
width: 200px;
height: 200px;
border-radius: 50%;
border: 1px solid rgba(255,255,255,0.12);
box-shadow: 0 0 0 24px rgba(255,255,255,0.04), 0 0 0 56px rgba(255,255,255,0.025);
opacity: 0.9;
pointer-events: none;
}
.hero-badge {
position: relative;
display: inline-block;
padding: 6px 12px;
border-radius: 999px;
font-size: 12px;
color: #f8faff;
background: rgba(255,255,255,0.12);
border: 1px solid rgba(255,255,255,0.18);
margin-bottom: 14px;
letter-spacing: .06em;
}
.hero-title {
position: relative;
margin: 0;
font-size: 2.05em;
line-height: 1.28;
font-weight: 800;
color: #ffffff;
text-align: center;
letter-spacing: -0.02em;
text-shadow: 0 2px 10px rgba(0,0,0,0.12);
}
.hero-meta {
position: relative;
margin: 12px auto 0;
max-width: 660px;
text-align: center;
color: var(--muted-2);
font-size: 0.84em;
line-height: 1.72;
}
.hero-meta p, .hero-meta blockquote, .hero-meta ul, .hero-meta ol {
margin: 4px 0;
color: inherit;
background: transparent;
border: none;
padding: 0;
}
.hero-meta ul, .hero-meta ol { list-style: none; padding-left: 0; }
.content { padding: 24px 34px 34px; }
h1, h2, h3, h4, h5, h6 {
color: var(--text);
margin-top: 24px;
margin-bottom: 14px;
font-weight: 700;
line-height: 1.35;
letter-spacing: -0.01em;
}
.content.hero-active h1:first-of-type { display: none; }
h2 {
font-size: 1.42em;
margin-top: 30px;
padding: 10px 14px;
background: linear-gradient(90deg, var(--primary-soft), rgba(255,255,255,0));
border-left: 4px solid var(--primary);
border-radius: 12px;
}
h3 {
font-size: 1.15em;
margin-top: 24px;
color: #30435f;
padding-left: 12px;
border-left: 3px solid rgba(20,184,166,0.55);
}
p { margin: 14px 0; color: #334155; line-height: 1.88; }
ul, ol { padding-left: 26px; margin: 14px 0 18px; }
li { margin: 8px 0; color: #334155; }
li::marker { color: var(--primary); }
strong { color: #1e293b; font-weight: 700; }
em { color: #5b6b84; }
code {
background: rgba(109,94,252,0.08);
color: #5b3df5;
padding: 2px 8px;
border-radius: 8px;
font-size: 0.92em;
border: 1px solid rgba(109,94,252,0.10);
}
pre {
background: var(--code-bg);
color: var(--code-fg);
padding: 16px 18px;
border-radius: 16px;
overflow-x: auto;
border: 1px solid rgba(255,255,255,0.06);
box-shadow: inset 0 1px 0 rgba(255,255,255,0.03);
}
pre code { background: transparent; color: inherit; border: none; padding: 0; }
table {
border-collapse: separate;
border-spacing: 0;
width: 100%;
margin: 20px 0;
background: rgba(255,255,255,0.96);
border: 1px solid rgba(148,163,184,0.16);
border-radius: 16px;
overflow: hidden;
box-shadow: 0 8px 24px rgba(15,23,42,0.05);
}
th, td { padding: 12px 14px; text-align: left; border-bottom: 1px solid rgba(148,163,184,0.12); }
tr:last-child td { border-bottom: none; }
th { background: linear-gradient(180deg, rgba(109,94,252,0.10), rgba(109,94,252,0.04)); color: #334155; font-weight: 700; }
blockquote {
margin: 18px 0;
padding: 14px 18px;
background: var(--quote-bg);
border: 1px solid rgba(20,184,166,0.16);
border-left: 5px solid var(--accent);
border-radius: 14px;
color: #355468;
}
.stats-pills {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin: 12px 0 8px;
}
.stats-pill {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 7px 12px;
border-radius: 999px;
font-size: 0.92em;
line-height: 1;
border: 1px solid rgba(148,163,184,0.16);
background: linear-gradient(180deg, rgba(255,255,255,0.96), rgba(248,250,252,0.92));
color: #334155;
box-shadow: 0 8px 18px rgba(15,23,42,0.05);
}
.stats-pill-label {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 4px 8px;
border-radius: 999px;
font-size: 0.82em;
font-weight: 700;
color: #ffffff;
background: linear-gradient(135deg, #64748b, #475569);
}
.stats-pill-value {
font-weight: 800;
color: #1e293b;
min-width: 20px;
}
.stats-pill-total .stats-pill-label { background: linear-gradient(135deg, #3b82f6, #1d4ed8); }
.stats-pill-people .stats-pill-label { background: linear-gradient(135deg, #0f766e, #14b8a6); }
.stats-pill-text .stats-pill-label { background: linear-gradient(135deg, #8b5cf6, #7c3aed); }
.stats-pill-image .stats-pill-label { background: linear-gradient(135deg, #ec4899, #db2777); }
.stats-pill-video .stats-pill-label { background: linear-gradient(135deg, #f97316, #ea580c); }
.stats-pill-link .stats-pill-label { background: linear-gradient(135deg, #22c55e, #16a34a); }
.stats-pill-emoji .stats-pill-label { background: linear-gradient(135deg, #eab308, #ca8a04); }
hr { border: none; height: 1px; background: linear-gradient(90deg, transparent, rgba(148,163,184,0.35), transparent); margin: 26px 0; }
a { color: var(--primary); text-decoration: none; border-bottom: 1px dashed rgba(109,94,252,0.35); }
.signature { margin-top: 34px; text-align: right; color: #73849c; font-size: 0.95em; font-style: italic; }
</style>
"""
hero_html = ''
content_class = 'content hero-active' if hero_enabled else 'content'
if hero_enabled:
hero_html = f'''
<div class="hero">
<div class="hero-badge">AI 群聊总结</div>
<h1 class="hero-title">{hero_title}</h1>
<div class="hero-meta">{hero_meta}</div>
</div>'''
full_html = f'''<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{css}
</head>
<body>
<div class="wrap">{hero_html}
<div class="{content_class}">
{remain_html if hero_enabled else html_body}
</div>
</div>
</body>
</html>'''
return full_html
def check_chromium_installed(path):
return os.path.isfile(path) and os.access(path, os.X_OK)
async def html_to_image(html_content, output_image):
async with async_playwright() as p:
browser_path = None
if os.name == 'nt':
possible_chrome_paths = [
r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"
]
for path in possible_chrome_paths:
if check_chromium_installed(path):
browser_path = path
break
else:
import glob
user_home = os.path.expanduser("~")
glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome")
chrome_paths = glob.glob(glob_pattern)
for path in sorted(chrome_paths, reverse=True):
if check_chromium_installed(path):
browser_path = path
break
launch_args = ["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"]
if browser_path:
logger.debug(f"Launch chromium with system chrome: {browser_path}")
browser = await p.chromium.launch(executable_path=browser_path, args=launch_args)
else:
logger.debug("Launch chromium with bundled browser")
browser = await p.chromium.launch(args=launch_args)
try:
context = await browser.new_context(viewport={"width": 780, "height": 960}, device_scale_factor=1.2)
page = await context.new_page()
logger.debug("Set page content")
await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)
logger.debug("Wait for fonts ready")
await page.evaluate("document.fonts.ready")
await asyncio.sleep(0.2)
logger.debug(f"Take screenshot: output={output_image}")
await page.screenshot(path=output_image, full_page=True, timeout=15000, animations="disabled")
if not os.path.exists(output_image):
raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
finally:
logger.debug("Closing browser")
await safe_close_browser(browser)
async def convert_md_str_to_image(md_content: str, output_image: str, max_retries: int = 2) -> str:
if not md_content:
raise ValueError("Markdown content cannot be empty")
project_root = os.getcwd()
temp_dir = Path(project_root) / "temp" / "md2image"
temp_dir.mkdir(parents=True, exist_ok=True)
output_image_path = temp_dir / output_image
last_error = None
for attempt in range(max_retries):
try:
logger.debug(f"尝试第 {attempt + 1}/{max_retries} 次生成图片")
if output_image_path.exists():
os.remove(str(output_image_path))
full_html = await md_str_to_html_content(md_content)
await html_to_image(full_html, str(output_image_path))
image_size = os.path.getsize(str(output_image_path))
if image_size < 1024:
raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
logger.info(f"图片成功生成:{output_image_path}")
return str(output_image_path.resolve())
except Exception as e:
last_error = e
logger.warning(f"{attempt + 1} 次尝试失败: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1.5)
raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}")