diff --git a/admin/dashboard/blueprints/auth.py b/admin/dashboard/blueprints/auth.py
index 971ea84..659aedd 100644
--- a/admin/dashboard/blueprints/auth.py
+++ b/admin/dashboard/blueprints/auth.py
@@ -1,17 +1,33 @@
from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify
from functools import wraps
from loguru import logger
+import time
# 创建认证蓝图
auth_bp = Blueprint('auth', __name__)
+def _is_ajax_request() -> bool:
+ """判断当前请求是否由前端异步发起。"""
+ requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower()
+ accept_header = str(request.headers.get("Accept", "") or "").strip().lower()
+ return requested_with == "xmlhttprequest" or "application/json" in accept_header
+
+
# 登录检查装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('auth.login'))
+ server = current_app.dashboard_server
+ session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60)
+ last_activity_at = float(session.get("last_activity_at") or 0.0)
+ if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds:
+ # 会话超时后统一清理登录态,避免长期闲置会话继续可用。
+ session.clear()
+ return redirect(url_for('auth.login'))
+ session["last_activity_at"] = time.time()
return f(*args, **kwargs)
return decorated_function
@@ -25,10 +41,18 @@ def login():
# 使用 strip 规避用户误输入首尾空格导致的误判。
username = str(request.form.get('username', '') or '').strip()
password = str(request.form.get('password', '') or '')
+ remote_ip = request.remote_addr or ""
# 从应用上下文获取服务器实例,而不是从蓝图对象
server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None)
+ lock_status = server.get_login_lock_status(username, remote_ip)
+ if lock_status.get("locked"):
+ wait_seconds = int(lock_status.get("remaining_seconds") or 0)
+ error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
+ if _is_ajax_request():
+ return jsonify({"success": False, "error": error}), 429
+ return render_template('login.html', error=error)
# 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。
login_ok = False
@@ -45,12 +69,29 @@ def login():
login_ok = (username == server.username and password == server.password)
if login_ok:
+ server.clear_login_failures(username, remote_ip)
session['logged_in'] = True
+ session.permanent = True
session['username'] = username # 存储用户名到session
+ session['last_activity_at'] = time.time()
+ # 若账号仍在使用默认/弱口令,则登录后强提示修改密码。
+ session['force_password_change'] = bool(server.should_force_password_change(username))
logger.debug(f"Login successful. Session after login: {dict(session)}")
+ if _is_ajax_request():
+ return jsonify({"success": True, "redirect_url": url_for('main.index')})
return redirect(url_for('main.index'))
else:
- error = '用户名或密码错误'
+ lock_status = server.mark_login_failure(username, remote_ip)
+ if lock_status.get("locked"):
+ wait_seconds = int(lock_status.get("remaining_seconds") or 0)
+ error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
+ else:
+ remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0))
+ error = '用户名或密码错误'
+ if remaining > 0:
+ error = f"{error},再失败 {remaining} 次将暂时锁定"
+ if _is_ajax_request():
+ return jsonify({"success": False, "error": error}), 400
return render_template('login.html', error=error)
@@ -58,11 +99,23 @@ def login():
# 登出
@auth_bp.route('/logout')
def logout():
- session.pop('logged_in', None)
- session.pop('username', None) # 同时删除username
+ session.clear()
return redirect(url_for('auth.login'))
+@auth_bp.route('/api/auth/security_status', methods=['GET'])
+@login_required
+def get_security_status():
+ """返回当前登录管理员的安全状态。"""
+ return jsonify({
+ "success": True,
+ "data": {
+ "force_password_change": bool(session.get("force_password_change", False)),
+ "session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)),
+ }
+ })
+
+
@auth_bp.route('/api/auth/change_password', methods=['POST'])
@login_required
def change_password():
@@ -95,13 +148,13 @@ def change_password():
if new_password != confirm_password:
return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400
- # 密码长度做基础约束,避免过弱口令。
- if len(new_password) < 6:
- return jsonify({"success": False, "error": "新密码长度不能少于6位"}), 400
-
if new_password == old_password:
return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400
+ password_error = admin_db.validate_password_strength(new_password)
+ if password_error:
+ return jsonify({"success": False, "error": password_error}), 400
+
try:
if not admin_db.verify_admin_password(username, old_password):
return jsonify({"success": False, "error": "旧密码错误"}), 400
@@ -110,6 +163,8 @@ def change_password():
if not updated:
return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500
+ # 密码修改成功后,移除“必须修改密码”的会话提示。
+ session["force_password_change"] = False
return jsonify({"success": True, "message": "密码修改成功"})
except Exception as e:
logger.error(f"修改后台密码失败: username={username}, error={e}")
diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py
index 58190ad..40bf19f 100644
--- a/admin/dashboard/server.py
+++ b/admin/dashboard/server.py
@@ -5,6 +5,9 @@
import os
import sys
import threading
+import time
+import secrets
+from datetime import timedelta
import toml
from flask import Flask, send_from_directory
@@ -42,6 +45,12 @@ class DashboardServer:
self.LOG = logger
self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")
+ # 登录失败限流兜底缓存:
+ # 1. 优先尝试 Redis,但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底;
+ # 2. 字典内容只保存短期登录失败窗口,不用于持久化;
+ # 3. 线程化 WSGI 会并发访问,因此需要显式加锁。
+ self._auth_runtime_lock = threading.Lock()
+ self._auth_failures = {}
# 如果提供了robot实例,则使用其对象
if robot_instance:
self.db_manager = robot_instance.db_manager
@@ -135,9 +144,30 @@ class DashboardServer:
# 指定模板文件夹路径
template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
app = Flask(__name__, template_folder=template_folder)
- app.secret_key = "stats_dashboard_secret_key"
+ auth_config = self.config.get("auth", {}) or {}
+ session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480)
+ cookie_secure = bool(auth_config.get("cookie_secure", False))
+ configured_secret = str(
+ os.environ.get("ABOT_DASHBOARD_SECRET_KEY")
+ or auth_config.get("secret_key", "")
+ or ""
+ ).strip()
+ if configured_secret:
+ app.secret_key = configured_secret
+ else:
+ # 若未显式配置 secret_key,则每次进程启动生成随机值:
+ # 1. 这比固定硬编码密钥安全得多;
+ # 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的;
+ # 3. 同时输出 warning,提醒后续最好通过配置或环境变量固定注入。
+ app.secret_key = secrets.token_hex(32)
+ self.LOG.warning("未配置 Dashboard secret_key,已使用进程级随机密钥,重启后现有登录会失效")
+
# 禁用模板缓存,使修改HTML文件后立即生效 False =重启才生效
app.config['TEMPLATES_AUTO_RELOAD'] = True
+ app.config['SESSION_COOKIE_HTTPONLY'] = True
+ app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax")
+ app.config['SESSION_COOKIE_SECURE'] = cookie_secure
+ app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes))
# 设置Werkzeug日志级别为DEBUG
import logging
@@ -175,6 +205,119 @@ class DashboardServer:
return app
+ def get_auth_policy(self) -> dict:
+ """读取后台认证策略配置。"""
+ auth_config = self.config.get("auth", {}) or {}
+ return {
+ "max_failed_attempts": max(3, int(auth_config.get("max_failed_attempts", 5) or 5)),
+ "lock_seconds": max(60, int(auth_config.get("lock_seconds", 900) or 900)),
+ "session_timeout_minutes": max(15, int(auth_config.get("session_timeout_minutes", 480) or 480)),
+ }
+
+ @staticmethod
+ def _build_login_guard_key(username: str, remote_ip: str) -> str:
+ """构建登录限流键。"""
+ return f"{str(username or '').strip().lower()}::{str(remote_ip or '').strip() or 'unknown'}"
+
+ def _get_login_failure_record(self, guard_key: str) -> dict:
+ """读取登录失败记录,优先 Redis,失败时回退到进程内缓存。"""
+ try:
+ redis_conn = self.db_manager.get_redis_connection()
+ raw_value = redis_conn.get(f"dashboard:auth:fail:{guard_key}")
+ if raw_value:
+ parts = str(raw_value).split("|", 1)
+ return {
+ "count": int(parts[0] or 0),
+ "first_failed_at": float(parts[1] or 0) if len(parts) > 1 else 0.0,
+ }
+ except Exception:
+ pass
+
+ with self._auth_runtime_lock:
+ record = dict(self._auth_failures.get(guard_key, {}) or {})
+ expire_at = float(record.get("expire_at") or 0.0)
+ if expire_at > 0 and expire_at <= time.time():
+ self._auth_failures.pop(guard_key, None)
+ record = {}
+ return {
+ "count": int(record.get("count") or 0),
+ "first_failed_at": float(record.get("first_failed_at") or 0.0),
+ }
+
+ def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None:
+ """保存登录失败记录,优先 Redis,失败时回退到进程内缓存。"""
+ payload = f"{int(count)}|{float(first_failed_at)}"
+ try:
+ redis_conn = self.db_manager.get_redis_connection()
+ redis_conn.setex(f"dashboard:auth:fail:{guard_key}", ttl_seconds, payload)
+ return
+ except Exception:
+ pass
+
+ expire_at = time.time() + ttl_seconds
+ with self._auth_runtime_lock:
+ self._auth_failures[guard_key] = {
+ "count": int(count),
+ "first_failed_at": float(first_failed_at),
+ "expire_at": expire_at,
+ }
+
+ def clear_login_failures(self, username: str, remote_ip: str) -> None:
+ """清理登录失败记录。"""
+ guard_key = self._build_login_guard_key(username, remote_ip)
+ try:
+ redis_conn = self.db_manager.get_redis_connection()
+ redis_conn.delete(f"dashboard:auth:fail:{guard_key}")
+ except Exception:
+ pass
+
+ with self._auth_runtime_lock:
+ self._auth_failures.pop(guard_key, None)
+
+ def get_login_lock_status(self, username: str, remote_ip: str) -> dict:
+ """获取当前账号/IP 组合的登录锁定状态。"""
+ policy = self.get_auth_policy()
+ record = self._get_login_failure_record(self._build_login_guard_key(username, remote_ip))
+ count = int(record.get("count") or 0)
+ first_failed_at = float(record.get("first_failed_at") or 0.0)
+ if count < policy["max_failed_attempts"] or first_failed_at <= 0:
+ return {"locked": False, "remaining_seconds": 0, "failed_count": count}
+
+ elapsed = max(0, int(time.time() - first_failed_at))
+ remaining_seconds = max(0, policy["lock_seconds"] - elapsed)
+ if remaining_seconds <= 0:
+ self.clear_login_failures(username, remote_ip)
+ return {"locked": False, "remaining_seconds": 0, "failed_count": 0}
+ return {"locked": True, "remaining_seconds": remaining_seconds, "failed_count": count}
+
+ def mark_login_failure(self, username: str, remote_ip: str) -> dict:
+ """记录一次登录失败,并返回最新锁定状态。"""
+ policy = self.get_auth_policy()
+ guard_key = self._build_login_guard_key(username, remote_ip)
+ record = self._get_login_failure_record(guard_key)
+ count = int(record.get("count") or 0)
+ first_failed_at = float(record.get("first_failed_at") or 0.0)
+ now_ts = time.time()
+ if first_failed_at <= 0:
+ first_failed_at = now_ts
+ count += 1
+ self._save_login_failure_record(guard_key, count, first_failed_at, policy["lock_seconds"])
+ return self.get_login_lock_status(username, remote_ip)
+
+ def should_force_password_change(self, username: str) -> bool:
+ """判断当前管理员是否应该被强制提示修改密码。"""
+ admin_db = getattr(self, "admin_account_db", None)
+ if admin_db and admin_db.is_using_risky_password(username):
+ return True
+
+ # 数据库体系不可用时,再回退配置值判断,至少把默认 admin/admin123 识别出来。
+ fallback_username = str(self.username or "").strip()
+ fallback_password = str(self.password or "").strip()
+ return (
+ str(username or "").strip() == fallback_username
+ and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"})
+ )
+
def _register_blueprints(self, app):
"""注册所有蓝图"""
# 在函数内部导入蓝图,避免循环导入
diff --git a/admin/dashboard/templates/base.html b/admin/dashboard/templates/base.html
index 28aa342..eca0f07 100644
--- a/admin/dashboard/templates/base.html
+++ b/admin/dashboard/templates/base.html
@@ -929,6 +929,9 @@
:visible.sync="passwordDialogVisible"
width="460px"
:close-on-click-modal="false"
+ :close-on-press-escape="!passwordDialogLocked"
+ :show-close="!passwordDialogLocked"
+ :before-close="handlePasswordDialogBeforeClose"
custom-class="password-dialog">