import asyncio
import os
import re
import subprocess
import time
from pathlib import Path

import markdown
import psutil
from loguru import logger
from playwright.async_api import async_playwright

# Keywords whose presence makes a leading HTML block count as header metadata
# (see `_looks_like_meta`).
META_KEYWORDS = ["群", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]

# psutil errors that are ignored when signalling a process during force-kill.
_PROC_SIGNAL_ERRORS = (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess)

_TAG_RE = re.compile(r'<.*?>')
_WHITESPACE_RE = re.compile(r'\s+')

# NOTE(review): the markup inside the two patterns below was stripped from the
# reviewed copy of this file (only the capture group and the opening-tag
# alternation survived). `<h1>` and the closing-tag alternation are a
# reconstruction -- confirm against version control before merging.
_TITLE_RE = re.compile(r'<h1[^>]*>(.*?)</h1>', re.S | re.I)
_META_BLOCK_RE = re.compile(
    r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?</(?:p|blockquote|ul|ol)>)',
    re.S | re.I,
)


def _kill_process_tree(pid: int) -> None:
    """Terminate, then kill, process `pid` together with all its descendants.

    Blocking: waits up to 2 s after `terminate()` and up to 3 s after `kill()`.
    Never raises; failures are logged as warnings.
    """
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
        tree = [parent] + children

        # Children first, then the parent, mirroring the original order.
        for proc in children + [parent]:
            try:
                proc.terminate()
            except _PROC_SIGNAL_ERRORS:
                pass

        try:
            _, alive = psutil.wait_procs(tree, timeout=2)
        except Exception:
            # Could not determine who survived: escalate to kill for everyone.
            alive = tree

        for proc in alive:
            try:
                proc.kill()
            except _PROC_SIGNAL_ERRORS:
                pass

        try:
            _, alive = psutil.wait_procs(tree, timeout=3)
        except Exception:
            alive = []

        if alive:
            logger.warning(f"process still alive after kill: {[p.pid for p in alive]}")
        else:
            logger.debug("process tree terminated")
    except Exception as e:
        logger.warning(f"force kill failed: {e}")


async def safe_close_browser(browser, timeout: float = 4.0) -> None:
    """Close a Playwright browser, falling back to killing its process tree.

    Pages are closed first (1.5 s each), then contexts and finally the browser
    itself (`timeout` seconds each). Errors while closing pages and contexts
    are ignored. If `browser.close()` fails or times out and the browser object
    exposes a process with a pid, that process tree is terminated via psutil.

    Args:
        browser: Browser object to close; a falsy value is a no-op.
        timeout: Seconds allowed for each context close and the browser close.
    """
    if not browser:
        return

    for context in browser.contexts[:]:
        for page in context.pages[:]:
            try:
                await asyncio.wait_for(page.close(), timeout=1.5)
            except Exception as e:
                logger.debug(f"page.close ignored: {e}")
        try:
            await asyncio.wait_for(context.close(), timeout=timeout)
        except Exception as e:
            logger.debug(f"context.close ignored: {e}")

    try:
        await asyncio.wait_for(browser.close(), timeout=timeout)
        logger.debug("browser closed gracefully")
        return
    except Exception as e:
        logger.warning(f"browser.close failed: {e}")

    # NOTE(review): Playwright's Python `Browser` does not appear to expose a
    # `process` attribute; use getattr so a missing attribute cannot raise out
    # of this cleanup path.
    process = getattr(browser, "process", None)
    pid = getattr(process, "pid", None)
    if not pid:
        return

    # The psutil waits block for several seconds; keep them off the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _kill_process_tree, pid)


def _clean_text(html: str) -> str:
    """Replace tags with spaces, collapse whitespace runs and strip the ends."""
    without_tags = _TAG_RE.sub(' ', html)
    return _WHITESPACE_RE.sub(' ', without_tags).strip()


def _looks_like_meta(html: str) -> bool:
    """Return True if an HTML block reads like header metadata.

    A block qualifies when its visible text contains any of `META_KEYWORDS`,
    or is at most 80 characters long. Blocks with no visible text never do.
    """
    clean = _clean_text(html)
    if not clean:
        return False
    if any(keyword in clean for keyword in META_KEYWORDS):
        return True
    return len(clean) <= 80


def _split_hero(html_body: str):
    """Split rendered HTML into a hero header and the remaining body.

    The first title heading becomes the hero title ("聊天总结" when there is
    none). Up to four leading blocks (p / blockquote / ul / ol) that pass
    `_looks_like_meta` become the hero metadata.

    Returns:
        A tuple `(hero_title, hero_meta, remain, hero_enabled)` where `remain`
        is the body with the heading and metadata blocks removed and
        `hero_enabled` is True when a heading or any metadata block was found.
    """
    title_match = _TITLE_RE.search(html_body)
    hero_title = _clean_text(title_match.group(1)) if title_match else "聊天总结"
    remain = _TITLE_RE.sub('', html_body, count=1).strip()

    meta_blocks = []
    for _ in range(4):
        m = _META_BLOCK_RE.match(remain)
        if not m:
            break
        block = m.group(1)
        if not _looks_like_meta(block):
            break
        meta_blocks.append(block.strip())
        remain = remain[m.end():].strip()

    hero_meta = ''.join(meta_blocks)
    hero_enabled = bool(title_match or meta_blocks)
    return hero_title, hero_meta, remain, hero_enabled


async def md_str_to_html_content(md_content):
    """Render Markdown text to a complete HTML page string.

    The Markdown is converted with the `extra` and `codehilite` extensions,
    then split with `_split_hero`. When a hero header is detected the page
    shows the hero followed by the remaining body; otherwise the full
    rendered body is used.
    """
    html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
    hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)

    # NOTE(review): the stylesheet and the element markup of both templates
    # were stripped from the reviewed copy of this file. `css` is kept as the
    # empty placeholder that was visible; the tags and class names below
    # (other than `content` / `hero-active`) are a minimal reconstruction.
    # Restore the original markup and CSS from version control.
    css = """ """

    content_class = 'content hero-active' if hero_enabled else 'content'
    hero_html = ''
    if hero_enabled:
        hero_html = f'''<div class="hero">
<div class="hero-badge">AI 群聊总结</div>
<h1>{hero_title}</h1>
<div class="hero-meta">{hero_meta}</div>
</div>'''

    full_html = f'''<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
{css}
</head>
<body>
{hero_html}
<div class="{content_class}">
{remain_html if hero_enabled else html_body}
</div>
</body>
</html>'''
    return full_html


def check_chromium_installed(path):
    """Return True if `path` is an existing file the current user may execute."""
    return os.path.isfile(path) and os.access(path, os.X_OK)


def _find_chrome_executable():
    """Return the first usable Chrome/Chromium executable path, or None.

    On Windows a list of known install locations is probed; elsewhere the
    Playwright cache under ``~/.cache/ms-playwright`` is searched, highest
    sorted directory name first.
    """
    if os.name == 'nt':
        candidates = [
            r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
        ]
        # Also cover the per-user install of whoever is running this, not just
        # the two hard-coded profiles. Appended last so existing setups keep
        # resolving to the same path as before.
        local_app_data = os.environ.get("LOCALAPPDATA")
        if local_app_data:
            candidates.append(
                os.path.join(local_app_data, "Google", "Chrome", "Application", "chrome.exe")
            )
    else:
        import glob

        pattern = os.path.join(
            os.path.expanduser("~"),
            ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome",
        )
        candidates = sorted(glob.glob(pattern), reverse=True)

    return next((p for p in candidates if check_chromium_installed(p)), None)


async def html_to_image(html_content, output_image):
    """Render an HTML string to a full-page screenshot with headless Chromium.

    Uses a locally found Chrome/Chromium executable when available, otherwise
    Playwright's bundled browser. The page is rendered in a 780x960 viewport
    at device scale factor 1.2.

    Args:
        html_content: Complete HTML document to render.
        output_image: File path the screenshot is written to.

    Raises:
        RuntimeError: If the screenshot call returns but the file is missing.
    """
    async with async_playwright() as p:
        browser_path = _find_chrome_executable()
        launch_args = [
            "--no-sandbox",
            "--disable-setuid-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
        ]

        if browser_path:
            logger.debug(f"Launch chromium with system chrome: {browser_path}")
            browser = await p.chromium.launch(executable_path=browser_path, args=launch_args)
        else:
            logger.debug("Launch chromium with bundled browser")
            browser = await p.chromium.launch(args=launch_args)

        try:
            context = await browser.new_context(
                viewport={"width": 780, "height": 960},
                device_scale_factor=1.2,
            )
            page = await context.new_page()

            logger.debug("Set page content")
            await page.set_content(html_content, wait_until='domcontentloaded', timeout=15000)

            logger.debug("Wait for fonts ready")
            await page.evaluate("document.fonts.ready")
            # Short settle delay after fonts report ready, before capturing.
            await asyncio.sleep(0.2)

            logger.debug(f"Take screenshot: output={output_image}")
            await page.screenshot(
                path=output_image,
                full_page=True,
                timeout=15000,
                animations="disabled",
            )

            if not os.path.exists(output_image):
                raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
        finally:
            logger.debug("Closing browser")
            await safe_close_browser(browser)

async def convert_md_str_to_image(md_content: str, output_image: str, max_retries: int = 2) -> str:
    """Render Markdown text to an image file, retrying on failure.

    The image is written to ``<cwd>/temp/md2image/<output_image>``; the
    directory is created if needed and any existing file at that path is
    removed before each attempt. An attempt counts as failed when rendering
    raises or the resulting file is smaller than 1024 bytes. Failed attempts
    are separated by a 1.5 s pause.

    Args:
        md_content: Markdown source; must be non-empty.
        output_image: File name of the image inside the temp directory.
        max_retries: Total number of attempts.

    Returns:
        The resolved absolute path of the generated image.

    Raises:
        ValueError: If `md_content` is empty.
        RuntimeError: If every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    if not md_content:
        raise ValueError("Markdown content cannot be empty")

    temp_dir = Path(os.getcwd()) / "temp" / "md2image"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_image_path = temp_dir / output_image

    last_error = None
    for attempt in range(max_retries):
        try:
            logger.debug(f"尝试第 {attempt + 1}/{max_retries} 次生成图片")

            # Drop any stale output so the size check below can only see a
            # file produced by this attempt.
            output_image_path.unlink(missing_ok=True)

            full_html = await md_str_to_html_content(md_content)
            await html_to_image(full_html, str(output_image_path))

            image_size = output_image_path.stat().st_size
            if image_size < 1024:
                raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")

            logger.info(f"图片成功生成:{output_image_path}")
            return str(output_image_path.resolve())
        except Exception as e:
            last_error = e
            logger.warning(f"第 {attempt + 1} 次尝试失败: {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(1.5)

    # Chain the last failure so its traceback is not lost to callers.
    raise RuntimeError(
        f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}"
    ) from last_error