1. 新增趣味指令规则数据层与服务层,支持应用级缓存+Redis+MySQL三级读取与缓存刷新。 2. 新增 fun_command_play 插件,支持文本/图片/语音/视频/卡片/App 多媒体响应,并接入群权限开关。 3. 新增拍一拍事件识别(PAT)并纳入统一触发模型。 4. 新增后台页面与API:规则增删改查、启停、命中测试。 5. 将趣味指令剧本接入 Dashboard 菜单与蓝图注册,并补充数据库迁移脚本。
216 lines
8.3 KiB
Python
216 lines
8.3 KiB
Python
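# -*- coding: utf-8 -*-
"""Database access layer for fun-command rules.

This module wraps the MySQL read/write logic for the "fun command playbook"
feature so that the plugin layer never has to assemble SQL by hand, which
also makes later schema extensions safer.
"""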
|
||
|
||
import json
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from loguru import logger
|
||
|
||
from db.base import BaseDBOperator
|
||
from db.connection import DBConnectionManager
|
||
|
||
|
||
class FunCommandRuleDBOperator(BaseDBOperator):
    """Data-access object for the ``t_fun_command_rule`` table.

    All statements are issued through ``execute_update`` / ``execute_query``
    inherited from ``BaseDBOperator`` and use ``%s`` placeholders, so values
    are never interpolated into the SQL text.
    """

    # Fallbacks used when a value is absent; they mirror the column defaults
    # declared in ``init_tables``.
    _DEFAULT_PRIORITY = 100
    _DEFAULT_COOLDOWN_SECONDS = 0

    def __init__(self, db_manager: DBConnectionManager):
        """Pass the connection manager through to ``BaseDBOperator``."""
        super().__init__(db_manager)

    def init_tables(self) -> bool:
        """Create the fun-command rule table if it does not exist yet.

        Schema notes:
            1. ``responses_json`` is a JSON column holding a list of response
               actions, so the front end can arrange them as an array.
            2. ``scope_type`` + ``scope_id`` provide multi-scope control
               (global / group chat / private chat).
            3. ``trigger_type`` + ``trigger_text`` / ``event_key`` allow
               keyword triggers and event triggers (e.g. "pat") to coexist.

        Returns:
            The result of ``execute_update``, or ``False`` when it raised
            (the error is logged rather than propagated).
        """
        try:
            return self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_fun_command_rule (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    rule_name VARCHAR(128) NOT NULL,
                    scope_type VARCHAR(20) NOT NULL DEFAULT 'global',
                    scope_id VARCHAR(100) NOT NULL DEFAULT '',
                    trigger_type VARCHAR(20) NOT NULL DEFAULT 'exact',
                    trigger_text VARCHAR(500) NOT NULL DEFAULT '',
                    event_key VARCHAR(64) NOT NULL DEFAULT '',
                    responses_json JSON NOT NULL,
                    priority INT NOT NULL DEFAULT 100,
                    cooldown_seconds INT NOT NULL DEFAULT 0,
                    enabled TINYINT(1) NOT NULL DEFAULT 1,
                    updated_by VARCHAR(100) NOT NULL DEFAULT 'system',
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_scope_enabled_priority (scope_type, scope_id, enabled, priority),
                    INDEX idx_trigger_type (trigger_type),
                    INDEX idx_event_key (event_key)
                )
                """
            )
        except Exception as e:
            # Deliberate best-effort: report the failure to the caller as
            # False instead of raising.
            logger.error(f"初始化趣味指令规则表失败: {e}")
            return False

    @staticmethod
    def _coerce_int(value: Any, default: int) -> int:
        """Convert *value* to ``int``, using *default* only when it is absent.

        A numeric zero is a legitimate value (``priority = 0`` sorts first
        under ``ORDER BY priority ASC``), so it must not be replaced by the
        default the way ``int(value or default)`` would. Non-numeric falsy
        inputs (``None``, ``""``, ``False``, empty containers) still fall
        back to *default*.

        Raises:
            ValueError, TypeError: propagated from ``int()`` for truthy
                values that cannot be converted.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return int(value)
        return int(value) if value else default

    @classmethod
    def _build_write_params(cls, payload: Dict[str, Any]) -> tuple:
        """Turn a rule payload into the column values shared by INSERT/UPDATE.

        The tuple order is: rule_name, scope_type, scope_id, trigger_type,
        trigger_text, event_key, responses_json, priority, cooldown_seconds,
        enabled, updated_by. Missing or empty fields take the same defaults
        as the table columns; text fields are stripped of surrounding
        whitespace.
        """

        def text(key: str, default: str = "") -> str:
            return str(payload.get(key, default) or default).strip()

        return (
            text("rule_name"),
            text("scope_type", "global"),
            text("scope_id"),
            text("trigger_type", "exact"),
            text("trigger_text"),
            text("event_key"),
            # ensure_ascii=False keeps non-ASCII response text readable in
            # the stored JSON.
            json.dumps(payload.get("responses_json") or [], ensure_ascii=False),
            cls._coerce_int(payload.get("priority"), cls._DEFAULT_PRIORITY),
            cls._coerce_int(
                payload.get("cooldown_seconds"), cls._DEFAULT_COOLDOWN_SECONDS
            ),
            1 if bool(payload.get("enabled", True)) else 0,
            text("updated_by", "system"),
        )

    @staticmethod
    def _normalize_row(row: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize one database row in place and return it.

        The JSON column is decoded into a Python list and the key fields are
        coerced to their expected types with fallbacks, so that legacy or
        dirty data does not break the plugin at execution time.

        Args:
            row: A mutable mapping for one row. It is modified in place.

        Returns:
            The same ``row`` object after normalization, or a new empty dict
            when ``row`` is falsy.
        """
        if not row:
            return {}

        responses = row.get("responses_json")
        if isinstance(responses, str):
            try:
                responses = json.loads(responses)
            except Exception:
                # Deliberate best-effort: undecodable JSON becomes "no
                # responses" rather than an error.
                responses = []
        # Anything that is not a list (None, dict, scalar, bytes) is treated
        # as "no responses".
        row["responses_json"] = responses if isinstance(responses, list) else []

        coerce_int = FunCommandRuleDBOperator._coerce_int
        row["enabled"] = bool(row.get("enabled", 0))
        row["priority"] = coerce_int(
            row.get("priority"), FunCommandRuleDBOperator._DEFAULT_PRIORITY
        )
        row["cooldown_seconds"] = coerce_int(
            row.get("cooldown_seconds"),
            FunCommandRuleDBOperator._DEFAULT_COOLDOWN_SECONDS,
        )
        row["scope_type"] = str(row.get("scope_type", "global") or "global")
        row["scope_id"] = str(row.get("scope_id", "") or "")
        row["trigger_type"] = str(row.get("trigger_type", "exact") or "exact")
        row["trigger_text"] = str(row.get("trigger_text", "") or "")
        row["event_key"] = str(row.get("event_key", "") or "")
        return row

    def list_rules(
        self,
        scope_type: str = "",
        scope_id: str = "",
        enabled: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Return the rules matching the given filters.

        Args:
            scope_type: Filter on ``scope_type``; an empty value disables
                the filter.
            scope_id: Filter on ``scope_id``; an empty value disables the
                filter.
            enabled: Filter on the enabled flag; ``None`` disables the
                filter.

        Returns:
            Normalized rows ordered by ``priority`` ascending, then ``id``
            descending. An empty list when the query yields nothing.
        """
        conditions: List[str] = []
        params: List[Any] = []

        if scope_type:
            conditions.append("scope_type = %s")
            params.append(scope_type)
        if scope_id:
            conditions.append("scope_id = %s")
            params.append(scope_id)
        if enabled is not None:
            conditions.append("enabled = %s")
            params.append(1 if enabled else 0)

        # Only fixed fragments are joined into the SQL text; every value
        # travels as a bound parameter.
        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
        rows = self.execute_query(
            f"""
            SELECT *
            FROM t_fun_command_rule
            {where_clause}
            ORDER BY priority ASC, id DESC
            """,
            tuple(params) if params else None,
        ) or []

        # dict(row) copies each row so normalization never mutates the
        # object returned by the query helper.
        return [self._normalize_row(dict(row)) for row in rows]

    def get_rule(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Fetch a single rule by primary key.

        Returns:
            The normalized row, or ``None`` when the query returns nothing.
        """
        row = self.execute_query(
            """
            SELECT *
            FROM t_fun_command_rule
            WHERE id = %s
            LIMIT 1
            """,
            (int(rule_id),),
            fetch_one=True,
        )
        if not row:
            return None
        return self._normalize_row(dict(row))

    def create_rule(self, payload: Dict[str, Any]) -> bool:
        """Insert a new rule built from ``payload``.

        Missing or empty payload fields fall back to the column defaults.
        A numeric ``priority`` of ``0`` is stored as ``0``.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            INSERT INTO t_fun_command_rule (
                rule_name, scope_type, scope_id,
                trigger_type, trigger_text, event_key,
                responses_json, priority, cooldown_seconds,
                enabled, updated_by
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            self._build_write_params(payload),
        )

    def update_rule(self, rule_id: int, payload: Dict[str, Any]) -> bool:
        """Overwrite every editable column of rule ``rule_id``.

        This is a full replacement: fields missing from ``payload`` are
        reset to their defaults (for example ``enabled`` becomes 1), they
        are not left untouched.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            UPDATE t_fun_command_rule
            SET
                rule_name = %s,
                scope_type = %s,
                scope_id = %s,
                trigger_type = %s,
                trigger_text = %s,
                event_key = %s,
                responses_json = %s,
                priority = %s,
                cooldown_seconds = %s,
                enabled = %s,
                updated_by = %s
            WHERE id = %s
            """,
            self._build_write_params(payload) + (int(rule_id),),
        )

    def delete_rule(self, rule_id: int) -> bool:
        """Delete the rule with the given primary key.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            DELETE FROM t_fun_command_rule
            WHERE id = %s
            """,
            (int(rule_id),),
        )

    def toggle_rule(self, rule_id: int, enabled: bool, updated_by: str = "system") -> bool:
        """Switch a rule on or off without touching its other columns.

        Args:
            rule_id: Primary key of the rule.
            enabled: Desired state; stored as 1 or 0.
            updated_by: Recorded in ``updated_by``; an empty value becomes
                ``"system"``.

        Returns:
            The result of ``execute_update``.
        """
        return self.execute_update(
            """
            UPDATE t_fun_command_rule
            SET enabled = %s, updated_by = %s
            WHERE id = %s
            """,
            (1 if enabled else 0, str(updated_by or "system"), int(rule_id)),
        )