Files
abot/db/admin_account_db.py

218 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
后台管理员账号数据访问层。
设计目标:
1. 用数据库表承载后台账号,替代“固定配置文件账号密码”;
2. 提供安全的密码散列存储与校验能力;
3. 支持登录成功后的登录信息回写与在线修改密码。
"""
import base64
import hashlib
import hmac
import re
import secrets
from typing import Any, Dict, Optional
from db.base import BaseDBOperator
class AdminAccountDBOperator(BaseDBOperator):
"""后台管理员账号数据访问对象。"""
# 口令哈希算法版本前缀,便于将来平滑升级算法。
HASH_SCHEME = "pbkdf2_sha256"
# PBKDF2 迭代次数:在安全性与计算开销之间做平衡。
HASH_ITERATIONS = 150_000
# 风险口令清单:
# 1. 这里优先覆盖系统默认口令和常见极弱口令;
# 2. 后台安全判断只需要识别“明显危险”的情况,不追求做成完整密码字典;
# 3. 统一放在数据层,便于登录鉴权、修改密码、首登提醒共用。
RISKY_PASSWORDS = {
"admin",
"admin123",
"123456",
"12345678",
"password",
"qwerty",
"abot123",
}
def init_tables(self) -> bool:
"""初始化后台管理员表。
表名使用 t_admin_ 前缀,满足后台账号体系命名约定。
"""
sql = """
CREATE TABLE IF NOT EXISTS t_admin_accounts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL COMMENT '登录用户名',
password_hash VARCHAR(255) NOT NULL COMMENT '口令哈希',
display_name VARCHAR(64) NULL COMMENT '展示名称',
status TINYINT NOT NULL DEFAULT 1 COMMENT '状态1启用0禁用',
last_login_at DATETIME NULL COMMENT '最近登录时间',
last_login_ip VARCHAR(64) NULL COMMENT '最近登录IP',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_admin_username (username)
) COMMENT='后台管理员账号表'
"""
return self.execute_update(sql)
def get_admin_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""按用户名读取管理员信息。"""
return self.execute_query(
"""
SELECT id, username, password_hash, display_name, status, last_login_at, last_login_ip
FROM t_admin_accounts
WHERE username = %s
LIMIT 1
""",
(str(username or "").strip(),),
fetch_one=True,
)
def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool:
"""确保默认管理员存在。
行为约束:
1. 若用户名已存在,不覆盖既有密码;
2. 仅在“表里不存在该账号”时创建初始账号;
3. 方便从旧配置平滑迁移到数据库账号体系。
"""
normalized_username = str(username or "").strip()
normalized_password = str(password or "").strip()
if not normalized_username or not normalized_password:
return False
existing = self.get_admin_by_username(normalized_username)
if existing:
return True
password_hash = self.hash_password(normalized_password)
return self.execute_update(
"""
INSERT INTO t_admin_accounts (username, password_hash, display_name, status)
VALUES (%s, %s, %s, 1)
""",
(normalized_username, password_hash, str(display_name or "").strip() or normalized_username),
)
def verify_admin_password(self, username: str, password: str) -> bool:
"""校验账号口令是否正确。"""
row = self.get_admin_by_username(username)
if not row:
return False
if int(row.get("status") or 0) != 1:
return False
password_hash = str(row.get("password_hash") or "")
return self.verify_password(password, password_hash)
def mark_login_success(self, username: str, login_ip: str = "") -> bool:
"""记录登录成功信息。"""
return self.execute_update(
"""
UPDATE t_admin_accounts
SET last_login_at = NOW(),
last_login_ip = %s
WHERE username = %s
""",
(str(login_ip or "").strip(), str(username or "").strip()),
)
def update_password(self, username: str, new_password: str) -> bool:
"""更新指定用户口令。"""
password_hash = self.hash_password(new_password)
return self.execute_update(
"""
UPDATE t_admin_accounts
SET password_hash = %s
WHERE username = %s
AND status = 1
""",
(password_hash, str(username or "").strip()),
)
def is_using_risky_password(self, username: str) -> bool:
"""判断指定管理员是否仍在使用风险口令。"""
row = self.get_admin_by_username(username)
if not row:
return False
stored_hash = str(row.get("password_hash") or "")
if not stored_hash:
return False
# 口令是哈希存储的,因此只能把风险候选集逐个比对。
for candidate in self.RISKY_PASSWORDS:
if self.verify_password(candidate, stored_hash):
return True
return False
@classmethod
def validate_password_strength(cls, raw_password: str) -> Optional[str]:
"""校验密码强度,返回错误提示;通过时返回 None。"""
password_text = str(raw_password or "")
if len(password_text) < 8:
return "新密码长度不能少于8位"
if password_text.lower() in cls.RISKY_PASSWORDS:
return "新密码过于简单,请避免使用默认口令或常见弱口令"
score = 0
if re.search(r"[A-Za-z]", password_text):
score += 1
if re.search(r"\d", password_text):
score += 1
if re.search(r"[^A-Za-z0-9]", password_text):
score += 1
# 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。
if score < 2:
return "新密码需至少包含字母、数字、符号中的两类"
return None
@classmethod
def hash_password(cls, raw_password: str) -> str:
"""生成口令哈希。
存储格式:
pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64)
"""
password_text = str(raw_password or "")
salt_bytes = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac(
"sha256",
password_text.encode("utf-8"),
salt_bytes,
cls.HASH_ITERATIONS,
)
salt_hex = salt_bytes.hex()
digest_b64 = base64.b64encode(digest).decode("utf-8")
return f"{cls.HASH_SCHEME}${cls.HASH_ITERATIONS}${salt_hex}${digest_b64}"
@classmethod
def verify_password(cls, raw_password: str, stored_hash: str) -> bool:
"""校验口令哈希。
安全细节:
1. 使用 hmac.compare_digest避免时序侧信道问题
2. 对格式异常统一返回 False避免抛错打断登录流程。
"""
try:
scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3)
if scheme != cls.HASH_SCHEME:
return False
iterations = int(iterations_text)
salt_bytes = bytes.fromhex(salt_hex)
expected_digest = base64.b64decode(digest_b64.encode("utf-8"))
actual_digest = hashlib.pbkdf2_hmac(
"sha256",
str(raw_password or "").encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(actual_digest, expected_digest)
except Exception:
return False