430 lines
16 KiB
Python
430 lines
16 KiB
Python
"""
|
||
短视频自动解析插件
|
||
|
||
自动检测消息中的短视频链接并解析,支持抖音、皮皮虾、哔哩哔哩等平台
|
||
"""
|
||
|
||
import re
|
||
import tomllib
|
||
import aiohttp
|
||
from pathlib import Path
|
||
from loguru import logger
|
||
from utils.plugin_base import PluginBase
|
||
from utils.decorators import on_text_message
|
||
from WechatHook import WechatHookClient
|
||
|
||
|
||
class VideoParser(PluginBase):
|
||
"""短视频解析插件"""
|
||
|
||
# 插件元数据
|
||
description = "自动解析短视频链接并发送卡片"
|
||
author = "ShiHao"
|
||
version = "1.0.0"
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.config = None
|
||
|
||
# 支持的短视频平台链接正则表达式
|
||
self.video_patterns = [
|
||
# 抖音
|
||
r'https?://v\.douyin\.com/[A-Za-z0-9]+',
|
||
r'https?://www\.douyin\.com/video/\d+',
|
||
r'https?://www\.iesdouyin\.com/share/video/\d+',
|
||
|
||
# 快手
|
||
r'https?://v\.kuaishou\.com/[A-Za-z0-9]+',
|
||
r'https?://www\.kuaishou\.com/short-video/\d+',
|
||
|
||
# 小红书
|
||
r'https?://xhslink\.com/[A-Za-z0-9]+',
|
||
r'https?://www\.xiaohongshu\.com/discovery/item/[A-Za-z0-9]+',
|
||
|
||
# 微博
|
||
r'https?://weibo\.com/tv/show/\d+:\d+',
|
||
r'https?://video\.weibo\.com/show\?fid=\d+:\d+',
|
||
|
||
# 微视
|
||
r'https?://video\.weishi\.qq\.com/[A-Za-z0-9]+',
|
||
r'https?://h5\.weishi\.qq\.com/weishi/feed/[A-Za-z0-9]+',
|
||
|
||
# 西瓜视频
|
||
r'https?://v\.ixigua\.com/[A-Za-z0-9]+',
|
||
|
||
# 最右
|
||
r'https?://share\.izuiyou\.com/[A-Za-z0-9]+',
|
||
|
||
# 美拍
|
||
r'https?://www\.meipai\.com/media/\d+',
|
||
|
||
# 虎牙
|
||
r'https?://v\.huya\.com/play/\d+\.html',
|
||
|
||
# 梨视频
|
||
r'https?://www\.pearvideo\.com/video_\d+',
|
||
|
||
# TikTok
|
||
r'https?://(?:www\.)?tiktok\.com/@[^/]+/video/\d+',
|
||
r'https?://vm\.tiktok\.com/[A-Za-z0-9]+',
|
||
|
||
# YouTube
|
||
r'https?://(?:www\.)?youtube\.com/watch\?v=[A-Za-z0-9_-]+',
|
||
r'https?://youtu\.be/[A-Za-z0-9_-]+',
|
||
|
||
# Instagram
|
||
r'https?://(?:www\.)?instagram\.com/(?:p|reel)/[A-Za-z0-9_-]+',
|
||
]
|
||
|
||
# 编译正则表达式
|
||
self.compiled_patterns = [re.compile(pattern) for pattern in self.video_patterns]
|
||
|
||
async def async_init(self):
|
||
"""插件异步初始化"""
|
||
# 读取配置
|
||
config_path = Path(__file__).parent / "config.toml"
|
||
with open(config_path, "rb") as f:
|
||
self.config = tomllib.load(f)
|
||
|
||
logger.success("[VideoParser] 短视频解析插件已加载")
|
||
|
||
@on_text_message(priority=60)
|
||
async def handle_video_link(self, bot: WechatHookClient, message: dict):
|
||
"""处理包含视频链接的消息"""
|
||
# 检查是否启用
|
||
if not self.config["behavior"]["enabled"]:
|
||
return
|
||
|
||
content = message.get("Content", "").strip()
|
||
from_wxid = message.get("FromWxid", "")
|
||
is_group = message.get("IsGroup", False)
|
||
|
||
# 检查群聊/私聊过滤
|
||
if is_group:
|
||
if not self._should_parse_group(from_wxid):
|
||
return
|
||
else:
|
||
if not self.config["behavior"]["enable_private"]:
|
||
return
|
||
|
||
# 检测消息中的视频链接
|
||
video_url = self._extract_video_url(content)
|
||
if not video_url:
|
||
return
|
||
|
||
logger.info(f"[VideoParser] 检测到视频链接: {video_url}")
|
||
|
||
# 调用 API 解析视频
|
||
try:
|
||
video_info = await self._parse_video(video_url)
|
||
if video_info:
|
||
# 发送链接卡片
|
||
await self._send_video_card(bot, from_wxid, video_info)
|
||
|
||
# 下载并发送视频(使用原始分享链接)
|
||
await self._download_and_send_video(bot, from_wxid, video_url)
|
||
else:
|
||
logger.warning(f"[VideoParser] 视频解析失败: {video_url}")
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 处理视频链接失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
|
||
def _extract_video_url(self, content: str) -> str:
|
||
"""从消息内容中提取视频链接"""
|
||
for pattern in self.compiled_patterns:
|
||
match = pattern.search(content)
|
||
if match:
|
||
return match.group(0)
|
||
return ""
|
||
|
||
async def _parse_video(self, video_url: str) -> dict:
|
||
"""调用 API 解析视频"""
|
||
get_aweme_id_url = self.config["api"]["get_aweme_id_url"]
|
||
fetch_video_url = self.config["api"]["url"]
|
||
timeout = self.config["api"]["timeout"]
|
||
|
||
try:
|
||
import ssl
|
||
ssl_context = ssl.create_default_context()
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
|
||
# 配置代理
|
||
proxy_config = self.config.get("proxy", {})
|
||
proxy_url = None
|
||
if proxy_config.get("enabled", False):
|
||
proxy_type = proxy_config.get("type", "socks5")
|
||
proxy_host = proxy_config.get("host")
|
||
proxy_port = proxy_config.get("port")
|
||
if proxy_host and proxy_port:
|
||
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
|
||
logger.info(f"[VideoParser] 使用代理: {proxy_url}")
|
||
|
||
connector = aiohttp.TCPConnector(
|
||
ssl=ssl_context,
|
||
force_close=True,
|
||
enable_cleanup_closed=True
|
||
)
|
||
|
||
async with aiohttp.ClientSession(connector=connector) as session:
|
||
# 第一步:提取 aweme_id
|
||
logger.info(f"[VideoParser] 提取视频ID: {get_aweme_id_url}")
|
||
async with session.get(
|
||
get_aweme_id_url,
|
||
params={"url": video_url},
|
||
proxy=proxy_url,
|
||
timeout=aiohttp.ClientTimeout(total=timeout)
|
||
) as response:
|
||
if response.status != 200:
|
||
logger.error(f"[VideoParser] 提取视频ID失败: HTTP {response.status}")
|
||
return None
|
||
|
||
result = await response.json()
|
||
logger.debug(f"[VideoParser] 提取ID返回: {result}")
|
||
|
||
if result.get("code") != 200:
|
||
logger.error(f"[VideoParser] 提取视频ID失败: {result.get('msg', '未知错误')}")
|
||
return None
|
||
|
||
# data 可能是字符串类型的 aweme_id
|
||
data = result.get("data")
|
||
if isinstance(data, str):
|
||
aweme_id = data
|
||
elif isinstance(data, dict):
|
||
aweme_id = data.get("aweme_id")
|
||
else:
|
||
aweme_id = None
|
||
|
||
if not aweme_id:
|
||
logger.error("[VideoParser] 未找到 aweme_id")
|
||
return None
|
||
|
||
logger.info(f"[VideoParser] 获取到视频ID: {aweme_id}")
|
||
|
||
# 第二步:获取视频数据
|
||
logger.info(f"[VideoParser] 获取视频数据: {fetch_video_url}")
|
||
async with session.get(
|
||
fetch_video_url,
|
||
params={"aweme_id": aweme_id},
|
||
proxy=proxy_url,
|
||
timeout=aiohttp.ClientTimeout(total=timeout)
|
||
) as response:
|
||
return await self._handle_response(response)
|
||
|
||
except aiohttp.ClientConnectorError as e:
|
||
logger.error(f"[VideoParser] 无法连接到 API 服务器: {e}")
|
||
return None
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"[VideoParser] 网络请求失败: {e}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 解析视频失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
return None
|
||
|
||
async def _handle_response(self, response) -> dict:
|
||
"""处理 API 响应"""
|
||
if response.status != 200:
|
||
response_text = await response.text()
|
||
logger.error(f"[VideoParser] API 请求失败: HTTP {response.status}, 响应: {response_text[:200]}")
|
||
return None
|
||
|
||
result = await response.json()
|
||
logger.info(f"[VideoParser] API 返回: code={result.get('code')}, msg={result.get('msg')}")
|
||
# 打印完整返回数据以便调试
|
||
import json
|
||
logger.info(f"[VideoParser] 完整返回数据: {json.dumps(result, ensure_ascii=False, indent=2)}")
|
||
|
||
# 检查返回状态(支持多种状态码格式)
|
||
code = result.get("code")
|
||
if code not in [200, "200", 1, "1", True]:
|
||
logger.error(f"[VideoParser] API 返回错误: {result.get('msg', '未知错误')}")
|
||
return None
|
||
|
||
return result
|
||
|
||
async def _send_video_card(self, bot: WechatHookClient, to_wxid: str, video_info: dict):
|
||
"""发送视频信息卡片"""
|
||
try:
|
||
# 从 API 返回中提取字段
|
||
data = video_info.get("data", {})
|
||
aweme_detail = data.get("aweme_detail", {})
|
||
|
||
# 提取作者信息
|
||
author = aweme_detail.get("author", {})
|
||
nickname = author.get("nickname", "")
|
||
|
||
# 提取视频描述
|
||
desc = aweme_detail.get("desc", "")
|
||
|
||
# 提取封面图(使用 cover_original_scale 的第一个链接)
|
||
video = aweme_detail.get("video", {})
|
||
cover_original_scale = video.get("cover_original_scale", {})
|
||
cover_url_list = cover_original_scale.get("url_list", [])
|
||
image_url = cover_url_list[0] if cover_url_list else ""
|
||
|
||
# 提取视频播放地址(使用 play_addr 的第一个链接)
|
||
play_addr = video.get("play_addr", {})
|
||
url_list = play_addr.get("url_list", [])
|
||
video_url = url_list[0] if url_list else ""
|
||
|
||
# 使用默认值(如果字段为空)
|
||
title = nickname or self.config["card"]["default_title"]
|
||
desc = desc or self.config["card"]["default_desc"]
|
||
image_url = image_url or "https://www.functen.cn/static/img/709a3f34713ef07b09d524bee2df69d6.DY.webp"
|
||
url = video_url or self.config["card"]["default_url"]
|
||
|
||
# 限制标题和描述长度
|
||
if len(title) > 50:
|
||
title = title[:47] + "..."
|
||
if len(desc) > 100:
|
||
desc = desc[:97] + "..."
|
||
|
||
logger.info(f"[VideoParser] 发送卡片: title={title}, desc={desc[:30]}...")
|
||
|
||
# 发送链接卡片
|
||
await bot.send_link_card(
|
||
to_wxid=to_wxid,
|
||
title=title,
|
||
desc=desc,
|
||
url=url,
|
||
image_url=image_url,
|
||
)
|
||
|
||
logger.success(f"[VideoParser] 视频卡片发送成功")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 发送视频卡片失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
|
||
async def _download_and_send_video(self, bot: WechatHookClient, to_wxid: str, video_url: str):
|
||
"""下载视频并发送"""
|
||
try:
|
||
if not self.config.get("download", {}).get("enabled", False):
|
||
logger.info("[VideoParser] 视频下载功能未启用")
|
||
return False
|
||
|
||
download_api_url = self.config["download"]["download_api_url"]
|
||
timeout = self.config["download"]["timeout"]
|
||
|
||
# 下载到插件目录下的 videos 文件夹
|
||
videos_dir = Path(__file__).parent / "videos"
|
||
videos_dir.mkdir(exist_ok=True)
|
||
|
||
logger.info(f"[VideoParser] 开始下载视频: {video_url}")
|
||
|
||
import ssl
|
||
import time
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
ssl_context = ssl.create_default_context()
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
|
||
# 配置代理
|
||
proxy_config = self.config.get("proxy", {})
|
||
proxy_url = None
|
||
if proxy_config.get("enabled", False):
|
||
proxy_type = proxy_config.get("type", "socks5")
|
||
proxy_host = proxy_config.get("host")
|
||
proxy_port = proxy_config.get("port")
|
||
if proxy_host and proxy_port:
|
||
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
|
||
|
||
connector = aiohttp.TCPConnector(
|
||
ssl=ssl_context,
|
||
force_close=True,
|
||
enable_cleanup_closed=True
|
||
)
|
||
|
||
async with aiohttp.ClientSession(connector=connector) as session:
|
||
async with session.get(
|
||
download_api_url,
|
||
params={"url": video_url},
|
||
proxy=proxy_url,
|
||
timeout=aiohttp.ClientTimeout(total=timeout)
|
||
) as response:
|
||
if response.status != 200:
|
||
logger.error(f"[VideoParser] 视频下载失败: HTTP {response.status}")
|
||
return False
|
||
|
||
# 检查响应类型
|
||
content_type = response.headers.get('Content-Type', '')
|
||
logger.info(f"[VideoParser] 响应类型: {content_type}")
|
||
|
||
video_data = await response.read()
|
||
|
||
# 检查是否是视频文件(MP4文件头)
|
||
if len(video_data) > 8:
|
||
file_header = video_data[:8].hex()
|
||
logger.info(f"[VideoParser] 文件头: {file_header}")
|
||
# MP4文件头通常是 00 00 00 xx 66 74 79 70
|
||
if not (b'ftyp' in video_data[:12] or b'moov' in video_data[:100]):
|
||
logger.warning(f"[VideoParser] 下载的可能不是有效的视频文件,前100字节: {video_data[:100]}")
|
||
|
||
if len(video_data) < 1024:
|
||
logger.warning(f"[VideoParser] 文件太小,可能下载失败,内容: {video_data[:200]}")
|
||
|
||
# 生成文件名
|
||
filename = f"douyin_{datetime.now():%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}.mp4"
|
||
file_path = videos_dir / filename
|
||
|
||
with open(file_path, "wb") as f:
|
||
f.write(video_data)
|
||
|
||
logger.info(f"[VideoParser] 视频下载完成: {file_path}, 文件大小: {len(video_data)} 字节")
|
||
|
||
# 等待文件写入完成
|
||
import os
|
||
max_wait = 10
|
||
wait_time = 0
|
||
while wait_time < max_wait:
|
||
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
|
||
logger.info(f"[VideoParser] 文件已就绪: {file_path}")
|
||
break
|
||
await asyncio.sleep(0.5)
|
||
wait_time += 0.5
|
||
|
||
if not os.path.exists(file_path):
|
||
logger.error(f"[VideoParser] 文件写入失败: {file_path}")
|
||
return False
|
||
|
||
logger.info(f"[VideoParser] 准备发送视频: {file_path}")
|
||
video_sent = await bot.send_file(to_wxid, str(file_path.resolve()))
|
||
|
||
if not video_sent:
|
||
logger.error(f"[VideoParser] 视频发送失败")
|
||
return False
|
||
|
||
logger.success(f"[VideoParser] 视频发送成功")
|
||
return True
|
||
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"[VideoParser] 视频下载网络错误: {e}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"[VideoParser] 视频下载失败: {e}")
|
||
import traceback
|
||
logger.error(f"详细错误: {traceback.format_exc()}")
|
||
return False
|
||
|
||
def _should_parse_group(self, room_wxid: str) -> bool:
|
||
"""判断是否应该在该群解析视频"""
|
||
enabled_groups = self.config["behavior"]["enabled_groups"]
|
||
disabled_groups = self.config["behavior"]["disabled_groups"]
|
||
|
||
# 如果在禁用列表中,不解析
|
||
if room_wxid in disabled_groups:
|
||
return False
|
||
|
||
# 如果启用列表为空,对所有群生效
|
||
if not enabled_groups:
|
||
return True
|
||
|
||
# 否则只对启用列表中的群生效
|
||
return room_wxid in enabled_groups
|