from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify from functools import wraps from loguru import logger import time # 创建认证蓝图 auth_bp = Blueprint('auth', __name__) def _is_ajax_request() -> bool: """判断当前请求是否由前端异步发起。""" requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower() accept_header = str(request.headers.get("Accept", "") or "").strip().lower() return requested_with == "xmlhttprequest" or "application/json" in accept_header # 登录检查装饰器 def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not session.get('logged_in'): return redirect(url_for('auth.login')) server = current_app.dashboard_server session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60) last_activity_at = float(session.get("last_activity_at") or 0.0) if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds: # 会话超时后统一清理登录态,避免长期闲置会话继续可用。 session.clear() return redirect(url_for('auth.login')) session["last_activity_at"] = time.time() return f(*args, **kwargs) return decorated_function # 登录页面 @auth_bp.route('/login', methods=['GET', 'POST']) def login(): error = None if request.method == 'POST': # 使用 strip 规避用户误输入首尾空格导致的误判。 username = str(request.form.get('username', '') or '').strip() password = str(request.form.get('password', '') or '') remote_ip = request.remote_addr or "" # 从应用上下文获取服务器实例,而不是从蓝图对象 server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) lock_status = server.get_login_lock_status(username, remote_ip) if lock_status.get("locked"): wait_seconds = int(lock_status.get("remaining_seconds") or 0) error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" if _is_ajax_request(): return jsonify({"success": False, "error": error}), 429 return render_template('login.html', error=error) # 优先使用数据库账号体系鉴权。 # # 这里特意不再做“只要数据库校验失败就回退本地配置密码”的粗暴兜底, # 原因有两点: # 1. 一旦数据库里已经存在该管理员账号,真实口令状态应当以数据库为准; # 2. 如果继续无条件回退本地 config.toml,就会出现“数据库已改密,但本地默认密码仍然生效” # 的安全问题,也会让弱密码提示误判。 # # 兼容策略调整为: # - 数据库可用且账号存在:只认数据库; # - 数据库不可用,或数据库里根本没有这个账号:才回退旧配置模式。 login_ok = False db_account_exists = False if admin_db: try: db_account_exists = bool(admin_db.get_admin_by_username(username)) if db_account_exists: login_ok = admin_db.verify_admin_password(username, password) if login_ok: admin_db.mark_login_success(username, request.remote_addr or "") except Exception as e: logger.error(f"数据库账号登录校验异常,回退配置模式: {e}") login_ok = False db_account_exists = False if not login_ok and not db_account_exists: login_ok = (username == server.username and password == server.password) if login_ok: server.clear_login_failures(username, remote_ip) session['logged_in'] = True session.permanent = True session['username'] = username # 存储用户名到session session['last_activity_at'] = time.time() # 若账号仍在使用默认/弱口令,则登录后强提示修改密码。 session['force_password_change'] = bool(server.should_force_password_change(username)) logger.debug(f"Login successful. Session after login: {dict(session)}") if _is_ajax_request(): return jsonify({"success": True, "redirect_url": url_for('main.index')}) return redirect(url_for('main.index')) else: lock_status = server.mark_login_failure(username, remote_ip) if lock_status.get("locked"): wait_seconds = int(lock_status.get("remaining_seconds") or 0) error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" else: remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0)) error = '用户名或密码错误' if remaining > 0: error = f"{error},再失败 {remaining} 次将暂时锁定" if _is_ajax_request(): return jsonify({"success": False, "error": error}), 400 return render_template('login.html', error=error) # 登出 @auth_bp.route('/logout') def logout(): session.clear() return redirect(url_for('auth.login')) @auth_bp.route('/api/auth/security_status', methods=['GET']) @login_required def get_security_status(): """返回当前登录管理员的安全状态。""" server = current_app.dashboard_server username = str(session.get("username", "") or "").strip() # 安全状态这里改成“实时重算 + 回写 session”,而不是只读登录瞬间写入的旧值: # 1. 避免管理员已经在数据库里改了密码,但当前会话仍保留旧的 force_password_change 标记; # 2. 避免本地配置和数据库状态不一致时,前端一直弹出错误的弱密码提示。 force_password_change = bool(server.should_force_password_change(username)) if username else False session["force_password_change"] = force_password_change return jsonify({ "success": True, "data": { "force_password_change": force_password_change, "session_timeout_minutes": int(server.get_auth_policy().get("session_timeout_minutes", 480)), } }) @auth_bp.route('/api/auth/change_password', methods=['POST']) @login_required def change_password(): """修改当前登录管理员密码。 前端请求参数: { "old_password": "旧密码", "new_password": "新密码", "confirm_password": "确认新密码" } """ server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) if not admin_db: return jsonify({"success": False, "error": "账号数据库未初始化,无法修改密码"}), 500 payload = request.get_json(silent=True) or {} old_password = str(payload.get("old_password", "") or "") new_password = str(payload.get("new_password", "") or "") confirm_password = str(payload.get("confirm_password", "") or "") username = str(session.get("username", "") or "").strip() if not username: return jsonify({"success": False, "error": "会话失效,请重新登录"}), 401 if not old_password or not new_password or not confirm_password: return jsonify({"success": False, "error": "请完整填写旧密码与新密码"}), 400 if new_password != confirm_password: return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400 if new_password == old_password: return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400 password_error = admin_db.validate_password_strength(new_password) if password_error: return jsonify({"success": False, "error": password_error}), 400 try: if not admin_db.verify_admin_password(username, old_password): return jsonify({"success": False, "error": "旧密码错误"}), 400 updated = admin_db.update_password(username, new_password) if not updated: return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500 # 密码修改成功后,移除“必须修改密码”的会话提示。 session["force_password_change"] = False return jsonify({"success": True, "message": "密码修改成功"}) except Exception as e: logger.error(f"修改后台密码失败: username={username}, error={e}") return jsonify({"success": False, "error": "密码修改失败,请检查日志"}), 500