160 lines
6.7 KiB
Python
160 lines
6.7 KiB
Python
import logging
import os
import re
import time
import tomllib
import traceback
from typing import Dict, Any

import requests
from wcferry import WxMsg, Wcf

from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
|
||
|
||
|
||
class DouyinParserError(Exception):
    """Base exception for failures raised by the Douyin parser plugin."""
|
||
|
||
|
||
class DouyinParser:
|
||
description = "抖音无水印解析插件"
|
||
author = "姜不吃先生"
|
||
version = "1.0.2"
|
||
|
||
def __init__(self, wcf: Wcf, gbm: GroupBotManager):
|
||
self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?')
|
||
self.LOG = logging.getLogger(__name__)
|
||
self.wcf = wcf
|
||
self.gbm = gbm
|
||
with open("douyin_parser/config.toml", "rb") as f:
|
||
plugin_config = tomllib.load(f)
|
||
|
||
config = plugin_config["Video"]
|
||
|
||
basic_config = config.get("Douyin", {})
|
||
self.enable = basic_config.get("enable", True)
|
||
self.http_proxy = basic_config.get("http_proxy", None)
|
||
self.LOG.debug("[抖音] 插件初始化完成,代理设置: %s", self.http_proxy)
|
||
|
||
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
if not data:
|
||
return data
|
||
data['cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/.../512x512bb.jpg"
|
||
return data
|
||
|
||
def _clean_url(self, url: str) -> str:
|
||
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
|
||
self.LOG.debug("[抖音] 清理后的URL: %s", cleaned_url)
|
||
return cleaned_url
|
||
|
||
def _get_real_video_url(self, video_url: str) -> str:
|
||
max_retries = 3
|
||
retry_delay = 2
|
||
|
||
for retry in range(max_retries):
|
||
try:
|
||
self.LOG.info("[抖音] 开始获取真实视频链接: %s (第%d次尝试)", video_url, retry + 1)
|
||
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)...'
|
||
}
|
||
|
||
response = requests.get(video_url, headers=headers, proxies=proxy, allow_redirects=True, timeout=60)
|
||
if response.status_code in [200, 206]:
|
||
real_url = response.url
|
||
self.LOG.info("[抖音] 成功获取真实链接: %s", real_url)
|
||
return real_url
|
||
else:
|
||
self.LOG.error("[抖音] 获取视频真实链接失败, 状态码: %d", response.status_code)
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 获取真实链接失败: %s", str(e))
|
||
|
||
self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数")
|
||
return video_url
|
||
|
||
def _parse_douyin(self, url: str) -> Dict[str, Any]:
|
||
try:
|
||
api_url = "https://apih.kfcgw50.me/api/douyin"
|
||
clean_url = self._clean_url(url)
|
||
params = {'url': clean_url, 'type': 'json'}
|
||
|
||
self.LOG.debug("[抖音] 请求API: %s, 参数: %s", api_url, repr(params))
|
||
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
response = requests.get(api_url, params=params, timeout=30, proxies=proxy)
|
||
|
||
if response.status_code != 200:
|
||
raise DouyinParserError(f"API请求失败,状态码: {response.status_code}")
|
||
|
||
data = response.json()
|
||
self.LOG.debug("[抖音] API响应数据: %s", data)
|
||
|
||
if data.get("code") == 200:
|
||
result = data.get("data", {})
|
||
if result.get('video'):
|
||
result['video'] = self._get_real_video_url(result['video'])
|
||
return self._clean_response_data(result)
|
||
else:
|
||
raise DouyinParserError(data.get("message", "未知错误"))
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
|
||
def handle_douyin_links(self, message: WxMsg):
|
||
if not self.enable:
|
||
return
|
||
|
||
# 如果触发了指令,但是没有权限,则返回权限不足
|
||
if self.gbm.get_group_permission(message.roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED:
|
||
return
|
||
|
||
try:
|
||
match = self.url_pattern.search(message.content)
|
||
if not match:
|
||
return
|
||
|
||
original_url = self._clean_url(match.group(0))
|
||
self.LOG.info("发现抖音链接: %s", original_url)
|
||
self.LOG.info("检测到抖音分享链接,正在解析无水印视频...")
|
||
self.wcf.send_text(f"检测到抖音分享链接,正在解析无水印视频...",
|
||
(message.roomid if message.from_group() else message.sender), message.sender)
|
||
video_info = self._parse_douyin(original_url)
|
||
if not video_info:
|
||
raise DouyinParserError("无法获取视频信息")
|
||
|
||
video_url = video_info.get('video', '')
|
||
title = video_info.get('title', '无标题')
|
||
author = video_info.get('name', '未知作者')
|
||
if not video_url:
|
||
raise DouyinParserError("无法获取视频地址")
|
||
file_abspath = self.download_video(video_url, "douyin_parser/down_load_dir")
|
||
self.wcf.send_file(file_abspath, (message.roomid if message.from_group() else message.sender))
|
||
self.LOG.info("已发送解析结果: 标题[%s] 作者[%s]", title, author)
|
||
except DouyinParserError as e:
|
||
self.LOG.error("抖音解析失败: %s", str(e))
|
||
except Exception as e:
|
||
self.LOG.error("抖音解析发生未知错误: %s", str(e))
|
||
|
||
def download_video(self, video_url, save_dir):
|
||
# 确保 save_dir 是一个目录
|
||
if os.path.isfile(save_dir):
|
||
print(f"错误: {save_dir} 是一个文件,不能作为目录使用。")
|
||
return None
|
||
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
save_path = os.path.join(save_dir, "video.mp4")
|
||
if video_url:
|
||
video_response = requests.get(video_url, stream=True)
|
||
if video_response.status_code == 200:
|
||
with open(save_path, "wb") as file:
|
||
for chunk in video_response.iter_content(chunk_size=1024):
|
||
file.write(chunk)
|
||
abs_path = os.path.abspath(save_path)
|
||
print(f"视频已下载至: {abs_path}")
|
||
return abs_path
|
||
else:
|
||
print("无法下载视频,HTTP 状态码:", video_response.status_code)
|
||
else:
|
||
print("API 响应中没有找到视频 URL")
|
||
return None
|