From 22c871105a2341666404392261d1f6c7eb9c45ce Mon Sep 17 00:00:00 2001 From: Liu Date: Fri, 1 May 2026 12:45:35 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"=E4=BF=AE=E5=A4=8D=E5=90=8E=E5=8F=B0?= =?UTF-8?q?=E5=BC=B1=E5=AF=86=E7=A0=81=E6=8F=90=E7=A4=BA=E8=AF=AF=E5=88=A4?= =?UTF-8?q?=E5=B9=B6=E6=81=A2=E5=A4=8Dserver.py=E7=BC=96=E7=A0=81"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 342b4c0065cb8eb4872393a59a76152034775ebe. --- admin/dashboard/blueprints/auth.py | 132 ++++++------ admin/dashboard/server.py | 327 ++++++++++++++--------------- db/admin_account_db.py | 88 ++++---- test/test_dashboard_auth_logic.py | 171 --------------- 4 files changed, 251 insertions(+), 467 deletions(-) delete mode 100644 test/test_dashboard_auth_logic.py diff --git a/admin/dashboard/blueprints/auth.py b/admin/dashboard/blueprints/auth.py index 2e2aba5..659aedd 100644 --- a/admin/dashboard/blueprints/auth.py +++ b/admin/dashboard/blueprints/auth.py @@ -1,11 +1,10 @@ -import time +from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify from functools import wraps - -from flask import Blueprint, current_app, jsonify, redirect, render_template, request, session, url_for from loguru import logger +import time -# 创建认证蓝图。 -auth_bp = Blueprint("auth", __name__) +# 创建认证蓝图 +auth_bp = Blueprint('auth', __name__) def _is_ajax_request() -> bool: @@ -15,38 +14,36 @@ def _is_ajax_request() -> bool: return requested_with == "xmlhttprequest" or "application/json" in accept_header +# 登录检查装饰器 def login_required(f): - """登录检查装饰器。""" - @wraps(f) def decorated_function(*args, **kwargs): - if not session.get("logged_in"): - return redirect(url_for("auth.login")) - + if not session.get('logged_in'): + return redirect(url_for('auth.login')) server = current_app.dashboard_server session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60) last_activity_at = float(session.get("last_activity_at") or 0.0) if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds: # 会话超时后统一清理登录态,避免长期闲置会话继续可用。 session.clear() - return redirect(url_for("auth.login")) - + return redirect(url_for('auth.login')) session["last_activity_at"] = time.time() return f(*args, **kwargs) return decorated_function -@auth_bp.route("/login", methods=["GET", "POST"]) +# 登录页面 +@auth_bp.route('/login', methods=['GET', 'POST']) def login(): - """后台登录页及登录提交入口。""" error = None - if request.method == "POST": - # 用户名做 strip,避免首尾空格导致的误判;密码保留原样,避免改变真实口令。 - username = str(request.form.get("username", "") or "").strip() - password = str(request.form.get("password", "") or "") + if request.method == 'POST': + # 使用 strip 规避用户误输入首尾空格导致的误判。 + username = str(request.form.get('username', '') or '').strip() + password = str(request.form.get('password', '') or '') remote_ip = request.remote_addr or "" + # 从应用上下文获取服务器实例,而不是从蓝图对象 server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) lock_status = server.get_login_lock_status(username, remote_ip) @@ -55,89 +52,82 @@ def login(): error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" if _is_ajax_request(): return jsonify({"success": False, "error": error}), 429 - return render_template("login.html", error=error) + return render_template('login.html', error=error) - # 认证边界说明: - # 1. 只要数据库里已经存在该管理员账号,就必须以数据库密码为准; - # 2. 否则用户在后台改密后,config.toml 里的旧密码仍然可能继续生效; - # 3. 仅当数据库不可用,或者该账号尚未迁移进数据库时,才回退到旧配置模式。 + # 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。 login_ok = False - should_use_config_fallback = not bool(admin_db) if admin_db: try: - admin_row = admin_db.get_admin_by_username(username) - should_use_config_fallback = not bool(admin_row) - if admin_row and int(admin_row.get("status") or 0) == 1: - login_ok = admin_db.verify_password(password, str(admin_row.get("password_hash") or "")) + login_ok = admin_db.verify_admin_password(username, password) if login_ok: - admin_db.mark_login_success(username, remote_ip) + admin_db.mark_login_success(username, request.remote_addr or "") except Exception as e: logger.error(f"数据库账号登录校验异常,回退配置模式: {e}") login_ok = False - should_use_config_fallback = True - if not login_ok and should_use_config_fallback: - login_ok = username == server.username and password == server.password + if not login_ok: + login_ok = (username == server.username and password == server.password) if login_ok: server.clear_login_failures(username, remote_ip) - session["logged_in"] = True + session['logged_in'] = True session.permanent = True - session["username"] = username - session["last_activity_at"] = time.time() - # 登录成功后再读取当前账号的安全状态,决定是否强制弹出改密提示。 - session["force_password_change"] = bool(server.should_force_password_change(username)) + session['username'] = username # 存储用户名到session + session['last_activity_at'] = time.time() + # 若账号仍在使用默认/弱口令,则登录后强提示修改密码。 + session['force_password_change'] = bool(server.should_force_password_change(username)) logger.debug(f"Login successful. Session after login: {dict(session)}") if _is_ajax_request(): - return jsonify({"success": True, "redirect_url": url_for("main.index")}) - return redirect(url_for("main.index")) - - lock_status = server.mark_login_failure(username, remote_ip) - if lock_status.get("locked"): - wait_seconds = int(lock_status.get("remaining_seconds") or 0) - error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" + return jsonify({"success": True, "redirect_url": url_for('main.index')}) + return redirect(url_for('main.index')) else: - remaining = max( - 0, - int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0), - ) - error = "用户名或密码错误" - if remaining > 0: - error = f"{error},再失败 {remaining} 次将暂时锁定" - if _is_ajax_request(): - return jsonify({"success": False, "error": error}), 400 + lock_status = server.mark_login_failure(username, remote_ip) + if lock_status.get("locked"): + wait_seconds = int(lock_status.get("remaining_seconds") or 0) + error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" + else: + remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0)) + error = '用户名或密码错误' + if remaining > 0: + error = f"{error},再失败 {remaining} 次将暂时锁定" + if _is_ajax_request(): + return jsonify({"success": False, "error": error}), 400 - return render_template("login.html", error=error) + return render_template('login.html', error=error) -@auth_bp.route("/logout") +# 登出 +@auth_bp.route('/logout') def logout(): - """退出登录。""" session.clear() - return redirect(url_for("auth.login")) + return redirect(url_for('auth.login')) -@auth_bp.route("/api/auth/security_status", methods=["GET"]) +@auth_bp.route('/api/auth/security_status', methods=['GET']) @login_required def get_security_status(): """返回当前登录管理员的安全状态。""" - return jsonify( - { - "success": True, - "data": { - "force_password_change": bool(session.get("force_password_change", False)), - "session_timeout_minutes": int( - current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480) - ), - }, + return jsonify({ + "success": True, + "data": { + "force_password_change": bool(session.get("force_password_change", False)), + "session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)), } - ) + }) -@auth_bp.route("/api/auth/change_password", methods=["POST"]) +@auth_bp.route('/api/auth/change_password', methods=['POST']) @login_required def change_password(): - """修改当前登录管理员密码。""" + """修改当前登录管理员密码。 + + 前端请求参数: + { + "old_password": "旧密码", + "new_password": "新密码", + "confirm_password": "确认新密码" + } + """ server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) if not admin_db: @@ -173,7 +163,7 @@ def change_password(): if not updated: return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500 - # 改密成功后立刻移除“必须修改密码”的会话提示,避免用户当前会话继续被反复打断。 + # 密码修改成功后,移除“必须修改密码”的会话提示。 session["force_password_change"] = False return jsonify({"success": True, "message": "密码修改成功"}) except Exception as e: diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index ebe739e..40bf19f 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -1,52 +1,43 @@ # -*- coding: utf-8 -*- """ -统计看板服务器。 - -这里负责: -1. 读取 Dashboard 配置并创建 Flask 应用; -2. 初始化后台依赖对象、蓝图和静态资源路由; -3. 提供后台登录限流、弱密码提醒等通用安全能力。 +统计看板服务器 - 使用Flask蓝图重构版 """ - import os -import secrets import sys import threading import time +import secrets from datetime import timedelta import toml from flask import Flask, send_from_directory from loguru import logger -from db.admin_account_db import AdminAccountDBOperator from db.contacts_db import ContactsDBOperator +from db.admin_account_db import AdminAccountDBOperator from db.emoji_asset_db import EmojiAssetDB -from db.fun_command_rule_db import FunCommandRuleDBOperator from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator from db.task_db import TaskDBOperator +from db.fun_command_rule_db import FunCommandRuleDBOperator from utils.fun_command_rule_service import FunCommandRuleService from wechat_ipad import WechatAPIClient -# 添加项目根目录到系统路径,确保可以导入项目模块。 -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) +# 添加项目根目录到系统路径,确保可以导入项目模块 +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) class DashboardServer: - """统计看板服务器。""" + """统计看板服务器""" - def __init__( - self, - host: str = None, - port: int = None, - username: str = None, - password: str = None, - robot_instance=None, - ): - # 先加载配置文件,再用传入参数覆盖,保证 CLI/外部调用具备最高优先级。 + def __init__(self, host: str = None, port: int = None, + username: str = None, password: str = None, + robot_instance=None): + # 加载配置文件 self.config = self._load_dashboard_config() + + # 优先使用传入的参数,其次使用配置文件中的参数 self.host = host or self.config.get("server", {}).get("host", "0.0.0.0") self.port = port or self.config.get("server", {}).get("port", 8888) self.username = username or self.config.get("auth", {}).get("username", "admin") @@ -54,108 +45,105 @@ class DashboardServer: self.LOG = logger self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}") - - # 登录失败限流的进程内兜底缓存: - # 1. 优先用 Redis 做共享限流; - # 2. 如果 Redis 暂时不可用,至少本进程内仍有基本防护; - # 3. 这里会被并发访问,因此需要配合线程锁使用。 + # 登录失败限流兜底缓存: + # 1. 优先尝试 Redis,但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底; + # 2. 字典内容只保存短期登录失败窗口,不用于持久化; + # 3. 线程化 WSGI 会并发访问,因此需要显式加锁。 self._auth_runtime_lock = threading.Lock() self._auth_failures = {} + # 如果提供了robot实例,则使用其对象 + if robot_instance: + self.db_manager = robot_instance.db_manager + self.stats_db = StatsDBOperator(self.db_manager) + # Dashboard 启动可能早于 iPad 登录完成: + # 1. 此时 Robot 上的 message_storage 还没来得及绑定真实 bot; + # 2. 但后台很多页面仍然依赖消息存储与表情资产库; + # 3. 因此这里优先复用 Robot 已初始化的 message_storage,没有则再安全回退到 DB 层对象。 + self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager) + self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager) + self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) + self.member_context_db = MemberContextDBOperator(self.db_manager) + self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) + # 后台管理员账号数据层:用于登录鉴权与修改密码。 + self.admin_account_db = AdminAccountDBOperator(self.db_manager) + self.system_job_db = robot_instance.system_job_db + self.system_job_loader = robot_instance.system_job_loader + self.plugin_schedule_db = robot_instance.plugin_schedule_db + self.plugin_schedule_manager = robot_instance.plugin_schedule_manager + self.group_plugin_config_db = robot_instance.group_plugin_config_db + self.llm_catalog_db = robot_instance.llm_catalog_db + self.group_plugin_config_service = robot_instance.group_plugin_config_service + # 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”后台配置与缓存。 + # 这里统一在 Dashboard 启动时初始化,保证管理端可直接读写规则。 + self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager) + self.fun_command_rule_service = FunCommandRuleService( + db_operator=self.fun_command_rule_db, + redis_client=self.db_manager.get_redis_connection(), + local_ttl_seconds=30, + ) + self.fun_command_rule_service.init_tables() + self.fun_command_rule_service.refresh_cache() + # 获取联系人管理器实例 + self.contact_manager = robot_instance.contact_manager + self.plugin_manager = robot_instance.plugin_manager + self.plugin_registry = robot_instance.plugin_registry + self.client: WechatAPIClient = robot_instance.ipad_bot + self.robot = robot_instance + self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要") + self.member_context_service = getattr(self.member_context_plugin, "service", None) - if not robot_instance: + self.LOG.info("使用Robot实例的对象进行初始化") + + # 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。 + try: + table_ok = self.admin_account_db.init_tables() + if not table_ok: + self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式") + else: + seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员") + if seed_ok: + self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)") + else: + self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息") + except Exception as e: + self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}") + else: self.LOG.error("未提供Robot实例,Dashboard无法正常工作") raise ValueError("必须提供Robot实例") - self.db_manager = robot_instance.db_manager - self.stats_db = StatsDBOperator(self.db_manager) - - # Dashboard 启动可能早于 iPad 登录完成: - # 1. 此时 Robot 上的 message_storage 可能尚未绑定完成; - # 2. 但后台页面仍然依赖消息存储与表情资产库; - # 3. 因此优先复用 Robot 现成对象,没有时再安全回退到 DB 层对象。 - self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager) - self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager) - self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) - self.member_context_db = MemberContextDBOperator(self.db_manager) - self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) - - # 后台管理员账号数据层:用于登录鉴权、弱密码判断和在线改密。 - self.admin_account_db = AdminAccountDBOperator(self.db_manager) - self.system_job_db = robot_instance.system_job_db - self.system_job_loader = robot_instance.system_job_loader - self.plugin_schedule_db = robot_instance.plugin_schedule_db - self.plugin_schedule_manager = robot_instance.plugin_schedule_manager - self.group_plugin_config_db = robot_instance.group_plugin_config_db - self.llm_catalog_db = robot_instance.llm_catalog_db - self.group_plugin_config_service = robot_instance.group_plugin_config_service - - # 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”的后台配置与缓存。 - self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager) - self.fun_command_rule_service = FunCommandRuleService( - db_operator=self.fun_command_rule_db, - redis_client=self.db_manager.get_redis_connection(), - local_ttl_seconds=30, - ) - self.fun_command_rule_service.init_tables() - self.fun_command_rule_service.refresh_cache() - - # 其余运行时对象直接复用 Robot 已初始化实例,避免重复构造。 - self.contact_manager = robot_instance.contact_manager - self.plugin_manager = robot_instance.plugin_manager - self.plugin_registry = robot_instance.plugin_registry - self.client: WechatAPIClient = robot_instance.ipad_bot - self.robot = robot_instance - self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要") - self.member_context_service = getattr(self.member_context_plugin, "service", None) - - self.LOG.info("使用Robot实例的对象进行初始化") - - # 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。 - try: - table_ok = self.admin_account_db.init_tables() - if not table_ok: - self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式") - else: - seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员") - if seed_ok: - self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)") - else: - self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息") - except Exception as e: - self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}") - self.app = self._create_app() self._stop_event = threading.Event() - self._server = None # Werkzeug 服务实例会在 run() 时写入这里。 + self._server = None # 存储服务器实例 def _load_dashboard_config(self): - """加载 Dashboard 配置文件。""" + """加载Dashboard配置文件""" try: - config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.toml") + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml') if os.path.exists(config_path): - with open(config_path, "r", encoding="utf-8") as f: + with open(config_path, 'r', encoding='utf-8') as f: return toml.load(f) - - # 如果配置文件不存在,则创建默认配置,方便本地快速启动。 - default_config = { - "server": {"host": "0.0.0.0", "port": 8888}, - "auth": {"username": "admin", "password": "admin123"}, - } - with open(config_path, "w", encoding="utf-8") as f: - toml.dump(default_config, f) - return default_config + else: + # 如果配置文件不存在,创建默认配置 + default_config = { + "server": {"host": "0.0.0.0", "port": 8888}, + "auth": {"username": "admin", "password": "admin123"} + } + with open(config_path, 'w', encoding='utf-8') as f: + toml.dump(default_config, f) + return default_config except Exception as e: self.LOG.error(f"加载Dashboard配置文件失败: {e}") + # 返回默认配置 return { "server": {"host": "0.0.0.0", "port": 8888}, - "auth": {"username": "admin", "password": "admin123"}, + "auth": {"username": "admin", "password": "admin123"} } def _create_app(self) -> Flask: - """创建 Flask 应用。""" - template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") + """创建Flask应用""" + # 指定模板文件夹路径 + template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') app = Flask(__name__, template_folder=template_folder) - auth_config = self.config.get("auth", {}) or {} session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480) cookie_secure = bool(auth_config.get("cookie_secure", False)) @@ -164,54 +152,57 @@ class DashboardServer: or auth_config.get("secret_key", "") or "" ).strip() - if configured_secret: app.secret_key = configured_secret else: - # 未显式配置 secret_key 时,用进程级随机密钥兜底: - # 1. 安全性优于硬编码固定值; - # 2. 服务重启后旧 session 会失效,但这是可接受的安全代价; - # 3. 同时输出 warning,提醒后续通过配置或环境变量固定注入。 + # 若未显式配置 secret_key,则每次进程启动生成随机值: + # 1. 这比固定硬编码密钥安全得多; + # 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的; + # 3. 同时输出 warning,提醒后续最好通过配置或环境变量固定注入。 app.secret_key = secrets.token_hex(32) self.LOG.warning("未配置 Dashboard secret_key,已使用进程级随机密钥,重启后现有登录会失效") - # 关闭模板缓存,便于开发时实时看到页面修改结果。 - app.config["TEMPLATES_AUTO_RELOAD"] = True - app.config["SESSION_COOKIE_HTTPONLY"] = True - app.config["SESSION_COOKIE_SAMESITE"] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") - app.config["SESSION_COOKIE_SECURE"] = cookie_secure - app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=max(15, session_timeout_minutes)) + # 禁用模板缓存,使修改HTML文件后立即生效 False =重启才生效 + app.config['TEMPLATES_AUTO_RELOAD'] = True + app.config['SESSION_COOKIE_HTTPONLY'] = True + app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") + app.config['SESSION_COOKIE_SECURE'] = cookie_secure + app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes)) + # 设置Werkzeug日志级别为DEBUG import logging + logging.getLogger('werkzeug').setLevel(logging.ERROR) - logging.getLogger("werkzeug").setLevel(logging.ERROR) - - # 将 DashboardServer 实例挂到 app 上,方便蓝图在请求期取用。 + # 将dashboard_server实例设置为app的属性 app.dashboard_server = self - static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") + # 配置静态文件访问 + static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') - @app.route("/static/") + @app.route('/static/') def serve_static(filename): return send_from_directory(static_folder, filename) + # 获取项目根目录下的static/images目录 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) images_dir = os.path.join(project_root, "static", "images") + + # 确保目录存在 os.makedirs(images_dir, exist_ok=True) - @app.route("/static/images/") + @app.route('/static/images/') def serve_images(filename): return send_from_directory(images_dir, filename) - @app.route("/favicon.ico") + # 添加一个路由处理favicon请求 + @app.route('/favicon.ico') def favicon(): - return send_from_directory( - os.path.join(app.root_path, "static"), - "favicon.ico", - mimetype="image/vnd.microsoft.icon", - ) + return send_from_directory(os.path.join(app.root_path, 'static'), + 'favicon.ico', mimetype='image/vnd.microsoft.icon') + # 注册蓝图 self._register_blueprints(app) + return app def get_auth_policy(self) -> dict: @@ -248,19 +239,12 @@ class DashboardServer: if expire_at > 0 and expire_at <= time.time(): self._auth_failures.pop(guard_key, None) record = {} - return { "count": int(record.get("count") or 0), "first_failed_at": float(record.get("first_failed_at") or 0.0), } - def _save_login_failure_record( - self, - guard_key: str, - count: int, - first_failed_at: float, - ttl_seconds: int, - ) -> None: + def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None: """保存登录失败记录,优先 Redis,失败时回退到进程内缓存。""" payload = f"{int(count)}|{float(first_failed_at)}" try: @@ -322,48 +306,41 @@ class DashboardServer: def should_force_password_change(self, username: str) -> bool: """判断当前管理员是否应该被强制提示修改密码。""" - normalized_username = str(username or "").strip() admin_db = getattr(self, "admin_account_db", None) - if admin_db: - try: - admin_row = admin_db.get_admin_by_username(normalized_username) - # 只要数据库里已经存在该账号,就完全以数据库中的当前密码状态为准。 - # 这样可以避免用户已经在后台把密码改强后,仍被 config.toml 中的旧默认密码反复误判。 - if admin_row: - return bool(admin_db.is_using_risky_password(normalized_username)) - except Exception as e: - self.LOG.error(f"判断后台弱密码状态时出现异常,将回退配置判断: {e}") + if admin_db and admin_db.is_using_risky_password(username): + return True - # 数据库体系不可用或该账号尚未迁移进数据库时,再回退配置值判断。 + # 数据库体系不可用时,再回退配置值判断,至少把默认 admin/admin123 识别出来。 fallback_username = str(self.username or "").strip() fallback_password = str(self.password or "").strip() return ( - normalized_username == fallback_username + str(username or "").strip() == fallback_username and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"}) ) def _register_blueprints(self, app): - """注册所有蓝图。""" - # 在函数内部导入蓝图,避免循环导入。 + """注册所有蓝图""" + # 在函数内部导入蓝图,避免循环导入 from admin.dashboard.blueprints.auth import auth_bp from admin.dashboard.blueprints.contacts import contacts_bp - from admin.dashboard.blueprints.file_browser import file_browser_bp - from admin.dashboard.blueprints.friend_circle import friend_circle_bp - from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp - from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp - from admin.dashboard.blueprints.main import main_bp - from admin.dashboard.blueprints.message_push import message_push_bp - from admin.dashboard.blueprints.messages import messages_bp - from admin.dashboard.blueprints.plugin_routes import plugin_routes - from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp from admin.dashboard.blueprints.robot import robot_bp + from admin.dashboard.blueprints.messages import messages_bp from admin.dashboard.blueprints.stats import stats_bp from admin.dashboard.blueprints.system import system_bp - from admin.dashboard.blueprints.system_jobs import system_jobs_bp - from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp + from admin.dashboard.blueprints.main import main_bp + from admin.dashboard.blueprints.plugin_routes import plugin_routes from admin.dashboard.blueprints.virtual_group import virtual_group_bp + from admin.dashboard.blueprints.file_browser import file_browser_bp + from admin.dashboard.blueprints.message_push import message_push_bp + from admin.dashboard.blueprints.friend_circle import friend_circle_bp + from admin.dashboard.blueprints.system_jobs import system_jobs_bp + from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp + from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp + from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp + from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp - app.register_blueprint(virtual_group_bp, url_prefix="/virtual_group") + # 在app.register_blueprint部分添加 + app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group') app.register_blueprint(auth_bp) app.register_blueprint(main_bp) app.register_blueprint(contacts_bp) @@ -384,16 +361,16 @@ class DashboardServer: self.LOG.info("所有蓝图已注册") def run(self): - """运行服务器。""" + """运行服务器""" from werkzeug.serving import make_server + # 设置Werkzeug日志级别为DEBUG import logging - - logging.getLogger("werkzeug").setLevel(logging.ERROR) + logging.getLogger('werkzeug').setLevel(logging.ERROR) self.LOG.info(f"启动服务器: {self.host}:{self.port}") try: - # Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会拖死整个后台。 - # 改为 threaded server 可以避免某个接口阻塞时所有页面一起无响应。 + # Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会导致整个后台无响应。 + # 改为 threaded server,避免某个接口阻塞后拖死所有页面访问。 self._server = make_server(self.host, self.port, self.app, threaded=True) self._server.serve_forever() except Exception as e: @@ -401,22 +378,26 @@ class DashboardServer: self._stop_event.set() def stop(self): - """停止服务器。""" + """停止服务器""" self.LOG.info("正在停止服务器...") self._stop_event.set() + # 使用werkzeug服务器的关闭方法 if self._server: self._server.shutdown() self.LOG.info("服务器已停止") def get_current_user_info(self): - """获取当前登录的微信用户信息。""" + """获取当前登录的微信用户信息""" try: if not self.client: self.LOG.error("client实例不可用,无法获取当前用户信息") return {"success": False, "message": "实例不可用"} + # 获取当前登录的微信ID + + # 从新的resp格式中获取用户信息 try: if self.robot is None: raise ValueError("机器人对象未初始化") @@ -425,7 +406,7 @@ class DashboardServer: "nickName": getattr(self.robot, "nickname", ""), "mobile": getattr(self.robot, "phone", ""), "smallHeadImgUrl": getattr(self.robot, "head_image", ""), - "signature": getattr(self.robot, "signature", ""), + "signature": getattr(self.robot, "signature", "") } except (AttributeError, ValueError) as e: print(f"获取用户信息出错: {str(e)}") @@ -434,7 +415,7 @@ class DashboardServer: "nickName": self.robot.nickname, "mobile": self.robot.phone, "smallHeadImgUrl": self.robot.head_image, - "signature": self.robot.signature, + "signature": self.robot.signature } if not user_data: @@ -445,11 +426,11 @@ class DashboardServer: "data": { "wx_id": user_data.get("wxid", ""), "nickname": user_data.get("nickName", "未知用户"), - "avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像 URL。 + "avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像URL "mobile": user_data.get("mobile", ""), - "home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息。 - "signature": user_data.get("signature", "")[:10], - }, + "home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息 + "signature": user_data.get("signature", "")[:10] + } } except Exception as e: self.LOG.error(f"获取当前用户信息失败: {e}") diff --git a/db/admin_account_db.py b/db/admin_account_db.py index 5fdab6d..86b6a17 100644 --- a/db/admin_account_db.py +++ b/db/admin_account_db.py @@ -3,10 +3,9 @@ 后台管理员账号数据访问层。 设计目标: -1. 用数据库表承载后台账号,逐步替代固定配置文件中的账号密码; -2. 提供安全的密码哈希存储与校验能力; -3. 支持登录成功后的登录信息回写,以及在线修改密码; -4. 兼容历史部署,把旧配置中的管理员密码平滑迁移到数据库体系。 +1. 用数据库表承载后台账号,替代“固定配置文件账号密码”; +2. 提供安全的密码散列存储与校验能力; +3. 支持登录成功后的登录信息回写与在线修改密码。 """ import base64 @@ -22,14 +21,14 @@ from db.base import BaseDBOperator class AdminAccountDBOperator(BaseDBOperator): """后台管理员账号数据访问对象。""" - # 口令哈希算法前缀,便于后续平滑升级算法。 + # 口令哈希算法版本前缀,便于将来平滑升级算法。 HASH_SCHEME = "pbkdf2_sha256" - # PBKDF2 迭代次数,在安全性和计算开销之间做平衡。 + # PBKDF2 迭代次数:在安全性与计算开销之间做平衡。 HASH_ITERATIONS = 150_000 - # 已知高风险密码列表: - # 1. 这里只覆盖默认密码和常见弱密码,不做完整字典; - # 2. 后台安全提醒只需要识别“明显高风险”的情况即可; - # 3. 统一收敛在数据层,方便登录、改密、首次提醒共用。 + # 风险口令清单: + # 1. 这里优先覆盖系统默认口令和常见极弱口令; + # 2. 后台安全判断只需要识别“明显危险”的情况,不追求做成完整密码字典; + # 3. 统一放在数据层,便于登录鉴权、修改密码、首登提醒共用。 RISKY_PASSWORDS = { "admin", "admin123", @@ -41,7 +40,10 @@ class AdminAccountDBOperator(BaseDBOperator): } def init_tables(self) -> bool: - """初始化后台管理员账号表。""" + """初始化后台管理员表。 + + 表名使用 t_admin_ 前缀,满足后台账号体系命名约定。 + """ sql = """ CREATE TABLE IF NOT EXISTS t_admin_accounts ( id BIGINT AUTO_INCREMENT PRIMARY KEY, @@ -72,17 +74,12 @@ class AdminAccountDBOperator(BaseDBOperator): ) def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool: - """确保默认管理员存在,并在安全前提下补齐旧配置密码迁移。 + """确保默认管理员存在。 - 这里的迁移策略分三层: - 1. 数据库里还没有管理员账号时,按当前配置创建初始账号; - 2. 数据库里已经有账号,且密码本来就和当前配置一致时,直接视为已同步; - 3. 数据库里仍是历史默认弱密码,但配置里已经换成了更强密码时, - 自动把强密码同步进数据库,避免登录和弱密码提示长期错位。 - - 注意: - 只有在“数据库当前密码仍然是已知弱密码,而传入配置密码不是弱密码”时, - 才允许用配置回写数据库;如果用户已经在后台主动改过密码,则继续以数据库为准。 + 行为约束: + 1. 若用户名已存在,不覆盖既有密码; + 2. 仅在“表里不存在该账号”时创建初始账号; + 3. 方便从旧配置平滑迁移到数据库账号体系。 """ normalized_username = str(username or "").strip() normalized_password = str(password or "").strip() @@ -90,25 +87,17 @@ class AdminAccountDBOperator(BaseDBOperator): return False existing = self.get_admin_by_username(normalized_username) - if not existing: - password_hash = self.hash_password(normalized_password) - return self.execute_update( - """ - INSERT INTO t_admin_accounts (username, password_hash, display_name, status) - VALUES (%s, %s, %s, 1) - """, - (normalized_username, password_hash, str(display_name or "").strip() or normalized_username), - ) - - stored_hash = str(existing.get("password_hash") or "") - if stored_hash and self.verify_password(normalized_password, stored_hash): + if existing: return True - current_password_is_risky = self.is_password_hash_using_risky_password(stored_hash) - incoming_password_is_risky = normalized_password.lower() in self.RISKY_PASSWORDS - if current_password_is_risky and not incoming_password_is_risky: - return self.update_password(normalized_username, normalized_password) - return True + password_hash = self.hash_password(normalized_password) + return self.execute_update( + """ + INSERT INTO t_admin_accounts (username, password_hash, display_name, status) + VALUES (%s, %s, %s, 1) + """, + (normalized_username, password_hash, str(display_name or "").strip() or normalized_username), + ) def verify_admin_password(self, username: str, password: str) -> bool: """校验账号口令是否正确。""" @@ -146,22 +135,17 @@ class AdminAccountDBOperator(BaseDBOperator): ) def is_using_risky_password(self, username: str) -> bool: - """判断指定管理员是否仍在使用已知弱密码。""" + """判断指定管理员是否仍在使用风险口令。""" row = self.get_admin_by_username(username) if not row: return False stored_hash = str(row.get("password_hash") or "") - return self.is_password_hash_using_risky_password(stored_hash) - - @classmethod - def is_password_hash_using_risky_password(cls, stored_hash: str) -> bool: - """判断一个已存储密码哈希是否仍然对应已知弱密码。""" if not stored_hash: return False - # 哈希无法反解,这里只能把已知高风险候选值逐个比对。 - for candidate in cls.RISKY_PASSWORDS: - if cls.verify_password(candidate, stored_hash): + # 口令是哈希存储的,因此只能把风险候选集逐个比对。 + for candidate in self.RISKY_PASSWORDS: + if self.verify_password(candidate, stored_hash): return True return False @@ -170,10 +154,10 @@ class AdminAccountDBOperator(BaseDBOperator): """校验密码强度,返回错误提示;通过时返回 None。""" password_text = str(raw_password or "") if len(password_text) < 8: - return "新密码长度不能少于 8 位" + return "新密码长度不能少于8位" if password_text.lower() in cls.RISKY_PASSWORDS: - return "新密码过于简单,请避免使用默认口令或常见弱密码" + return "新密码过于简单,请避免使用默认口令或常见弱口令" score = 0 if re.search(r"[A-Za-z]", password_text): @@ -183,7 +167,7 @@ class AdminAccountDBOperator(BaseDBOperator): if re.search(r"[^A-Za-z0-9]", password_text): score += 1 - # 至少满足两类字符,既兼顾安全性,也避免把规则设置得过于苛刻。 + # 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。 if score < 2: return "新密码需至少包含字母、数字、符号中的两类" return None @@ -193,7 +177,7 @@ class AdminAccountDBOperator(BaseDBOperator): """生成口令哈希。 存储格式: - pbkdf2_sha256$迭代次数$盐值HEX$摘要base64 + pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64) """ password_text = str(raw_password or "") salt_bytes = secrets.token_bytes(16) @@ -213,7 +197,7 @@ class AdminAccountDBOperator(BaseDBOperator): 安全细节: 1. 使用 hmac.compare_digest,避免时序侧信道问题; - 2. 对格式异常统一返回 False,避免异常中断登录流程。 + 2. 对格式异常统一返回 False,避免抛错打断登录流程。 """ try: scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3) diff --git a/test/test_dashboard_auth_logic.py b/test/test_dashboard_auth_logic.py deleted file mode 100644 index b83acb1..0000000 --- a/test/test_dashboard_auth_logic.py +++ /dev/null @@ -1,171 +0,0 @@ -import unittest - -from flask import Blueprint, Flask - -from admin.dashboard.blueprints.auth import auth_bp -from admin.dashboard.server import DashboardServer -from db.admin_account_db import AdminAccountDBOperator - - -class DummyAdminDB: - """用于回归测试后台登录与弱密码判定的最小桩对象。""" - - RISKY_PASSWORDS = AdminAccountDBOperator.RISKY_PASSWORDS - - def __init__(self, row_exists: bool, db_password: str, risky: bool): - self.row_exists = row_exists - self.db_password = db_password - self.risky = risky - self.login_success_marked = False - - def get_admin_by_username(self, username: str): - if not self.row_exists: - return None - return { - "username": username, - "status": 1, - "password_hash": self.db_password, - } - - def verify_password(self, raw_password: str, stored_hash: str) -> bool: - # 这里不引入真实哈希算法,直接把“数据库里当前有效的密码”抽象成 db_password, - # 只验证登录流程是否仍然错误地回退到了 config.toml。 - return raw_password == self.db_password - - def is_using_risky_password(self, username: str) -> bool: - return self.risky - - def mark_login_success(self, username: str, login_ip: str = "") -> bool: - self.login_success_marked = True - return True - - -class DummyServer: - """为 auth 蓝图提供最小运行依赖,避免把整套 Dashboard 初始化起来。""" - - def __init__(self, username: str, password: str, admin_db: DummyAdminDB): - self.username = username - self.password = password - self.admin_account_db = admin_db - - def get_login_lock_status(self, username: str, remote_ip: str) -> dict: - return {"locked": False, "remaining_seconds": 0, "failed_count": 0} - - def get_auth_policy(self) -> dict: - return {"max_failed_attempts": 5, "lock_seconds": 900, "session_timeout_minutes": 480} - - def clear_login_failures(self, username: str, remote_ip: str) -> None: - return None - - def mark_login_failure(self, username: str, remote_ip: str) -> dict: - return {"locked": False, "remaining_seconds": 0, "failed_count": 1} - - def should_force_password_change(self, username: str) -> bool: - temp_server = DashboardServer.__new__(DashboardServer) - temp_server.username = self.username - temp_server.password = self.password - temp_server.admin_account_db = self.admin_account_db - temp_server.LOG = None - return DashboardServer.should_force_password_change(temp_server, username) - - -class FakeAdminAccountDBOperator(AdminAccountDBOperator): - """只覆盖 ensure_default_admin 测试所需的方法,避免真实数据库依赖。""" - - def __init__(self, existing_row=None): - super().__init__(db_manager=None) - self.existing_row = existing_row - self.updated_password = None - self.inserted = False - - def get_admin_by_username(self, username: str): - return self.existing_row - - @classmethod - def verify_password(cls, raw_password: str, stored_hash: str) -> bool: - # 测试里直接把 password_hash 当成明文占位值,重点验证迁移分支是否被正确触发。 - return raw_password == stored_hash - - def update_password(self, username: str, new_password: str) -> bool: - self.updated_password = (username, new_password) - return True - - def execute_update(self, sql: str, params=None) -> bool: - self.inserted = True - return True - - -class DashboardAuthLogicTestCase(unittest.TestCase): - def create_app(self, server: DummyServer) -> Flask: - app = Flask(__name__) - app.secret_key = "test-secret" - app.dashboard_server = server - - main_bp = Blueprint("main", __name__) - - @main_bp.route("/") - def index(): - return "ok" - - app.register_blueprint(auth_bp) - app.register_blueprint(main_bp) - return app - - def test_should_force_password_change_ignores_legacy_config_when_db_password_is_strong(self): - server = DummyServer( - username="admin", - password="admin123", - admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), - ) - - self.assertFalse(server.should_force_password_change("admin")) - - def test_login_rejects_legacy_config_password_after_db_password_changed(self): - server = DummyServer( - username="admin", - password="admin123", - admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), - ) - app = self.create_app(server) - - with app.test_client() as client: - response = client.post( - "/login", - data={"username": "admin", "password": "admin123"}, - headers={"X-Requested-With": "XMLHttpRequest"}, - ) - - self.assertEqual(response.status_code, 400) - self.assertFalse(server.admin_account_db.login_success_marked) - - def test_login_keeps_legacy_config_fallback_when_db_account_missing(self): - server = DummyServer( - username="admin", - password="admin123", - admin_db=DummyAdminDB(row_exists=False, db_password="", risky=False), - ) - app = self.create_app(server) - - with app.test_client() as client: - response = client.post( - "/login", - data={"username": "admin", "password": "admin123"}, - headers={"X-Requested-With": "XMLHttpRequest"}, - ) - - self.assertEqual(response.status_code, 200) - self.assertTrue(response.get_json()["success"]) - - def test_ensure_default_admin_syncs_strong_config_password_over_risky_seed(self): - operator = FakeAdminAccountDBOperator( - existing_row={"username": "admin", "password_hash": "admin123", "status": 1} - ) - - result = operator.ensure_default_admin("admin", "StrongPass!2026") - - self.assertTrue(result) - self.assertEqual(operator.updated_password, ("admin", "StrongPass!2026")) - - -if __name__ == "__main__": - unittest.main()