优化markdown to_image 逻辑

This commit is contained in:
liuwei
2026-01-29 10:32:01 +08:00
parent 6b3b43ffc3
commit 34230e60ab
2 changed files with 108 additions and 355 deletions

View File

@@ -54,7 +54,7 @@ class AIAutoResponsePlugin(MessagePluginInterface):
super().__init__()
self.intervention_bot = None
self.group_messages = {} # 存储每个群的最近消息
self.max_messages = 20 # 每个群最多存储的消息数量
self.max_messages = 100 # 每个群最多存储的消息数量
# 注册功能权限
self.feature = self.register_feature()
@@ -164,7 +164,7 @@ class AIAutoResponsePlugin(MessagePluginInterface):
if response:
# 发送回复
await bot.send_text_message(roomid, response, sender)
return True, "自动回复成功"
return False, "自动回复成功"
else:
return False, "生成回复失败"

View File

@@ -1,427 +1,180 @@
import subprocess
import time
import markdown
from pathlib import Path
from playwright.async_api import async_playwright
import os
import asyncio
import aiofiles
from loguru import logger
# linux 下需要安装字体
# sudo apt-get install -y fonts-noto-cjk fonts-noto-cjk-extra
# sudo apt-get install -y fonts-noto-color-emoji fonts-noto-cjk fonts-wqy-microhei
# 将 Markdown 字符串转换为 HTML
async def md_str_to_html(md_content, output_html):
"""
将 Markdown 字符串转换为 HTML 文件,并添加支持中文和 Emoji 的样式(异步版本)。
# ================= 样式与 HTML 处理 =================
:param md_content: 输入的 Markdown 字符串
:param output_html: 输出的 HTML 文件路径
async def md_str_to_html_content(md_content):
"""
# 转换 Markdown 为 HTML,启用额外功能(如表格、代码高亮)
html_content = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
将 Markdown 字符串转换为 HTML 内容字符串(逻辑保持不变)。
"""
# 转换 Markdown 为 HTML
html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
# 添加基本的 HTML 结构和样式,支持中文和 Emoji
# 保持你原有的 CSS 样式不变
css = """
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
padding: 20px 25px;
line-height: 1.6;
max-width: 750px;
margin: 0 auto;
background-color: #f9f9f9;
color: #333;
border: 1px solid #f0f0f0;
font-size: 16px;
}
h1, h2, h3, h4, h5, h6 {
color: #222;
margin-top: 24px;
margin-bottom: 16px;
font-weight: 600;
line-height: 1.3;
}
h1 {
font-size: 2.2em;
padding-bottom: 12px;
border-bottom: 1px solid #eee;
text-align: center;
margin-bottom: 25px;
color: #1a1a1a;
}
h2 {
font-size: 1.8em;
padding-bottom: 10px;
margin-top: 30px;
border-bottom: 1px solid #eee;
color: #2c3e50;
}
h3 {
font-size: 1.5em;
margin-top: 25px;
padding-left: 12px;
border-left: 4px solid #ddd;
color: #34495e;
}
pre, code {
background-color: #f5f5f5;
padding: 12px;
border-radius: 4px;
font-family: 'Courier New', Courier, monospace;
font-size: 0.95em;
border: 1px solid #eee;
}
table {
border-collapse: collapse;
width: 100%;
margin: 18px 0;
background-color: white;
}
th, td {
border: 1px solid #eee;
padding: 10px 12px;
text-align: left;
}
th {
background-color: #fafafa;
font-weight: 600;
}
/* 确保 Emoji 正确渲染 */
span, p, li {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', 'Noto Sans CJK SC', 'Microsoft YaHei', sans-serif;
font-size: 16px;
}
p {
margin: 16px 0;
color: #333;
line-height: 1.8;
font-size: 16px;
}
ul, ol {
padding-left: 25px;
margin: 18px 0;
}
li {
margin: 8px 0;
line-height: 1.7;
font-size: 16px;
}
blockquote {
margin: 18px 0;
padding: 12px 18px;
background-color: #f8f8f8;
border-left: 5px solid #ddd;
color: #555;
font-size: 1em;
}
/* 强调样式 */
strong {
color: #222;
font-weight: 600;
}
/* 链接样式 */
a {
color: #3498db;
text-decoration: none;
}
a:hover {
text-decoration: underline;
color: #2980b9;
}
/* 星级评分样式 */
h3 em {
color: #fa8c16;
font-style: normal;
font-size: 1.1em;
}
/* 时间和标签样式 */
.time, .tag {
color: #777;
font-size: 0.95em;
}
/* 底部署名样式 */
.signature {
margin-top: 35px;
text-align: right;
color: #777;
font-size: 0.95em;
font-style: italic;
padding: 20px 25px; line-height: 1.6; max-width: 750px; margin: 0 auto;
background-color: #f9f9f9; color: #333; border: 1px solid #f0f0f0; font-size: 16px;
}
h1, h2, h3, h4, h5, h6 { color: #222; margin-top: 24px; margin-bottom: 16px; font-weight: 600; line-height: 1.3; }
h1 { font-size: 2.2em; padding-bottom: 12px; border-bottom: 1px solid #eee; text-align: center; margin-bottom: 25px; color: #1a1a1a; }
h2 { font-size: 1.8em; padding-bottom: 10px; margin-top: 30px; border-bottom: 1px solid #eee; color: #2c3e50; }
h3 { font-size: 1.5em; margin-top: 25px; padding-left: 12px; border-left: 4px solid #ddd; color: #34495e; }
pre, code { background-color: #f5f5f5; padding: 12px; border-radius: 4px; font-family: 'Courier New', Courier, monospace; font-size: 0.95em; border: 1px solid #eee; }
table { border-collapse: collapse; width: 100%; margin: 18px 0; background-color: white; }
th, td { border: 1px solid #eee; padding: 10px 12px; text-align: left; }
th { background-color: #fafafa; font-weight: 600; }
p { margin: 16px 0; color: #333; line-height: 1.8; font-size: 16px; }
ul, ol { padding-left: 25px; margin: 18px 0; }
li { margin: 8px 0; line-height: 1.7; font-size: 16px; }
blockquote { margin: 18px 0; padding: 12px 18px; background-color: #f8f8f8; border-left: 5px solid #ddd; color: #555; font-size: 1em; }
strong { color: #222; font-weight: 600; }
a { color: #3498db; text-decoration: none; }
h3 em { color: #fa8c16; font-style: normal; font-size: 1.1em; }
.signature { margin-top: 35px; text-align: right; color: #777; font-size: 0.95em; font-style: italic; }
</style>
"""
# 构建完整的 HTML 内容
full_html = f'''<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{css}
</head>
<body>
{html_content}
{html_body}
</body>
</html>'''
return full_html
# 使用普通的文件写入,确保文件完全写入
try:
with open(output_html, 'w', encoding='utf-8') as f:
f.write(full_html)
f.flush() # 强制刷新缓冲区
os.fsync(f.fileno()) # 确保写入磁盘
except Exception as e:
logger.error(f"写入HTML文件失败: {e}")
raise
# 验证文件是否成功写入
try:
with open(output_html, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
raise ValueError("HTML文件写入后为空")
except Exception as e:
logger.error(f"验证HTML文件失败: {e}")
raise
# 添加小延时确保文件系统同步
await asyncio.sleep(0.5)
# ================= 浏览器与图片处理 =================
def check_chromium_installed(path) -> bool:
    """Return whether *path* points to an existing, executable regular file.

    Used to probe candidate browser binaries before handing one to Playwright.

    :param path: Candidate executable path (``str`` or path-like). ``None``
        or an empty value is treated as "not installed" instead of raising.
    :return: ``True`` only if the path is a regular file and the current
        process has execute permission on it.
    """
    # Guard against None / "" so callers can pass an unset path safely;
    # os.path.isfile(None) would otherwise raise TypeError.
    if not path:
        return False
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
async def html_to_image(html_file, output_image):
async def html_to_image(html_content, output_image):
"""
使用 Playwright 加载 HTML 文件并截图(异步)
优化版:直接注入 HTML 字符串生成图片
"""
# 验证输入文件是否存在
if not os.path.exists(html_file):
raise FileNotFoundError(f"HTML文件不存在: {html_file}")
# 验证输入文件是否可读
if not os.access(html_file, os.R_OK):
raise PermissionError(f"HTML文件不可读: {html_file}")
try:
async with async_playwright() as p:
browser_path = None
async with async_playwright() as p:
browser_path = None
# 保持你原有的浏览器路径搜索逻辑
if os.name == 'nt':
possible_chrome_paths = [
r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"
]
for path in possible_chrome_paths:
if check_chromium_installed(path):
browser_path = path
break
else:
import glob
user_home = os.path.expanduser("~")
glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux", "chrome")
chrome_paths = glob.glob(glob_pattern)
for path in sorted(chrome_paths, reverse=True):
if check_chromium_installed(path):
browser_path = path
break
if os.name == 'nt': # Windows
possible_chrome_paths = [
r"C:\Users\Liu_WIN10\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Users\Liu-OPEN\AppData\Local\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"
]
for path in possible_chrome_paths:
if check_chromium_installed(path):
browser_path = path
logger.debug(f"找到浏览器路径: {browser_path}")
break
else: # Linux
import glob
user_home = os.path.expanduser("~")
glob_pattern = os.path.join(user_home, ".cache", "ms-playwright", "chromium-*", "chrome-linux",
"chrome")
chrome_paths = glob.glob(glob_pattern)
browser_path = None
for path in sorted(chrome_paths, reverse=True): # 按版本名排序,最新优先
if check_chromium_installed(path):
browser_path = path
logger.debug(f"找到 Playwright Chromium 路径: {browser_path}")
break
# 启动浏览器,添加关键的稳定性参数
launch_args = [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage" # 解决 Linux 内存共享问题
]
if not browser_path:
logger.debug("未找到已安装的 Chromium 浏览器,尝试使用 Playwright 默认安装")
try:
logger.debug("正在安装 Playwright 浏览器...")
subprocess.run(["playwright", "install", "chromium"], check=True)
logger.debug("Playwright 浏览器安装完成")
except Exception as install_error:
logger.debug(f"安装 Playwright 浏览器失败: {install_error}")
if browser_path:
browser = await p.chromium.launch(executable_path=browser_path, args=launch_args)
else:
browser = await p.chromium.launch(args=launch_args)
browser = await p.chromium.launch() # 使用默认路径
else:
browser = await p.chromium.launch(executable_path=browser_path)
try:
# 使用更高的 device_scale_factor 可以让图片更清晰
context = await browser.new_context(device_scale_factor=2)
page = await context.new_page()
# 业务逻辑不变
page = None
try:
page = await browser.new_page()
# 设置更长的超时时间,并添加更好的错误处理
file_url = f'file://{os.path.abspath(html_file)}'
logger.debug(f"正在加载文件: {file_url}")
# 使用更长的超时时间和更宽松的等待条件
await page.goto(file_url, timeout=120000, wait_until='domcontentloaded')
# 等待页面完全加载
await page.wait_for_timeout(2000)
# 设置视口大小
await page.set_viewport_size({"width": 750, "height": 800})
# 再次等待确保渲染完成
await page.wait_for_timeout(1000)
# 截图
await page.screenshot(path=output_image, full_page=True)
# 验证图片文件是否成功生成
if not os.path.exists(output_image):
raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
logger.debug(f"截图成功生成: {output_image}")
except Exception as e:
logger.error(f"截图过程中发生错误: {e}")
# 如果截图失败,确保删除可能的不完整文件
if os.path.exists(output_image):
try:
os.remove(output_image)
logger.debug(f"已删除不完整的截图文件: {output_image}")
except Exception as cleanup_error:
logger.warning(f"清理不完整文件失败: {cleanup_error}")
raise
finally:
if page:
await page.close()
await browser.close()
# 设置视口宽度,高度暂设大一点以便内容铺开
await page.set_viewport_size({"width": 750, "height": 1000})
except Exception as e:
logger.error(f"浏览器操作失败: {e}")
if "Executable doesn't exist" in str(e):
logger.error("请运行 'playwright install' 命令安装必要的浏览器组件")
raise
# 【优化核心】:直接设置 HTML 内容,不走 file:// 协议
# 这样可以彻底避免文件读取超时
await page.set_content(html_content, wait_until='load')
# 稍微等待一下确保 CSS 渲染完成
await asyncio.sleep(0.5)
# 截图(full_page=True 会自动处理高度)
await page.screenshot(path=output_image, full_page=True)
if not os.path.exists(output_image):
raise RuntimeError(f"截图失败,输出文件不存在: {output_image}")
finally:
await browser.close()
# 主函数:从字符串转换 Markdown 到图片(异步版)
# ================= 主转换函数 =================
async def convert_md_str_to_image(md_content: str, output_image: str, max_retries: int = 3) -> str:
"""
将 Markdown 字符串转换为图片(异步)。
Args:
md_content (str): Markdown 内容字符串
output_image (str): 输出图片的文件名(不含路径)
max_retries (int): 最大重试次数,默认3次
Returns:
str: 生成的图片文件的绝对路径
Raises:
FileNotFoundError: 如果临时目录无法创建或访问
ValueError: 如果 md_content 为空
RuntimeError: 如果重试次数耗尽后仍然失败
主函数:从字符串转换 Markdown 到图片(异步)。
"""
# 验证输入
if not md_content:
raise ValueError("Markdown content cannot be empty")
# 获取项目根目录
# 路径准备
project_root = os.getcwd()
project_root_path = Path(project_root).resolve()
# 创建临时目录 - temp/md2image
temp_dir = project_root_path / "temp" / "md2image"
try:
temp_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Failed to create temp directory: {e}")
raise FileNotFoundError(f"Could not create temp directory: {temp_dir}")
# 生成唯一的临时文件名
timestamp = int(time.time())
temp_html_filename = f"temp_output_{timestamp}.html"
temp_html_path = temp_dir / temp_html_filename
temp_dir = Path(project_root) / "temp" / "md2image"
temp_dir.mkdir(parents=True, exist_ok=True)
output_image_path = temp_dir / output_image
# 确保输出图片路径的父目录存在
output_image_path.parent.mkdir(parents=True, exist_ok=True)
last_error = None
for attempt in range(max_retries):
try:
logger.debug(f"尝试第 {attempt + 1}/{max_retries} 次生成图片")
# 清理之前的临时文件(如果存在)
if temp_html_path.exists():
os.remove(str(temp_html_path))
if output_image_path.exists():
os.remove(str(output_image_path))
# 将 Markdown 转换为 HTML
await md_str_to_html(md_content, str(temp_html_path))
# 添加更长的等待时间确保文件系统同步
await asyncio.sleep(1.0)
# 检查文件是否存在和可读
if not os.path.exists(str(temp_html_path)):
raise FileNotFoundError(f"HTML文件不存在: {temp_html_path}")
# 验证HTML文件内容
with open(str(temp_html_path), 'r', encoding='utf-8') as f:
html_content = f.read()
if len(html_content) < 100: # HTML文件太短可能有问题
raise ValueError(f"HTML文件内容异常长度仅为: {len(html_content)}")
logger.debug(f"HTML文件验证通过大小: {len(html_content)} 字符")
# 将 HTML 转换为图片
await html_to_image(str(temp_html_path), str(output_image_path))
# 1. 直接获取生成的 HTML 字符串,不再写临时文件
full_html = await md_str_to_html_content(md_content)
# 验证生成的图片文件
if not os.path.exists(str(output_image_path)):
raise RuntimeError(f"图片文件生成失败,文件不存在: {output_image_path}")
# 检查图片文件大小
# 2. 转换图片
await html_to_image(full_html, str(output_image_path))
# 3. 验证
image_size = os.path.getsize(str(output_image_path))
if image_size < 1024: # 小于1KB的图片可能有问题
raise RuntimeError(f"生成的图片文件异常,大小仅为: {image_size} bytes")
if image_size < 1024:
raise RuntimeError(f"图片生成异常,大小仅为: {image_size} bytes")
logger.debug(f"图片成功生成:{output_image_path},大小: {image_size} bytes")
logger.info(f"图片成功生成:{output_image_path}")
return str(output_image_path.resolve())
except Exception as e:
last_error = e
logger.warning(f"{attempt + 1} 次尝试失败: {e}")
# 清理失败的文件
try:
if temp_html_path.exists():
os.remove(str(temp_html_path))
if output_image_path.exists():
os.remove(str(output_image_path))
except Exception as cleanup_error:
logger.warning(f"清理临时文件失败: {cleanup_error}")
# 如果不是最后一次尝试,等待一段时间后重试
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2 # 递增等待时间
logger.debug(f"等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
# 所有重试都失败了
logger.error(f"经过 {max_retries} 次尝试后仍然失败")
raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}")
# finally:
# # 可选:清理临时 HTML 文件
# if temp_html_path.exists():
# try:
# # 使用异步方式删除文件
# # await asyncio.to_thread(os.remove, str(temp_html_path))
# # logger.debug(f"Deleted temporary HTML file: {temp_html_path}")
# except Exception as e:
# logger.warning(f"Failed to delete temporary HTML file: {e}")
await asyncio.sleep((attempt + 1) * 2)
raise RuntimeError(f"图片生成失败,已重试 {max_retries} 次。最后错误: {last_error}")
# 示例使用
if __name__ == "__main__":
# 示例 Markdown 字符串(包含中文和 Emoji)
md_content = """#🌟「4KED康复训练群 - 05-30 总结」🌟