# -*- coding: utf-8 -*- """ 后台管理员账号数据访问层。 设计目标: 1. 用数据库表承载后台账号,替代“固定配置文件账号密码”; 2. 提供安全的密码散列存储与校验能力; 3. 支持登录成功后的登录信息回写与在线修改密码。 """ import base64 import hashlib import hmac import secrets from typing import Any, Dict, Optional from db.base import BaseDBOperator class AdminAccountDBOperator(BaseDBOperator): """后台管理员账号数据访问对象。""" # 口令哈希算法版本前缀,便于将来平滑升级算法。 HASH_SCHEME = "pbkdf2_sha256" # PBKDF2 迭代次数:在安全性与计算开销之间做平衡。 HASH_ITERATIONS = 150_000 def init_tables(self) -> bool: """初始化后台管理员表。 表名使用 t_admin_ 前缀,满足后台账号体系命名约定。 """ sql = """ CREATE TABLE IF NOT EXISTS t_admin_accounts ( id BIGINT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(64) NOT NULL COMMENT '登录用户名', password_hash VARCHAR(255) NOT NULL COMMENT '口令哈希', display_name VARCHAR(64) NULL COMMENT '展示名称', status TINYINT NOT NULL DEFAULT 1 COMMENT '状态:1启用,0禁用', last_login_at DATETIME NULL COMMENT '最近登录时间', last_login_ip VARCHAR(64) NULL COMMENT '最近登录IP', create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uk_admin_username (username) ) COMMENT='后台管理员账号表' """ return self.execute_update(sql) def get_admin_by_username(self, username: str) -> Optional[Dict[str, Any]]: """按用户名读取管理员信息。""" return self.execute_query( """ SELECT id, username, password_hash, display_name, status, last_login_at, last_login_ip FROM t_admin_accounts WHERE username = %s LIMIT 1 """, (str(username or "").strip(),), fetch_one=True, ) def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool: """确保默认管理员存在。 行为约束: 1. 若用户名已存在,不覆盖既有密码; 2. 仅在“表里不存在该账号”时创建初始账号; 3. 方便从旧配置平滑迁移到数据库账号体系。 """ normalized_username = str(username or "").strip() normalized_password = str(password or "").strip() if not normalized_username or not normalized_password: return False existing = self.get_admin_by_username(normalized_username) if existing: return True password_hash = self.hash_password(normalized_password) return self.execute_update( """ INSERT INTO t_admin_accounts (username, password_hash, display_name, status) VALUES (%s, %s, %s, 1) """, (normalized_username, password_hash, str(display_name or "").strip() or normalized_username), ) def verify_admin_password(self, username: str, password: str) -> bool: """校验账号口令是否正确。""" row = self.get_admin_by_username(username) if not row: return False if int(row.get("status") or 0) != 1: return False password_hash = str(row.get("password_hash") or "") return self.verify_password(password, password_hash) def mark_login_success(self, username: str, login_ip: str = "") -> bool: """记录登录成功信息。""" return self.execute_update( """ UPDATE t_admin_accounts SET last_login_at = NOW(), last_login_ip = %s WHERE username = %s """, (str(login_ip or "").strip(), str(username or "").strip()), ) def update_password(self, username: str, new_password: str) -> bool: """更新指定用户口令。""" password_hash = self.hash_password(new_password) return self.execute_update( """ UPDATE t_admin_accounts SET password_hash = %s WHERE username = %s AND status = 1 """, (password_hash, str(username or "").strip()), ) @classmethod def hash_password(cls, raw_password: str) -> str: """生成口令哈希。 存储格式: pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64) """ password_text = str(raw_password or "") salt_bytes = secrets.token_bytes(16) digest = hashlib.pbkdf2_hmac( "sha256", password_text.encode("utf-8"), salt_bytes, cls.HASH_ITERATIONS, ) salt_hex = salt_bytes.hex() digest_b64 = base64.b64encode(digest).decode("utf-8") return f"{cls.HASH_SCHEME}${cls.HASH_ITERATIONS}${salt_hex}${digest_b64}" @classmethod def verify_password(cls, raw_password: str, stored_hash: str) -> bool: """校验口令哈希。 安全细节: 1. 使用 hmac.compare_digest,避免时序侧信道问题; 2. 对格式异常统一返回 False,避免抛错打断登录流程。 """ try: scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3) if scheme != cls.HASH_SCHEME: return False iterations = int(iterations_text) salt_bytes = bytes.fromhex(salt_hex) expected_digest = base64.b64decode(digest_b64.encode("utf-8")) actual_digest = hashlib.pbkdf2_hmac( "sha256", str(raw_password or "").encode("utf-8"), salt_bytes, iterations, ) return hmac.compare_digest(actual_digest, expected_digest) except Exception: return False