343 lines
14 KiB
Python
343 lines
14 KiB
Python
import asyncio
import os
import re
import time
import traceback
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse

import requests
from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE
|
||
|
||
|
||
class DouyinParserError(Exception):
    """Base class for the custom exceptions raised by the Douyin parser."""
|
||
|
||
|
||
class DouyinParserPlugin(MessagePluginInterface):
|
||
"""抖音无水印解析插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "DOUYIN_PARSER"
|
||
FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]"
|
||
|
||
@property
def name(self) -> str:
    """Human-readable name of the plugin."""
    plugin_name = "抖音解析"
    return plugin_name
|
||
|
||
@property
def version(self) -> str:
    """Version string of the plugin."""
    plugin_version = "1.0.0"
    return plugin_version
|
||
|
||
@property
def description(self) -> str:
    """One-line summary of what the plugin offers."""
    summary = "提供抖音链接无水印解析功能,支持视频下载和分享"
    return summary
|
||
|
||
@property
def author(self) -> str:
    """Name of the plugin author."""
    author_name = "姜不吃先生"
    return author_name
|
||
|
||
@property
def command_prefix(self) -> Optional[str]:
    """Command prefix of the plugin.

    Empty: no prefix is required, commands are matched directly.
    """
    no_prefix = ""
    return no_prefix
|
||
|
||
@property
def commands(self) -> List[str]:
    """Commands that trigger the plugin.

    Empty: the plugin is triggered by matching message content rather than
    by a command.
    """
    return list()
|
||
|
||
@property
def feature_key(self) -> Optional[str]:
    """Feature-permission key, taken from ``FEATURE_KEY``."""
    key = self.FEATURE_KEY
    return key
|
||
|
||
@property
def feature_description(self) -> Optional[str]:
    """Feature-permission description, taken from ``FEATURE_DESCRIPTION``."""
    text = self.FEATURE_DESCRIPTION
    return text
|
||
|
||
def __init__(self):
    """Create the logger handle, link matcher, feature flag and download folder.

    The configuration-driven attributes (``enable``, ``http_proxy``,
    ``download_mode``) receive the same defaults that ``initialize`` falls
    back to, so ``can_process`` and ``_build_proxies`` do not raise
    ``AttributeError`` when they are called before ``initialize``.
    """
    super().__init__()
    self.LOG = logger
    # Matches short share links of the form http(s)://v.douyin.com/<token>[/]
    self.url_pattern = re.compile(r'https?://v\.douyin\.com/[^\s/]+/?')
    # Register the feature permission.
    self.feature = self.register_feature()
    # Defaults until initialize() loads the real configuration.
    self.enable = True
    self.http_proxy = ""
    self.download_mode = "card"  # "card" or "file"
    # Downloads are stored in the down_load_dir folder next to this module.
    self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
    # Make sure the download folder exists.
    if not os.path.exists(self.download_dir):
        os.makedirs(self.download_dir, exist_ok=True)
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
    """Store handles from *context* and load the ``Douyin`` configuration section.

    Reads ``event_system`` and ``gbm`` from *context*, recomputes the download
    folder, and copies ``enable``, ``http_proxy`` and ``download_mode`` from
    ``self._config["Douyin"]`` (each with a fallback value).

    Returns:
        Always ``True``.
    """
    self.LOG.debug(f"正在初始化 {self.name} 插件...")

    # Keep the objects handed over by the caller.
    self.event_system = context.get("event_system")
    self.gbm = context.get("gbm")
    self.download_dir = str(Path(__file__).parent / "down_load_dir")

    # Read the plugin settings from the configuration.
    settings = self._config.get("Douyin", {})
    self.enable = settings.get("enable", True)
    self.http_proxy = settings.get("http_proxy", "")
    self.download_mode = settings.get("download_mode", "card")  # "card" or "file"

    self.LOG.debug(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}")
    return True
|
||
|
||
def start(self) -> bool:
    """Start the plugin: log the event and set the status to ``RUNNING``.

    Returns:
        Always ``True``.
    """
    self.LOG.debug(f"[{self.name}] 插件已启动")
    running = PluginStatus.RUNNING
    self.status = running
    return True
|
||
|
||
def stop(self) -> bool:
    """Stop the plugin: log the event and set the status to ``STOPPED``.

    Returns:
        Always ``True``.
    """
    self.LOG.info(f"[{self.name}] 插件已停止")
    stopped = PluginStatus.STOPPED
    self.status = stopped
    return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
    """Tell whether *message* should be handled by this plugin.

    Returns ``True`` only when the plugin is enabled and the stringified,
    stripped ``content`` of the message contains a Douyin share link.
    """
    if self.enable:
        text = str(message.get("content", "")).strip()
        return self.url_pattern.search(text) is not None
    return False
|
||
|
||
@plugin_stats_decorator(plugin_name="抖音解析")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Resolve the Douyin link found in *message* and send the video back.

    The message dict is read for ``content``, ``sender``, ``roomid``, ``gbm``
    and ``bot``. When ``self.download_mode`` is ``"file"`` the video is
    downloaded and sent as a video message; for any other value an XML card
    is sent instead.

    Returns:
        ``(True, <status text>)`` when something was sent, otherwise
        ``(False, <reason>)``. Errors raised inside the ``try`` block are
        logged and reported through the return value instead of propagating.
    """
    content = str(message.get("content", "")).strip()
    self.LOG.debug(f"插件执行: {self.name}:{content}")
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    gbm: GroupBotManager = message.get("gbm")

    # Use the client delivered with the message. The earlier code fetched it
    # but then sent through ``self.bot``, which this class never assigns; an
    # instance-level client is still honoured as a fallback.
    bot: WechatAPIClient = message.get("bot")
    if bot is None:
        bot = getattr(self, "bot", None)

    # Replies go to the group when there is one, otherwise to the sender.
    receiver = roomid if roomid else sender

    # Permission check (group chats only).
    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    try:
        match = self.url_pattern.search(content)
        if not match:
            return False, "未找到抖音链接"

        original_url = self._clean_url(match.group(0))
        self.LOG.info(f"发现抖音链接: {original_url}")

        # _parse_douyin and _download_stream perform blocking HTTP requests;
        # run them in the default executor so the event loop is not stalled.
        loop = asyncio.get_running_loop()

        # Resolve the video.
        video_info = await loop.run_in_executor(None, self._parse_douyin, original_url)
        if not video_info:
            self.LOG.error("❌无法解析抖音视频信息")
            return False, "解析失败"

        video_url = video_info.get('url', '')
        title = video_info.get('title', '无标题')
        author = video_info.get('author', '未知作者')
        cover = video_info.get('cover', '')

        if not video_url:
            self.LOG.error("❌无法获取视频地址")
            return False, "获取视频地址失败"

        # Choose how to deliver the video.
        if self.download_mode == "file":
            # Download, then send the file. A nanosecond timestamp keeps file
            # names distinct when several links arrive within one second.
            video_filename = f"video_{time.time_ns()}.mp4"
            save_path = os.path.join(self.download_dir, video_filename)
            self.LOG.info(f"开始下载视频到: {save_path}")
            # save_path already includes download_dir, so it is passed as-is
            # rather than being joined with the directory a second time.
            mp4_path = await loop.run_in_executor(None, self._download_stream, video_url, save_path)
            if not mp4_path:
                self.LOG.error("❌下载视频失败")
                return False, "下载视频失败"
            await bot.send_video_message(receiver, Path(mp4_path))
            return True, "发送视频文件成功"

        # Card mode: the card title carries the author, the description
        # carries the video title.
        xml_content = VIDEO_XML_MESSAGE.format(
            title=author,
            des=title,
            url=video_url,
            thumburl=cover,
        )
        await bot.send_link_xml_message(xml_content, receiver)
        return True, "发送卡片成功"

    except DouyinParserError as e:
        self.LOG.error(f"抖音解析错误: {e}")
        self.LOG.error(f"❌抖音解析失败: {str(e)}")
        return False, f"解析错误: {e}"
    except Exception as e:
        self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}")
        self.LOG.error(f"❌处理抖音链接出错: {str(e)}")
        return False, f"处理出错: {e}"
|
||
|
||
def _clean_url(self, url: str) -> str:
|
||
"""清理URL"""
|
||
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
|
||
self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}")
|
||
return cleaned_url
|
||
|
||
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""清理响应数据"""
|
||
if not data:
|
||
return data
|
||
default_cover = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
|
||
cover = data.get('cover')
|
||
if isinstance(cover, str):
|
||
c = cover.strip().strip('`')
|
||
data['cover'] = c if c.startswith('http') else default_cover
|
||
else:
|
||
data['cover'] = default_cover
|
||
return data
|
||
|
||
def _parse_douyin(self, url: str) -> Dict[str, Any]:
|
||
try:
|
||
clean_url = self._clean_url(url)
|
||
primary = self._parse_from_internal_api(clean_url)
|
||
if primary and primary.get('url'):
|
||
return self._clean_response_data(primary)
|
||
secondary = self._parse_from_external_api(clean_url)
|
||
if secondary and secondary.get('url'):
|
||
return self._clean_response_data(secondary)
|
||
raise DouyinParserError("两种渠道均未获取到视频地址")
|
||
except Exception as e:
|
||
self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
|
||
def _build_proxies(self) -> Optional[Dict[str, str]]:
|
||
if self.http_proxy:
|
||
return {"http": self.http_proxy, "https": self.http_proxy}
|
||
return None
|
||
|
||
def _parse_from_internal_api(
    self,
    clean_url: str,
    endpoint: str = "http://192.168.2.32:8999/api/hybrid/video_data",
) -> Optional[Dict[str, Any]]:
    """Query the internal ``video_data`` endpoint for *clean_url*.

    Args:
        clean_url: Share link to resolve; sent as the ``url`` query parameter.
        endpoint: Address of the API. The default is the previously
            hard-coded address, so existing callers are unaffected.

    Returns:
        ``{"url", "title", "author", "cover"}`` when a playable address was
        found, otherwise ``None``. This is a best-effort source: any failure
        is logged and reported as ``None`` rather than raised.
    """
    try:
        response = requests.get(
            endpoint,
            headers={"accept": "application/json"},
            params={"url": clean_url, "minimal": "false"},
            timeout=10,
            proxies=self._build_proxies(),
        )
        if response.status_code != 200:
            self.LOG.warning(f"[抖音] 内部接口返回HTTP状态码: {response.status_code}")
            return None
        body = response.json() or {}
        if body.get("code") != 200:
            self.LOG.warning(f"[抖音] 内部接口返回业务状态码: {body.get('code')}")
            return None

        data = body.get("data") or {}
        video = data.get("video") or {}

        # MP4 renditions ordered from the highest to the lowest bit rate,
        # followed by the video's default play address as a last resort.
        mp4_renditions = sorted(
            (br for br in (video.get("bit_rate") or []) if br.get("format") == "mp4"),
            key=lambda br: br.get("bit_rate") or 0,
            reverse=True,
        )
        play_addrs = [br.get("play_addr") for br in mp4_renditions]
        play_addrs.append(video.get("play_addr"))

        chosen_url = ""
        for play_addr in play_addrs:
            candidates = (play_addr or {}).get("url_list") or []
            chosen_url = self._prefer_v3_v10(candidates) or ""
            if chosen_url:
                break
        if not chosen_url:
            return None

        cover_urls = (video.get("cover") or {}).get("url_list") or []
        author = data.get("author") or {}
        return {
            "url": chosen_url,
            "title": data.get("caption") or "无标题",
            "author": author.get("nickname") or author.get("unique_id") or "未知作者",
            "cover": cover_urls[0] if cover_urls else "",
        }
    except Exception as e:
        # Keep the best-effort contract, but leave a trace of what went wrong.
        self.LOG.warning(f"[抖音] 内部接口解析失败: {e}")
        return None
|
||
|
||
def _parse_from_external_api(self, clean_url: str) -> Optional[Dict[str, Any]]:
    """Query the external parsing API for *clean_url*.

    Returns:
        The ``data`` object of the response when the response ``code`` is 200
        and ``data`` carries a non-empty ``url``; otherwise ``None``. This is a
        best-effort source: any failure is logged and reported as ``None``
        rather than raised.
    """
    pay_api_url = "https://api.pearktrue.cn/api/video/api.php"
    # NOTE(review): this API key is committed in source; consider loading it
    # from the plugin configuration instead.
    params = {"url": clean_url, "key": "f56c1fed0c6e64e7"}
    try:
        response = requests.post(pay_api_url, params=params, timeout=10, proxies=self._build_proxies())
        if response.status_code != 200:
            self.LOG.warning(f"[抖音] 外部接口返回HTTP状态码: {response.status_code}")
            return None
        payload = response.json() or {}
        if payload.get("code") != 200:
            self.LOG.warning(f"[抖音] 外部接口返回业务状态码: {payload.get('code')}")
            return None
        result = payload.get("data") or {}
        return result if result.get("url") else None
    except Exception as e:
        # Keep the best-effort contract, but leave a trace of what went wrong.
        self.LOG.warning(f"[抖音] 外部接口解析失败: {e}")
        return None
|
||
|
||
def _prefer_v3_v10(self, urls: List[str]) -> Optional[str]:
|
||
try:
|
||
if not urls:
|
||
return None
|
||
cleaned = [(u or "").strip().strip("`") for u in urls if u]
|
||
def is_vx(n: str) -> bool:
|
||
return bool(re.match(r"^v(3|4|5|6|7|8|9|10)(?:[\-.]|$)", n, re.I))
|
||
def is_douyinvod(n: str) -> bool:
|
||
return "douyinvod.com" in n.lower()
|
||
first = None
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_vx(netloc) and is_douyinvod(netloc):
|
||
return s
|
||
if first is None:
|
||
first = s
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_vx(netloc):
|
||
return s
|
||
for s in cleaned:
|
||
netloc = urlparse(s).netloc
|
||
if is_douyinvod(netloc):
|
||
return s
|
||
return first
|
||
except Exception:
|
||
return urls[0] if urls else None
|
||
|
||
def _download_stream(self, url, save_path, timeout=(10, 60)):
    """
    Read the video stream at the given URL and save it to a local file.

    :param url: URL of the video stream
    :param save_path: local destination including the file name, e.g. "video.mp4"
    :param timeout: timeout handed to ``requests.get``; a number of seconds or
        a ``(connect, read)`` tuple, so a stalled server cannot block forever
    :return: absolute path of the saved file, or ``None`` when the request
        fails, the response does not look like a video, or the file cannot
        be written
    """
    file_created = False
    try:
        # Streamed GET; the context manager releases the connection on every
        # exit path, including the early return below.
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Raises for 4xx/5xx responses.
            response.raise_for_status()

            # Make sure the destination directory exists.
            os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)

            # Accept only responses whose Content-Type looks like video data.
            content_type = response.headers.get("Content-Type", "").lower()
            if "video" not in content_type and "application/octet-stream" not in content_type:
                # Read at most 100 bytes for the preview instead of loading
                # the complete body into memory.
                preview = next(response.iter_content(chunk_size=100), b"")
                preview_text = preview.decode("utf-8", errors="replace")
                self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}")
                self.LOG.warning(f"响应内容预览: {preview_text}")
                return None

            # Write the stream to disk in binary mode, 64 KiB at a time.
            with open(save_path, "wb") as file:
                file_created = True
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    if chunk:  # skip empty chunks
                        file.write(chunk)

        self.LOG.info(f"视频已下载到: {save_path}")
        return os.path.abspath(save_path)
    except requests.RequestException as e:
        self.LOG.error(f"请求失败: {e}")
    except IOError as e:
        self.LOG.error(f"文件写入失败: {e}")
    except Exception as e:
        self.LOG.error(f"发生未知错误: {e}")

    # Only reached after a failure: do not leave a truncated file behind.
    if file_created:
        try:
            os.remove(save_path)
        except OSError as e:
            self.LOG.warning(f"清理未完成的下载文件失败: {e}")
    return None
|