229 lines
10 KiB
Python
229 lines
10 KiB
Python
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
import tomllib
|
||
import traceback
|
||
import requests
|
||
from typing import Dict, Any
|
||
|
||
from wcferry import WxMsg, Wcf
|
||
|
||
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
|
||
|
||
|
||
class DouyinParserError(Exception):
|
||
"""抖音解析器自定义异常基类"""
|
||
pass
|
||
|
||
|
||
class DouyinParser:
|
||
description = "抖音无水印解析插件"
|
||
author = "姜不吃先生"
|
||
version = "1.0.2"
|
||
|
||
def __init__(self, wcf: Wcf, gbm: GroupBotManager):
|
||
self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?')
|
||
self.LOG = logging.getLogger(__name__)
|
||
self.wcf = wcf
|
||
self.gbm = gbm
|
||
with open("douyin_parser/config.toml", "rb") as f:
|
||
plugin_config = tomllib.load(f)
|
||
|
||
config = plugin_config["Douyin"]
|
||
|
||
self.enable = config.get("enable", True)
|
||
self.http_proxy = config.get("http_proxy", None)
|
||
self.LOG.info("[抖音] 插件初始化完成,代理设置: %s", self.http_proxy)
|
||
|
||
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
if not data:
|
||
return data
|
||
data[
|
||
'cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
|
||
|
||
return data
|
||
|
||
def _clean_url(self, url: str) -> str:
|
||
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
|
||
self.LOG.debug("[抖音] 清理后的URL: %s", cleaned_url)
|
||
return cleaned_url
|
||
|
||
def _get_real_video_url(self, video_url: str) -> str:
|
||
"""获取真实视频链接"""
|
||
max_retries = 3 # 最大重试次数
|
||
retry_delay = 2 # 重试延迟秒数
|
||
max_redirects = 10 # 最大重定向次数,防止死循环
|
||
proxies = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
redirect_history = []
|
||
|
||
for retry in range(max_retries):
|
||
try:
|
||
self.LOG.info("[抖音] 开始获取真实视频链接: %s (第%d次尝试)", video_url, retry + 1)
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
||
'Range': 'bytes=0-'
|
||
}
|
||
|
||
# 默认使用 allow_redirects=True 获取历史记录
|
||
response = requests.get(video_url, headers=headers, proxies=proxies, allow_redirects=True, timeout=60)
|
||
|
||
if response.history:
|
||
redirect_history = [resp.url for resp in response.history]
|
||
real_url = response.url
|
||
else:
|
||
# response.history 为空,手动解析重定向
|
||
current_url = video_url
|
||
for _ in range(max_redirects): # 限制最大重定向次数
|
||
resp = requests.get(current_url, headers=headers, proxies=proxies, allow_redirects=False,
|
||
timeout=60)
|
||
new_url = resp.headers.get('Location')
|
||
|
||
if not new_url:
|
||
break # 没有新的 Location,停止
|
||
|
||
if not new_url.startswith("http"):
|
||
from urllib.parse import urljoin
|
||
new_url = urljoin(current_url, new_url) # 处理相对路径重定向
|
||
|
||
if new_url in redirect_history:
|
||
self.LOG.info("[抖音] 检测到循环重定向: %s", new_url)
|
||
break # 避免死循环
|
||
|
||
redirect_history.append(new_url)
|
||
self.LOG.info("[抖音] 发现重定向: %s -> %s", current_url, new_url)
|
||
current_url = new_url
|
||
|
||
real_url = current_url
|
||
|
||
if redirect_history:
|
||
self.LOG.info("[抖音] 重定向历史: %s", redirect_history)
|
||
|
||
if real_url != video_url and ('v3-' in real_url.lower() or 'douyinvod.com' in real_url.lower()):
|
||
self.LOG.info("[抖音] 成功获取真实链接: %s", real_url)
|
||
return real_url
|
||
else:
|
||
self.LOG.info("[抖音] 未能获取到符合预期的视频链接,准备重试")
|
||
if retry < max_retries - 1:
|
||
time.sleep(retry_delay)
|
||
continue
|
||
return video_url
|
||
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 获取真实链接失败: %s (第%d次尝试)", str(e), retry + 1)
|
||
if retry < max_retries - 1:
|
||
time.sleep(retry_delay)
|
||
continue
|
||
return video_url
|
||
|
||
self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数")
|
||
return video_url
|
||
|
||
def _parse_douyin(self, url: str) -> Dict[str, Any]:
|
||
try:
|
||
api_url = "https://apih.kfcgw50.me/api/douyin"
|
||
clean_url = self._clean_url(url)
|
||
params = {'url': clean_url, 'type': 'json'}
|
||
|
||
self.LOG.info("[抖音] 请求API: %s, 参数: %s", api_url, repr(params))
|
||
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
response = requests.get(api_url, params=params, timeout=30, proxies=proxy)
|
||
|
||
if response.status_code != 200:
|
||
raise DouyinParserError(f"API请求失败,状态码: {response.status_code}")
|
||
|
||
data = response.json()
|
||
self.LOG.info("[抖音] API响应数据: %s", data)
|
||
|
||
if data.get("code") == 200:
|
||
result = data.get("data", {})
|
||
self.LOG.info("[抖音] API响应数据result: %s", result)
|
||
if result.get('video'):
|
||
result['video'] = self._get_real_video_url(result['video'])
|
||
return self._clean_response_data(result)
|
||
else:
|
||
raise DouyinParserError(data.get("message", "未知错误"))
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
|
||
def handle_douyin_links(self, message: WxMsg):
|
||
if not self.enable:
|
||
return
|
||
|
||
# 如果触发了指令,但是没有权限,则返回权限不足
|
||
if self.gbm.get_group_permission(message.roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED:
|
||
return
|
||
|
||
try:
|
||
match = self.url_pattern.search(message.content)
|
||
if not match:
|
||
return
|
||
|
||
original_url = self._clean_url(match.group(0))
|
||
self.LOG.info("发现抖音链接: %s", original_url)
|
||
self.LOG.info("检测到抖音分享链接,正在解析无水印视频...")
|
||
self.wcf.send_text(f"检测到抖音分享链接,正在解析无水印视频...",
|
||
(message.roomid if message.from_group() else message.sender), message.sender)
|
||
video_info = self._parse_douyin(original_url)
|
||
if not video_info:
|
||
raise DouyinParserError("无法获取视频信息")
|
||
|
||
video_url = video_info.get('video', '')
|
||
title = video_info.get('title', '无标题')
|
||
author = video_info.get('name', '未知作者')
|
||
cover = video_info.get('cover', '')
|
||
|
||
if not video_url:
|
||
raise DouyinParserError("无法获取视频地址")
|
||
|
||
self.wcf.send_rich_text("bot", "gh_11", title[:30], f"{title[:30]} - {author[:10]}", video_url, cover,
|
||
message.roomid)
|
||
self.LOG.info(f"video_url: {video_url}, title: {title}, author: {author}, cover: {cover}")
|
||
mp4_path = self.download_stream(video_url, "douyin_parser/down_load_dir/douyin.mp4")
|
||
self.LOG.info(f"发送抖音视频:{mp4_path}")
|
||
self.wcf.send_file(mp4_path, message.roomid)
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
return
|
||
|
||
def download_stream(self, url, save_path):
|
||
"""
|
||
从指定URL读取视频流并保存到本地
|
||
:param url: 视频流的URL
|
||
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4")
|
||
"""
|
||
try:
|
||
# 发送GET请求,启用流式传输
|
||
response = requests.get(url, stream=True)
|
||
|
||
# 检查请求是否成功
|
||
response.raise_for_status() # 如果状态码不是200,将抛出异常
|
||
|
||
# 确保保存路径的目录存在
|
||
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
|
||
|
||
# 检查是否是视频流(可选,根据Content-Type判断)
|
||
content_type = response.headers.get("Content-Type", "").lower()
|
||
if "video" not in content_type and "application/octet-stream" not in content_type:
|
||
print(f"警告: 返回的可能不是视频流,Content-Type: {content_type}")
|
||
print("响应内容预览:", response.text[:100]) # 打印前100字符查看
|
||
return
|
||
|
||
# 以二进制写入模式保存流数据
|
||
with open(save_path, "wb") as file:
|
||
for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB
|
||
if chunk: # 过滤空块
|
||
file.write(chunk)
|
||
print(f"视频已下载到: {save_path}")
|
||
return os.path.abspath(save_path)
|
||
except requests.RequestException as e:
|
||
print(f"请求失败: {e}")
|
||
except IOError as e:
|
||
print(f"文件写入失败: {e}")
|
||
except Exception as e:
|
||
print(f"发生未知错误: {e}")
|