172 lines
7.3 KiB
Python
172 lines
7.3 KiB
Python
from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify
|
|
from functools import wraps
|
|
from loguru import logger
|
|
import time
|
|
|
|
# 创建认证蓝图
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
|
|
def _is_ajax_request() -> bool:
|
|
"""判断当前请求是否由前端异步发起。"""
|
|
requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower()
|
|
accept_header = str(request.headers.get("Accept", "") or "").strip().lower()
|
|
return requested_with == "xmlhttprequest" or "application/json" in accept_header
|
|
|
|
|
|
# 登录检查装饰器
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not session.get('logged_in'):
|
|
return redirect(url_for('auth.login'))
|
|
server = current_app.dashboard_server
|
|
session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60)
|
|
last_activity_at = float(session.get("last_activity_at") or 0.0)
|
|
if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds:
|
|
# 会话超时后统一清理登录态,避免长期闲置会话继续可用。
|
|
session.clear()
|
|
return redirect(url_for('auth.login'))
|
|
session["last_activity_at"] = time.time()
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
# 登录页面
|
|
@auth_bp.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
error = None
|
|
if request.method == 'POST':
|
|
# 使用 strip 规避用户误输入首尾空格导致的误判。
|
|
username = str(request.form.get('username', '') or '').strip()
|
|
password = str(request.form.get('password', '') or '')
|
|
remote_ip = request.remote_addr or ""
|
|
|
|
# 从应用上下文获取服务器实例,而不是从蓝图对象
|
|
server = current_app.dashboard_server
|
|
admin_db = getattr(server, "admin_account_db", None)
|
|
lock_status = server.get_login_lock_status(username, remote_ip)
|
|
if lock_status.get("locked"):
|
|
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
|
|
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
|
|
if _is_ajax_request():
|
|
return jsonify({"success": False, "error": error}), 429
|
|
return render_template('login.html', error=error)
|
|
|
|
# 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。
|
|
login_ok = False
|
|
if admin_db:
|
|
try:
|
|
login_ok = admin_db.verify_admin_password(username, password)
|
|
if login_ok:
|
|
admin_db.mark_login_success(username, request.remote_addr or "")
|
|
except Exception as e:
|
|
logger.error(f"数据库账号登录校验异常,回退配置模式: {e}")
|
|
login_ok = False
|
|
|
|
if not login_ok:
|
|
login_ok = (username == server.username and password == server.password)
|
|
|
|
if login_ok:
|
|
server.clear_login_failures(username, remote_ip)
|
|
session['logged_in'] = True
|
|
session.permanent = True
|
|
session['username'] = username # 存储用户名到session
|
|
session['last_activity_at'] = time.time()
|
|
# 若账号仍在使用默认/弱口令,则登录后强提示修改密码。
|
|
session['force_password_change'] = bool(server.should_force_password_change(username))
|
|
logger.debug(f"Login successful. Session after login: {dict(session)}")
|
|
if _is_ajax_request():
|
|
return jsonify({"success": True, "redirect_url": url_for('main.index')})
|
|
return redirect(url_for('main.index'))
|
|
else:
|
|
lock_status = server.mark_login_failure(username, remote_ip)
|
|
if lock_status.get("locked"):
|
|
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
|
|
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
|
|
else:
|
|
remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0))
|
|
error = '用户名或密码错误'
|
|
if remaining > 0:
|
|
error = f"{error},再失败 {remaining} 次将暂时锁定"
|
|
if _is_ajax_request():
|
|
return jsonify({"success": False, "error": error}), 400
|
|
|
|
return render_template('login.html', error=error)
|
|
|
|
|
|
# 登出
|
|
@auth_bp.route('/logout')
|
|
def logout():
|
|
session.clear()
|
|
return redirect(url_for('auth.login'))
|
|
|
|
|
|
@auth_bp.route('/api/auth/security_status', methods=['GET'])
|
|
@login_required
|
|
def get_security_status():
|
|
"""返回当前登录管理员的安全状态。"""
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"force_password_change": bool(session.get("force_password_change", False)),
|
|
"session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)),
|
|
}
|
|
})
|
|
|
|
|
|
@auth_bp.route('/api/auth/change_password', methods=['POST'])
|
|
@login_required
|
|
def change_password():
|
|
"""修改当前登录管理员密码。
|
|
|
|
前端请求参数:
|
|
{
|
|
"old_password": "旧密码",
|
|
"new_password": "新密码",
|
|
"confirm_password": "确认新密码"
|
|
}
|
|
"""
|
|
server = current_app.dashboard_server
|
|
admin_db = getattr(server, "admin_account_db", None)
|
|
if not admin_db:
|
|
return jsonify({"success": False, "error": "账号数据库未初始化,无法修改密码"}), 500
|
|
|
|
payload = request.get_json(silent=True) or {}
|
|
old_password = str(payload.get("old_password", "") or "")
|
|
new_password = str(payload.get("new_password", "") or "")
|
|
confirm_password = str(payload.get("confirm_password", "") or "")
|
|
username = str(session.get("username", "") or "").strip()
|
|
|
|
if not username:
|
|
return jsonify({"success": False, "error": "会话失效,请重新登录"}), 401
|
|
|
|
if not old_password or not new_password or not confirm_password:
|
|
return jsonify({"success": False, "error": "请完整填写旧密码与新密码"}), 400
|
|
|
|
if new_password != confirm_password:
|
|
return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400
|
|
|
|
if new_password == old_password:
|
|
return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400
|
|
|
|
password_error = admin_db.validate_password_strength(new_password)
|
|
if password_error:
|
|
return jsonify({"success": False, "error": password_error}), 400
|
|
|
|
try:
|
|
if not admin_db.verify_admin_password(username, old_password):
|
|
return jsonify({"success": False, "error": "旧密码错误"}), 400
|
|
|
|
updated = admin_db.update_password(username, new_password)
|
|
if not updated:
|
|
return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500
|
|
|
|
# 密码修改成功后,移除“必须修改密码”的会话提示。
|
|
session["force_password_change"] = False
|
|
return jsonify({"success": True, "message": "密码修改成功"})
|
|
except Exception as e:
|
|
logger.error(f"修改后台密码失败: username={username}, error={e}")
|
|
return jsonify({"success": False, "error": "密码修改失败,请检查日志"}), 500
|