Revert "修复后台弱密码提示误判并恢复server.py编码"

This reverts commit 342b4c0065.
This commit is contained in:
Liu
2026-05-01 12:45:35 +08:00
parent 5aca5c5f28
commit 22c871105a
4 changed files with 251 additions and 467 deletions

View File

@@ -1,11 +1,10 @@
import time from flask import Blueprint, render_template, request, redirect, url_for, session, current_app, jsonify
from functools import wraps from functools import wraps
from flask import Blueprint, current_app, jsonify, redirect, render_template, request, session, url_for
from loguru import logger from loguru import logger
import time
# 创建认证蓝图 # 创建认证蓝图
auth_bp = Blueprint("auth", __name__) auth_bp = Blueprint('auth', __name__)
def _is_ajax_request() -> bool: def _is_ajax_request() -> bool:
@@ -15,38 +14,36 @@ def _is_ajax_request() -> bool:
return requested_with == "xmlhttprequest" or "application/json" in accept_header return requested_with == "xmlhttprequest" or "application/json" in accept_header
# 登录检查装饰器
def login_required(f): def login_required(f):
"""登录检查装饰器。"""
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if not session.get("logged_in"): if not session.get('logged_in'):
return redirect(url_for("auth.login")) return redirect(url_for('auth.login'))
server = current_app.dashboard_server server = current_app.dashboard_server
session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60) session_timeout_seconds = int(server.get_auth_policy().get("session_timeout_minutes", 480) * 60)
last_activity_at = float(session.get("last_activity_at") or 0.0) last_activity_at = float(session.get("last_activity_at") or 0.0)
if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds: if last_activity_at > 0 and (time.time() - last_activity_at) > session_timeout_seconds:
# 会话超时后统一清理登录态,避免长期闲置会话继续可用。 # 会话超时后统一清理登录态,避免长期闲置会话继续可用。
session.clear() session.clear()
return redirect(url_for("auth.login")) return redirect(url_for('auth.login'))
session["last_activity_at"] = time.time() session["last_activity_at"] = time.time()
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
@auth_bp.route("/login", methods=["GET", "POST"]) # 登录页面
@auth_bp.route('/login', methods=['GET', 'POST'])
def login(): def login():
"""后台登录页及登录提交入口。"""
error = None error = None
if request.method == "POST": if request.method == 'POST':
# 用户名做 strip,避免首尾空格导致的误判;密码保留原样,避免改变真实口令 # 使用 strip 规避用户误输入首尾空格导致的误判。
username = str(request.form.get("username", "") or "").strip() username = str(request.form.get('username', '') or '').strip()
password = str(request.form.get("password", "") or "") password = str(request.form.get('password', '') or '')
remote_ip = request.remote_addr or "" remote_ip = request.remote_addr or ""
# 从应用上下文获取服务器实例,而不是从蓝图对象
server = current_app.dashboard_server server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None) admin_db = getattr(server, "admin_account_db", None)
lock_status = server.get_login_lock_status(username, remote_ip) lock_status = server.get_login_lock_status(username, remote_ip)
@@ -55,89 +52,82 @@ def login():
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试" error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
if _is_ajax_request(): if _is_ajax_request():
return jsonify({"success": False, "error": error}), 429 return jsonify({"success": False, "error": error}), 429
return render_template("login.html", error=error) return render_template('login.html', error=error)
# 认证边界说明: # 优先使用数据库账号体系鉴权;若不可用则回退旧配置模式,保证兼容存量部署。
# 1. 只要数据库里已经存在该管理员账号,就必须以数据库密码为准;
# 2. 否则用户在后台改密后config.toml 里的旧密码仍然可能继续生效;
# 3. 仅当数据库不可用,或者该账号尚未迁移进数据库时,才回退到旧配置模式。
login_ok = False login_ok = False
should_use_config_fallback = not bool(admin_db)
if admin_db: if admin_db:
try: try:
admin_row = admin_db.get_admin_by_username(username) login_ok = admin_db.verify_admin_password(username, password)
should_use_config_fallback = not bool(admin_row)
if admin_row and int(admin_row.get("status") or 0) == 1:
login_ok = admin_db.verify_password(password, str(admin_row.get("password_hash") or ""))
if login_ok: if login_ok:
admin_db.mark_login_success(username, remote_ip) admin_db.mark_login_success(username, request.remote_addr or "")
except Exception as e: except Exception as e:
logger.error(f"数据库账号登录校验异常,回退配置模式: {e}") logger.error(f"数据库账号登录校验异常,回退配置模式: {e}")
login_ok = False login_ok = False
should_use_config_fallback = True
if not login_ok and should_use_config_fallback: if not login_ok:
login_ok = username == server.username and password == server.password login_ok = (username == server.username and password == server.password)
if login_ok: if login_ok:
server.clear_login_failures(username, remote_ip) server.clear_login_failures(username, remote_ip)
session["logged_in"] = True session['logged_in'] = True
session.permanent = True session.permanent = True
session["username"] = username session['username'] = username # 存储用户名到session
session["last_activity_at"] = time.time() session['last_activity_at'] = time.time()
# 登录成功后再读取当前账号的安全状态,决定是否强制弹出改密提示 # 若账号仍在使用默认/弱口令,则登录后强提示修改密码
session["force_password_change"] = bool(server.should_force_password_change(username)) session['force_password_change'] = bool(server.should_force_password_change(username))
logger.debug(f"Login successful. Session after login: {dict(session)}") logger.debug(f"Login successful. Session after login: {dict(session)}")
if _is_ajax_request(): if _is_ajax_request():
return jsonify({"success": True, "redirect_url": url_for("main.index")}) return jsonify({"success": True, "redirect_url": url_for('main.index')})
return redirect(url_for("main.index")) return redirect(url_for('main.index'))
lock_status = server.mark_login_failure(username, remote_ip)
if lock_status.get("locked"):
wait_seconds = int(lock_status.get("remaining_seconds") or 0)
error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
else: else:
remaining = max( lock_status = server.mark_login_failure(username, remote_ip)
0, if lock_status.get("locked"):
int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0), wait_seconds = int(lock_status.get("remaining_seconds") or 0)
) error = f"登录失败次数过多,请在 {wait_seconds} 秒后再试"
error = "用户名或密码错误" else:
if remaining > 0: remaining = max(0, int(server.get_auth_policy().get("max_failed_attempts", 5)) - int(lock_status.get("failed_count") or 0))
error = f"{error},再失败 {remaining} 次将暂时锁定" error = '用户名或密码错误'
if _is_ajax_request(): if remaining > 0:
return jsonify({"success": False, "error": error}), 400 error = f"{error},再失败 {remaining} 次将暂时锁定"
if _is_ajax_request():
return jsonify({"success": False, "error": error}), 400
return render_template("login.html", error=error) return render_template('login.html', error=error)
@auth_bp.route("/logout") # 登出
@auth_bp.route('/logout')
def logout(): def logout():
"""退出登录。"""
session.clear() session.clear()
return redirect(url_for("auth.login")) return redirect(url_for('auth.login'))
@auth_bp.route("/api/auth/security_status", methods=["GET"]) @auth_bp.route('/api/auth/security_status', methods=['GET'])
@login_required @login_required
def get_security_status(): def get_security_status():
"""返回当前登录管理员的安全状态。""" """返回当前登录管理员的安全状态。"""
return jsonify( return jsonify({
{ "success": True,
"success": True, "data": {
"data": { "force_password_change": bool(session.get("force_password_change", False)),
"force_password_change": bool(session.get("force_password_change", False)), "session_timeout_minutes": int(current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)),
"session_timeout_minutes": int(
current_app.dashboard_server.get_auth_policy().get("session_timeout_minutes", 480)
),
},
} }
) })
@auth_bp.route("/api/auth/change_password", methods=["POST"]) @auth_bp.route('/api/auth/change_password', methods=['POST'])
@login_required @login_required
def change_password(): def change_password():
"""修改当前登录管理员密码。""" """修改当前登录管理员密码。
前端请求参数:
{
"old_password": "旧密码",
"new_password": "新密码",
"confirm_password": "确认新密码"
}
"""
server = current_app.dashboard_server server = current_app.dashboard_server
admin_db = getattr(server, "admin_account_db", None) admin_db = getattr(server, "admin_account_db", None)
if not admin_db: if not admin_db:
@@ -173,7 +163,7 @@ def change_password():
if not updated: if not updated:
return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500 return jsonify({"success": False, "error": "密码更新失败,请稍后重试"}), 500
# 密成功后立刻移除“必须修改密码”的会话提示,避免用户当前会话继续被反复打断 # 密码修改成功后移除“必须修改密码”的会话提示。
session["force_password_change"] = False session["force_password_change"] = False
return jsonify({"success": True, "message": "密码修改成功"}) return jsonify({"success": True, "message": "密码修改成功"})
except Exception as e: except Exception as e:

View File

@@ -1,52 +1,43 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
统计看板服务器 统计看板服务器 - 使用Flask蓝图重构版
这里负责:
1. 读取 Dashboard 配置并创建 Flask 应用;
2. 初始化后台依赖对象、蓝图和静态资源路由;
3. 提供后台登录限流、弱密码提醒等通用安全能力。
""" """
import os import os
import secrets
import sys import sys
import threading import threading
import time import time
import secrets
from datetime import timedelta from datetime import timedelta
import toml import toml
from flask import Flask, send_from_directory from flask import Flask, send_from_directory
from loguru import logger from loguru import logger
from db.admin_account_db import AdminAccountDBOperator
from db.contacts_db import ContactsDBOperator from db.contacts_db import ContactsDBOperator
from db.admin_account_db import AdminAccountDBOperator
from db.emoji_asset_db import EmojiAssetDB from db.emoji_asset_db import EmojiAssetDB
from db.fun_command_rule_db import FunCommandRuleDBOperator
from db.member_context_db import MemberContextDBOperator from db.member_context_db import MemberContextDBOperator
from db.message_storage import MessageStorageDB from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator from db.stats_db import StatsDBOperator
from db.task_db import TaskDBOperator from db.task_db import TaskDBOperator
from db.fun_command_rule_db import FunCommandRuleDBOperator
from utils.fun_command_rule_service import FunCommandRuleService from utils.fun_command_rule_service import FunCommandRuleService
from wechat_ipad import WechatAPIClient from wechat_ipad import WechatAPIClient
# 添加项目根目录到系统路径,确保可以导入项目模块 # 添加项目根目录到系统路径,确保可以导入项目模块
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
class DashboardServer: class DashboardServer:
"""统计看板服务器""" """统计看板服务器"""
def __init__( def __init__(self, host: str = None, port: int = None,
self, username: str = None, password: str = None,
host: str = None, robot_instance=None):
port: int = None, # 加载配置文件
username: str = None,
password: str = None,
robot_instance=None,
):
# 先加载配置文件,再用传入参数覆盖,保证 CLI/外部调用具备最高优先级。
self.config = self._load_dashboard_config() self.config = self._load_dashboard_config()
# 优先使用传入的参数,其次使用配置文件中的参数
self.host = host or self.config.get("server", {}).get("host", "0.0.0.0") self.host = host or self.config.get("server", {}).get("host", "0.0.0.0")
self.port = port or self.config.get("server", {}).get("port", 8888) self.port = port or self.config.get("server", {}).get("port", 8888)
self.username = username or self.config.get("auth", {}).get("username", "admin") self.username = username or self.config.get("auth", {}).get("username", "admin")
@@ -54,108 +45,105 @@ class DashboardServer:
self.LOG = logger self.LOG = logger
self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}") self.LOG.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")
# 登录失败限流兜底缓存:
# 登录失败限流的进程内兜底缓存: # 1. 优先尝试 Redis但为了兼容 Redis 暂不可用的场景,这里保留进程内兜底
# 1. 优先用 Redis 做共享限流 # 2. 字典内容只保存短期登录失败窗口,不用于持久化
# 2. 如果 Redis 暂时不可用,至少本进程内仍有基本防护; # 3. 线程化 WSGI 会并发访问,因此需要显式加锁。
# 3. 这里会被并发访问,因此需要配合线程锁使用。
self._auth_runtime_lock = threading.Lock() self._auth_runtime_lock = threading.Lock()
self._auth_failures = {} self._auth_failures = {}
# 如果提供了robot实例则使用其对象
if robot_instance:
self.db_manager = robot_instance.db_manager
self.stats_db = StatsDBOperator(self.db_manager)
# Dashboard 启动可能早于 iPad 登录完成:
# 1. 此时 Robot 上的 message_storage 还没来得及绑定真实 bot
# 2. 但后台很多页面仍然依赖消息存储与表情资产库;
# 3. 因此这里优先复用 Robot 已初始化的 message_storage没有则再安全回退到 DB 层对象。
self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager)
self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager)
self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager)
self.member_context_db = MemberContextDBOperator(self.db_manager)
self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager)
# 后台管理员账号数据层:用于登录鉴权与修改密码。
self.admin_account_db = AdminAccountDBOperator(self.db_manager)
self.system_job_db = robot_instance.system_job_db
self.system_job_loader = robot_instance.system_job_loader
self.plugin_schedule_db = robot_instance.plugin_schedule_db
self.plugin_schedule_manager = robot_instance.plugin_schedule_manager
self.group_plugin_config_db = robot_instance.group_plugin_config_db
self.llm_catalog_db = robot_instance.llm_catalog_db
self.group_plugin_config_service = robot_instance.group_plugin_config_service
# 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”后台配置与缓存。
# 这里统一在 Dashboard 启动时初始化,保证管理端可直接读写规则。
self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager)
self.fun_command_rule_service = FunCommandRuleService(
db_operator=self.fun_command_rule_db,
redis_client=self.db_manager.get_redis_connection(),
local_ttl_seconds=30,
)
self.fun_command_rule_service.init_tables()
self.fun_command_rule_service.refresh_cache()
# 获取联系人管理器实例
self.contact_manager = robot_instance.contact_manager
self.plugin_manager = robot_instance.plugin_manager
self.plugin_registry = robot_instance.plugin_registry
self.client: WechatAPIClient = robot_instance.ipad_bot
self.robot = robot_instance
self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要")
self.member_context_service = getattr(self.member_context_plugin, "service", None)
if not robot_instance: self.LOG.info("使用Robot实例的对象进行初始化")
# 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。
try:
table_ok = self.admin_account_db.init_tables()
if not table_ok:
self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式")
else:
seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员")
if seed_ok:
self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)")
else:
self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息")
except Exception as e:
self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}")
else:
self.LOG.error("未提供Robot实例Dashboard无法正常工作") self.LOG.error("未提供Robot实例Dashboard无法正常工作")
raise ValueError("必须提供Robot实例") raise ValueError("必须提供Robot实例")
self.db_manager = robot_instance.db_manager
self.stats_db = StatsDBOperator(self.db_manager)
# Dashboard 启动可能早于 iPad 登录完成:
# 1. 此时 Robot 上的 message_storage 可能尚未绑定完成;
# 2. 但后台页面仍然依赖消息存储与表情资产库;
# 3. 因此优先复用 Robot 现成对象,没有时再安全回退到 DB 层对象。
self.message_storage = getattr(robot_instance, "message_storage", None) or MessageStorageDB(self.db_manager)
self.emoji_asset_db = getattr(self.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager)
self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager)
self.member_context_db = MemberContextDBOperator(self.db_manager)
self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager)
# 后台管理员账号数据层:用于登录鉴权、弱密码判断和在线改密。
self.admin_account_db = AdminAccountDBOperator(self.db_manager)
self.system_job_db = robot_instance.system_job_db
self.system_job_loader = robot_instance.system_job_loader
self.plugin_schedule_db = robot_instance.plugin_schedule_db
self.plugin_schedule_manager = robot_instance.plugin_schedule_manager
self.group_plugin_config_db = robot_instance.group_plugin_config_db
self.llm_catalog_db = robot_instance.llm_catalog_db
self.group_plugin_config_service = robot_instance.group_plugin_config_service
# 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”的后台配置与缓存。
self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager)
self.fun_command_rule_service = FunCommandRuleService(
db_operator=self.fun_command_rule_db,
redis_client=self.db_manager.get_redis_connection(),
local_ttl_seconds=30,
)
self.fun_command_rule_service.init_tables()
self.fun_command_rule_service.refresh_cache()
# 其余运行时对象直接复用 Robot 已初始化实例,避免重复构造。
self.contact_manager = robot_instance.contact_manager
self.plugin_manager = robot_instance.plugin_manager
self.plugin_registry = robot_instance.plugin_registry
self.client: WechatAPIClient = robot_instance.ipad_bot
self.robot = robot_instance
self.member_context_plugin = self.plugin_manager.plugins.get("成员交互摘要")
self.member_context_service = getattr(self.member_context_plugin, "service", None)
self.LOG.info("使用Robot实例的对象进行初始化")
# 初始化后台管理员账号表,并将旧配置中的默认账号平滑迁移进数据库。
try:
table_ok = self.admin_account_db.init_tables()
if not table_ok:
self.LOG.warning("初始化后台账号表失败,将回退旧配置账号模式")
else:
seed_ok = self.admin_account_db.ensure_default_admin(self.username, self.password, "系统管理员")
if seed_ok:
self.LOG.info("后台账号体系初始化完成(数据库账号模式已可用)")
else:
self.LOG.warning("后台账号种子初始化失败,请检查配置中的默认账号信息")
except Exception as e:
self.LOG.error(f"初始化后台账号体系失败,将回退旧配置账号模式: {e}")
self.app = self._create_app() self.app = self._create_app()
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._server = None # Werkzeug 服务实例会在 run() 时写入这里。 self._server = None # 存储服务实例
def _load_dashboard_config(self): def _load_dashboard_config(self):
"""加载 Dashboard 配置文件""" """加载Dashboard配置文件"""
try: try:
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.toml") config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml')
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, 'r', encoding='utf-8') as f:
return toml.load(f) return toml.load(f)
else:
# 如果配置文件不存在,创建默认配置,方便本地快速启动。 # 如果配置文件不存在,创建默认配置
default_config = { default_config = {
"server": {"host": "0.0.0.0", "port": 8888}, "server": {"host": "0.0.0.0", "port": 8888},
"auth": {"username": "admin", "password": "admin123"}, "auth": {"username": "admin", "password": "admin123"}
} }
with open(config_path, "w", encoding="utf-8") as f: with open(config_path, 'w', encoding='utf-8') as f:
toml.dump(default_config, f) toml.dump(default_config, f)
return default_config return default_config
except Exception as e: except Exception as e:
self.LOG.error(f"加载Dashboard配置文件失败: {e}") self.LOG.error(f"加载Dashboard配置文件失败: {e}")
# 返回默认配置
return { return {
"server": {"host": "0.0.0.0", "port": 8888}, "server": {"host": "0.0.0.0", "port": 8888},
"auth": {"username": "admin", "password": "admin123"}, "auth": {"username": "admin", "password": "admin123"}
} }
def _create_app(self) -> Flask: def _create_app(self) -> Flask:
"""创建 Flask 应用""" """创建Flask应用"""
template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") # 指定模板文件夹路径
template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
app = Flask(__name__, template_folder=template_folder) app = Flask(__name__, template_folder=template_folder)
auth_config = self.config.get("auth", {}) or {} auth_config = self.config.get("auth", {}) or {}
session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480) session_timeout_minutes = int(auth_config.get("session_timeout_minutes", 480) or 480)
cookie_secure = bool(auth_config.get("cookie_secure", False)) cookie_secure = bool(auth_config.get("cookie_secure", False))
@@ -164,54 +152,57 @@ class DashboardServer:
or auth_config.get("secret_key", "") or auth_config.get("secret_key", "")
or "" or ""
).strip() ).strip()
if configured_secret: if configured_secret:
app.secret_key = configured_secret app.secret_key = configured_secret
else: else:
# 未显式配置 secret_key 时,用进程级随机密钥兜底 # 未显式配置 secret_key,则每次进程启动生成随机值
# 1. 安全性优于硬编码固定值 # 1. 这比固定硬编码密钥安全得多
# 2. 服务重启后旧 session 会失效,但这是可接受的安全代价 # 2. 代价是服务重启后旧 session 会失效,作为安全兜底是可接受的;
# 3. 同时输出 warning提醒后续通过配置或环境变量固定注入。 # 3. 同时输出 warning提醒后续最好通过配置或环境变量固定注入。
app.secret_key = secrets.token_hex(32) app.secret_key = secrets.token_hex(32)
self.LOG.warning("未配置 Dashboard secret_key已使用进程级随机密钥重启后现有登录会失效") self.LOG.warning("未配置 Dashboard secret_key已使用进程级随机密钥重启后现有登录会失效")
# 关闭模板缓存,便于开发时实时看到页面修改结果。 # 禁用模板缓存,使修改HTML文件后立即生效 False =重启才生效
app.config["TEMPLATES_AUTO_RELOAD"] = True app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["SESSION_COOKIE_HTTPONLY"] = True app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config["SESSION_COOKIE_SAMESITE"] = str(auth_config.get("cookie_samesite", "Lax") or "Lax") app.config['SESSION_COOKIE_SAMESITE'] = str(auth_config.get("cookie_samesite", "Lax") or "Lax")
app.config["SESSION_COOKIE_SECURE"] = cookie_secure app.config['SESSION_COOKIE_SECURE'] = cookie_secure
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=max(15, session_timeout_minutes)) app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=max(15, session_timeout_minutes))
# 设置Werkzeug日志级别为DEBUG
import logging import logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger("werkzeug").setLevel(logging.ERROR) # 将dashboard_server实例设置为app的属性
# 将 DashboardServer 实例挂到 app 上,方便蓝图在请求期取用。
app.dashboard_server = self app.dashboard_server = self
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") # 配置静态文件访问
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
@app.route("/static/<path:filename>") @app.route('/static/<path:filename>')
def serve_static(filename): def serve_static(filename):
return send_from_directory(static_folder, filename) return send_from_directory(static_folder, filename)
# 获取项目根目录下的static/images目录
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
images_dir = os.path.join(project_root, "static", "images") images_dir = os.path.join(project_root, "static", "images")
# 确保目录存在
os.makedirs(images_dir, exist_ok=True) os.makedirs(images_dir, exist_ok=True)
@app.route("/static/images/<path:filename>") @app.route('/static/images/<path:filename>')
def serve_images(filename): def serve_images(filename):
return send_from_directory(images_dir, filename) return send_from_directory(images_dir, filename)
@app.route("/favicon.ico") # 添加一个路由处理favicon请求
@app.route('/favicon.ico')
def favicon(): def favicon():
return send_from_directory( return send_from_directory(os.path.join(app.root_path, 'static'),
os.path.join(app.root_path, "static"), 'favicon.ico', mimetype='image/vnd.microsoft.icon')
"favicon.ico",
mimetype="image/vnd.microsoft.icon",
)
# 注册蓝图
self._register_blueprints(app) self._register_blueprints(app)
return app return app
def get_auth_policy(self) -> dict: def get_auth_policy(self) -> dict:
@@ -248,19 +239,12 @@ class DashboardServer:
if expire_at > 0 and expire_at <= time.time(): if expire_at > 0 and expire_at <= time.time():
self._auth_failures.pop(guard_key, None) self._auth_failures.pop(guard_key, None)
record = {} record = {}
return { return {
"count": int(record.get("count") or 0), "count": int(record.get("count") or 0),
"first_failed_at": float(record.get("first_failed_at") or 0.0), "first_failed_at": float(record.get("first_failed_at") or 0.0),
} }
def _save_login_failure_record( def _save_login_failure_record(self, guard_key: str, count: int, first_failed_at: float, ttl_seconds: int) -> None:
self,
guard_key: str,
count: int,
first_failed_at: float,
ttl_seconds: int,
) -> None:
"""保存登录失败记录,优先 Redis失败时回退到进程内缓存。""" """保存登录失败记录,优先 Redis失败时回退到进程内缓存。"""
payload = f"{int(count)}|{float(first_failed_at)}" payload = f"{int(count)}|{float(first_failed_at)}"
try: try:
@@ -322,48 +306,41 @@ class DashboardServer:
def should_force_password_change(self, username: str) -> bool: def should_force_password_change(self, username: str) -> bool:
"""判断当前管理员是否应该被强制提示修改密码。""" """判断当前管理员是否应该被强制提示修改密码。"""
normalized_username = str(username or "").strip()
admin_db = getattr(self, "admin_account_db", None) admin_db = getattr(self, "admin_account_db", None)
if admin_db: if admin_db and admin_db.is_using_risky_password(username):
try: return True
admin_row = admin_db.get_admin_by_username(normalized_username)
# 只要数据库里已经存在该账号,就完全以数据库中的当前密码状态为准。
# 这样可以避免用户已经在后台把密码改强后,仍被 config.toml 中的旧默认密码反复误判。
if admin_row:
return bool(admin_db.is_using_risky_password(normalized_username))
except Exception as e:
self.LOG.error(f"判断后台弱密码状态时出现异常,将回退配置判断: {e}")
# 数据库体系不可用或该账号尚未迁移进数据库时,再回退配置值判断。 # 数据库体系不可用时,再回退配置值判断,至少把默认 admin/admin123 识别出来
fallback_username = str(self.username or "").strip() fallback_username = str(self.username or "").strip()
fallback_password = str(self.password or "").strip() fallback_password = str(self.password or "").strip()
return ( return (
normalized_username == fallback_username str(username or "").strip() == fallback_username
and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"}) and fallback_password in getattr(admin_db, "RISKY_PASSWORDS", {"admin123", "admin"})
) )
def _register_blueprints(self, app): def _register_blueprints(self, app):
"""注册所有蓝图""" """注册所有蓝图"""
# 在函数内部导入蓝图,避免循环导入 # 在函数内部导入蓝图,避免循环导入
from admin.dashboard.blueprints.auth import auth_bp from admin.dashboard.blueprints.auth import auth_bp
from admin.dashboard.blueprints.contacts import contacts_bp from admin.dashboard.blueprints.contacts import contacts_bp
from admin.dashboard.blueprints.file_browser import file_browser_bp
from admin.dashboard.blueprints.friend_circle import friend_circle_bp
from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp
from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp
from admin.dashboard.blueprints.main import main_bp
from admin.dashboard.blueprints.message_push import message_push_bp
from admin.dashboard.blueprints.messages import messages_bp
from admin.dashboard.blueprints.plugin_routes import plugin_routes
from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp
from admin.dashboard.blueprints.robot import robot_bp from admin.dashboard.blueprints.robot import robot_bp
from admin.dashboard.blueprints.messages import messages_bp
from admin.dashboard.blueprints.stats import stats_bp from admin.dashboard.blueprints.stats import stats_bp
from admin.dashboard.blueprints.system import system_bp from admin.dashboard.blueprints.system import system_bp
from admin.dashboard.blueprints.system_jobs import system_jobs_bp from admin.dashboard.blueprints.main import main_bp
from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp from admin.dashboard.blueprints.plugin_routes import plugin_routes
from admin.dashboard.blueprints.virtual_group import virtual_group_bp from admin.dashboard.blueprints.virtual_group import virtual_group_bp
from admin.dashboard.blueprints.file_browser import file_browser_bp
from admin.dashboard.blueprints.message_push import message_push_bp
from admin.dashboard.blueprints.friend_circle import friend_circle_bp
from admin.dashboard.blueprints.system_jobs import system_jobs_bp
from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp
from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp
from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp
from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp
app.register_blueprint(virtual_group_bp, url_prefix="/virtual_group") # 在app.register_blueprint部分添加
app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group')
app.register_blueprint(auth_bp) app.register_blueprint(auth_bp)
app.register_blueprint(main_bp) app.register_blueprint(main_bp)
app.register_blueprint(contacts_bp) app.register_blueprint(contacts_bp)
@@ -384,16 +361,16 @@ class DashboardServer:
self.LOG.info("所有蓝图已注册") self.LOG.info("所有蓝图已注册")
def run(self): def run(self):
"""运行服务器""" """运行服务器"""
from werkzeug.serving import make_server from werkzeug.serving import make_server
# 设置Werkzeug日志级别为DEBUG
import logging import logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger("werkzeug").setLevel(logging.ERROR)
self.LOG.info(f"启动服务器: {self.host}:{self.port}") self.LOG.info(f"启动服务器: {self.host}:{self.port}")
try: try:
# Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会拖死整个后台。 # Dashboard 存在文件浏览、统计查询等慢请求,单线程 WSGI 一旦被占住会导致整个后台无响应
# 改为 threaded server 可以避免某个接口阻塞所有页面一起无响应 # 改为 threaded server避免某个接口阻塞后拖死所有页面访问
self._server = make_server(self.host, self.port, self.app, threaded=True) self._server = make_server(self.host, self.port, self.app, threaded=True)
self._server.serve_forever() self._server.serve_forever()
except Exception as e: except Exception as e:
@@ -401,22 +378,26 @@ class DashboardServer:
self._stop_event.set() self._stop_event.set()
def stop(self): def stop(self):
"""停止服务器""" """停止服务器"""
self.LOG.info("正在停止服务器...") self.LOG.info("正在停止服务器...")
self._stop_event.set() self._stop_event.set()
# 使用werkzeug服务器的关闭方法
if self._server: if self._server:
self._server.shutdown() self._server.shutdown()
self.LOG.info("服务器已停止") self.LOG.info("服务器已停止")
def get_current_user_info(self): def get_current_user_info(self):
"""获取当前登录的微信用户信息""" """获取当前登录的微信用户信息"""
try: try:
if not self.client: if not self.client:
self.LOG.error("client实例不可用无法获取当前用户信息") self.LOG.error("client实例不可用无法获取当前用户信息")
return {"success": False, "message": "实例不可用"} return {"success": False, "message": "实例不可用"}
# 获取当前登录的微信ID
# 从新的resp格式中获取用户信息
try: try:
if self.robot is None: if self.robot is None:
raise ValueError("机器人对象未初始化") raise ValueError("机器人对象未初始化")
@@ -425,7 +406,7 @@ class DashboardServer:
"nickName": getattr(self.robot, "nickname", ""), "nickName": getattr(self.robot, "nickname", ""),
"mobile": getattr(self.robot, "phone", ""), "mobile": getattr(self.robot, "phone", ""),
"smallHeadImgUrl": getattr(self.robot, "head_image", ""), "smallHeadImgUrl": getattr(self.robot, "head_image", ""),
"signature": getattr(self.robot, "signature", ""), "signature": getattr(self.robot, "signature", "")
} }
except (AttributeError, ValueError) as e: except (AttributeError, ValueError) as e:
print(f"获取用户信息出错: {str(e)}") print(f"获取用户信息出错: {str(e)}")
@@ -434,7 +415,7 @@ class DashboardServer:
"nickName": self.robot.nickname, "nickName": self.robot.nickname,
"mobile": self.robot.phone, "mobile": self.robot.phone,
"smallHeadImgUrl": self.robot.head_image, "smallHeadImgUrl": self.robot.head_image,
"signature": self.robot.signature, "signature": self.robot.signature
} }
if not user_data: if not user_data:
@@ -445,11 +426,11 @@ class DashboardServer:
"data": { "data": {
"wx_id": user_data.get("wxid", ""), "wx_id": user_data.get("wxid", ""),
"nickname": user_data.get("nickName", "未知用户"), "nickname": user_data.get("nickName", "未知用户"),
"avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像 URL "avatar": user_data.get("smallHeadImgUrl", "logo.png"), # 使用小头像URL
"mobile": user_data.get("mobile", ""), "mobile": user_data.get("mobile", ""),
"home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息 "home": f"{user_data.get('province', '')}-{user_data.get('city', '')}", # 组合省市信息
"signature": user_data.get("signature", "")[:10], "signature": user_data.get("signature", "")[:10]
}, }
} }
except Exception as e: except Exception as e:
self.LOG.error(f"获取当前用户信息失败: {e}") self.LOG.error(f"获取当前用户信息失败: {e}")

View File

@@ -3,10 +3,9 @@
后台管理员账号数据访问层。 后台管理员账号数据访问层。
设计目标: 设计目标:
1. 用数据库表承载后台账号,逐步替代固定配置文件中的账号密码; 1. 用数据库表承载后台账号,替代固定配置文件账号密码
2. 提供安全的密码哈希存储与校验能力; 2. 提供安全的密码散列存储与校验能力;
3. 支持登录成功后的登录信息回写,以及在线修改密码 3. 支持登录成功后的登录信息回写在线修改密码
4. 兼容历史部署,把旧配置中的管理员密码平滑迁移到数据库体系。
""" """
import base64 import base64
@@ -22,14 +21,14 @@ from db.base import BaseDBOperator
class AdminAccountDBOperator(BaseDBOperator): class AdminAccountDBOperator(BaseDBOperator):
"""后台管理员账号数据访问对象。""" """后台管理员账号数据访问对象。"""
# 口令哈希算法前缀,便于后续平滑升级算法。 # 口令哈希算法版本前缀,便于将来平滑升级算法。
HASH_SCHEME = "pbkdf2_sha256" HASH_SCHEME = "pbkdf2_sha256"
# PBKDF2 迭代次数在安全性计算开销之间做平衡。 # PBKDF2 迭代次数在安全性计算开销之间做平衡。
HASH_ITERATIONS = 150_000 HASH_ITERATIONS = 150_000
# 已知高风险密码列表 # 风险口令清单
# 1. 这里只覆盖默认密码和常见弱密码,不做完整字典 # 1. 这里优先覆盖系统默认口令和常见极弱口令
# 2. 后台安全提醒只需要识别“明显高风险”的情况即可 # 2. 后台安全判断只需要识别“明显险”的情况,不追求做成完整密码字典
# 3. 统一收敛在数据层,方便登录、改密、首提醒共用。 # 3. 统一在数据层,便于登录鉴权、修改密、首提醒共用。
RISKY_PASSWORDS = { RISKY_PASSWORDS = {
"admin", "admin",
"admin123", "admin123",
@@ -41,7 +40,10 @@ class AdminAccountDBOperator(BaseDBOperator):
} }
def init_tables(self) -> bool: def init_tables(self) -> bool:
"""初始化后台管理员账号表。""" """初始化后台管理员表。
表名使用 t_admin_ 前缀,满足后台账号体系命名约定。
"""
sql = """ sql = """
CREATE TABLE IF NOT EXISTS t_admin_accounts ( CREATE TABLE IF NOT EXISTS t_admin_accounts (
id BIGINT AUTO_INCREMENT PRIMARY KEY, id BIGINT AUTO_INCREMENT PRIMARY KEY,
@@ -72,17 +74,12 @@ class AdminAccountDBOperator(BaseDBOperator):
) )
def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool: def ensure_default_admin(self, username: str, password: str, display_name: str = "系统管理员") -> bool:
"""确保默认管理员存在,并在安全前提下补齐旧配置密码迁移 """确保默认管理员存在。
这里的迁移策略分三层 行为约束
1. 数据库里还没有管理员账号时,按当前配置创建初始账号 1. 若用户名已存在,不覆盖既有密码
2. 数据库里已经有账号,且密码本来就和当前配置一致时,直接视为已同步 2. 仅在“表里不存在该账号”时创建初始账号
3. 数据库里仍是历史默认弱密码,但配置里已经换成了更强密码时, 3. 方便从旧配置平滑迁移到数据库账号体系。
自动把强密码同步进数据库,避免登录和弱密码提示长期错位。
注意:
只有在“数据库当前密码仍然是已知弱密码,而传入配置密码不是弱密码”时,
才允许用配置回写数据库;如果用户已经在后台主动改过密码,则继续以数据库为准。
""" """
normalized_username = str(username or "").strip() normalized_username = str(username or "").strip()
normalized_password = str(password or "").strip() normalized_password = str(password or "").strip()
@@ -90,25 +87,17 @@ class AdminAccountDBOperator(BaseDBOperator):
return False return False
existing = self.get_admin_by_username(normalized_username) existing = self.get_admin_by_username(normalized_username)
if not existing: if existing:
password_hash = self.hash_password(normalized_password)
return self.execute_update(
"""
INSERT INTO t_admin_accounts (username, password_hash, display_name, status)
VALUES (%s, %s, %s, 1)
""",
(normalized_username, password_hash, str(display_name or "").strip() or normalized_username),
)
stored_hash = str(existing.get("password_hash") or "")
if stored_hash and self.verify_password(normalized_password, stored_hash):
return True return True
current_password_is_risky = self.is_password_hash_using_risky_password(stored_hash) password_hash = self.hash_password(normalized_password)
incoming_password_is_risky = normalized_password.lower() in self.RISKY_PASSWORDS return self.execute_update(
if current_password_is_risky and not incoming_password_is_risky: """
return self.update_password(normalized_username, normalized_password) INSERT INTO t_admin_accounts (username, password_hash, display_name, status)
return True VALUES (%s, %s, %s, 1)
""",
(normalized_username, password_hash, str(display_name or "").strip() or normalized_username),
)
def verify_admin_password(self, username: str, password: str) -> bool: def verify_admin_password(self, username: str, password: str) -> bool:
"""校验账号口令是否正确。""" """校验账号口令是否正确。"""
@@ -146,22 +135,17 @@ class AdminAccountDBOperator(BaseDBOperator):
) )
def is_using_risky_password(self, username: str) -> bool: def is_using_risky_password(self, username: str) -> bool:
"""判断指定管理员是否仍在使用已知弱密码""" """判断指定管理员是否仍在使用风险口令"""
row = self.get_admin_by_username(username) row = self.get_admin_by_username(username)
if not row: if not row:
return False return False
stored_hash = str(row.get("password_hash") or "") stored_hash = str(row.get("password_hash") or "")
return self.is_password_hash_using_risky_password(stored_hash)
@classmethod
def is_password_hash_using_risky_password(cls, stored_hash: str) -> bool:
"""判断一个已存储密码哈希是否仍然对应已知弱密码。"""
if not stored_hash: if not stored_hash:
return False return False
# 哈希无法反解,这里只能把已知高风险候选逐个比对。 # 口令是哈希存储的,因此只能把风险候选逐个比对。
for candidate in cls.RISKY_PASSWORDS: for candidate in self.RISKY_PASSWORDS:
if cls.verify_password(candidate, stored_hash): if self.verify_password(candidate, stored_hash):
return True return True
return False return False
@@ -170,10 +154,10 @@ class AdminAccountDBOperator(BaseDBOperator):
"""校验密码强度,返回错误提示;通过时返回 None。""" """校验密码强度,返回错误提示;通过时返回 None。"""
password_text = str(raw_password or "") password_text = str(raw_password or "")
if len(password_text) < 8: if len(password_text) < 8:
return "新密码长度不能少于 8 " return "新密码长度不能少于8"
if password_text.lower() in cls.RISKY_PASSWORDS: if password_text.lower() in cls.RISKY_PASSWORDS:
return "新密码过于简单,请避免使用默认口令或常见弱密码" return "新密码过于简单,请避免使用默认口令或常见弱口令"
score = 0 score = 0
if re.search(r"[A-Za-z]", password_text): if re.search(r"[A-Za-z]", password_text):
@@ -183,7 +167,7 @@ class AdminAccountDBOperator(BaseDBOperator):
if re.search(r"[^A-Za-z0-9]", password_text): if re.search(r"[^A-Za-z0-9]", password_text):
score += 1 score += 1
# 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。 # 至少满足两类字符,既兼顾安全性,也避免把规则设得过于苛刻。
if score < 2: if score < 2:
return "新密码需至少包含字母、数字、符号中的两类" return "新密码需至少包含字母、数字、符号中的两类"
return None return None
@@ -193,7 +177,7 @@ class AdminAccountDBOperator(BaseDBOperator):
"""生成口令哈希。 """生成口令哈希。
存储格式: 存储格式:
pbkdf2_sha256$迭代次数$盐HEX$摘要base64 pbkdf2_sha256$迭代次数$盐(HEX)$哈希(base64)
""" """
password_text = str(raw_password or "") password_text = str(raw_password or "")
salt_bytes = secrets.token_bytes(16) salt_bytes = secrets.token_bytes(16)
@@ -213,7 +197,7 @@ class AdminAccountDBOperator(BaseDBOperator):
安全细节: 安全细节:
1. 使用 hmac.compare_digest避免时序侧信道问题 1. 使用 hmac.compare_digest避免时序侧信道问题
2. 对格式异常统一返回 False避免异常中断登录流程。 2. 对格式异常统一返回 False避免抛错打断登录流程。
""" """
try: try:
scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3) scheme, iterations_text, salt_hex, digest_b64 = str(stored_hash or "").split("$", 3)

View File

@@ -1,171 +0,0 @@
import unittest
from flask import Blueprint, Flask
from admin.dashboard.blueprints.auth import auth_bp
from admin.dashboard.server import DashboardServer
from db.admin_account_db import AdminAccountDBOperator
class DummyAdminDB:
"""用于回归测试后台登录与弱密码判定的最小桩对象。"""
RISKY_PASSWORDS = AdminAccountDBOperator.RISKY_PASSWORDS
def __init__(self, row_exists: bool, db_password: str, risky: bool):
self.row_exists = row_exists
self.db_password = db_password
self.risky = risky
self.login_success_marked = False
def get_admin_by_username(self, username: str):
if not self.row_exists:
return None
return {
"username": username,
"status": 1,
"password_hash": self.db_password,
}
def verify_password(self, raw_password: str, stored_hash: str) -> bool:
# 这里不引入真实哈希算法,直接把“数据库里当前有效的密码”抽象成 db_password
# 只验证登录流程是否仍然错误地回退到了 config.toml。
return raw_password == self.db_password
def is_using_risky_password(self, username: str) -> bool:
return self.risky
def mark_login_success(self, username: str, login_ip: str = "") -> bool:
self.login_success_marked = True
return True
class DummyServer:
"""为 auth 蓝图提供最小运行依赖,避免把整套 Dashboard 初始化起来。"""
def __init__(self, username: str, password: str, admin_db: DummyAdminDB):
self.username = username
self.password = password
self.admin_account_db = admin_db
def get_login_lock_status(self, username: str, remote_ip: str) -> dict:
return {"locked": False, "remaining_seconds": 0, "failed_count": 0}
def get_auth_policy(self) -> dict:
return {"max_failed_attempts": 5, "lock_seconds": 900, "session_timeout_minutes": 480}
def clear_login_failures(self, username: str, remote_ip: str) -> None:
return None
def mark_login_failure(self, username: str, remote_ip: str) -> dict:
return {"locked": False, "remaining_seconds": 0, "failed_count": 1}
def should_force_password_change(self, username: str) -> bool:
temp_server = DashboardServer.__new__(DashboardServer)
temp_server.username = self.username
temp_server.password = self.password
temp_server.admin_account_db = self.admin_account_db
temp_server.LOG = None
return DashboardServer.should_force_password_change(temp_server, username)
class FakeAdminAccountDBOperator(AdminAccountDBOperator):
"""只覆盖 ensure_default_admin 测试所需的方法,避免真实数据库依赖。"""
def __init__(self, existing_row=None):
super().__init__(db_manager=None)
self.existing_row = existing_row
self.updated_password = None
self.inserted = False
def get_admin_by_username(self, username: str):
return self.existing_row
@classmethod
def verify_password(cls, raw_password: str, stored_hash: str) -> bool:
# 测试里直接把 password_hash 当成明文占位值,重点验证迁移分支是否被正确触发。
return raw_password == stored_hash
def update_password(self, username: str, new_password: str) -> bool:
self.updated_password = (username, new_password)
return True
def execute_update(self, sql: str, params=None) -> bool:
self.inserted = True
return True
class DashboardAuthLogicTestCase(unittest.TestCase):
def create_app(self, server: DummyServer) -> Flask:
app = Flask(__name__)
app.secret_key = "test-secret"
app.dashboard_server = server
main_bp = Blueprint("main", __name__)
@main_bp.route("/")
def index():
return "ok"
app.register_blueprint(auth_bp)
app.register_blueprint(main_bp)
return app
def test_should_force_password_change_ignores_legacy_config_when_db_password_is_strong(self):
server = DummyServer(
username="admin",
password="admin123",
admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False),
)
self.assertFalse(server.should_force_password_change("admin"))
def test_login_rejects_legacy_config_password_after_db_password_changed(self):
server = DummyServer(
username="admin",
password="admin123",
admin_db=DummyAdminDB(row_exists=True, db_password="StrongPass!2026", risky=False),
)
app = self.create_app(server)
with app.test_client() as client:
response = client.post(
"/login",
data={"username": "admin", "password": "admin123"},
headers={"X-Requested-With": "XMLHttpRequest"},
)
self.assertEqual(response.status_code, 400)
self.assertFalse(server.admin_account_db.login_success_marked)
def test_login_keeps_legacy_config_fallback_when_db_account_missing(self):
server = DummyServer(
username="admin",
password="admin123",
admin_db=DummyAdminDB(row_exists=False, db_password="", risky=False),
)
app = self.create_app(server)
with app.test_client() as client:
response = client.post(
"/login",
data={"username": "admin", "password": "admin123"},
headers={"X-Requested-With": "XMLHttpRequest"},
)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.get_json()["success"])
def test_ensure_default_admin_syncs_strong_config_password_over_risky_seed(self):
operator = FakeAdminAccountDBOperator(
existing_row={"username": "admin", "password_hash": "admin123", "status": 1}
)
result = operator.ensure_default_admin("admin", "StrongPass!2026")
self.assertTrue(result)
self.assertEqual(operator.updated_password, ("admin", "StrongPass!2026"))
if __name__ == "__main__":
unittest.main()