1. 修复数据库账号存在时仍回退 config.toml 判断,导致每次登录重复提示弱密码的问题。 2. 补齐默认管理员密码从旧配置迁移到数据库的同步逻辑,兼容历史部署。 3. 恢复 server.py 为可读 UTF-8 中文版本,并补充后台登录与弱密码判定的回归测试。
172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
import unittest
|
||
|
||
from flask import Blueprint, Flask
|
||
|
||
from admin.dashboard.blueprints.auth import auth_bp
|
||
from admin.dashboard.server import DashboardServer
|
||
from db.admin_account_db import AdminAccountDBOperator
|
||
|
||
|
||
class DummyAdminDB:
    """Minimal stub of the admin account store for login / weak-password regression tests."""

    RISKY_PASSWORDS = AdminAccountDBOperator.RISKY_PASSWORDS

    def __init__(self, row_exists: bool, db_password: str, risky: bool):
        """Configure the stub.

        Args:
            row_exists: Whether ``get_admin_by_username`` should return a row.
            db_password: Value treated as the password currently stored in the DB.
            risky: Fixed answer returned by ``is_using_risky_password``.
        """
        self.login_success_marked = False
        self.risky = risky
        self.db_password = db_password
        self.row_exists = row_exists

    def get_admin_by_username(self, username: str):
        """Return a fake account row with status 1, or None when no row is configured."""
        if self.row_exists:
            account_row = {
                "username": username,
                "status": 1,
                "password_hash": self.db_password,
            }
            return account_row
        return None

    def verify_password(self, raw_password: str, stored_hash: str) -> bool:
        """Compare the submitted password with ``db_password``; ``stored_hash`` is ignored."""
        # No real hashing algorithm is used here: the password currently valid
        # in the database is abstracted as db_password, so the tests only check
        # whether the login flow still wrongly falls back to config.toml.
        is_match = raw_password == self.db_password
        return is_match

    def is_using_risky_password(self, username: str) -> bool:
        """Return the preconfigured ``risky`` flag regardless of ``username``."""
        return self.risky

    def mark_login_success(self, username: str, login_ip: str = "") -> bool:
        """Remember that a successful login was reported and return True."""
        self.login_success_marked = True
        return True
|
||
|
||
|
||
class DummyServer:
    """Provide the minimal runtime dependencies the auth blueprint needs.

    This avoids initializing the whole Dashboard in the tests.
    """

    def __init__(self, username: str, password: str, admin_db: DummyAdminDB):
        """Store the config-style credentials and the stub account database."""
        self.admin_account_db = admin_db
        self.password = password
        self.username = username

    def get_login_lock_status(self, username: str, remote_ip: str) -> dict:
        """Always report an unlocked state with zero recorded failures."""
        unlocked = {"locked": False, "remaining_seconds": 0, "failed_count": 0}
        return unlocked

    def get_auth_policy(self) -> dict:
        """Return a fixed authentication policy dictionary."""
        policy = {"max_failed_attempts": 5, "lock_seconds": 900, "session_timeout_minutes": 480}
        return policy

    def clear_login_failures(self, username: str, remote_ip: str) -> None:
        """Do nothing; the stub keeps no failure state."""

    def mark_login_failure(self, username: str, remote_ip: str) -> dict:
        """Return a fixed, unlocked status with a failure count of 1."""
        after_failure = {"locked": False, "remaining_seconds": 0, "failed_count": 1}
        return after_failure

    def should_force_password_change(self, username: str) -> bool:
        """Delegate to ``DashboardServer.should_force_password_change``.

        A bare ``DashboardServer`` is created with ``__new__`` (so ``__init__``
        is not run) and given only the attributes copied from this stub.
        """
        stand_in = DashboardServer.__new__(DashboardServer)
        copied_state = (
            ("username", self.username),
            ("password", self.password),
            ("admin_account_db", self.admin_account_db),
            ("LOG", None),
        )
        for attr_name, attr_value in copied_state:
            setattr(stand_in, attr_name, attr_value)
        return DashboardServer.should_force_password_change(stand_in, username)
|
||
|
||
|
||
class FakeAdminAccountDBOperator(AdminAccountDBOperator):
    """Override only the methods the ensure_default_admin test needs.

    This keeps the test free of a real database dependency.
    """

    def __init__(self, existing_row=None):
        """Initialize the parent with no db_manager and reset the recording fields.

        Args:
            existing_row: Row that ``get_admin_by_username`` will return.
        """
        super().__init__(db_manager=None)
        self.inserted = False
        self.updated_password = None
        self.existing_row = existing_row

    def get_admin_by_username(self, username: str):
        """Return the preconfigured row regardless of ``username``."""
        return self.existing_row

    @classmethod
    def verify_password(cls, raw_password: str, stored_hash: str) -> bool:
        """Treat ``stored_hash`` as a plaintext placeholder and compare directly."""
        # The tests use password_hash as a plaintext placeholder; the point is
        # to verify that the migration branch is triggered correctly.
        is_match = raw_password == stored_hash
        return is_match

    def update_password(self, username: str, new_password: str) -> bool:
        """Record the requested ``(username, new_password)`` pair and return True."""
        recorded_call = (username, new_password)
        self.updated_password = recorded_call
        return True

    def execute_update(self, sql: str, params=None) -> bool:
        """Flag that an update statement was issued and return True."""
        self.inserted = True
        return True
|
||
|
||
|
||
class DashboardAuthLogicTestCase(unittest.TestCase):
    """Regression tests for dashboard login and the forced password change check."""

    def create_app(self, server: DummyServer) -> Flask:
        """Build a Flask app with the auth blueprint and a stub ``main`` blueprint."""
        main_bp = Blueprint("main", __name__)

        @main_bp.route("/")
        def index():
            return "ok"

        app = Flask(__name__)
        app.secret_key = "test-secret"
        app.dashboard_server = server

        for blueprint in (auth_bp, main_bp):
            app.register_blueprint(blueprint)
        return app

    @staticmethod
    def _build_server(row_exists: bool, db_password: str) -> DummyServer:
        """Create a DummyServer whose config credentials are admin / admin123."""
        stub_db = DummyAdminDB(row_exists=row_exists, db_password=db_password, risky=False)
        return DummyServer(username="admin", password="admin123", admin_db=stub_db)

    def _post_config_credentials(self, server: DummyServer):
        """POST the config credentials to /login as an XMLHttpRequest and return the response."""
        app = self.create_app(server)
        with app.test_client() as client:
            return client.post(
                "/login",
                data={"username": "admin", "password": "admin123"},
                headers={"X-Requested-With": "XMLHttpRequest"},
            )

    def test_should_force_password_change_ignores_legacy_config_when_db_password_is_strong(self):
        """A DB row reported as not risky must not force a password change."""
        server = self._build_server(row_exists=True, db_password="StrongPass!2026")

        must_change = server.should_force_password_change("admin")

        self.assertFalse(must_change)

    def test_login_rejects_legacy_config_password_after_db_password_changed(self):
        """The config password must be rejected with 400 when the DB row holds another password."""
        server = self._build_server(row_exists=True, db_password="StrongPass!2026")

        response = self._post_config_credentials(server)

        self.assertEqual(response.status_code, 400)
        self.assertFalse(server.admin_account_db.login_success_marked)

    def test_login_keeps_legacy_config_fallback_when_db_account_missing(self):
        """Without a DB row, the config password must still log in successfully."""
        server = self._build_server(row_exists=False, db_password="")

        response = self._post_config_credentials(server)

        self.assertEqual(response.status_code, 200)
        self.assertTrue(response.get_json()["success"])

    def test_ensure_default_admin_syncs_strong_config_password_over_risky_seed(self):
        """ensure_default_admin must update an existing 'admin123' row to the config password."""
        seeded_row = {"username": "admin", "password_hash": "admin123", "status": 1}
        operator = FakeAdminAccountDBOperator(existing_row=seeded_row)

        outcome = operator.ensure_default_admin("admin", "StrongPass!2026")

        self.assertTrue(outcome)
        self.assertEqual(operator.updated_password, ("admin", "StrongPass!2026"))
|
||
|
||
|
||
# Run the test suite only when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|