"""
|
||
短视频自动解析插件
|
||
|
||
自动检测消息中的短视频链接并解析,支持抖音、皮皮虾、哔哩哔哩等平台
|
||
"""
|
||
|
||
import io
|
||
import re
|
||
import tomllib
|
||
import aiohttp
|
||
import uuid
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
from zipfile import ZipFile
|
||
from urllib.parse import urlparse, urlunparse
|
||
from loguru import logger
|
||
from utils.plugin_base import PluginBase
|
||
from utils.decorators import on_text_message
|
||
from WechatHook import WechatHookClient
|
||
|
||
|
||
class VideoParser(PluginBase):
    """Short-video link parser plugin.

    Watches incoming text messages for short-video share links (Douyin,
    Kuaishou, Xiaohongshu, Weibo, TikTok, YouTube, ...) and replies with a
    parsed link card or, for image posts, a stitched long image.
    """

    # Plugin metadata consumed by the plugin loader.
    description = "自动解析短视频链接并发送卡片"
    author = "ShiHao"
    version = "1.0.0"

    def __init__(self):
        super().__init__()
        # Populated from config.toml in async_init().
        self.config = None

        # Recognised share-link patterns, tried in declaration order.
        self.video_patterns = [
            # Douyin
            r'https?://v\.douyin\.com/[A-Za-z0-9_-]+/?',
            r'https?://www\.douyin\.com/video/\d+',
            r'https?://www\.iesdouyin\.com/share/video/\d+',

            # Kuaishou
            r'https?://v\.kuaishou\.com/[A-Za-z0-9]+',
            r'https?://www\.kuaishou\.com/short-video/\d+',

            # Xiaohongshu
            r'https?://xhslink\.com/[A-Za-z0-9]+',
            r'https?://www\.xiaohongshu\.com/discovery/item/[A-Za-z0-9]+',

            # Weibo
            r'https?://weibo\.com/tv/show/\d+:\d+',
            r'https?://video\.weibo\.com/show\?fid=\d+:\d+',

            # Weishi
            r'https?://video\.weishi\.qq\.com/[A-Za-z0-9]+',
            r'https?://h5\.weishi\.qq\.com/weishi/feed/[A-Za-z0-9]+',

            # Xigua Video
            r'https?://v\.ixigua\.com/[A-Za-z0-9]+',

            # Zuiyou
            r'https?://share\.izuiyou\.com/[A-Za-z0-9]+',

            # Meipai
            r'https?://www\.meipai\.com/media/\d+',

            # Huya
            r'https?://v\.huya\.com/play/\d+\.html',

            # Pear Video
            r'https?://www\.pearvideo\.com/video_\d+',

            # TikTok
            r'https?://(?:www\.)?tiktok\.com/@[^/]+/video/\d+',
            r'https?://vm\.tiktok\.com/[A-Za-z0-9]+',

            # YouTube
            r'https?://(?:www\.)?youtube\.com/watch\?v=[A-Za-z0-9_-]+',
            r'https?://youtu\.be/[A-Za-z0-9_-]+',

            # Instagram
            r'https?://(?:www\.)?instagram\.com/(?:p|reel)/[A-Za-z0-9_-]+',
        ]

        # Compile once so per-message matching stays cheap.
        self.compiled_patterns = list(map(re.compile, self.video_patterns))
async def async_init(self):
|
||
"""插件异步初始化"""
|
||
# 读取配置
|
||
config_path = Path(__file__).parent / "config.toml"
|
||
with open(config_path, "rb") as f:
|
||
self.config = tomllib.load(f)
|
||
|
||
logger.success("[VideoParser] 短视频解析插件已加载")
|
||
|
||
@on_text_message(priority=60)
|
||
async def handle_video_link(self, bot: WechatHookClient, message: dict):
|
||
"""处理包含视频链接的消息"""
|
||
# 检查是否启用
|
||
if not self.config["behavior"]["enabled"]:
|
||
return
|
||
|
||
content = message.get("Content", "").strip()
|
||
from_wxid = message.get("FromWxid", "")
|
||
is_group = message.get("IsGroup", False)
|
||
|
||
# 检查群聊/私聊过滤
|
||
if is_group:
|
||
if not self._should_parse_group(from_wxid):
|
||
return
|
||
else:
|
||
if not self.config["behavior"]["enable_private"]:
|
||
return
|
||
|
||
# 检测消息中的视频链接
|
||
video_url = self._extract_video_url(content)
|
||
if not video_url:
|
||
return
|
||
|
||
logger.info(f"[VideoParser] 检测到视频链接: {video_url}")
|
||
|
||
# 调用 API 解析视频
|
||
try:
|
||
video_info = await self._parse_video(video_url)
|
||
if video_info and self._is_douyin_or_tiktok(video_url):
|
||
hybrid_info = await self._parse_hybrid_video_data(video_url)
|
||
if hybrid_info:
|
||
video_info["_hybrid_data"] = hybrid_info
|
||
if video_info:
|
||
video_info["_source_url"] = video_url
|
||
if video_info:
|
||
# 发送链接卡片
|
||
await self._send_video_card(bot, from_wxid, video_info)
|
||
else:
|
||
logger.warning(f"[VideoParser] 视频解析失败: {video_url}")
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 处理视频链接失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
|
||
def _extract_video_url(self, content: str) -> str:
|
||
"""从消息内容中提取视频链接"""
|
||
for pattern in self.compiled_patterns:
|
||
match = pattern.search(content)
|
||
if match:
|
||
return match.group(0)
|
||
return ""
|
||
|
||
    async def _parse_video(self, video_url: str) -> Optional[dict]:
        """Resolve *video_url* into video data via the configured API.

        Two-step flow: first extract the aweme_id from the share URL, then
        fetch the full video data for that id.  Returns the parsed JSON
        payload on success, or None on any failure.
        """
        get_aweme_id_url = self.config["api"]["get_aweme_id_url"]
        fetch_video_url = self.config["api"]["url"]
        timeout = self.config["api"]["timeout"]

        try:
            import ssl
            # NOTE(review): certificate verification is disabled for the API
            # host -- acceptable only for a trusted internal endpoint.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            # Optional proxy configuration.
            proxy_config = self.config.get("proxy", {})
            proxy_url = None
            if proxy_config.get("enabled", False):
                proxy_type = proxy_config.get("type", "socks5")
                proxy_host = proxy_config.get("host")
                proxy_port = proxy_config.get("port")
                if proxy_host and proxy_port:
                    proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
                    logger.info(f"[VideoParser] 使用代理: {proxy_url}")

            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
                force_close=True,
                enable_cleanup_closed=True
            )

            async with aiohttp.ClientSession(connector=connector) as session:
                # Step 1: extract the aweme_id from the share URL.
                logger.info(f"[VideoParser] 提取视频ID: {get_aweme_id_url}")
                async with session.get(
                    get_aweme_id_url,
                    params={"url": video_url},
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    if response.status != 200:
                        logger.error(f"[VideoParser] 提取视频ID失败: HTTP {response.status}")
                        return None

                    result = await response.json()
                    logger.debug(f"[VideoParser] 提取ID返回: {result}")

                    if result.get("code") != 200:
                        logger.error(f"[VideoParser] 提取视频ID失败: {result.get('msg', '未知错误')}")
                        return None

                    # "data" may be the aweme_id string itself, or a dict
                    # containing it.
                    data = result.get("data")
                    if isinstance(data, str):
                        aweme_id = data
                    elif isinstance(data, dict):
                        aweme_id = data.get("aweme_id")
                    else:
                        aweme_id = None

                    if not aweme_id:
                        logger.error("[VideoParser] 未找到 aweme_id")
                        return None

                    logger.info(f"[VideoParser] 获取到视频ID: {aweme_id}")

                # Step 2: fetch the full video data for the extracted id.
                logger.info(f"[VideoParser] 获取视频数据: {fetch_video_url}")
                async with session.get(
                    fetch_video_url,
                    params={"aweme_id": aweme_id},
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    return await self._handle_response(response)

        except aiohttp.ClientConnectorError as e:
            logger.error(f"[VideoParser] 无法连接到 API 服务器: {e}")
            return None
        except aiohttp.ClientError as e:
            logger.error(f"[VideoParser] 网络请求失败: {e}")
            return None
        except Exception as e:
            logger.error(f"[VideoParser] 解析视频失败: {e}")
            import traceback
            logger.error(f"详细错误: {traceback.format_exc()}")
            return None
    async def _parse_hybrid_video_data(self, video_url: str) -> Optional[dict]:
        """Fetch a single post's data via the hybrid/video_data endpoint.

        Returns the raw JSON payload on success, or None when the endpoint
        is unavailable or reports an error.
        """
        hybrid_url = self._resolve_hybrid_url()
        if not hybrid_url:
            return None

        timeout = self.config["api"]["timeout"]
        try:
            import ssl
            # NOTE(review): TLS verification disabled -- see _parse_video.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            # Optional proxy configuration.
            proxy_config = self.config.get("proxy", {})
            proxy_url = None
            if proxy_config.get("enabled", False):
                proxy_type = proxy_config.get("type", "socks5")
                proxy_host = proxy_config.get("host")
                proxy_port = proxy_config.get("port")
                if proxy_host and proxy_port:
                    proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"

            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
                force_close=True,
                enable_cleanup_closed=True
            )

            async with aiohttp.ClientSession(connector=connector) as session:
                async with session.get(
                    hybrid_url,
                    params={"url": video_url, "minimal": "false"},
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    if response.status != 200:
                        logger.warning(f"[VideoParser] hybrid 接口失败: HTTP {response.status}")
                        return None
                    result = await response.json()
                    # Accept the various success-code spellings used by
                    # different API deployments.
                    if result.get("code") not in [200, "200", 1, "1", True]:
                        logger.warning(f"[VideoParser] hybrid 接口返回错误: {result.get('msg') or result.get('message')}")
                        return None
                    return result

        except Exception as e:
            logger.warning(f"[VideoParser] hybrid 接口调用失败: {e}")
            return None
def _resolve_hybrid_url(self) -> str:
|
||
"""自动复用下载接口域名构建 hybrid 接口地址"""
|
||
api_config = self.config.get("api", {}) if self.config else {}
|
||
hybrid_url = api_config.get("hybrid_url", "")
|
||
if hybrid_url:
|
||
return hybrid_url
|
||
|
||
download_api_url = self.config.get("download", {}).get("download_api_url", "")
|
||
if not download_api_url:
|
||
return ""
|
||
|
||
try:
|
||
parsed = urlparse(download_api_url)
|
||
if not parsed.scheme or not parsed.netloc:
|
||
return ""
|
||
return urlunparse(parsed._replace(path="/api/hybrid/video_data", query="", fragment=""))
|
||
except Exception:
|
||
return ""
|
||
|
||
async def _handle_response(self, response) -> dict:
|
||
"""处理 API 响应"""
|
||
if response.status != 200:
|
||
response_text = await response.text()
|
||
logger.error(f"[VideoParser] API 请求失败: HTTP {response.status}, 响应: {response_text[:200]}")
|
||
return None
|
||
|
||
result = await response.json()
|
||
logger.info(f"[VideoParser] API 返回: code={result.get('code')}, msg={result.get('msg')}")
|
||
# 打印完整返回数据以便调试
|
||
import json
|
||
logger.info(f"[VideoParser] 完整返回数据: {json.dumps(result, ensure_ascii=False, indent=2)}")
|
||
|
||
# 检查返回状态(支持多种状态码格式)
|
||
code = result.get("code")
|
||
if code not in [200, "200", 1, "1", True]:
|
||
logger.error(f"[VideoParser] API 返回错误: {result.get('msg', '未知错误')}")
|
||
return None
|
||
|
||
return result
|
||
|
||
    async def _send_video_card(self, bot: WechatHookClient, to_wxid: str, video_info: dict):
        """Send the parsed result: image posts as a stitched long image,
        videos as a link card.

        *video_info* is the API payload, optionally augmented with the
        "_source_url" and "_hybrid_data" keys added by handle_video_link.
        """
        try:
            # Unpack the API payload.
            data = video_info.get("data", {})
            aweme_detail = data.get("aweme_detail", {})
            source_url = video_info.get("_source_url", "")
            hybrid_data = video_info.get("_hybrid_data", {})

            # Author info.
            author = aweme_detail.get("author", {})
            nickname = author.get("nickname", "")

            # Post description.
            desc = aweme_detail.get("desc", "")

            # Handle image posts first (no video, or an image list present).
            image_urls = self._extract_image_urls(aweme_detail)
            if hybrid_data:
                # Prefer the hybrid payload's image list when available.
                hybrid_images = self._extract_image_urls_from_data(hybrid_data.get("data", {}))
                if hybrid_images:
                    image_urls = hybrid_images
            if image_urls:
                logger.info(f"[VideoParser] 检测到图文内容,图片数量: {len(image_urls)}")
                referer = self._build_douyin_referer(aweme_detail, data)
                sent = await self._download_and_send_images(
                    bot,
                    to_wxid,
                    image_urls,
                    referer=referer,
                    source_url=source_url or referer,
                )
                if sent:
                    return

            # Cover image (prefer cover_original_scale).
            video = aweme_detail.get("video", {})
            image_url = self._extract_url(
                video.get("cover_original_scale")
                or video.get("cover")
                or video.get("origin_cover")
                or video.get("dynamic_cover")
            )

            # Playback URL (first usable link in play_addr).
            play_addr = video.get("play_addr", {})
            video_url = self._extract_url(play_addr)

            # Fall back to configured defaults for empty fields.
            title = nickname or self.config["card"]["default_title"]
            desc = desc or self.config["card"]["default_desc"]
            default_image_url = self.config.get("card", {}).get("default_image_url", "")
            image_url = image_url if isinstance(image_url, str) else ""
            # NOTE(review): a configured default_image_url always overrides
            # the extracted cover (not just when the cover is missing) --
            # confirm this is intended.
            if default_image_url:
                image_url = default_image_url
            else:
                image_url = image_url or "https://mmbiz.qpic.cn/mmbiz_png/NbW0ZIUM8lVHoUbjXw2YbYXbNJDtUH7Sbkibm9Qwo9FhAiaEFG4jY3Q2MEleRpiaWDyDv8BZUfR85AW3kG4ib6DyAw/640?wx_fmt=png"
            url = video_url or self.config["card"]["default_url"]

            # Clamp title/description lengths for the card UI.
            if len(title) > 50:
                title = title[:47] + "..."
            if len(desc) > 100:
                desc = desc[:97] + "..."

            logger.info(f"[VideoParser] 发送卡片: title={title}, desc={desc[:30]}...")

            # Send the link card.
            await bot.send_link_card(
                to_wxid=to_wxid,
                title=title,
                desc=desc,
                url=url,
                image_url=image_url,
            )

            logger.success(f"[VideoParser] 视频卡片发送成功")

        except Exception as e:
            logger.error(f"[VideoParser] 发送视频卡片失败: {e}")
            import traceback
            logger.error(f"详细错误: {traceback.format_exc()}")
def _extract_url(self, value):
|
||
"""提取第一个可用的 URL 字符串"""
|
||
if isinstance(value, str):
|
||
return value if value.startswith("http") else ""
|
||
if isinstance(value, dict):
|
||
for key in ("url_list", "url", "uri"):
|
||
if key in value:
|
||
result = self._extract_url(value.get(key))
|
||
if result:
|
||
return result
|
||
# 兜底:递归查找嵌套结构中的 URL
|
||
for v in value.values():
|
||
result = self._extract_url(v)
|
||
if result:
|
||
return result
|
||
return ""
|
||
if isinstance(value, list):
|
||
for item in value:
|
||
result = self._extract_url(item)
|
||
if result:
|
||
return result
|
||
return ""
|
||
|
||
def _extract_image_urls_from_data(self, data: dict) -> List[str]:
|
||
"""从 hybrid 或其他结构中提取图文图片 URL 列表"""
|
||
if not isinstance(data, dict):
|
||
return []
|
||
if data.get("aweme_detail") and isinstance(data.get("aweme_detail"), dict):
|
||
return self._extract_image_urls(data.get("aweme_detail"))
|
||
return self._extract_image_urls(data)
|
||
|
||
def _extract_image_urls(self, aweme_detail: dict) -> List[str]:
|
||
"""从 aweme_detail 中提取图文图片 URL 列表"""
|
||
image_urls: List[str] = []
|
||
images = aweme_detail.get("images")
|
||
if not images:
|
||
image_post_info = aweme_detail.get("image_post_info", {})
|
||
images = image_post_info.get("images") or []
|
||
|
||
if not isinstance(images, list):
|
||
return image_urls
|
||
|
||
for item in images:
|
||
url = self._extract_url(item)
|
||
if not url and isinstance(item, dict):
|
||
for key in ("download_url", "display_image", "origin_image", "cover", "thumbnail"):
|
||
url = self._extract_url(item.get(key))
|
||
if url:
|
||
break
|
||
if url:
|
||
image_urls.append(url)
|
||
return image_urls
|
||
|
||
def _build_douyin_referer(self, aweme_detail: dict, data: dict) -> str:
|
||
"""构建抖音图片请求 Referer"""
|
||
share_info = aweme_detail.get("share_info", {}) if isinstance(aweme_detail, dict) else {}
|
||
share_url = share_info.get("share_url") if isinstance(share_info, dict) else ""
|
||
if share_url:
|
||
return share_url
|
||
aweme_id = ""
|
||
for key in ("aweme_id", "aweme_id_str", "item_id"):
|
||
if aweme_detail.get(key):
|
||
aweme_id = str(aweme_detail.get(key))
|
||
break
|
||
if not aweme_id and isinstance(data, dict):
|
||
for key in ("aweme_id", "aweme_id_str", "item_id"):
|
||
if data.get(key):
|
||
aweme_id = str(data.get(key))
|
||
break
|
||
if aweme_id:
|
||
return f"https://www.douyin.com/video/{aweme_id}"
|
||
return "https://www.douyin.com/"
|
||
|
||
    async def _download_and_send_images(
        self,
        bot: WechatHookClient,
        to_wxid: str,
        image_urls: List[str],
        referer: str = "",
        source_url: str = "",
    ) -> bool:
        """Download an image post and send it as one tall stitched image.

        Returns True when an image was sent successfully, False otherwise
        (feature disabled, download failure, or send failure).
        """
        try:
            if not self.config.get("download", {}).get("enabled", False):
                logger.info("[VideoParser] 图片下载功能未启用")
                return False

            download_api_url = self.config["download"]["download_api_url"]
            timeout = self.config["download"]["timeout"]

            images_dir = Path(__file__).parent / "images"
            images_dir.mkdir(exist_ok=True)

            import ssl

            # NOTE(review): TLS verification disabled -- see _parse_video.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            # Optional proxy configuration.
            proxy_config = self.config.get("proxy", {})
            proxy_url = None
            if proxy_config.get("enabled", False):
                proxy_type = proxy_config.get("type", "socks5")
                proxy_host = proxy_config.get("host")
                proxy_port = proxy_config.get("port")
                if proxy_host and proxy_port:
                    proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"

            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
                force_close=True,
                enable_cleanup_closed=True
            )

            image_bytes_list: List[bytes] = []
            # unsafe=True lets the jar keep cookies for IP-address hosts.
            cookie_jar = aiohttp.CookieJar(unsafe=True)
            async with aiohttp.ClientSession(connector=connector, cookie_jar=cookie_jar) as session:
                # Warm up Douyin cookies to reduce 403s.
                await self._prime_douyin_session(session, proxy_url, timeout, referer)

                # Fetch the whole image post through the download API only
                # (direct image links tend to return 403).
                api_images: List[bytes] = []
                if source_url:
                    api_images = await self._download_images_via_api(
                        session=session,
                        download_api_url=download_api_url,
                        source_url=source_url,
                        proxy_url=proxy_url,
                        timeout=timeout,
                    )
                    if api_images:
                        image_bytes_list = api_images

                if not image_bytes_list:
                    logger.warning("[VideoParser] 图文包下载失败,未启用直链回退")

            if not image_bytes_list:
                logger.error("[VideoParser] 图文图片下载失败:没有可用图片")
                return False

            merged_bytes = self._merge_images_vertical(image_bytes_list)
            if not merged_bytes:
                # Stitching failed -- fall back to sending just the first image.
                logger.warning("[VideoParser] 图片合成失败,尝试发送首张图片")
                first_path = images_dir / f"douyin_image_{datetime.now():%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}.jpg"
                with open(first_path, "wb") as f:
                    f.write(image_bytes_list[0])
                sent = await bot.send_image(to_wxid, str(first_path.resolve()))
                return bool(sent)

            merged_path = images_dir / f"douyin_merged_{datetime.now():%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}.jpg"
            with open(merged_path, "wb") as f:
                f.write(merged_bytes)

            logger.info(f"[VideoParser] 长图已生成: {merged_path}")
            sent = await bot.send_image(to_wxid, str(merged_path.resolve()))
            if sent:
                logger.success("[VideoParser] 长图发送成功")
            else:
                logger.error("[VideoParser] 长图发送失败")
            return bool(sent)

        except aiohttp.ClientError as e:
            logger.error(f"[VideoParser] 图片下载网络错误: {e}")
            return False
        except Exception as e:
            logger.error(f"[VideoParser] 图文图片处理失败: {e}")
            import traceback
            logger.error(f"详细错误: {traceback.format_exc()}")
            return False
    async def _download_image_bytes(
        self,
        session: aiohttp.ClientSession,
        download_api_url: str,
        img_url: str,
        proxy_url: Optional[str],
        timeout: int,
        referer: str = "",
    ) -> Optional[bytes]:
        """Fetch one image's bytes: direct link first, download API second.

        Tries the direct image URL with several candidate Referer values;
        when all attempts fail, falls back to the download API (skipped for
        Douyin CDN hosts, which that API cannot fetch).  Returns None on
        failure.
        """
        base_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        referers = [r for r in [referer, "https://www.douyin.com/", "https://www.iesdouyin.com/"] if r]

        # 1) Direct download, rotating through the Referer candidates.
        for ref in referers:
            headers = dict(base_headers)
            headers["Referer"] = ref
            headers["Origin"] = "https://www.douyin.com"
            try:
                async with session.get(
                    img_url,
                    headers=headers,
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=timeout),
                    allow_redirects=True,
                ) as response:
                    if response.status == 200:
                        data = await response.read()
                        if data:
                            return data
                        logger.warning("[VideoParser] 直链图片内容为空")
                    else:
                        logger.warning(f"[VideoParser] 直链图片下载失败: HTTP {response.status}")
            except Exception as e:
                logger.warning(f"[VideoParser] 直链图片下载异常: {e}")

        # 2) Fallback: the download API (some deployments only accept
        #    post-page URLs, so Douyin CDN image hosts are skipped here).
        if "douyinpic.com" in img_url or "douyinimg.com" in img_url:
            return None
        async with session.get(
            download_api_url,
            params={"url": img_url},
            proxy=proxy_url,
            timeout=aiohttp.ClientTimeout(total=timeout),
        ) as response:
            if response.status != 200:
                logger.error(f"[VideoParser] 图片下载失败: HTTP {response.status}")
                return None
            content_type = response.headers.get("Content-Type", "")
            if "application/json" in content_type:
                # A JSON body means an API error, not image bytes.
                text = await response.text()
                logger.warning(f"[VideoParser] 图片下载返回 JSON: {text[:200]}")
                return None
            data = await response.read()
            if not data:
                logger.warning("[VideoParser] 图片内容为空")
                return None
            return data
    async def _download_images_via_api(
        self,
        session: aiohttp.ClientSession,
        download_api_url: str,
        source_url: str,
        proxy_url: Optional[str],
        timeout: int,
    ) -> List[bytes]:
        """Fetch an image post as a bundle via the download API.

        The API may answer with a ZIP of images or a single image; returns
        the decoded list of image byte strings, or [] on any failure.
        """
        try:
            async with session.get(
                download_api_url,
                params={"url": source_url},
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=timeout),
            ) as response:
                if response.status != 200:
                    logger.error(f"[VideoParser] 图文包下载失败: HTTP {response.status}")
                    return []
                content_type = response.headers.get("Content-Type", "")
                if "application/json" in content_type:
                    # A JSON response means the API reported an error.
                    text = await response.text()
                    logger.warning(f"[VideoParser] 图文包返回 JSON: {text[:200]}")
                    return []
                data = await response.read()
                if not data:
                    logger.warning("[VideoParser] 图文包内容为空")
                    return []
        except Exception as e:
            logger.warning(f"[VideoParser] 图文包下载异常: {e}")
            return []

        # ZIP bundle ("PK" magic): extract the images in name order.
        if data[:2] == b"PK":
            try:
                images: List[bytes] = []
                with ZipFile(io.BytesIO(data)) as zf:
                    names = [n for n in zf.namelist() if n.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))]
                    for name in sorted(names):
                        with zf.open(name) as f:
                            images.append(f.read())
                logger.info(f"[VideoParser] 图文包解压成功,图片数量: {len(images)}")
                return images
            except Exception as e:
                logger.warning(f"[VideoParser] 图文包解压失败: {e}")
                return []

        # Single image: JPEG, PNG or RIFF (WebP container) magic bytes.
        if data[:2] == b"\xff\xd8" or data[:8] == b"\x89PNG\r\n\x1a\n" or data[:4] == b"RIFF":
            return [data]

        # Neither image nor archive.
        if b"ftyp" in data[:12] or b"moov" in data[:100]:
            logger.warning("[VideoParser] 下载接口返回视频文件,无法作为图文处理")
        else:
            logger.warning(f"[VideoParser] 图文包内容类型未知,前16字节: {data[:16].hex()}")
        return []
async def _prime_douyin_session(
|
||
self,
|
||
session: aiohttp.ClientSession,
|
||
proxy_url: Optional[str],
|
||
timeout: int,
|
||
referer: str = "",
|
||
) -> None:
|
||
"""预热抖音 Cookie(减少直链 403)"""
|
||
targets = [r for r in [referer, "https://www.douyin.com/", "https://www.iesdouyin.com/"] if r]
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
"Accept-Language": "zh-CN,zh;q=0.9",
|
||
}
|
||
for url in targets:
|
||
try:
|
||
async with session.get(
|
||
url,
|
||
headers=headers,
|
||
proxy=proxy_url,
|
||
timeout=aiohttp.ClientTimeout(total=min(15, timeout)),
|
||
allow_redirects=True,
|
||
) as response:
|
||
await response.read()
|
||
except Exception:
|
||
continue
|
||
|
||
def _is_douyin_or_tiktok(self, video_url: str) -> bool:
|
||
return any(k in (video_url or "").lower() for k in ("douyin", "iesdouyin", "tiktok"))
|
||
|
||
def _merge_images_vertical(self, images: List[bytes], target_width: int = 1242) -> Optional[bytes]:
|
||
"""将多张图片按顺序拼接为长图"""
|
||
try:
|
||
from PIL import Image
|
||
except Exception:
|
||
logger.error("[VideoParser] 未安装 Pillow,无法合成长图")
|
||
return None
|
||
|
||
try:
|
||
pil_images: List[Image.Image] = []
|
||
for b in images:
|
||
img = Image.open(io.BytesIO(b))
|
||
if img.mode in ("RGBA", "P"):
|
||
img = img.convert("RGB")
|
||
w, h = img.size
|
||
if w != target_width:
|
||
ratio = target_width / float(w)
|
||
img = img.resize((target_width, int(h * ratio)))
|
||
pil_images.append(img)
|
||
if not pil_images:
|
||
return None
|
||
total_height = sum(i.size[1] for i in pil_images)
|
||
merged = Image.new("RGB", (target_width, total_height))
|
||
y = 0
|
||
for im in pil_images:
|
||
merged.paste(im, (0, y))
|
||
y += im.size[1]
|
||
output = io.BytesIO()
|
||
merged.save(output, format="JPEG", quality=85)
|
||
return output.getvalue()
|
||
except Exception:
|
||
return None
|
||
|
||
async def _download_and_send_video(self, bot: WechatHookClient, to_wxid: str, video_url: str):
|
||
"""下载视频并发送"""
|
||
try:
|
||
if not self.config.get("download", {}).get("enabled", False):
|
||
logger.info("[VideoParser] 视频下载功能未启用")
|
||
return False
|
||
|
||
download_api_url = self.config["download"]["download_api_url"]
|
||
timeout = self.config["download"]["timeout"]
|
||
|
||
# 下载到插件目录下的 videos 文件夹
|
||
videos_dir = Path(__file__).parent / "videos"
|
||
videos_dir.mkdir(exist_ok=True)
|
||
|
||
logger.info(f"[VideoParser] 开始下载视频: {video_url}")
|
||
|
||
import ssl
|
||
import time
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
ssl_context = ssl.create_default_context()
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
|
||
# 配置代理
|
||
proxy_config = self.config.get("proxy", {})
|
||
proxy_url = None
|
||
if proxy_config.get("enabled", False):
|
||
proxy_type = proxy_config.get("type", "socks5")
|
||
proxy_host = proxy_config.get("host")
|
||
proxy_port = proxy_config.get("port")
|
||
if proxy_host and proxy_port:
|
||
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
|
||
|
||
connector = aiohttp.TCPConnector(
|
||
ssl=ssl_context,
|
||
force_close=True,
|
||
enable_cleanup_closed=True
|
||
)
|
||
|
||
async with aiohttp.ClientSession(connector=connector) as session:
|
||
async with session.get(
|
||
download_api_url,
|
||
params={"url": video_url},
|
||
proxy=proxy_url,
|
||
timeout=aiohttp.ClientTimeout(total=timeout)
|
||
) as response:
|
||
if response.status != 200:
|
||
logger.error(f"[VideoParser] 视频下载失败: HTTP {response.status}")
|
||
return False
|
||
|
||
# 检查响应类型
|
||
content_type = response.headers.get('Content-Type', '')
|
||
logger.info(f"[VideoParser] 响应类型: {content_type}")
|
||
|
||
video_data = await response.read()
|
||
|
||
# 检查是否是视频文件(MP4文件头)
|
||
if len(video_data) > 8:
|
||
file_header = video_data[:8].hex()
|
||
logger.info(f"[VideoParser] 文件头: {file_header}")
|
||
# MP4文件头通常是 00 00 00 xx 66 74 79 70
|
||
if not (b'ftyp' in video_data[:12] or b'moov' in video_data[:100]):
|
||
logger.warning(f"[VideoParser] 下载的可能不是有效的视频文件,前100字节: {video_data[:100]}")
|
||
|
||
if len(video_data) < 1024:
|
||
logger.warning(f"[VideoParser] 文件太小,可能下载失败,内容: {video_data[:200]}")
|
||
|
||
# 生成文件名
|
||
filename = f"douyin_{datetime.now():%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}.mp4"
|
||
file_path = videos_dir / filename
|
||
|
||
with open(file_path, "wb") as f:
|
||
f.write(video_data)
|
||
|
||
logger.info(f"[VideoParser] 视频下载完成: {file_path}, 文件大小: {len(video_data)} 字节")
|
||
|
||
# 等待文件写入完成
|
||
import os
|
||
max_wait = 10
|
||
wait_time = 0
|
||
while wait_time < max_wait:
|
||
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
|
||
logger.info(f"[VideoParser] 文件已就绪: {file_path}")
|
||
break
|
||
await asyncio.sleep(0.5)
|
||
wait_time += 0.5
|
||
|
||
if not os.path.exists(file_path):
|
||
logger.error(f"[VideoParser] 文件写入失败: {file_path}")
|
||
return False
|
||
|
||
logger.info(f"[VideoParser] 准备发送视频: {file_path}")
|
||
video_sent = await bot.send_media(to_wxid, str(file_path.resolve()), media_type="video")
|
||
|
||
if not video_sent:
|
||
logger.error(f"[VideoParser] 视频发送失败")
|
||
return False
|
||
|
||
logger.success(f"[VideoParser] 视频发送成功")
|
||
return True
|
||
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"[VideoParser] 视频下载网络错误: {e}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 视频下载失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
return False
|
||
|
||
def _should_parse_group(self, room_wxid: str) -> bool:
|
||
"""判断是否应该在该群解析视频"""
|
||
enabled_groups = self.config["behavior"]["enabled_groups"]
|
||
disabled_groups = self.config["behavior"]["disabled_groups"]
|
||
|
||
# 如果在禁用列表中,不解析
|
||
if room_wxid in disabled_groups:
|
||
return False
|
||
|
||
# 如果启用列表为空,对所有群生效
|
||
if not enabled_groups:
|
||
return True
|
||
|
||
# 否则只对启用列表中的群生效
|
||
return room_wxid in enabled_groups
|