Files
abot/admin/dashboard/blueprints/auth.py
Liu 342b4c0065 修复后台弱密码提示误判并恢复server.py编码
1. 修复数据库账号存在时仍回退 config.toml 判断,导致每次登录重复提示弱密码的问题。
2. 补齐默认管理员密码从旧配置迁移到数据库的同步逻辑,兼容历史部署。
3. 恢复 server.py 为可读 UTF-8 中文版本,并补充后台登录与弱密码判定的回归测试。
2026-05-01 10:49:38 +08:00

182 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from functools import wraps
from flask import Blueprint, current_app, jsonify, redirect, render_template, request, session, url_for
from loguru import logger
# 创建认证蓝图。
auth_bp = Blueprint("auth", __name__)
def _is_ajax_request() -> bool:
"""判断当前请求是否由前端异步发起。"""
requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower()
accept_header = str(request.headers.get("Accept", "") or "").strip().lower()
return requested_with == "xmlhttprequest" or "application/json" in accept_header
def login_required(f):
"""登录检查装饰器。"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("auth.login"))
server = current_app.dashboard_server
session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60)
last_activity_at = float(session.get("last_activity_at") or 0.0)
if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds:
# 会话超时后统一清理登录态,避免长期闲置会话继续可用。
session.clear()
return redirect(url_for("auth.login"))
session["last_activity_at"] = time.time()
return f(*args, **kwargs)
return decorated_function
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
"""后台登录页及登录提交入口。"""
error = None
if request.method == "POST":
# 用户名做 strip避免首尾空格导致的误判密码保留原样避免改变真实口令。
username = str(request.form.get("username", "") or "").strip()
password = str(request.form.get("password", "") or "")
remote_ip = request.remote_addr or ""
server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None)
lock_status = server.get_login_lock_status(username, remote_ip)
if lock_status.get("locked"):
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
if _is_ajax_request():
return jsonify({"success": False, "error": error}), 429
return render_template("login.html", error=error)
# 认证边界说明:
# 1. 只要数据库里已经存在该管理员账号,就必须以数据库密码为准;
# 2. 否则用户在后台改密后config.toml 里的旧密码仍然可能继续生效;
# 3. 仅当数据库不可用,或者该账号尚未迁移进数据库时,才回退到旧配置模式。
login_ok = False
should_use_config_fallback = not bool(admin_db)
if admin_db:
try:
admin_row = admin_db.get_admin_by_username(username)
should_use_config_fallback = not bool(admin_row)
if admin_row and int(admin_row.get("status") or 0) == 1:
login_ok = admin_db.verify_password(password, str(admin_row.get("password_hash") or ""))
if login_ok:
admin_db.mark_login_success(username, remote_ip)
except Exception as e:
logger.error(f"数据库账号登录校验异常,回退配置模式: {e}")
login_ok = False
should_use_config_fallback = True
if not login_ok and should_use_config_fallback:
login_ok = username == server.username and password == server.password
if login_ok:
server.clear_login_failures(username, remote_ip)
session["logged_in"] = True
session.permanent = True
session["username"] = username
session["last_activity_at"] = time.time()
# 登录成功后再读取当前账号的安全状态,决定是否强制弹出改密提示。
session["force_password_change"] = bool(server.should_force_password_change(username))
logger.debug(f"Login successful. Session after login: {dict(session)}")
if _is_ajax_request():
return jsonify({"success": True, "redirect_url": url_for("main.index")})
return redirect(url_for("main.index"))
lock_status = server.mark_login_failure(username, remote_ip)
if lock_status.get("locked"):
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
else:
remaining = max(
0,
int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0),
)
error = "用户名或密码错误"
if remaining > 0:
error = f"{error},再失败 {remaining} 次将暂时锁定"
if _is_ajax_request():
return jsonify({"success": False, "error": error}), 400
return render_template("login.html", error=error)
@auth_bp.route("/logout")
def logout():
"""退出登录。"""
session.clear()
return redirect(url_for("auth.login"))
@auth_bp.route("/api/auth/security_status", methods=["GET"])
@login_required
def get_security_status():
"""返回当前登录管理员的安全状态。"""
return jsonify(
{
"success": True,
"data": {
"force_password_change": bool(session.get("force_password_change", False)),
"session_timeout_minutes": int(
current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)
),
},
}
)
@auth_bp.route("/api/auth/change_password", methods=["POST"])
@login_required
def change_password():
"""修改当前登录管理员密码。"""
server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None)
if not admin_db:
return jsonify({"success": False, "error": "账号数据库未初始化,无法修改密码"}), 500
payload = request.get_json(silent=True) or {}
old_password = str(payload.get("old_password", "") or "")
new_password = str(payload.get("new_password", "") or "")
confirm_password = str(payload.get("confirm_password", "") or "")
username = str(session.get("username", "") or "").strip()
if not username:
return jsonify({"success": False, "error": "会话失效,请重新登录"}), 401
if not old_password or not new_password or not confirm_password:
return jsonify({"success": False, "error": "请完整填写旧密码与新密码"}), 400
if new_password != confirm_password:
return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400
if new_password == old_password:
return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400
password_error = admin_db.validate_password_strength(new_password)
if password_error:
return jsonify({"success": False, "error": password_error}), 400
try:
if not admin_db.verify_admin_password(username, old_password):
return jsonify({"success": False, "error": "旧密码错误"}), 400
updated = admin_db.update_password(username, new_password)
if not updated:
return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500
# 改密成功后立刻移除“必须修改密码”的会话提示,避免用户当前会话继续被反复打断。
session["force_password_change"] = False
return jsonify({"success": True, "message": "密码修改成功"})
except Exception as e:
logger.error(f"修改后台密码失败: username={username}, error={e}")
return jsonify({"success": False, "error": "密码修改失败,请检查日志"}), 500