diff --git a/admin/dashboard/blueprints/auth.py b/admin/dashboard/blueprints/auth.py index 659aedd..2e2aba5 100644 --- a/admin/dashboard/blueprints/auth.py +++ b/admin/dashboard/blueprints/auth.py @@ -1,10 +1,11 @@ -from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify -from functools import wraps -from loguru import logger import time +from functools import wraps -# 创建认证蓝图 -auth_bp = Blueprint('auth', __name__) +from flask import Blueprint, current_app, jsonify, redirect, render_template, request, session, url_for +from loguru import logger + +# 创建认证蓝图。 +auth_bp = Blueprint("auth", __name__) def _is_ajax_request() -> bool: @@ -14,36 +15,38 @@ def _is_ajax_request() -> bool: return requested_with == "xmlhttprequest" or "application/json" in accept_header -# 登录检查装饰器 def login_required(f): + """登录检查装饰器。""" + @wraps(f) def decorated_function(*args, **kwargs): - if not session.get('logged_in'): - return redirect(url_for('auth.login')) + if not session.get("logged_in"): + return redirect(url_for("auth.login")) + server = current_app.dashboard_server session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60) last_activity_at = float(session.get("last_activity_at") or 0.0) if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds: # 会话超时后统一清理登录态,避免长期闲置会话继续可用。 session.clear() - return redirect(url_for('auth.login')) + return redirect(url_for("auth.login")) + session["last_activity_at"] = time.time() return f(*args, **kwargs) return decorated_function -# 登录页面 -@auth_bp.route('/login', methods=['GET', 'POST']) +@auth_bp.route("/login", methods=["GET", "POST"]) def login(): + """后台登录页及登录提交入口。""" error = None - if request.method == 'POST': - # 使用 strip 规避用户误输入首尾空格导致的误判。 - username = str(request.form.get('username', '') or '').strip() - password = str(request.form.get('password', '') or '') + if request.method == "POST": + # 用户名做 strip,避免首尾空格导致的误判;密码保留原样,避免改变真实口令。 + username = str(request.form.get("username", "") or "").strip() + password = str(request.form.get("password", "") or "") remote_ip = request.remote_addr or "" - # 从应用上下文获取服务器实例,而不是从蓝图对象 server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) lock_status = server.get_login_lock_status(username, remote_ip) @@ -52,82 +55,89 @@ def login(): error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" if _is_ajax_request(): return jsonify({"success": False, "error": error}), 429 - return render_template('login.html', error=error) + return render_template("login.html", error=error) - # 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。 + # 认证边界说明: + # 1. 只要数据库里已经存在该管理员账号,就必须以数据库密码为准; + # 2. 否则用户在后台改密后,config.toml 里的旧密码仍然可能继续生效; + # 3. 仅当数据库不可用,或者该账号尚未迁移进数据库时,才回退到旧配置模式。 login_ok = False + should_use_config_fallback = not bool(admin_db) if admin_db: try: - login_ok = admin_db.verify_admin_password(username, password) + admin_row = admin_db.get_admin_by_username(username) + should_use_config_fallback = not bool(admin_row) + if admin_row and int(admin_row.get("status") or 0) == 1: + login_ok = admin_db.verify_password(password, str(admin_row.get("password_hash") or "")) if login_ok: - admin_db.mark_login_success(username, request.remote_addr or "") + admin_db.mark_login_success(username, remote_ip) except Exception as e: logger.error(f"数据库账号登录校验异常,回退配置模式: {e}") login_ok = False + should_use_config_fallback = True - if not login_ok: - login_ok = (username == server.username and password == server.password) + if not login_ok and should_use_config_fallback: + login_ok = username == server.username and password == server.password if login_ok: server.clear_login_failures(username, remote_ip) - session['logged_in'] = True + session["logged_in"] = True session.permanent = True - session['username'] = username # 存储用户名到session - session['last_activity_at'] = time.time() - # 若账号仍在使用默认/弱口令,则登录后强提示修改密码。 - session['force_password_change'] = bool(server.should_force_password_change(username)) + session["username"] = username + session["last_activity_at"] = time.time() + # 登录成功后再读取当前账号的安全状态,决定是否强制弹出改密提示。 + session["force_password_change"] = bool(server.should_force_password_change(username)) logger.debug(f"Login successful. Session after login: {dict(session)}") if _is_ajax_request(): - return jsonify({"success": True, "redirect_url": url_for('main.index')}) - return redirect(url_for('main.index')) + return jsonify({"success": True, "redirect_url": url_for("main.index")}) + return redirect(url_for("main.index")) + + lock_status = server.mark_login_failure(username, remote_ip) + if lock_status.get("locked"): + wait_seconds = int(lock_status.get("remaining_seconds") or 0) + error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" else: - lock_status = server.mark_login_failure(username, remote_ip) - if lock_status.get("locked"): - wait_seconds = int(lock_status.get("remaining_seconds") or 0) - error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" - else: - remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0)) - error = '用户名或密码错误' - if remaining > 0: - error = f"{error},再失败 {remaining} 次将暂时锁定" - if _is_ajax_request(): - return jsonify({"success": False, "error": error}), 400 + remaining = max( + 0, + int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0), + ) + error = "用户名或密码错误" + if remaining > 0: + error = f"{error},再失败 {remaining} 次将暂时锁定" + if _is_ajax_request(): + return jsonify({"success": False, "error": error}), 400 - return render_template('login.html', error=error) + return render_template("login.html", error=error) -# 登出 -@auth_bp.route('/logout') +@auth_bp.route("/logout") def logout(): + """退出登录。""" session.clear() - return redirect(url_for('auth.login')) + return redirect(url_for("auth.login")) -@auth_bp.route('/api/auth/security_status', methods=['GET']) +@auth_bp.route("/api/auth/security_status", methods=["GET"]) @login_required def get_security_status(): """返回当前登录管理员的安全状态。""" - return jsonify({ - "success": True, - "data": { - "force_password_change": bool(session.get("force_password_change", False)), - "session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)), + return jsonify( + { + "success": True, + "data": { + "force_password_change": bool(session.get("force_password_change", False)), + "session_timeout_minutes": int( + current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480) + ), + }, } - }) + ) -@auth_bp.route('/api/auth/change_password', methods=['POST']) +@auth_bp.route("/api/auth/change_password", methods=["POST"]) @login_required def change_password(): - """修改当前登录管理员密码。 - - 前端请求参数: - { - "old_password": "旧密码", - "new_password": "新密码", - "confirm_password": "确认新密码" - } - """ + """修改当前登录管理员密码。""" server = current_app.dashboard_server admin_db = getattr(server, "admin_account_db", None) if not admin_db: @@ -163,7 +173,7 @@ def change_password(): if not updated: return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500 - # 密码修改成功后,移除“必须修改密码”的会话提示。 + # 改密成功后立刻移除“必须修改密码”的会话提示,避免用户当前会话继续被反复打断。 session["force_password_change"] = False return jsonify({"success": True, "message": "密码修改成功"}) except Exception as e: diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index 40bf19f..ebe739e 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -1,43 +1,52 @@ # -*- coding: utf-8 -*- """ -统计看板服务器 - 使用Flask蓝图重构版 +统计看板服务器。 + +这里负责: +1. 读取 Dashboard 配置并创建 Flask 应用; +2. 初始化后台依赖对象、蓝图和静态资源路由; +3. 提供后台登录限流、弱密码提醒等通用安全能力。 """ + import os +import secrets import sys import threading import time -import secrets from datetime import timedelta import toml from flask import Flask, send_from_directory from loguru import logger -from db.contacts_db import ContactsDBOperator from db.admin_account_db import AdminAccountDBOperator +from db.contacts_db import ContactsDBOperator from db.emoji_asset_db import EmojiAssetDB +from db.fun_command_rule_db import FunCommandRuleDBOperator from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator from db.task_db import TaskDBOperator -from db.fun_command_rule_db import FunCommandRuleDBOperator from utils.fun_command_rule_service import FunCommandRuleService from wechat_ipad import WechatAPIClient -# 添加项目根目录到系统路径,确保可以导入项目模块 -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) +# 添加项目根目录到系统路径,确保可以导入项目模块。 +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) class DashboardServer: - """统计看板服务器""" + """统计看板服务器。""" - def __init__(self, host: str = None, port: int = None, - username: str = None, password: str = None, - robot_instance=None): - # 加载配置文件 + def __init__( + self, + host: str = None, + port: int = None, + username: str = None, + password: str = None, + robot_instance=None, + ): + # 先加载配置文件,再用传入参数覆盖,保证 CLI/外部调用具备最高优先级。 self.config = self._load_dashboard_config() - - # 优先使用传入的参数,其次使用配置文件中的参数 self.host = host or self.config.get("server", {}).get("host", "0.0.0.0") self.port = port or self.config.get("server", {}).get("port", 8888) self.username = username or self.config.get("auth", {}).get("username", "admin") @@ -45,105 +54,108 @@ class DashboardServer: self.LOG = logger self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}") - # 登录失败限流兜底缓存: - # 1. 优先尝试 Redis,但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底; - # 2. 字典内容只保存短期登录失败窗口,不用于持久化; - # 3. 线程化 WSGI 会并发访问,因此需要显式加锁。 + + # 登录失败限流的进程内兜底缓存: + # 1. 优先用 Redis 做共享限流; + # 2. 如果 Redis 暂时不可用,至少本进程内仍有基本防护; + # 3. 这里会被并发访问,因此需要配合线程锁使用。 self._auth_runtime_lock = threading.Lock() self._auth_failures = {} - # 如果提供了robot实例,则使用其对象 - if robot_instance: - self.db_manager = robot_instance.db_manager - self.stats_db = StatsDBOperator(self.db_manager) - # Dashboard 启动可能早于 iPad 登录完成: - # 1. 此时 Robot 上的 message_storage 还没来得及绑定真实 bot; - # 2. 但后台很多页面仍然依赖消息存储与表情资产库; - # 3. 因此这里优先复用 Robot 已初始化的 message_storage,没有则再安全回退到 DB 层对象。 - self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager) - self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager) - self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) - self.member_context_db = MemberContextDBOperator(self.db_manager) - self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) - # 后台管理员账号数据层:用于登录鉴权与修改密码。 - self.admin_account_db = AdminAccountDBOperator(self.db_manager) - self.system_job_db = robot_instance.system_job_db - self.system_job_loader = robot_instance.system_job_loader - self.plugin_schedule_db = robot_instance.plugin_schedule_db - self.plugin_schedule_manager = robot_instance.plugin_schedule_manager - self.group_plugin_config_db = robot_instance.group_plugin_config_db - self.llm_catalog_db = robot_instance.llm_catalog_db - self.group_plugin_config_service = robot_instance.group_plugin_config_service - # 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”后台配置与缓存。 - # 这里统一在 Dashboard 启动时初始化,保证管理端可直接读写规则。 - self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager) - self.fun_command_rule_service = FunCommandRuleService( - db_operator=self.fun_command_rule_db, - redis_client=self.db_manager.get_redis_connection(), - local_ttl_seconds=30, - ) - self.fun_command_rule_service.init_tables() - self.fun_command_rule_service.refresh_cache() - # 获取联系人管理器实例 - self.contact_manager = robot_instance.contact_manager - self.plugin_manager = robot_instance.plugin_manager - self.plugin_registry = robot_instance.plugin_registry - self.client: WechatAPIClient = robot_instance.ipad_bot - self.robot = robot_instance - self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要") - self.member_context_service = getattr(self.member_context_plugin, "service", None) - self.LOG.info("使用Robot实例的对象进行初始化") - - # 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。 - try: - table_ok = self.admin_account_db.init_tables() - if not table_ok: - self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式") - else: - seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员") - if seed_ok: - self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)") - else: - self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息") - except Exception as e: - self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}") - else: + if not robot_instance: self.LOG.error("未提供Robot实例,Dashboard无法正常工作") raise ValueError("必须提供Robot实例") + self.db_manager = robot_instance.db_manager + self.stats_db = StatsDBOperator(self.db_manager) + + # Dashboard 启动可能早于 iPad 登录完成: + # 1. 此时 Robot 上的 message_storage 可能尚未绑定完成; + # 2. 但后台页面仍然依赖消息存储与表情资产库; + # 3. 因此优先复用 Robot 现成对象,没有时再安全回退到 DB 层对象。 + self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager) + self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager) + self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) + self.member_context_db = MemberContextDBOperator(self.db_manager) + self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) + + # 后台管理员账号数据层:用于登录鉴权、弱密码判断和在线改密。 + self.admin_account_db = AdminAccountDBOperator(self.db_manager) + self.system_job_db = robot_instance.system_job_db + self.system_job_loader = robot_instance.system_job_loader + self.plugin_schedule_db = robot_instance.plugin_schedule_db + self.plugin_schedule_manager = robot_instance.plugin_schedule_manager + self.group_plugin_config_db = robot_instance.group_plugin_config_db + self.llm_catalog_db = robot_instance.llm_catalog_db + self.group_plugin_config_service = robot_instance.group_plugin_config_service + + # 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”的后台配置与缓存。 + self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager) + self.fun_command_rule_service = FunCommandRuleService( + db_operator=self.fun_command_rule_db, + redis_client=self.db_manager.get_redis_connection(), + local_ttl_seconds=30, + ) + self.fun_command_rule_service.init_tables() + self.fun_command_rule_service.refresh_cache() + + # 其余运行时对象直接复用 Robot 已初始化实例,避免重复构造。 + self.contact_manager = robot_instance.contact_manager + self.plugin_manager = robot_instance.plugin_manager + self.plugin_registry = robot_instance.plugin_registry + self.client: WechatAPIClient = robot_instance.ipad_bot + self.robot = robot_instance + self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要") + self.member_context_service = getattr(self.member_context_plugin, "service", None) + + self.LOG.info("使用Robot实例的对象进行初始化") + + # 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。 + try: + table_ok = self.admin_account_db.init_tables() + if not table_ok: + self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式") + else: + seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员") + if seed_ok: + self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)") + else: + self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息") + except Exception as e: + self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}") + self.app = self._create_app() self._stop_event = threading.Event() - self._server = None # 存储服务器实例 + self._server = None # Werkzeug 服务实例会在 run() 时写入这里。 def _load_dashboard_config(self): - """加载Dashboard配置文件""" + """加载 Dashboard 配置文件。""" try: - config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml') + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.toml") if os.path.exists(config_path): - with open(config_path, 'r', encoding='utf-8') as f: + with open(config_path, "r", encoding="utf-8") as f: return toml.load(f) - else: - # 如果配置文件不存在,创建默认配置 - default_config = { - "server": {"host": "0.0.0.0", "port": 8888}, - "auth": {"username": "admin", "password": "admin123"} - } - with open(config_path, 'w', encoding='utf-8') as f: - toml.dump(default_config, f) - return default_config + + # 如果配置文件不存在,则创建默认配置,方便本地快速启动。 + default_config = { + "server": {"host": "0.0.0.0", "port": 8888}, + "auth": {"username": "admin", "password": "admin123"}, + } + with open(config_path, "w", encoding="utf-8") as f: + toml.dump(default_config, f) + return default_config except Exception as e: self.LOG.error(f"加载Dashboard配置文件失败: {e}") - # 返回默认配置 return { "server": {"host": "0.0.0.0", "port": 8888}, - "auth": {"username": "admin", "password": "admin123"} + "auth": {"username": "admin", "password": "admin123"}, } def _create_app(self) -> Flask: - """创建Flask应用""" - # 指定模板文件夹路径 - template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') + """创建 Flask 应用。""" + template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") app = Flask(__name__, template_folder=template_folder) + auth_config = self.config.get("auth", {}) or {} session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480) cookie_secure = bool(auth_config.get("cookie_secure", False)) @@ -152,57 +164,54 @@ class DashboardServer: or auth_config.get("secret_key", "") or "" ).strip() + if configured_secret: app.secret_key = configured_secret else: - # 若未显式配置 secret_key,则每次进程启动生成随机值: - # 1. 这比固定硬编码密钥安全得多; - # 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的; - # 3. 同时输出 warning,提醒后续最好通过配置或环境变量固定注入。 + # 未显式配置 secret_key 时,用进程级随机密钥兜底: + # 1. 安全性优于硬编码固定值; + # 2. 服务重启后旧 session 会失效,但这是可接受的安全代价; + # 3. 同时输出 warning,提醒后续通过配置或环境变量固定注入。 app.secret_key = secrets.token_hex(32) self.LOG.warning("未配置 Dashboard secret_key,已使用进程级随机密钥,重启后现有登录会失效") - # 禁用模板缓存,使修改HTML文件后立即生效 False =重启才生效 - app.config['TEMPLATES_AUTO_RELOAD'] = True - app.config['SESSION_COOKIE_HTTPONLY'] = True - app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") - app.config['SESSION_COOKIE_SECURE'] = cookie_secure - app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes)) + # 关闭模板缓存,便于开发时实时看到页面修改结果。 + app.config["TEMPLATES_AUTO_RELOAD"] = True + app.config["SESSION_COOKIE_HTTPONLY"] = True + app.config["SESSION_COOKIE_SAMESITE"] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") + app.config["SESSION_COOKIE_SECURE"] = cookie_secure + app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=max(15, session_timeout_minutes)) - # 设置Werkzeug日志级别为DEBUG import logging - logging.getLogger('werkzeug').setLevel(logging.ERROR) - # 将dashboard_server实例设置为app的属性 + logging.getLogger("werkzeug").setLevel(logging.ERROR) + + # 将 DashboardServer 实例挂到 app 上,方便蓝图在请求期取用。 app.dashboard_server = self - # 配置静态文件访问 - static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') + static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") - @app.route('/static/') + @app.route("/static/") def serve_static(filename): return send_from_directory(static_folder, filename) - # 获取项目根目录下的static/images目录 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) images_dir = os.path.join(project_root, "static", "images") - - # 确保目录存在 os.makedirs(images_dir, exist_ok=True) - @app.route('/static/images/') + @app.route("/static/images/") def serve_images(filename): return send_from_directory(images_dir, filename) - # 添加一个路由处理favicon请求 - @app.route('/favicon.ico') + @app.route("/favicon.ico") def favicon(): - return send_from_directory(os.path.join(app.root_path, 'static'), - 'favicon.ico', mimetype='image/vnd.microsoft.icon') + return send_from_directory( + os.path.join(app.root_path, "static"), + "favicon.ico", + mimetype="image/vnd.microsoft.icon", + ) - # 注册蓝图 self._register_blueprints(app) - return app def get_auth_policy(self) -> dict: @@ -239,12 +248,19 @@ class DashboardServer: if expire_at > 0 and expire_at <= time.time(): self._auth_failures.pop(guard_key, None) record = {} + return { "count": int(record.get("count") or 0), "first_failed_at": float(record.get("first_failed_at") or 0.0), } - def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None: + def _save_login_failure_record( + self, + guard_key: str, + count: int, + first_failed_at: float, + ttl_seconds: int, + ) -> None: """保存登录失败记录,优先 Redis,失败时回退到进程内缓存。""" payload = f"{int(count)}|{float(first_failed_at)}" try: @@ -306,41 +322,48 @@ class DashboardServer: def should_force_password_change(self, username: str) -> bool: """判断当前管理员是否应该被强制提示修改密码。""" + normalized_username = str(username or "").strip() admin_db = getattr(self, "admin_account_db", None) - if admin_db and admin_db.is_using_risky_password(username): - return True + if admin_db: + try: + admin_row = admin_db.get_admin_by_username(normalized_username) + # 只要数据库里已经存在该账号,就完全以数据库中的当前密码状态为准。 + # 这样可以避免用户已经在后台把密码改强后,仍被 config.toml 中的旧默认密码反复误判。 + if admin_row: + return bool(admin_db.is_using_risky_password(normalized_username)) + except Exception as e: + self.LOG.error(f"判断后台弱密码状态时出现异常,将回退配置判断: {e}") - # 数据库体系不可用时,再回退配置值判断,至少把默认 admin/admin123 识别出来。 + # 数据库体系不可用或该账号尚未迁移进数据库时,再回退配置值判断。 fallback_username = str(self.username or "").strip() fallback_password = str(self.password or "").strip() return ( - str(username or "").strip() == fallback_username + normalized_username == fallback_username and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"}) ) def _register_blueprints(self, app): - """注册所有蓝图""" - # 在函数内部导入蓝图,避免循环导入 + """注册所有蓝图。""" + # 在函数内部导入蓝图,避免循环导入。 from admin.dashboard.blueprints.auth import auth_bp from admin.dashboard.blueprints.contacts import contacts_bp - from admin.dashboard.blueprints.robot import robot_bp + from admin.dashboard.blueprints.file_browser import file_browser_bp + from admin.dashboard.blueprints.friend_circle import friend_circle_bp + from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp + from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp + from admin.dashboard.blueprints.main import main_bp + from admin.dashboard.blueprints.message_push import message_push_bp from admin.dashboard.blueprints.messages import messages_bp + from admin.dashboard.blueprints.plugin_routes import plugin_routes + from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp + from admin.dashboard.blueprints.robot import robot_bp from admin.dashboard.blueprints.stats import stats_bp from admin.dashboard.blueprints.system import system_bp - from admin.dashboard.blueprints.main import main_bp - from admin.dashboard.blueprints.plugin_routes import plugin_routes - from admin.dashboard.blueprints.virtual_group import virtual_group_bp - from admin.dashboard.blueprints.file_browser import file_browser_bp - from admin.dashboard.blueprints.message_push import message_push_bp - from admin.dashboard.blueprints.friend_circle import friend_circle_bp from admin.dashboard.blueprints.system_jobs import system_jobs_bp - from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp - from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp - from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp + from admin.dashboard.blueprints.virtual_group import virtual_group_bp - # 在app.register_blueprint部分添加 - app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group') + app.register_blueprint(virtual_group_bp, url_prefix="/virtual_group") app.register_blueprint(auth_bp) app.register_blueprint(main_bp) app.register_blueprint(contacts_bp) @@ -361,16 +384,16 @@ class DashboardServer: self.LOG.info("所有蓝图已注册") def run(self): - """运行服务器""" + """运行服务器。""" from werkzeug.serving import make_server - # 设置Werkzeug日志级别为DEBUG import logging - logging.getLogger('werkzeug').setLevel(logging.ERROR) + + logging.getLogger("werkzeug").setLevel(logging.ERROR) self.LOG.info(f"启动服务器: {self.host}:{self.port}") try: - # Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会导致整个后台无响应。 - # 改为 threaded server,避免某个接口阻塞后拖死所有页面访问。 + # Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会拖死整个后台。 + # 改为 threaded server 可以避免某个接口阻塞时所有页面一起无响应。 self._server = make_server(self.host, self.port, self.app, threaded=True) self._server.serve_forever() except Exception as e: @@ -378,26 +401,22 @@ class DashboardServer: self._stop_event.set() def stop(self): - """停止服务器""" + """停止服务器。""" self.LOG.info("正在停止服务器...") self._stop_event.set() - # 使用werkzeug服务器的关闭方法 if self._server: self._server.shutdown() self.LOG.info("服务器已停止") def get_current_user_info(self): - """获取当前登录的微信用户信息""" + """获取当前登录的微信用户信息。""" try: if not self.client: self.LOG.error("client实例不可用,无法获取当前用户信息") return {"success": False, "message": "实例不可用"} - # 获取当前登录的微信ID - - # 从新的resp格式中获取用户信息 try: if self.robot is None: raise ValueError("机器人对象未初始化") @@ -406,7 +425,7 @@ class DashboardServer: "nickName": getattr(self.robot, "nickname", ""), "mobile": getattr(self.robot, "phone", ""), "smallHeadImgUrl": getattr(self.robot, "head_image", ""), - "signature": getattr(self.robot, "signature", "") + "signature": getattr(self.robot, "signature", ""), } except (AttributeError, ValueError) as e: print(f"获取用户信息出错: {str(e)}") @@ -415,7 +434,7 @@ class DashboardServer: "nickName": self.robot.nickname, "mobile": self.robot.phone, "smallHeadImgUrl": self.robot.head_image, - "signature": self.robot.signature + "signature": self.robot.signature, } if not user_data: @@ -426,11 +445,11 @@ class DashboardServer: "data": { "wx_id": user_data.get("wxid", ""), "nickname": user_data.get("nickName", "未知用户"), - "avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像URL + "avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像 URL。 "mobile": user_data.get("mobile", ""), - "home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息 - "signature": user_data.get("signature", "")[:10] - } + "home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息。 + "signature": user_data.get("signature", "")[:10], + }, } except Exception as e: self.LOG.error(f"获取当前用户信息失败: {e}") diff --git a/db/admin_account_db.py b/db/admin_account_db.py index 86b6a17..5fdab6d 100644 --- a/db/admin_account_db.py +++ b/db/admin_account_db.py @@ -3,9 +3,10 @@ 后台管理员账号数据访问层。 设计目标: -1. 用数据库表承载后台账号,替代“固定配置文件账号密码”; -2. 提供安全的密码散列存储与校验能力; -3. 支持登录成功后的登录信息回写与在线修改密码。 +1. 用数据库表承载后台账号,逐步替代固定配置文件中的账号密码; +2. 提供安全的密码哈希存储与校验能力; +3. 支持登录成功后的登录信息回写,以及在线修改密码; +4. 兼容历史部署,把旧配置中的管理员密码平滑迁移到数据库体系。 """ import base64 @@ -21,14 +22,14 @@ from db.base import BaseDBOperator class AdminAccountDBOperator(BaseDBOperator): """后台管理员账号数据访问对象。""" - # 口令哈希算法版本前缀,便于将来平滑升级算法。 + # 口令哈希算法前缀,便于后续平滑升级算法。 HASH_SCHEME = "pbkdf2_sha256" - # PBKDF2 迭代次数:在安全性与计算开销之间做平衡。 + # PBKDF2 迭代次数,在安全性和计算开销之间做平衡。 HASH_ITERATIONS = 150_000 - # 风险口令清单: - # 1. 这里优先覆盖系统默认口令和常见极弱口令; - # 2. 后台安全判断只需要识别“明显危险”的情况,不追求做成完整密码字典; - # 3. 统一放在数据层,便于登录鉴权、修改密码、首登提醒共用。 + # 已知高风险密码列表: + # 1. 这里只覆盖默认密码和常见弱密码,不做完整字典; + # 2. 后台安全提醒只需要识别“明显高风险”的情况即可; + # 3. 统一收敛在数据层,方便登录、改密、首次提醒共用。 RISKY_PASSWORDS = { "admin", "admin123", @@ -40,10 +41,7 @@ class AdminAccountDBOperator(BaseDBOperator): } def init_tables(self) -> bool: - """初始化后台管理员表。 - - 表名使用 t_admin_ 前缀,满足后台账号体系命名约定。 - """ + """初始化后台管理员账号表。""" sql = """ CREATE TABLE IF NOT EXISTS t_admin_accounts ( id BIGINT AUTO_INCREMENT PRIMARY KEY, @@ -74,12 +72,17 @@ class AdminAccountDBOperator(BaseDBOperator): ) def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool: - """确保默认管理员存在。 + """确保默认管理员存在,并在安全前提下补齐旧配置密码迁移。 - 行为约束: - 1. 若用户名已存在,不覆盖既有密码; - 2. 仅在“表里不存在该账号”时创建初始账号; - 3. 方便从旧配置平滑迁移到数据库账号体系。 + 这里的迁移策略分三层: + 1. 数据库里还没有管理员账号时,按当前配置创建初始账号; + 2. 数据库里已经有账号,且密码本来就和当前配置一致时,直接视为已同步; + 3. 数据库里仍是历史默认弱密码,但配置里已经换成了更强密码时, + 自动把强密码同步进数据库,避免登录和弱密码提示长期错位。 + + 注意: + 只有在“数据库当前密码仍然是已知弱密码,而传入配置密码不是弱密码”时, + 才允许用配置回写数据库;如果用户已经在后台主动改过密码,则继续以数据库为准。 """ normalized_username = str(username or "").strip() normalized_password = str(password or "").strip() @@ -87,17 +90,25 @@ class AdminAccountDBOperator(BaseDBOperator): return False existing = self.get_admin_by_username(normalized_username) - if existing: + if not existing: + password_hash = self.hash_password(normalized_password) + return self.execute_update( + """ + INSERT INTO t_admin_accounts (username, password_hash, display_name, status) + VALUES (%s, %s, %s, 1) + """, + (normalized_username, password_hash, str(display_name or "").strip() or normalized_username), + ) + + stored_hash = str(existing.get("password_hash") or "") + if stored_hash and self.verify_password(normalized_password, stored_hash): return True - password_hash = self.hash_password(normalized_password) - return self.execute_update( - """ - INSERT INTO t_admin_accounts (username, password_hash, display_name, status) - VALUES (%s, %s, %s, 1) - """, - (normalized_username, password_hash, str(display_name or "").strip() or normalized_username), - ) + current_password_is_risky = self.is_password_hash_using_risky_password(stored_hash) + incoming_password_is_risky = normalized_password.lower() in self.RISKY_PASSWORDS + if current_password_is_risky and not incoming_password_is_risky: + return self.update_password(normalized_username, normalized_password) + return True def verify_admin_password(self, username: str, password: str) -> bool: """校验账号口令是否正确。""" @@ -135,17 +146,22 @@ class AdminAccountDBOperator(BaseDBOperator): ) def is_using_risky_password(self, username: str) -> bool: - """判断指定管理员是否仍在使用风险口令。""" + """判断指定管理员是否仍在使用已知弱密码。""" row = self.get_admin_by_username(username) if not row: return False stored_hash = str(row.get("password_hash") or "") + return self.is_password_hash_using_risky_password(stored_hash) + + @classmethod + def is_password_hash_using_risky_password(cls, stored_hash: str) -> bool: + """判断一个已存储密码哈希是否仍然对应已知弱密码。""" if not stored_hash: return False - # 口令是哈希存储的,因此只能把风险候选集逐个比对。 - for candidate in self.RISKY_PASSWORDS: - if self.verify_password(candidate, stored_hash): + # 哈希无法反解,这里只能把已知高风险候选值逐个比对。 + for candidate in cls.RISKY_PASSWORDS: + if cls.verify_password(candidate, stored_hash): return True return False @@ -154,10 +170,10 @@ class AdminAccountDBOperator(BaseDBOperator): """校验密码强度,返回错误提示;通过时返回 None。""" password_text = str(raw_password or "") if len(password_text) < 8: - return "新密码长度不能少于8位" + return "新密码长度不能少于 8 位" if password_text.lower() in cls.RISKY_PASSWORDS: - return "新密码过于简单,请避免使用默认口令或常见弱口令" + return "新密码过于简单,请避免使用默认口令或常见弱密码" score = 0 if re.search(r"[A-Za-z]", password_text): @@ -167,7 +183,7 @@ class AdminAccountDBOperator(BaseDBOperator): if re.search(r"[^A-Za-z0-9]", password_text): score += 1 - # 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。 + # 至少满足两类字符,既兼顾安全性,也避免把规则设置得过于苛刻。 if score < 2: return "新密码需至少包含字母、数字、符号中的两类" return None @@ -177,7 +193,7 @@ class AdminAccountDBOperator(BaseDBOperator): """生成口令哈希。 存储格式: - pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64) + pbkdf2_sha256$迭代次数$盐值HEX$摘要base64 """ password_text = str(raw_password or "") salt_bytes = secrets.token_bytes(16) @@ -197,7 +213,7 @@ class AdminAccountDBOperator(BaseDBOperator): 安全细节: 1. 使用 hmac.compare_digest,避免时序侧信道问题; - 2. 对格式异常统一返回 False,避免抛错打断登录流程。 + 2. 对格式异常统一返回 False,避免异常中断登录流程。 """ try: scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3) diff --git a/test/test_dashboard_auth_logic.py b/test/test_dashboard_auth_logic.py new file mode 100644 index 0000000..b83acb1 --- /dev/null +++ b/test/test_dashboard_auth_logic.py @@ -0,0 +1,171 @@ +import unittest + +from flask import Blueprint, Flask + +from admin.dashboard.blueprints.auth import auth_bp +from admin.dashboard.server import DashboardServer +from db.admin_account_db import AdminAccountDBOperator + + +class DummyAdminDB: + """用于回归测试后台登录与弱密码判定的最小桩对象。""" + + RISKY_PASSWORDS = AdminAccountDBOperator.RISKY_PASSWORDS + + def __init__(self, row_exists: bool, db_password: str, risky: bool): + self.row_exists = row_exists + self.db_password = db_password + self.risky = risky + self.login_success_marked = False + + def get_admin_by_username(self, username: str): + if not self.row_exists: + return None + return { + "username": username, + "status": 1, + "password_hash": self.db_password, + } + + def verify_password(self, raw_password: str, stored_hash: str) -> bool: + # 这里不引入真实哈希算法,直接把“数据库里当前有效的密码”抽象成 db_password, + # 只验证登录流程是否仍然错误地回退到了 config.toml。 + return raw_password == self.db_password + + def is_using_risky_password(self, username: str) -> bool: + return self.risky + + def mark_login_success(self, username: str, login_ip: str = "") -> bool: + self.login_success_marked = True + return True + + +class DummyServer: + """为 auth 蓝图提供最小运行依赖,避免把整套 Dashboard 初始化起来。""" + + def __init__(self, username: str, password: str, admin_db: DummyAdminDB): + self.username = username + self.password = password + self.admin_account_db = admin_db + + def get_login_lock_status(self, username: str, remote_ip: str) -> dict: + return {"locked": False, "remaining_seconds": 0, "failed_count": 0} + + def get_auth_policy(self) -> dict: + return {"max_failed_attempts": 5, "lock_seconds": 900, "session_timeout_minutes": 480} + + def clear_login_failures(self, username: str, remote_ip: str) -> None: + return None + + def mark_login_failure(self, username: str, remote_ip: str) -> dict: + return {"locked": False, "remaining_seconds": 0, "failed_count": 1} + + def should_force_password_change(self, username: str) -> bool: + temp_server = DashboardServer.__new__(DashboardServer) + temp_server.username = self.username + temp_server.password = self.password + temp_server.admin_account_db = self.admin_account_db + temp_server.LOG = None + return DashboardServer.should_force_password_change(temp_server, username) + + +class FakeAdminAccountDBOperator(AdminAccountDBOperator): + """只覆盖 ensure_default_admin 测试所需的方法,避免真实数据库依赖。""" + + def __init__(self, existing_row=None): + super().__init__(db_manager=None) + self.existing_row = existing_row + self.updated_password = None + self.inserted = False + + def get_admin_by_username(self, username: str): + return self.existing_row + + @classmethod + def verify_password(cls, raw_password: str, stored_hash: str) -> bool: + # 测试里直接把 password_hash 当成明文占位值,重点验证迁移分支是否被正确触发。 + return raw_password == stored_hash + + def update_password(self, username: str, new_password: str) -> bool: + self.updated_password = (username, new_password) + return True + + def execute_update(self, sql: str, params=None) -> bool: + self.inserted = True + return True + + +class DashboardAuthLogicTestCase(unittest.TestCase): + def create_app(self, server: DummyServer) -> Flask: + app = Flask(__name__) + app.secret_key = "test-secret" + app.dashboard_server = server + + main_bp = Blueprint("main", __name__) + + @main_bp.route("/") + def index(): + return "ok" + + app.register_blueprint(auth_bp) + app.register_blueprint(main_bp) + return app + + def test_should_force_password_change_ignores_legacy_config_when_db_password_is_strong(self): + server = DummyServer( + username="admin", + password="admin123", + admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), + ) + + self.assertFalse(server.should_force_password_change("admin")) + + def test_login_rejects_legacy_config_password_after_db_password_changed(self): + server = DummyServer( + username="admin", + password="admin123", + admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), + ) + app = self.create_app(server) + + with app.test_client() as client: + response = client.post( + "/login", + data={"username": "admin", "password": "admin123"}, + headers={"X-Requested-With": "XMLHttpRequest"}, + ) + + self.assertEqual(response.status_code, 400) + self.assertFalse(server.admin_account_db.login_success_marked) + + def test_login_keeps_legacy_config_fallback_when_db_account_missing(self): + server = DummyServer( + username="admin", + password="admin123", + admin_db=DummyAdminDB(row_exists=False, db_password="", risky=False), + ) + app = self.create_app(server) + + with app.test_client() as client: + response = client.post( + "/login", + data={"username": "admin", "password": "admin123"}, + headers={"X-Requested-With": "XMLHttpRequest"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertTrue(response.get_json()["success"]) + + def test_ensure_default_admin_syncs_strong_config_password_over_risky_seed(self): + operator = FakeAdminAccountDBOperator( + existing_row={"username": "admin", "password_hash": "admin123", "status": 1} + ) + + result = operator.ensure_default_admin("admin", "StrongPass!2026") + + self.assertTrue(result) + self.assertEqual(operator.updated_password, ("admin", "StrongPass!2026")) + + +if __name__ == "__main__": + unittest.main()