积分赠送功能插件化

抖音真实地址获取插件化
This commit is contained in:
liuwei
2025-03-19 17:49:51 +08:00
parent 31516c3090
commit 7593483a40
10 changed files with 621 additions and 460 deletions

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入DouyinParserPlugin类
from .main import DouyinParserPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return DouyinParserPlugin()

View File

@@ -0,0 +1,10 @@
[Douyin]
enable = true
# 发送模式: card(发送卡片) 或 file(下载并发送文件)
download_mode = "card"
# Http代理设置用于获取真实链接发送卡片如果家里有ipv6可以设置为空
# 格式: http://用户名:密码@代理地址:代理端口
# 例如http://127.0.0.1:7890
http_proxy = ""

View File

@@ -0,0 +1,323 @@
import logging
import os
import re
import time
import traceback
import requests
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf, WxMsg
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class DouyinParserError(Exception):
"""抖音解析器自定义异常基类"""
pass
class DouyinParserPlugin(MessagePluginInterface):
"""抖音无水印解析插件"""
@property
def name(self) -> str:
return "抖音解析"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供抖音链接无水印解析功能,支持视频下载和分享"
@property
def author(self) -> str:
return "姜不吃先生"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return [] # 不使用命令触发,而是通过消息内容匹配
def __init__(self):
super().__init__()
self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?')
# 修改为使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.gbm = context.get("gbm")
# 从配置中获取参数
douyin_config = self._config.get("Douyin", {})
self.enable = douyin_config.get("enable", True)
self.http_proxy = douyin_config.get("http_proxy", "")
self.download_mode = douyin_config.get("download_mode", "card") # card或file
self.LOG.info(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
match = self.url_pattern.search(content)
return match is not None
@plugin_stats_decorator(plugin_name="抖音解析")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
match = self.url_pattern.search(content)
if not match:
return False, "未找到抖音链接"
original_url = self._clean_url(match.group(0))
self.LOG.info(f"发现抖音链接: {original_url}")
# 解析抖音视频
video_info = self._parse_douyin(original_url)
if not video_info:
wcf.send_text(f"❌无法解析抖音视频信息",
(roomid if roomid else sender), sender)
return True, "解析失败"
video_url = video_info.get('video', '')
title = video_info.get('title', '无标题')
author = video_info.get('name', '未知作者')
cover = video_info.get('cover', '')
if not video_url:
wcf.send_text(f"❌无法获取视频地址",
(roomid if roomid else sender), sender)
return True, "获取视频地址失败"
# 根据模式选择发送方式
if self.download_mode == "file":
# 下载并发送文件
mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, "douyin.mp4"))
if mp4_path:
wcf.send_file(mp4_path, (roomid if roomid else sender))
return True, "发送视频文件成功"
else:
wcf.send_text(f"❌下载视频失败",
(roomid if roomid else sender), sender)
return True, "下载视频失败"
else:
# 发送卡片
wcf.send_rich_text(
"BOT-PC直接查看",
"gh_11",
title[:30],
f"PC直接查看-{title[:20]} - {author[:10]}",
video_url,
cover,
(roomid if roomid else sender)
)
return True, "发送卡片成功"
except DouyinParserError as e:
self.LOG.error(f"抖音解析错误: {e}")
wcf.send_text(f"❌抖音解析失败: {str(e)}",
(roomid if roomid else sender), sender)
return True, f"解析错误: {e}"
except Exception as e:
self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}")
wcf.send_text(f"❌处理抖音链接出错: {str(e)}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _clean_url(self, url: str) -> str:
"""清理URL"""
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}")
return cleaned_url
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理响应数据"""
if not data:
return data
data['cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg"
return data
def _get_real_video_url(self, video_url: str) -> str:
"""获取真实视频链接"""
max_retries = 3 # 最大重试次数
retry_delay = 2 # 重试延迟秒数
max_redirects = 10 # 最大重定向次数,防止死循环
proxies = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
redirect_history = []
for retry in range(max_retries):
try:
self.LOG.info(f"[抖音] 开始获取真实视频链接: {video_url} (第{retry + 1}次尝试)")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Range': 'bytes=0-'
}
# 默认使用 allow_redirects=True 获取历史记录
response = requests.get(video_url, headers=headers, proxies=proxies, allow_redirects=True, timeout=60)
if response.history:
redirect_history = [resp.url for resp in response.history]
real_url = response.url
else:
# response.history 为空,手动解析重定向
current_url = video_url
for _ in range(max_redirects): # 限制最大重定向次数
resp = requests.get(current_url, headers=headers, proxies=proxies, allow_redirects=False,
timeout=60)
new_url = resp.headers.get('Location')
if not new_url:
break # 没有新的 Location停止
if not new_url.startswith("http"):
from urllib.parse import urljoin
new_url = urljoin(current_url, new_url) # 处理相对路径重定向
if new_url in redirect_history:
self.LOG.info(f"[抖音] 检测到循环重定向: {new_url}")
break # 避免死循环
redirect_history.append(new_url)
self.LOG.info(f"[抖音] 发现重定向: {current_url} -> {new_url}")
current_url = new_url
real_url = current_url
if redirect_history:
self.LOG.info(f"[抖音] 重定向历史: {redirect_history}")
if real_url != video_url and ('v3-' in real_url.lower() or 'douyinvod.com' in real_url.lower()):
self.LOG.info(f"[抖音] 成功获取真实链接: {real_url}")
return real_url
else:
self.LOG.info("[抖音] 未能获取到符合预期的视频链接,准备重试")
if retry < max_retries - 1:
time.sleep(retry_delay)
continue
return video_url
except Exception as e:
self.LOG.error(f"[抖音] 获取真实链接失败: {str(e)} (第{retry + 1}次尝试)")
if retry < max_retries - 1:
time.sleep(retry_delay)
continue
return video_url
self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数")
return video_url
def _parse_douyin(self, url: str) -> Dict[str, Any]:
"""解析抖音链接"""
try:
api_url = "http://zj.v.api.aa1.cn/api/douyinjx/"
clean_url = self._clean_url(url)
params = {'text': clean_url, 'type': 'json'}
self.LOG.info(f"[抖音] 请求API: {api_url}, 参数: {repr(params)}")
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
response = requests.get(api_url, params=params, timeout=30, proxies=proxy)
if response.status_code != 200:
raise DouyinParserError(f"API请求失败状态码: {response.status_code}")
data = response.json()
self.LOG.info(f"[抖音] API响应数据: {data}")
if data.get("code") == 200:
result = data.get("data", {})
self.LOG.info(f"[抖音] API响应数据result: {result}")
if result.get('video'):
result['video'] = self._get_real_video_url(result['video'])
return self._clean_response_data(result)
else:
raise DouyinParserError(data.get("message", "未知错误"))
except Exception as e:
self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}")
raise DouyinParserError(f"未知错误: {str(e)}")
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
response = requests.get(url, stream=True)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None
# 以二进制写入模式保存流数据
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
return os.path.abspath(save_path)
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None