import subprocess
import time
import markdown
from pathlib import Path
import psutil
from playwright.async_api import async_playwright
import os
import asyncio
import re
from loguru import logger
async def safe_close_browser(browser, timeout: float = 4.0) -> None:
if not browser:
return
for context in browser.contexts[:]:
for page in context.pages[:]:
try:
await asyncio.wait_for(page.close(), timeout=1.5)
except Exception:
pass
try:
await asyncio.wait_for(context.close(), timeout=timeout)
except Exception:
pass
try:
await asyncio.wait_for(browser.close(), timeout=timeout)
logger.debug("browser closed gracefully")
return
except (asyncio.TimeoutError, Exception) as e:
logger.warning(f"browser.close failed: {e}")
if browser.process and browser.process.pid:
try:
parent = psutil.Process(browser.process.pid)
children = parent.children(recursive=True)
for proc in children:
try:
proc.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
parent.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
gone, alive = psutil.wait_procs([parent] + children, timeout=2)
except Exception:
gone, alive = [], [parent] + children
for proc in alive:
try:
proc.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
try:
gone, alive = psutil.wait_procs([parent] + children, timeout=3)
except Exception:
alive = []
if alive:
logger.warning(f"process still alive after kill: {[p.pid for p in alive]}")
else:
logger.debug("process tree terminated")
except (psutil.NoSuchProcess, Exception) as e:
logger.warning(f"force kill failed: {e}")
def _split_hero(html_body: str):
title_match = re.search(r'
(.*?)
', html_body, re.S | re.I)
hero_title = title_match.group(1).strip() if title_match else "聊天总结"
remain = re.sub(r'.*?
', '', html_body, count=1, flags=re.S | re.I).strip()
meta_match = re.match(r'^(.*?)
', remain, re.S | re.I)
hero_meta = meta_match.group(1).strip() if meta_match else "群聊总结 / 自动生成"
if meta_match:
remain = re.sub(r'^.*?
', '', remain, count=1, flags=re.S | re.I).strip()
return hero_title, hero_meta, remain
async def md_str_to_html_content(md_content):
"""将 Markdown 字符串转换为更美观的 HTML 内容。"""
html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
hero_title, hero_meta, remain_html = _split_hero(html_body)
css = """
"""
full_html = f'''
{css}
AI 群聊总结
{hero_title}
{hero_meta}
{remain_html}
'''
return full_html
def check_chromium_installed(path):
return os.path.isfile(path) and os.access(path, os.X_OK)
async def html_to_image(html_content, output_image):
async with async_playwright() as p:
browser_path = None
if os.name == 'nt':
possible_chrome_paths = [
r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"
]
for path in possible_chrome_paths:
if check_chromium_installed(path):
browser_path = path
break
else:
import glob
user_home = os.path.expanduser("~")
glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome")
chrome_paths = glob.glob(glob_pattern)
for path in sorted(chrome_paths, reverse=True):
if check_chromium_installed(path):
browser_path = path
break
launch_args = [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu"
]
if browser_path:
logger.debug(f"Launch chromium with system chrome: {browser_path}")
browser = await p.chromium.launch(executable_path=browser_path, args=launch_args)
else:
logger.debug("Launch chromium with bundled browser")
browser = await p.chromium.launch(args=launch_args)
try:
context = await browser.new_context(
viewport={"width": 780, "height": 960},
device_scale_factor=1.2
)
page = await context.new_page()
logger.debug("Set page content")
await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)
logger.debug("Wait for fonts ready")
await page.evaluate("document.fonts.ready")
await asyncio.sleep(0.2)
logger.debug(f"Take screenshot: output={output_image}")
await page.screenshot(
path=output_image,
full_page=True,
timeout=15000,
animations="disabled"
)
if not os.path.exists(output_image):
raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
finally:
logger.debug("Closing browser")
await safe_close_browser(browser)
async def convert_md_str_to_image(md_content: str, output_image: str, max_retries: int = 2) -> str:
if not md_content:
raise ValueError("Markdown content cannot be empty")
project_root = os.getcwd()
temp_dir = Path(project_root) / "temp" / "md2image"
temp_dir.mkdir(parents=True, exist_ok=True)
output_image_path = temp_dir / output_image
last_error = None
for attempt in range(max_retries):
try:
logger.debug(f"尝试第 {attempt + 1}/{max_retries} 次生成图片")
if output_image_path.exists():
os.remove(str(output_image_path))
full_html = await md_str_to_html_content(md_content)
await html_to_image(full_html, str(output_image_path))
image_size = os.path.getsize(str(output_image_path))
if image_size < 1024:
raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
logger.info(f"图片成功生成:{output_image_path}")
return str(output_image_path.resolve())
except Exception as e:
last_error = e
logger.warning(f"第 {attempt + 1} 次尝试失败: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1.5)
raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}")