Files
abot/plugins/douyin_parser/main.py
Liu 6e0483a49e 抖音解析改为本地页面优先
1. 参考外部 DouyinParser 项目,新增基于分享页 HTML 和 _ROUTER_DATA 的本地解析链路。
2. 抖音解析现在按本地页面解析 -> 原内网接口 -> 原外部接口的顺序依次兜底。
3. 放宽链接匹配范围到 douyin.com / iesdouyin.com,并新增本地解析超时配置项。
2026-05-01 11:49:46 +08:00

871 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import time
import traceback
import html
import json
import requests
import io
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse
from loguru import logger
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE
from wechat_ipad.models.message import MessageType
class DouyinParserError(Exception):
"""抖音解析器自定义异常基类"""
pass
class DouyinParserPlugin(MessagePluginInterface):
"""抖音无水印解析插件"""
# 功能权限常量
FEATURE_KEY = "DOUYIN_PARSER"
FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]"
# 参考本地解析项目,把链接匹配范围放宽到 douyin.com / iesdouyin.com
# 1. 原来只匹配 `v.douyin.com` 短链,用户直接转发长链时插件不会命中;
# 2. 本地页面解析本身就是基于真实分享页 HTML因此长链也应该纳入同一套入口
# 3. 这里统一抽 URL 后再做清洗,避免句尾标点被误带入请求。
DOUYIN_URL_RE = re.compile(r'https?://[^\s<>"]+?(?:douyin\.com|iesdouyin\.com)[^\s<>"]*')
# 参考项目优先从 `window._ROUTER_DATA` 里拿 `loaderData -> videoInfoRes -> item_list[0]`
# 1. 这是当前抖音分享页里最稳定的一份结构化首屏数据;
# 2. 能同时覆盖视频作品和图文作品;
# 3. 命中后可以直接绕开外部接口,减少第三方依赖。
ROUTER_DATA_RE = re.compile(r"window\._ROUTER_DATA\s*=\s*({.*?})\s*</script>", re.S)
LEGACY_PLAY_ADDR_RE = re.compile(r'"play_addr":\s*{\s*"uri":\s*"[^"]*",\s*"url_list":\s*\[([^\]]*)\]')
@property
def name(self) -> str:
return "抖音解析"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供抖音链接无水印解析功能,支持视频下载和分享"
@property
def author(self) -> str:
return "姜不吃先生"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return [] # 不使用命令触发,而是通过消息内容匹配
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.LOG = logger
self.url_pattern = self.DOUYIN_URL_RE
# 注册功能权限
self.feature = self.register_feature()
# 修改为使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.gbm = context.get("gbm")
self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
# 从配置中获取参数
douyin_config = self._config.get("Douyin", {})
self.enable = douyin_config.get("enable", True)
self.http_proxy = douyin_config.get("http_proxy", "")
# Cookie 配置说明:
# 1) cookie: 直接粘贴请求头 Cookie 字符串;
# 2) cookie_file: Netscape 格式 cookies 文件路径;
# 3) 当二者同时存在时,后备提取优先 cookie_file兼容性更好
self.cookie = douyin_config.get("cookie", "") or ""
self.cookie_file = douyin_config.get("cookie_file", "") or ""
self.download_mode = douyin_config.get("download_mode", "card") # card或file
# 本地页面解析走真实抖音分享页,网络链路通常比内网接口更长一些:
# 1. 这里单独给一个本地解析超时,避免抖音页面偶发慢响应时无限挂起;
# 2. 超时只作用于“本地 HTML 解析优先链路”,不会改变后续旧接口的既有配置;
# 3. 若后续你觉得本地网络较慢,只需要改配置即可,不必再动代码。
self.local_parse_timeout_seconds = max(int(douyin_config.get("local_parse_timeout_seconds", 12) or 12), 5)
self.LOG.debug(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
if message.get("type") != MessageType.TEXT:
return False
content = str(message.get("content", "")).strip()
return self._extract_douyin_url(content) is not None
@plugin_stats_decorator(plugin_name="抖音解析")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
original_url = self._extract_douyin_url(content)
if not original_url:
return False, "未找到抖音链接"
self.LOG.info(f"发现抖音链接: {original_url}")
media_info = self._parse_douyin(original_url)
if not media_info:
self.LOG.error(f"❌无法解析抖音媒资信息")
return False, "解析失败"
media_type = media_info.get('type', 'video')
if media_type == 'image':
imgs = media_info.get('images') or []
if not imgs:
return False, "未获取到图片地址"
img_bytes_list: List[bytes] = []
for u in imgs:
b = self._download_image_bytes(u)
if b:
img_bytes_list.append(b)
if not img_bytes_list:
return False, "下载图片失败"
merged_pages = self._merge_images_vertical_paged(img_bytes_list, 1242, 65000)
if not merged_pages:
return False, "图片合并失败"
title = media_info.get('title') or ""
# 按你的需求,图文类型不再单独发送一条文本消息。
# 这里把文案直接绘制到合并后第一页的顶部,让“文字 + 图片”作为同一条图片消息的一部分发送。
if len(title) > 0:
merged_pages[0] = self._append_title_to_image(merged_pages[0], title)
for page in merged_pages:
await self.bot.send_image_message((roomid if roomid else sender), page)
return True, f"发送合并图片成功({len(merged_pages)}页)"
else:
video_url = media_info.get('url', '')
title = media_info.get('title', '无标题')
author = media_info.get('author', '未知作者')
cover = media_info.get('cover', '')
if not video_url:
self.LOG.error(f"❌无法获取视频地址")
return False, "获取视频地址失败"
if self.download_mode == "file":
video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}")
mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path))
if mp4_path:
await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path))
return True, "发送视频文件成功"
else:
self.LOG.error(f"❌下载视频失败")
return False, "下载视频失败"
else:
xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author,
des=title,
url=video_url,
thumburl=cover
)
await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender))
return True, "发送卡片成功"
except DouyinParserError as e:
self.LOG.error(f"抖音解析错误: {e}")
self.LOG.error(f"❌抖音解析失败: {str(e)}")
return False, f"解析错误: {e}"
except Exception as e:
self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}")
self.LOG.error(f"❌处理抖音链接出错: {str(e)}")
return False, f"处理出错: {e}"
def _clean_url(self, url: str) -> str:
"""清理URL"""
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}")
return cleaned_url
def _extract_douyin_url(self, content: str) -> Optional[str]:
"""从消息文本中提取第一条抖音链接。
这里参考外部项目的做法,把句尾常见中文标点一并裁掉:
1. 用户经常直接把“复制打开抖音……”整段文案贴进群里;
2. 链接后面常跟着 `,。!?)` 这类符号,若不清洗会导致请求 404 或跳错页;
3. 抽取逻辑统一收口后,`can_process` 和 `process_message` 可以复用同一套结果。
"""
text = str(content or "").strip()
if not text:
return None
match = self.url_pattern.search(text)
if not match:
return None
return self._clean_url(match.group(0).rstrip(",。,.!?)"))
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理响应数据"""
if not data:
return data
default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
media_type = data.get('type') or 'video'
if media_type == 'video':
cover = data.get('cover')
if isinstance(cover, str):
c = cover.strip().strip('`')
data['cover'] = c if c.startswith('http') else default_cover
else:
data['cover'] = default_cover
else:
imgs = data.get('images') or []
data['cover'] = imgs[0] if imgs else default_cover
return data
def _parse_douyin(self, url: str) -> Dict[str, Any]:
try:
clean_url = self._clean_url(url)
# 第一优先级:本地页面解析。
# 1. 参考你给的 DouyinParser 项目,先直接请求分享页并解析 HTML 中的 `_ROUTER_DATA`
# 2. 这样成功时完全不依赖第三方解析 API也更符合“本地优先”的目标
# 3. 只有页面结构变化或网络异常时,才继续走你原来的内网接口和外部接口兜底。
local_primary = self._parse_from_local_page(clean_url)
if local_primary and (local_primary.get('url') or local_primary.get('images')):
return self._clean_response_data(local_primary)
# 第二优先级:保留原有本地业务解析服务(内网)。
primary = self._parse_from_internal_api(clean_url)
if primary and (primary.get('url') or primary.get('images')):
return self._clean_response_data(primary)
# 第三优先级:外部接口兜底。
secondary = self._parse_from_external_api(clean_url)
if secondary and (secondary.get('url') or secondary.get('images')):
return self._clean_response_data(secondary)
raise DouyinParserError("未获取到有效媒资数据")
except Exception as e:
self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
raise DouyinParserError(f"未知错误: {str(e)}")
def _build_proxies(self) -> Optional[Dict[str, str]]:
if self.http_proxy:
return {"http": self.http_proxy, "https": self.http_proxy}
return None
def _build_request_headers(self) -> Dict[str, str]:
"""
构建通用请求头。
设计说明:
- User-Agent 保持常规浏览器标识,降低被目标站点直接拒绝的概率;
- Cookie 在有配置时注入到请求头,提升受限资源的提取成功率。
"""
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
)
}
if self.cookie:
headers["Cookie"] = self.cookie
return headers
def _build_local_parse_headers(self) -> Dict[str, str]:
"""构建本地页面解析专用请求头。
这里刻意切成移动端 Safari UA原因有三点
1. 抖音分享页在移动端更容易直接返回完整作品页,而不是额外的跳转或限制提示;
2. 参考项目就是用移动端 UA 解析,现成经验已经验证过这条链路更稳;
3. 只在本地 HTML 解析链路生效,不会影响你原来的内网/外部接口调用头。
"""
headers = self._build_request_headers()
headers["User-Agent"] = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1"
)
headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
return headers
def _parse_from_local_page(self, clean_url: str) -> Optional[Dict[str, Any]]:
"""优先走本地页面解析。
处理流程:
1. 先跟随分享短链跳转,拿到最终作品页 HTML
2. 优先解析 `window._ROUTER_DATA`,提取视频或图文结构化数据;
3. 若新版结构失效,再用旧版 `play_addr` 正则做一次视频兜底。
"""
try:
response = requests.get(
clean_url,
headers=self._build_local_parse_headers(),
timeout=self.local_parse_timeout_seconds,
proxies=self._build_proxies(),
allow_redirects=True,
)
if response.status_code != 200:
return None
html_content = response.text or ""
if not html_content:
return None
result = self._parse_local_page_html(html_content)
if result:
result["source_url"] = str(response.url or clean_url)
return result
except Exception:
return None
def _parse_local_page_html(self, html_content: str) -> Optional[Dict[str, Any]]:
"""解析抖音分享页 HTML。"""
item = self._extract_aweme_item_from_router_data(html_content)
if item:
note_result = self._parse_local_note_item(item)
if note_result:
return note_result
video_result = self._parse_local_video_item(item)
if video_result:
return video_result
return self._parse_local_legacy_video(html_content)
def _extract_aweme_item_from_router_data(self, html_content: str) -> Optional[Dict[str, Any]]:
"""从 `_ROUTER_DATA` 中抽出作品主数据节点。"""
match = self.ROUTER_DATA_RE.search(html_content or "")
if not match:
return None
try:
router_data = json.loads(match.group(1))
except json.JSONDecodeError as e:
self.LOG.debug(f"[抖音] 解析 _ROUTER_DATA 失败: {e}")
return None
loader_data = router_data.get("loaderData")
if not isinstance(loader_data, dict):
return None
for page_data in loader_data.values():
if not isinstance(page_data, dict):
continue
video_info = page_data.get("videoInfoRes")
if not isinstance(video_info, dict):
continue
item_list = video_info.get("item_list")
if isinstance(item_list, list) and item_list and isinstance(item_list[0], dict):
return item_list[0]
return None
def _parse_local_note_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""解析图文作品。"""
image_url_groups = self._pick_local_image_url_groups(item)
if not image_url_groups:
return None
desc = self._clean_local_text(item.get("desc"))
author = self._clean_local_text((item.get("author") or {}).get("nickname"))
images = [group[0] for group in image_url_groups if group]
if not images:
return None
return {
"type": "image",
"images": images,
"image_url_groups": image_url_groups,
"title": desc,
"author": author,
"cover": images[0],
}
def _pick_local_image_url_groups(self, item: Dict[str, Any]) -> List[List[str]]:
"""从图文作品中提取每一张图的候选地址列表。"""
image_url_groups: List[List[str]] = []
seen_groups = set()
for image_info in item.get("images") or item.get("image_infos") or []:
if not isinstance(image_info, dict):
continue
candidates: List[str] = []
seen_urls = set()
for image_url in image_info.get("url_list") or []:
if not isinstance(image_url, str) or not image_url.startswith("http"):
continue
decoded_url = self._decode_local_value(image_url)
if decoded_url in seen_urls:
continue
candidates.append(decoded_url)
seen_urls.add(decoded_url)
group_key = tuple(candidates)
if candidates and group_key not in seen_groups:
image_url_groups.append(candidates)
seen_groups.add(group_key)
return image_url_groups
def _parse_local_video_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""解析视频作品。"""
video = item.get("video")
if not isinstance(video, dict):
return None
if int(video.get("duration") or 1) == 0:
return None
play_addr = video.get("play_addr") or {}
urls = play_addr.get("url_list") or []
cleaned_urls = [self._decode_local_value(url).replace("playwm", "play") for url in urls if isinstance(url, str) and url]
video_url = self._prefer_v3_v10(cleaned_urls)
if not video_url:
return None
cover = video.get("cover") or {}
cover_urls = cover.get("url_list") or []
cover_url = self._decode_local_value(cover_urls[0]) if cover_urls else ""
return {
"type": "video",
"url": video_url,
"title": self._clean_local_text(item.get("desc")),
"author": self._clean_local_text((item.get("author") or {}).get("nickname")),
"cover": cover_url,
}
def _parse_local_legacy_video(self, html_content: str) -> Optional[Dict[str, Any]]:
"""旧版页面结构兜底:直接从 HTML 里正则抽 `play_addr.url_list`。"""
match = self.LEGACY_PLAY_ADDR_RE.search(html_content or "")
if not match:
return None
raw_urls = [url.strip().strip('"') for url in match.group(1).split(",")]
cleaned_urls = [self._decode_local_value(url).replace("playwm", "play") for url in raw_urls if url]
video_url = self._prefer_v3_v10(cleaned_urls)
if not video_url:
return None
title = self._match_local_json_string(html_content, "desc")
author = self._match_local_json_string(html_content, "nickname")
cover_match = re.search(r'"cover":\s*{\s*"url_list":\s*\[\s*"([^"]+)"', html_content or "")
cover_url = self._decode_local_value(cover_match.group(1)) if cover_match else ""
return {
"type": "video",
"url": video_url,
"title": title,
"author": author,
"cover": cover_url,
}
def _match_local_json_string(self, text: str, key: str) -> str:
"""从页面原始 JSON 片段中提取单个字符串字段。"""
match = re.search(rf'"{re.escape(key)}":\s*"([^"]*)"', text or "")
if not match:
return ""
return self._clean_local_text(self._decode_local_value(match.group(1)))
def _decode_local_value(self, value: str) -> str:
"""解码 HTML 实体和 `\\uXXXX` 形式的转义文本。"""
text = str(value or "")
try:
text = text.encode("utf-8").decode("unicode_escape")
except Exception:
pass
return html.unescape(text)
def _clean_local_text(self, value: Any) -> str:
"""清洗页面里读出来的标题、作者等文本字段。"""
if value is None:
return ""
return html.unescape(str(value)).strip()
def _parse_from_internal_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
try:
endpoint = "http://192.168.2.32:8999/api/hybrid/video_data"
headers = self._build_request_headers()
headers["accept"] = "application/json"
params = {"url": clean_url, "minimal": "false"}
response = requests.get(endpoint, headers=headers, params=params, timeout=10, proxies=self._build_proxies())
if response.status_code != 200:
return None
body = response.json() or {}
if body.get("code") != 200:
return None
data = body.get("data") or {}
aweme_type = data.get("aweme_type")
author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
if aweme_type == 68 or (data.get("images") or data.get("image_list")):
images_field = data.get("images") or []
images: List[str] = []
for img in images_field:
ulist = img.get("url_list") or img.get("download_url_list") or []
chosen = self._prefer_image_url(ulist)
if chosen:
images.append(chosen)
desc = data.get("desc") or data.get("caption") or ""
result = {"type": "image", "images": images, "title": desc, "author": nickname,
"cover": images[0] if images else ""}
if images:
return result
return None
video = data.get("video") or {}
bit_rates = video.get("bit_rate") or []
chosen_url = ""
mp4_sorted = sorted([br for br in bit_rates if br.get("format") == "mp4"],
key=lambda x: x.get("bit_rate") or 0, reverse=True)
for br in mp4_sorted:
play_addr = br.get("play_addr") or {}
urls = play_addr.get("url_list") or []
selected = self._prefer_v3_v10(urls)
if selected:
chosen_url = selected
break
if not chosen_url:
play_addr = video.get("play_addr") or {}
urls = play_addr.get("url_list") or []
selected = self._prefer_v3_v10(urls)
if selected:
chosen_url = selected
cover = (video.get("cover") or {}).get("url_list") or []
cover_url = cover[0] if cover else ""
caption = data.get("caption") or "无标题"
author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
result = {"type": "video", "url": chosen_url or "", "title": caption, "author": nickname,
"cover": cover_url}
if result.get("url"):
return result
return None
except Exception:
return None
def _parse_from_external_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
try:
pay_api_url = "https://api.pearktrue.cn/api/video/api.php"
params = {"url": clean_url, "key": "f56c1fed0c6e64e7"}
response = requests.post(
pay_api_url,
params=params,
headers=self._build_request_headers(),
timeout=10,
proxies=self._build_proxies(),
)
if response.status_code != 200:
return None
data = response.json() or {}
if data.get("code") == 200:
result = data.get("data", {})
if result.get("url"):
return result
return None
except Exception:
return None
def _prefer_v3_v10(self, urls: List[str]) -> Optional[str]:
try:
if not urls:
return None
cleaned = [(u or "").strip().strip("`") for u in urls if u]
def is_vx(n: str) -> bool:
return bool(re.match(r"^v(3|4|5|6|7|8|9|10|11)(?:[\-.]|$)", n, re.I))
def is_douyinvod(n: str) -> bool:
return "douyinvod.com" in n.lower()
first = None
for s in cleaned:
netloc = urlparse(s).netloc
if is_vx(netloc) and is_douyinvod(netloc):
return s
if first is None:
first = s
for s in cleaned:
netloc = urlparse(s).netloc
if is_vx(netloc):
return s
for s in cleaned:
netloc = urlparse(s).netloc
if is_douyinvod(netloc):
return s
return first
except Exception:
return urls[0] if urls else None
def _prefer_image_url(self, urls: List[str]) -> Optional[str]:
try:
if not urls:
return None
cleaned = [(u or "").strip().strip("`") for u in urls if u]
jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None)
if jpeg:
return jpeg
webp = next((u for u in cleaned if ".webp" in u.lower()), None)
if webp:
return webp
return cleaned[0]
except Exception:
return urls[0] if urls else None
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
response = requests.get(
url,
stream=True,
headers=self._build_request_headers(),
proxies=self._build_proxies(),
timeout=30,
)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None
# 以二进制写入模式保存流数据
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
return os.path.abspath(save_path)
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None
def _download_image_bytes(self, url: str) -> Optional[bytes]:
try:
resp = requests.get(
url,
headers=self._build_request_headers(),
timeout=15,
proxies=self._build_proxies(),
)
if resp.status_code == 200:
return resp.content
return None
except Exception:
return None
def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]:
try:
pil_images: List[Image.Image] = []
for b in images:
img = Image.open(io.BytesIO(b))
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
w, h = img.size
if w != target_width:
ratio = target_width / float(w)
img = img.resize((target_width, int(h * ratio)))
pil_images.append(img)
if not pil_images:
return None
total_height = sum(i.size[1] for i in pil_images)
merged = Image.new("RGB", (target_width, total_height))
y = 0
for im in pil_images:
merged.paste(im, (0, y))
y += im.size[1]
output = io.BytesIO()
merged.save(output, format="JPEG", quality=85)
return output.getvalue()
except Exception:
return None
def _merge_images_vertical_paged(self, images: List[bytes], target_width: int = 1242, max_total_height: int = 18000) -> Optional[List[bytes]]:
try:
outputs: List[bytes] = []
current_images: List[Image.Image] = []
current_height = 0
for b in images:
try:
img = Image.open(io.BytesIO(b))
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
w, h = img.size
if w != target_width:
ratio = target_width / float(w)
img = img.resize((target_width, int(h * ratio)))
ih = img.size[1]
except Exception:
continue
if current_images and current_height + ih > max_total_height:
merged = Image.new("RGB", (target_width, current_height))
y = 0
for im in current_images:
merged.paste(im, (0, y))
y += im.size[1]
out = io.BytesIO()
merged.save(out, format="JPEG", quality=85)
outputs.append(out.getvalue())
current_images = [img]
current_height = img.size[1]
else:
current_images.append(img)
current_height += ih
if current_images:
merged = Image.new("RGB", (target_width, current_height))
y = 0
for im in current_images:
merged.paste(im, (0, y))
y += im.size[1]
out = io.BytesIO()
merged.save(out, format="JPEG", quality=85)
outputs.append(out.getvalue())
return outputs if outputs else None
except Exception:
return None
def _append_title_to_image(self, image_bytes: bytes, title: str) -> bytes:
"""
将标题绘制到图片顶部,返回新的图片二进制数据。
设计说明:
1) 微信接口没有“单条消息同时携带纯文本+图片”的通用发送 API
2) 为了满足“图文合并发送”,这里把标题渲染为图片顶部文字区域;
3) 渲染失败时直接回退原图,避免影响主流程可用性。
"""
if not title:
return image_bytes
try:
source = Image.open(io.BytesIO(image_bytes))
if source.mode in ("RGBA", "P"):
source = source.convert("RGB")
width, height = source.size
# 文字区域留出左右/上下内边距,保证可读性。
pad_x = 36
pad_y = 26
font = self._load_chinese_font(44)
wrapped_lines = self._wrap_text_for_image(title.strip(), font, max(100, width - pad_x * 2))
if not wrapped_lines:
return image_bytes
# 行高按字体大小动态计算,并增加少量行间距。
line_height = max(44, int(font.size * 1.4))
text_block_height = pad_y * 2 + line_height * len(wrapped_lines)
# 新建画布:上方白底承载文案,下方保留原图内容。
canvas = Image.new("RGB", (width, height + text_block_height), (255, 255, 255))
canvas.paste(source, (0, text_block_height))
draw = ImageDraw.Draw(canvas)
y = pad_y
for line in wrapped_lines:
draw.text((pad_x, y), line, font=font, fill=(34, 34, 34))
y += line_height
output = io.BytesIO()
canvas.save(output, format="JPEG", quality=88)
return output.getvalue()
except Exception as e:
self.LOG.warning(f"标题绘制失败,回退原图: {e}")
return image_bytes
def _load_chinese_font(self, size: int) -> ImageFont.FreeTypeFont:
"""
尝试加载常见中文字体,保证标题在不同系统尽量可读。
如果都不可用,则回退到 Pillow 默认字体(可能不支持完整中文)。
"""
font_candidates = [
"C:/Windows/Fonts/msyh.ttc",
"C:/Windows/Fonts/simhei.ttf",
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/System/Library/Fonts/PingFang.ttc",
]
for font_path in font_candidates:
if os.path.exists(font_path):
try:
return ImageFont.truetype(font_path, size=size)
except Exception:
continue
return ImageFont.load_default()
def _wrap_text_for_image(self, text: str, font: ImageFont.ImageFont, max_width: int) -> List[str]:
"""
按像素宽度将文本自动换行,避免标题超宽被截断。
实现策略:
- 逐字追加,超过最大宽度就换行;
- 保留原有换行语义(按行分段后再逐字处理)。
"""
draw = ImageDraw.Draw(Image.new("RGB", (10, 10)))
lines: List[str] = []
for para in text.splitlines():
if not para:
lines.append("")
continue
current = ""
for ch in para:
test = current + ch
text_width = int(draw.textlength(test, font=font))
if current and text_width > max_width:
lines.append(current)
current = ch
else:
current = test
if current:
lines.append(current)
return lines