# -*- coding: utf-8 -*- """趣味指令规则后台蓝图。""" import os import uuid from datetime import datetime from typing import Any, Dict from flask import Blueprint, current_app, jsonify, render_template, request from werkzeug.utils import secure_filename from .auth import login_required fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules") # 媒体上传白名单: # 这里仅允许趣味指令管理功能需要的媒体类型,避免上传任意可执行文件带来风险。 ALLOWED_EXTENSIONS = { "image": {"png", "jpg", "jpeg", "gif", "webp"}, "voice": {"mp3", "wav", "amr"}, "video": {"mp4", "mov", "m4v"}, } def _normalize_datetime_text(value): """统一时间字段展示格式。""" if value is None: return value if isinstance(value, datetime): return value.strftime("%Y-%m-%d %H:%M:%S") text = str(value) if "T" in text: return text.replace("T", " ")[:19] return text def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]: """构造并清洗规则载荷。""" return { "rule_name": str(raw.get("rule_name", "") or "").strip(), "scope_type": str(raw.get("scope_type", "global") or "global").strip().lower(), "scope_id": str(raw.get("scope_id", "") or "").strip(), "trigger_type": str(raw.get("trigger_type", "exact") or "exact").strip().lower(), "trigger_text": str(raw.get("trigger_text", "") or "").strip(), "event_key": str(raw.get("event_key", "") or "").strip().upper(), "responses_json": raw.get("responses_json", []), "priority": int(raw.get("priority", 100) or 100), "cooldown_seconds": int(raw.get("cooldown_seconds", 0) or 0), "enabled": bool(raw.get("enabled", True)), "updated_by": str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard", } def _validate_payload(payload: Dict[str, Any], service) -> str: """校验规则数据,返回空字符串表示通过。""" if not payload["rule_name"]: return "rule_name 不能为空" if payload["scope_type"] not in {"global", "group", "private"}: return "scope_type 仅支持 global/group/private" # group/private 必须提供 scope_id,防止误配为全量匹配。 if payload["scope_type"] in {"group", "private"} and not payload["scope_id"]: return "group/private 作用域必须填写 scope_id" if payload["trigger_type"] not in {"exact", "prefix", "contains", "regex", "event"}: return "trigger_type 仅支持 exact/prefix/contains/regex/event" if payload["trigger_type"] == "event": if not payload["event_key"]: return "event 触发时 event_key 不能为空" else: if not payload["trigger_text"]: return "文本触发时 trigger_text 不能为空" ok, msg, normalized = service.validate_responses(payload.get("responses_json")) if not ok: return msg payload["responses_json"] = normalized return "" def _extract_file_ext(filename: str) -> str: """从原始文件名提取扩展名(小写,不含点)。""" if "." not in str(filename or ""): return "" return str(filename).rsplit(".", 1)[1].lower().strip() @fun_command_rules_bp.route("/") @login_required def page_fun_command_rules(): return render_template("fun_command_rules.html") @fun_command_rules_bp.route("/api/upload", methods=["POST"]) @login_required def api_upload_media(): """上传媒体文件并返回服务端绝对路径。 设计说明: 1. 使用独立目录 static/uploads/fun_command_rules,与其它业务上传隔离。 2. 返回绝对路径,直接可用于插件发送媒体消息。 """ if "file" not in request.files: return jsonify({"success": False, "message": "未检测到上传文件"}), 400 media_type = str(request.form.get("media_type", "") or "").strip().lower() if media_type not in ALLOWED_EXTENSIONS: return jsonify({"success": False, "message": "media_type 非法,仅支持 image/voice/video"}), 400 upload_file = request.files.get("file") if not upload_file or not upload_file.filename: return jsonify({"success": False, "message": "文件名为空"}), 400 # 注意:secure_filename 对中文文件名会清洗成 “wav” 这类无点字符串, # 若直接基于清洗后文件名做扩展名校验会误判。这里改为优先使用原始文件名提取扩展名。 raw_filename = str(upload_file.filename or "").strip() file_ext = _extract_file_ext(raw_filename) if file_ext not in ALLOWED_EXTENSIONS.get(media_type, set()): return jsonify({"success": False, "message": f"不支持的文件类型: {raw_filename}"}), 400 # 生成安全文件名:basename 走 secure_filename,扩展名使用原始提取值兜底。 raw_basename = raw_filename.rsplit(".", 1)[0] if "." in raw_filename else raw_filename safe_basename = secure_filename(raw_basename).strip() if not safe_basename: safe_basename = "upload_file" filename = f"{safe_basename}.{file_ext}" # 按媒体类型分目录,便于后续清理和排查。 upload_root = os.path.join(current_app.root_path, "static", "uploads", "fun_command_rules", media_type) os.makedirs(upload_root, exist_ok=True) unique_filename = f"{uuid.uuid4().hex}_{filename}" abs_path = os.path.abspath(os.path.join(upload_root, unique_filename)) upload_file.save(abs_path) return jsonify({ "success": True, "message": "上传成功", "data": { "path": abs_path, "filename": unique_filename, "media_type": media_type, } }) @fun_command_rules_bp.route("/api/list", methods=["GET"]) @login_required def api_list_rules(): server = current_app.dashboard_server service = server.fun_command_rule_service scope_type = str(request.args.get("scope_type", "") or "").strip().lower() scope_id = str(request.args.get("scope_id", "") or "").strip() enabled_raw = str(request.args.get("enabled", "") or "").strip().lower() enabled = None if enabled_raw in {"0", "1", "true", "false"}: enabled = enabled_raw in {"1", "true"} rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled) for row in rows: row["created_at"] = _normalize_datetime_text(row.get("created_at")) row["updated_at"] = _normalize_datetime_text(row.get("updated_at")) return jsonify({"success": True, "data": rows}) @fun_command_rules_bp.route("/api/create", methods=["POST"]) @login_required def api_create_rule(): server = current_app.dashboard_server service = server.fun_command_rule_service raw = request.get_json(silent=True) or {} payload = _build_payload(raw) error_text = _validate_payload(payload, service) if error_text: return jsonify({"success": False, "message": error_text}), 400 ok = service.create_rule(payload) if not ok: return jsonify({"success": False, "message": "创建失败"}), 500 return jsonify({"success": True, "message": "创建成功"}) @fun_command_rules_bp.route("/api/update/", methods=["POST"]) @login_required def api_update_rule(rule_id: int): server = current_app.dashboard_server service = server.fun_command_rule_service raw = request.get_json(silent=True) or {} payload = _build_payload(raw) error_text = _validate_payload(payload, service) if error_text: return jsonify({"success": False, "message": error_text}), 400 ok = service.update_rule(rule_id=rule_id, payload=payload) if not ok: return jsonify({"success": False, "message": "更新失败"}), 500 return jsonify({"success": True, "message": "更新成功"}) @fun_command_rules_bp.route("/api/delete/", methods=["POST"]) @login_required def api_delete_rule(rule_id: int): server = current_app.dashboard_server service = server.fun_command_rule_service ok = service.delete_rule(rule_id=rule_id) if not ok: return jsonify({"success": False, "message": "删除失败"}), 500 return jsonify({"success": True, "message": "删除成功"}) @fun_command_rules_bp.route("/api/toggle/", methods=["POST"]) @login_required def api_toggle_rule(rule_id: int): server = current_app.dashboard_server service = server.fun_command_rule_service raw = request.get_json(silent=True) or {} enabled = bool(raw.get("enabled", True)) updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard" ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by) if not ok: return jsonify({"success": False, "message": "切换失败"}), 500 return jsonify({"success": True, "message": "状态已更新"})