105 lines
2.6 KiB
Python
105 lines
2.6 KiB
Python
from functools import wraps
|
|
from flask import request, jsonify
|
|
from models import db, UserApiKey, ApiKeyDailyStat
|
|
from datetime import datetime, date
|
|
|
|
|
|
def get_api_key_from_request():
    """Extract the caller's API key from the current Flask request.

    Sources are tried in priority order:

    1. the ``X-API-Key`` header,
    2. an ``Authorization: Bearer <key>`` header,
    3. the ``api_key`` query-string parameter.

    Returns:
        The first key found, or the value of ``request.args.get('api_key')``
        (``None`` when the parameter is absent) if neither header supplies
        one.
    """
    # Highest priority: the dedicated header.
    api_key = request.headers.get('X-API-Key')
    if api_key:
        return api_key

    # Next: the Authorization header using the Bearer scheme. HTTP auth
    # scheme names are case-insensitive, so "bearer"/"BEARER" are accepted
    # too, and stray whitespace around the token is dropped so it does not
    # end up as part of the key that gets looked up.
    auth_header = request.headers.get('Authorization')
    if auth_header:
        scheme, separator, token = auth_header.partition(' ')
        if separator and scheme.lower() == 'bearer':
            return token.strip()

    # Last resort: the query string.
    return request.args.get('api_key')
|
|
|
|
|
|
def validate_api_key(api_key):
    """Decide whether ``api_key`` may be used for the current call.

    Checks run in this order: the key is non-empty, a matching
    ``UserApiKey`` row exists, the key is active, the owning user is
    active, and today's recorded call count is below the key's daily limit.

    Returns:
        A 2-tuple ``(is_valid, payload)``. When ``is_valid`` is ``True`` the
        payload is the matching ``UserApiKey`` row; otherwise it is the
        error message string.
    """
    if not api_key:
        return False, 'API Key 不能为空'

    record = UserApiKey.query.filter_by(api_key=api_key).first()

    # Work out the first failing check, if any; later attributes are only
    # touched when the earlier checks have passed.
    if not record:
        failure = 'API Key 无效'
    elif not record.is_active:
        failure = 'API Key 已被禁用'
    elif not record.user.is_active:
        failure = '用户账号已被禁用'
    else:
        failure = None

    if failure is not None:
        return False, failure

    # Daily quota: only enforced when a stat row for today already exists.
    criteria = {'api_key_id': record.id, 'date': date.today()}
    usage = ApiKeyDailyStat.query.filter_by(**criteria).first()
    if usage:
        if usage.call_count >= record.daily_limit:
            return False, f'已达到每日调用限额({record.daily_limit}次)'

    return True, record
|
|
|
|
|
|
def record_api_call(key_obj, ip_address, success=True):
    """Record one API call against a key and its per-day statistics.

    Updates the key's running totals and last-used metadata, increments
    (creating if necessary) today's ``ApiKeyDailyStat`` row, and commits.

    Args:
        key_obj: The ``UserApiKey`` row the call was made with.
        ip_address: Client address stored as the key's ``last_used_ip``.
        success: Whether the call succeeded; selects which daily counter
            (``success_count`` or ``fail_count``) is incremented.

    Raises:
        Exception: Anything raised by ``db.session.commit()`` is re-raised
            after the session has been rolled back.
    """
    # Resolve "today" once so the lookup and a newly created row always
    # agree, even if the call straddles midnight.
    today = date.today()

    # Key-level statistics.
    key_obj.total_calls = (key_obj.total_calls or 0) + 1
    key_obj.last_used_at = datetime.utcnow()
    key_obj.last_used_ip = ip_address

    # Per-day statistics: fetch today's row or start a new one.
    today_stat = ApiKeyDailyStat.query.filter_by(
        api_key_id=key_obj.id,
        date=today,
    ).first()

    if not today_stat:
        today_stat = ApiKeyDailyStat(
            api_key_id=key_obj.id,
            date=today,
        )
        db.session.add(today_stat)

    # NOTE: SQLAlchemy applies column defaults at INSERT time, so unless the
    # model's constructor sets them, counters on a brand-new row are still
    # None here. Treat None as zero instead of failing on `None += 1`.
    today_stat.call_count = (today_stat.call_count or 0) + 1
    if success:
        today_stat.success_count = (today_stat.success_count or 0) + 1
    else:
        today_stat.fail_count = (today_stat.fail_count or 0) + 1

    try:
        db.session.commit()
    except Exception:
        # Leave the session usable for the rest of the request, then let
        # the caller see the original error.
        db.session.rollback()
        raise
|
|
|
|
|
|
def api_key_required(f):
    """Decorator that guards a view with API-key authentication.

    The key is read with ``get_api_key_from_request`` and checked with
    ``validate_api_key``. On success the validated key object is attached
    to the request as ``request.api_key_obj`` and the wrapped view is
    called; on failure a JSON error body is returned with HTTP status 401.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        ok, outcome = validate_api_key(get_api_key_from_request())

        if ok:
            # Expose the validated key object to the view function.
            request.api_key_obj = outcome
            return f(*args, **kwargs)

        # Validation failed: `outcome` carries the error message.
        error_body = {
            'success': False,
            'error': {
                'code': 'UNAUTHORIZED',
                'message': outcome,
            },
        }
        return jsonify(error_body), 401

    return wrapper
|