import os import re import time import traceback import requests from typing import Dict, Any, List, Optional, Tuple from loguru import logger from pathlib import Path from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import LINK_XML_NORMAL, VIDEO_XML_MESSAGE class DouyinParserError(Exception): """抖音解析器自定义异常基类""" pass class DouyinParserPlugin(MessagePluginInterface): """抖音无水印解析插件""" # 功能权限常量 FEATURE_KEY = "DOUYIN_PARSER" FEATURE_DESCRIPTION = "🎵 抖音解析功能 [自动解析抖音链接]" @property def name(self) -> str: return "抖音解析" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供抖音链接无水印解析功能,支持视频下载和分享" @property def author(self) -> str: return "姜不吃先生" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return [] # 不使用命令触发,而是通过消息内容匹配 @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.LOG = logger self.url_pattern = re.compile(r'https?://v\.douyin\.com/[^\s/]+/?') # 注册功能权限 self.feature = self.register_feature() # 修改为使用插件目录下的down_load_dir文件夹 self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self.gbm = context.get("gbm") self.download_dir = str(Path(Path(__file__).parent, "down_load_dir")) # 从配置中获取参数 douyin_config = self._config.get("Douyin", {}) self.enable = douyin_config.get("enable", True) self.http_proxy = douyin_config.get("http_proxy", "") self.download_mode = douyin_config.get("download_mode", "card") # card或file self.LOG.info(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() match = self.url_pattern.search(content) return match is not None @plugin_stats_decorator(plugin_name="抖音解析") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" try: match = self.url_pattern.search(content) if not match: return False, "未找到抖音链接" original_url = self._clean_url(match.group(0)) self.LOG.info(f"发现抖音链接: {original_url}") # 解析抖音视频 video_info = self._parse_douyin(original_url) if not video_info: self.LOG.error(f"❌无法解析抖音视频信息") return False, "解析失败" video_url = video_info.get('url', '') title = video_info.get('title', '无标题') author = video_info.get('author', '未知作者') cover = video_info.get('cover', '') if not video_url: self.LOG.error(f"❌无法获取视频地址") return False, "获取视频地址失败" # 根据模式选择发送方式 if self.download_mode == "file": # 下载并发送文件 video_filename = f"video_{int(time.time())}.mp4" save_path = os.path.join(self.download_dir, video_filename) self.LOG.info(f"开始下载视频到: {save_path}") mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, save_path)) if mp4_path: await self.bot.send_video_message((roomid if roomid else sender), Path(mp4_path)) return True, "发送视频文件成功" else: self.LOG.error(f"❌下载视频失败") return False, "下载视频失败" else: # 发送卡片 xml_content = f"{VIDEO_XML_MESSAGE}".format(title=author, des=title, url=video_url, thumburl=cover ) await self.bot.send_link_xml_message(xml_content, (roomid if roomid else sender)) return True, "发送卡片成功" except DouyinParserError as e: self.LOG.error(f"抖音解析错误: {e}") self.LOG.error(f"❌抖音解析失败: {str(e)}") return False, f"解析错误: {e}" except Exception as e: self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}") self.LOG.error(f"❌处理抖音链接出错: {str(e)}") return False, f"处理出错: {e}" def _clean_url(self, url: str) -> str: """清理URL""" cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '') self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}") return cleaned_url def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理响应数据""" if not data: return data data[ 'cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" return data def _parse_douyin(self, url: str) -> Dict[str, Any]: """解析抖音链接""" try: api_url = "https://api.pearktrue.cn/api/video/douyin/?url=" clean_url = self._clean_url(url) api_url = api_url + clean_url response = requests.get(api_url, timeout=30) if response.status_code != 200: raise DouyinParserError(f"API请求失败,状态码: {response.status_code}") data = response.json() self.LOG.info(f"[抖音] API响应数据: {data}") if data.get("code") == 200: result = data.get("data", {}) self.LOG.info(f"[抖音] API响应数据result: {result}") if result.get('url'): return result else: raise DouyinParserError(data.get("message", "未知错误")) except Exception as e: self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}") raise DouyinParserError(f"未知错误: {str(e)}") def _download_stream(self, url, save_path): """ 从指定URL读取视频流并保存到本地 :param url: 视频流的URL :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") """ try: # 发送GET请求,启用流式传输 response = requests.get(url, stream=True) # 检查请求是否成功 response.raise_for_status() # 如果状态码不是200,将抛出异常 # 确保保存路径的目录存在 os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) # 检查是否是视频流(可选,根据Content-Type判断) content_type = response.headers.get("Content-Type", "").lower() if "video" not in content_type and "application/octet-stream" not in content_type: self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 return None # 以二进制写入模式保存流数据 with open(save_path, "wb") as file: for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB if chunk: # 过滤空块 file.write(chunk) self.LOG.info(f"视频已下载到: {save_path}") return os.path.abspath(save_path) except requests.RequestException as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: self.LOG.error(f"发生未知错误: {e}") return None