Files
JieXi/routes/api_v1.py
2025-11-30 19:49:25 +08:00

213 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, request, jsonify
from models import ParserAPI, ParseLog, UserApiKey, ApiKeyDailyStat, db
from parsers.factory import ParserFactory
from utils.security import get_client_ip
from datetime import datetime, date
import time
api_v1_bp = Blueprint('api_v1', __name__, url_prefix='/api/v1')
def validate_and_get_key(api_key):
"""验证 API Key 并返回 key 对象"""
if not api_key:
return None, 'API Key 不能为空'
key_obj = UserApiKey.query.filter_by(api_key=api_key).first()
if not key_obj:
return None, 'API Key 无效'
if not key_obj.is_active:
return None, 'API Key 已被禁用'
if not key_obj.user.is_active:
return None, '用户账号已被禁用'
# 检查每日限额
today_stat = ApiKeyDailyStat.query.filter_by(
api_key_id=key_obj.id,
date=date.today()
).first()
if today_stat and today_stat.call_count >= key_obj.daily_limit:
return None, f'已达到每日调用限额({key_obj.daily_limit}次)'
return key_obj, None
def record_api_call(key_obj, ip_address, success=True):
"""记录 API 调用"""
key_obj.total_calls = (key_obj.total_calls or 0) + 1
key_obj.last_used_at = datetime.utcnow()
key_obj.last_used_ip = ip_address
today_stat = ApiKeyDailyStat.query.filter_by(
api_key_id=key_obj.id,
date=date.today()
).first()
if not today_stat:
today_stat = ApiKeyDailyStat(api_key_id=key_obj.id, date=date.today())
db.session.add(today_stat)
today_stat.call_count = (today_stat.call_count or 0) + 1
if success:
today_stat.success_count = (today_stat.success_count or 0) + 1
else:
today_stat.fail_count = (today_stat.fail_count or 0) + 1
@api_v1_bp.route('/parse', methods=['GET'])
def parse_video():
"""
对外解析API - 简化版
请求方式: GET
参数:
key: API Key
url: 视频链接
示例: /api/v1/parse?key=sk_xxx&url=https://v.douyin.com/xxx
返回:
{
"code": 200,
"msg": "解析成功",
"data": {
"cover": "封面URL",
"title": "标题",
"description": "简介",
"author": "作者",
"video_url": "无水印视频链接"
}
}
"""
# 获取参数
api_key = request.args.get('key')
video_url = request.args.get('url')
# 验证 API Key
key_obj, error = validate_and_get_key(api_key)
if error:
return jsonify({'code': 401, 'msg': error})
# 验证 URL
if not video_url:
return jsonify({'code': 400, 'msg': '请提供视频链接'})
# 检测平台
try:
platform = ParserFactory.detect_platform(video_url)
except ValueError as e:
return jsonify({'code': 400, 'msg': str(e)})
# 展开短链接
video_url = ParserFactory.expand_short_url(video_url)
# 获取客户端IP
ip_address = get_client_ip(request)
start_time = time.time()
# 获取该平台所有可用的API
available_apis = ParserAPI.query.filter_by(
platform=platform.lower(),
is_enabled=True
).all()
if not available_apis:
return jsonify({'code': 503, 'msg': f'没有可用的{platform}解析接口'})
last_error = None
user_id = key_obj.user_id
# 尝试所有可用的APIfailover机制
for api_config in available_apis:
try:
parser = ParserFactory.create_parser(api_config)
result = parser.parse(video_url)
# 验证解析结果video_url 不能为空
if not result.get('video_url'):
raise Exception('未能获取到视频链接')
response_time = int((time.time() - start_time) * 1000)
# 记录成功日志
log = ParseLog(
user_id=user_id,
ip_address=ip_address,
platform=platform,
video_url=video_url,
parser_api_id=api_config.id,
status='success',
response_time=response_time
)
db.session.add(log)
# 更新API统计
api_config.total_calls = (api_config.total_calls or 0) + 1
api_config.success_calls = (api_config.success_calls or 0) + 1
avg_time = api_config.avg_response_time or 0
api_config.avg_response_time = int(
(avg_time * (api_config.total_calls - 1) + response_time) / api_config.total_calls
)
api_config.fail_count = 0
# 记录 API Key 调用
record_api_call(key_obj, ip_address, success=True)
db.session.commit()
return jsonify({
'code': 200,
'msg': '解析成功',
'data': {
'cover': result.get('cover', ''),
'title': result.get('title', ''),
'description': result.get('description', ''),
'author': result.get('author', ''),
'video_url': result.get('video_url', '')
}
})
except Exception as e:
last_error = str(e)
api_config.total_calls = (api_config.total_calls or 0) + 1
api_config.fail_count = (api_config.fail_count or 0) + 1
db.session.commit()
continue
# 所有API都失败
response_time = int((time.time() - start_time) * 1000)
log = ParseLog(
user_id=user_id,
ip_address=ip_address,
platform=platform,
video_url=video_url,
status='failed',
error_message=last_error or '所有接口都失败',
response_time=response_time
)
db.session.add(log)
record_api_call(key_obj, ip_address, success=False)
db.session.commit()
return jsonify({
'code': 500,
'msg': last_error or '解析失败,请稍后重试'
})
@api_v1_bp.route('/platforms', methods=['GET'])
def get_platforms():
"""获取支持的平台列表"""
return jsonify({
'code': 200,
'msg': '获取成功',
'data': [
{'name': 'douyin', 'display_name': '抖音'},
{'name': 'tiktok', 'display_name': 'TikTok'},
{'name': 'bilibili', 'display_name': '哔哩哔哩'},
{'name': 'kuaishou', 'display_name': '快手'},
{'name': 'pipixia', 'display_name': '皮皮虾'},
{'name': 'weibo', 'display_name': '微博'}
]
})