- 修复抖音分享页 videoInfoRes 在新版 note 页面下的提取逻辑\n- 为图文页补充 note_pages 结构并识别 image.video 下的 live 实况视频地址\n- 命中 live 实况时优先按视频发送,失败再回退静态图发送
1074 lines
47 KiB
Python
1074 lines
47 KiB
Python
import os
|
||
import re
|
||
import time
|
||
import traceback
|
||
import json
|
||
import html
|
||
import requests
|
||
import io
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
from urllib.parse import urlparse
|
||
|
||
from loguru import logger
|
||
from pathlib import Path
|
||
from PIL import Image
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE
|
||
from wechat_ipad.models.message import MessageType
|
||
|
||
|
||
class DouyinParserError(Exception):
    """Base class for every error raised by the Douyin parser."""
|
||
|
||
|
||
class DouyinParserPlugin(MessagePluginInterface):
|
||
"""抖音无水印解析插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "DOUYIN_PARSER"
|
||
FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]"
|
||
|
||
@property
def name(self) -> str:
    """Human-readable plugin name (also used as the prefix in log lines)."""
    return "抖音解析"
|
||
|
||
@property
def version(self) -> str:
    """Version string of this plugin."""
    return "1.0.0"
|
||
|
||
@property
def description(self) -> str:
    """One-line description of what the plugin offers."""
    return "提供抖音链接无水印解析功能,支持视频下载和分享"
|
||
|
||
@property
def author(self) -> str:
    """Author credit for this plugin."""
    return "姜不吃先生"
|
||
|
||
@property
def command_prefix(self) -> Optional[str]:
    """Command prefix; empty because this plugin is not prefix-triggered."""
    return ""  # No prefix needed; messages are matched directly.
|
||
|
||
@property
def commands(self) -> List[str]:
    """Explicit command list; empty because matching is content-based."""
    return []  # Not command-triggered; the message content is matched instead.
|
||
|
||
@property
def feature_key(self) -> Optional[str]:
    """Feature key of this plugin (the ``FEATURE_KEY`` class constant)."""
    return self.FEATURE_KEY
|
||
|
||
@property
def feature_description(self) -> Optional[str]:
    """Feature description (the ``FEATURE_DESCRIPTION`` class constant)."""
    return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
    """Compile the matching regexes, register the feature and prepare the download folder."""
    super().__init__()
    self.LOG = logger
    # Accept v.douyin.com short links as well as already-expanded
    # douyin / iesdouyin links, so short links, long links and share text
    # with surrounding punctuation all enter the same parsing pipeline.
    # The lazy prefix is `*?` (not `+?`) so that a bare host such as
    # https://douyin.com/video/1 is matched too.
    self.url_pattern = re.compile(r'https?://[^\s<>\"]*?(?:douyin\.com|iesdouyin\.com)[^\s<>\"]*')
    # Local page parsing relies on the _ROUTER_DATA blob embedded in the
    # share page; it yields note/video metadata without third-party APIs.
    self.router_data_pattern = re.compile(r"window\._ROUTER_DATA\s*=\s*({.*?})\s*</script>", re.S)
    # Register the feature permission.
    self.feature = self.register_feature()
    # Downloads go to the "down_load_dir" folder next to this plugin file.
    plugin_dir = os.path.dirname(os.path.abspath(__file__))
    self.download_dir = os.path.join(plugin_dir, "down_load_dir")
    # Make sure the download folder exists.
    if not os.path.exists(self.download_dir):
        os.makedirs(self.download_dir, exist_ok=True)
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
    """Store context objects and read the ``Douyin`` configuration section.

    Args:
        context: Mapping from which ``event_system`` and ``gbm`` are read.

    Returns:
        Always ``True``.
    """
    self.LOG.debug(f"正在初始化 {self.name} 插件...")

    # Keep the context objects for later use.
    self.event_system = context.get("event_system")
    self.gbm = context.get("gbm")
    self.download_dir = str(Path(__file__).parent / "down_load_dir")

    # Configuration values, each with a default.
    settings = self._config.get("Douyin", {})
    self.enable = settings.get("enable", True)
    self.http_proxy = settings.get("http_proxy", "")
    # Cookie options:
    #   1) cookie: a raw "Cookie" request-header string;
    #   2) cookie_file: path to a Netscape-format cookies file;
    #   3) when both are present, the fallback extractor prefers cookie_file.
    self.cookie = settings.get("cookie", "") or ""
    self.cookie_file = settings.get("cookie_file", "") or ""
    self.download_mode = settings.get("download_mode", "card")  # "card" or "file"

    self.LOG.debug(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}")
    return True
|
||
|
||
def start(self) -> bool:
    """Mark the plugin as running.

    Returns:
        Always ``True``.
    """
    self.LOG.debug(f"[{self.name}] 插件已启动")
    self.status = PluginStatus.RUNNING
    return True
|
||
|
||
def stop(self) -> bool:
    """Mark the plugin as stopped.

    Returns:
        Always ``True``.
    """
    self.LOG.info(f"[{self.name}] 插件已停止")
    self.status = PluginStatus.STOPPED
    return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
    """Tell whether *message* is a text message containing a Douyin link.

    Returns ``False`` when the plugin is disabled or the message is not text.
    """
    if not self.enable or message.get("type") != MessageType.TEXT:
        return False
    text = str(message.get("content", "")).strip()
    return self._extract_douyin_url(text) is not None
|
||
|
||
@plugin_stats_decorator(plugin_name="抖音解析")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Parse the first Douyin link in a text message and send the media back.

    Args:
        message: Incoming message dict. Reads ``content``, ``sender``,
            ``roomid``, ``gbm`` and ``bot``.

    Returns:
        ``(success, status_text)``. Parsing and sending errors are caught
        and reported through the tuple instead of being raised.
    """
    content = str(message.get("content", "")).strip()
    self.LOG.debug(f"插件执行: {self.name}:{content}")
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    # Prefer the objects delivered with the message; fall back to instance
    # attributes so every send path uses the same client (the video branch
    # previously used `self.bot` while the image branch used `bot`).
    gbm: GroupBotManager = message.get("gbm") or getattr(self, "gbm", None)
    bot: WechatAPIClient = message.get("bot") or getattr(self, "bot", None)

    # Permission check (group chats only).
    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    target_id = roomid if roomid else sender
    try:
        extracted_url = self._extract_douyin_url(content)
        if not extracted_url:
            return False, "未找到抖音链接"

        original_url = self._clean_url(extracted_url)
        self.LOG.info(f"发现抖音链接: {original_url}")

        media_info = self._parse_douyin(original_url)
        if not media_info:
            self.LOG.error(f"❌无法解析抖音媒资信息")
            return False, "解析失败"

        if media_info.get('type', 'video') == 'image':
            return await self._dispatch_note_message(bot, target_id, media_info)
        return await self._dispatch_video_message(bot, target_id, media_info)

    except DouyinParserError as e:
        self.LOG.error(f"抖音解析错误: {e}")
        self.LOG.error(f"❌抖音解析失败: {str(e)}")
        return False, f"解析错误: {e}"
    except Exception as e:
        self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}")
        self.LOG.error(f"❌处理抖音链接出错: {str(e)}")
        return False, f"处理出错: {e}"


async def _dispatch_note_message(self, bot, target_id, media_info: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Send an image note: caption text first, then its pages."""
    # Text and images are sent separately: the caption stays copyable,
    # small notes keep their individual images, and only larger notes are
    # merged into long images to limit chat flooding.
    note_text = self._build_note_text(media_info)
    if note_text:
        await bot.send_text_message(target_id, note_text)

    note_pages = media_info.get('note_pages') or []
    # Live-photo notes carry their motion part in image.video. Such pages
    # are sent as video first; when the clip cannot be downloaded or sent,
    # the still image is sent instead so the page is not lost.
    if any(page.get("media_type") == "video" for page in note_pages):
        sent_count = 0
        for page in note_pages:
            if page.get("media_type") == "video":
                video_bytes = self._download_first_available_video_bytes(page.get("video_candidates") or [])
                if video_bytes:
                    # Fetch the cover only once a clip is actually available.
                    cover_bytes = self._download_first_available_image_bytes(page.get("cover_candidates") or [])
                    try:
                        await bot.send_video_message(target_id, video_bytes, cover_bytes if cover_bytes else None)
                    except Exception as e:
                        self.LOG.warning(f"[抖音] 实况视频发送失败,回退静态图: {e}")
                    else:
                        sent_count += 1
                        continue
            image_bytes = self._download_first_available_image_bytes(page.get("image_candidates") or [])
            if image_bytes:
                await bot.send_image_message(target_id, image_bytes)
                sent_count += 1
        if sent_count:
            return True, f"发送图文/实况成功({sent_count}页)"
        return False, "下载图文内容失败"

    image_candidates = media_info.get('image_candidates') or []
    if not image_candidates:
        raw_images = media_info.get('images') or []
        image_candidates = [[str(url).strip()] for url in raw_images if str(url).strip()]
    if not image_candidates:
        return False, "未获取到图片地址"

    # Each image may come with several candidate URLs; try them in turn so
    # one 403/expired link does not fail the whole note.
    img_bytes_list: List[bytes] = []
    for candidates in image_candidates:
        image_bytes = self._download_first_available_image_bytes(candidates)
        if image_bytes:
            img_bytes_list.append(image_bytes)
    if not img_bytes_list:
        return False, "下载图片失败"

    if len(img_bytes_list) > 3:
        merged_pages = self._merge_images_vertical_paged(img_bytes_list, 1242, 65000)
        if not merged_pages:
            return False, "图片合并失败"
        for page in merged_pages:
            await bot.send_image_message(target_id, page)
        return True, f"发送合并图片成功({len(merged_pages)}页)"

    for image_bytes in img_bytes_list:
        await bot.send_image_message(target_id, image_bytes)
    return True, f"发送原图成功({len(img_bytes_list)}张)"


async def _dispatch_video_message(self, bot, target_id, media_info: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Send a video work as a downloaded file or as a link card, per ``download_mode``."""
    video_url = media_info.get('url', '')
    title = media_info.get('title', '无标题')
    author = media_info.get('author', '未知作者')
    cover = media_info.get('cover', '')

    if not video_url:
        self.LOG.error(f"❌无法获取视频地址")
        return False, "获取视频地址失败"

    if self.download_mode == "file":
        video_filename = f"video_{int(time.time())}.mp4"
        save_path = os.path.join(self.download_dir, video_filename)
        self.LOG.info(f"开始下载视频到: {save_path}")
        mp4_path = self._download_stream(video_url, save_path)
        if mp4_path:
            await bot.send_video_message(target_id, Path(mp4_path))
            return True, "发送视频文件成功"
        self.LOG.error(f"❌下载视频失败")
        return False, "下载视频失败"

    xml_content = str(VIDEO_XML_MESSAGE).format(
        title=author,
        des=title,
        url=video_url,
        thumburl=cover,
    )
    await bot.send_link_xml_message(xml_content, target_id)
    return True, "发送卡片成功"
|
||
|
||
def _clean_url(self, url: str) -> str:
|
||
"""清理URL"""
|
||
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
|
||
cleaned_url = cleaned_url.rstrip(",。,.!!??))]}")
|
||
self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}")
|
||
return cleaned_url
|
||
|
||
def _extract_douyin_url(self, content: str) -> Optional[str]:
|
||
"""
|
||
从消息文本中提取第一条抖音链接。
|
||
|
||
说明:
|
||
1. 分享文案里经常会把链接夹在中文标点中间,这里统一做一次裁剪;
|
||
2. 后续无论是本地页面解析还是外部兜底,都使用这一条标准化后的 URL;
|
||
3. 入口收口后,后面如果要补充更多抖音域名,也只需要改这一处。
|
||
"""
|
||
match = self.url_pattern.search(str(content or ""))
|
||
if not match:
|
||
return None
|
||
return self._clean_url(match.group(0))
|
||
|
||
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""清理响应数据"""
|
||
if not data:
|
||
return data
|
||
default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
|
||
media_type = data.get('type') or 'video'
|
||
# 三条解析链路最终都会走到这里,因此把标题/作者统一再清洗一遍,
|
||
# 可以同时兜住“本地页面解析”“内网接口”“外部接口”三种来源的乱码问题。
|
||
data['title'] = self._clean_text(data.get('title'))
|
||
data['author'] = self._clean_text(data.get('author'))
|
||
if media_type == 'video':
|
||
cover = data.get('cover')
|
||
if isinstance(cover, str):
|
||
c = cover.strip().strip('`')
|
||
data['cover'] = c if c.startswith('http') else default_cover
|
||
else:
|
||
data['cover'] = default_cover
|
||
else:
|
||
imgs = data.get('images') or []
|
||
data['cover'] = imgs[0] if imgs else default_cover
|
||
return data
|
||
|
||
def _parse_douyin(self, url: str) -> Dict[str, Any]:
    """Resolve *url* into media metadata, trying three sources in order.

    1. Local share-page parsing (no external dependency).
    2. The internal parsing service (first fallback).
    3. The external API (last resort).

    Args:
        url: Douyin link; it is cleaned again before use.

    Returns:
        The cleaned result dict of the first source that yields a ``url``
        or ``images`` entry.

    Raises:
        DouyinParserError: When no source yields usable data, or when an
            unexpected error occurs (chained to the original exception).
    """
    try:
        clean_url = self._clean_url(url)
        sources = (
            self._parse_from_local_page,
            self._parse_from_internal_api,
            self._parse_from_external_api,
        )
        for parse_source in sources:
            result = parse_source(clean_url)
            if result and (result.get('url') or result.get('images')):
                return self._clean_response_data(result)
    except DouyinParserError:
        # Already a domain error: do not re-wrap it as "unknown".
        raise
    except Exception as e:
        self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
        raise DouyinParserError(f"未知错误: {str(e)}") from e

    raise DouyinParserError("未获取到有效媒资数据")
|
||
|
||
def _parse_from_local_page(self, clean_url: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
直接解析抖音分享页 HTML。
|
||
|
||
实现思路参考外部 DouyinParser 项目,但这里做了两点本地化适配:
|
||
1. 继续沿用当前插件已有的 requests / proxy / cookie 配置,避免额外引入异步 HTTP 依赖;
|
||
2. 解析结果统一映射成当前插件现有的数据结构,尽量不改发送链路。
|
||
"""
|
||
try:
|
||
resolved_url = self._resolve_douyin_share_url(clean_url)
|
||
html_content = self._fetch_douyin_page_html(resolved_url)
|
||
if not html_content:
|
||
return None
|
||
result = self._parse_douyin_page_html(html_content)
|
||
if result and resolved_url and not result.get("source_url"):
|
||
result["source_url"] = resolved_url
|
||
return result
|
||
except Exception as e:
|
||
self.LOG.warning(f"[抖音] 本地页面解析失败,准备进入兜底链路: {e}")
|
||
return None
|
||
|
||
def _resolve_douyin_share_url(self, url: str) -> str:
    """Follow redirects from *url* and return the final share-page address.

    Raises whatever ``requests`` raises for network errors or a non-2xx
    final response.
    """
    request_options = {
        "headers": self._build_page_request_headers(),
        "timeout": 10,
        "proxies": self._build_proxies(),
        "allow_redirects": True,
    }
    response = requests.get(url, **request_options)
    response.raise_for_status()
    final_url = str(response.url or url).strip()
    self.LOG.debug(f"[抖音] 展开后的分享页地址: {final_url}")
    return final_url
|
||
|
||
def _fetch_douyin_page_html(self, url: str) -> str:
    """Download the share page at *url* and return it as text.

    Decoding is delegated to ``_decode_http_response_text`` (UTF-8 first,
    then fallbacks), because guessing the encoding of short Chinese pages
    produced garbled titles.

    Raises:
        DouyinParserError: When the decoded page is empty or blank.
    """
    request_options = {
        "headers": self._build_page_request_headers(),
        "timeout": 15,
        "proxies": self._build_proxies(),
    }
    response = requests.get(url, **request_options)
    response.raise_for_status()
    page_text = self._decode_http_response_text(response)
    if page_text.strip():
        return page_text
    raise DouyinParserError("抖音分享页内容为空")
|
||
|
||
def _parse_douyin_page_html(self, html_content: str) -> Dict[str, Any]:
|
||
"""
|
||
解析分享页 HTML,兼容图文与视频作品。
|
||
|
||
解析顺序:
|
||
1. 优先尝试新版页面里的 _ROUTER_DATA;
|
||
2. 如果没有命中,再回退到旧页面中可直接正则提取的 video 字段。
|
||
"""
|
||
item = self._extract_aweme_item(html_content)
|
||
if item:
|
||
note = self._parse_note_item(item)
|
||
if note:
|
||
return note
|
||
|
||
video = self._parse_video_item(item)
|
||
if video:
|
||
return video
|
||
|
||
legacy_video = self._parse_legacy_video(html_content)
|
||
if legacy_video:
|
||
return legacy_video
|
||
|
||
raise DouyinParserError("未找到可解析的抖音图文或视频内容")
|
||
|
||
def _extract_aweme_item(self, html_content: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
从页面中的 _ROUTER_DATA 提取第一条作品数据。
|
||
|
||
这是当前抖音分享页最稳定的数据来源,图文、视频都可以从这里统一解析。
|
||
"""
|
||
match = self.router_data_pattern.search(html_content or "")
|
||
if not match:
|
||
return None
|
||
|
||
try:
|
||
router_data = json.loads(match.group(1))
|
||
except json.JSONDecodeError as e:
|
||
self.LOG.warning(f"[抖音] 解析 _ROUTER_DATA 失败: {e}")
|
||
return None
|
||
|
||
loader_data = router_data.get("loaderData")
|
||
if not isinstance(loader_data, dict):
|
||
return None
|
||
|
||
for page_data in loader_data.values():
|
||
if not isinstance(page_data, dict):
|
||
continue
|
||
# 新版 note 页的 videoInfoRes 直接挂在当前节点,不再额外包一层 page dict。
|
||
direct_video_info = page_data.get("videoInfoRes")
|
||
if isinstance(direct_video_info, dict):
|
||
item_list = direct_video_info.get("item_list")
|
||
if isinstance(item_list, list) and item_list and isinstance(item_list[0], dict):
|
||
return item_list[0]
|
||
for nested_page in page_data.values():
|
||
if not isinstance(nested_page, dict):
|
||
continue
|
||
video_info = nested_page.get("videoInfoRes")
|
||
if not isinstance(video_info, dict):
|
||
continue
|
||
item_list = video_info.get("item_list")
|
||
if isinstance(item_list, list) and item_list and isinstance(item_list[0], dict):
|
||
return item_list[0]
|
||
return None
|
||
|
||
def _parse_note_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
从作品数据中解析图文作品。
|
||
|
||
这里保留每张图的候选 URL 列表,后续下载阶段可以逐个重试,提升图文成功率。
|
||
"""
|
||
note_pages = self._build_note_pages(item.get("images") or item.get("image_infos") or [])
|
||
image_url_groups = [page.get("image_candidates") or [] for page in note_pages if page.get("image_candidates")]
|
||
if not image_url_groups:
|
||
return None
|
||
|
||
return {
|
||
"type": "image",
|
||
"title": self._clean_text(item.get("desc")),
|
||
"author": self._clean_text((item.get("author") or {}).get("nickname")),
|
||
"images": [group[0] for group in image_url_groups if group],
|
||
"image_candidates": image_url_groups,
|
||
"note_pages": note_pages,
|
||
"cover": image_url_groups[0][0] if image_url_groups and image_url_groups[0] else "",
|
||
}
|
||
|
||
def _pick_image_url_groups(self, item: Dict[str, Any]) -> List[List[str]]:
|
||
"""提取图文中每一页图片的候选地址列表,并做去重。"""
|
||
image_url_groups: List[List[str]] = []
|
||
seen_groups = set()
|
||
for image_info in item.get("images") or item.get("image_infos") or []:
|
||
if not isinstance(image_info, dict):
|
||
continue
|
||
candidates: List[str] = []
|
||
seen_urls = set()
|
||
for image_url in image_info.get("url_list") or []:
|
||
if not isinstance(image_url, str) or not image_url.startswith("http"):
|
||
continue
|
||
decoded_url = self._decode_text(image_url)
|
||
if decoded_url in seen_urls:
|
||
continue
|
||
candidates.append(decoded_url)
|
||
seen_urls.add(decoded_url)
|
||
group_key = tuple(candidates)
|
||
if candidates and group_key not in seen_groups:
|
||
image_url_groups.append(candidates)
|
||
seen_groups.add(group_key)
|
||
return image_url_groups
|
||
|
||
def _build_note_pages(self, image_infos: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""把抖音图文页规整成统一的逐页发送描述。
|
||
|
||
设计说明:
|
||
1. 普通图文页只会带静态图候选地址;
|
||
2. live 实况页会额外在 image.video 里挂短视频;
|
||
3. 发送阶段只认这份结构,就能按页决定“发图片还是发视频”。
|
||
"""
|
||
pages: List[Dict[str, Any]] = []
|
||
for image_info in image_infos or []:
|
||
if not isinstance(image_info, dict):
|
||
continue
|
||
image_candidates = self._dedupe_http_urls(
|
||
list(image_info.get("url_list") or []) + list(image_info.get("download_url_list") or [])
|
||
)
|
||
if not image_candidates:
|
||
continue
|
||
video_candidates = self._extract_live_photo_video_candidates(image_info)
|
||
cover_candidates = self._dedupe_http_urls(
|
||
list((((image_info.get("video") or {}).get("cover") or {}).get("url_list")) or []) + image_candidates
|
||
)
|
||
pages.append({
|
||
"media_type": "video" if video_candidates else "image",
|
||
"image_candidates": image_candidates,
|
||
"video_candidates": video_candidates,
|
||
"cover_candidates": cover_candidates,
|
||
})
|
||
return pages
|
||
|
||
def _extract_live_photo_video_candidates(self, image_info: Dict[str, Any]) -> List[str]:
|
||
"""从 live 实况图的 image.video 里提取可发送视频地址。"""
|
||
video_info = image_info.get("video") or {}
|
||
if not isinstance(video_info, dict):
|
||
return []
|
||
|
||
ordered_groups: List[List[str]] = []
|
||
bit_rate_rows = sorted(
|
||
[row for row in (video_info.get("bit_rate") or []) if isinstance(row, dict)],
|
||
key=lambda row: row.get("bit_rate") or 0,
|
||
reverse=True,
|
||
)
|
||
for row in bit_rate_rows:
|
||
ordered_groups.append(list(((row.get("play_addr") or {}).get("url_list")) or []))
|
||
ordered_groups.extend([
|
||
list(((video_info.get("download_addr") or {}).get("url_list")) or []),
|
||
list(((video_info.get("play_addr_h264") or {}).get("url_list")) or []),
|
||
list(((video_info.get("play_addr") or {}).get("url_list")) or []),
|
||
list(((video_info.get("play_addr_lowbr") or {}).get("url_list")) or []),
|
||
list(((video_info.get("download_suffix_logo_addr") or {}).get("url_list")) or []),
|
||
])
|
||
|
||
candidates: List[str] = []
|
||
for url_group in ordered_groups:
|
||
chosen = self._pick_video_url(url_group)
|
||
if chosen:
|
||
candidates.append(chosen)
|
||
return self._dedupe_http_urls(candidates)
|
||
|
||
def _dedupe_http_urls(self, urls: List[str]) -> List[str]:
|
||
"""去重并规整 URL 列表,避免对同一地址重复下载。"""
|
||
cleaned_urls: List[str] = []
|
||
seen_urls = set()
|
||
for url in urls or []:
|
||
if not isinstance(url, str):
|
||
continue
|
||
decoded_url = self._decode_text(url).strip()
|
||
if not decoded_url.startswith("http") or decoded_url in seen_urls:
|
||
continue
|
||
cleaned_urls.append(decoded_url)
|
||
seen_urls.add(decoded_url)
|
||
return cleaned_urls
|
||
|
||
def _parse_video_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""从作品数据中解析视频作品,并优先挑选无水印播放地址。"""
|
||
video = item.get("video")
|
||
if not isinstance(video, dict) or video.get("duration") == 0:
|
||
return None
|
||
|
||
play_addr = video.get("play_addr") or {}
|
||
urls = play_addr.get("url_list") or []
|
||
video_url = self._pick_video_url(urls)
|
||
if not video_url:
|
||
return None
|
||
|
||
cover = video.get("cover") or {}
|
||
cover_urls = cover.get("url_list") or []
|
||
cover_url = self._decode_text(cover_urls[0]) if cover_urls else ""
|
||
|
||
return {
|
||
"type": "video",
|
||
"url": video_url,
|
||
"title": self._clean_text(item.get("desc")),
|
||
"author": self._clean_text((item.get("author") or {}).get("nickname")),
|
||
"cover": cover_url,
|
||
}
|
||
|
||
def _parse_legacy_video(self, html_content: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
兼容旧分享页结构。
|
||
|
||
有些页面没有 _ROUTER_DATA,但仍然能从 play_addr / cover / desc 中拼出完整视频卡片。
|
||
"""
|
||
pattern = re.compile(r'"play_addr":\s*{\s*"uri":\s*"[^"]*",\s*"url_list":\s*\[([^\]]*)\]')
|
||
match = pattern.search(html_content or "")
|
||
if not match:
|
||
return None
|
||
|
||
raw_urls = [url.strip().strip('"') for url in match.group(1).split(",")]
|
||
video_url = self._pick_video_url(raw_urls)
|
||
if not video_url:
|
||
return None
|
||
|
||
title = self._match_json_string(html_content, "desc")
|
||
author = self._match_json_string(html_content, "nickname")
|
||
cover_match = re.search(r'"cover":\s*{\s*"url_list":\s*\[\s*"([^"]+)"', html_content or "")
|
||
|
||
return {
|
||
"type": "video",
|
||
"url": video_url,
|
||
"title": title,
|
||
"author": author,
|
||
"cover": self._decode_text(cover_match.group(1)) if cover_match else "",
|
||
}
|
||
|
||
def _pick_video_url(self, urls: List[Any]) -> str:
|
||
"""
|
||
从多个视频地址里优先挑选更适合直发的无水印链接。
|
||
|
||
规则:
|
||
1. 优先把 playwm 改成 play,尽量拿无水印地址;
|
||
2. 优先选择 aweme.snssdk.com 这类直链;
|
||
3. 如果没有,再退回现有 v3/v10 / douyinvod 选择逻辑。
|
||
"""
|
||
decoded_urls = [
|
||
self._decode_text(str(url)).replace("playwm", "play")
|
||
for url in urls
|
||
if isinstance(url, str) and str(url).strip()
|
||
]
|
||
snssdk_urls = [url for url in decoded_urls if "aweme.snssdk.com" in url]
|
||
if snssdk_urls:
|
||
return snssdk_urls[0]
|
||
return self._prefer_v3_v10(decoded_urls) or ""
|
||
|
||
def _match_json_string(self, text: str, key: str) -> str:
|
||
"""从 HTML 文本中的 JSON 片段抽取字符串字段。"""
|
||
match = re.search(rf'"{re.escape(key)}":\s*"([^"]*)"', text or "")
|
||
return self._clean_text(self._decode_text(match.group(1))) if match else ""
|
||
|
||
def _decode_text(self, value: Any) -> str:
|
||
"""同时处理 HTML 转义与 unicode 转义,避免标题和 URL 出现 \\uXXXX / &。"""
|
||
if value is None:
|
||
return ""
|
||
text = html.unescape(str(value))
|
||
# 只有在文本里明显存在 \uXXXX / \xXX 这类转义片段时才做 unicode_escape 解码,
|
||
# 避免把本来已经是正常中文的字符串再次错误解码成乱码。
|
||
if "\\u" in text or "\\x" in text:
|
||
try:
|
||
text = text.encode("utf-8").decode("unicode_escape")
|
||
except Exception:
|
||
pass
|
||
# 某些链路里文本已经在上游被错误按 Latin-1 / CP1252 解过一次,
|
||
# 这里做一层“仅在明显像乱码时才尝试”的温和修复,避免正常中文被误伤。
|
||
return self._repair_mojibake_text(text)
|
||
|
||
def _clean_text(self, value: Any) -> str:
|
||
"""统一清理文本字段,避免标题/作者带空白或转义残留。"""
|
||
return "" if value is None else self._decode_text(value).strip()
|
||
|
||
def _decode_http_response_text(self, response: requests.Response) -> str:
    """Turn an HTTP response body into text, trusting UTF-8 first.

    Encodings are tried in this order: UTF-8, the response's declared
    encoding, its ``apparent_encoding``, then GB18030. The first that
    decodes to non-empty text (after the mojibake repair pass) wins. If
    none does, the body is decoded as UTF-8 with replacement characters.
    """
    payload = response.content or b""
    if not payload:
        return ""

    encodings = [
        name
        for name in ("utf-8", response.encoding, response.apparent_encoding, "gb18030")
        if name
    ]
    for name in encodings:
        try:
            # Also repairs text that looks like UTF-8 read as single-byte.
            repaired = self._repair_mojibake_text(payload.decode(name))
        except Exception:
            continue
        if repaired:
            return repaired
    return payload.decode("utf-8", errors="replace")
|
||
|
||
def _looks_like_mojibake(self, text: str) -> bool:
|
||
"""判断文本是否像常见的 UTF-8 误解码乱码。"""
|
||
if not text:
|
||
return False
|
||
suspicious_markers = ("Ã", "Â", "æ", "ä", "å", "ç", "é", "ê", "ï", "ð")
|
||
marker_hits = sum(text.count(marker) for marker in suspicious_markers)
|
||
# 中文场景里这些字符密集出现时,基本就是“UTF-8 被按 Latin-1/CP1252 解了”。
|
||
return marker_hits >= 2
|
||
|
||
def _repair_mojibake_text(self, text: str) -> str:
|
||
"""修复常见的中文乱码,但只在高置信度时生效。"""
|
||
if not text or not self._looks_like_mojibake(text):
|
||
return text
|
||
|
||
for source_encoding in ("latin1", "cp1252"):
|
||
try:
|
||
repaired_text = text.encode(source_encoding).decode("utf-8")
|
||
# 修复后若中文比例明显提升,就采用修复结果。
|
||
if repaired_text and self._count_cjk_chars(repaired_text) >= self._count_cjk_chars(text):
|
||
return repaired_text
|
||
except Exception:
|
||
continue
|
||
return text
|
||
|
||
def _count_cjk_chars(self, text: str) -> int:
|
||
"""统计字符串中的中日韩统一表意文字数量,用于判断修复是否更合理。"""
|
||
if not text:
|
||
return 0
|
||
return sum(1 for ch in text if "\u4e00" <= ch <= "\u9fff")
|
||
|
||
def _build_proxies(self) -> Optional[Dict[str, str]]:
|
||
if self.http_proxy:
|
||
return {"http": self.http_proxy, "https": self.http_proxy}
|
||
return None
|
||
|
||
def _build_request_headers(self) -> Dict[str, str]:
|
||
"""
|
||
构建通用请求头。
|
||
|
||
设计说明:
|
||
- User-Agent 保持常规浏览器标识,降低被目标站点直接拒绝的概率;
|
||
- Cookie 在有配置时注入到请求头,提升受限资源的提取成功率。
|
||
"""
|
||
headers = {
|
||
"User-Agent": (
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||
"Chrome/124.0.0.0 Safari/537.36"
|
||
)
|
||
}
|
||
if self.cookie:
|
||
headers["Cookie"] = self.cookie
|
||
return headers
|
||
|
||
def _build_page_request_headers(self) -> Dict[str, str]:
|
||
"""
|
||
构建用于访问抖音分享页的请求头。
|
||
|
||
这里单独使用移动端 Safari UA,是因为参考项目和线上经验都表明:
|
||
- 分享页 HTML 在移动端更稳定地携带 _ROUTER_DATA;
|
||
- 图文作品在移动端页面中的结构更统一;
|
||
- 不影响现有 API 兜底链路,因为只用于本地页面抓取。
|
||
"""
|
||
headers = self._build_request_headers()
|
||
headers["User-Agent"] = (
|
||
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
|
||
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1"
|
||
)
|
||
headers["Referer"] = "https://www.douyin.com/"
|
||
headers["Accept-Language"] = "zh-CN,zh;q=0.9"
|
||
return headers
|
||
|
||
def _parse_from_internal_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
    """Ask the internal parsing service for the media behind *clean_url*.

    This is a best-effort fallback: every failure is logged and reported
    as ``None`` so the caller can try the next source.

    Args:
        clean_url: Cleaned Douyin link.

    Returns:
        An image-note dict (``images``, ``image_candidates``,
        ``note_pages`` ...) or a video dict (``url``, ``cover`` ...), or
        ``None`` when the service yields nothing usable.
    """
    try:
        endpoint = "http://192.168.2.32:8999/api/hybrid/video_data"
        headers = self._build_request_headers()
        headers["accept"] = "application/json"
        params = {"url": clean_url, "minimal": "false"}
        response = requests.get(endpoint, headers=headers, params=params, timeout=10, proxies=self._build_proxies())
        if response.status_code != 200:
            return None
        body = response.json() or {}
        if body.get("code") != 200:
            return None

        data = body.get("data") or {}
        author = data.get("author") or {}
        nickname = author.get("nickname") or author.get("unique_id") or "未知作者"

        images_field = data.get("images") or data.get("image_list")
        if data.get("aweme_type") == 68 or images_field:
            note_pages = self._build_note_pages(images_field or [])
            # Keep every candidate URL per image so the download stage can
            # retry alternates, matching the local page parser's result.
            image_candidates = [
                page["image_candidates"] for page in note_pages if page.get("image_candidates")
            ]
            if not image_candidates:
                return None
            images = [group[0] for group in image_candidates]
            return {
                "type": "image",
                "images": images,
                "image_candidates": image_candidates,
                "title": data.get("desc") or data.get("caption") or "",
                "author": nickname,
                "note_pages": note_pages,
                "cover": images[0],
            }

        video = data.get("video") or {}
        # mp4 renditions from highest to lowest bit rate, then the plain
        # play address as the last option.
        mp4_rows = sorted(
            (row for row in (video.get("bit_rate") or [])
             if isinstance(row, dict) and row.get("format") == "mp4"),
            key=lambda row: row.get("bit_rate") or 0,
            reverse=True,
        )
        url_groups = [(row.get("play_addr") or {}).get("url_list") or [] for row in mp4_rows]
        url_groups.append((video.get("play_addr") or {}).get("url_list") or [])

        chosen_url = ""
        for urls in url_groups:
            selected = self._prefer_v3_v10(urls)
            if selected:
                chosen_url = selected
                break
        if not chosen_url:
            return None

        cover_urls = (video.get("cover") or {}).get("url_list") or []
        return {
            "type": "video",
            "url": chosen_url,
            "title": data.get("caption") or "无标题",
            "author": nickname,
            "cover": cover_urls[0] if cover_urls else "",
        }
    except Exception as e:
        # Best-effort fallback: report the failure instead of hiding it.
        self.LOG.warning(f"[抖音] 内网接口解析失败: {e}")
        return None
|
||
|
||
def _parse_from_external_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
    """Ask the external API for the media behind *clean_url* (last resort).

    The configured Douyin Cookie is deliberately NOT forwarded: it is an
    account credential and must not be handed to a third-party service.
    Every failure is logged and reported as ``None``.

    Args:
        clean_url: Cleaned Douyin link.

    Returns:
        The API's ``data`` dict when it contains a ``url``, else ``None``.
    """
    try:
        pay_api_url = "https://api.pearktrue.cn/api/video/api.php"
        # NOTE(review): the API key is hard-coded; consider moving it to
        # the plugin configuration.
        params = {"url": clean_url, "key": "f56c1fed0c6e64e7"}
        headers = self._build_request_headers()
        headers.pop("Cookie", None)
        response = requests.post(
            pay_api_url,
            params=params,
            headers=headers,
            timeout=10,
            proxies=self._build_proxies(),
        )
        if response.status_code != 200:
            return None
        payload = response.json() or {}
        if payload.get("code") != 200:
            return None
        result = payload.get("data")
        if isinstance(result, dict) and result.get("url"):
            return result
        return None
    except Exception as e:
        # Best-effort fallback: report the failure instead of hiding it.
        self.LOG.warning(f"[抖音] 外部接口解析失败: {e}")
        return None
|
||
|
||
def _prefer_v3_v10(self, urls: List[str]) -> Optional[str]:
|
||
try:
|
||
if not urls:
|
||
return None
|
||
cleaned = [(u or "").strip().strip("`") for u in urls if u]
|
||
|
||
def is_vx(n: str) -> bool:
|
||
return bool(re.match(r"^v(3|4|5|6|7|8|9|10|11)(?:[\-.]|$)", n, re.I))
|
||
|
||
def is_douyinvod(n: str) -> bool:
|
||
return "douyinvod.com" in n.lower()
|
||
|
||
first = None
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_vx(netloc) and is_douyinvod(netloc):
|
||
return s
|
||
if first is None:
|
||
first = s
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_vx(netloc):
|
||
return s
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_douyinvod(netloc):
|
||
return s
|
||
return first
|
||
except Exception:
|
||
return urls[0] if urls else None
|
||
|
||
def _prefer_image_url(self, urls: List[str]) -> Optional[str]:
|
||
try:
|
||
if not urls:
|
||
return None
|
||
cleaned = [(u or "").strip().strip("`") for u in urls if u]
|
||
jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None)
|
||
if jpeg:
|
||
return jpeg
|
||
webp = next((u for u in cleaned if ".webp" in u.lower()), None)
|
||
if webp:
|
||
return webp
|
||
return cleaned[0]
|
||
except Exception:
|
||
return urls[0] if urls else None
|
||
|
||
def _download_stream(self, url, save_path):
    """Stream the video at *url* into *save_path*.

    The response is used as a context manager so the connection is
    released on every exit path. A non-video response is rejected after
    logging only its first 100 bytes, and a partially written file is
    removed when the download fails midway.

    :param url: URL of the video stream
    :param save_path: Local target path including the file name (e.g. "video.mp4")
    :return: Absolute path of the saved file, or ``None`` on any failure
    """
    file_opened = False
    try:
        # Streaming GET so the body is never held in memory as a whole.
        with requests.get(
            url,
            stream=True,
            headers=self._build_request_headers(),
            proxies=self._build_proxies(),
            timeout=30,
        ) as response:
            # Raises for 4xx/5xx responses.
            response.raise_for_status()

            # Reject responses that do not look like a video stream.
            content_type = response.headers.get("Content-Type", "").lower()
            if "video" not in content_type and "application/octet-stream" not in content_type:
                self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}")
                # Read a small preview only; `response.text` would pull the
                # entire body of a streamed response.
                preview = next(response.iter_content(chunk_size=100), b"")
                self.LOG.warning(f"响应内容预览: {preview[:100].decode('utf-8', errors='replace')}")
                return None

            # Make sure the target directory exists.
            os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)

            with open(save_path, "wb") as file:
                file_opened = True
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    if chunk:  # skip keep-alive chunks
                        file.write(chunk)
        self.LOG.info(f"视频已下载到: {save_path}")
        return os.path.abspath(save_path)
    except requests.RequestException as e:
        self.LOG.error(f"请求失败: {e}")
    except IOError as e:
        self.LOG.error(f"文件写入失败: {e}")
    except Exception as e:
        self.LOG.error(f"发生未知错误: {e}")

    # Failure path: drop the incomplete file this call created.
    if file_opened:
        try:
            os.remove(save_path)
        except OSError:
            pass
    return None
|
||
|
||
def _download_image_bytes(self, url: str) -> Optional[bytes]:
|
||
try:
|
||
resp = requests.get(
|
||
url,
|
||
headers=self._build_request_headers(),
|
||
timeout=15,
|
||
proxies=self._build_proxies(),
|
||
)
|
||
if resp.status_code == 200:
|
||
return resp.content
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
def _download_video_bytes(self, url: str) -> Optional[bytes]:
|
||
"""下载短视频 bytes,供 live 实况页直接按视频发送。"""
|
||
try:
|
||
resp = requests.get(
|
||
url,
|
||
headers=self._build_request_headers(),
|
||
timeout=20,
|
||
proxies=self._build_proxies(),
|
||
)
|
||
if resp.status_code != 200:
|
||
return None
|
||
content_type = (resp.headers.get("Content-Type") or "").lower()
|
||
if "video" not in content_type and "application/octet-stream" not in content_type:
|
||
return None
|
||
return resp.content
|
||
except Exception:
|
||
return None
|
||
|
||
def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]:
|
||
try:
|
||
pil_images: List[Image.Image] = []
|
||
for b in images:
|
||
img = Image.open(io.BytesIO(b))
|
||
if img.mode in ("RGBA", "P"):
|
||
img = img.convert("RGB")
|
||
w, h = img.size
|
||
if w != target_width:
|
||
ratio = target_width / float(w)
|
||
img = img.resize((target_width, int(h * ratio)))
|
||
pil_images.append(img)
|
||
if not pil_images:
|
||
return None
|
||
total_height = sum(i.size[1] for i in pil_images)
|
||
merged = Image.new("RGB", (target_width, total_height))
|
||
y = 0
|
||
for im in pil_images:
|
||
merged.paste(im, (0, y))
|
||
y += im.size[1]
|
||
output = io.BytesIO()
|
||
merged.save(output, format="JPEG", quality=85)
|
||
return output.getvalue()
|
||
except Exception:
|
||
return None
|
||
|
||
def _merge_images_vertical_paged(self, images: List[bytes], target_width: int = 1242, max_total_height: int = 18000) -> Optional[List[bytes]]:
|
||
try:
|
||
outputs: List[bytes] = []
|
||
current_images: List[Image.Image] = []
|
||
current_height = 0
|
||
for b in images:
|
||
try:
|
||
img = Image.open(io.BytesIO(b))
|
||
if img.mode in ("RGBA", "P"):
|
||
img = img.convert("RGB")
|
||
w, h = img.size
|
||
if w != target_width:
|
||
ratio = target_width / float(w)
|
||
img = img.resize((target_width, int(h * ratio)))
|
||
ih = img.size[1]
|
||
except Exception:
|
||
continue
|
||
if current_images and current_height + ih > max_total_height:
|
||
merged = Image.new("RGB", (target_width, current_height))
|
||
y = 0
|
||
for im in current_images:
|
||
merged.paste(im, (0, y))
|
||
y += im.size[1]
|
||
out = io.BytesIO()
|
||
merged.save(out, format="JPEG", quality=85)
|
||
outputs.append(out.getvalue())
|
||
current_images = [img]
|
||
current_height = img.size[1]
|
||
else:
|
||
current_images.append(img)
|
||
current_height += ih
|
||
if current_images:
|
||
merged = Image.new("RGB", (target_width, current_height))
|
||
y = 0
|
||
for im in current_images:
|
||
merged.paste(im, (0, y))
|
||
y += im.size[1]
|
||
out = io.BytesIO()
|
||
merged.save(out, format="JPEG", quality=85)
|
||
outputs.append(out.getvalue())
|
||
return outputs if outputs else None
|
||
except Exception:
|
||
return None
|
||
|
||
def _download_first_available_image_bytes(self, candidates: List[str]) -> Optional[bytes]:
|
||
"""
|
||
按候选列表顺序下载第一张可用图片。
|
||
|
||
本地页面解析拿到的图片地址通常会给出多份 url_list,
|
||
这里逐个尝试可以减少单一 CDN 地址失效导致的图文整条失败。
|
||
"""
|
||
for candidate in candidates or []:
|
||
clean_candidate = self._clean_url(str(candidate or ""))
|
||
if not clean_candidate:
|
||
continue
|
||
image_bytes = self._download_image_bytes(clean_candidate)
|
||
if image_bytes:
|
||
return image_bytes
|
||
return None
|
||
|
||
def _download_first_available_video_bytes(self, candidates: List[str]) -> Optional[bytes]:
|
||
"""按候选列表顺序下载第一段可用视频。"""
|
||
for candidate in candidates or []:
|
||
clean_candidate = self._clean_url(str(candidate or ""))
|
||
if not clean_candidate:
|
||
continue
|
||
video_bytes = self._download_video_bytes(clean_candidate)
|
||
if video_bytes:
|
||
return video_bytes
|
||
return None
|
||
|
||
def _build_note_text(self, media_info: Dict[str, Any]) -> str:
|
||
"""
|
||
构建图文作品的单独文本说明。
|
||
|
||
设计说明:
|
||
1) 作者和文案分开展示,用户看到消息时更容易快速理解内容来源;
|
||
2) 不再把文本写进图片,避免图文较多时首图被额外改造;
|
||
3) 空字段会自动跳过,防止发出大段无意义占位文本。
|
||
"""
|
||
author = str(media_info.get("author", "") or "").strip()
|
||
title = str(media_info.get("title", "") or "").strip()
|
||
lines: List[str] = []
|
||
if author:
|
||
lines.append(f"作者:{author}")
|
||
if title:
|
||
lines.append(f"文案:{title}")
|
||
return "\n".join(lines).strip()
|