From cb99e94493fd99094a1ff40a6533c971296db2a9 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 30 Apr 2026 15:32:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E5=90=8E=E5=8F=B0=E7=99=BB?= =?UTF-8?q?=E5=BD=95=E5=AE=89=E5=85=A8=E4=B8=8E=E5=AF=86=E7=A0=81=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/auth.py | 69 +++++++++++-- admin/dashboard/server.py | 145 ++++++++++++++++++++++++++- admin/dashboard/templates/base.html | 59 ++++++++++- admin/dashboard/templates/login.html | 35 ++++--- db/admin_account_db.py | 53 +++++++++- docs/工程优化与Feature清单.md | 7 ++ 6 files changed, 342 insertions(+), 26 deletions(-) diff --git a/admin/dashboard/blueprints/auth.py b/admin/dashboard/blueprints/auth.py index 971ea84..659aedd 100644 --- a/admin/dashboard/blueprints/auth.py +++ b/admin/dashboard/blueprints/auth.py @@ -1,17 +1,33 @@ from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify from functools import wraps from loguru import logger +import time # 创建认证蓝图 auth_bp = Blueprint('auth', __name__) +def _is_ajax_request() -> bool: + """判断当前请求是否由前端异步发起。""" + requested_with = str(request.headers.get("X-Requested-With", "") or "").strip().lower() + accept_header = str(request.headers.get("Accept", "") or "").strip().lower() + return requested_with == "xmlhttprequest" or "application/json" in accept_header + + # 登录检查装饰器 def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not session.get('logged_in'): return redirect(url_for('auth.login')) + server = current_app.dashboard_server + session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60) + last_activity_at = float(session.get("last_activity_at") or 0.0) + if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds: + # 会话超时后统一清理登录态,避免长期闲置会话继续可用。 + session.clear() + return redirect(url_for('auth.login')) + session["last_activity_at"] = time.time() return f(*args, **kwargs) return decorated_function @@ -25,10 +41,18 @@ def login(): # 使用 strip 规避用户误输入首尾空格导致的误判。 username = str(request.form.get('username', '') or '').strip() password = str(request.form.get('password', '') or '') + remote_ip = request.remote_addr or "" # 从应用上下文获取服务器实例,而不是从蓝图对象 server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) + lock_status = server.get_login_lock_status(username, remote_ip) + if lock_status.get("locked"): + wait_seconds = int(lock_status.get("remaining_seconds") or 0) + error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" + if _is_ajax_request(): + return jsonify({"success": False, "error": error}), 429 + return render_template('login.html', error=error) # 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。 login_ok = False @@ -45,12 +69,29 @@ def login(): login_ok = (username == server.username and password == server.password) if login_ok: + server.clear_login_failures(username, remote_ip) session['logged_in'] = True + session.permanent = True session['username'] = username # 存储用户名到session + session['last_activity_at'] = time.time() + # 若账号仍在使用默认/弱口令,则登录后强提示修改密码。 + session['force_password_change'] = bool(server.should_force_password_change(username)) logger.debug(f"Login successful. Session after login: {dict(session)}") + if _is_ajax_request(): + return jsonify({"success": True, "redirect_url": url_for('main.index')}) return redirect(url_for('main.index')) else: - error = '用户名或密码错误' + lock_status = server.mark_login_failure(username, remote_ip) + if lock_status.get("locked"): + wait_seconds = int(lock_status.get("remaining_seconds") or 0) + error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" + else: + remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0)) + error = '用户名或密码错误' + if remaining > 0: + error = f"{error},再失败 {remaining} 次将暂时锁定" + if _is_ajax_request(): + return jsonify({"success": False, "error": error}), 400 return render_template('login.html', error=error) @@ -58,11 +99,23 @@ def login(): # 登出 @auth_bp.route('/logout') def logout(): - session.pop('logged_in', None) - session.pop('username', None) # 同时删除username + session.clear() return redirect(url_for('auth.login')) +@auth_bp.route('/api/auth/security_status', methods=['GET']) +@login_required +def get_security_status(): + """返回当前登录管理员的安全状态。""" + return jsonify({ + "success": True, + "data": { + "force_password_change": bool(session.get("force_password_change", False)), + "session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)), + } + }) + + @auth_bp.route('/api/auth/change_password', methods=['POST']) @login_required def change_password(): @@ -95,13 +148,13 @@ def change_password(): if new_password != confirm_password: return jsonify({"success": False, "error": "两次输入的新密码不一致"}), 400 - # 密码长度做基础约束,避免过弱口令。 - if len(new_password) < 6: - return jsonify({"success": False, "error": "新密码长度不能少于6位"}), 400 - if new_password == old_password: return jsonify({"success": False, "error": "新密码不能与旧密码相同"}), 400 + password_error = admin_db.validate_password_strength(new_password) + if password_error: + return jsonify({"success": False, "error": password_error}), 400 + try: if not admin_db.verify_admin_password(username, old_password): return jsonify({"success": False, "error": "旧密码错误"}), 400 @@ -110,6 +163,8 @@ def change_password(): if not updated: return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500 + # 密码修改成功后,移除“必须修改密码”的会话提示。 + session["force_password_change"] = False return jsonify({"success": True, "message": "密码修改成功"}) except Exception as e: logger.error(f"修改后台密码失败: username={username}, error={e}") diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index 58190ad..40bf19f 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -5,6 +5,9 @@ import os import sys import threading +import time +import secrets +from datetime import timedelta import toml from flask import Flask, send_from_directory @@ -42,6 +45,12 @@ class DashboardServer: self.LOG = logger self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}") + # 登录失败限流兜底缓存: + # 1. 优先尝试 Redis,但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底; + # 2. 字典内容只保存短期登录失败窗口,不用于持久化; + # 3. 线程化 WSGI 会并发访问,因此需要显式加锁。 + self._auth_runtime_lock = threading.Lock() + self._auth_failures = {} # 如果提供了robot实例,则使用其对象 if robot_instance: self.db_manager = robot_instance.db_manager @@ -135,9 +144,30 @@ class DashboardServer: # 指定模板文件夹路径 template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') app = Flask(__name__, template_folder=template_folder) - app.secret_key = "stats_dashboard_secret_key" + auth_config = self.config.get("auth", {}) or {} + session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480) + cookie_secure = bool(auth_config.get("cookie_secure", False)) + configured_secret = str( + os.environ.get("ABOT_DASHBOARD_SECRET_KEY") + or auth_config.get("secret_key", "") + or "" + ).strip() + if configured_secret: + app.secret_key = configured_secret + else: + # 若未显式配置 secret_key,则每次进程启动生成随机值: + # 1. 这比固定硬编码密钥安全得多; + # 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的; + # 3. 同时输出 warning,提醒后续最好通过配置或环境变量固定注入。 + app.secret_key = secrets.token_hex(32) + self.LOG.warning("未配置 Dashboard secret_key,已使用进程级随机密钥,重启后现有登录会失效") + # 禁用模板缓存,使修改HTML文件后立即生效 False =重启才生效 app.config['TEMPLATES_AUTO_RELOAD'] = True + app.config['SESSION_COOKIE_HTTPONLY'] = True + app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") + app.config['SESSION_COOKIE_SECURE'] = cookie_secure + app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes)) # 设置Werkzeug日志级别为DEBUG import logging @@ -175,6 +205,119 @@ class DashboardServer: return app + def get_auth_policy(self) -> dict: + """读取后台认证策略配置。""" + auth_config = self.config.get("auth", {}) or {} + return { + "max_failed_attempts": max(3, int(auth_config.get("max_failed_attempts", 5) or 5)), + "lock_seconds": max(60, int(auth_config.get("lock_seconds", 900) or 900)), + "session_timeout_minutes": max(15, int(auth_config.get("session_timeout_minutes", 480) or 480)), + } + + @staticmethod + def _build_login_guard_key(username: str, remote_ip: str) -> str: + """构建登录限流键。""" + return f"{str(username or '').strip().lower()}::{str(remote_ip or '').strip() or 'unknown'}" + + def _get_login_failure_record(self, guard_key: str) -> dict: + """读取登录失败记录,优先 Redis,失败时回退到进程内缓存。""" + try: + redis_conn = self.db_manager.get_redis_connection() + raw_value = redis_conn.get(f"dashboard:auth:fail:{guard_key}") + if raw_value: + parts = str(raw_value).split("|", 1) + return { + "count": int(parts[0] or 0), + "first_failed_at": float(parts[1] or 0) if len(parts) > 1 else 0.0, + } + except Exception: + pass + + with self._auth_runtime_lock: + record = dict(self._auth_failures.get(guard_key, {}) or {}) + expire_at = float(record.get("expire_at") or 0.0) + if expire_at > 0 and expire_at <= time.time(): + self._auth_failures.pop(guard_key, None) + record = {} + return { + "count": int(record.get("count") or 0), + "first_failed_at": float(record.get("first_failed_at") or 0.0), + } + + def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None: + """保存登录失败记录,优先 Redis,失败时回退到进程内缓存。""" + payload = f"{int(count)}|{float(first_failed_at)}" + try: + redis_conn = self.db_manager.get_redis_connection() + redis_conn.setex(f"dashboard:auth:fail:{guard_key}", ttl_seconds, payload) + return + except Exception: + pass + + expire_at = time.time() + ttl_seconds + with self._auth_runtime_lock: + self._auth_failures[guard_key] = { + "count": int(count), + "first_failed_at": float(first_failed_at), + "expire_at": expire_at, + } + + def clear_login_failures(self, username: str, remote_ip: str) -> None: + """清理登录失败记录。""" + guard_key = self._build_login_guard_key(username, remote_ip) + try: + redis_conn = self.db_manager.get_redis_connection() + redis_conn.delete(f"dashboard:auth:fail:{guard_key}") + except Exception: + pass + + with self._auth_runtime_lock: + self._auth_failures.pop(guard_key, None) + + def get_login_lock_status(self, username: str, remote_ip: str) -> dict: + """获取当前账号/IP 组合的登录锁定状态。""" + policy = self.get_auth_policy() + record = self._get_login_failure_record(self._build_login_guard_key(username, remote_ip)) + count = int(record.get("count") or 0) + first_failed_at = float(record.get("first_failed_at") or 0.0) + if count < policy["max_failed_attempts"] or first_failed_at <= 0: + return {"locked": False, "remaining_seconds": 0, "failed_count": count} + + elapsed = max(0, int(time.time() - first_failed_at)) + remaining_seconds = max(0, policy["lock_seconds"] - elapsed) + if remaining_seconds <= 0: + self.clear_login_failures(username, remote_ip) + return {"locked": False, "remaining_seconds": 0, "failed_count": 0} + return {"locked": True, "remaining_seconds": remaining_seconds, "failed_count": count} + + def mark_login_failure(self, username: str, remote_ip: str) -> dict: + """记录一次登录失败,并返回最新锁定状态。""" + policy = self.get_auth_policy() + guard_key = self._build_login_guard_key(username, remote_ip) + record = self._get_login_failure_record(guard_key) + count = int(record.get("count") or 0) + first_failed_at = float(record.get("first_failed_at") or 0.0) + now_ts = time.time() + if first_failed_at <= 0: + first_failed_at = now_ts + count += 1 + self._save_login_failure_record(guard_key, count, first_failed_at, policy["lock_seconds"]) + return self.get_login_lock_status(username, remote_ip) + + def should_force_password_change(self, username: str) -> bool: + """判断当前管理员是否应该被强制提示修改密码。""" + admin_db = getattr(self, "admin_account_db", None) + if admin_db and admin_db.is_using_risky_password(username): + return True + + # 数据库体系不可用时,再回退配置值判断,至少把默认 admin/admin123 识别出来。 + fallback_username = str(self.username or "").strip() + fallback_password = str(self.password or "").strip() + return ( + str(username or "").strip() == fallback_username + and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"}) + ) + def _register_blueprints(self, app): """注册所有蓝图""" # 在函数内部导入蓝图,避免循环导入 diff --git a/admin/dashboard/templates/base.html b/admin/dashboard/templates/base.html index 28aa342..eca0f07 100644 --- a/admin/dashboard/templates/base.html +++ b/admin/dashboard/templates/base.html @@ -929,6 +929,9 @@ :visible.sync="passwordDialogVisible" width="460px" :close-on-click-modal="false" + :close-on-press-escape="!passwordDialogLocked" + :show-close="!passwordDialogLocked" + :before-close="handlePasswordDialogBeforeClose" custom-class="password-dialog"> - +
- 提示:修改成功后将立即生效,建议使用强密码(字母、数字、符号组合)。 + 提示:修改成功后将立即生效,建议使用强密码(至少 8 位,且包含两类以上字符)。
- 取消 + 取消 确认修改 @@ -1043,6 +1046,7 @@ navGroups: NAV_GROUPS, // 账号密码修改弹窗状态。 passwordDialogVisible: false, + passwordDialogLocked: false, passwordSubmitting: false, passwordForm: { old_password: '', @@ -1055,7 +1059,22 @@ ], new_password: [ { required: true, message: '请输入新密码', trigger: 'blur' }, - { min: 6, message: '新密码长度至少6位', trigger: 'blur' } + { min: 8, message: '新密码长度至少8位', trigger: 'blur' }, + { + validator: function(rule, value, callback) { + const passwordText = String(value || ''); + let score = 0; + if (/[A-Za-z]/.test(passwordText)) score += 1; + if (/\d/.test(passwordText)) score += 1; + if (/[^A-Za-z0-9]/.test(passwordText)) score += 1; + if (passwordText && score < 2) { + callback(new Error('新密码需至少包含字母、数字、符号中的两类')); + return; + } + callback(); + }, + trigger: 'blur' + } ], confirm_password: [ { required: true, message: '请再次输入新密码', trigger: 'blur' }, @@ -1094,6 +1113,7 @@ }, mounted() { document.querySelector('.app-container').classList.add('loaded'); + this.loadSecurityStatus(); }, methods: { normalizePath(path) { @@ -1149,6 +1169,29 @@ window.location.href = '/logout'; }); }, + loadSecurityStatus() { + // 登录后主动拉一次账号安全状态: + // 1. 若仍在使用默认/弱口令,这里会强制打开改密弹窗; + // 2. 这样用户不需要额外摸索入口,首次进入后台就能完成风险收敛。 + axios.get('/api/auth/security_status') + .then((response) => { + const data = (response.data || {}).data || {}; + if (data.force_password_change) { + this.passwordDialogLocked = true; + this.openPasswordDialog(); + this.$alert('当前后台账号仍在使用默认或弱密码,请先修改密码后再继续操作。', '安全提示', { + confirmButtonText: '去修改', + showClose: false, + closeOnClickModal: false, + closeOnPressEscape: false, + type: 'warning' + }).catch(() => {}); + } + }) + .catch(() => { + // 安全状态获取失败时不阻塞页面使用,避免偶发接口异常影响整体后台。 + }); + }, openPasswordDialog() { // 打开弹窗前重置表单,避免上次输入残留。 this.passwordDialogVisible = true; @@ -1164,6 +1207,13 @@ } }); }, + handlePasswordDialogBeforeClose(done) { + if (this.passwordDialogLocked) { + this.$message.warning('当前账号需要先完成密码修改,暂时不能关闭该弹窗。'); + return; + } + done(); + }, submitPasswordChange() { if (!this.$refs.passwordFormRef) { return; @@ -1182,6 +1232,7 @@ } this.$message.success(data.message || '密码修改成功'); this.passwordDialogVisible = false; + this.passwordDialogLocked = false; }) .catch((error) => { let errorMsg = '修改密码失败,请稍后重试'; diff --git a/admin/dashboard/templates/login.html b/admin/dashboard/templates/login.html index 1f0d845..843918f 100644 --- a/admin/dashboard/templates/login.html +++ b/admin/dashboard/templates/login.html @@ -312,7 +312,7 @@ password: [{ required: true, message: '请输入密码', trigger: 'blur' }] }, loading: false, - errorMessage: '' + errorMessage: {{ (error or '')|tojson }} } }, methods: { @@ -323,23 +323,32 @@ const formData = new FormData(); formData.append('username', this.loginForm.username); formData.append('password', this.loginForm.password); - fetch('/login', { method: 'POST', body: formData }) + axios.post('/login', formData, { + headers: { + 'X-Requested-With': 'XMLHttpRequest', + 'Accept': 'application/json' + } + }) .then(response => { - if (response.redirected) { - window.location.href = response.url; - } else { - return response.text(); - } - }) - .then(html => { - if (html) { - this.errorMessage = '用户名或密码错误'; - this.loading = false; + const data = response.data || {}; + if (data.success && data.redirect_url) { + window.location.href = data.redirect_url; + return; } + this.errorMessage = data.error || '登录失败,请稍后重试'; + this.loading = false; }) .catch(error => { console.error('登录出错:', error); - this.errorMessage = '登录请求失败,请稍后重试'; + let errorMsg = '登录请求失败,请稍后重试'; + try { + if (error && error.response && error.response.data && error.response.data.error) { + errorMsg = error.response.data.error; + } + } catch (e) { + // 读取错误详情失败时保留默认提示即可。 + } + this.errorMessage = errorMsg; this.loading = false; }); } diff --git a/db/admin_account_db.py b/db/admin_account_db.py index d4ae748..86b6a17 100644 --- a/db/admin_account_db.py +++ b/db/admin_account_db.py @@ -11,6 +11,7 @@ import base64 import hashlib import hmac +import re import secrets from typing import Any, Dict, Optional @@ -24,6 +25,19 @@ class AdminAccountDBOperator(BaseDBOperator): HASH_SCHEME = "pbkdf2_sha256" # PBKDF2 迭代次数:在安全性与计算开销之间做平衡。 HASH_ITERATIONS = 150_000 + # 风险口令清单: + # 1. 这里优先覆盖系统默认口令和常见极弱口令; + # 2. 后台安全判断只需要识别“明显危险”的情况,不追求做成完整密码字典; + # 3. 统一放在数据层,便于登录鉴权、修改密码、首登提醒共用。 + RISKY_PASSWORDS = { + "admin", + "admin123", + "123456", + "12345678", + "password", + "qwerty", + "abot123", + } def init_tables(self) -> bool: """初始化后台管理员表。 @@ -120,6 +134,44 @@ class AdminAccountDBOperator(BaseDBOperator): (password_hash, str(username or "").strip()), ) + def is_using_risky_password(self, username: str) -> bool: + """判断指定管理员是否仍在使用风险口令。""" + row = self.get_admin_by_username(username) + if not row: + return False + stored_hash = str(row.get("password_hash") or "") + if not stored_hash: + return False + + # 口令是哈希存储的,因此只能把风险候选集逐个比对。 + for candidate in self.RISKY_PASSWORDS: + if self.verify_password(candidate, stored_hash): + return True + return False + + @classmethod + def validate_password_strength(cls, raw_password: str) -> Optional[str]: + """校验密码强度,返回错误提示;通过时返回 None。""" + password_text = str(raw_password or "") + if len(password_text) < 8: + return "新密码长度不能少于8位" + + if password_text.lower() in cls.RISKY_PASSWORDS: + return "新密码过于简单,请避免使用默认口令或常见弱口令" + + score = 0 + if re.search(r"[A-Za-z]", password_text): + score += 1 + if re.search(r"\d", password_text): + score += 1 + if re.search(r"[^A-Za-z0-9]", password_text): + score += 1 + + # 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。 + if score < 2: + return "新密码需至少包含字母、数字、符号中的两类" + return None + @classmethod def hash_password(cls, raw_password: str) -> str: """生成口令哈希。 @@ -163,4 +215,3 @@ class AdminAccountDBOperator(BaseDBOperator): return hmac.compare_digest(actual_digest, expected_digest) except Exception: return False - diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md index 7667e75..d9977aa 100644 --- a/docs/工程优化与Feature清单.md +++ b/docs/工程优化与Feature清单.md @@ -20,6 +20,7 @@ - 已在后台首页补充“系统健康快照”,可集中查看机器人连接、插件运行、近 24 小时异常与 md2img 运行状态 - 已补充 MySQL / Redis 连接探测与统一 LLM 最近调用快照,基础设施与 AI 运行态可直接在首页查看 - 已将 `trace_id` 通过异步上下文继续贯穿到统一 LLM 调用与微信发送动作,链路追踪粒度进一步提升 +- 已补充后台登录失败限流、会话超时、默认弱口令强提醒与密码复杂度校验,后台安全基线进一步收紧 ## 2. 项目现状判断 @@ -260,6 +261,12 @@ - 提高后台管理面的安全基线 +当前进展: + +- 第一阶段已完成:已补充登录失败限流、会话超时、安全 Cookie 与动态 secret_key 兜底 +- 第二阶段已完成:已补充默认弱口令识别、登录后强制改密提示与密码复杂度校验 +- 后续可继续补充关键操作审计日志与更细粒度的管理员行为追踪 + 建议内容: - 首次部署强制修改默认管理员密码