Files
abot/plugins/douyin_parser/main.py
liuwei a97e2fc092 修复抖音图文与卡片文案乱码问题
- 抖音分享页改为优先按 UTF-8 解码响应内容\n- 新增常见中文乱码识别与温和修复逻辑\n- 统一清洗抖音解析返回的标题与作者字段
2026-05-06 13:35:11 +08:00

944 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import time
import traceback
import json
import html
import requests
import io
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse
from loguru import logger
from pathlib import Path
from PIL import Image
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE
from wechat_ipad.models.message import MessageType
class DouyinParserError(Exception):
"""抖音解析器自定义异常基类"""
pass
class DouyinParserPlugin(MessagePluginInterface):
"""抖音无水印解析插件"""
# 功能权限常量
FEATURE_KEY = "DOUYIN_PARSER"
FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]"
@property
def name(self) -> str:
return "抖音解析"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供抖音链接无水印解析功能,支持视频下载和分享"
@property
def author(self) -> str:
return "姜不吃先生"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return [] # 不使用命令触发,而是通过消息内容匹配
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.LOG = logger
# 既兼容 v.douyin.com 短链,也兼容分享页已经展开后的 douyin / iesdouyin 链接。
# 这样用户直接转发短链、长链或者带标点的分享文案时,都能进入统一解析链路。
self.url_pattern = re.compile(r'https?://[^\s<>\"]+?(?:douyin\.com|iesdouyin\.com)[^\s<>\"]*')
# 本地页面解析优先复用分享页中的 _ROUTER_DATA。
# 这是参考 DouyinParser 项目接入的核心能力,可以在不依赖第三方接口的情况下直接拿到图文/视频元数据。
self.router_data_pattern = re.compile(r"window\._ROUTER_DATA\s*=\s*({.*?})\s*</script>", re.S)
# 注册功能权限
self.feature = self.register_feature()
# 修改为使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.gbm = context.get("gbm")
self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
# 从配置中获取参数
douyin_config = self._config.get("Douyin", {})
self.enable = douyin_config.get("enable", True)
self.http_proxy = douyin_config.get("http_proxy", "")
# Cookie 配置说明:
# 1) cookie: 直接粘贴请求头 Cookie 字符串;
# 2) cookie_file: Netscape 格式 cookies 文件路径;
# 3) 当二者同时存在时,后备提取优先 cookie_file兼容性更好
self.cookie = douyin_config.get("cookie", "") or ""
self.cookie_file = douyin_config.get("cookie_file", "") or ""
self.download_mode = douyin_config.get("download_mode", "card") # card或file
self.LOG.debug(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
if message.get("type") != MessageType.TEXT:
return False
content = str(message.get("content", "")).strip()
return self._extract_douyin_url(content) is not None
@plugin_stats_decorator(plugin_name="抖音解析")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
extracted_url = self._extract_douyin_url(content)
if not extracted_url:
return False, "未找到抖音链接"
original_url = self._clean_url(extracted_url)
self.LOG.info(f"发现抖音链接: {original_url}")
media_info = self._parse_douyin(original_url)
if not media_info:
self.LOG.error(f"❌无法解析抖音媒资信息")
return False, "解析失败"
media_type = media_info.get('type', 'video')
if media_type == 'image':
image_candidates = media_info.get('image_candidates') or []
if not image_candidates:
raw_images = media_info.get('images') or []
image_candidates = [[str(url).strip()] for url in raw_images if str(url).strip()]
if not image_candidates:
return False, "未获取到图片地址"
img_bytes_list: List[bytes] = []
# 本地页面解析会尽量给出每张图的多个候选地址。
# 这里逐组兜底下载,避免首选链接偶发 403/失效时整条图文直接失败。
for candidates in image_candidates:
b = self._download_first_available_image_bytes(candidates)
if b:
img_bytes_list.append(b)
if not img_bytes_list:
return False, "下载图片失败"
target_id = roomid if roomid else sender
# 图文作品改回“文本与图片分离发送”:
# 1. 文本单独发送,可读性更强,也方便用户直接复制文案;
# 2. 图片数量较少时保留原始逐张展示,避免小图文被强行拼成长图;
# 3. 图片较多时再合并,兼顾刷屏控制与浏览体验。
note_text = self._build_note_text(media_info)
if note_text:
await bot.send_text_message(target_id, note_text)
if len(img_bytes_list) > 3:
merged_pages = self._merge_images_vertical_paged(img_bytes_list, 1242, 65000)
if not merged_pages:
return False, "图片合并失败"
for page in merged_pages:
await bot.send_image_message(target_id, page)
return True, f"发送合并图片成功({len(merged_pages)}页)"
for image_bytes in img_bytes_list:
await bot.send_image_message(target_id, image_bytes)
return True, f"发送原图成功({len(img_bytes_list)}张)"
else:
video_url = media_info.get('url', '')
title = media_info.get('title', '无标题')
author = media_info.get('author', '未知作者')
cover = media_info.get('cover', '')
if not video_url:
self.LOG.error(f"❌无法获取视频地址")
return False, "获取视频地址失败"
if self.download_mode == "file":
video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}")
mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path))
if mp4_path:
await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path))
return True, "发送视频文件成功"
else:
self.LOG.error(f"❌下载视频失败")
return False, "下载视频失败"
else:
xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author,
des=title,
url=video_url,
thumburl=cover
)
await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender))
return True, "发送卡片成功"
except DouyinParserError as e:
self.LOG.error(f"抖音解析错误: {e}")
self.LOG.error(f"❌抖音解析失败: {str(e)}")
return False, f"解析错误: {e}"
except Exception as e:
self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}")
self.LOG.error(f"❌处理抖音链接出错: {str(e)}")
return False, f"处理出错: {e}"
def _clean_url(self, url: str) -> str:
"""清理URL"""
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
cleaned_url = cleaned_url.rstrip(",。,.!?)]}")
self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}")
return cleaned_url
def _extract_douyin_url(self, content: str) -> Optional[str]:
"""
从消息文本中提取第一条抖音链接。
说明:
1. 分享文案里经常会把链接夹在中文标点中间,这里统一做一次裁剪;
2. 后续无论是本地页面解析还是外部兜底,都使用这一条标准化后的 URL
3. 入口收口后,后面如果要补充更多抖音域名,也只需要改这一处。
"""
match = self.url_pattern.search(str(content or ""))
if not match:
return None
return self._clean_url(match.group(0))
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理响应数据"""
if not data:
return data
default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
media_type = data.get('type') or 'video'
# 三条解析链路最终都会走到这里,因此把标题/作者统一再清洗一遍,
# 可以同时兜住“本地页面解析”“内网接口”“外部接口”三种来源的乱码问题。
data['title'] = self._clean_text(data.get('title'))
data['author'] = self._clean_text(data.get('author'))
if media_type == 'video':
cover = data.get('cover')
if isinstance(cover, str):
c = cover.strip().strip('`')
data['cover'] = c if c.startswith('http') else default_cover
else:
data['cover'] = default_cover
else:
imgs = data.get('images') or []
data['cover'] = imgs[0] if imgs else default_cover
return data
def _parse_douyin(self, url: str) -> Dict[str, Any]:
try:
clean_url = self._clean_url(url)
# 第一优先级:本地页面解析。
# 这里参考 DouyinParser 项目,直接展开短链并解析分享页里的 _ROUTER_DATA
# 优点是不依赖外部第三方接口,命中成功时可直接拿到图文/视频的原始元数据。
local_result = self._parse_from_local_page(clean_url)
if local_result and (local_result.get('url') or local_result.get('images')):
return self._clean_response_data(local_result)
# 第二优先级:现有内网业务解析服务。
# 保留这条链路作为本地页面解析失败后的第一层兜底,避免线上能力回退。
primary = self._parse_from_internal_api(clean_url)
if primary and (primary.get('url') or primary.get('images')):
return self._clean_response_data(primary)
# 第三优先级:外部接口兜底。
# 这一层只在本地解析和内网解析都失败时再尝试,避免主路径对外部服务形成硬依赖。
secondary = self._parse_from_external_api(clean_url)
if secondary and (secondary.get('url') or secondary.get('images')):
return self._clean_response_data(secondary)
raise DouyinParserError("未获取到有效媒资数据")
except Exception as e:
self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
raise DouyinParserError(f"未知错误: {str(e)}")
def _parse_from_local_page(self, clean_url: str) -> Optional[Dict[str, Any]]:
"""
直接解析抖音分享页 HTML。
实现思路参考外部 DouyinParser 项目,但这里做了两点本地化适配:
1. 继续沿用当前插件已有的 requests / proxy / cookie 配置,避免额外引入异步 HTTP 依赖;
2. 解析结果统一映射成当前插件现有的数据结构,尽量不改发送链路。
"""
try:
resolved_url = self._resolve_douyin_share_url(clean_url)
html_content = self._fetch_douyin_page_html(resolved_url)
if not html_content:
return None
result = self._parse_douyin_page_html(html_content)
if result and resolved_url and not result.get("source_url"):
result["source_url"] = resolved_url
return result
except Exception as e:
self.LOG.warning(f"[抖音] 本地页面解析失败,准备进入兜底链路: {e}")
return None
def _resolve_douyin_share_url(self, url: str) -> str:
"""
展开抖音短链,拿到最终分享页地址。
这样后续拉取 HTML 时可以稳定命中作品详情页,而不是停留在 v.douyin.com 的跳转页。
"""
response = requests.get(
url,
headers=self._build_page_request_headers(),
timeout=10,
proxies=self._build_proxies(),
allow_redirects=True,
)
response.raise_for_status()
final_url = str(response.url or url).strip()
self.LOG.debug(f"[抖音] 展开后的分享页地址: {final_url}")
return final_url
def _fetch_douyin_page_html(self, url: str) -> str:
"""拉取抖音分享页 HTML 内容。"""
response = requests.get(
url,
headers=self._build_page_request_headers(),
timeout=15,
proxies=self._build_proxies(),
)
response.raise_for_status()
# 抖音分享页绝大多数场景实际都是 UTF-8。
# 之前这里优先使用 apparent_encoding容易被短文本页面误判成 GBK/Latin-1
# 最终导致图文文案和卡片标题一进解析链路就已经变成乱码。
# 这里改成:
# 1. 优先按 UTF-8 直接解原始 bytes
# 2. UTF-8 失败时,再回退到响应头 / apparent_encoding
# 3. 最后兜底 replace至少保证流程不断。
html_content = self._decode_http_response_text(response)
if not html_content.strip():
raise DouyinParserError("抖音分享页内容为空")
return html_content
def _parse_douyin_page_html(self, html_content: str) -> Dict[str, Any]:
"""
解析分享页 HTML兼容图文与视频作品。
解析顺序:
1. 优先尝试新版页面里的 _ROUTER_DATA
2. 如果没有命中,再回退到旧页面中可直接正则提取的 video 字段。
"""
item = self._extract_aweme_item(html_content)
if item:
note = self._parse_note_item(item)
if note:
return note
video = self._parse_video_item(item)
if video:
return video
legacy_video = self._parse_legacy_video(html_content)
if legacy_video:
return legacy_video
raise DouyinParserError("未找到可解析的抖音图文或视频内容")
def _extract_aweme_item(self, html_content: str) -> Optional[Dict[str, Any]]:
"""
从页面中的 _ROUTER_DATA 提取第一条作品数据。
这是当前抖音分享页最稳定的数据来源,图文、视频都可以从这里统一解析。
"""
match = self.router_data_pattern.search(html_content or "")
if not match:
return None
try:
router_data = json.loads(match.group(1))
except json.JSONDecodeError as e:
self.LOG.warning(f"[抖音] 解析 _ROUTER_DATA 失败: {e}")
return None
loader_data = router_data.get("loaderData")
if not isinstance(loader_data, dict):
return None
for page_data in loader_data.values():
if not isinstance(page_data, dict):
continue
video_info = page_data.get("videoInfoRes")
if not isinstance(video_info, dict):
continue
item_list = video_info.get("item_list")
if isinstance(item_list, list) and item_list and isinstance(item_list[0], dict):
return item_list[0]
return None
def _parse_note_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
从作品数据中解析图文作品。
这里保留每张图的候选 URL 列表,后续下载阶段可以逐个重试,提升图文成功率。
"""
image_url_groups = self._pick_image_url_groups(item)
if not image_url_groups:
return None
return {
"type": "image",
"title": self._clean_text(item.get("desc")),
"author": self._clean_text((item.get("author") or {}).get("nickname")),
"images": [group[0] for group in image_url_groups if group],
"image_candidates": image_url_groups,
"cover": image_url_groups[0][0] if image_url_groups and image_url_groups[0] else "",
}
def _pick_image_url_groups(self, item: Dict[str, Any]) -> List[List[str]]:
"""提取图文中每一页图片的候选地址列表,并做去重。"""
image_url_groups: List[List[str]] = []
seen_groups = set()
for image_info in item.get("images") or item.get("image_infos") or []:
if not isinstance(image_info, dict):
continue
candidates: List[str] = []
seen_urls = set()
for image_url in image_info.get("url_list") or []:
if not isinstance(image_url, str) or not image_url.startswith("http"):
continue
decoded_url = self._decode_text(image_url)
if decoded_url in seen_urls:
continue
candidates.append(decoded_url)
seen_urls.add(decoded_url)
group_key = tuple(candidates)
if candidates and group_key not in seen_groups:
image_url_groups.append(candidates)
seen_groups.add(group_key)
return image_url_groups
def _parse_video_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""从作品数据中解析视频作品,并优先挑选无水印播放地址。"""
video = item.get("video")
if not isinstance(video, dict) or video.get("duration") == 0:
return None
play_addr = video.get("play_addr") or {}
urls = play_addr.get("url_list") or []
video_url = self._pick_video_url(urls)
if not video_url:
return None
cover = video.get("cover") or {}
cover_urls = cover.get("url_list") or []
cover_url = self._decode_text(cover_urls[0]) if cover_urls else ""
return {
"type": "video",
"url": video_url,
"title": self._clean_text(item.get("desc")),
"author": self._clean_text((item.get("author") or {}).get("nickname")),
"cover": cover_url,
}
def _parse_legacy_video(self, html_content: str) -> Optional[Dict[str, Any]]:
"""
兼容旧分享页结构。
有些页面没有 _ROUTER_DATA但仍然能从 play_addr / cover / desc 中拼出完整视频卡片。
"""
pattern = re.compile(r'"play_addr":\s*{\s*"uri":\s*"[^"]*",\s*"url_list":\s*\[([^\]]*)\]')
match = pattern.search(html_content or "")
if not match:
return None
raw_urls = [url.strip().strip('"') for url in match.group(1).split(",")]
video_url = self._pick_video_url(raw_urls)
if not video_url:
return None
title = self._match_json_string(html_content, "desc")
author = self._match_json_string(html_content, "nickname")
cover_match = re.search(r'"cover":\s*{\s*"url_list":\s*\[\s*"([^"]+)"', html_content or "")
return {
"type": "video",
"url": video_url,
"title": title,
"author": author,
"cover": self._decode_text(cover_match.group(1)) if cover_match else "",
}
def _pick_video_url(self, urls: List[Any]) -> str:
"""
从多个视频地址里优先挑选更适合直发的无水印链接。
规则:
1. 优先把 playwm 改成 play尽量拿无水印地址
2. 优先选择 aweme.snssdk.com 这类直链;
3. 如果没有,再退回现有 v3/v10 / douyinvod 选择逻辑。
"""
decoded_urls = [
self._decode_text(str(url)).replace("playwm", "play")
for url in urls
if isinstance(url, str) and str(url).strip()
]
snssdk_urls = [url for url in decoded_urls if "aweme.snssdk.com" in url]
if snssdk_urls:
return snssdk_urls[0]
return self._prefer_v3_v10(decoded_urls) or ""
def _match_json_string(self, text: str, key: str) -> str:
"""从 HTML 文本中的 JSON 片段抽取字符串字段。"""
match = re.search(rf'"{re.escape(key)}":\s*"([^"]*)"', text or "")
return self._clean_text(self._decode_text(match.group(1))) if match else ""
def _decode_text(self, value: Any) -> str:
"""同时处理 HTML 转义与 unicode 转义,避免标题和 URL 出现 \\uXXXX / &amp;。"""
if value is None:
return ""
text = html.unescape(str(value))
# 只有在文本里明显存在 \uXXXX / \xXX 这类转义片段时才做 unicode_escape 解码,
# 避免把本来已经是正常中文的字符串再次错误解码成乱码。
if "\\u" in text or "\\x" in text:
try:
text = text.encode("utf-8").decode("unicode_escape")
except Exception:
pass
# 某些链路里文本已经在上游被错误按 Latin-1 / CP1252 解过一次,
# 这里做一层“仅在明显像乱码时才尝试”的温和修复,避免正常中文被误伤。
return self._repair_mojibake_text(text)
def _clean_text(self, value: Any) -> str:
"""统一清理文本字段,避免标题/作者带空白或转义残留。"""
return "" if value is None else self._decode_text(value).strip()
def _decode_http_response_text(self, response: requests.Response) -> str:
"""更稳妥地把 HTTP 响应转成文本。
设计说明:
1. 抖音分享页和大部分 JSON/HTML 实际都用 UTF-8
2. `apparent_encoding` 在中文短文本页面上很容易误判,直接用会把整段中文解坏;
3. 因此先信任 UTF-8再逐步回退到 header / apparent / replace。
"""
raw_bytes = response.content or b""
if not raw_bytes:
return ""
for encoding in ("utf-8", response.encoding, response.apparent_encoding, "gb18030"):
if not encoding:
continue
try:
decoded_text = raw_bytes.decode(encoding)
# 如果解出来明显像“UTF-8 被错按单字节编码解释过”,再试着修一手。
repaired_text = self._repair_mojibake_text(decoded_text)
if repaired_text:
return repaired_text
except Exception:
continue
return raw_bytes.decode("utf-8", errors="replace")
def _looks_like_mojibake(self, text: str) -> bool:
"""判断文本是否像常见的 UTF-8 误解码乱码。"""
if not text:
return False
suspicious_markers = ("Ã", "Â", "æ", "ä", "å", "ç", "é", "ê", "ï", "ð")
marker_hits = sum(text.count(marker) for marker in suspicious_markers)
# 中文场景里这些字符密集出现时基本就是“UTF-8 被按 Latin-1/CP1252 解了”。
return marker_hits >= 2
def _repair_mojibake_text(self, text: str) -> str:
"""修复常见的中文乱码,但只在高置信度时生效。"""
if not text or not self._looks_like_mojibake(text):
return text
for source_encoding in ("latin1", "cp1252"):
try:
repaired_text = text.encode(source_encoding).decode("utf-8")
# 修复后若中文比例明显提升,就采用修复结果。
if repaired_text and self._count_cjk_chars(repaired_text) >= self._count_cjk_chars(text):
return repaired_text
except Exception:
continue
return text
def _count_cjk_chars(self, text: str) -> int:
"""统计字符串中的中日韩统一表意文字数量,用于判断修复是否更合理。"""
if not text:
return 0
return sum(1 for ch in text if "\u4e00" <= ch <= "\u9fff")
def _build_proxies(self) -> Optional[Dict[str, str]]:
if self.http_proxy:
return {"http": self.http_proxy, "https": self.http_proxy}
return None
def _build_request_headers(self) -> Dict[str, str]:
"""
构建通用请求头。
设计说明:
- User-Agent 保持常规浏览器标识,降低被目标站点直接拒绝的概率;
- Cookie 在有配置时注入到请求头,提升受限资源的提取成功率。
"""
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
)
}
if self.cookie:
headers["Cookie"] = self.cookie
return headers
def _build_page_request_headers(self) -> Dict[str, str]:
"""
构建用于访问抖音分享页的请求头。
这里单独使用移动端 Safari UA是因为参考项目和线上经验都表明
- 分享页 HTML 在移动端更稳定地携带 _ROUTER_DATA
- 图文作品在移动端页面中的结构更统一;
- 不影响现有 API 兜底链路,因为只用于本地页面抓取。
"""
headers = self._build_request_headers()
headers["User-Agent"] = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1"
)
headers["Referer"] = "https://www.douyin.com/"
headers["Accept-Language"] = "zh-CN,zh;q=0.9"
return headers
def _parse_from_internal_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
try:
endpoint = "http://192.168.2.32:8999/api/hybrid/video_data"
headers = self._build_request_headers()
headers["accept"] = "application/json"
params = {"url": clean_url, "minimal": "false"}
response = requests.get(endpoint, headers=headers, params=params, timeout=10, proxies=self._build_proxies())
if response.status_code != 200:
return None
body = response.json() or {}
if body.get("code") != 200:
return None
data = body.get("data") or {}
aweme_type = data.get("aweme_type")
author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
if aweme_type == 68 or (data.get("images") or data.get("image_list")):
images_field = data.get("images") or []
images: List[str] = []
for img in images_field:
ulist = img.get("url_list") or img.get("download_url_list") or []
chosen = self._prefer_image_url(ulist)
if chosen:
images.append(chosen)
desc = data.get("desc") or data.get("caption") or ""
result = {"type": "image", "images": images, "title": desc, "author": nickname,
"cover": images[0] if images else ""}
if images:
return result
return None
video = data.get("video") or {}
bit_rates = video.get("bit_rate") or []
chosen_url = ""
mp4_sorted = sorted([br for br in bit_rates if br.get("format") == "mp4"],
key=lambda x: x.get("bit_rate") or 0, reverse=True)
for br in mp4_sorted:
play_addr = br.get("play_addr") or {}
urls = play_addr.get("url_list") or []
selected = self._prefer_v3_v10(urls)
if selected:
chosen_url = selected
break
if not chosen_url:
play_addr = video.get("play_addr") or {}
urls = play_addr.get("url_list") or []
selected = self._prefer_v3_v10(urls)
if selected:
chosen_url = selected
cover = (video.get("cover") or {}).get("url_list") or []
cover_url = cover[0] if cover else ""
caption = data.get("caption") or "无标题"
author = (data.get("author") or {})
nickname = author.get("nickname") or author.get("unique_id") or "未知作者"
result = {"type": "video", "url": chosen_url or "", "title": caption, "author": nickname,
"cover": cover_url}
if result.get("url"):
return result
return None
except Exception:
return None
def _parse_from_external_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
try:
pay_api_url = "https://api.pearktrue.cn/api/video/api.php"
params = {"url": clean_url, "key": "f56c1fed0c6e64e7"}
response = requests.post(
pay_api_url,
params=params,
headers=self._build_request_headers(),
timeout=10,
proxies=self._build_proxies(),
)
if response.status_code != 200:
return None
data = response.json() or {}
if data.get("code") == 200:
result = data.get("data", {})
if result.get("url"):
return result
return None
except Exception:
return None
def _prefer_v3_v10(self, urls: List[str]) -> Optional[str]:
try:
if not urls:
return None
cleaned = [(u or "").strip().strip("`") for u in urls if u]
def is_vx(n: str) -> bool:
return bool(re.match(r"^v(3|4|5|6|7|8|9|10|11)(?:[\-.]|$)", n, re.I))
def is_douyinvod(n: str) -> bool:
return "douyinvod.com" in n.lower()
first = None
for s in cleaned:
netloc = urlparse(s).netloc
if is_vx(netloc) and is_douyinvod(netloc):
return s
if first is None:
first = s
for s in cleaned:
netloc = urlparse(s).netloc
if is_vx(netloc):
return s
for s in cleaned:
netloc = urlparse(s).netloc
if is_douyinvod(netloc):
return s
return first
except Exception:
return urls[0] if urls else None
def _prefer_image_url(self, urls: List[str]) -> Optional[str]:
try:
if not urls:
return None
cleaned = [(u or "").strip().strip("`") for u in urls if u]
jpeg = next((u for u in cleaned if ".jpeg" in u.lower() or u.lower().endswith(".jpg")), None)
if jpeg:
return jpeg
webp = next((u for u in cleaned if ".webp" in u.lower()), None)
if webp:
return webp
return cleaned[0]
except Exception:
return urls[0] if urls else None
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
response = requests.get(
url,
stream=True,
headers=self._build_request_headers(),
proxies=self._build_proxies(),
timeout=30,
)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None
# 以二进制写入模式保存流数据
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
return os.path.abspath(save_path)
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None
def _download_image_bytes(self, url: str) -> Optional[bytes]:
try:
resp = requests.get(
url,
headers=self._build_request_headers(),
timeout=15,
proxies=self._build_proxies(),
)
if resp.status_code == 200:
return resp.content
return None
except Exception:
return None
def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]:
try:
pil_images: List[Image.Image] = []
for b in images:
img = Image.open(io.BytesIO(b))
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
w, h = img.size
if w != target_width:
ratio = target_width / float(w)
img = img.resize((target_width, int(h * ratio)))
pil_images.append(img)
if not pil_images:
return None
total_height = sum(i.size[1] for i in pil_images)
merged = Image.new("RGB", (target_width, total_height))
y = 0
for im in pil_images:
merged.paste(im, (0, y))
y += im.size[1]
output = io.BytesIO()
merged.save(output, format="JPEG", quality=85)
return output.getvalue()
except Exception:
return None
def _merge_images_vertical_paged(self, images: List[bytes], target_width: int = 1242, max_total_height: int = 18000) -> Optional[List[bytes]]:
try:
outputs: List[bytes] = []
current_images: List[Image.Image] = []
current_height = 0
for b in images:
try:
img = Image.open(io.BytesIO(b))
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
w, h = img.size
if w != target_width:
ratio = target_width / float(w)
img = img.resize((target_width, int(h * ratio)))
ih = img.size[1]
except Exception:
continue
if current_images and current_height + ih > max_total_height:
merged = Image.new("RGB", (target_width, current_height))
y = 0
for im in current_images:
merged.paste(im, (0, y))
y += im.size[1]
out = io.BytesIO()
merged.save(out, format="JPEG", quality=85)
outputs.append(out.getvalue())
current_images = [img]
current_height = img.size[1]
else:
current_images.append(img)
current_height += ih
if current_images:
merged = Image.new("RGB", (target_width, current_height))
y = 0
for im in current_images:
merged.paste(im, (0, y))
y += im.size[1]
out = io.BytesIO()
merged.save(out, format="JPEG", quality=85)
outputs.append(out.getvalue())
return outputs if outputs else None
except Exception:
return None
def _download_first_available_image_bytes(self, candidates: List[str]) -> Optional[bytes]:
"""
按候选列表顺序下载第一张可用图片。
本地页面解析拿到的图片地址通常会给出多份 url_list
这里逐个尝试可以减少单一 CDN 地址失效导致的图文整条失败。
"""
for candidate in candidates or []:
clean_candidate = self._clean_url(str(candidate or ""))
if not clean_candidate:
continue
image_bytes = self._download_image_bytes(clean_candidate)
if image_bytes:
return image_bytes
return None
def _build_note_text(self, media_info: Dict[str, Any]) -> str:
"""
构建图文作品的单独文本说明。
设计说明:
1) 作者和文案分开展示,用户看到消息时更容易快速理解内容来源;
2) 不再把文本写进图片,避免图文较多时首图被额外改造;
3) 空字段会自动跳过,防止发出大段无意义占位文本。
"""
author = str(media_info.get("author", "") or "").strip()
title = str(media_info.get("title", "") or "").strip()
lines: List[str] = []
if author:
lines.append(f"作者:{author}")
if title:
lines.append(f"文案:{title}")
return "\n".join(lines).strip()