Files
abot/admin/dashboard/blueprints/fun_command_rules.py
liuwei aa56a7ca02 移除响应指令管理规则测试功能及接口
1. 删除页面中的规则测试区域,简化管理端交互。

2. 删除前端关联状态与方法,避免无用请求与冗余代码。

3. 删除后端/api/test_match接口,保持蓝图能力与页面一致。
2026-04-23 14:23:35 +08:00

238 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""趣味指令规则后台蓝图。"""
import os
import uuid
from datetime import datetime
from typing import Any, Dict
from flask import Blueprint, current_app, jsonify, render_template, request
from werkzeug.utils import secure_filename
from .auth import login_required
fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules")
# 媒体上传白名单:
# 这里仅允许趣味指令管理功能需要的媒体类型,避免上传任意可执行文件带来风险。
ALLOWED_EXTENSIONS = {
"image": {"png", "jpg", "jpeg", "gif", "webp"},
"voice": {"mp3", "wav", "amr"},
"video": {"mp4", "mov", "m4v"},
}
def _normalize_datetime_text(value):
"""统一时间字段展示格式。"""
if value is None:
return value
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %H:%M:%S")
text = str(value)
if "T" in text:
return text.replace("T", " ")[:19]
return text
def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
"""构造并清洗规则载荷。"""
return {
"rule_name": str(raw.get("rule_name", "") or "").strip(),
"scope_type": str(raw.get("scope_type", "global") or "global").strip().lower(),
"scope_id": str(raw.get("scope_id", "") or "").strip(),
"trigger_type": str(raw.get("trigger_type", "exact") or "exact").strip().lower(),
"trigger_text": str(raw.get("trigger_text", "") or "").strip(),
"event_key": str(raw.get("event_key", "") or "").strip().upper(),
"responses_json": raw.get("responses_json", []),
"priority": int(raw.get("priority", 100) or 100),
"cooldown_seconds": int(raw.get("cooldown_seconds", 0) or 0),
"enabled": bool(raw.get("enabled", True)),
"updated_by": str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard",
}
def _validate_payload(payload: Dict[str, Any], service) -> str:
"""校验规则数据,返回空字符串表示通过。"""
if not payload["rule_name"]:
return "rule_name 不能为空"
if payload["scope_type"] not in {"global", "group", "private"}:
return "scope_type 仅支持 global/group/private"
# group/private 必须提供 scope_id防止误配为全量匹配。
if payload["scope_type"] in {"group", "private"} and not payload["scope_id"]:
return "group/private 作用域必须填写 scope_id"
if payload["trigger_type"] not in {"exact", "prefix", "contains", "regex", "event"}:
return "trigger_type 仅支持 exact/prefix/contains/regex/event"
if payload["trigger_type"] == "event":
if not payload["event_key"]:
return "event 触发时 event_key 不能为空"
else:
if not payload["trigger_text"]:
return "文本触发时 trigger_text 不能为空"
ok, msg, normalized = service.validate_responses(payload.get("responses_json"))
if not ok:
return msg
payload["responses_json"] = normalized
return ""
def _extract_file_ext(filename: str) -> str:
"""从原始文件名提取扩展名(小写,不含点)。"""
if "." not in str(filename or ""):
return ""
return str(filename).rsplit(".", 1)[1].lower().strip()
@fun_command_rules_bp.route("/")
@login_required
def page_fun_command_rules():
return render_template("fun_command_rules.html")
@fun_command_rules_bp.route("/api/upload", methods=["POST"])
@login_required
def api_upload_media():
"""上传媒体文件并返回服务端绝对路径。
设计说明:
1. 使用独立目录 static/uploads/fun_command_rules与其它业务上传隔离。
2. 返回绝对路径,直接可用于插件发送媒体消息。
"""
if "file" not in request.files:
return jsonify({"success": False, "message": "未检测到上传文件"}), 400
media_type = str(request.form.get("media_type", "") or "").strip().lower()
if media_type not in ALLOWED_EXTENSIONS:
return jsonify({"success": False, "message": "media_type 非法,仅支持 image/voice/video"}), 400
upload_file = request.files.get("file")
if not upload_file or not upload_file.filename:
return jsonify({"success": False, "message": "文件名为空"}), 400
# 注意secure_filename 对中文文件名会清洗成 “wav” 这类无点字符串,
# 若直接基于清洗后文件名做扩展名校验会误判。这里改为优先使用原始文件名提取扩展名。
raw_filename = str(upload_file.filename or "").strip()
file_ext = _extract_file_ext(raw_filename)
if file_ext not in ALLOWED_EXTENSIONS.get(media_type, set()):
return jsonify({"success": False, "message": f"不支持的文件类型: {raw_filename}"}), 400
# 生成安全文件名basename 走 secure_filename扩展名使用原始提取值兜底。
raw_basename = raw_filename.rsplit(".", 1)[0] if "." in raw_filename else raw_filename
safe_basename = secure_filename(raw_basename).strip()
if not safe_basename:
safe_basename = "upload_file"
filename = f"{safe_basename}.{file_ext}"
# 按媒体类型分目录,便于后续清理和排查。
upload_root = os.path.join(current_app.root_path, "static", "uploads", "fun_command_rules", media_type)
os.makedirs(upload_root, exist_ok=True)
unique_filename = f"{uuid.uuid4().hex}_{filename}"
abs_path = os.path.abspath(os.path.join(upload_root, unique_filename))
upload_file.save(abs_path)
return jsonify({
"success": True,
"message": "上传成功",
"data": {
"path": abs_path,
"filename": unique_filename,
"media_type": media_type,
}
})
@fun_command_rules_bp.route("/api/list", methods=["GET"])
@login_required
def api_list_rules():
server = current_app.dashboard_server
service = server.fun_command_rule_service
scope_type = str(request.args.get("scope_type", "") or "").strip().lower()
scope_id = str(request.args.get("scope_id", "") or "").strip()
enabled_raw = str(request.args.get("enabled", "") or "").strip().lower()
enabled = None
if enabled_raw in {"0", "1", "true", "false"}:
enabled = enabled_raw in {"1", "true"}
rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled)
for row in rows:
row["created_at"] = _normalize_datetime_text(row.get("created_at"))
row["updated_at"] = _normalize_datetime_text(row.get("updated_at"))
return jsonify({"success": True, "data": rows})
@fun_command_rules_bp.route("/api/create", methods=["POST"])
@login_required
def api_create_rule():
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
payload = _build_payload(raw)
error_text = _validate_payload(payload, service)
if error_text:
return jsonify({"success": False, "message": error_text}), 400
ok = service.create_rule(payload)
if not ok:
return jsonify({"success": False, "message": "创建失败"}), 500
return jsonify({"success": True, "message": "创建成功"})
@fun_command_rules_bp.route("/api/update/<int:rule_id>", methods=["POST"])
@login_required
def api_update_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
payload = _build_payload(raw)
error_text = _validate_payload(payload, service)
if error_text:
return jsonify({"success": False, "message": error_text}), 400
ok = service.update_rule(rule_id=rule_id, payload=payload)
if not ok:
return jsonify({"success": False, "message": "更新失败"}), 500
return jsonify({"success": True, "message": "更新成功"})
@fun_command_rules_bp.route("/api/delete/<int:rule_id>", methods=["POST"])
@login_required
def api_delete_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
ok = service.delete_rule(rule_id=rule_id)
if not ok:
return jsonify({"success": False, "message": "删除失败"}), 500
return jsonify({"success": True, "message": "删除成功"})
@fun_command_rules_bp.route("/api/toggle/<int:rule_id>", methods=["POST"])
@login_required
def api_toggle_rule(rule_id: int):
server = current_app.dashboard_server
service = server.fun_command_rule_service
raw = request.get_json(silent=True) or {}
enabled = bool(raw.get("enabled", True))
updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard"
ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by)
if not ok:
return jsonify({"success": False, "message": "切换失败"}), 500
return jsonify({"success": True, "message": "状态已更新"})