import logging
import os
import re
import time
import tomllib
import traceback
from typing import Any, Dict

import lz4.block as lb
import requests
from wcferry import Wcf, WxMsg

from robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus


class DouyinParserError(Exception):
    """Base exception raised by the Douyin parser plugin."""


class DouyinParser:
    """Detect Douyin share links in chat messages and reply with a video card.

    The plugin reads its settings from the ``[Douyin]`` table of
    ``douyin_parser/config.toml`` (path relative to the working directory):
    ``enable`` toggles the plugin and ``http_proxy`` is an optional proxy URL
    used for every outgoing HTTP request.
    """

    description = "抖音无水印解析插件"
    author = "姜不吃先生"
    version = "1.0.2"

    def __init__(self, wcf: Wcf, gbm: GroupBotManager):
        """Store the collaborators and load the plugin configuration.

        Args:
            wcf: Client used to send text, query the message DB and forward
                messages.
            gbm: Manager queried for the per-group permission of this feature.

        Raises:
            OSError: If the configuration file cannot be opened.
            KeyError: If the configuration has no ``Douyin`` table.
        """
        self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?')
        self.LOG = logging.getLogger(__name__)
        self.wcf = wcf
        self.gbm = gbm

        with open("douyin_parser/config.toml", "rb") as f:
            plugin_config = tomllib.load(f)

        config = plugin_config["Douyin"]
        self.enable = config.get("enable", True)
        self.http_proxy = config.get("http_proxy", None)
        self.LOG.info("[抖音] 插件初始化完成,代理设置: %s", self.http_proxy)

    def _proxies(self):
        """Return the ``proxies`` mapping for requests, or None without a proxy."""
        if not self.http_proxy:
            return None
        return {"http": self.http_proxy, "https": self.http_proxy}

    @staticmethod
    def _receiver(message: WxMsg) -> str:
        """Return where replies go: the room for group messages, else the sender."""
        return message.roomid if message.from_group() else message.sender

    def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Replace the ``cover`` entry of a non-empty API result, in place.

        Empty/None input is returned untouched. The cover is always overwritten
        with a fixed image URL, whatever the API returned.
        """
        if not data:
            return data
        data['cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/.../512x512bb.jpg"
        return data

    def _clean_url(self, url: str) -> str:
        """Strip surrounding whitespace and remove ``;``, CR and LF from *url*."""
        cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
        self.LOG.debug("[抖音] 清理后的URL: %s", cleaned_url)
        return cleaned_url

    def _get_real_video_url(self, video_url: str) -> str:
        """Follow redirects of *video_url* and return the final URL.

        Best-effort: up to three attempts with a short pause in between. If
        every attempt fails, the original *video_url* is returned unchanged.
        """
        max_retries = 3
        retry_delay = 2
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)...'
        }
        proxy = self._proxies()

        for retry in range(max_retries):
            if retry:
                # Back off before every attempt except the first.
                time.sleep(retry_delay)
            try:
                self.LOG.info("[抖音] 开始获取真实视频链接: %s (第%d次尝试)", video_url, retry + 1)
                # stream=True: only the final URL is needed, so do not
                # download the video body; the context manager releases the
                # connection.
                with requests.get(
                    video_url,
                    headers=headers,
                    proxies=proxy,
                    allow_redirects=True,
                    timeout=60,
                    stream=True,
                ) as response:
                    if response.status_code in (200, 206):
                        real_url = response.url
                        self.LOG.info("[抖音] 成功获取真实链接: %s", real_url)
                        return real_url
                    self.LOG.error("[抖音] 获取视频真实链接失败, 状态码: %d", response.status_code)
            except Exception as e:
                # Deliberately broad: resolving the link is optional and must
                # never abort the parse.
                self.LOG.error("[抖音] 获取真实链接失败: %s", str(e))

        self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数")
        return video_url

    def _parse_douyin(self, url: str) -> Dict[str, Any]:
        """Query the parsing API for *url* and return its ``data`` payload.

        When the payload has a ``video`` entry it is replaced by the URL
        obtained from ``_get_real_video_url``.

        Raises:
            DouyinParserError: On a non-200 HTTP status, a non-200 ``code`` in
                the JSON body, or any unexpected failure (chained as cause).
        """
        try:
            api_url = "https://apih.kfcgw50.me/api/douyin"
            clean_url = self._clean_url(url)
            params = {'url': clean_url, 'type': 'json'}
            self.LOG.debug("[抖音] 请求API: %s, 参数: %s", api_url, repr(params))

            response = requests.get(api_url, params=params, timeout=30, proxies=self._proxies())
            if response.status_code != 200:
                raise DouyinParserError(f"API请求失败,状态码: {response.status_code}")

            data = response.json()
            self.LOG.debug("[抖音] API响应数据: %s", data)
            if data.get("code") != 200:
                raise DouyinParserError(data.get("message", "未知错误"))

            # "data": null in the response must behave like a missing payload.
            result = data.get("data") or {}
            if result.get('video'):
                result['video'] = self._get_real_video_url(result['video'])
            return self._clean_response_data(result)
        except DouyinParserError as e:
            # Already a domain error: log it and keep its message intact.
            self.LOG.error("[抖音] 解析失败: %s", e)
            raise
        except Exception as e:
            self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
            raise DouyinParserError(f"未知错误: {str(e)}") from e

    def handle_douyin_links(self, message: WxMsg):
        """Parse the first Douyin link in *message* and send a video card.

        Does nothing when the plugin is disabled, when the feature is disabled
        for the message's group, or when the text contains no Douyin link.

        Raises:
            DouyinParserError: If parsing or sending fails.
        """
        if not self.enable:
            return

        # Feature switched off for this group: ignore the message silently.
        if self.gbm.get_group_permission(message.roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED:
            return

        try:
            match = self.url_pattern.search(message.content)
            if not match:
                return

            original_url = self._clean_url(match.group(0))
            self.LOG.info("发现抖音链接: %s", original_url)
            self.LOG.info("检测到抖音分享链接,正在解析无水印视频...")
            self.wcf.send_text(
                "检测到抖音分享链接,正在解析无水印视频...",
                self._receiver(message),
                message.sender,
            )

            video_info = self._parse_douyin(original_url)
            if not video_info:
                raise DouyinParserError("无法获取视频信息")

            # `or` also covers keys that are present but null/empty.
            video_url = video_info.get('video') or ''
            title = video_info.get('title') or '无标题'
            author = video_info.get('name') or '未知作者'
            cover = video_info.get('cover') or ''

            if not video_url:
                raise DouyinParserError("无法获取视频地址")

            self.send_xml_video(message, title, author, video_url, cover)
        except DouyinParserError as e:
            # Known failure: do not re-wrap it as an "unknown error".
            self.LOG.error("[抖音] 解析失败: %s", e)
            raise
        except Exception as e:
            self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
            raise DouyinParserError(f"未知错误: {str(e)}") from e

    def send_xml_video(self, message: WxMsg, title, author, video_url, cover):
        """Rewrite an existing type-49 message into a video card and forward it.

        The card text is LZ4-compressed and written into the
        ``CompressContent`` column of the first type-49 row of ``MSG0.db``;
        that row is then forwarded to the chat *message* came from.

        Args:
            message: Triggering message; determines the receiver.
            title: Video title (truncated in the card).
            author: Author name; when empty only the title is shown.
            video_url: URL placed in the card.
            cover: Cover image URL placed in the card.

        Raises:
            DouyinParserError: If the database holds no type-49 message.
        """
        video_title = f"{title[:30]} - {author[:10]}" if author else title[:40]

        # NOTE(review): this payload contains no XML markup as seen here;
        # the tags were presumably lost before this file was captured -
        # confirm against the intended appmsg template.
        xml_message = f""" {video_title} 点击观看无水印视频 view 5 0 {video_url} {cover} 0 0 0 Jyunere 0 1 """

        # Overwrite the stored content of an existing message row.
        text_bytes = xml_message.encode('utf-8')
        compressed_data = lb.compress(text_bytes, store_size=False).hex()

        data = self.wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
        if not data:
            raise DouyinParserError("未找到可用于转发的模板消息")
        msg_id = data[0]['MsgSvrID']

        self.wcf.query_sql(
            'MSG0.db',
            f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3, IsSender=0, TalkerId=2 WHERE MsgSvrID={msg_id}""",
        )

        # Same receiver rule as the text reply, so private chats work too.
        result = self.wcf.forward_msg(msg_id, self._receiver(message))
        self.LOG.info("视频链接:%s", result)