From 34230e60ab5a602e340eb71301b09a2a43f0df1c Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 29 Jan 2026 10:32:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96markdown=20to=5Fimage=20?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/ai_auto_response/main.py | 4 +- utils/markdown_to_image.py | 459 +++++++------------------------ 2 files changed, 108 insertions(+), 355 deletions(-) diff --git a/plugins/ai_auto_response/main.py b/plugins/ai_auto_response/main.py index ec82681..59b0c7c 100644 --- a/plugins/ai_auto_response/main.py +++ b/plugins/ai_auto_response/main.py @@ -54,7 +54,7 @@ class AIAutoResponsePlugin(MessagePluginInterface): super().__init__() self.intervention_bot = None self.group_messages = {} # 存储每个群的最近消息 - self.max_messages = 20 # 每个群最多存储的消息数量 + self.max_messages = 100 # 每个群最多存储的消息数量 # 注册功能权限 self.feature = self.register_feature() @@ -164,7 +164,7 @@ class AIAutoResponsePlugin(MessagePluginInterface): if response: # 发送回复 await bot.send_text_message(roomid, response, sender) - return True, "自动回复成功" + return False, "自动回复成功" else: return False, "生成回复失败" diff --git a/utils/markdown_to_image.py b/utils/markdown_to_image.py index 53e2d13..9566302 100644 --- a/utils/markdown_to_image.py +++ b/utils/markdown_to_image.py @@ -1,427 +1,180 @@ import subprocess import time - import markdown from pathlib import Path from playwright.async_api import async_playwright import os import asyncio -import aiofiles - from loguru import logger -# linux 下需要安装字体 -# sudo apt-get install -y fonts-noto-cjk fonts-noto-cjk-extra -# sudo apt-get install -y fonts-noto-color-emoji fonts-noto-cjk fonts-wqy-microhei -# 将 Markdown 字符串转换为 HTML -async def md_str_to_html(md_content, output_html): - """ - 将 Markdown 字符串转换为 HTML 文件,并添加支持中文和 Emoji 的样式(异步版本)。 +# ================= 样式与 HTML 处理 ================= - :param md_content: 输入的 Markdown 字符串 - :param output_html: 输出的 HTML 文件路径 +async def md_str_to_html_content(md_content): """ - # 转换 Markdown 为 HTML,启用额外功能(如表格、代码高亮) - html_content = markdown.markdown(md_content, extensions=['extra', 'codehilite']) + 将 Markdown 字符串转换为 HTML 内容字符串(逻辑保持不变)。 + """ + # 转换 Markdown 为 HTML + html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite']) - # 添加基本的 HTML 结构和样式,支持中文和 Emoji + # 保持你原有的 CSS 样式不变 css = """ """ - # 构建完整的 HTML 内容 full_html = f''' + {css} - {html_content} + {html_body} ''' + return full_html - # 使用普通的文件写入,确保文件完全写入 - try: - with open(output_html, 'w', encoding='utf-8') as f: - f.write(full_html) - f.flush() # 强制刷新缓冲区 - os.fsync(f.fileno()) # 确保写入磁盘 - except Exception as e: - logger.error(f"写入HTML文件失败: {e}") - raise - - # 验证文件是否成功写入 - try: - with open(output_html, 'r', encoding='utf-8') as f: - content = f.read() - if not content: - raise ValueError("HTML文件写入后为空") - except Exception as e: - logger.error(f"验证HTML文件失败: {e}") - raise - - # 添加小延时确保文件系统同步 - await asyncio.sleep(0.5) +# ================= 浏览器与图片处理 ================= def check_chromium_installed(path): return os.path.isfile(path) and os.access(path, os.X_OK) -async def html_to_image(html_file, output_image): +async def html_to_image(html_content, output_image): """ - 使用 Playwright 加载 HTML 文件并截图(异步)。 + 优化版:直接注入 HTML 字符串生成图片。 """ - # 验证输入文件是否存在 - if not os.path.exists(html_file): - raise FileNotFoundError(f"HTML文件不存在: {html_file}") - - # 验证输入文件是否可读 - if not os.access(html_file, os.R_OK): - raise PermissionError(f"HTML文件不可读: {html_file}") - - try: - async with async_playwright() as p: - browser_path = None + async with async_playwright() as p: + browser_path = None + # 保持你原有的浏览器路径搜索逻辑 + if os.name == 'nt': + possible_chrome_paths = [ + r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe", + r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe", + r"C:\Program Files\Google\Chrome\Application\chrome.exe", + r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" + ] + for path in possible_chrome_paths: + if check_chromium_installed(path): + browser_path = path + break + else: + import glob + user_home = os.path.expanduser("~") + glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome") + chrome_paths = glob.glob(glob_pattern) + for path in sorted(chrome_paths, reverse=True): + if check_chromium_installed(path): + browser_path = path + break - if os.name == 'nt': # Windows - possible_chrome_paths = [ - r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe", - r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe", - r"C:\Program Files\Google\Chrome\Application\chrome.exe", - r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" - ] - for path in possible_chrome_paths: - if check_chromium_installed(path): - browser_path = path - logger.debug(f"找到浏览器路径: {browser_path}") - break - else: # Linux - import glob - user_home = os.path.expanduser("~") - glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", - "chrome") - chrome_paths = glob.glob(glob_pattern) - browser_path = None - for path in sorted(chrome_paths, reverse=True): # 按版本名排序,最新优先 - if check_chromium_installed(path): - browser_path = path - logger.debug(f"找到 Playwright Chromium 路径: {browser_path}") - break + # 启动浏览器,添加关键的稳定性参数 + launch_args = [ + "--no-sandbox", + "--disable-setuid-sandbox", + "--disable-dev-shm-usage" # 解决 Linux 内存共享问题 + ] - if not browser_path: - logger.debug("未找到已安装的 Chromium 浏览器,尝试使用 Playwright 默认安装") - try: - logger.debug("正在安装 Playwright 浏览器...") - subprocess.run(["playwright", "install", "chromium"], check=True) - logger.debug("Playwright 浏览器安装完成") - except Exception as install_error: - logger.debug(f"安装 Playwright 浏览器失败: {install_error}") + if browser_path: + browser = await p.chromium.launch(executable_path=browser_path, args=launch_args) + else: + browser = await p.chromium.launch(args=launch_args) - browser = await p.chromium.launch() # 使用默认路径 - else: - browser = await p.chromium.launch(executable_path=browser_path) + try: + # 使用更高的 device_scale_factor 可以让图片更清晰 + context = await browser.new_context(device_scale_factor=2) + page = await context.new_page() - # 业务逻辑不变 - page = None - try: - page = await browser.new_page() - - # 设置更长的超时时间,并添加更好的错误处理 - file_url = f'file://{os.path.abspath(html_file)}' - logger.debug(f"正在加载文件: {file_url}") - - # 使用更长的超时时间和更宽松的等待条件 - await page.goto(file_url, timeout=120000, wait_until='domcontentloaded') - - # 等待页面完全加载 - await page.wait_for_timeout(2000) - - # 设置视口大小 - await page.set_viewport_size({"width": 750, "height": 800}) - - # 再次等待确保渲染完成 - await page.wait_for_timeout(1000) - - # 截图 - await page.screenshot(path=output_image, full_page=True) - - # 验证图片文件是否成功生成 - if not os.path.exists(output_image): - raise RuntimeError(f"截图失败,输出文件不存在: {output_image}") - - logger.debug(f"截图成功生成: {output_image}") - - except Exception as e: - logger.error(f"截图过程中发生错误: {e}") - # 如果截图失败,确保删除可能的不完整文件 - if os.path.exists(output_image): - try: - os.remove(output_image) - logger.debug(f"已删除不完整的截图文件: {output_image}") - except Exception as cleanup_error: - logger.warning(f"清理不完整文件失败: {cleanup_error}") - raise - finally: - if page: - await page.close() - await browser.close() + # 设置视口宽度,高度暂设大一点以便内容铺开 + await page.set_viewport_size({"width": 750, "height": 1000}) - except Exception as e: - logger.error(f"浏览器操作失败: {e}") - if "Executable doesn't exist" in str(e): - logger.error("请运行 'playwright install' 命令安装必要的浏览器组件") - raise + # 【优化核心】:直接设置 HTML 内容,不走 file:// 协议 + # 这样可以彻底避免文件读取超时 + await page.set_content(html_content, wait_until='load') + + # 稍微等待一下确保 CSS 渲染完成 + await asyncio.sleep(0.5) + + # 截图(full_page=True 会自动处理高度) + await page.screenshot(path=output_image, full_page=True) + + if not os.path.exists(output_image): + raise RuntimeError(f"截图失败,输出文件不存在: {output_image}") + + finally: + await browser.close() -# 主函数:从字符串转换 Markdown 到图片(异步版) +# ================= 主转换函数 ================= + async def convert_md_str_to_image(md_content: str, output_image: str, max_retries: int = 3) -> str: """ - 将 Markdown 字符串转换为图片(异步)。 - - Args: - md_content (str): Markdown 内容字符串 - output_image (str): 输出图片的文件名(不含路径) - max_retries (int): 最大重试次数,默认3次 - - Returns: - str: 生成的图片文件的绝对路径 - - Raises: - FileNotFoundError: 如果临时目录无法创建或访问 - ValueError: 如果 md_content 为空 - RuntimeError: 如果重试次数耗尽后仍然失败 + 主函数:从字符串转换 Markdown 到图片(异步版)。 """ - # 验证输入 if not md_content: raise ValueError("Markdown content cannot be empty") - # 获取项目根目录 + # 路径准备 project_root = os.getcwd() - project_root_path = Path(project_root).resolve() - - # 创建临时目录 - temp/md2image - temp_dir = project_root_path / "temp" / "md2image" - try: - temp_dir.mkdir(parents=True, exist_ok=True) - except Exception as e: - logger.error(f"Failed to create temp directory: {e}") - raise FileNotFoundError(f"Could not create temp directory: {temp_dir}") - - # 生成唯一的临时文件名 - timestamp = int(time.time()) - temp_html_filename = f"temp_output_{timestamp}.html" - temp_html_path = temp_dir / temp_html_filename + temp_dir = Path(project_root) / "temp" / "md2image" + temp_dir.mkdir(parents=True, exist_ok=True) output_image_path = temp_dir / output_image - # 确保输出图片路径的父目录存在 - output_image_path.parent.mkdir(parents=True, exist_ok=True) - last_error = None - + for attempt in range(max_retries): try: logger.debug(f"尝试第 {attempt + 1}/{max_retries} 次生成图片") - - # 清理之前的临时文件(如果存在) - if temp_html_path.exists(): - os.remove(str(temp_html_path)) + if output_image_path.exists(): os.remove(str(output_image_path)) - - # 将 Markdown 转换为 HTML - await md_str_to_html(md_content, str(temp_html_path)) - # 添加更长的等待时间确保文件系统同步 - await asyncio.sleep(1.0) - - # 检查文件是否存在和可读 - if not os.path.exists(str(temp_html_path)): - raise FileNotFoundError(f"HTML文件不存在: {temp_html_path}") - - # 验证HTML文件内容 - with open(str(temp_html_path), 'r', encoding='utf-8') as f: - html_content = f.read() - if len(html_content) < 100: # HTML文件太短,可能有问题 - raise ValueError(f"HTML文件内容异常,长度仅为: {len(html_content)}") - - logger.debug(f"HTML文件验证通过,大小: {len(html_content)} 字符") - - # 将 HTML 转换为图片 - await html_to_image(str(temp_html_path), str(output_image_path)) + # 1. 直接获取生成的 HTML 字符串,不再写临时文件 + full_html = await md_str_to_html_content(md_content) - # 验证生成的图片文件 - if not os.path.exists(str(output_image_path)): - raise RuntimeError(f"图片文件生成失败,文件不存在: {output_image_path}") - - # 检查图片文件大小 + # 2. 转换图片 + await html_to_image(full_html, str(output_image_path)) + + # 3. 验证 image_size = os.path.getsize(str(output_image_path)) - if image_size < 1024: # 小于1KB的图片可能有问题 - raise RuntimeError(f"生成的图片文件异常,大小仅为: {image_size} bytes") + if image_size < 1024: + raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes") - logger.debug(f"图片已成功生成:{output_image_path},大小: {image_size} bytes") + logger.info(f"图片成功生成:{output_image_path}") return str(output_image_path.resolve()) except Exception as e: last_error = e logger.warning(f"第 {attempt + 1} 次尝试失败: {e}") - - # 清理失败的文件 - try: - if temp_html_path.exists(): - os.remove(str(temp_html_path)) - if output_image_path.exists(): - os.remove(str(output_image_path)) - except Exception as cleanup_error: - logger.warning(f"清理临时文件失败: {cleanup_error}") - - # 如果不是最后一次尝试,等待一段时间后重试 if attempt < max_retries - 1: - wait_time = (attempt + 1) * 2 # 递增等待时间 - logger.debug(f"等待 {wait_time} 秒后重试...") - await asyncio.sleep(wait_time) - - # 所有重试都失败了 - logger.error(f"经过 {max_retries} 次尝试后仍然失败") - raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}") - - # finally: - # # 可选:清理临时 HTML 文件 - # if temp_html_path.exists(): - # try: - # # 使用异步方式删除文件 - # # await asyncio.to_thread(os.remove, str(temp_html_path)) - # # logger.debug(f"Deleted temporary HTML file: {temp_html_path}") - # except Exception as e: - # logger.warning(f"Failed to delete temporary HTML file: {e}") + await asyncio.sleep((attempt + 1) * 2) + + raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}") + -# 示例使用 if __name__ == "__main__": # 示例 Markdown 字符串(包含中文和 Emoji) md_content = """#🌟「4KED康复训练群 - 05-30 总结」🌟