Files
abot/admin/dashboard/blueprints/fun_command_rules.py
liuwei 37d6c36e2c 修复中文文件名上传wav误判为不支持类型
1. 上传扩展名校验改为基于原始文件名提取,避免secure_filename清洗后丢失点号导致误判。

2. 重建安全文件名时分离basename和扩展名,兼容中文文件名与无效basename场景。

3. 继续保留媒体类型白名单校验,不放宽安全边界。
2026-04-23 13:50:02 +08:00

266 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""趣味指令规则后台蓝图。"""
import os
import uuid
from datetime import datetime
from typing import Any, Dict
from flask import Blueprint, current_app, jsonify, render_template, request
from werkzeug.utils import secure_filename
from .auth import login_required
fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules")
# 媒体上传白名单:
# 这里仅允许趣味指令管理功能需要的媒体类型,避免上传任意可执行文件带来风险。
ALLOWED_EXTENSIONS = {
"image": {"png", "jpg", "jpeg", "gif", "webp"},
"voice": {"mp3", "wav", "amr"},
"video": {"mp4", "mov", "m4v"},
}
def _normalize_datetime_text(value):
"""统一时间字段展示格式。"""
if value is None:
return value
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %H:%M:%S")
text = str(value)
if "T" in text:
return text.replace("T", " ")[:19]
return text
def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
"""构造并清洗规则载荷。"""
return {
"rule_name": str(raw.get("rule_name", "") or "").strip(),
"scope_type": str(raw.get("scope_type", "global") or "global").strip().lower(),
"scope_id": str(raw.get("scope_id", "") or "").strip(),
"trigger_type": str(raw.get("trigger_type", "exact") or "exact").strip().lower(),
"trigger_text": str(raw.get("trigger_text", "") or "").strip(),
"event_key": str(raw.get("event_key", "") or "").strip().upper(),
"responses_json": raw.get("responses_json", []),
"priority": int(raw.get("priority", 100) or 100),
"cooldown_seconds": int(raw.get("cooldown_seconds", 0) or 0),
"enabled": bool(raw.get("enabled", True)),
"updated_by": str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard",
}
def _validate_payload(payload: Dict[str, Any], service) -> str:
"""校验规则数据,返回空字符串表示通过。"""
if not payload["rule_name"]:
return "rule_name 不能为空"
if payload["scope_type"] not in {"global", "group", "private"}:
return "scope_type 仅支持 global/group/private"
# group/private 必须提供 scope_id防止误配为全量匹配。
if payload["scope_type"] in {"group", "private"} and not payload["scope_id"]:
return "group/private 作用域必须填写 scope_id"
if payload["trigger_type"] not in {"exact", "prefix", "contains", "regex", "event"}:
return "trigger_type 仅支持 exact/prefix/contains/regex/event"
if payload["trigger_type"] == "event":
if not payload["event_key"]:
return "event 触发时 event_key 不能为空"
else:
if not payload["trigger_text"]:
return "文本触发时 trigger_text 不能为空"
ok, msg, normalized = service.validate_responses(payload.get("responses_json"))
if not ok:
return msg
payload["responses_json"] = normalized
return ""
def _extract_file_ext(filename: str) -> str:
"""从原始文件名提取扩展名(小写,不含点)。"""
if "." not in str(filename or ""):
return ""
return str(filename).rsplit(".", 1)[1].lower().strip()
@fun_command_rules_bp.route("/")
@login_required
def page_fun_command_rules():
return render_template("fun_command_rules.html")
@fun_command_rules_bp.route("/api/upload", methods=["POST"])
@login_required
def api_upload_media():
"""上传媒体文件并返回服务端绝对路径。
设计说明:
1. 使用独立目录 static/uploads/fun_command_rules与其它业务上传隔离。
2. 返回绝对路径,直接可用于插件发送媒体消息。
"""
if "file" not in request.files:
return jsonify({"success": False, "message": "未检测到上传文件"}), 400
media_type = str(request.form.get("media_type", "") or "").strip().lower()
if media_type not in ALLOWED_EXTENSIONS:
return jsonify({"success": False, "message": "media_type 非法,仅支持 image/voice/video"}), 400
upload_file = request.files.get("file")
if not upload_file or not upload_file.filename:
return jsonify({"success": False, "message": "文件名为空"}), 400
# 注意secure_filename 对中文文件名会清洗成 “wav” 这类无点字符串,
# 若直接基于清洗后文件名做扩展名校验会误判。这里改为优先使用原始文件名提取扩展名。
raw_filename = str(upload_file.filename or "").strip()
file_ext = _extract_file_ext(raw_filename)
if file_ext not in ALLOWED_EXTENSIONS.get(media_type, set()):
return jsonify({"success": False, "message": f"不支持的文件类型: {raw_filename}"}), 400
# 生成安全文件名basename 走 secure_filename扩展名使用原始提取值兜底。
raw_basename = raw_filename.rsplit(".", 1)[0] if "." in raw_filename else raw_filename
safe_basename = secure_filename(raw_basename).strip()
if not safe_basename:
safe_basename = "upload_file"
filename = f"{safe_basename}.{file_ext}"
# 按媒体类型分目录,便于后续清理和排查。
upload_root = os.path.join(current_app.root_path, "static", "uploads", "fun_command_rules", media_type)
os.makedirs(upload_root, exist_ok=True)
unique_filename = f"{uuid.uuid4().hex}_{filename}"
abs_path = os.path.abspath(os.path.join(upload_root, unique_filename))
upload_file.save(abs_path)
return jsonify({
"success": True,
"message": "上传成功",
"data": {
"path": abs_path,
"filename": unique_filename,
"media_type": media_type,
}
})
@fun_command_rules_bp.route("/api/list", methods=["GET"])
@login_required
def api_list_rules():
server = current_app.dashboard_server
service = server.fun_command_rule_service
scope_type = str(request.args.get("scope_type", "") or "").strip().lower()
scope_id = str(request.args.get("scope_id", "") or "").strip()
enabled_raw = str(request.args.get("enabled", "") or "").strip().lower()
enabled = None
if enabled_raw in {"0", "1", "true", "false"}:
enabled = enabled_raw in {"1", "true"}
rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled)
for row in rows:
row["created_at"] = _normalize_datetime_text(row.get("created_at"))
row["updated_at"] = _normalize_datetime_text(row.get("updated_at"))
return jsonify({"success": True, "data": rows})
@fun_command_rules_bp.route("/api/create", methods=["POST"])
@login_required
def api_create_rule():
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
payload = _build_payload(raw)
error_text = _validate_payload(payload, service)
if error_text:
return jsonify({"success": False, "message": error_text}), 400
ok = service.create_rule(payload)
if not ok:
return jsonify({"success": False, "message": "创建失败"}), 500
return jsonify({"success": True, "message": "创建成功"})
@fun_command_rules_bp.route("/api/update/<int:rule_id>", methods=["POST"])
@login_required
def api_update_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
payload = _build_payload(raw)
error_text = _validate_payload(payload, service)
if error_text:
return jsonify({"success": False, "message": error_text}), 400
ok = service.update_rule(rule_id=rule_id, payload=payload)
if not ok:
return jsonify({"success": False, "message": "更新失败"}), 500
return jsonify({"success": True, "message": "更新成功"})
@fun_command_rules_bp.route("/api/delete/<int:rule_id>", methods=["POST"])
@login_required
def api_delete_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
ok = service.delete_rule(rule_id=rule_id)
if not ok:
return jsonify({"success": False, "message": "删除失败"}), 500
return jsonify({"success": True, "message": "删除成功"})
@fun_command_rules_bp.route("/api/toggle/<int:rule_id>", methods=["POST"])
@login_required
def api_toggle_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
enabled = bool(raw.get("enabled", True))
updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard"
ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by)
if not ok:
return jsonify({"success": False, "message": "切换失败"}), 500
return jsonify({"success": True, "message": "状态已更新"})
@fun_command_rules_bp.route("/api/test_match", methods=["POST"])
@login_required
def api_test_match():
"""提供后台测试入口,便于快速验证规则命中结果。"""
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
scope_type = str(raw.get("scope_type", "group") or "group").strip().lower()
scope_id = str(raw.get("scope_id", "") or "").strip()
content = str(raw.get("content", "") or "").strip()
event_key = str(raw.get("event_key", "") or "").strip().upper()
session_key = scope_id or "test-session"
matched = service.match_rule(
scope_type=scope_type,
scope_id=scope_id,
content=content,
event_key=event_key,
session_key=session_key,
)
if not matched:
return jsonify({"success": True, "matched": False, "data": None})
return jsonify({"success": True, "matched": True, "data": matched})