# Source listing header (file-viewer residue): Python, 202 lines, 8.0 KiB.
import logging
import os
import re
import time
import tomllib
import traceback
from typing import Dict, Any
from xml.sax.saxutils import escape as xml_escape

import lz4.block as lb
import requests
from wcferry import WxMsg, Wcf

from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus


class DouyinParserError(Exception):
    """Base class for errors raised by the Douyin parser plugin."""


class DouyinParser:
|
||
description = "抖音无水印解析插件"
|
||
author = "姜不吃先生"
|
||
version = "1.0.2"
|
||
|
||
def __init__(self, wcf: Wcf, gbm: GroupBotManager):
|
||
self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?')
|
||
self.LOG = logging.getLogger(__name__)
|
||
self.wcf = wcf
|
||
self.gbm = gbm
|
||
with open("douyin_parser/config.toml", "rb") as f:
|
||
plugin_config = tomllib.load(f)
|
||
|
||
config = plugin_config["Douyin"]
|
||
|
||
self.enable = config.get("enable", True)
|
||
self.http_proxy = config.get("http_proxy", None)
|
||
self.LOG.info("[抖音] 插件初始化完成,代理设置: %s", self.http_proxy)
|
||
|
||
def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
if not data:
|
||
return data
|
||
data['cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/.../512x512bb.jpg"
|
||
return data
|
||
|
||
def _clean_url(self, url: str) -> str:
|
||
cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '')
|
||
self.LOG.debug("[抖音] 清理后的URL: %s", cleaned_url)
|
||
return cleaned_url
|
||
|
||
def _get_real_video_url(self, video_url: str) -> str:
|
||
max_retries = 3
|
||
retry_delay = 2
|
||
|
||
for retry in range(max_retries):
|
||
try:
|
||
self.LOG.info("[抖音] 开始获取真实视频链接: %s (第%d次尝试)", video_url, retry + 1)
|
||
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)...'
|
||
}
|
||
|
||
response = requests.get(video_url, headers=headers, proxies=proxy, allow_redirects=True, timeout=60)
|
||
if response.status_code in [200, 206]:
|
||
real_url = response.url
|
||
self.LOG.info("[抖音] 成功获取真实链接: %s", real_url)
|
||
return real_url
|
||
else:
|
||
self.LOG.error("[抖音] 获取视频真实链接失败, 状态码: %d", response.status_code)
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 获取真实链接失败: %s", str(e))
|
||
|
||
self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数")
|
||
return video_url
|
||
|
||
def _parse_douyin(self, url: str) -> Dict[str, Any]:
|
||
try:
|
||
api_url = "https://apih.kfcgw50.me/api/douyin"
|
||
clean_url = self._clean_url(url)
|
||
params = {'url': clean_url, 'type': 'json'}
|
||
|
||
self.LOG.debug("[抖音] 请求API: %s, 参数: %s", api_url, repr(params))
|
||
proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None
|
||
response = requests.get(api_url, params=params, timeout=30, proxies=proxy)
|
||
|
||
if response.status_code != 200:
|
||
raise DouyinParserError(f"API请求失败,状态码: {response.status_code}")
|
||
|
||
data = response.json()
|
||
self.LOG.debug("[抖音] API响应数据: %s", data)
|
||
|
||
if data.get("code") == 200:
|
||
result = data.get("data", {})
|
||
if result.get('video'):
|
||
result['video'] = self._get_real_video_url(result['video'])
|
||
return self._clean_response_data(result)
|
||
else:
|
||
raise DouyinParserError(data.get("message", "未知错误"))
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
|
||
def handle_douyin_links(self, message: WxMsg):
|
||
if not self.enable:
|
||
return
|
||
|
||
# 如果触发了指令,但是没有权限,则返回权限不足
|
||
if self.gbm.get_group_permission(message.roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED:
|
||
return
|
||
|
||
try:
|
||
match = self.url_pattern.search(message.content)
|
||
if not match:
|
||
return
|
||
|
||
original_url = self._clean_url(match.group(0))
|
||
self.LOG.info("发现抖音链接: %s", original_url)
|
||
self.LOG.info("检测到抖音分享链接,正在解析无水印视频...")
|
||
self.wcf.send_text(f"检测到抖音分享链接,正在解析无水印视频...",
|
||
(message.roomid if message.from_group() else message.sender), message.sender)
|
||
video_info = self._parse_douyin(original_url)
|
||
if not video_info:
|
||
raise DouyinParserError("无法获取视频信息")
|
||
|
||
video_url = video_info.get('video', '')
|
||
title = video_info.get('title', '无标题')
|
||
author = video_info.get('name', '未知作者')
|
||
|
||
if not video_url:
|
||
raise DouyinParserError("无法获取视频地址")
|
||
self.send_xml_video(message, title, author, video_url)
|
||
except Exception as e:
|
||
self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc())
|
||
raise DouyinParserError(f"未知错误: {str(e)}")
|
||
return
|
||
|
||
def send_xml_video(self, message: WxMsg, title, author, video_url):
|
||
|
||
xml_message = f"""
|
||
<?xml version="1.0"?>
|
||
<msg>
|
||
<appmsg appid="" sdkver="1">
|
||
<title>{title} @{author}</title>
|
||
<des>点击观看无水印视频</des>
|
||
<action>view</action>
|
||
<type>5</type>
|
||
<showtype>0</showtype>
|
||
<content />
|
||
<url>{video_url}</url>
|
||
<dataurl />
|
||
<lowurl />
|
||
<lowdataurl />
|
||
<recorditem />
|
||
<thumburl></thumburl>
|
||
<messageaction />
|
||
<laninfo />
|
||
<extinfo />
|
||
<sourceusername />
|
||
<sourcedisplayname />
|
||
<commenturl />
|
||
<appattach>
|
||
<totallen>0</totallen>
|
||
<attachid />
|
||
<emoticonmd5 />
|
||
<fileext />
|
||
<aeskey />
|
||
</appattach>
|
||
<webviewshared>
|
||
<publisherId />
|
||
<publisherReqId>0</publisherReqId>
|
||
</webviewshared>
|
||
<weappinfo>
|
||
<pagepath />
|
||
<username />
|
||
<appid />
|
||
<appservicetype>0</appservicetype>
|
||
</weappinfo>
|
||
<websearch />
|
||
</appmsg>
|
||
<fromusername>Jyunere</fromusername>
|
||
<scene>0</scene>
|
||
<appinfo>
|
||
<version>1</version>
|
||
<appname></appname>
|
||
</appinfo>
|
||
<commenturl></commenturl>
|
||
</msg>
|
||
|
||
"""
|
||
# 修改消息数据库里面的消息content 内容
|
||
text_bytes = xml_message.encode('utf-8')
|
||
compressed_data = lb.compress(text_bytes, store_size=False).hex()
|
||
|
||
data = self.wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
|
||
self.wcf.query_sql('MSG0.db',
|
||
f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=5,
|
||
IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
|
||
)
|
||
|
||
result = self.wcf.forward_msg(data[0]["MsgSvrID"], message.roomid)
|
||
print(f"视频链接:{result}")
|