Files
abot/db/admin_account_db.py
Liu 342b4c0065 修复后台弱密码提示误判并恢复server.py编码
1. 修复数据库账号存在时仍回退 config.toml 判断,导致每次登录重复提示弱密码的问题。
2. 补齐默认管理员密码从旧配置迁移到数据库的同步逻辑,兼容历史部署。
3. 恢复 server.py 为可读 UTF-8 中文版本,并补充后台登录与弱密码判定的回归测试。
2026-05-01 10:49:38 +08:00

234 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
后台管理员账号数据访问层。
设计目标:
1. 用数据库表承载后台账号,逐步替代固定配置文件中的账号密码;
2. 提供安全的密码哈希存储与校验能力;
3. 支持登录成功后的登录信息回写,以及在线修改密码;
4. 兼容历史部署,把旧配置中的管理员密码平滑迁移到数据库体系。
"""
import base64
import hashlib
import hmac
import re
import secrets
from typing import Any, Dict, Optional
from db.base import BaseDBOperator
class AdminAccountDBOperator(BaseDBOperator):
"""后台管理员账号数据访问对象。"""
# 口令哈希算法前缀,便于后续平滑升级算法。
HASH_SCHEME = "pbkdf2_sha256"
# PBKDF2 迭代次数,在安全性和计算开销之间做平衡。
HASH_ITERATIONS = 150_000
# 已知高风险密码列表:
# 1. 这里只覆盖默认密码和常见弱密码,不做完整字典;
# 2. 后台安全提醒只需要识别“明显高风险”的情况即可;
# 3. 统一收敛在数据层,方便登录、改密、首次提醒共用。
RISKY_PASSWORDS = {
"admin",
"admin123",
"123456",
"12345678",
"password",
"qwerty",
"abot123",
}
def init_tables(self) -> bool:
"""初始化后台管理员账号表。"""
sql = """
CREATE TABLE IF NOT EXISTS t_admin_accounts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL COMMENT '登录用户名',
password_hash VARCHAR(255) NOT NULL COMMENT '口令哈希',
display_name VARCHAR(64) NULL COMMENT '展示名称',
status TINYINT NOT NULL DEFAULT 1 COMMENT '状态1启用0禁用',
last_login_at DATETIME NULL COMMENT '最近登录时间',
last_login_ip VARCHAR(64) NULL COMMENT '最近登录IP',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_admin_username (username)
) COMMENT='后台管理员账号表'
"""
return self.execute_update(sql)
def get_admin_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""按用户名读取管理员信息。"""
return self.execute_query(
"""
SELECT id, username, password_hash, display_name, status, last_login_at, last_login_ip
FROM t_admin_accounts
WHERE username = %s
LIMIT 1
""",
(str(username or "").strip(),),
fetch_one=True,
)
def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool:
"""确保默认管理员存在,并在安全前提下补齐旧配置密码迁移。
这里的迁移策略分三层:
1. 数据库里还没有管理员账号时,按当前配置创建初始账号;
2. 数据库里已经有账号,且密码本来就和当前配置一致时,直接视为已同步;
3. 数据库里仍是历史默认弱密码,但配置里已经换成了更强密码时,
自动把强密码同步进数据库,避免登录和弱密码提示长期错位。
注意:
只有在“数据库当前密码仍然是已知弱密码,而传入配置密码不是弱密码”时,
才允许用配置回写数据库;如果用户已经在后台主动改过密码,则继续以数据库为准。
"""
normalized_username = str(username or "").strip()
normalized_password = str(password or "").strip()
if not normalized_username or not normalized_password:
return False
existing = self.get_admin_by_username(normalized_username)
if not existing:
password_hash = self.hash_password(normalized_password)
return self.execute_update(
"""
INSERT INTO t_admin_accounts (username, password_hash, display_name, status)
VALUES (%s, %s, %s, 1)
""",
(normalized_username, password_hash, str(display_name or "").strip() or normalized_username),
)
stored_hash = str(existing.get("password_hash") or "")
if stored_hash and self.verify_password(normalized_password, stored_hash):
return True
current_password_is_risky = self.is_password_hash_using_risky_password(stored_hash)
incoming_password_is_risky = normalized_password.lower() in self.RISKY_PASSWORDS
if current_password_is_risky and not incoming_password_is_risky:
return self.update_password(normalized_username, normalized_password)
return True
def verify_admin_password(self, username: str, password: str) -> bool:
"""校验账号口令是否正确。"""
row = self.get_admin_by_username(username)
if not row:
return False
if int(row.get("status") or 0) != 1:
return False
password_hash = str(row.get("password_hash") or "")
return self.verify_password(password, password_hash)
def mark_login_success(self, username: str, login_ip: str = "") -> bool:
"""记录登录成功信息。"""
return self.execute_update(
"""
UPDATE t_admin_accounts
SET last_login_at = NOW(),
last_login_ip = %s
WHERE username = %s
""",
(str(login_ip or "").strip(), str(username or "").strip()),
)
def update_password(self, username: str, new_password: str) -> bool:
"""更新指定用户口令。"""
password_hash = self.hash_password(new_password)
return self.execute_update(
"""
UPDATE t_admin_accounts
SET password_hash = %s
WHERE username = %s
AND status = 1
""",
(password_hash, str(username or "").strip()),
)
def is_using_risky_password(self, username: str) -> bool:
"""判断指定管理员是否仍在使用已知弱密码。"""
row = self.get_admin_by_username(username)
if not row:
return False
stored_hash = str(row.get("password_hash") or "")
return self.is_password_hash_using_risky_password(stored_hash)
@classmethod
def is_password_hash_using_risky_password(cls, stored_hash: str) -> bool:
"""判断一个已存储密码哈希是否仍然对应已知弱密码。"""
if not stored_hash:
return False
# 哈希无法反解,这里只能把已知高风险候选值逐个比对。
for candidate in cls.RISKY_PASSWORDS:
if cls.verify_password(candidate, stored_hash):
return True
return False
@classmethod
def validate_password_strength(cls, raw_password: str) -> Optional[str]:
"""校验密码强度,返回错误提示;通过时返回 None。"""
password_text = str(raw_password or "")
if len(password_text) < 8:
return "新密码长度不能少于 8 位"
if password_text.lower() in cls.RISKY_PASSWORDS:
return "新密码过于简单,请避免使用默认口令或常见弱密码"
score = 0
if re.search(r"[A-Za-z]", password_text):
score += 1
if re.search(r"\d", password_text):
score += 1
if re.search(r"[^A-Za-z0-9]", password_text):
score += 1
# 至少满足两类字符,既兼顾安全性,也避免把规则设置得过于苛刻。
if score < 2:
return "新密码需至少包含字母、数字、符号中的两类"
return None
@classmethod
def hash_password(cls, raw_password: str) -> str:
"""生成口令哈希。
存储格式:
pbkdf2_sha256$迭代次数$盐值HEX$摘要base64
"""
password_text = str(raw_password or "")
salt_bytes = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac(
"sha256",
password_text.encode("utf-8"),
salt_bytes,
cls.HASH_ITERATIONS,
)
salt_hex = salt_bytes.hex()
digest_b64 = base64.b64encode(digest).decode("utf-8")
return f"{cls.HASH_SCHEME}${cls.HASH_ITERATIONS}${salt_hex}${digest_b64}"
@classmethod
def verify_password(cls, raw_password: str, stored_hash: str) -> bool:
"""校验口令哈希。
安全细节:
1. 使用 hmac.compare_digest避免时序侧信道问题
2. 对格式异常统一返回 False避免异常中断登录流程。
"""
try:
scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3)
if scheme != cls.HASH_SCHEME:
return False
iterations = int(iterations_text)
salt_bytes = bytes.fromhex(salt_hex)
expected_digest = base64.b64decode(digest_b64.encode("utf-8"))
actual_digest = hashlib.pbkdf2_hmac(
"sha256",
str(raw_password or "").encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(actual_digest, expected_digest)
except Exception:
return False