# -*- coding: utf-8 -*- """趣味指令规则数据库操作层。 这里专门封装“趣味指令剧本”相关的 MySQL 读写逻辑, 避免插件层直接拼 SQL,后续扩展字段也更安全。 """ import json from typing import Any, Dict, List, Optional from loguru import logger from db.base import BaseDBOperator from db.connection import DBConnectionManager class FunCommandRuleDBOperator(BaseDBOperator): """趣味指令规则数据访问对象。""" def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) def init_tables(self) -> bool: """初始化趣味指令规则表。 说明: 1. responses_json 使用 JSON 字段存储多条响应动作,便于前端以数组方式编排。 2. scope_type + scope_id 用于做“全局/群聊/私聊”多作用域控制。 3. trigger_type + trigger_text/event_key 支持关键词与事件(如拍一拍)混合触发。 """ try: return self.execute_update( """ CREATE TABLE IF NOT EXISTS t_fun_command_rule ( id BIGINT PRIMARY KEY AUTO_INCREMENT, rule_name VARCHAR(128) NOT NULL, scope_type VARCHAR(20) NOT NULL DEFAULT 'global', scope_id VARCHAR(100) NOT NULL DEFAULT '', trigger_type VARCHAR(20) NOT NULL DEFAULT 'exact', trigger_text VARCHAR(500) NOT NULL DEFAULT '', event_key VARCHAR(64) NOT NULL DEFAULT '', responses_json JSON NOT NULL, priority INT NOT NULL DEFAULT 100, cooldown_seconds INT NOT NULL DEFAULT 0, enabled TINYINT(1) NOT NULL DEFAULT 1, updated_by VARCHAR(100) NOT NULL DEFAULT 'system', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_scope_enabled_priority (scope_type, scope_id, enabled, priority), INDEX idx_trigger_type (trigger_type), INDEX idx_event_key (event_key) ) """ ) except Exception as e: logger.error(f"初始化趣味指令规则表失败: {e}") return False @staticmethod def _normalize_row(row: Dict[str, Any]) -> Dict[str, Any]: """统一处理数据库行数据。 这里把 JSON 字段解析为 Python 对象,且对关键字段做兜底, 防止旧数据/脏数据导致插件执行阶段崩溃。 """ if not row: return {} responses_value = row.get("responses_json") if isinstance(responses_value, str): try: row["responses_json"] = json.loads(responses_value) except Exception: row["responses_json"] = [] elif responses_value is None: row["responses_json"] = [] if not isinstance(row.get("responses_json"), list): row["responses_json"] = [] row["enabled"] = bool(row.get("enabled", 0)) row["priority"] = int(row.get("priority", 100) or 100) row["cooldown_seconds"] = int(row.get("cooldown_seconds", 0) or 0) row["scope_type"] = str(row.get("scope_type", "global") or "global") row["scope_id"] = str(row.get("scope_id", "") or "") row["trigger_type"] = str(row.get("trigger_type", "exact") or "exact") row["trigger_text"] = str(row.get("trigger_text", "") or "") row["event_key"] = str(row.get("event_key", "") or "") return row def list_rules(self, scope_type: str = "", scope_id: str = "", enabled: Optional[bool] = None) -> List[Dict[str, Any]]: """按条件查询规则列表。""" where_sql = [] params: List[Any] = [] if scope_type: where_sql.append("scope_type = %s") params.append(scope_type) if scope_id: where_sql.append("scope_id = %s") params.append(scope_id) if enabled is not None: where_sql.append("enabled = %s") params.append(1 if enabled else 0) where_clause = f"WHERE {' AND '.join(where_sql)}" if where_sql else "" rows = self.execute_query( f""" SELECT * FROM t_fun_command_rule {where_clause} ORDER BY priority ASC, id DESC """, tuple(params) if params else None, ) or [] return [self._normalize_row(dict(row)) for row in rows] def get_rule(self, rule_id: int) -> Optional[Dict[str, Any]]: """按主键获取单条规则。""" row = self.execute_query( """ SELECT * FROM t_fun_command_rule WHERE id = %s LIMIT 1 """, (int(rule_id),), fetch_one=True, ) if not row: return None return self._normalize_row(dict(row)) def create_rule(self, payload: Dict[str, Any]) -> bool: """创建规则。""" return self.execute_update( """ INSERT INTO t_fun_command_rule ( rule_name, scope_type, scope_id, trigger_type, trigger_text, event_key, responses_json, priority, cooldown_seconds, enabled, updated_by ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( str(payload.get("rule_name", "") or "").strip(), str(payload.get("scope_type", "global") or "global").strip(), str(payload.get("scope_id", "") or "").strip(), str(payload.get("trigger_type", "exact") or "exact").strip(), str(payload.get("trigger_text", "") or "").strip(), str(payload.get("event_key", "") or "").strip(), json.dumps(payload.get("responses_json") or [], ensure_ascii=False), int(payload.get("priority", 100) or 100), int(payload.get("cooldown_seconds", 0) or 0), 1 if bool(payload.get("enabled", True)) else 0, str(payload.get("updated_by", "system") or "system").strip(), ), ) def update_rule(self, rule_id: int, payload: Dict[str, Any]) -> bool: """更新规则。""" return self.execute_update( """ UPDATE t_fun_command_rule SET rule_name = %s, scope_type = %s, scope_id = %s, trigger_type = %s, trigger_text = %s, event_key = %s, responses_json = %s, priority = %s, cooldown_seconds = %s, enabled = %s, updated_by = %s WHERE id = %s """, ( str(payload.get("rule_name", "") or "").strip(), str(payload.get("scope_type", "global") or "global").strip(), str(payload.get("scope_id", "") or "").strip(), str(payload.get("trigger_type", "exact") or "exact").strip(), str(payload.get("trigger_text", "") or "").strip(), str(payload.get("event_key", "") or "").strip(), json.dumps(payload.get("responses_json") or [], ensure_ascii=False), int(payload.get("priority", 100) or 100), int(payload.get("cooldown_seconds", 0) or 0), 1 if bool(payload.get("enabled", True)) else 0, str(payload.get("updated_by", "system") or "system").strip(), int(rule_id), ), ) def delete_rule(self, rule_id: int) -> bool: """删除规则。""" return self.execute_update( """ DELETE FROM t_fun_command_rule WHERE id = %s """, (int(rule_id),), ) def toggle_rule(self, rule_id: int, enabled: bool, updated_by: str = "system") -> bool: """快速切换规则启停状态。""" return self.execute_update( """ UPDATE t_fun_command_rule SET enabled = %s, updated_by = %s WHERE id = %s """, (1 if enabled else 0, str(updated_by or "system"), int(rule_id)), )