diff --git a/admin/dashboard/blueprints/fun_command_rules.py b/admin/dashboard/blueprints/fun_command_rules.py new file mode 100644 index 0000000..7fa80eb --- /dev/null +++ b/admin/dashboard/blueprints/fun_command_rules.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +"""趣味指令规则后台蓝图。""" + +from datetime import datetime +from typing import Any, Dict + +from flask import Blueprint, current_app, jsonify, render_template, request + +from .auth import login_required + + +fun_command_rules_bp = Blueprint("fun_command_rules", __name__, url_prefix="/fun_command_rules") + + +def _normalize_datetime_text(value): + """统一时间字段展示格式。""" + if value is None: + return value + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M:%S") + text = str(value) + if "T" in text: + return text.replace("T", " ")[:19] + return text + + +def _build_payload(raw: Dict[str, Any]) -> Dict[str, Any]: + """构造并清洗规则载荷。""" + return { + "rule_name": str(raw.get("rule_name", "") or "").strip(), + "scope_type": str(raw.get("scope_type", "global") or "global").strip().lower(), + "scope_id": str(raw.get("scope_id", "") or "").strip(), + "trigger_type": str(raw.get("trigger_type", "exact") or "exact").strip().lower(), + "trigger_text": str(raw.get("trigger_text", "") or "").strip(), + "event_key": str(raw.get("event_key", "") or "").strip().upper(), + "responses_json": raw.get("responses_json", []), + "priority": int(raw.get("priority", 100) or 100), + "cooldown_seconds": int(raw.get("cooldown_seconds", 0) or 0), + "enabled": bool(raw.get("enabled", True)), + "updated_by": str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard", + } + + +def _validate_payload(payload: Dict[str, Any], service) -> str: + """校验规则数据,返回空字符串表示通过。""" + if not payload["rule_name"]: + return "rule_name 不能为空" + + if payload["scope_type"] not in {"global", "group", "private"}: + return "scope_type 仅支持 global/group/private" + + # group/private 必须提供 scope_id,防止误配为全量匹配。 + if payload["scope_type"] in {"group", "private"} and not payload["scope_id"]: + return "group/private 作用域必须填写 scope_id" + + if payload["trigger_type"] not in {"exact", "prefix", "contains", "regex", "event"}: + return "trigger_type 仅支持 exact/prefix/contains/regex/event" + + if payload["trigger_type"] == "event": + if not payload["event_key"]: + return "event 触发时 event_key 不能为空" + else: + if not payload["trigger_text"]: + return "文本触发时 trigger_text 不能为空" + + ok, msg, normalized = service.validate_responses(payload.get("responses_json")) + if not ok: + return msg + payload["responses_json"] = normalized + return "" + + +@fun_command_rules_bp.route("/") +@login_required +def page_fun_command_rules(): + return render_template("fun_command_rules.html") + + +@fun_command_rules_bp.route("/api/list", methods=["GET"]) +@login_required +def api_list_rules(): + server = current_app.dashboard_server + service = server.fun_command_rule_service + + scope_type = str(request.args.get("scope_type", "") or "").strip().lower() + scope_id = str(request.args.get("scope_id", "") or "").strip() + + enabled_raw = str(request.args.get("enabled", "") or "").strip().lower() + enabled = None + if enabled_raw in {"0", "1", "true", "false"}: + enabled = enabled_raw in {"1", "true"} + + rows = service.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled) + for row in rows: + row["created_at"] = _normalize_datetime_text(row.get("created_at")) + row["updated_at"] = _normalize_datetime_text(row.get("updated_at")) + + return jsonify({"success": True, "data": rows}) + + +@fun_command_rules_bp.route("/api/create", methods=["POST"]) +@login_required +def api_create_rule(): + server = current_app.dashboard_server + service = server.fun_command_rule_service + + raw = request.get_json(silent=True) or {} + payload = _build_payload(raw) + error_text = _validate_payload(payload, service) + if error_text: + return jsonify({"success": False, "message": error_text}), 400 + + ok = service.create_rule(payload) + if not ok: + return jsonify({"success": False, "message": "创建失败"}), 500 + + return jsonify({"success": True, "message": "创建成功"}) + + +@fun_command_rules_bp.route("/api/update/", methods=["POST"]) +@login_required +def api_update_rule(rule_id: int): + server = current_app.dashboard_server + service = server.fun_command_rule_service + + raw = request.get_json(silent=True) or {} + payload = _build_payload(raw) + error_text = _validate_payload(payload, service) + if error_text: + return jsonify({"success": False, "message": error_text}), 400 + + ok = service.update_rule(rule_id=rule_id, payload=payload) + if not ok: + return jsonify({"success": False, "message": "更新失败"}), 500 + + return jsonify({"success": True, "message": "更新成功"}) + + +@fun_command_rules_bp.route("/api/delete/", methods=["POST"]) +@login_required +def api_delete_rule(rule_id: int): + server = current_app.dashboard_server + service = server.fun_command_rule_service + + ok = service.delete_rule(rule_id=rule_id) + if not ok: + return jsonify({"success": False, "message": "删除失败"}), 500 + + return jsonify({"success": True, "message": "删除成功"}) + + +@fun_command_rules_bp.route("/api/toggle/", methods=["POST"]) +@login_required +def api_toggle_rule(rule_id: int): + server = current_app.dashboard_server + service = server.fun_command_rule_service + + raw = request.get_json(silent=True) or {} + enabled = bool(raw.get("enabled", True)) + updated_by = str(raw.get("updated_by", "dashboard") or "dashboard").strip() or "dashboard" + + ok = service.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by) + if not ok: + return jsonify({"success": False, "message": "切换失败"}), 500 + + return jsonify({"success": True, "message": "状态已更新"}) + + +@fun_command_rules_bp.route("/api/test_match", methods=["POST"]) +@login_required +def api_test_match(): + """提供后台测试入口,便于快速验证规则命中结果。""" + server = current_app.dashboard_server + service = server.fun_command_rule_service + + raw = request.get_json(silent=True) or {} + scope_type = str(raw.get("scope_type", "group") or "group").strip().lower() + scope_id = str(raw.get("scope_id", "") or "").strip() + content = str(raw.get("content", "") or "").strip() + event_key = str(raw.get("event_key", "") or "").strip().upper() + + session_key = scope_id or "test-session" + matched = service.match_rule( + scope_type=scope_type, + scope_id=scope_id, + content=content, + event_key=event_key, + session_key=session_key, + ) + + if not matched: + return jsonify({"success": True, "matched": False, "data": None}) + + return jsonify({"success": True, "matched": True, "data": matched}) diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index e0dcb9d..e5e53ca 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -16,6 +16,8 @@ from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator from db.task_db import TaskDBOperator +from db.fun_command_rule_db import FunCommandRuleDBOperator +from utils.fun_command_rule_service import FunCommandRuleService from wechat_ipad import WechatAPIClient # 添加项目根目录到系统路径,确保可以导入项目模块 @@ -56,6 +58,16 @@ class DashboardServer: self.group_plugin_config_db = robot_instance.group_plugin_config_db self.llm_catalog_db = robot_instance.llm_catalog_db self.group_plugin_config_service = robot_instance.group_plugin_config_service + # 趣味指令规则服务:用于“文案/事件触发多媒体玩法回复”后台配置与缓存。 + # 这里统一在 Dashboard 启动时初始化,保证管理端可直接读写规则。 + self.fun_command_rule_db = FunCommandRuleDBOperator(self.db_manager) + self.fun_command_rule_service = FunCommandRuleService( + db_operator=self.fun_command_rule_db, + redis_client=self.db_manager.get_redis_connection(), + local_ttl_seconds=30, + ) + self.fun_command_rule_service.init_tables() + self.fun_command_rule_service.refresh_cache() # 获取联系人管理器实例 self.contact_manager = robot_instance.contact_manager self.plugin_manager = robot_instance.plugin_manager @@ -175,6 +187,7 @@ class DashboardServer: from admin.dashboard.blueprints.system_jobs import system_jobs_bp from admin.dashboard.blueprints.plugin_schedules import plugin_schedules_bp from admin.dashboard.blueprints.group_plugin_config import group_plugin_config_bp + from admin.dashboard.blueprints.fun_command_rules import fun_command_rules_bp from admin.dashboard.blueprints.trendradar_webhook import trendradar_webhook_bp # 在app.register_blueprint部分添加 @@ -193,6 +206,7 @@ class DashboardServer: app.register_blueprint(system_jobs_bp) app.register_blueprint(plugin_schedules_bp) app.register_blueprint(group_plugin_config_bp) + app.register_blueprint(fun_command_rules_bp) app.register_blueprint(trendradar_webhook_bp) self.LOG.info("所有蓝图已注册") diff --git a/admin/dashboard/templates/base.html b/admin/dashboard/templates/base.html index 0456495..c7bdf9d 100644 --- a/admin/dashboard/templates/base.html +++ b/admin/dashboard/templates/base.html @@ -1011,6 +1011,7 @@ { label: '插件管理', path: '/plugins_manage' }, { label: '插件定时任务', path: '/plugin_schedules' }, { label: '群级插件配置', path: '/group_plugin_config' }, + { label: '趣味指令剧本', path: '/fun_command_rules' }, { label: '接口文档', path: '/api_docs' } ] }, diff --git a/admin/dashboard/templates/fun_command_rules.html b/admin/dashboard/templates/fun_command_rules.html new file mode 100644 index 0000000..50618a5 --- /dev/null +++ b/admin/dashboard/templates/fun_command_rules.html @@ -0,0 +1,504 @@ +{% extends "base.html" %} + +{% block title %}趣味指令剧本 - 机器人管理后台{% endblock %} + +{% block content %} +
+
+
+
Fun Script Rules
+

趣味指令剧本

+

把“文本关键词/拍一拍事件”映射成可编排的多媒体回应,持续沉淀你的机器人玩法库。

+
+
+ 刷新 + 新增规则 +
+
+ + + + + + + + + + + + + + + + + + + + + 查询 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
规则测试
+ + + + + + + + + + + + + + + + + + + + + 测试命中 + + + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ 新增动作 + 格式化JSON +
+ + + + + + + + + + + + + + + + + +
+ 支持占位符:{sender} {roomid} {event}。 + 常见示例: + text -> {"text":"你拍了拍我,我就拍回去~"} + link -> {"title":"今日梗图","desc":"点开看","url":"https://example.com","thumb_url":""} +
+
+
+
+ 取消 + 保存 +
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} + +{% block styles %} + +{% endblock %} diff --git a/db/fun_command_rule_db.py b/db/fun_command_rule_db.py new file mode 100644 index 0000000..dd7746a --- /dev/null +++ b/db/fun_command_rule_db.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +"""趣味指令规则数据库操作层。 + +这里专门封装“趣味指令剧本”相关的 MySQL 读写逻辑, +避免插件层直接拼 SQL,后续扩展字段也更安全。 +""" + +import json +from typing import Any, Dict, List, Optional + +from loguru import logger + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class FunCommandRuleDBOperator(BaseDBOperator): + """趣味指令规则数据访问对象。""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + + def init_tables(self) -> bool: + """初始化趣味指令规则表。 + + 说明: + 1. responses_json 使用 JSON 字段存储多条响应动作,便于前端以数组方式编排。 + 2. scope_type + scope_id 用于做“全局/群聊/私聊”多作用域控制。 + 3. trigger_type + trigger_text/event_key 支持关键词与事件(如拍一拍)混合触发。 + """ + try: + return self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_fun_command_rule ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + rule_name VARCHAR(128) NOT NULL, + scope_type VARCHAR(20) NOT NULL DEFAULT 'global', + scope_id VARCHAR(100) NOT NULL DEFAULT '', + trigger_type VARCHAR(20) NOT NULL DEFAULT 'exact', + trigger_text VARCHAR(500) NOT NULL DEFAULT '', + event_key VARCHAR(64) NOT NULL DEFAULT '', + responses_json JSON NOT NULL, + priority INT NOT NULL DEFAULT 100, + cooldown_seconds INT NOT NULL DEFAULT 0, + enabled TINYINT(1) NOT NULL DEFAULT 1, + updated_by VARCHAR(100) NOT NULL DEFAULT 'system', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_scope_enabled_priority (scope_type, scope_id, enabled, priority), + INDEX idx_trigger_type (trigger_type), + INDEX idx_event_key (event_key) + ) + """ + ) + except Exception as e: + logger.error(f"初始化趣味指令规则表失败: {e}") + return False + + @staticmethod + def _normalize_row(row: Dict[str, Any]) -> Dict[str, Any]: + """统一处理数据库行数据。 + + 这里把 JSON 字段解析为 Python 对象,且对关键字段做兜底, + 防止旧数据/脏数据导致插件执行阶段崩溃。 + """ + if not row: + return {} + + responses_value = row.get("responses_json") + if isinstance(responses_value, str): + try: + row["responses_json"] = json.loads(responses_value) + except Exception: + row["responses_json"] = [] + elif responses_value is None: + row["responses_json"] = [] + + if not isinstance(row.get("responses_json"), list): + row["responses_json"] = [] + + row["enabled"] = bool(row.get("enabled", 0)) + row["priority"] = int(row.get("priority", 100) or 100) + row["cooldown_seconds"] = int(row.get("cooldown_seconds", 0) or 0) + row["scope_type"] = str(row.get("scope_type", "global") or "global") + row["scope_id"] = str(row.get("scope_id", "") or "") + row["trigger_type"] = str(row.get("trigger_type", "exact") or "exact") + row["trigger_text"] = str(row.get("trigger_text", "") or "") + row["event_key"] = str(row.get("event_key", "") or "") + return row + + def list_rules(self, scope_type: str = "", scope_id: str = "", enabled: Optional[bool] = None) -> List[Dict[str, Any]]: + """按条件查询规则列表。""" + where_sql = [] + params: List[Any] = [] + + if scope_type: + where_sql.append("scope_type = %s") + params.append(scope_type) + if scope_id: + where_sql.append("scope_id = %s") + params.append(scope_id) + if enabled is not None: + where_sql.append("enabled = %s") + params.append(1 if enabled else 0) + + where_clause = f"WHERE {' AND '.join(where_sql)}" if where_sql else "" + rows = self.execute_query( + f""" + SELECT * + FROM t_fun_command_rule + {where_clause} + ORDER BY priority ASC, id DESC + """, + tuple(params) if params else None, + ) or [] + + return [self._normalize_row(dict(row)) for row in rows] + + def get_rule(self, rule_id: int) -> Optional[Dict[str, Any]]: + """按主键获取单条规则。""" + row = self.execute_query( + """ + SELECT * + FROM t_fun_command_rule + WHERE id = %s + LIMIT 1 + """, + (int(rule_id),), + fetch_one=True, + ) + if not row: + return None + return self._normalize_row(dict(row)) + + def create_rule(self, payload: Dict[str, Any]) -> bool: + """创建规则。""" + return self.execute_update( + """ + INSERT INTO t_fun_command_rule ( + rule_name, scope_type, scope_id, + trigger_type, trigger_text, event_key, + responses_json, priority, cooldown_seconds, + enabled, updated_by + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """, + ( + str(payload.get("rule_name", "") or "").strip(), + str(payload.get("scope_type", "global") or "global").strip(), + str(payload.get("scope_id", "") or "").strip(), + str(payload.get("trigger_type", "exact") or "exact").strip(), + str(payload.get("trigger_text", "") or "").strip(), + str(payload.get("event_key", "") or "").strip(), + json.dumps(payload.get("responses_json") or [], ensure_ascii=False), + int(payload.get("priority", 100) or 100), + int(payload.get("cooldown_seconds", 0) or 0), + 1 if bool(payload.get("enabled", True)) else 0, + str(payload.get("updated_by", "system") or "system").strip(), + ), + ) + + def update_rule(self, rule_id: int, payload: Dict[str, Any]) -> bool: + """更新规则。""" + return self.execute_update( + """ + UPDATE t_fun_command_rule + SET + rule_name = %s, + scope_type = %s, + scope_id = %s, + trigger_type = %s, + trigger_text = %s, + event_key = %s, + responses_json = %s, + priority = %s, + cooldown_seconds = %s, + enabled = %s, + updated_by = %s + WHERE id = %s + """, + ( + str(payload.get("rule_name", "") or "").strip(), + str(payload.get("scope_type", "global") or "global").strip(), + str(payload.get("scope_id", "") or "").strip(), + str(payload.get("trigger_type", "exact") or "exact").strip(), + str(payload.get("trigger_text", "") or "").strip(), + str(payload.get("event_key", "") or "").strip(), + json.dumps(payload.get("responses_json") or [], ensure_ascii=False), + int(payload.get("priority", 100) or 100), + int(payload.get("cooldown_seconds", 0) or 0), + 1 if bool(payload.get("enabled", True)) else 0, + str(payload.get("updated_by", "system") or "system").strip(), + int(rule_id), + ), + ) + + def delete_rule(self, rule_id: int) -> bool: + """删除规则。""" + return self.execute_update( + """ + DELETE FROM t_fun_command_rule + WHERE id = %s + """, + (int(rule_id),), + ) + + def toggle_rule(self, rule_id: int, enabled: bool, updated_by: str = "system") -> bool: + """快速切换规则启停状态。""" + return self.execute_update( + """ + UPDATE t_fun_command_rule + SET enabled = %s, updated_by = %s + WHERE id = %s + """, + (1 if enabled else 0, str(updated_by or "system"), int(rule_id)), + ) diff --git a/db/scripts/migrations/20260423_add_fun_command_rule_table.sql b/db/scripts/migrations/20260423_add_fun_command_rule_table.sql new file mode 100644 index 0000000..e2b0bb8 --- /dev/null +++ b/db/scripts/migrations/20260423_add_fun_command_rule_table.sql @@ -0,0 +1,21 @@ +-- 趣味指令剧本规则表 +-- 说明:用于配置“文本/事件触发 -> 多媒体响应”玩法规则。 +CREATE TABLE IF NOT EXISTS t_fun_command_rule ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + rule_name VARCHAR(128) NOT NULL, + scope_type VARCHAR(20) NOT NULL DEFAULT 'global', + scope_id VARCHAR(100) NOT NULL DEFAULT '', + trigger_type VARCHAR(20) NOT NULL DEFAULT 'exact', + trigger_text VARCHAR(500) NOT NULL DEFAULT '', + event_key VARCHAR(64) NOT NULL DEFAULT '', + responses_json JSON NOT NULL, + priority INT NOT NULL DEFAULT 100, + cooldown_seconds INT NOT NULL DEFAULT 0, + enabled TINYINT(1) NOT NULL DEFAULT 1, + updated_by VARCHAR(100) NOT NULL DEFAULT 'system', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_scope_enabled_priority (scope_type, scope_id, enabled, priority), + INDEX idx_trigger_type (trigger_type), + INDEX idx_event_key (event_key) +); diff --git a/plugins/fun_command_play/__init__.py b/plugins/fun_command_play/__init__.py new file mode 100644 index 0000000..1cbdd48 --- /dev/null +++ b/plugins/fun_command_play/__init__.py @@ -0,0 +1,7 @@ +# 从当前包导入插件主类 +from .main import FunCommandPlayPlugin + + +def get_plugin(): + """返回插件实例。""" + return FunCommandPlayPlugin() diff --git a/plugins/fun_command_play/config.toml b/plugins/fun_command_play/config.toml new file mode 100644 index 0000000..b07d613 --- /dev/null +++ b/plugins/fun_command_play/config.toml @@ -0,0 +1,2 @@ +[FunCommandPlay] +enable = true diff --git a/plugins/fun_command_play/main.py b/plugins/fun_command_play/main.py new file mode 100644 index 0000000..0235194 --- /dev/null +++ b/plugins/fun_command_play/main.py @@ -0,0 +1,312 @@ +# -*- coding: utf-8 -*- +"""趣味指令剧本插件。 + +核心目标: +1. 让机器人支持“文案/事件 -> 多媒体回应”的可配置玩法。 +2. 把玩法规则彻底数据化,便于后续持续收集、扩展梗库。 +3. 将“拍一拍”作为内置事件纳入统一触发体系。 +""" + +import asyncio +import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from loguru import logger + +from base.plugin_common.message_plugin_interface import MessagePluginInterface +from base.plugin_common.plugin_interface import PluginStatus +from db.fun_command_rule_db import FunCommandRuleDBOperator +from utils.fun_command_rule_service import FunCommandRuleService +from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus +from wechat_ipad import WechatAPIClient +from wechat_ipad.models.message import MessageType + + +class FunCommandPlayPlugin(MessagePluginInterface): + """趣味指令剧本插件。""" + + FEATURE_KEY = "FUN_COMMAND_PLAY" + FEATURE_DESCRIPTION = "🎭 趣味指令剧本 [配置文案/事件触发多媒体玩法回复]" + + @property + def name(self) -> str: + return "趣味指令剧本" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "支持文案与事件触发的趣味玩法回复(文本、图片、语音、视频、卡片、App消息)" + + @property + def author(self) -> str: + return "codex" + + @property + def commands(self) -> List[str]: + # 该插件是“被动触发型”,不依赖固定命令词。 + return [] + + @property + def feature_key(self) -> Optional[str]: + return self.FEATURE_KEY + + @property + def feature_description(self) -> Optional[str]: + return self.FEATURE_DESCRIPTION + + def __init__(self): + super().__init__() + # 注册群级权限开关,方便你按群控制玩法是否启用。 + self.feature = self.register_feature() + self.rule_service: Optional[FunCommandRuleService] = None + self.enable = True + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件。""" + self.LOG = logger + self.LOG.debug(f"正在初始化 {self.name} 插件...") + + db_manager = context.get("db_manager") + if db_manager is None: + self.LOG.error("未拿到 db_manager,趣味指令剧本插件初始化失败") + return False + + # 读取开关配置:保留插件级总开关,便于快速停用。 + plugin_cfg = self._config.get("FunCommandPlay", {}) + self.enable = bool(plugin_cfg.get("enable", True)) + + # 初始化规则服务,确保首次启动就有表结构。 + redis_client = db_manager.get_redis_connection() + db_operator = FunCommandRuleDBOperator(db_manager) + self.rule_service = FunCommandRuleService(db_operator=db_operator, redis_client=redis_client, local_ttl_seconds=30) + if not self.rule_service.init_tables(): + self.LOG.error("趣味指令规则表初始化失败") + return False + + # 启动时预热一次缓存,减少第一条消息延迟。 + self.rule_service.refresh_cache() + self.LOG.debug(f"[{self.name}] 插件初始化完成") + return True + + def start(self) -> bool: + self.status = PluginStatus.RUNNING + self.LOG.info(f"[{self.name}] 插件已启动") + return True + + def stop(self) -> bool: + self.status = PluginStatus.STOPPED + self.LOG.info(f"[{self.name}] 插件已停止") + return True + + @staticmethod + def _normalize_scope(message: Dict[str, Any]) -> Tuple[str, str, str]: + """标准化作用域信息。 + + 返回: + - scope_type: global/group/private + - scope_id: 群ID或用户ID + - target_id: 发送目标(群ID优先,否则私聊用户ID) + """ + room_id = str(message.get("roomid", "") or "").strip() + sender = str(message.get("sender", "") or "").strip() + + if room_id: + return "group", room_id, room_id + return "private", sender, sender + + @staticmethod + def _extract_event_key(message: Dict[str, Any]) -> str: + """提取事件触发键。 + + 当前内置: + - PAT:拍一拍事件 + + 检测策略: + 1. 系统消息文案包含“拍了拍”。 + 2. XML 内容包含 patMsg 结构。 + """ + content = str(message.get("content", "") or "") + full_msg = message.get("full_wx_msg") + + # 通过消息枚举类型识别系统消息,再结合关键词更稳妥。 + msg_type = getattr(full_msg, "msg_type", None) + msg_type_value = getattr(msg_type, "value", msg_type) + is_system = str(msg_type_value) in {str(MessageType.SYSTEM.value), str(MessageType.SYSTEM_NOTIFY.value), "10000", "10002"} + + lowered_content = content.lower() + if " Dict[str, str]: + """构建模板变量上下文。""" + room_id = str(message.get("roomid", "") or "") + sender = str(message.get("sender", "") or "") + return { + "sender": sender, + "roomid": room_id, + "event": event_key or "", + } + + @staticmethod + def _render_template(text: str, context: Dict[str, str]) -> str: + """轻量模板替换。 + + 使用 {sender}/{roomid}/{event} 占位符, + 保持简单可控,避免引入模板引擎复杂性。 + """ + output = str(text or "") + for key, value in (context or {}).items(): + output = output.replace(f"{{{key}}}", str(value or "")) + return output + + def _find_match_rule(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """查找命中规则。""" + if not self.rule_service: + return None + + scope_type, scope_id, _ = self._normalize_scope(message) + content = str(message.get("content", "") or "").strip() + event_key = self._extract_event_key(message) + session_key = scope_id or str(message.get("sender", "") or "") + + return self.rule_service.match_rule( + scope_type=scope_type, + scope_id=scope_id, + content=content, + event_key=event_key, + session_key=session_key, + ) + + def can_process(self, message: Dict[str, Any]) -> bool: + """判断是否可处理。 + + 说明: + 1. 只在插件总开关开启时参与匹配。 + 2. 只处理群聊与私聊文本/系统类消息,不处理空内容。 + 3. 群聊下会遵循群权限开关。 + """ + if not self.enable or not self.rule_service: + return False + + content = str(message.get("content", "") or "").strip() + if not content: + return False + + sender = str(message.get("sender", "") or "").strip() + room_id = str(message.get("roomid", "") or "").strip() + gbm: GroupBotManager = message.get("gbm") + + # 防止机器人自回复导致循环。 + if self.bot and sender and sender == getattr(self.bot, "wxid", ""): + return False + + # 群聊场景遵循群级权限。 + if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED: + return False + + # 先做一次匹配并塞入 message,process_message 阶段直接复用,减少重复计算。 + matched_rule = self._find_match_rule(message) + if matched_rule: + message["_fun_rule_match"] = matched_rule + message["_fun_event_key"] = self._extract_event_key(message) + return True + return False + + async def _send_action(self, bot: WechatAPIClient, target_id: str, action: Dict[str, Any], context: Dict[str, str]) -> None: + """发送单条响应动作。""" + action_type = str(action.get("type", "") or "").strip().lower() + + # 支持配置 delay_ms,模拟“连发节奏感”。 + delay_ms = int(action.get("delay_ms", 0) or 0) + if delay_ms > 0: + await asyncio.sleep(delay_ms / 1000.0) + + if action_type == "text": + text = self._render_template(str(action.get("text", "") or ""), context) + if text: + await bot.send_text_message(target_id, text) + return + + if action_type == "image": + image_path = self._render_template(str(action.get("path", "") or ""), context) + if image_path and os.path.exists(image_path): + await bot.send_image_message(target_id, Path(image_path)) + return + + if action_type == "voice": + voice_path = self._render_template(str(action.get("path", "") or ""), context) + voice_format = str(action.get("format", "") or "").strip().lower() + if voice_path and os.path.exists(voice_path): + if not voice_format: + suffix = Path(voice_path).suffix.lower() + voice_format = "wav" if suffix == ".wav" else "mp3" + await bot.send_voice_message(target_id, Path(voice_path), voice_format) + return + + if action_type == "video": + video_path = self._render_template(str(action.get("path", "") or ""), context) + cover_path = self._render_template(str(action.get("cover_path", "") or ""), context) + if video_path and os.path.exists(video_path): + if cover_path and os.path.exists(cover_path): + await bot.send_video_message(target_id, Path(video_path), Path(cover_path)) + else: + await bot.send_video_message(target_id, Path(video_path)) + return + + if action_type == "link": + title = self._render_template(str(action.get("title", "") or ""), context) + desc = self._render_template(str(action.get("desc", "") or ""), context) + url = self._render_template(str(action.get("url", "") or ""), context) + thumb_url = self._render_template(str(action.get("thumb_url", "") or ""), context) + if url: + await bot.send_link_message(target_id, url=url, title=title, description=desc, thumb_url=thumb_url) + return + + if action_type == "app": + xml = self._render_template(str(action.get("xml", "") or ""), context) + app_type = int(action.get("app_type", 0) or 0) + if xml: + await bot.send_app_message(target_id, xml=xml, type=app_type) + return + + async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理趣味规则消息。""" + bot: WechatAPIClient = message.get("bot") + if not bot: + return False, "bot 不可用" + + # 优先复用 can_process 阶段缓存的命中规则,避免重复匹配。 + matched_rule = message.get("_fun_rule_match") or self._find_match_rule(message) + if not matched_rule: + return False, "未命中规则" + + _, _, target_id = self._normalize_scope(message) + if not target_id: + return False, "无可用目标" + + responses = matched_rule.get("responses_json") or [] + if not isinstance(responses, list) or not responses: + return False, "规则无响应动作" + + event_key = str(message.get("_fun_event_key", "") or "") + context = self._build_message_context(message, event_key=event_key) + + try: + for action in responses: + if not isinstance(action, dict): + continue + await self._send_action(bot, target_id, action, context) + + return True, f"命中趣味规则 #{matched_rule.get('id')}" + except Exception as e: + self.LOG.error(f"趣味指令剧本执行失败: rule={matched_rule.get('id')}, error={e}") + return False, f"执行失败: {e}" diff --git a/utils/fun_command_rule_service.py b/utils/fun_command_rule_service.py new file mode 100644 index 0000000..12684d1 --- /dev/null +++ b/utils/fun_command_rule_service.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +"""趣味指令规则服务层。 + +服务层职责: +1. 聚合 MySQL + Redis + 应用内存三层缓存读写。 +2. 提供规则匹配能力给插件层。 +3. 提供后台 CRUD 的统一入口,确保变更后缓存立刻刷新。 +""" + +import json +import re +import threading +import time +from typing import Any, Dict, List, Optional, Tuple + +from loguru import logger + +from db.fun_command_rule_db import FunCommandRuleDBOperator + + +class FunCommandRuleService: + """趣味指令规则服务。""" + + # Redis 键定义:统一集中,便于后续迁移命名。 + REDIS_RULES_KEY = "fun:command:rules:all" + + def __init__(self, db_operator: FunCommandRuleDBOperator, redis_client, local_ttl_seconds: int = 30): + self.db = db_operator + self.redis = redis_client + self.local_ttl_seconds = max(int(local_ttl_seconds or 30), 5) + + # 进程内缓存:热路径优先命中,避免每条消息都打 Redis。 + self._local_lock = threading.RLock() + self._local_rules: List[Dict[str, Any]] = [] + self._local_expire_at: float = 0.0 + + # 命中冷却缓存:用于每条规则的简单限频。 + self._cooldown_lock = threading.RLock() + self._cooldown_map: Dict[str, float] = {} + + def init_tables(self) -> bool: + """初始化底层数据表。""" + return self.db.init_tables() + + # --------------------------- 缓存层 --------------------------- + def _load_rules_from_db(self) -> List[Dict[str, Any]]: + """从 MySQL 回源全量启用规则。""" + return self.db.list_rules(enabled=True) + + def _write_redis_rules(self, rules: List[Dict[str, Any]]) -> None: + """写入 Redis 持久缓存。 + + 注意:这里不设置过期时间,Redis 作为跨进程共享缓存常驻。 + """ + try: + self.redis.set(self.REDIS_RULES_KEY, json.dumps(rules or [], ensure_ascii=False)) + except Exception as e: + logger.warning(f"写入 Redis 规则缓存失败: {e}") + + def _read_redis_rules(self) -> Optional[List[Dict[str, Any]]]: + """从 Redis 读取规则缓存。""" + try: + text = self.redis.get(self.REDIS_RULES_KEY) + if not text: + return None + data = json.loads(text) + if not isinstance(data, list): + return None + return data + except Exception as e: + logger.warning(f"读取 Redis 规则缓存失败: {e}") + return None + + def _set_local_cache(self, rules: List[Dict[str, Any]]) -> None: + """更新应用内缓存。""" + with self._local_lock: + self._local_rules = list(rules or []) + self._local_expire_at = time.time() + self.local_ttl_seconds + + def _get_local_cache(self) -> Optional[List[Dict[str, Any]]]: + """读取应用内缓存。 + + 仅当未过期才返回,确保后台更新后最长 local_ttl_seconds 可见。 + """ + with self._local_lock: + if time.time() < self._local_expire_at: + return list(self._local_rules) + return None + + def refresh_cache(self) -> List[Dict[str, Any]]: + """强制刷新缓存(DB -> Redis -> Local)。""" + rules = self._load_rules_from_db() + self._write_redis_rules(rules) + self._set_local_cache(rules) + return rules + + def get_enabled_rules(self) -> List[Dict[str, Any]]: + """获取启用规则。 + + 读取顺序: + 1. 本地缓存命中直接返回(最高性能)。 + 2. Redis 命中则回填本地缓存。 + 3. MySQL 回源并回填 Redis + 本地缓存。 + """ + local_rules = self._get_local_cache() + if local_rules is not None: + return local_rules + + redis_rules = self._read_redis_rules() + if redis_rules is not None: + self._set_local_cache(redis_rules) + return redis_rules + + return self.refresh_cache() + + # --------------------------- 管理端 CRUD --------------------------- + def list_rules(self, scope_type: str = "", scope_id: str = "", enabled: Optional[bool] = None) -> List[Dict[str, Any]]: + """后台使用:按条件列规则。""" + return self.db.list_rules(scope_type=scope_type, scope_id=scope_id, enabled=enabled) + + def create_rule(self, payload: Dict[str, Any]) -> bool: + """创建规则并刷新缓存。""" + ok = self.db.create_rule(payload) + if ok: + self.refresh_cache() + return ok + + def update_rule(self, rule_id: int, payload: Dict[str, Any]) -> bool: + """更新规则并刷新缓存。""" + ok = self.db.update_rule(rule_id, payload) + if ok: + self.refresh_cache() + return ok + + def delete_rule(self, rule_id: int) -> bool: + """删除规则并刷新缓存。""" + ok = self.db.delete_rule(rule_id) + if ok: + self.refresh_cache() + return ok + + def toggle_rule(self, rule_id: int, enabled: bool, updated_by: str = "system") -> bool: + """启停规则并刷新缓存。""" + ok = self.db.toggle_rule(rule_id=rule_id, enabled=enabled, updated_by=updated_by) + if ok: + self.refresh_cache() + return ok + + # --------------------------- 规则匹配 --------------------------- + @staticmethod + def _scope_match(rule: Dict[str, Any], scope_type: str, scope_id: str) -> bool: + """判断规则作用域是否命中当前会话。""" + rule_scope_type = str(rule.get("scope_type", "global") or "global").strip().lower() + rule_scope_id = str(rule.get("scope_id", "") or "").strip() + + # global:全局可用。 + if rule_scope_type == "global": + return True + + # group/private:必须同时匹配 scope_id。 + if rule_scope_type == scope_type: + return not rule_scope_id or rule_scope_id == scope_id + + return False + + @staticmethod + def _trigger_match(rule: Dict[str, Any], event_key: str, content: str) -> bool: + """判断规则触发条件是否命中。 + + 支持: + - event(事件触发,如 PAT) + - exact/prefix/contains/regex(文本触发) + """ + trigger_type = str(rule.get("trigger_type", "exact") or "exact").strip().lower() + trigger_text = str(rule.get("trigger_text", "") or "") + target_event_key = str(rule.get("event_key", "") or "").strip().upper() + + normalized_content = str(content or "").strip() + + if trigger_type == "event": + return bool(target_event_key) and target_event_key == str(event_key or "").strip().upper() + + if not normalized_content: + return False + + if trigger_type == "exact": + return normalized_content == trigger_text + if trigger_type == "prefix": + return normalized_content.startswith(trigger_text) + if trigger_type == "contains": + return trigger_text in normalized_content + if trigger_type == "regex": + try: + return re.search(trigger_text, normalized_content) is not None + except re.error: + # 正则配置非法时直接视为不匹配,避免打断主流程。 + return False + + return False + + def _cooldown_key(self, rule_id: int, session_key: str) -> str: + """构造冷却键。 + + session_key 采用群ID/私聊ID,确保不同会话互不干扰。 + """ + return f"{int(rule_id)}::{session_key}" + + def _check_and_mark_cooldown(self, rule: Dict[str, Any], session_key: str) -> bool: + """检查并写入冷却窗口。 + + 返回 True 表示允许触发;False 表示仍在冷却中。 + """ + cooldown_seconds = int(rule.get("cooldown_seconds", 0) or 0) + if cooldown_seconds <= 0: + return True + + now = time.time() + key = self._cooldown_key(int(rule.get("id", 0) or 0), session_key) + with self._cooldown_lock: + expired_at = self._cooldown_map.get(key, 0) + if now < expired_at: + return False + self._cooldown_map[key] = now + cooldown_seconds + + # 轻量清理,防止 map 长期膨胀。 + if len(self._cooldown_map) > 5000: + stale_keys = [k for k, v in self._cooldown_map.items() if v < now] + for stale_key in stale_keys[:1000]: + self._cooldown_map.pop(stale_key, None) + + return True + + def match_rule( + self, + scope_type: str, + scope_id: str, + content: str, + event_key: str, + session_key: str, + ) -> Optional[Dict[str, Any]]: + """匹配首条可执行规则。 + + 设计为“首条命中即返回”,通过 priority 实现可控顺序。 + """ + rules = self.get_enabled_rules() + if not rules: + return None + + normalized_scope_type = str(scope_type or "global").strip().lower() + normalized_scope_id = str(scope_id or "").strip() + + for rule in rules: + if not self._scope_match(rule, normalized_scope_type, normalized_scope_id): + continue + if not self._trigger_match(rule, event_key=event_key, content=content): + continue + if not self._check_and_mark_cooldown(rule, session_key=session_key): + continue + return rule + + return None + + def validate_responses(self, responses_json: Any) -> Tuple[bool, str, List[Dict[str, Any]]]: + """校验响应动作数组。 + + 返回: + - ok: 是否通过 + - message: 错误说明 + - normalized: 归一化后的响应列表 + """ + if isinstance(responses_json, str): + try: + responses_json = json.loads(responses_json) + except Exception: + return False, "responses_json 不是合法 JSON", [] + + if not isinstance(responses_json, list) or not responses_json: + return False, "responses_json 必须是非空数组", [] + + normalized: List[Dict[str, Any]] = [] + allowed_types = {"text", "image", "voice", "video", "link", "app"} + + for idx, item in enumerate(responses_json): + if not isinstance(item, dict): + return False, f"第 {idx + 1} 条响应必须是对象", [] + + action_type = str(item.get("type", "") or "").strip().lower() + if action_type not in allowed_types: + return False, f"第 {idx + 1} 条响应 type 非法,仅支持 {sorted(allowed_types)}", [] + + delay_ms = int(item.get("delay_ms", 0) or 0) + if delay_ms < 0: + delay_ms = 0 + + normalized_item = dict(item) + normalized_item["type"] = action_type + normalized_item["delay_ms"] = delay_ms + normalized.append(normalized_item) + + return True, "ok", normalized