Files
JieXi/routes/admin.py
2025-11-30 19:49:25 +08:00

1035 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, request, jsonify, session
from models import Admin, User, UserGroup, ParserAPI, SiteConfig, SMTPConfig, ParseLog, DailyParseStat, HealthCheckConfig
from models import db
from utils.security import hash_password, verify_password, get_client_ip
from utils.admin_auth import admin_required, verify_2fa, generate_2fa_secret, get_2fa_qrcode_url
from utils.email import EmailService
from datetime import datetime, timedelta, date
from sqlalchemy import func
import qrcode
import io
import base64
# Blueprint on which every admin route in this module is registered.
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Serve the admin login page (GET) or authenticate an admin (POST).

    POST reads ``username``, ``password`` and, when the account has 2FA
    enabled, ``code_2fa`` from the JSON request body.

    Returns:
        The rendered ``admin_login.html`` template for GET. For POST a JSON
        response: 400 when credentials are missing or a 2FA code is still
        required, 401 for wrong credentials or a wrong 2FA code, and a
        success payload once ``admin_id`` / ``admin_username`` are stored in
        the session.
    """
    if request.method == 'GET':
        from flask import render_template
        return render_template('admin_login.html')

    # A missing, malformed or non-object body is treated as an empty payload
    # so it produces the regular 400 below instead of an AttributeError (500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    password = data.get('password')
    code_2fa = data.get('code_2fa')

    if not all([username, password]):
        return jsonify({'success': False, 'message': '请填写完整信息'}), 400

    admin = Admin.query.filter_by(username=username).first()
    # Same message for "unknown user" and "wrong password".
    if not admin or not verify_password(password, admin.password):
        return jsonify({'success': False, 'message': '用户名或密码错误'}), 401

    # Second factor, only when the account has it switched on.
    if admin.is_2fa_enabled:
        if not code_2fa:
            return jsonify({'success': False, 'message': '请输入2FA验证码', 'require_2fa': True}), 400
        if not verify_2fa(admin, code_2fa):
            return jsonify({'success': False, 'message': '2FA验证码错误'}), 401

    # Record where and when this login happened.
    admin.last_login_ip = get_client_ip(request)
    admin.last_login_at = datetime.utcnow()
    db.session.commit()

    # Establish the admin session.
    session['admin_id'] = admin.id
    session['admin_username'] = admin.username
    return jsonify({'success': True, 'message': '登录成功'})
@admin_bp.route('/logout', methods=['POST'])
@admin_required
def logout():
    """Log the admin out by removing the admin keys from the session."""
    for session_key in ('admin_id', 'admin_username'):
        session.pop(session_key, None)
    payload = {'success': True, 'message': '已退出登录'}
    return jsonify(payload)
@admin_bp.route('/dashboard', methods=['GET'])
@admin_required
def dashboard():
    """Render the admin dashboard page template."""
    from flask import render_template

    html = render_template('admin_dashboard.html')
    return html
@admin_bp.route('/api/dashboard', methods=['GET'])
@admin_required
def dashboard_api():
    """Return the headline statistics for the admin dashboard.

    Returns:
        JSON containing today's summed parse / success / fail counts from
        ``DailyParseStat``, the number of ``User`` rows, the number of
        ``ParseLog`` rows, and the number of ``ParserAPI`` rows that are both
        enabled and healthy.
    """
    today = date.today()

    def _as_int(value):
        # SUM() yields NULL when no row matches and may come back as a
        # non-int numeric type depending on the database backend; normalise
        # the same way get_batch_list does so the JSON always holds integers.
        return int(value or 0)

    # Today's aggregated counters.
    today_stats = db.session.query(
        func.sum(DailyParseStat.parse_count).label('total'),
        func.sum(DailyParseStat.success_count).label('success'),
        func.sum(DailyParseStat.fail_count).label('fail')
    ).filter(DailyParseStat.date == today).first()

    total_users = User.query.count()
    total_parses = ParseLog.query.count()
    active_apis = ParserAPI.query.filter_by(is_enabled=True, health_status=True).count()

    return jsonify({
        'success': True,
        'data': {
            'today': {
                'total': _as_int(today_stats.total),
                'success': _as_int(today_stats.success),
                'fail': _as_int(today_stats.fail)
            },
            'total_users': total_users,
            'total_parses': total_parses,
            'active_apis': active_apis
        }
    })
@admin_bp.route('/users', methods=['GET'])
@admin_required
def users_page():
    """Render the user-management page template."""
    from flask import render_template

    html = render_template('admin_users.html')
    return html
@admin_bp.route('/api/users', methods=['GET'])
@admin_required
def get_users():
    """Return one page of users, newest first, with their plan expiry.

    Query parameters:
        page: page number (default 1).
        per_page: page size (default 20).
        group_id: optional; restrict the listing to one user group.

    Returns:
        JSON with a ``data`` list of user dicts and a ``pagination`` dict.
        ``expires_at`` is formatted as ``%Y-%m-%dT%H:%M`` (or ``None``) and
        ``is_expired`` compares it against the current UTC time.
    """
    from models import UserGroupExpiry

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    group_id = request.args.get('group_id', type=int)

    query = User.query
    if group_id:
        query = query.filter_by(group_id=group_id)

    pagination = query.order_by(User.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    page_users = pagination.items

    # Load the expiry rows for the whole page in one query instead of issuing
    # one query per user. setdefault keeps the first row seen for a user.
    expiry_by_user = {}
    if page_users:
        user_ids = [u.id for u in page_users]
        expiry_rows = UserGroupExpiry.query.filter(
            UserGroupExpiry.user_id.in_(user_ids)
        ).all()
        for row in expiry_rows:
            expiry_by_user.setdefault(row.user_id, row)

    now = datetime.utcnow()
    users = []
    for u in page_users:
        expiry = expiry_by_user.get(u.id)
        expires_at = None
        is_expired = False
        if expiry and expiry.expires_at:
            expires_at = expiry.expires_at.strftime('%Y-%m-%dT%H:%M')
            is_expired = expiry.expires_at < now

        users.append({
            'id': u.id,
            'username': u.username,
            'email': u.email,
            'group_id': u.group_id,
            'group_name': u.group.name if u.group else '',
            'total_parse_count': u.total_parse_count,
            'is_active': u.is_active,
            # Guarded so a row without a creation time cannot break the list.
            'created_at': u.created_at.isoformat() if u.created_at else None,
            'expires_at': expires_at,
            'is_expired': is_expired
        })

    return jsonify({
        'success': True,
        'data': users,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': pagination.total,
            'pages': pagination.pages
        }
    })
@admin_bp.route('/api/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update a user's group, active flag and/or plan expiry time.

    The JSON body may contain ``group_id``, ``is_active`` and ``expires_at``.
    A truthy ``expires_at`` must use the ``%Y-%m-%dT%H:%M`` format and is
    stored in the user's ``UserGroupExpiry`` row (created if missing); a
    falsy ``expires_at`` deletes that row.

    Returns:
        JSON success payload; 400 when the body is not a JSON object or the
        expiry time cannot be parsed; 404 when the user does not exist.
    """
    from models import UserGroupExpiry

    user = User.query.get_or_404(user_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    # Parse the expiry first so a bad value is rejected before any attribute
    # of the user has been modified.
    new_expires_at = None
    if data.get('expires_at'):
        try:
            new_expires_at = datetime.strptime(data['expires_at'], '%Y-%m-%dT%H:%M')
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': '到期时间格式错误'}), 400

    if 'group_id' in data:
        user.group_id = data['group_id']
    if 'is_active' in data:
        user.is_active = data['is_active']

    if 'expires_at' in data:
        expiry = UserGroupExpiry.query.filter_by(user_id=user_id).first()
        if new_expires_at is not None:
            if expiry:
                expiry.group_id = user.group_id
                expiry.expires_at = new_expires_at
            else:
                expiry = UserGroupExpiry(
                    user_id=user_id,
                    group_id=user.group_id,
                    expires_at=new_expires_at
                )
                db.session.add(expiry)
        elif expiry:
            # An empty expiry clears the record (guest / regular users, per
            # the original author's note).
            db.session.delete(expiry)

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/api/groups', methods=['GET'])
@admin_required
def get_groups():
    """Return every user group with its daily limit and description."""
    exposed_fields = ('id', 'name', 'daily_limit', 'description')
    rows = []
    for group in UserGroup.query.all():
        rows.append({field: getattr(group, field) for field in exposed_fields})
    return jsonify({'success': True, 'data': rows})
@admin_bp.route('/api/groups/<int:group_id>', methods=['PUT'])
@admin_required
def update_group(group_id):
    """Update a user group's ``daily_limit`` and/or ``description``.

    Returns:
        JSON success payload; 400 when the body is not a JSON object; 404
        when the group does not exist.
    """
    group = UserGroup.query.get_or_404(group_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a null / non-object body raised and became a 500.
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    for field in ('daily_limit', 'description'):
        if field in data:
            setattr(group, field, data[field])

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/apis', methods=['GET'])
@admin_required
def apis_page():
    """Render the parser-API management page template."""
    from flask import render_template

    html = render_template('admin_apis.html')
    return html
@admin_bp.route('/api/apis', methods=['GET'])
@admin_required
def get_apis():
    """Return every parser API with its configuration and call statistics.

    The stored ``api_key`` is not part of the response.
    """
    def _serialize(entry):
        # last_check_at is None until the API has been checked once.
        checked = entry.last_check_at
        return {
            'id': entry.id,
            'name': entry.name,
            'platform': entry.platform,
            'api_url': entry.api_url,
            'weight': entry.weight,
            'is_enabled': entry.is_enabled,
            'health_status': entry.health_status,
            'total_calls': entry.total_calls,
            'success_calls': entry.success_calls,
            'avg_response_time': entry.avg_response_time,
            'last_check_at': checked.isoformat() if checked else None
        }

    rows = []
    for entry in ParserAPI.query.all():
        rows.append(_serialize(entry))
    return jsonify({'success': True, 'data': rows})
@admin_bp.route('/api/apis', methods=['POST'])
@admin_required
def create_api():
    """Create a parser API from the JSON request body.

    Required keys: ``name``, ``platform``, ``api_url``. Optional keys:
    ``api_key``, ``weight`` (default 1), ``is_enabled`` (default True).

    Returns:
        JSON success payload with the new row's ``id``; 400 when the body is
        not a JSON object or a required key is absent.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    # Report absent keys as a client error instead of letting KeyError turn
    # into a 500.
    missing = [field for field in ('name', 'platform', 'api_url') if field not in data]
    if missing:
        missing_names = ', '.join(missing)
        return jsonify({'success': False, 'message': f'缺少必填字段: {missing_names}'}), 400

    api = ParserAPI(
        name=data['name'],
        platform=data['platform'],
        api_url=data['api_url'],
        api_key=data.get('api_key'),
        weight=data.get('weight', 1),
        is_enabled=data.get('is_enabled', True)
    )
    db.session.add(api)
    db.session.commit()
    return jsonify({'success': True, 'message': '创建成功', 'id': api.id})
@admin_bp.route('/api/apis/<int:api_id>', methods=['PUT'])
@admin_required
def update_api(api_id):
    """Update selected fields of a parser API.

    Only ``name``, ``api_url``, ``api_key``, ``weight`` and ``is_enabled``
    are copied from the JSON body; other keys are ignored.

    Returns:
        JSON success payload; 400 when the body is not a JSON object; 404
        when the API does not exist.
    """
    api = ParserAPI.query.get_or_404(api_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a null / non-object body raised and became a 500.
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    for key in ('name', 'api_url', 'api_key', 'weight', 'is_enabled'):
        if key in data:
            setattr(api, key, data[key])

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/api/apis/<int:api_id>', methods=['DELETE'])
@admin_required
def delete_api(api_id):
    """Delete the parser API with the given id (404 when it does not exist)."""
    target = ParserAPI.query.get_or_404(api_id)
    db.session.delete(target)
    db.session.commit()
    payload = {'success': True, 'message': '删除成功'}
    return jsonify(payload)
@admin_bp.route('/api/apis/<int:api_id>/test', methods=['POST'])
@admin_required
def test_api(api_id):
    """Run a parser API against a test URL and record the outcome.

    The JSON body must contain ``test_url``. On success the API is marked
    healthy, its failure counter is reset and its call counters and running
    average response time are updated. On a parser error the failure and
    total counters are incremented and the API is marked unhealthy once the
    failure counter reaches 3.

    Returns:
        JSON with the parse result and ``response_time`` in milliseconds on
        success; 400 when ``test_url`` is missing; 500 with the error text
        when the parser raises; 404 when the API does not exist.
    """
    from parsers.factory import ParserFactory
    import time

    api = ParserAPI.query.get_or_404(api_id)

    # A null / non-object body is treated as empty so it yields the 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    test_url = data.get('test_url')
    if not test_url:
        return jsonify({'success': False, 'message': '请提供测试链接'}), 400

    # Only the parser work sits inside the try block, so bookkeeping or
    # database errors are not misreported as a parser failure and
    # total_calls cannot be incremented twice for one test.
    try:
        parser = ParserFactory.create_parser(api)
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments.
        started = time.perf_counter()
        result = parser.parse(test_url)
        response_time = int((time.perf_counter() - started) * 1000)
    except Exception as e:
        # Record the failure; counters may still be NULL on a fresh row.
        api.fail_count = (api.fail_count or 0) + 1
        api.total_calls = (api.total_calls or 0) + 1
        if api.fail_count >= 3:
            api.health_status = False
        api.last_check_at = datetime.utcnow()
        db.session.commit()
        return jsonify({'success': False, 'message': f'测试失败: {str(e)}'}), 500

    # Success: reset health state and fold this sample into the running mean.
    previous_calls = api.total_calls or 0
    previous_avg = api.avg_response_time or 0
    api.health_status = True
    api.fail_count = 0
    api.total_calls = previous_calls + 1
    api.success_calls = (api.success_calls or 0) + 1
    api.avg_response_time = int(
        (previous_avg * previous_calls + response_time) / api.total_calls
    )
    api.last_check_at = datetime.utcnow()
    db.session.commit()

    return jsonify({
        'success': True,
        'message': '测试成功',
        'data': result,
        'response_time': response_time
    })
@admin_bp.route('/config', methods=['GET'])
@admin_required
def config_page():
    """Render the site-configuration page template."""
    from flask import render_template

    html = render_template('admin_config.html')
    return html
@admin_bp.route('/api/config', methods=['GET'])
@admin_required
def get_config():
    """Return all site configuration entries as a key-to-value mapping."""
    settings = {}
    for entry in SiteConfig.query.all():
        settings[entry.config_key] = entry.config_value
    return jsonify({'success': True, 'data': settings})
@admin_bp.route('/api/config', methods=['PUT'])
@admin_required
def update_config():
    """Create or update site configuration entries from a JSON object.

    Every key/value pair of the body is stored in ``SiteConfig``; values are
    converted with ``str()`` before being saved.

    Returns:
        JSON success payload; 400 when the body is not a JSON object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a null / non-object body raised on .items().
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    for key, value in data.items():
        config = SiteConfig.query.filter_by(config_key=key).first()
        if config:
            config.config_value = str(value)
        else:
            db.session.add(SiteConfig(config_key=key, config_value=str(value)))

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/smtp', methods=['GET'])
@admin_required
def smtp_page():
    """Render the SMTP-configuration page template."""
    from flask import render_template

    html = render_template('admin_smtp.html')
    return html
@admin_bp.route('/api/smtp', methods=['GET'])
@admin_required
def get_smtp():
    """Return every SMTP configuration; the stored password is not included."""
    exposed_fields = (
        'id', 'name', 'host', 'port', 'username', 'from_email', 'from_name',
        'use_tls', 'is_enabled', 'is_default', 'weight', 'send_count',
        'fail_count',
    )
    rows = []
    for smtp in SMTPConfig.query.all():
        rows.append({field: getattr(smtp, field) for field in exposed_fields})
    return jsonify({'success': True, 'data': rows})
@admin_bp.route('/api/smtp', methods=['POST'])
@admin_required
def create_smtp():
    """Create an SMTP configuration from the JSON request body.

    Required keys: ``name``, ``host``, ``port``, ``username``, ``password``,
    ``from_email``. Optional keys: ``from_name`` (default ''), ``use_tls``
    (default True), ``is_enabled`` (default True), ``weight`` (default 1).

    Returns:
        JSON success payload with the new row's ``id``; 400 when the body is
        not a JSON object or a required key is absent.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    # Report absent keys as a client error instead of letting KeyError turn
    # into a 500.
    required = ('name', 'host', 'port', 'username', 'password', 'from_email')
    missing = [field for field in required if field not in data]
    if missing:
        missing_names = ', '.join(missing)
        return jsonify({'success': False, 'message': f'缺少必填字段: {missing_names}'}), 400

    smtp = SMTPConfig(
        name=data['name'],
        host=data['host'],
        port=data['port'],
        username=data['username'],
        password=data['password'],
        from_email=data['from_email'],
        from_name=data.get('from_name', ''),
        use_tls=data.get('use_tls', True),
        is_enabled=data.get('is_enabled', True),
        weight=data.get('weight', 1)
    )
    db.session.add(smtp)
    db.session.commit()
    return jsonify({'success': True, 'message': '创建成功', 'id': smtp.id})
@admin_bp.route('/api/smtp/<int:smtp_id>', methods=['PUT'])
@admin_required
def update_smtp(smtp_id):
    """Update selected fields of an SMTP configuration.

    Returns:
        JSON success payload; 400 when the body is not a JSON object; 404
        when the configuration does not exist.
    """
    smtp = SMTPConfig.query.get_or_404(smtp_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a null / non-object body raised and became a 500.
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    editable = ('name', 'host', 'port', 'username', 'from_email', 'from_name',
                'use_tls', 'is_enabled', 'weight')
    for key in editable:
        if key in data:
            setattr(smtp, key, data[key])

    # Only overwrite the password when a non-empty one is supplied, so saving
    # the form with a blank password field does not wipe valid credentials.
    if data.get('password'):
        smtp.password = data['password']

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/api/smtp/test', methods=['POST'])
@admin_required
def test_smtp():
    """Send a test e-mail through ``EmailService`` to the given address.

    The JSON body must contain ``email``.

    Returns:
        JSON success payload once the mail has been handed to
        ``EmailService.send_email``; 400 when ``email`` is missing; 500 with
        a friendlier hint for a few recognised error texts when sending
        raises.
    """
    # A null / non-object body is treated as empty so it yields the 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get('email')
    if not email:
        return jsonify({'success': False, 'message': '请提供测试邮箱'}), 400

    try:
        EmailService.send_email(
            email,
            '【短视频解析平台】SMTP测试邮件',
            '<h2>测试成功</h2><p>如果您收到此邮件说明SMTP配置正常。</p>',
            html=True
        )
    except Exception as e:
        # Broad catch is deliberate: any failure is reported to the admin
        # rather than surfacing as an unhandled error page.
        from flask import current_app

        # logger.exception records the full traceback via the app's logging
        # configuration instead of printing it to stdout.
        current_app.logger.exception('SMTP测试失败详细信息')

        # Map a few recognisable error texts to friendlier hints.
        error_msg = str(e)
        if 'Connection unexpectedly closed' in error_msg:
            error_msg = '连接被服务器关闭请检查1) 端口和加密方式是否匹配 2) QQ邮箱需使用授权码而非密码 3) 用户名是否正确'
        elif 'Authentication failed' in error_msg or '535' in error_msg:
            error_msg = '认证失败请检查用户名和密码QQ邮箱需使用授权码'
        elif 'timed out' in error_msg:
            error_msg = '连接超时,请检查网络和服务器地址'
        return jsonify({'success': False, 'message': f'邮件发送失败: {error_msg}'}), 500

    return jsonify({'success': True, 'message': '测试邮件已发送'})
@admin_bp.route('/api/stats/parse', methods=['GET'])
@admin_required
def get_parse_stats():
    """Return per-day parse totals for the most recent ``days`` days.

    Query parameters:
        days: size of the window, counting today (default 7). Values below 1
            are treated as 1.

    Returns:
        JSON with one entry per date that has ``DailyParseStat`` rows,
        ordered by date ascending, each holding summed ``total``,
        ``success`` and ``fail`` counts.
    """
    days = request.args.get('days', 7, type=int)
    # Zero or negative windows would put the start date in the future.
    days = max(days, 1)
    start_date = date.today() - timedelta(days=days - 1)

    stats = db.session.query(
        DailyParseStat.date,
        func.sum(DailyParseStat.parse_count).label('total'),
        func.sum(DailyParseStat.success_count).label('success'),
        func.sum(DailyParseStat.fail_count).label('fail')
    ).filter(
        DailyParseStat.date >= start_date
    ).group_by(
        DailyParseStat.date
    ).order_by(
        # Without an explicit ORDER BY the day order is unspecified.
        DailyParseStat.date
    ).all()

    return jsonify({
        'success': True,
        'data': [{
            'date': s.date.isoformat(),
            # int() keeps the values plain integers whatever numeric type the
            # database returns for SUM().
            'total': int(s.total or 0),
            'success': int(s.success or 0),
            'fail': int(s.fail or 0)
        } for s in stats]
    })
@admin_bp.route('/api/stats/platform', methods=['GET'])
@admin_required
def get_platform_stats():
    """Return the number of parse-log rows per platform."""
    per_platform = (
        db.session.query(
            ParseLog.platform,
            func.count(ParseLog.id).label('count'),
        )
        .group_by(ParseLog.platform)
        .all()
    )
    rows = []
    for entry in per_platform:
        rows.append({'platform': entry.platform, 'count': entry.count})
    return jsonify({'success': True, 'data': rows})
@admin_bp.route('/api/2fa/enable', methods=['POST'])
@admin_required
def enable_2fa():
    """Start 2FA enrolment for the logged-in admin.

    Generates a new secret, renders its provisioning URL as a PNG QR code and
    keeps the secret in the session under ``temp_2fa_secret``; the admin
    record itself is not modified here.

    Returns:
        JSON with the ``secret`` and a base64 ``data:`` URL of the QR code,
        or 400 when 2FA is already enabled for this admin.
    """
    admin = Admin.query.get(session.get('admin_id'))
    if admin.is_2fa_enabled:
        return jsonify({'success': False, 'message': '2FA已启用'}), 400

    # Fresh secret plus the URL that gets encoded into the QR code.
    secret = generate_2fa_secret()
    provisioning_url = get_2fa_qrcode_url(admin, secret)

    # Draw the QR code.
    qr_builder = qrcode.QRCode(version=1, box_size=10, border=5)
    qr_builder.add_data(provisioning_url)
    qr_builder.make(fit=True)
    image = qr_builder.make_image(fill_color="black", back_color="white")

    # Serialise the image to PNG in memory and base64-encode it.
    png_stream = io.BytesIO()
    image.save(png_stream, format='PNG')
    encoded_png = base64.b64encode(png_stream.getvalue()).decode()

    # Held only in the session until the admin confirms a code.
    session['temp_2fa_secret'] = secret

    payload = {
        'success': True,
        'secret': secret,
        'qr_code': f'data:image/png;base64,{encoded_png}'
    }
    return jsonify(payload)
@admin_bp.route('/api/2fa/verify', methods=['POST'])
@admin_required
def verify_2fa_setup():
    """Confirm the pending 2FA secret with a TOTP code and enable 2FA.

    Reads ``code`` from the JSON body and checks it against the secret stored
    in the session under ``temp_2fa_secret``.

    Returns:
        JSON success payload once the secret has been saved on the admin;
        400 when no secret is pending or the code does not verify.
    """
    admin_id = session.get('admin_id')
    admin = Admin.query.get(admin_id)

    # A null / non-object body is treated as empty, so the code simply fails
    # verification instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    code = data.get('code')

    secret = session.get('temp_2fa_secret')
    if not secret:
        return jsonify({'success': False, 'message': '请先生成2FA密钥'}), 400

    import pyotp
    totp = pyotp.TOTP(secret)
    # valid_window=1 also accepts the adjacent time steps.
    if not code or not totp.verify(code, valid_window=1):
        return jsonify({'success': False, 'message': '验证码错误'}), 400

    # Persist the secret and switch 2FA on.
    admin.totp_secret = secret
    admin.is_2fa_enabled = True
    db.session.commit()

    session.pop('temp_2fa_secret', None)
    return jsonify({'success': True, 'message': '2FA已启用'})
@admin_bp.route('/api/2fa/disable', methods=['POST'])
@admin_required
def disable_2fa():
    """Disable 2FA for the logged-in admin after checking a current code.

    Reads ``code`` from the JSON body.

    Returns:
        JSON success payload once 2FA is off and the stored secret cleared;
        400 when 2FA is not enabled; 401 when the code is wrong.
    """
    admin_id = session.get('admin_id')
    admin = Admin.query.get(admin_id)

    # A null / non-object body is treated as empty so it ends in the regular
    # "wrong code" response instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    code = data.get('code')

    if not admin.is_2fa_enabled:
        return jsonify({'success': False, 'message': '2FA未启用'}), 400

    if not verify_2fa(admin, code):
        return jsonify({'success': False, 'message': '验证码错误'}), 401

    admin.is_2fa_enabled = False
    admin.totp_secret = None
    db.session.commit()
    return jsonify({'success': True, 'message': '2FA已禁用'})
@admin_bp.route('/api/smtp/<int:smtp_id>', methods=['DELETE'])
@admin_required
def delete_smtp(smtp_id):
    """Delete the SMTP configuration with the given id (404 when missing)."""
    target = SMTPConfig.query.get_or_404(smtp_id)
    db.session.delete(target)
    db.session.commit()
    payload = {'success': True, 'message': '删除成功'}
    return jsonify(payload)
@admin_bp.route('/api/smtp/<int:smtp_id>/set-default', methods=['POST'])
@admin_required
def set_default_smtp(smtp_id):
    """Make one SMTP configuration the default and clear the flag elsewhere.

    Returns:
        JSON success payload; 404 when the configuration does not exist.
    """
    # Look the target up first: a request for an unknown id must end in 404
    # before any "clear default" statement has been issued.
    smtp = SMTPConfig.query.get_or_404(smtp_id)

    # Clear the flag on every other row; the target is excluded so it is only
    # written once, by the assignment below.
    SMTPConfig.query.filter(SMTPConfig.id != smtp_id).update({'is_default': False})
    smtp.is_default = True

    db.session.commit()
    return jsonify({'success': True, 'message': '已设置为默认SMTP'})
@admin_bp.route('/logs', methods=['GET'])
@admin_required
def logs_page():
    """Render the parse-log page template."""
    from flask import render_template

    html = render_template('admin_logs.html')
    return html
@admin_bp.route('/api/logs', methods=['GET'])
@admin_required
def get_logs():
    """Return one page of parse logs, newest first.

    Query parameters:
        page: page number (default 1).
        per_page: page size (default 50).
        platform: optional exact-match filter.
        status: optional exact-match filter.
    """
    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 50, type=int)

    # Collect only the filters that were actually supplied.
    criteria = {}
    for field in ('platform', 'status'):
        wanted = args.get(field)
        if wanted:
            criteria[field] = wanted

    query = ParseLog.query
    if criteria:
        query = query.filter_by(**criteria)

    pagination = query.order_by(ParseLog.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    logs = []
    for entry in pagination.items:
        logs.append({
            'id': entry.id,
            'user_id': entry.user_id,
            'ip_address': entry.ip_address,
            'platform': entry.platform,
            'video_url': entry.video_url,
            'status': entry.status,
            'error_message': entry.error_message,
            'response_time': entry.response_time,
            'created_at': entry.created_at.isoformat()
        })

    page_info = {
        'page': page,
        'per_page': per_page,
        'total': pagination.total,
        'pages': pagination.pages
    }
    return jsonify({'success': True, 'data': logs, 'pagination': page_info})
@admin_bp.route('/health-checks', methods=['GET'])
@admin_required
def health_checks_page():
    """Render the health-check configuration page template."""
    from flask import render_template

    html = render_template('admin_health_checks.html')
    return html
@admin_bp.route('/api/health-checks', methods=['GET'])
@admin_required
def get_health_checks():
    """Return every health-check configuration."""
    exposed_fields = (
        'id', 'platform', 'test_url', 'check_interval', 'is_enabled',
        'alert_email',
    )
    rows = []
    for config in HealthCheckConfig.query.all():
        rows.append({field: getattr(config, field) for field in exposed_fields})
    return jsonify({'success': True, 'data': rows})
@admin_bp.route('/api/health-checks', methods=['POST'])
@admin_required
def create_health_check():
    """Create a health-check configuration from the JSON request body.

    Required keys: ``platform``, ``test_url``. Optional keys:
    ``check_interval`` (default 300), ``is_enabled`` (default True),
    ``alert_email``.

    Returns:
        JSON success payload with the new row's ``id``; 400 when the body is
        not a JSON object or a required key is absent.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    # Report absent keys as a client error instead of letting KeyError turn
    # into a 500.
    missing = [field for field in ('platform', 'test_url') if field not in data]
    if missing:
        missing_names = ', '.join(missing)
        return jsonify({'success': False, 'message': f'缺少必填字段: {missing_names}'}), 400

    config = HealthCheckConfig(
        platform=data['platform'],
        test_url=data['test_url'],
        check_interval=data.get('check_interval', 300),
        is_enabled=data.get('is_enabled', True),
        alert_email=data.get('alert_email')
    )
    db.session.add(config)
    db.session.commit()
    return jsonify({'success': True, 'message': '创建成功', 'id': config.id})
@admin_bp.route('/api/health-checks/<int:config_id>', methods=['PUT'])
@admin_required
def update_health_check(config_id):
    """Update selected fields of a health-check configuration.

    Only ``test_url``, ``check_interval``, ``is_enabled`` and ``alert_email``
    are copied from the JSON body; other keys are ignored.

    Returns:
        JSON success payload; 400 when the body is not a JSON object; 404
        when the configuration does not exist.
    """
    config = HealthCheckConfig.query.get_or_404(config_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a null / non-object body raised and became a 500.
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    for key in ('test_url', 'check_interval', 'is_enabled', 'alert_email'):
        if key in data:
            setattr(config, key, data[key])

    db.session.commit()
    return jsonify({'success': True, 'message': '更新成功'})
@admin_bp.route('/api/health-checks/<int:config_id>', methods=['DELETE'])
@admin_required
def delete_health_check(config_id):
    """Delete the health-check configuration with the given id (404 when missing)."""
    target = HealthCheckConfig.query.get_or_404(config_id)
    db.session.delete(target)
    db.session.commit()
    payload = {'success': True, 'message': '删除成功'}
    return jsonify(payload)
@admin_bp.route('/profile', methods=['GET'])
@admin_required
def profile_page():
    """Render the admin account-management page template."""
    from flask import render_template

    html = render_template('admin_profile.html')
    return html
@admin_bp.route('/api/profile', methods=['GET'])
@admin_required
def get_profile():
    """Return the logged-in admin's profile and last-login details."""
    admin = Admin.query.get(session.get('admin_id'))

    # last_login_at may be unset, in which case it is reported as None.
    last_login = admin.last_login_at
    if last_login:
        last_login_text = last_login.isoformat()
    else:
        last_login_text = None

    profile = {
        'id': admin.id,
        'username': admin.username,
        'email': admin.email,
        'is_2fa_enabled': admin.is_2fa_enabled,
        'last_login_ip': admin.last_login_ip,
        'last_login_at': last_login_text
    }
    return jsonify({'success': True, 'data': profile})
@admin_bp.route('/api/profile/password', methods=['PUT'])
@admin_required
def change_password():
    """Change the logged-in admin's password.

    Reads ``old_password`` and ``new_password`` from the JSON body.

    Returns:
        JSON success payload; 400 when either field is missing; 401 when the
        old password does not verify.
    """
    admin_id = session.get('admin_id')
    admin = Admin.query.get(admin_id)

    # A null / non-object body is treated as empty so it yields the regular
    # "incomplete information" 400 instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    old_password = data.get('old_password')
    new_password = data.get('new_password')

    if not all([old_password, new_password]):
        return jsonify({'success': False, 'message': '请填写完整信息'}), 400

    if not verify_password(old_password, admin.password):
        return jsonify({'success': False, 'message': '原密码错误'}), 401

    admin.password = hash_password(new_password)
    db.session.commit()
    return jsonify({'success': True, 'message': '密码修改成功'})
@admin_bp.route('/api/profile/email', methods=['PUT'])
@admin_required
def change_email():
    """Change the logged-in admin's e-mail address.

    Reads ``email`` from the JSON body.

    Returns:
        JSON success payload; 400 when no e-mail address is supplied.
    """
    admin_id = session.get('admin_id')
    admin = Admin.query.get(admin_id)

    # A null / non-object body is treated as empty so it yields the 400 below
    # instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get('email')
    if not email:
        return jsonify({'success': False, 'message': '请提供邮箱地址'}), 400

    admin.email = email
    db.session.commit()
    return jsonify({'success': True, 'message': '邮箱修改成功'})
# ==================== User group API ====================
@admin_bp.route('/api/user-groups', methods=['GET'])
@admin_required
def get_user_groups():
    """Return the user groups that can be targeted by redeem codes.

    Groups with id 1 and 2 (guest and regular user, per the original
    author's note) are excluded by filtering on ``id > 2``.
    """
    redeemable = UserGroup.query.filter(UserGroup.id > 2).all()
    rows = []
    for group in redeemable:
        rows.append({
            'id': group.id,
            'name': group.name,
            'daily_limit': group.daily_limit
        })
    return jsonify({'success': True, 'data': rows})
# ==================== Redeem code management ====================
@admin_bp.route('/redeem-codes')
@admin_required
def redeem_codes_page():
    """Render the redeem-code management page template."""
    from flask import render_template

    html = render_template('admin_redeem_codes.html')
    return html
@admin_bp.route('/api/redeem-codes', methods=['GET'])
@admin_required
def get_redeem_codes():
    """Return one page of redeem codes, newest first.

    Query parameters:
        page: page number (default 1).
        per_page: page size (default 20).
        batch_id: optional; restrict to one batch.
        status: optional; ``unused``, ``used`` or ``expired`` (unused codes
            whose expiry time is already in the past, compared in UTC).

    Returns:
        JSON with a ``data`` list of code dicts and a ``pagination`` dict.
    """
    from models import RedeemCode

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    batch_id = request.args.get('batch_id', '')
    status = request.args.get('status', '')  # unused, used, expired

    now = datetime.utcnow()

    query = RedeemCode.query
    if batch_id:
        query = query.filter(RedeemCode.batch_id == batch_id)

    # "== True/False" is intentional: these build SQL expressions.
    if status == 'unused':
        query = query.filter(RedeemCode.is_used == False)
    elif status == 'used':
        query = query.filter(RedeemCode.is_used == True)
    elif status == 'expired':
        query = query.filter(RedeemCode.is_used == False, RedeemCode.expires_at < now)

    # error_out=False matches the other list endpoints in this module: an
    # out-of-range page yields an empty list instead of aborting with 404.
    pagination = query.order_by(RedeemCode.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    codes = []
    for code in pagination.items:
        codes.append({
            'id': code.id,
            'code': code.code,
            'batch_id': code.batch_id,
            'target_group': code.target_group.name if code.target_group else '',
            'target_group_id': code.target_group_id,
            'duration_days': code.duration_days,
            'is_used': code.is_used,
            'used_by': code.user.username if code.user else None,
            'used_at': code.used_at.strftime('%Y-%m-%d %H:%M') if code.used_at else None,
            'expires_at': code.expires_at.strftime('%Y-%m-%d %H:%M') if code.expires_at else None,
            # bool() so a code without an expiry time serialises as false
            # rather than null.
            'is_expired': bool(
                code.expires_at and code.expires_at < now and not code.is_used
            ),
            'remark': code.remark,
            'created_at': code.created_at.strftime('%Y-%m-%d %H:%M')
        })

    return jsonify({
        'success': True,
        'data': codes,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': pagination.total,
            'pages': pagination.pages
        }
    })
@admin_bp.route('/api/redeem-codes/generate', methods=['POST'])
@admin_required
def generate_redeem_codes():
    """Generate a batch of redeem codes for one user group.

    JSON body keys:
        count: number of codes, 1-1000 (default 1).
        target_group_id: required; id of an existing ``UserGroup``.
        duration_days: stored on each code as given (default 30).
        expires_days: optional; days from now (UTC) until the codes expire.
        remark: optional free text (default '').
        prefix: optional string prepended to every code.

    Each code is the prefix followed by 12 random characters drawn from
    uppercase ASCII letters and digits using ``secrets``.

    Returns:
        JSON with the ``batch_id`` and the generated ``codes``; 400 for an
        invalid body, a missing or unknown group, a count outside 1-1000, or
        a non-numeric ``expires_days``.
    """
    from models import RedeemCode
    import secrets
    import string

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

    target_group_id = data.get('target_group_id')
    duration_days = data.get('duration_days', 30)
    expires_days = data.get('expires_days')  # validity of the code itself, in days
    remark = data.get('remark', '')
    prefix = data.get('prefix', '')

    if not target_group_id:
        return jsonify({'success': False, 'message': '请选择目标用户组'}), 400

    # Coerce count so a value such as "5" or null is answered with the
    # range error below instead of a TypeError during comparison.
    try:
        count = int(data.get('count', 1))
    except (TypeError, ValueError):
        count = 0
    if count < 1 or count > 1000:
        return jsonify({'success': False, 'message': '生成数量必须在1-1000之间'}), 400

    # The target group must exist.
    group = UserGroup.query.get(target_group_id)
    if not group:
        return jsonify({'success': False, 'message': '用户组不存在'}), 400

    # Expiry time shared by every code in the batch (None = never expires).
    expires_at = None
    if expires_days:
        try:
            expires_at = datetime.utcnow() + timedelta(days=float(expires_days))
        except (TypeError, ValueError, OverflowError):
            return jsonify({'success': False, 'message': '兑换码有效期格式错误'}), 400

    # Batch id: timestamp plus a short random suffix.
    batch_id = datetime.now().strftime('%Y%m%d%H%M%S') + secrets.token_hex(4)

    alphabet = string.ascii_uppercase + string.digits
    codes = []
    for _ in range(count):
        random_part = ''.join(secrets.choice(alphabet) for _ in range(12))
        code_str = f"{prefix}{random_part}" if prefix else random_part
        db.session.add(RedeemCode(
            code=code_str,
            batch_id=batch_id,
            target_group_id=target_group_id,
            duration_days=duration_days,
            expires_at=expires_at,
            remark=remark
        ))
        codes.append(code_str)

    db.session.commit()

    return jsonify({
        'success': True,
        'message': f'成功生成 {count} 个兑换码',
        'data': {
            'batch_id': batch_id,
            'codes': codes
        }
    })
@admin_bp.route('/api/redeem-codes/<int:code_id>', methods=['DELETE'])
@admin_required
def delete_redeem_code(code_id):
    """Delete a single redeem code that has not been used yet.

    Returns:
        JSON success payload; 404 when the code does not exist; 400 when it
        has already been used.
    """
    from models import RedeemCode

    target = RedeemCode.query.get(code_id)

    # Work out which refusal applies, if any.
    refusal = None
    if not target:
        refusal = ('兑换码不存在', 404)
    elif target.is_used:
        refusal = ('已使用的兑换码不能删除', 400)

    if refusal is not None:
        message, status_code = refusal
        return jsonify({'success': False, 'message': message}), status_code

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True, 'message': '删除成功'})
@admin_bp.route('/api/redeem-codes/batch/<batch_id>', methods=['DELETE'])
@admin_required
def delete_batch_codes(batch_id):
    """Delete every unused redeem code of a batch; used codes are kept."""
    from models import RedeemCode

    unused_in_batch = RedeemCode.query.filter_by(batch_id=batch_id, is_used=False)
    removed = unused_in_batch.delete()
    db.session.commit()

    message = f'成功删除 {removed} 个未使用的兑换码'
    return jsonify({'success': True, 'message': message})
@admin_bp.route('/api/redeem-codes/batches', methods=['GET'])
@admin_required
def get_batch_list():
    """Return one summary row per redeem-code batch, newest batch first.

    Each row holds the batch id, the total / used / unused code counts and
    the earliest ``created_at`` of the batch formatted as
    ``%Y-%m-%d %H:%M``.
    """
    from models import RedeemCode

    earliest_created = func.min(RedeemCode.created_at)
    # "== True" is intentional: it builds the SQL CASE condition.
    used_counter = func.sum(db.case((RedeemCode.is_used == True, 1), else_=0))

    grouped = (
        db.session.query(
            RedeemCode.batch_id,
            func.count(RedeemCode.id).label('total'),
            used_counter.label('used'),
            earliest_created.label('created_at'),
        )
        .group_by(RedeemCode.batch_id)
        .order_by(earliest_created.desc())
        .all()
    )

    def _summarize(row):
        # The summed value may be None or a non-int numeric, hence int(... or 0).
        used = int(row.used or 0)
        created = row.created_at
        return {
            'batch_id': row.batch_id,
            'total': row.total,
            'used': used,
            'unused': row.total - used,
            'created_at': created.strftime('%Y-%m-%d %H:%M') if created else ''
        }

    result = [_summarize(row) for row in grouped]
    return jsonify({'success': True, 'data': result})
@admin_bp.route('/api/redeem-codes/export/<batch_id>', methods=['GET'])
@admin_required
def export_batch_codes(batch_id):
    """Return every code string of a batch, used or not.

    Returns:
        JSON with the batch id, the list of codes and their count, or 404
        when the batch has no codes.
    """
    from models import RedeemCode

    batch_rows = RedeemCode.query.filter_by(batch_id=batch_id).all()
    if batch_rows:
        code_list = []
        for row in batch_rows:
            code_list.append(row.code)
        export = {
            'batch_id': batch_id,
            'codes': code_list,
            'total': len(code_list)
        }
        return jsonify({'success': True, 'data': export})

    return jsonify({'success': False, 'message': '批次不存在'}), 404