import unittest from flask import Blueprint, Flask from admin.dashboard.blueprints.auth import auth_bp from admin.dashboard.server import DashboardServer from db.admin_account_db import AdminAccountDBOperator class DummyAdminDB: """用于回归测试后台登录与弱密码判定的最小桩对象。""" RISKY_PASSWORDS = AdminAccountDBOperator.RISKY_PASSWORDS def __init__(self, row_exists: bool, db_password: str, risky: bool): self.row_exists = row_exists self.db_password = db_password self.risky = risky self.login_success_marked = False def get_admin_by_username(self, username: str): if not self.row_exists: return None return { "username": username, "status": 1, "password_hash": self.db_password, } def verify_password(self, raw_password: str, stored_hash: str) -> bool: # 这里不引入真实哈希算法,直接把“数据库里当前有效的密码”抽象成 db_password, # 只验证登录流程是否仍然错误地回退到了 config.toml。 return raw_password == self.db_password def is_using_risky_password(self, username: str) -> bool: return self.risky def mark_login_success(self, username: str, login_ip: str = "") -> bool: self.login_success_marked = True return True class DummyServer: """为 auth 蓝图提供最小运行依赖,避免把整套 Dashboard 初始化起来。""" def __init__(self, username: str, password: str, admin_db: DummyAdminDB): self.username = username self.password = password self.admin_account_db = admin_db def get_login_lock_status(self, username: str, remote_ip: str) -> dict: return {"locked": False, "remaining_seconds": 0, "failed_count": 0} def get_auth_policy(self) -> dict: return {"max_failed_attempts": 5, "lock_seconds": 900, "session_timeout_minutes": 480} def clear_login_failures(self, username: str, remote_ip: str) -> None: return None def mark_login_failure(self, username: str, remote_ip: str) -> dict: return {"locked": False, "remaining_seconds": 0, "failed_count": 1} def should_force_password_change(self, username: str) -> bool: temp_server = DashboardServer.__new__(DashboardServer) temp_server.username = self.username temp_server.password = self.password temp_server.admin_account_db = self.admin_account_db temp_server.LOG = None return DashboardServer.should_force_password_change(temp_server, username) class FakeAdminAccountDBOperator(AdminAccountDBOperator): """只覆盖 ensure_default_admin 测试所需的方法,避免真实数据库依赖。""" def __init__(self, existing_row=None): super().__init__(db_manager=None) self.existing_row = existing_row self.updated_password = None self.inserted = False def get_admin_by_username(self, username: str): return self.existing_row @classmethod def verify_password(cls, raw_password: str, stored_hash: str) -> bool: # 测试里直接把 password_hash 当成明文占位值,重点验证迁移分支是否被正确触发。 return raw_password == stored_hash def update_password(self, username: str, new_password: str) -> bool: self.updated_password = (username, new_password) return True def execute_update(self, sql: str, params=None) -> bool: self.inserted = True return True class DashboardAuthLogicTestCase(unittest.TestCase): def create_app(self, server: DummyServer) -> Flask: app = Flask(__name__) app.secret_key = "test-secret" app.dashboard_server = server main_bp = Blueprint("main", __name__) @main_bp.route("/") def index(): return "ok" app.register_blueprint(auth_bp) app.register_blueprint(main_bp) return app def test_should_force_password_change_ignores_legacy_config_when_db_password_is_strong(self): server = DummyServer( username="admin", password="admin123", admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), ) self.assertFalse(server.should_force_password_change("admin")) def test_login_rejects_legacy_config_password_after_db_password_changed(self): server = DummyServer( username="admin", password="admin123", admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False), ) app = self.create_app(server) with app.test_client() as client: response = client.post( "/login", data={"username": "admin", "password": "admin123"}, headers={"X-Requested-With": "XMLHttpRequest"}, ) self.assertEqual(response.status_code, 400) self.assertFalse(server.admin_account_db.login_success_marked) def test_login_keeps_legacy_config_fallback_when_db_account_missing(self): server = DummyServer( username="admin", password="admin123", admin_db=DummyAdminDB(row_exists=False, db_password="", risky=False), ) app = self.create_app(server) with app.test_client() as client: response = client.post( "/login", data={"username": "admin", "password": "admin123"}, headers={"X-Requested-With": "XMLHttpRequest"}, ) self.assertEqual(response.status_code, 200) self.assertTrue(response.get_json()["success"]) def test_ensure_default_admin_syncs_strong_config_password_over_risky_seed(self): operator = FakeAdminAccountDBOperator( existing_row={"username": "admin", "password_hash": "admin123", "status": 1} ) result = operator.ensure_default_admin("admin", "StrongPass!2026") self.assertTrue(result) self.assertEqual(operator.updated_password, ("admin", "StrongPass!2026")) if __name__ == "__main__": unittest.main()