Files
abot/admin/dashboard/blueprints/fun_command_rules.py
liuwei 3c7becd94f 响应指令管理支持媒体上传并自动回填路径
1. 新增响应指令管理专用媒体上传接口,按图片语音视频白名单校验并分目录存储。

2. 在动作配置UI中为图片语音视频增加上传按钮,上传成功后自动回填本地绝对路径。

3. 保留结构化动作表单,进一步减少手工维护路径和JSON的场景。
2026-04-23 13:30:17 +08:00

257 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""趣味指令规则后台蓝图。"""
import os
import uuid
from datetime import datetime
from typing import Any, Dict
from flask import Blueprint, current_app, jsonify, render_template, request
from werkzeug.utils import secure_filename
from .auth import login_required
# Blueprint for the fun-command rule admin backend; the routes registered on it
# in this module are served under the /fun_command_rules URL prefix.
fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules")
# Media upload whitelist, keyed by media type.
# Only the media extensions the fun-command management feature needs are
# allowed, to avoid the risk of arbitrary (e.g. executable) files being uploaded.
ALLOWED_EXTENSIONS = {
    "image": {"png", "jpg", "jpeg", "gif", "webp"},
    "voice": {"mp3", "wav", "amr"},
    "video": {"mp4", "mov", "m4v"},
}
def _normalize_datetime_text(value):
    """Render a timestamp-like value in a uniform display format.

    ``None`` is returned unchanged and ``datetime`` objects are formatted as
    ``YYYY-MM-DD HH:MM:SS``. Any other value is converted with ``str()``; if
    that text contains a ``T`` (as ISO-8601 strings do), every ``T`` is
    replaced by a space and the result is cut to its first 19 characters.
    Text without a ``T`` is returned as is.
    """
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    if value is None:
        return None
    rendered = str(value)
    return rendered.replace("T", " ")[:19] if "T" in rendered else rendered
def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build a sanitized rule payload from the raw request body.

    Args:
        raw: Decoded JSON body of the request. Anything that is not a dict
            (e.g. a JSON list) is treated as an empty body so that validation
            can report the missing fields instead of this function crashing.

    Returns:
        A dict with the keys ``rule_name``, ``scope_type``, ``scope_id``,
        ``trigger_type``, ``trigger_text``, ``event_key``, ``responses_json``,
        ``priority``, ``cooldown_seconds``, ``enabled`` and ``updated_by``.
        Text fields are stripped (``scope_type``/``trigger_type`` lower-cased,
        ``event_key`` upper-cased); ``responses_json`` is passed through
        untouched.
    """
    if not isinstance(raw, dict):
        raw = {}

    def _text(key: str, default: str = "") -> str:
        # Missing, None and empty values all fall back to the default.
        return str(raw.get(key, default) or default).strip()

    def _integer(key: str, default: int) -> int:
        value = raw.get(key, default)
        # Only "no value" falls back to the default; an explicit 0 is kept.
        if value is None or (isinstance(value, str) and not value.strip()):
            return default
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # Unparseable input is sanitized to the default rather than
            # surfacing as an unhandled exception in the request handler.
            return default

    def _flag(key: str, default: bool) -> bool:
        value = raw.get(key, default)
        if isinstance(value, str):
            # bool("false") is True, so textual flags are interpreted explicitly.
            return value.strip().lower() not in {"", "0", "false", "no", "off"}
        return bool(value)

    return {
        "rule_name": _text("rule_name"),
        "scope_type": _text("scope_type", "global").lower(),
        "scope_id": _text("scope_id"),
        "trigger_type": _text("trigger_type", "exact").lower(),
        "trigger_text": _text("trigger_text"),
        "event_key": _text("event_key").upper(),
        "responses_json": raw.get("responses_json", []),
        "priority": _integer("priority", 100),
        "cooldown_seconds": _integer("cooldown_seconds", 0),
        "enabled": _flag("enabled", True),
        # A whitespace-only author strips to "", so fall back once more.
        "updated_by": _text("updated_by", "dashboard") or "dashboard",
    }
def _validate_payload(payload: Dict[str, Any], service) -> str:
    """Validate rule data; an empty string means the payload passed.

    Args:
        payload: Rule payload to check. On success its ``responses_json``
            entry is replaced in place by the normalized value returned from
            ``service.validate_responses``.
        service: Object providing ``validate_responses``, which is expected to
            return an ``(ok, message, normalized)`` triple.

    Returns:
        The first validation error message, or ``""`` when everything is valid.
    """
    if not payload["rule_name"]:
        return "rule_name 不能为空"

    scope_type = payload["scope_type"]
    if scope_type not in {"global", "group", "private"}:
        return "scope_type 仅支持 global/group/private"
    # group/private rules must carry a scope_id, so they are not accidentally
    # configured as match-everything rules.
    if scope_type != "global" and not payload["scope_id"]:
        return "group/private 作用域必须填写 scope_id"

    trigger_type = payload["trigger_type"]
    if trigger_type not in {"exact", "prefix", "contains", "regex", "event"}:
        return "trigger_type 仅支持 exact/prefix/contains/regex/event"

    # Event triggers are keyed by event_key; every other trigger needs text.
    if trigger_type == "event":
        required_key, missing_message = "event_key", "event 触发时 event_key 不能为空"
    else:
        required_key, missing_message = "trigger_text", "文本触发时 trigger_text 不能为空"
    if not payload[required_key]:
        return missing_message

    ok, msg, normalized = service.validate_responses(payload.get("responses_json"))
    if ok:
        payload["responses_json"] = normalized
        return ""
    return msg
def _allowed_file(filename: str, media_type: str) -> bool:
    """Return whether the file's extension is whitelisted for ``media_type``.

    The extension is whatever follows the last ``.`` in ``filename``, compared
    case-insensitively against ``ALLOWED_EXTENSIONS[media_type]``. A name
    without a dot, or an unknown media type, is never allowed.
    """
    _, dot, suffix = filename.rpartition(".")
    if not dot:
        return False
    permitted = ALLOWED_EXTENSIONS.get(media_type, set())
    return suffix.lower() in permitted
@fun_command_rules_bp.route("/")
@login_required
def page_fun_command_rules():
    """Render the fun-command rule management page (wrapped by ``login_required``)."""
    page_html = render_template("fun_command_rules.html")
    return page_html
@fun_command_rules_bp.route("/api/upload", methods=["POST"])
@login_required
def api_upload_media():
    """Store an uploaded media file and return its absolute server-side path.

    Design notes:
    1. Files are stored under the dedicated directory
       static/uploads/fun_command_rules, isolated from other features' uploads.
    2. The absolute path is returned so it can be used directly when a plugin
       sends the media message.

    Expects a multipart form with a ``file`` part and a ``media_type`` field
    (one of the keys of ``ALLOWED_EXTENSIONS``).

    Returns:
        A JSON response. Failures carry ``success: False`` with HTTP 400;
        success carries ``data`` with ``path``, ``filename`` and ``media_type``.
    """
    if "file" not in request.files:
        return jsonify({"success": False, "message": "未检测到上传文件"}), 400

    media_type = str(request.form.get("media_type", "") or "").strip().lower()
    if media_type not in ALLOWED_EXTENSIONS:
        return jsonify({"success": False, "message": "media_type 非法,仅支持 image/voice/video"}), 400

    upload_file = request.files.get("file")
    if not upload_file or not upload_file.filename:
        return jsonify({"success": False, "message": "文件名为空"}), 400

    original_name = upload_file.filename
    # Check the extension on the client-supplied name BEFORE sanitizing it.
    # secure_filename() keeps ASCII characters only, so a name such as
    # "图片.png" collapses to "png" (no dot left) and would be rejected as an
    # unsupported type if the check ran on the sanitized name.
    if not _allowed_file(original_name, media_type):
        filename = secure_filename(original_name)
        return jsonify({"success": False, "message": f"不支持的文件类型: {filename}"}), 400

    # The extension is whitelisted at this point, so it is safe to reuse as is;
    # only the stem needs sanitizing, and it may legitimately end up empty.
    stem, _, ext = original_name.rpartition(".")
    ext = ext.lower()
    safe_stem = secure_filename(stem)

    # Split by media type to make later cleanup and troubleshooting easier.
    upload_root = os.path.join(current_app.root_path, "static", "uploads", "fun_command_rules", media_type)
    os.makedirs(upload_root, exist_ok=True)

    token = uuid.uuid4().hex
    unique_filename = f"{token}_{safe_stem}.{ext}" if safe_stem else f"{token}.{ext}"
    abs_path = os.path.abspath(os.path.join(upload_root, unique_filename))
    upload_file.save(abs_path)

    return jsonify({
        "success": True,
        "message": "上传成功",
        "data": {
            "path": abs_path,
            "filename": unique_filename,
            "media_type": media_type,
        }
    })
@fun_command_rules_bp.route("/api/list", methods=["GET"])
@login_required
def api_list_rules():
    """List rules, optionally filtered through query-string parameters.

    Query parameters:
        scope_type, scope_id: Passed to the service after trimming
            (``scope_type`` is also lower-cased).
        enabled: ``1``/``true`` or ``0``/``false``; any other value means
            "do not filter" and is forwarded as ``None``.

    The ``created_at`` / ``updated_at`` fields of every returned row are
    rewritten in place into the display format before the rows are sent back.
    """
    service = current_app.dashboard_server.fun_command_rule_service
    args = request.args

    scope_type = str(args.get("scope_type", "") or "").strip().lower()
    scope_id = str(args.get("scope_id", "") or "").strip()
    enabled_text = str(args.get("enabled", "") or "").strip().lower()

    if enabled_text in {"1", "true"}:
        enabled = True
    elif enabled_text in {"0", "false"}:
        enabled = False
    else:
        enabled = None

    rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled)
    for row in rows:
        for field in ("created_at", "updated_at"):
            row[field] = _normalize_datetime_text(row.get(field))
    return jsonify({"success": True, "data": rows})
@fun_command_rules_bp.route("/api/create", methods=["POST"])
@login_required
def api_create_rule():
    """Create a rule from the JSON request body.

    Responds with HTTP 400 and the validation message when the payload is
    invalid, HTTP 500 when the service reports failure, and a success JSON
    otherwise.
    """
    service = current_app.dashboard_server.fun_command_rule_service
    payload = _build_payload(request.get_json(silent=True) or {})

    problem = _validate_payload(payload, service)
    if problem:
        return jsonify({"success": False, "message": problem}), 400

    if service.create_rule(payload):
        return jsonify({"success": True, "message": "创建成功"})
    return jsonify({"success": False, "message": "创建失败"}), 500
@fun_command_rules_bp.route("/api/update/<int:rule_id>", methods=["POST"])
@login_required
def api_update_rule(rule_id: int):
    """Update the rule identified by ``rule_id`` from the JSON request body.

    Responds with HTTP 400 and the validation message when the payload is
    invalid, HTTP 500 when the service reports failure, and a success JSON
    otherwise.
    """
    service = current_app.dashboard_server.fun_command_rule_service
    payload = _build_payload(request.get_json(silent=True) or {})

    problem = _validate_payload(payload, service)
    if problem:
        return jsonify({"success": False, "message": problem}), 400

    if service.update_rule(rule_id=rule_id, payload=payload):
        return jsonify({"success": True, "message": "更新成功"})
    return jsonify({"success": False, "message": "更新失败"}), 500
@fun_command_rules_bp.route("/api/delete/<int:rule_id>", methods=["POST"])
@login_required
def api_delete_rule(rule_id: int):
    """Delete the rule identified by ``rule_id``.

    Responds with HTTP 500 when the service reports failure and a success
    JSON otherwise.
    """
    service = current_app.dashboard_server.fun_command_rule_service
    if service.delete_rule(rule_id=rule_id):
        return jsonify({"success": True, "message": "删除成功"})
    return jsonify({"success": False, "message": "删除失败"}), 500
@fun_command_rules_bp.route("/api/toggle/<int:rule_id>", methods=["POST"])
@login_required
def api_toggle_rule(rule_id: int):
    """Enable or disable the rule identified by ``rule_id``.

    JSON body fields:
        enabled: Target state; defaults to ``True`` when absent. Textual
            values are interpreted, so ``"false"``, ``"0"``, ``"no"``,
            ``"off"`` and ``""`` disable the rule.
        updated_by: Author recorded for the change; defaults to
            ``"dashboard"``.

    Responds with HTTP 500 when the service reports failure and a success
    JSON otherwise.
    """
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    raw = request.get_json(silent=True)
    if not isinstance(raw, dict):
        # A missing/invalid body or a non-object JSON value (list, number, ...)
        # has no fields; fall back to the defaults instead of failing on .get().
        raw = {}

    enabled = raw.get("enabled", True)
    if isinstance(enabled, str):
        # bool("false") is True, so textual flags must be interpreted
        # explicitly or a "disable" request would enable the rule.
        enabled = enabled.strip().lower() not in {"", "0", "false", "no", "off"}
    else:
        enabled = bool(enabled)

    updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard"
    ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by)
    if not ok:
        return jsonify({"success": False, "message": "切换失败"}), 500
    return jsonify({"success": True, "message": "状态已更新"})
@fun_command_rules_bp.route("/api/test_match", methods=["POST"])
@login_required
def api_test_match():
    """Backend test entry point for quickly checking which rule a message hits.

    JSON body fields (all optional): ``scope_type`` (defaults to ``"group"``),
    ``scope_id``, ``content`` and ``event_key``. The session key passed to the
    service is ``scope_id``, or ``"test-session"`` when no scope id is given.

    Returns:
        A JSON response with ``matched`` set to whether the service returned a
        truthy result, and ``data`` holding that result (``None`` otherwise).
    """
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    raw = request.get_json(silent=True)
    if not isinstance(raw, dict):
        # A missing/invalid body or a non-object JSON value (list, number, ...)
        # has no fields; fall back to the defaults instead of failing on .get().
        raw = {}

    scope_type = str(raw.get("scope_type", "group") or "group").strip().lower()
    scope_id = str(raw.get("scope_id", "") or "").strip()
    content = str(raw.get("content", "") or "").strip()
    event_key = str(raw.get("event_key", "") or "").strip().upper()
    session_key = scope_id or "test-session"

    matched = service.match_rule(
        scope_type=scope_type,
        scope_id=scope_id,
        content=content,
        event_key=event_key,
        session_key=session_key,
    )
    if not matched:
        return jsonify({"success": True, "matched": False, "data": None})
    return jsonify({"success": True, "matched": True, "data": matched})