import os import re import time import json import shutil import subprocess import traceback import requests import io from typing import Dict, Any, List, Optional, Tuple from urllib.parse import urlparse from loguru import logger from pathlib import Path from PIL import Image, ImageDraw, ImageFont from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE from wechat_ipad.models.message import MessageType class DouyinParserError(Exception): """抖音解析器自定义异常基类""" pass class DouyinParserPlugin(MessagePluginInterface): """抖音无水印解析插件""" # 功能权限常量 FEATURE_KEY = "DOUYIN_PARSER" FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]" @property def name(self) -> str: return "抖音解析" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供抖音链接无水印解析功能,支持视频下载和分享" @property def author(self) -> str: return "姜不吃先生" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return [] # 不使用命令触发,而是通过消息内容匹配 @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.LOG = logger self.url_pattern = re.compile(r'https?://v\.douyin\.com/[^\s/]+/?') # 注册功能权限 self.feature = self.register_feature() # 修改为使用插件目录下的down_load_dir文件夹 self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self.gbm = context.get("gbm") self.download_dir = str(Path(Path(__file__).parent, "down_load_dir")) # 从配置中获取参数 douyin_config = self._config.get("Douyin", {}) self.enable = douyin_config.get("enable", True) self.http_proxy = douyin_config.get("http_proxy", "") # Cookie 配置说明: # 1) cookie: 直接粘贴请求头 Cookie 字符串; # 2) cookie_file: Netscape 格式 cookies 文件路径; # 3) 当二者同时存在时,后备提取优先 cookie_file(兼容性更好)。 self.cookie = douyin_config.get("cookie", "") or "" self.cookie_file = douyin_config.get("cookie_file", "") or "" self.download_mode = douyin_config.get("download_mode", "card") # card或file self.LOG.debug(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False if message.get("type") != MessageType.TEXT: return False content = str(message.get("content", "")).strip() match = self.url_pattern.search(content) return match is not None @plugin_stats_decorator(plugin_name="抖音解析") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" try: match = self.url_pattern.search(content) if not match: return False, "未找到抖音链接" original_url = self._clean_url(match.group(0)) self.LOG.info(f"发现抖音链接: {original_url}") media_info = self._parse_douyin(original_url) if not media_info: self.LOG.error(f"❌无法解析抖音媒资信息") return False, "解析失败" media_type = media_info.get('type', 'video') if media_type == 'image': imgs = media_info.get('images') or [] if not imgs: return False, "未获取到图片地址" img_bytes_list: List[bytes] = [] for u in imgs: b = self._download_image_bytes(u) if b: img_bytes_list.append(b) if not img_bytes_list: return False, "下载图片失败" merged_pages = self._merge_images_vertical_paged(img_bytes_list, 1242, 65000) if not merged_pages: return False, "图片合并失败" title = media_info.get('title') or "" # 按你的需求,图文类型不再单独发送一条文本消息。 # 这里把文案直接绘制到合并后第一页的顶部,让“文字 + 图片”作为同一条图片消息的一部分发送。 if len(title) > 0: merged_pages[0] = self._append_title_to_image(merged_pages[0], title) for page in merged_pages: await self.bot.send_image_message((roomid if roomid else sender), page) return True, f"发送合并图片成功({len(merged_pages)}页)" else: video_url = media_info.get('url', '') title = media_info.get('title', '无标题') author = media_info.get('author', '未知作者') cover = media_info.get('cover', '') if not video_url: self.LOG.error(f"❌无法获取视频地址") return False, "获取视频地址失败" if self.download_mode == "file": video_filename = f"video_{int(time.time())}.mp4" save_path = os.path.join(self.download_dir, video_filename) self.LOG.info(f"开始下载视频到: {save_path}") mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path)) if mp4_path: await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path)) return True, "发送视频文件成功" else: self.LOG.error(f"❌下载视频失败") return False, "下载视频失败" else: xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author, des=title, url=video_url, thumburl=cover ) await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender)) return True, "发送卡片成功" except DouyinParserError as e: self.LOG.error(f"抖音解析错误: {e}") self.LOG.error(f"❌抖音解析失败: {str(e)}") return False, f"解析错误: {e}" except Exception as e: self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}") self.LOG.error(f"❌处理抖音链接出错: {str(e)}") return False, f"处理出错: {e}" def _clean_url(self, url: str) -> str: """清理URL""" cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '') self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}") return cleaned_url def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理响应数据""" if not data: return data default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" media_type = data.get('type') or 'video' if media_type == 'video': cover = data.get('cover') if isinstance(cover, str): c = cover.strip().strip('`') data['cover'] = c if c.startswith('http') else default_cover else: data['cover'] = default_cover else: imgs = data.get('images') or [] data['cover'] = imgs[0] if imgs else default_cover return data def _parse_douyin(self, url: str) -> Dict[str, Any]: try: clean_url = self._clean_url(url) # 第一优先级:你现有的内网解析服务,速度快、稳定性高,优先命中。 primary = self._parse_from_internal_api(clean_url) if primary and (primary.get('url') or primary.get('images')): return self._clean_response_data(primary) # 第二优先级:你现有的外部付费接口,作为内网服务不可用时的兜底。 secondary = self._parse_from_external_api(clean_url) if secondary and (secondary.get('url') or secondary.get('images')): return self._clean_response_data(secondary) # 第三优先级:本地提取后备方案(无需依赖远端解析API): # 1) 优先走 yt_dlp Python 库; # 2) 若库不可用,再尝试系统已安装的 yt-dlp 命令行。 # 这样当接口异常/限流时,仍可在本机直接提取无水印直链和元数据。 local_fallback = self._parse_from_local_extractor(clean_url) if local_fallback and (local_fallback.get('url') or local_fallback.get('images')): return self._clean_response_data(local_fallback) raise DouyinParserError("未获取到有效媒资数据") except Exception as e: self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}") raise DouyinParserError(f"未知错误: {str(e)}") def _build_proxies(self) -> Optional[Dict[str, str]]: if self.http_proxy: return {"http": self.http_proxy, "https": self.http_proxy} return None def _build_request_headers(self) -> Dict[str, str]: """ 构建通用请求头。 设计说明: - User-Agent 保持常规浏览器标识,降低被目标站点直接拒绝的概率; - Cookie 在有配置时注入到请求头,提升受限资源的提取成功率。 """ headers = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ) } if self.cookie: headers["Cookie"] = self.cookie return headers def _parse_from_internal_api(self, clean_url: str) -> Optional[Dict[str, Any]]: try: endpoint = "http://192.168.2.32:8999/api/hybrid/video_data" headers = self._build_request_headers() headers["accept"] = "application/json" params = {"url": clean_url, "minimal": "false"} response = requests.get(endpoint, headers=headers, params=params, timeout=10, proxies=self._build_proxies()) if response.status_code != 200: return None body = response.json() or {} if body.get("code") != 200: return None data = body.get("data") or {} aweme_type = data.get("aweme_type") author = (data.get("author") or {}) nickname = author.get("nickname") or author.get("unique_id") or "未知作者" if aweme_type == 68 or (data.get("images") or data.get("image_list")): images_field = data.get("images") or [] images: List[str] = [] for img in images_field: ulist = img.get("url_list") or img.get("download_url_list") or [] chosen = self._prefer_image_url(ulist) if chosen: images.append(chosen) desc = data.get("desc") or data.get("caption") or "" result = {"type": "image", "images": images, "title": desc, "author": nickname, "cover": images[0] if images else ""} if images: return result return None video = data.get("video") or {} bit_rates = video.get("bit_rate") or [] chosen_url = "" mp4_sorted = sorted([br for br in bit_rates if br.get("format") == "mp4"], key=lambda x: x.get("bit_rate") or 0, reverse=True) for br in mp4_sorted: play_addr = br.get("play_addr") or {} urls = play_addr.get("url_list") or [] selected = self._prefer_v3_v10(urls) if selected: chosen_url = selected break if not chosen_url: play_addr = video.get("play_addr") or {} urls = play_addr.get("url_list") or [] selected = self._prefer_v3_v10(urls) if selected: chosen_url = selected cover = (video.get("cover") or {}).get("url_list") or [] cover_url = cover[0] if cover else "" caption = data.get("caption") or "无标题" author = (data.get("author") or {}) nickname = author.get("nickname") or author.get("unique_id") or "未知作者" result = {"type": "video", "url": chosen_url or "", "title": caption, "author": nickname, "cover": cover_url} if result.get("url"): return result return None except Exception: return None def _parse_from_external_api(self, clean_url: str) -> Optional[Dict[str, Any]]: try: pay_api_url = "https://api.pearktrue.cn/api/video/api.php" params = {"url": clean_url, "key": "f56c1fed0c6e64e7"} response = requests.post( pay_api_url, params=params, headers=self._build_request_headers(), timeout=10, proxies=self._build_proxies(), ) if response.status_code != 200: return None data = response.json() or {} if data.get("code") == 200: result = data.get("data", {}) if result.get("url"): return result return None except Exception: return None def _prefer_v3_v10(self, urls: List[str]) -> Optional[str]: try: if not urls: return None cleaned = [(u or "").strip().strip("`") for u in urls if u] def is_vx(n: str) -> bool: return bool(re.match(r"^v(3|4|5|6|7|8|9|10|11)(?:[\-.]|$)", n, re.I)) def is_douyinvod(n: str) -> bool: return "douyinvod.com" in n.lower() first = None for s in cleaned: netloc = urlparse(s).netloc if is_vx(netloc) and is_douyinvod(netloc): return s if first is None: first = s for s in cleaned: netloc = urlparse(s).netloc if is_vx(netloc): return s for s in cleaned: netloc = urlparse(s).netloc if is_douyinvod(netloc): return s return first except Exception: return urls[0] if urls else None def _prefer_image_url(self, urls: List[str]) -> Optional[str]: try: if not urls: return None cleaned = [(u or "").strip().strip("`") for u in urls if u] jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None) if jpeg: return jpeg webp = next((u for u in cleaned if ".webp" in u.lower()), None) if webp: return webp return cleaned[0] except Exception: return urls[0] if urls else None def _download_stream(self, url, save_path): """ 从指定URL读取视频流并保存到本地 :param url: 视频流的URL :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") """ try: # 发送GET请求,启用流式传输 response = requests.get( url, stream=True, headers=self._build_request_headers(), proxies=self._build_proxies(), timeout=30, ) # 检查请求是否成功 response.raise_for_status() # 如果状态码不是200,将抛出异常 # 确保保存路径的目录存在 os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) # 检查是否是视频流(可选,根据Content-Type判断) content_type = response.headers.get("Content-Type", "").lower() if "video" not in content_type and "application/octet-stream" not in content_type: self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 return None # 以二进制写入模式保存流数据 with open(save_path, "wb") as file: for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB if chunk: # 过滤空块 file.write(chunk) self.LOG.info(f"视频已下载到: {save_path}") return os.path.abspath(save_path) except requests.RequestException as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: self.LOG.error(f"发生未知错误: {e}") return None def _download_image_bytes(self, url: str) -> Optional[bytes]: try: resp = requests.get( url, headers=self._build_request_headers(), timeout=15, proxies=self._build_proxies(), ) if resp.status_code == 200: return resp.content return None except Exception: return None def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]: try: pil_images: List[Image.Image] = [] for b in images: img = Image.open(io.BytesIO(b)) if img.mode in ("RGBA", "P"): img = img.convert("RGB") w, h = img.size if w != target_width: ratio = target_width / float(w) img = img.resize((target_width, int(h * ratio))) pil_images.append(img) if not pil_images: return None total_height = sum(i.size[1] for i in pil_images) merged = Image.new("RGB", (target_width, total_height)) y = 0 for im in pil_images: merged.paste(im, (0, y)) y += im.size[1] output = io.BytesIO() merged.save(output, format="JPEG", quality=85) return output.getvalue() except Exception: return None def _merge_images_vertical_paged(self, images: List[bytes], target_width: int = 1242, max_total_height: int = 18000) -> Optional[List[bytes]]: try: outputs: List[bytes] = [] current_images: List[Image.Image] = [] current_height = 0 for b in images: try: img = Image.open(io.BytesIO(b)) if img.mode in ("RGBA", "P"): img = img.convert("RGB") w, h = img.size if w != target_width: ratio = target_width / float(w) img = img.resize((target_width, int(h * ratio))) ih = img.size[1] except Exception: continue if current_images and current_height + ih > max_total_height: merged = Image.new("RGB", (target_width, current_height)) y = 0 for im in current_images: merged.paste(im, (0, y)) y += im.size[1] out = io.BytesIO() merged.save(out, format="JPEG", quality=85) outputs.append(out.getvalue()) current_images = [img] current_height = img.size[1] else: current_images.append(img) current_height += ih if current_images: merged = Image.new("RGB", (target_width, current_height)) y = 0 for im in current_images: merged.paste(im, (0, y)) y += im.size[1] out = io.BytesIO() merged.save(out, format="JPEG", quality=85) outputs.append(out.getvalue()) return outputs if outputs else None except Exception: return None def _parse_from_local_extractor(self, clean_url: str) -> Optional[Dict[str, Any]]: """ 本地提取后备方案(接口不可用时启用)。 设计目标: 1) 不依赖你自建/第三方解析接口,避免单点故障; 2) 优先使用 Python 方式,减少进程开销; 3) 若 Python 库未安装,自动降级到命令行,最大化可用性。 """ try: info = self._extract_with_yt_dlp_python(clean_url) if not info: info = self._extract_with_yt_dlp_cli(clean_url) if not info: return None return self._normalize_yt_dlp_info(info) except Exception as e: self.LOG.warning(f"[抖音] 本地提取后备失败: {e}") return None def _extract_with_yt_dlp_python(self, clean_url: str) -> Optional[Dict[str, Any]]: """ 使用 yt_dlp Python 库提取信息。 注意: - skip_download=True 只提取元数据和直链,不下载文件; - 优先选取“含视频轨道且协议为http/https”的格式,降低后续发送失败概率。 """ try: import yt_dlp # type: ignore except Exception: return None ydl_opts = { "quiet": True, "no_warnings": True, "skip_download": True, "proxy": self.http_proxy or None, "nocheckcertificate": True, } # Cookie 注入策略: # - 优先使用 cookie_file(yt-dlp 官方支持的 cookies 文件,兼容性更高); # - 否则回退到手工 Cookie 请求头。 if self.cookie_file and os.path.exists(self.cookie_file): ydl_opts["cookiefile"] = self.cookie_file elif self.cookie: ydl_opts["http_headers"] = {"Cookie": self.cookie} with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(clean_url, download=False) if isinstance(info, dict): return info return None def _extract_with_yt_dlp_cli(self, clean_url: str) -> Optional[Dict[str, Any]]: """ 使用 yt-dlp 命令行提取信息。 适用场景: - 运行环境未安装 yt_dlp Python 包,但系统可执行文件已存在。 """ yt_dlp_bin = shutil.which("yt-dlp") if not yt_dlp_bin: return None cmd = [yt_dlp_bin, "-J", "--no-warnings", "--skip-download", clean_url] if self.http_proxy: cmd.extend(["--proxy", self.http_proxy]) # 命令行模式下同样注入 Cookie,确保与 Python 模式行为一致。 if self.cookie_file and os.path.exists(self.cookie_file): cmd.extend(["--cookies", self.cookie_file]) elif self.cookie: cmd.extend(["--add-header", f"Cookie: {self.cookie}"]) result = subprocess.run(cmd, capture_output=True, text=True, timeout=25) if result.returncode != 0: self.LOG.warning(f"[抖音] yt-dlp 命令行提取失败: code={result.returncode}, err={result.stderr[:200]}") return None try: data = json.loads(result.stdout or "{}") return data if isinstance(data, dict) else None except Exception: return None def _normalize_yt_dlp_info(self, info: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 将 yt-dlp 的原始结构统一映射为插件内部 media_info 结构。 目标结构: - 视频:{"type":"video","url","title","author","cover"} - 图集:{"type":"image","images":[],"title","author","cover"} """ # 统一提取作者与标题,尽量优先更稳定字段,保证卡片/文本信息完整。 title = str(info.get("description") or info.get("title") or "无标题") author = str(info.get("uploader") or info.get("creator") or info.get("channel") or "未知作者") # 统一提取封面: # 1) thumbnail 字段; # 2) thumbnails 数组最后一项(通常分辨率更高)。 cover = str(info.get("thumbnail") or "") if not cover: thumbs = info.get("thumbnails") or [] if isinstance(thumbs, list) and thumbs: last = thumbs[-1] if isinstance(thumbs[-1], dict) else {} cover = str(last.get("url") or "") # 图集场景:yt-dlp 可能返回 playlist/entries,每项通常是图片或片段资源。 if info.get("_type") == "playlist": entries = info.get("entries") or [] image_urls: List[str] = [] if isinstance(entries, list): for item in entries: if not isinstance(item, dict): continue # 优先取原始URL,其次取页面URL,再次取thumbnail。 candidate = str(item.get("url") or item.get("webpage_url") or item.get("thumbnail") or "") if candidate and candidate.startswith("http"): image_urls.append(candidate) if image_urls: return { "type": "image", "images": image_urls, "title": title, "author": author, "cover": image_urls[0], } # 视频场景:优先从 formats 里选“有视频轨道”的直链,避免选到纯音频。 best_url = "" formats = info.get("formats") or [] scored_candidates: List[Tuple[int, str]] = [] if isinstance(formats, list): for fmt in formats: if not isinstance(fmt, dict): continue fmt_url = str(fmt.get("url") or "") if not fmt_url or not fmt_url.startswith("http"): continue # 必须含视频轨道(vcodec != none),并且协议优先 http/https。 vcodec = str(fmt.get("vcodec") or "") protocol = str(fmt.get("protocol") or "") if vcodec.lower() == "none": continue score = 0 if protocol in ("https", "http"): score += 50 # 优先高分辨率与高码率。 score += int(fmt.get("height") or 0) score += int(fmt.get("tbr") or 0) // 10 scored_candidates.append((score, fmt_url)) if scored_candidates: scored_candidates.sort(key=lambda x: x[0], reverse=True) best_url = scored_candidates[0][1] # 部分站点会直接在顶层给 url 字段,作为兜底读取。 if not best_url: fallback_url = str(info.get("url") or "") if fallback_url.startswith("http"): best_url = fallback_url if best_url: return { "type": "video", "url": best_url, "title": title, "author": author, "cover": cover, } return None def _append_title_to_image(self, image_bytes: bytes, title: str) -> bytes: """ 将标题绘制到图片顶部,返回新的图片二进制数据。 设计说明: 1) 微信接口没有“单条消息同时携带纯文本+图片”的通用发送 API; 2) 为了满足“图文合并发送”,这里把标题渲染为图片顶部文字区域; 3) 渲染失败时直接回退原图,避免影响主流程可用性。 """ if not title: return image_bytes try: source = Image.open(io.BytesIO(image_bytes)) if source.mode in ("RGBA", "P"): source = source.convert("RGB") width, height = source.size # 文字区域留出左右/上下内边距,保证可读性。 pad_x = 36 pad_y = 26 font = self._load_chinese_font(44) wrapped_lines = self._wrap_text_for_image(title.strip(), font, max(100, width - pad_x * 2)) if not wrapped_lines: return image_bytes # 行高按字体大小动态计算,并增加少量行间距。 line_height = max(44, int(font.size * 1.4)) text_block_height = pad_y * 2 + line_height * len(wrapped_lines) # 新建画布:上方白底承载文案,下方保留原图内容。 canvas = Image.new("RGB", (width, height + text_block_height), (255, 255, 255)) canvas.paste(source, (0, text_block_height)) draw = ImageDraw.Draw(canvas) y = pad_y for line in wrapped_lines: draw.text((pad_x, y), line, font=font, fill=(34, 34, 34)) y += line_height output = io.BytesIO() canvas.save(output, format="JPEG", quality=88) return output.getvalue() except Exception as e: self.LOG.warning(f"标题绘制失败,回退原图: {e}") return image_bytes def _load_chinese_font(self, size: int) -> ImageFont.FreeTypeFont: """ 尝试加载常见中文字体,保证标题在不同系统尽量可读。 如果都不可用,则回退到 Pillow 默认字体(可能不支持完整中文)。 """ font_candidates = [ "C:/Windows/Fonts/msyh.ttc", "C:/Windows/Fonts/simhei.ttf", "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", "/System/Library/Fonts/PingFang.ttc", ] for font_path in font_candidates: if os.path.exists(font_path): try: return ImageFont.truetype(font_path, size=size) except Exception: continue return ImageFont.load_default() def _wrap_text_for_image(self, text: str, font: ImageFont.ImageFont, max_width: int) -> List[str]: """ 按像素宽度将文本自动换行,避免标题超宽被截断。 实现策略: - 逐字追加,超过最大宽度就换行; - 保留原有换行语义(按行分段后再逐字处理)。 """ draw = ImageDraw.Draw(Image.new("RGB", (10, 10))) lines: List[str] = [] for para in text.splitlines(): if not para: lines.append("") continue current = "" for ch in para: test = current + ch text_width = int(draw.textlength(test, font=font)) if current and text_width > max_width: lines.append(current) current = ch else: current = test if current: lines.append(current) return lines