Files
abot/plugins/fun_command_play/main.py
liuwei b83bb8eb37 限制拍一拍规则仅在用户拍机器人本人时触发
1. 新增事件解析约束:PAT事件仅当pattedusername等于当前机器人wxid时生效。

2. 用户互拍等非拍机器人场景返回空事件,不再命中拍一拍规则。

3. can_process与process_message统一复用该事件解析结果,避免重复判断不一致。
2026-04-23 14:16:55 +08:00

588 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""趣味指令剧本插件。
核心目标:
1. 让机器人支持“文案/事件 -> 多媒体回应”的可配置玩法。
2. 把玩法规则彻底数据化,便于后续持续收集、扩展梗库。
3. 将“拍一拍”作为内置事件纳入统一触发体系。
"""
import asyncio
import os
import threading
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import xml.etree.ElementTree as ET
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.fun_command_rule_db import FunCommandRuleDBOperator
from utils.fun_command_rule_service import FunCommandRuleService
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import MessageType
class FunCommandPlayPlugin(MessagePluginInterface):
"""趣味指令剧本插件。"""
FEATURE_KEY = "FUN_COMMAND_PLAY"
FEATURE_DESCRIPTION = "🎭 趣味指令剧本 [配置文案/事件触发多媒体玩法回复]"
@property
def name(self) -> str:
return "趣味指令剧本"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "支持文案与事件触发的趣味玩法回复文本、图片、语音、视频、卡片、App消息"
@property
def author(self) -> str:
return "codex"
@property
def commands(self) -> List[str]:
# 该插件是“被动触发型”,不依赖固定命令词。
return []
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# 注册群级权限开关,方便你按群控制玩法是否启用。
self.feature = self.register_feature()
self.rule_service: Optional[FunCommandRuleService] = None
self.enable = True
# 媒资缓存开关:默认开启,首次发送时读磁盘,后续直接走内存,降低 I/O 开销。
self.media_cache_enable = True
# 单个文件允许缓存的最大体积(字节)。超过阈值则只发送不入缓存,防止单文件挤爆内存。
self.media_cache_max_file_bytes = 80 * 1024 * 1024
# 全局缓存总上限(字节)。采用 LRU 淘汰,越久未使用越先被移除。
self.media_cache_max_total_bytes = 200 * 1024 * 1024
# 缓存结构key -> bytes。OrderedDict 用于维护访问顺序,实现 LRU。
self._media_cache: "OrderedDict[str, bytes]" = OrderedDict()
self._media_cache_total_bytes = 0
self._media_cache_lock = threading.RLock()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件。"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
db_manager = context.get("db_manager")
if db_manager is None:
self.LOG.error("未拿到 db_manager趣味指令剧本插件初始化失败")
return False
# 读取开关配置:保留插件级总开关,便于快速停用。
plugin_cfg = self._config.get("FunCommandPlay", {})
self.enable = bool(plugin_cfg.get("enable", True))
# 媒资缓存参数支持通过配置覆盖,便于你按机器内存规模灵活调整。
# 约定单位为 MB方便运维理解和调整。
self.media_cache_enable = bool(plugin_cfg.get("media-cache-enable", True))
self.media_cache_max_file_bytes = max(
int(plugin_cfg.get("media-cache-max-file-mb", 80) or 80),
1
) * 1024 * 1024
self.media_cache_max_total_bytes = max(
int(plugin_cfg.get("media-cache-max-total-mb", 200) or 200),
10
) * 1024 * 1024
# 初始化规则服务,确保首次启动就有表结构。
redis_client = db_manager.get_redis_connection()
db_operator = FunCommandRuleDBOperator(db_manager)
self.rule_service = FunCommandRuleService(db_operator=db_operator, redis_client=redis_client, local_ttl_seconds=30)
if not self.rule_service.init_tables():
self.LOG.error("趣味指令规则表初始化失败")
return False
# 启动时预热一次缓存,减少第一条消息延迟。
self.rule_service.refresh_cache()
self.LOG.debug(f"[{self.name}] 插件初始化完成")
return True
@staticmethod
def _build_media_cache_key(media_kind: str, media_path: str) -> str:
"""构建媒资缓存键。
关键点:
1. 键里带上绝对路径 + 文件修改时间 + 文件大小,确保文件内容变更后会自动命中新键。
2. 不依赖手动清缓存,更新本地文件后下一次发送会自然回源读取新内容。
"""
abs_path = os.path.abspath(media_path)
stat = os.stat(abs_path)
return f"{media_kind}:{abs_path}:{int(stat.st_mtime_ns)}:{int(stat.st_size)}"
def _try_get_media_bytes_from_cache(self, cache_key: str) -> Optional[bytes]:
"""尝试从内存缓存读取媒资字节。
读取后会把条目移动到末尾,表示最近访问,配合 LRU 淘汰策略使用。
"""
with self._media_cache_lock:
cached = self._media_cache.get(cache_key)
if cached is None:
return None
self._media_cache.move_to_end(cache_key)
return cached
def _put_media_bytes_to_cache(self, cache_key: str, payload: bytes) -> None:
"""写入媒资到内存缓存,并执行 LRU 淘汰。
规则:
1. 单文件超过阈值不入缓存。
2. 新写入前先清理旧键占用(若覆盖)。
3. 超过总容量时,从最久未使用项开始淘汰。
"""
if not self.media_cache_enable:
return
if payload is None:
return
payload_size = len(payload)
if payload_size <= 0:
return
if payload_size > self.media_cache_max_file_bytes:
return
with self._media_cache_lock:
old_payload = self._media_cache.pop(cache_key, None)
if old_payload is not None:
self._media_cache_total_bytes -= len(old_payload)
self._media_cache[cache_key] = payload
self._media_cache_total_bytes += payload_size
self._media_cache.move_to_end(cache_key)
while self._media_cache_total_bytes > self.media_cache_max_total_bytes and self._media_cache:
stale_key, stale_payload = self._media_cache.popitem(last=False)
self._media_cache_total_bytes -= len(stale_payload)
self.LOG.debug(f"[{self.name}] 媒资缓存淘汰: {stale_key}")
def _load_media_bytes(self, media_kind: str, media_path: str) -> Optional[bytes]:
"""加载媒资字节:先缓存,后磁盘。
行为:
1. 首次发送读取磁盘并缓存。
2. 后续发送命中缓存,避免重复磁盘 I/O。
"""
if not media_path:
return None
if not os.path.exists(media_path):
self.LOG.warning(f"[{self.name}] 媒资路径不存在: {media_path}")
return None
try:
cache_key = self._build_media_cache_key(media_kind=media_kind, media_path=media_path)
except Exception as e:
self.LOG.warning(f"[{self.name}] 构建媒资缓存键失败,将回退磁盘读取: path={media_path}, error={e}")
cache_key = ""
if cache_key:
cached = self._try_get_media_bytes_from_cache(cache_key)
if cached is not None:
return cached
try:
with open(media_path, "rb") as fp:
payload = fp.read()
except Exception as e:
self.LOG.error(f"[{self.name}] 读取媒资文件失败: path={media_path}, error={e}")
return None
if cache_key:
self._put_media_bytes_to_cache(cache_key, payload)
return payload
@staticmethod
def _infer_voice_format_by_path(voice_path: str, configured_format: str) -> str:
"""根据语音文件路径推断发送格式。
设计说明:
1. 参考 message_push.py 的既有稳定逻辑:优先按文件后缀判断格式。
2. 若后缀可识别wav/mp3/amr直接使用后缀避免“配置格式与真实文件不一致”导致解码失败。
3. 若后缀不可识别,再回退到配置值;配置也无效时最终默认 mp3。
"""
suffix = Path(str(voice_path or "")).suffix.lower().strip()
if suffix == ".wav":
return "wav"
if suffix == ".amr":
return "amr"
if suffix == ".mp3":
return "mp3"
normalized_cfg = str(configured_format or "").strip().lower()
if normalized_cfg in {"wav", "mp3", "amr"}:
return normalized_cfg
return "mp3"
async def _send_voice_with_fallback(
self,
bot: WechatAPIClient,
target_id: str,
voice_bytes: bytes,
voice_path: str,
configured_format: str,
) -> None:
"""发送语音并进行格式兜底重试。
背景:
- 在线上规则维护中,常见问题是“文件实际是 wav但配置里误写 mp3”
这会触发 ffmpeg 解码失败Header missing
策略:
1. 首次发送:使用“后缀优先”推断格式(与 message_push.py 保持一致)。
2. 失败后自动尝试其它格式mp3/wav/amr提高容错性降低人工维护成本。
"""
first_try = self._infer_voice_format_by_path(voice_path=voice_path, configured_format=configured_format)
candidates: List[str] = [first_try]
for fallback in ("mp3", "wav", "amr"):
if fallback not in candidates:
candidates.append(fallback)
last_error: Optional[Exception] = None
for fmt in candidates:
try:
await bot.send_voice_message(target_id, voice_bytes, fmt)
if fmt != first_try:
self.LOG.warning(
f"[{self.name}] 语音发送触发格式兜底重试成功: path={voice_path}, first={first_try}, final={fmt}"
)
return
except Exception as e:
last_error = e
self.LOG.warning(
f"[{self.name}] 语音发送失败,尝试下一个格式: path={voice_path}, format={fmt}, error={e}"
)
if last_error:
raise last_error
def start(self) -> bool:
self.status = PluginStatus.RUNNING
self.LOG.info(f"[{self.name}] 插件已启动")
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
self.LOG.info(f"[{self.name}] 插件已停止")
return True
@staticmethod
def _normalize_scope(message: Dict[str, Any]) -> Tuple[str, str, str]:
"""标准化作用域信息。
返回:
- scope_type: global/group/private
- scope_id: 群ID或用户ID
- target_id: 发送目标群ID优先否则私聊用户ID
"""
room_id = str(message.get("roomid", "") or "").strip()
sender = str(message.get("sender", "") or "").strip()
if room_id:
return "group", room_id, room_id
return "private", sender, sender
@staticmethod
def _parse_pat_event_meta(message: Dict[str, Any]) -> Dict[str, str]:
"""解析拍一拍事件元数据。
兼容格式示例(用户实测):
<sysmsg type="pat">
<pat>
<fromusername>Jyunere</fromusername>
<chatusername>56594698995@chatroom</chatusername>
<pattedusername>wxid_xxx</pattedusername>
<template><![CDATA["${Jyunere}" 拍了拍 "${wxid_xxx}"]]></template>
</pat>
</sysmsg>
返回字段:
- event_key: PAT
- pat_from_username / pat_chatusername / pat_pattedusername
- pat_suffix / pat_suffix_version / pat_template
"""
content = str(message.get("content", "") or "")
if not content:
return {}
normalized = content.strip()
if "<sysmsg" not in normalized:
return {}
try:
root = ET.fromstring(normalized)
except ET.ParseError:
return {}
if root.tag != "sysmsg":
return {}
if str(root.attrib.get("type", "") or "").strip().lower() != "pat":
return {}
pat_node = root.find("pat")
if pat_node is None:
return {}
def _read_text(tag_name: str) -> str:
node = pat_node.find(tag_name)
if node is None or node.text is None:
return ""
return str(node.text or "").strip()
return {
"event_key": "PAT",
"pat_from_username": _read_text("fromusername"),
"pat_chatusername": _read_text("chatusername"),
"pat_pattedusername": _read_text("pattedusername"),
"pat_suffix": _read_text("patsuffix"),
"pat_suffix_version": _read_text("patsuffixversion"),
"pat_template": _read_text("template"),
}
@staticmethod
def _extract_event_key(message: Dict[str, Any]) -> str:
"""提取事件触发键。"""
# 优先走结构化解析sysmsg type="pat" 命中即判定为 PAT。
pat_meta = FunCommandPlayPlugin._parse_pat_event_meta(message)
if pat_meta.get("event_key"):
return pat_meta["event_key"]
content = str(message.get("content", "") or "")
full_msg = message.get("full_wx_msg")
# 通过消息枚举类型识别系统消息,再结合关键词更稳妥。
msg_type = getattr(full_msg, "msg_type", None)
msg_type_value = getattr(msg_type, "value", msg_type)
is_system = str(msg_type_value) in {str(MessageType.SYSTEM.value), str(MessageType.SYSTEM_NOTIFY.value), "10000", "10002"}
# 注意:这里明确不再用 "<patMsg" 子串做判定。
# 原因:大量普通 appmsg 结构里也会带 patMsg 节点(例如 recordNum=0
# 会导致非拍一拍消息被误识别成 PAT 事件。
if is_system and "拍了拍" in content:
return "PAT"
return ""
@staticmethod
def _build_message_context(message: Dict[str, Any], event_key: str, event_meta: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""构建模板变量上下文。"""
room_id = str(message.get("roomid", "") or "")
sender = str(message.get("sender", "") or "")
context = {
"sender": sender,
"roomid": room_id,
"event": event_key or "",
}
# 拍一拍扩展变量,便于在规则文案中使用更精细的占位符。
# 例如:{pat_from_username}、{pat_pattedusername}、{pat_template}
for key, value in (event_meta or {}).items():
context[str(key)] = str(value or "")
return context
@staticmethod
def _render_template(text: str, context: Dict[str, str]) -> str:
"""轻量模板替换。
使用 {sender}/{roomid}/{event} 占位符,
保持简单可控,避免引入模板引擎复杂性。
"""
output = str(text or "")
for key, value in (context or {}).items():
output = output.replace(f"{{{key}}}", str(value or ""))
return output
def _find_match_rule(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""查找命中规则。"""
if not self.rule_service:
return None
scope_type, scope_id, _ = self._normalize_scope(message)
content = str(message.get("content", "") or "").strip()
event_key, _ = self._resolve_effective_event(message)
session_key = scope_id or str(message.get("sender", "") or "")
return self.rule_service.match_rule(
scope_type=scope_type,
scope_id=scope_id,
content=content,
event_key=event_key,
session_key=session_key,
)
def _resolve_current_wxid(self, message: Dict[str, Any]) -> str:
"""解析当前机器人 wxid。"""
# 优先使用已注入的 self.bot主流程没有则回退 message['bot'](兜底)。
bot_obj = self.bot if getattr(self, "bot", None) else message.get("bot")
return str(getattr(bot_obj, "wxid", "") or "").strip()
def _resolve_effective_event(self, message: Dict[str, Any]) -> Tuple[str, Dict[str, str]]:
"""解析最终可用事件。
关键约束:
- 对于拍一拍PAT仅当“被拍者是机器人自己”才返回 PAT。
- 其他拍一拍场景(用户拍用户)返回空事件,避免误触发规则。
"""
pat_meta = self._parse_pat_event_meta(message)
if pat_meta.get("event_key") == "PAT":
current_wxid = self._resolve_current_wxid(message)
patted_username = str(pat_meta.get("pat_pattedusername", "") or "").strip()
if current_wxid and patted_username and patted_username == current_wxid:
return "PAT", pat_meta
return "", {}
# 非结构化 PAT 场景走原有兜底识别。
return self._extract_event_key(message), {}
def can_process(self, message: Dict[str, Any]) -> bool:
"""判断是否可处理。
说明:
1. 只在插件总开关开启时参与匹配。
2. 只处理群聊与私聊文本/系统类消息,不处理空内容。
3. 群聊下会遵循群权限开关。
"""
if not self.enable or not self.rule_service:
return False
content = str(message.get("content", "") or "").strip()
if not content:
return False
sender = str(message.get("sender", "") or "").strip()
room_id = str(message.get("roomid", "") or "").strip()
gbm: GroupBotManager = message.get("gbm")
# 防止机器人自回复导致循环。
if self.bot and sender and sender == getattr(self.bot, "wxid", ""):
return False
# 群聊场景遵循群级权限。
if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
return False
# 先做一次匹配并塞入 messageprocess_message 阶段直接复用,减少重复计算。
matched_rule = self._find_match_rule(message)
if matched_rule:
event_key, event_meta = self._resolve_effective_event(message)
message["_fun_rule_match"] = matched_rule
message["_fun_event_key"] = str(event_key or "")
message["_fun_event_meta"] = event_meta
return True
return False
async def _send_action(self, bot: WechatAPIClient, target_id: str, action: Dict[str, Any], context: Dict[str, str]) -> None:
"""发送单条响应动作。"""
action_type = str(action.get("type", "") or "").strip().lower()
# 支持配置 delay_ms模拟“连发节奏感”。
delay_ms = int(action.get("delay_ms", 0) or 0)
if delay_ms > 0:
await asyncio.sleep(delay_ms / 1000.0)
if action_type == "text":
text = self._render_template(str(action.get("text", "") or ""), context)
if text:
await bot.send_text_message(target_id, text)
return
if action_type == "image":
image_path = self._render_template(str(action.get("path", "") or ""), context)
image_bytes = self._load_media_bytes(media_kind="image", media_path=image_path)
if image_bytes:
await bot.send_image_message(target_id, image_bytes)
return
if action_type == "voice":
voice_path = self._render_template(str(action.get("path", "") or ""), context)
voice_format = str(action.get("format", "") or "").strip().lower()
voice_bytes = self._load_media_bytes(media_kind="voice", media_path=voice_path)
if voice_bytes:
await self._send_voice_with_fallback(
bot=bot,
target_id=target_id,
voice_bytes=voice_bytes,
voice_path=voice_path,
configured_format=voice_format,
)
return
if action_type == "video":
video_path = self._render_template(str(action.get("path", "") or ""), context)
cover_path = self._render_template(str(action.get("cover_path", "") or ""), context)
video_bytes = self._load_media_bytes(media_kind="video", media_path=video_path)
if video_bytes:
cover_bytes = self._load_media_bytes(media_kind="image", media_path=cover_path) if cover_path else None
if cover_bytes:
await bot.send_video_message(target_id, video_bytes, cover_bytes)
else:
await bot.send_video_message(target_id, video_bytes)
return
if action_type == "link":
title = self._render_template(str(action.get("title", "") or ""), context)
desc = self._render_template(str(action.get("desc", "") or ""), context)
url = self._render_template(str(action.get("url", "") or ""), context)
thumb_url = self._render_template(str(action.get("thumb_url", "") or ""), context)
if url:
await bot.send_link_message(target_id, url=url, title=title, description=desc, thumb_url=thumb_url)
return
if action_type == "app":
xml = self._render_template(str(action.get("xml", "") or ""), context)
app_type = int(action.get("app_type", 0) or 0)
if xml:
await bot.send_app_message(target_id, xml=xml, type=app_type)
return
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理趣味规则消息。"""
bot: WechatAPIClient = message.get("bot")
if not bot:
return False, "bot 不可用"
# 优先复用 can_process 阶段缓存的命中规则,避免重复匹配。
matched_rule = message.get("_fun_rule_match") or self._find_match_rule(message)
if not matched_rule:
return False, "未命中规则"
_, _, target_id = self._normalize_scope(message)
if not target_id:
return False, "无可用目标"
responses = matched_rule.get("responses_json") or []
if not isinstance(responses, list) or not responses:
return False, "规则无响应动作"
event_key = str(message.get("_fun_event_key", "") or "")
event_meta = message.get("_fun_event_meta")
if not isinstance(event_meta, dict):
_, event_meta = self._resolve_effective_event(message)
context = self._build_message_context(message, event_key=event_key, event_meta=event_meta)
try:
for action in responses:
if not isinstance(action, dict):
continue
await self._send_action(bot, target_id, action, context)
return True, f"命中趣味规则 #{matched_rule.get('id')}"
except Exception as e:
self.LOG.error(f"趣味指令剧本执行失败: rule={matched_rule.get('id')}, error={e}")
return False, f"执行失败: {e}"