Files
abot/db/admin_account_db.py
liuwei f438f0f955 后台账号体系改造:接入t_admin_数据库账号与前端改密
变更项:
1. 新增 db/admin_account_db.py,提供 t_admin_accounts 表初始化、PBKDF2口令哈希、登录校验、登录信息回写与密码更新能力。
2. DashboardServer 启动时接入账号数据层,自动建表并把旧配置默认账号迁移为数据库账号种子。
3. 重构 auth 登录逻辑:优先走数据库账号鉴权,保留旧配置账号回退;新增 /api/auth/change_password 接口支持在线修改密码。
4. base.html 增加顶部修改密码入口与弹窗表单,前端可直接提交旧密码与新密码完成改密。
5. login.html 增强小屏适配:允许纵向滚动、768以下隐藏展示侧栏并优化输入区间距与字号,修复移动端登录体验。
6. 新增迁移脚本 db/scripts/migrations/20260423_add_admin_account_table.sql,便于独立数据库升级。
2026-04-23 09:09:19 +08:00

167 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
后台管理员账号数据访问层。
设计目标:
1. 用数据库表承载后台账号,替代“固定配置文件账号密码”;
2. 提供安全的密码散列存储与校验能力;
3. 支持登录成功后的登录信息回写与在线修改密码。
"""
import base64
import hashlib
import hmac
import secrets
from typing import Any, Dict, Optional
from db.base import BaseDBOperator
class AdminAccountDBOperator(BaseDBOperator):
"""后台管理员账号数据访问对象。"""
# 口令哈希算法版本前缀,便于将来平滑升级算法。
HASH_SCHEME = "pbkdf2_sha256"
# PBKDF2 迭代次数:在安全性与计算开销之间做平衡。
HASH_ITERATIONS = 150_000
def init_tables(self) -> bool:
"""初始化后台管理员表。
表名使用 t_admin_ 前缀,满足后台账号体系命名约定。
"""
sql = """
CREATE TABLE IF NOT EXISTS t_admin_accounts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL COMMENT '登录用户名',
password_hash VARCHAR(255) NOT NULL COMMENT '口令哈希',
display_name VARCHAR(64) NULL COMMENT '展示名称',
status TINYINT NOT NULL DEFAULT 1 COMMENT '状态1启用0禁用',
last_login_at DATETIME NULL COMMENT '最近登录时间',
last_login_ip VARCHAR(64) NULL COMMENT '最近登录IP',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_admin_username (username)
) COMMENT='后台管理员账号表'
"""
return self.execute_update(sql)
def get_admin_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""按用户名读取管理员信息。"""
return self.execute_query(
"""
SELECT id, username, password_hash, display_name, status, last_login_at, last_login_ip
FROM t_admin_accounts
WHERE username = %s
LIMIT 1
""",
(str(username or "").strip(),),
fetch_one=True,
)
def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool:
"""确保默认管理员存在。
行为约束:
1. 若用户名已存在,不覆盖既有密码;
2. 仅在“表里不存在该账号”时创建初始账号;
3. 方便从旧配置平滑迁移到数据库账号体系。
"""
normalized_username = str(username or "").strip()
normalized_password = str(password or "").strip()
if not normalized_username or not normalized_password:
return False
existing = self.get_admin_by_username(normalized_username)
if existing:
return True
password_hash = self.hash_password(normalized_password)
return self.execute_update(
"""
INSERT INTO t_admin_accounts (username, password_hash, display_name, status)
VALUES (%s, %s, %s, 1)
""",
(normalized_username, password_hash, str(display_name or "").strip() or normalized_username),
)
def verify_admin_password(self, username: str, password: str) -> bool:
"""校验账号口令是否正确。"""
row = self.get_admin_by_username(username)
if not row:
return False
if int(row.get("status") or 0) != 1:
return False
password_hash = str(row.get("password_hash") or "")
return self.verify_password(password, password_hash)
def mark_login_success(self, username: str, login_ip: str = "") -> bool:
"""记录登录成功信息。"""
return self.execute_update(
"""
UPDATE t_admin_accounts
SET last_login_at = NOW(),
last_login_ip = %s
WHERE username = %s
""",
(str(login_ip or "").strip(), str(username or "").strip()),
)
def update_password(self, username: str, new_password: str) -> bool:
"""更新指定用户口令。"""
password_hash = self.hash_password(new_password)
return self.execute_update(
"""
UPDATE t_admin_accounts
SET password_hash = %s
WHERE username = %s
AND status = 1
""",
(password_hash, str(username or "").strip()),
)
@classmethod
def hash_password(cls, raw_password: str) -> str:
"""生成口令哈希。
存储格式:
pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64)
"""
password_text = str(raw_password or "")
salt_bytes = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac(
"sha256",
password_text.encode("utf-8"),
salt_bytes,
cls.HASH_ITERATIONS,
)
salt_hex = salt_bytes.hex()
digest_b64 = base64.b64encode(digest).decode("utf-8")
return f"{cls.HASH_SCHEME}${cls.HASH_ITERATIONS}${salt_hex}${digest_b64}"
@classmethod
def verify_password(cls, raw_password: str, stored_hash: str) -> bool:
"""校验口令哈希。
安全细节:
1. 使用 hmac.compare_digest避免时序侧信道问题
2. 对格式异常统一返回 False避免抛错打断登录流程。
"""
try:
scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3)
if scheme != cls.HASH_SCHEME:
return False
iterations = int(iterations_text)
salt_bytes = bytes.fromhex(salt_hex)
expected_digest = base64.b64decode(digest_b64.encode("utf-8"))
actual_digest = hashlib.pbkdf2_hmac(
"sha256",
str(raw_password or "").encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(actual_digest, expected_digest)
except Exception:
return False