# -*- coding: utf-8 -*-
"""Dashboard blueprint for managing fun-command rules."""

import os
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

from flask import Blueprint, current_app, jsonify, render_template, request
from werkzeug.utils import secure_filename

from .auth import login_required

fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules")

# Media upload whitelist:
# only the media types needed by fun-command management are allowed, so that
# arbitrary (possibly executable) files cannot be uploaded.
ALLOWED_EXTENSIONS = {
    "image": {"png", "jpg", "jpeg", "gif", "webp"},
    "voice": {"mp3", "wav", "amr"},
    "video": {"mp4", "mov", "m4v"},
}

# String spellings that mean "disabled" when a flag arrives as text.
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _normalize_datetime_text(value):
    """Return a timestamp in ``YYYY-MM-DD HH:MM:SS`` display form.

    ``None`` is passed through unchanged, ``datetime`` objects are formatted,
    and ISO-like strings containing ``T`` have it replaced by a space and are
    truncated to 19 characters. Any other value is returned as ``str(value)``.
    """
    if value is None:
        return value
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    text = str(value)
    if "T" in text:
        return text.replace("T", " ")[:19]
    return text


def _coerce_bool(value: Any) -> bool:
    """Interpret a JSON value as a boolean flag.

    Strings are compared case-insensitively against ``_FALSE_STRINGS`` so that
    ``"false"`` / ``"0"`` are not mistaken for a truthy non-empty string;
    every other type uses normal truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_STRINGS
    return bool(value)


def _load_json_object() -> Dict[str, Any]:
    """Return the request's JSON body as a dict.

    A missing, malformed, or non-object body (e.g. a JSON list) yields an
    empty dict, so callers can safely use ``.get`` on the result.
    """
    raw = request.get_json(silent=True)
    return raw if isinstance(raw, dict) else {}


def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build a cleaned rule payload from raw request data.

    Text fields are stringified and stripped; ``scope_type`` and
    ``trigger_type`` are lower-cased and ``event_key`` is upper-cased.

    Raises:
        TypeError, ValueError, OverflowError: if ``priority`` or
            ``cooldown_seconds`` cannot be converted with ``int()``.
    """
    return {
        "rule_name": str(raw.get("rule_name", "") or "").strip(),
        "scope_type": str(raw.get("scope_type", "global") or "global").strip().lower(),
        "scope_id": str(raw.get("scope_id", "") or "").strip(),
        "trigger_type": str(raw.get("trigger_type", "exact") or "exact").strip().lower(),
        "trigger_text": str(raw.get("trigger_text", "") or "").strip(),
        "event_key": str(raw.get("event_key", "") or "").strip().upper(),
        "responses_json": raw.get("responses_json", []),
        # NOTE: any falsy value (including an explicit 0) falls back to the default.
        "priority": int(raw.get("priority", 100) or 100),
        "cooldown_seconds": int(raw.get("cooldown_seconds", 0) or 0),
        "enabled": _coerce_bool(raw.get("enabled", True)),
        "updated_by": str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard",
    }


def _validate_payload(payload: Dict[str, Any], service) -> str:
    """Validate a rule payload; return an empty string when it is valid.

    On success ``payload["responses_json"]` is replaced in place with the
    normalized value returned by ``service.validate_responses``.

    Returns:
        A user-facing error message, or ``""`` if validation passed.
    """
    if not payload["rule_name"]:
        return "rule_name 不能为空"

    if payload["scope_type"] not in {"global", "group", "private"}:
        return "scope_type 仅支持 global/group/private"

    # group/private scopes must carry a scope_id so a rule is not
    # accidentally configured to match everything.
    if payload["scope_type"] in {"group", "private"} and not payload["scope_id"]:
        return "group/private 作用域必须填写 scope_id"

    if payload["trigger_type"] not in {"exact", "prefix", "contains", "regex", "event"}:
        return "trigger_type 仅支持 exact/prefix/contains/regex/event"

    if payload["trigger_type"] == "event":
        if not payload["event_key"]:
            return "event 触发时 event_key 不能为空"
    else:
        if not payload["trigger_text"]:
            return "文本触发时 trigger_text 不能为空"

    ok, msg, normalized = service.validate_responses(payload.get("responses_json"))
    if not ok:
        return msg
    payload["responses_json"] = normalized
    return ""


def _payload_from_request(service) -> Tuple[Optional[Dict[str, Any]], str]:
    """Build and validate a rule payload from the current request.

    Returns:
        ``(payload, "")`` on success, or ``(payload_or_None, error_text)``
        when the body is invalid. Non-integer numeric fields are reported as
        a validation error instead of propagating as an unhandled exception.
    """
    try:
        payload = _build_payload(_load_json_object())
    except (TypeError, ValueError, OverflowError):
        return None, "priority/cooldown_seconds 必须为整数"
    return payload, _validate_payload(payload, service)


def _allowed_file(filename: str, media_type: str) -> bool:
    """Return True if the file extension is whitelisted for ``media_type``."""
    if "." not in filename:
        return False
    ext = filename.rsplit(".", 1)[1].lower()
    return ext in ALLOWED_EXTENSIONS.get(media_type, set())


@fun_command_rules_bp.route("/")
@login_required
def page_fun_command_rules():
    """Render the fun-command rules management page."""
    return render_template("fun_command_rules.html")


@fun_command_rules_bp.route("/api/upload", methods=["POST"])
@login_required
def api_upload_media():
    """Upload a media file and return its absolute server-side path.

    Design notes:
    1. Files go to the dedicated directory static/uploads/fun_command_rules,
       isolated from other upload features.
    2. The absolute path is returned so it can be used directly when the
       plugin sends media messages.
    """
    if "file" not in request.files:
        return jsonify({"success": False, "message": "未检测到上传文件"}), 400

    media_type = str(request.form.get("media_type", "") or "").strip().lower()
    if media_type not in ALLOWED_EXTENSIONS:
        return jsonify({"success": False, "message": "media_type 非法,仅支持 image/voice/video"}), 400

    upload_file = request.files.get("file")
    if not upload_file or not upload_file.filename:
        return jsonify({"success": False, "message": "文件名为空"}), 400

    filename = secure_filename(upload_file.filename)
    if _allowed_file(filename, media_type):
        unique_filename = f"{uuid.uuid4().hex}_{filename}"
    elif _allowed_file(upload_file.filename, media_type):
        # secure_filename drops non-ASCII characters, so a name such as
        # "图片.png" collapses to "png" and loses its dot. The extension has
        # just been checked against the whitelist, so it is safe to keep; the
        # untrusted stem is discarded in favour of the random id.
        ext = upload_file.filename.rsplit(".", 1)[1].lower()
        unique_filename = f"{uuid.uuid4().hex}.{ext}"
    else:
        return jsonify({"success": False, "message": f"不支持的文件类型: {filename}"}), 400

    # One sub-directory per media type eases later cleanup and debugging.
    upload_root = os.path.join(current_app.root_path, "static", "uploads", "fun_command_rules", media_type)
    os.makedirs(upload_root, exist_ok=True)

    abs_path = os.path.abspath(os.path.join(upload_root, unique_filename))
    upload_file.save(abs_path)

    return jsonify({
        "success": True,
        "message": "上传成功",
        "data": {
            "path": abs_path,
            "filename": unique_filename,
            "media_type": media_type,
        }
    })


@fun_command_rules_bp.route("/api/list", methods=["GET"])
@login_required
def api_list_rules():
    """List rules, optionally filtered by scope_type, scope_id and enabled."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    scope_type = str(request.args.get("scope_type", "") or "").strip().lower()
    scope_id = str(request.args.get("scope_id", "") or "").strip()
    enabled_raw = str(request.args.get("enabled", "") or "").strip().lower()

    # Only recognised spellings become a filter; anything else means "no filter".
    enabled = None
    if enabled_raw in {"0", "1", "true", "false"}:
        enabled = enabled_raw in {"1", "true"}

    # Guard against the service returning None instead of an empty list.
    rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled) or []
    for row in rows:
        row["created_at"] = _normalize_datetime_text(row.get("created_at"))
        row["updated_at"] = _normalize_datetime_text(row.get("updated_at"))
    return jsonify({"success": True, "data": rows})


@fun_command_rules_bp.route("/api/create", methods=["POST"])
@login_required
def api_create_rule():
    """Create a rule from the JSON body; 400 on validation failure."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    payload, error_text = _payload_from_request(service)
    if error_text:
        return jsonify({"success": False, "message": error_text}), 400

    ok = service.create_rule(payload)
    if not ok:
        return jsonify({"success": False, "message": "创建失败"}), 500
    return jsonify({"success": True, "message": "创建成功"})


# The <int:rule_id> URL variable is required: the view takes ``rule_id`` and
# Flask only supplies it when the route declares it.
@fun_command_rules_bp.route("/api/update/<int:rule_id>", methods=["POST"])
@login_required
def api_update_rule(rule_id: int):
    """Update the rule identified by ``rule_id`` from the JSON body."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    payload, error_text = _payload_from_request(service)
    if error_text:
        return jsonify({"success": False, "message": error_text}), 400

    ok = service.update_rule(rule_id=rule_id, payload=payload)
    if not ok:
        return jsonify({"success": False, "message": "更新失败"}), 500
    return jsonify({"success": True, "message": "更新成功"})


@fun_command_rules_bp.route("/api/delete/<int:rule_id>", methods=["POST"])
@login_required
def api_delete_rule(rule_id: int):
    """Delete the rule identified by ``rule_id``."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    ok = service.delete_rule(rule_id=rule_id)
    if not ok:
        return jsonify({"success": False, "message": "删除失败"}), 500
    return jsonify({"success": True, "message": "删除成功"})


@fun_command_rules_bp.route("/api/toggle/<int:rule_id>", methods=["POST"])
@login_required
def api_toggle_rule(rule_id: int):
    """Enable or disable the rule identified by ``rule_id``."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    raw = _load_json_object()
    enabled = _coerce_bool(raw.get("enabled", True))
    updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard"

    ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by)
    if not ok:
        return jsonify({"success": False, "message": "切换失败"}), 500
    return jsonify({"success": True, "message": "状态已更新"})


@fun_command_rules_bp.route("/api/test_match", methods=["POST"])
@login_required
def api_test_match():
    """Dashboard test entry point for quickly checking which rule matches."""
    server = current_app.dashboard_server
    service = server.fun_command_rule_service

    raw = _load_json_object()
    scope_type = str(raw.get("scope_type", "group") or "group").strip().lower()
    scope_id = str(raw.get("scope_id", "") or "").strip()
    content = str(raw.get("content", "") or "").strip()
    event_key = str(raw.get("event_key", "") or "").strip().upper()
    # Fall back to a fixed key when no scope_id is supplied.
    session_key = scope_id or "test-session"

    matched = service.match_rule(
        scope_type=scope_type,
        scope_id=scope_id,
        content=content,
        event_key=event_key,
        session_key=session_key,
    )
    if not matched:
        return jsonify({"success": True, "matched": False, "data": None})
    return jsonify({"success": True, "matched": True, "data": matched})