init
This commit is contained in:
1
parsers/__init__.py
Normal file
1
parsers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Parsers package
|
||||
54
parsers/base.py
Normal file
54
parsers/base.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import requests
|
||||
from typing import Dict, Optional
|
||||
|
||||
class BaseParser(ABC):
    """Abstract base class for video-link parsers.

    Stores the connection settings shared by every concrete parser and
    provides two helpers: an HTTP GET wrapper and a result normalizer.
    """

    def __init__(self, api_url: str, api_key: Optional[str] = None, timeout: int = 30):
        """Remember the API base URL, optional API key and request timeout."""
        self.api_url = api_url
        self.api_key = api_key
        self.timeout = timeout

    @abstractmethod
    def parse(self, video_url: str) -> Dict:
        """Parse a video link.

        Implementations return the unified format:
            {
                "cover": "<cover URL>",
                "video_url": "<video URL>",
                "title": "<title>",
                "description": "<description>"
            }
        """
        pass

    def _make_request(
        self,
        url: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        verify: bool = True,
    ) -> "requests.Response":
        """Send an HTTP GET request and return the response.

        Args:
            url: Target URL.
            params: Optional query parameters handed to ``requests.get``.
            headers: Optional headers; they override the defaults below.
            verify: Passed through to ``requests.get`` as ``verify``.

        Raises:
            Exception: When ``requests`` raises a ``RequestException``
                (including the HTTP error raised by ``raise_for_status``).
                The original exception is attached as ``__cause__``.
        """
        # Default request headers.
        request_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }

        # Merge caller-supplied headers over the defaults.
        if headers:
            request_headers.update(headers)

        try:
            response = requests.get(
                url,
                params=params,
                headers=request_headers,
                timeout=self.timeout,
                verify=verify,
            )
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            # Chain the cause so the underlying network/HTTP error stays
            # visible in tracebacks.
            raise Exception(f"请求失败: {str(e)}") from e

    def _normalize_response(self, cover: str, video_url: str, title: str, description: str) -> Dict:
        """Build the unified result dict, replacing falsy values with ""."""
        return {
            "cover": cover or "",
            "video_url": video_url or "",
            "title": title or "",
            "description": description or ""
        }
|
||||
120
parsers/bilibili.py
Normal file
120
parsers/bilibili.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from parsers.base import BaseParser
|
||||
from typing import Dict
|
||||
from urllib.parse import urlencode
|
||||
|
||||
class BilibiliMirParser(BaseParser):
    """Bilibili parser backed by the "Mir" (米人) API."""

    def parse(self, video_url: str) -> Dict:
        """Parse a Bilibili video link.

        Calls ``{api_url}/api/bzjiexi`` with the link as the ``url`` query
        parameter and returns the normalized result dict.

        Raises:
            Exception: On any request, JSON or extraction failure; the
                original exception is attached as ``__cause__``.
        """
        try:
            # Build the URL by hand to avoid double encoding.
            url = f"{self.api_url}/api/bzjiexi?{urlencode({'url': video_url})}"

            response = self._make_request(url)
            data = response.json()

            return self._extract_data(data)
        except Exception as e:
            raise Exception(f"哔哩哔哩解析失败(米人API): {str(e)}") from e

    def _extract_data(self, data: Dict) -> Dict:
        """Extract and normalize the fields from the API payload.

        The payload counts as successful when ``code == 200`` or
        ``status == "success"``. ``data`` may be a dict or a list, in which
        case the first element is used.

        Raises:
            Exception: When the payload signals failure or cannot be read.
        """
        try:
            if not (data.get("code") == 200 or data.get("status") == "success"):
                raise Exception(f"解析失败: {data.get('msg', '未知错误')}")

            # ``or {}`` also covers an explicit JSON null, which
            # ``.get("data", {})`` would pass through as None.
            video_data = data.get("data") or {}

            # If data is a list, take the first element.
            if isinstance(video_data, list):
                video_data = (video_data[0] if video_data else {}) or {}

            cover = video_data.get("cover", "") or video_data.get("pic", "")
            video_url = video_data.get("url", "") or video_data.get("video_url", "")
            title = video_data.get("title", "")
            description = video_data.get("desc", "") or video_data.get("description", "")

            return self._normalize_response(cover, video_url, title, description)
        except Exception as e:
            raise Exception(f"数据提取失败: {str(e)}") from e
|
||||
|
||||
|
||||
class BilibiliBugPKParser(BaseParser):
    """Bilibili parser backed by the BugPK API."""

    def parse(self, video_url: str) -> Dict:
        """Parse a Bilibili video link.

        Calls ``{api_url}/api/bilibili`` with the link as the ``url`` query
        parameter and returns the normalized result dict.

        Raises:
            Exception: On any request, JSON or extraction failure; the
                original exception is attached as ``__cause__``.
        """
        try:
            # Build the URL by hand to avoid double encoding.
            url = f"{self.api_url}/api/bilibili?{urlencode({'url': video_url})}"

            response = self._make_request(url)
            data = response.json()

            return self._extract_data(data)
        except Exception as e:
            raise Exception(f"哔哩哔哩解析失败(BugPK API): {str(e)}") from e

    def _extract_data(self, data: Dict) -> Dict:
        """Extract and normalize the fields from the API payload.

        The payload counts as successful when ``code == 200`` or
        ``status == "success"``. ``data`` may be a dict or a list, in which
        case the first element is used.

        Raises:
            Exception: When the payload signals failure or cannot be read.
        """
        try:
            succeeded = data.get("code") == 200 or data.get("status") == "success"
            if not succeeded:
                raise Exception(f"解析失败: {data.get('msg', '未知错误')}")

            # Treat an explicit JSON null like a missing "data" field.
            video_data = data.get("data") or {}

            # If data is a list, take the first element.
            if isinstance(video_data, list):
                video_data = (video_data[0] if video_data else {}) or {}

            cover = video_data.get("cover", "") or video_data.get("pic", "")
            video_url = video_data.get("url", "") or video_data.get("video_url", "")
            title = video_data.get("title", "")
            description = video_data.get("desc", "") or video_data.get("description", "")

            return self._normalize_response(cover, video_url, title, description)
        except Exception as e:
            raise Exception(f"数据提取失败: {str(e)}") from e
|
||||
|
||||
|
||||
class BilibiliYaohuParser(BaseParser):
    """Bilibili parser backed by the Yaohu (妖狐) API."""

    def parse(self, video_url: str) -> Dict:
        """Parse a Bilibili video link.

        Calls ``{api_url}/api/v6/video/bili`` with ``key`` (when an API key
        is configured) and ``url`` query parameters.

        Raises:
            Exception: On any request, JSON or extraction failure; the
                original exception is attached as ``__cause__``.
        """
        try:
            # Only send the key when one is configured: urlencode would
            # otherwise serialize None as the literal string "None".
            query = {'url': video_url}
            if self.api_key:
                query = {'key': self.api_key, 'url': video_url}

            # Build the URL by hand to avoid double encoding.
            url = f"{self.api_url}/api/v6/video/bili?{urlencode(query)}"

            # NOTE(review): verify=False turns off TLS certificate
            # verification for this call. Kept as in the original; confirm
            # whether this endpoint really needs it.
            response = self._make_request(url, verify=False)
            data = response.json()

            return self._extract_data(data)
        except Exception as e:
            raise Exception(f"哔哩哔哩解析失败(妖狐API): {str(e)}") from e

    def _extract_data(self, data: Dict) -> Dict:
        """Extract and normalize the fields from the API payload.

        Only payloads with ``parse_type == "video"`` are supported.

        Raises:
            Exception: For other parse types or unreadable payloads.
        """
        try:
            if data.get("parse_type") != "video":
                raise Exception(f"解析失败: 不支持的类型 {data.get('parse_type')}")

            # ``or {}`` guards against explicit JSON nulls.
            video_data = data.get("data") or {}
            basic = video_data.get("basic") or {}

            # Basic metadata.
            cover = basic.get("cover", "")
            title = basic.get("title", "")
            description = basic.get("description", "")

            # Video URL: prefer data.video_url, fall back to videos[0].url.
            video_url = video_data.get("video_url", "")
            if not video_url:
                videos = video_data.get("videos", [])
                if isinstance(videos, list) and videos:
                    first_video = videos[0] or {}
                    video_url = first_video.get("url", "")

            return self._normalize_response(cover, video_url, title, description)
        except Exception as e:
            raise Exception(f"数据提取失败: {str(e)}") from e
|
||||
89
parsers/douyin.py
Normal file
89
parsers/douyin.py
Normal file
@@ -0,0 +1,89 @@
|
||||
from parsers.base import BaseParser
|
||||
from typing import Dict
|
||||
from urllib.parse import urlencode
|
||||
|
||||
class DouyinParser(BaseParser):
    """Douyin parser."""

    def parse(self, video_url: str) -> Dict:
        """Parse a Douyin video link.

        Runs three steps: resolve the aweme id, fetch the video details,
        then extract and normalize the fields.

        Raises:
            Exception: On any failure in those steps; the original
                exception is attached as ``__cause__``.
        """
        try:
            # Step 1: resolve the video id.
            aweme_id = self._get_aweme_id(video_url)

            # Step 2: fetch the video details.
            video_info = self._fetch_video_info(aweme_id)

            # Step 3: extract and normalize the data.
            return self._extract_data(video_info)
        except Exception as e:
            raise Exception(f"抖音解析失败: {str(e)}") from e

    def _get_aweme_id(self, video_url: str) -> str:
        """Resolve the aweme id for a link via ``get_aweme_id``.

        Raises:
            Exception: When the API does not answer with ``code == 200`` or
                answers without an id.
        """
        # Build the URL by hand to avoid double encoding.
        url = f"{self.api_url}/api/douyin/web/get_aweme_id?{urlencode({'url': video_url})}"

        response = self._make_request(url)
        data = response.json()

        aweme_id = data.get("data")
        # A missing id would otherwise be sent on as the literal "None".
        if data.get("code") != 200 or aweme_id is None:
            raise Exception(f"获取视频ID失败: {data.get('msg', '未知错误')}")

        return aweme_id

    def _fetch_video_info(self, aweme_id: str) -> Dict:
        """Fetch the ``aweme_detail`` object for an aweme id.

        Raises:
            Exception: When the API does not answer with ``code == 200``.
        """
        # Build the URL by hand to avoid double encoding.
        url = f"{self.api_url}/api/douyin/web/fetch_one_video?{urlencode({'aweme_id': aweme_id})}"

        response = self._make_request(url)
        data = response.json()

        if data.get("code") != 200:
            raise Exception("获取视频信息失败")

        # ``or {}`` guards against explicit JSON nulls.
        payload = data.get("data") or {}
        return payload.get("aweme_detail") or {}

    @staticmethod
    def _first_url(video: Dict, field: str) -> str:
        """Return the first entry of ``video[field]["url_list"]`` or ""."""
        url_list = ((video or {}).get(field) or {}).get("url_list") or []
        return url_list[0] if url_list else ""

    def _extract_data(self, video_info: Dict) -> Dict:
        """Extract and normalize the fields from an ``aweme_detail`` dict.

        Missing, null or empty URL lists yield an empty string rather than
        failing the whole parse.

        Raises:
            Exception: When the structure cannot be read.
        """
        try:
            video = video_info.get("video") or {}

            # Cover image.
            cover = self._first_url(video, "cover_original_scale")

            # Playable video URL.
            video_url = self._first_url(video, "play_addr")

            # The post text serves as the title.
            title = video_info.get("desc", "")

            # Author info serves as the description.
            author = video_info.get("author") or {}
            author_name = author.get("nickname", "")
            author_signature = author.get("signature", "")
            description = f"作者: {author_name}"
            if author_signature:
                description += f" | {author_signature}"

            return self._normalize_response(cover, video_url, title, description)
        except Exception as e:
            raise Exception(f"数据提取失败: {str(e)}") from e
|
||||
|
||||
|
||||
class DouyinDownloadParser(BaseParser):
    """Douyin parser that returns a direct-download link."""

    def parse(self, video_url: str) -> Dict:
        """Build the download-endpoint URL for a Douyin link.

        No request is made here: the result's ``video_url`` points at
        ``{api_url}/api/download`` with the link as the ``url`` query
        parameter, the cover is empty, and title/description are fixed
        strings.

        Raises:
            Exception: If building the URL fails.
        """
        try:
            endpoint = f"{self.api_url}/api/download"
            # Build the URL by hand to avoid double encoding.
            query_string = urlencode({'url': video_url})
            direct_link = f"{endpoint}?{query_string}"
            return self._normalize_response(
                "",
                direct_link,
                "抖音视频",
                "通过下载接口获取",
            )
        except Exception as e:
            raise Exception(f"抖音下载解析失败: {str(e)}")
|
||||
87
parsers/factory.py
Normal file
87
parsers/factory.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from parsers.douyin import DouyinParser
|
||||
from parsers.tiktok import TikTokParser
|
||||
from parsers.bilibili import BilibiliMirParser, BilibiliBugPKParser, BilibiliYaohuParser
|
||||
from models import ParserAPI
|
||||
import random
|
||||
|
||||
class ParserFactory:
    """Factory that picks and builds parser instances."""

    @staticmethod
    def create_parser(api_config: "ParserAPI"):
        """Create a parser instance from an API configuration record.

        Reads ``platform``, ``api_url`` and ``api_key`` from ``api_config``.
        For Bilibili the backend is chosen by a marker substring in the API
        URL, defaulting to the Mir parser.

        Raises:
            ValueError: When the platform is not supported.
        """
        platform = api_config.platform.lower()
        api_url = api_config.api_url
        api_key = api_config.api_key

        if platform == 'douyin':
            return DouyinParser(api_url, api_key)
        if platform == 'tiktok':
            return TikTokParser(api_url, api_key)
        if platform == 'bilibili':
            # Pick the backend from the API URL. Match case-insensitively
            # so an upper-case host still selects the right parser.
            url_lower = (api_url or "").lower()
            if 'bugpk' in url_lower:
                return BilibiliBugPKParser(api_url, api_key)
            if 'yaohud' in url_lower and 'mir6' not in url_lower:
                return BilibiliYaohuParser(api_url, api_key)
            # 'mir6' URLs and anything unrecognized use the Mir parser.
            return BilibiliMirParser(api_url, api_key)

        raise ValueError(f"不支持的平台: {platform}")

    @staticmethod
    def get_parser_for_platform(platform: str):
        """Return ``(parser, api)`` for a platform, with load balancing.

        Queries ``ParserAPI`` for rows of the platform that are enabled and
        healthy. Bilibili with more than one candidate uses a weighted
        random pick; every other case takes the first row.

        Raises:
            Exception: When no usable API is configured for the platform.
        """
        platform_key = platform.lower()

        # All enabled and healthy APIs for this platform.
        apis = ParserAPI.query.filter_by(
            platform=platform_key,
            is_enabled=True,
            health_status=True
        ).all()

        if not apis:
            raise Exception(f"没有可用的{platform}解析接口")

        if platform_key == 'bilibili' and len(apis) > 1:
            # Bilibili: weighted random selection (load balancing).
            api = ParserFactory._weighted_random_choice(apis)
        else:
            # Other platforms: take the first available one.
            api = apis[0]

        return ParserFactory.create_parser(api), api

    @staticmethod
    def _weighted_random_choice(apis):
        """Pick one entry of ``apis`` at random, weighted by ``.weight``.

        Unset (None) and negative weights count as 0. When no entry has a
        positive weight the pick is uniform.
        """
        weights = [max(float(api.weight or 0), 0.0) for api in apis]
        if sum(weights) <= 0:
            return random.choice(apis)
        return random.choices(apis, weights=weights, k=1)[0]

    @staticmethod
    def detect_platform(video_url: str) -> str:
        """Detect which platform a video link belongs to.

        Matching is a case-insensitive substring test, so share text that
        merely contains a link is recognized as well.

        Raises:
            ValueError: When the link is empty or matches no known platform.
        """
        if not video_url:
            raise ValueError("无法识别的视频平台")

        url_lower = video_url.lower()

        if 'douyin.com' in url_lower or 'v.douyin' in url_lower:
            return 'douyin'
        if 'tiktok.com' in url_lower:
            return 'tiktok'
        if 'bilibili.com' in url_lower or 'b23.tv' in url_lower:
            return 'bilibili'

        raise ValueError("无法识别的视频平台")
|
||||
70
parsers/tiktok.py
Normal file
70
parsers/tiktok.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from parsers.base import BaseParser
|
||||
from typing import Dict
|
||||
from urllib.parse import urlencode
|
||||
|
||||
class TikTokParser(BaseParser):
    """TikTok parser."""

    def parse(self, video_url: str) -> Dict:
        """Parse a TikTok video link.

        Runs three steps: resolve the aweme id, fetch the video details,
        then extract and normalize the fields.

        Raises:
            Exception: On any failure in those steps; the original
                exception is attached as ``__cause__``.
        """
        try:
            # Step 1: resolve the video id.
            aweme_id = self._get_aweme_id(video_url)

            # Step 2: fetch the video details.
            video_info = self._fetch_video_info(aweme_id)

            # Step 3: extract and normalize the data.
            return self._extract_data(video_info)
        except Exception as e:
            raise Exception(f"TikTok解析失败: {str(e)}") from e

    def _get_aweme_id(self, video_url: str) -> str:
        """Resolve the aweme id for a link via ``get_aweme_id``.

        Raises:
            Exception: When the API does not answer with ``code == 200`` or
                answers without an id.
        """
        # Build the URL by hand to avoid double encoding.
        url = f"{self.api_url}/api/tiktok/web/get_aweme_id?{urlencode({'url': video_url})}"

        response = self._make_request(url)
        data = response.json()

        aweme_id = data.get("data")
        # A missing id would otherwise be sent on as the literal "None".
        if data.get("code") != 200 or aweme_id is None:
            raise Exception(f"获取视频ID失败: {data.get('msg', '未知错误')}")

        return aweme_id

    def _fetch_video_info(self, aweme_id: str) -> Dict:
        """Fetch the video detail object for an aweme id.

        Raises:
            Exception: When the API does not answer with ``code == 200``.
        """
        # Build the URL by hand to avoid double encoding.
        url = f"{self.api_url}/api/tiktok/app/fetch_one_video?{urlencode({'aweme_id': aweme_id})}"

        response = self._make_request(url)
        data = response.json()

        if data.get("code") != 200:
            raise Exception("获取视频信息失败")

        # The TikTok API returns the details directly under "data", with no
        # aweme_detail level. ``or {}`` guards against an explicit null.
        return data.get("data") or {}

    @staticmethod
    def _first_url(video: Dict, field: str) -> str:
        """Return the first entry of ``video[field]["url_list"]`` or ""."""
        url_list = ((video or {}).get(field) or {}).get("url_list") or []
        return url_list[0] if url_list else ""

    def _extract_data(self, video_info: Dict) -> Dict:
        """Extract and normalize the fields from the video detail dict.

        The cover falls back from ``cover_original_scale`` to ``cover``.
        Missing, null or empty URL lists yield an empty string, so the
        fallback is reached instead of an IndexError.

        Raises:
            Exception: When the structure cannot be read.
        """
        try:
            # The TikTok structure is similar to Douyin's.
            video = video_info.get("video") or {}

            cover = self._first_url(video, "cover_original_scale")
            if not cover:
                cover = self._first_url(video, "cover")

            video_url = self._first_url(video, "play_addr")

            title = video_info.get("desc", "")

            author = video_info.get("author") or {}
            author_name = author.get("nickname", "")
            author_signature = author.get("signature", "")
            description = f"Author: {author_name}"
            if author_signature:
                description += f" | {author_signature}"

            return self._normalize_response(cover, video_url, title, description)
        except Exception as e:
            raise Exception(f"数据提取失败: {str(e)}") from e
|
||||
Reference in New Issue
Block a user