Files
abot/admin/dashboard/blueprints/auth.py

193 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify
from functools import wraps
from loguru import logger
import time
# 创建认证蓝图
auth_bp = Blueprint('auth', __name__)
def _is_ajax_request() -> bool:
"""判断当前请求是否由前端异步发起。"""
requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower()
accept_header = str(request.headers.get("Accept", "") or "").strip().lower()
return requested_with == "xmlhttprequest" or "application/json" in accept_header
# 登录检查装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('auth.login'))
server = current_app.dashboard_server
session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60)
last_activity_at = float(session.get("last_activity_at") or 0.0)
if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds:
# 会话超时后统一清理登录态,避免长期闲置会话继续可用。
session.clear()
return redirect(url_for('auth.login'))
session["last_activity_at"] = time.time()
return f(*args, **kwargs)
return decorated_function
# 登录页面
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
# 使用 strip 规避用户误输入首尾空格导致的误判。
username = str(request.form.get('username', '') or '').strip()
password = str(request.form.get('password', '') or '')
remote_ip = request.remote_addr or ""
# 从应用上下文获取服务器实例,而不是从蓝图对象
server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None)
lock_status = server.get_login_lock_status(username, remote_ip)
if lock_status.get("locked"):
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
if _is_ajax_request():
return jsonify({"success": False, "error": error}), 429
return render_template('login.html', error=error)
# 优先使用数据库账号体系鉴权。
#
# 这里特意不再做“只要数据库校验失败就回退本地配置密码”的粗暴兜底,
# 原因有两点:
# 1. 一旦数据库里已经存在该管理员账号,真实口令状态应当以数据库为准;
# 2. 如果继续无条件回退本地 config.toml就会出现“数据库已改密但本地默认密码仍然生效”
# 的安全问题,也会让弱密码提示误判。
#
# 兼容策略调整为:
# - 数据库可用且账号存在:只认数据库;
# - 数据库不可用,或数据库里根本没有这个账号:才回退旧配置模式。
login_ok = False
db_account_exists = False
if admin_db:
try:
db_account_exists = bool(admin_db.get_admin_by_username(username))
if db_account_exists:
login_ok = admin_db.verify_admin_password(username, password)
if login_ok:
admin_db.mark_login_success(username, request.remote_addr or "")
except Exception as e:
logger.error(f"数据库账号登录校验异常,回退配置模式: {e}")
login_ok = False
db_account_exists = False
if not login_ok and not db_account_exists:
login_ok = (username == server.username and password == server.password)
if login_ok:
server.clear_login_failures(username, remote_ip)
session['logged_in'] = True
session.permanent = True
session['username'] = username # 存储用户名到session
session['last_activity_at'] = time.time()
# 若账号仍在使用默认/弱口令,则登录后强提示修改密码。
session['force_password_change'] = bool(server.should_force_password_change(username))
logger.debug(f"Login successful. Session after login: {dict(session)}")
if _is_ajax_request():
return jsonify({"success": True, "redirect_url": url_for('main.index')})
return redirect(url_for('main.index'))
else:
lock_status = server.mark_login_failure(username, remote_ip)
if lock_status.get("locked"):
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
else:
remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0))
error = '用户名或密码错误'
if remaining > 0:
error = f"{error},再失败 {remaining} 次将暂时锁定"
if _is_ajax_request():
return jsonify({"success": False, "error": error}), 400
return render_template('login.html', error=error)
# 登出
@auth_bp.route('/logout')
def logout():
session.clear()
return redirect(url_for('auth.login'))
@auth_bp.route('/api/auth/security_status', methods=['GET'])
@login_required
def get_security_status():
"""返回当前登录管理员的安全状态。"""
server = current_app.dashboard_server
username = str(session.get("username", "") or "").strip()
# 安全状态这里改成“实时重算 + 回写 session”而不是只读登录瞬间写入的旧值
# 1. 避免管理员已经在数据库里改了密码,但当前会话仍保留旧的 force_password_change 标记;
# 2. 避免本地配置和数据库状态不一致时,前端一直弹出错误的弱密码提示。
force_password_change = bool(server.should_force_password_change(username)) if username else False
session["force_password_change"] = force_password_change
return jsonify({
"success": True,
"data": {
"force_password_change": force_password_change,
"session_timeout_minutes": int(server.get_auth_policy().get("session_timeout_minutes", 480)),
}
})
@auth_bp.route('/api/auth/change_password', methods=['POST'])
@login_required
def change_password():
"""修改当前登录管理员密码。
前端请求参数:
{
"old_password": "旧密码",
"new_password": "新密码",
"confirm_password": "确认新密码"
}
"""
server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None)
if not admin_db:
return jsonify({"success": False, "error": "账号数据库未初始化,无法修改密码"}), 500
payload = request.get_json(silent=True) or {}
old_password = str(payload.get("old_password", "") or "")
new_password = str(payload.get("new_password", "") or "")
confirm_password = str(payload.get("confirm_password", "") or "")
username = str(session.get("username", "") or "").strip()
if not username:
return jsonify({"success": False, "error": "会话失效,请重新登录"}), 401
if not old_password or not new_password or not confirm_password:
return jsonify({"success": False, "error": "请完整填写旧密码与新密码"}), 400
if new_password != confirm_password:
return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400
if new_password == old_password:
return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400
password_error = admin_db.validate_password_strength(new_password)
if password_error:
return jsonify({"success": False, "error": password_error}), 400
try:
if not admin_db.verify_admin_password(username, old_password):
return jsonify({"success": False, "error": "旧密码错误"}), 400
updated = admin_db.update_password(username, new_password)
if not updated:
return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500
# 密码修改成功后,移除“必须修改密码”的会话提示。
session["force_password_change"] = False
return jsonify({"success": True, "message": "密码修改成功"})
except Exception as e:
logger.error(f"修改后台密码失败: username={username}, error={e}")
return jsonify({"success": False, "error": "密码修改失败,请检查日志"}), 500