Files
WechatHookBot/docs/插件开发.md
2025-12-03 15:48:44 +08:00

10 KiB
Raw Permalink Blame History

WechatHookBot 插件开发指南

插件系统

WechatHookBot 的插件系统完全兼容 XYBotV2所有 XYBot 插件可以直接使用。

插件结构

plugins/
  PluginName/
    __init__.py      # 可选,可为空
    main.py          # 必需,包含插件类
    config.toml      # 可选,插件配置
    README.md        # 可选,插件说明

基本插件模板

from utils.plugin_base import PluginBase
from utils.decorators import *
from WechatHook import WechatHookClient

class MyPlugin(PluginBase):
    description = "插件描述"
    author = "作者名"
    version = "1.0.0"

    def __init__(self):
        super().__init__()
        # 同步初始化
        self.data = {}

    async def async_init(self):
        # 异步初始化
        pass

    @on_text_message(priority=50)
    async def handle_text(self, client: WechatHookClient, message: dict):
        """处理文本消息"""
        content = message.get("Content", "")
        from_wxid = message.get("FromWxid", "")

        if content == "你好":
            await client.send_text(from_wxid, "你好!我是机器人")
            return False  # 阻止后续处理

        return True  # 继续执行后续处理器

事件装饰器

消息事件

装饰器 触发条件 参数
@on_text_message 文本消息 priority (0-99)
@on_image_message 图片消息 priority
@on_voice_message 语音消息 priority
@on_video_message 视频消息 priority
@on_file_message 文件消息 priority
@on_at_message @消息 priority
@on_quote_message 引用消息 priority
@on_card_message 名片消息 priority
@on_location_message 位置消息 priority
@on_link_message 链接消息 priority
@on_miniapp_message 小程序消息 priority
@on_emoji_message 表情消息 priority
@on_revoke_message 撤回消息 priority
@on_friend_request 好友请求 priority

优先级机制

  • 优先级范围0-99
  • 数值越大,优先级越高
  • 默认优先级50
  • 按优先级从高到低执行
@on_text_message(priority=80)  # 高优先级
async def handle_important(self, client, message):
    pass

@on_text_message(priority=20)  # 低优先级
async def handle_normal(self, client, message):
    pass

阻塞机制

  • 返回 False:阻止后续处理器执行
  • 返回 True 或不返回:继续执行
@on_text_message
async def handle_sensitive(self, client, message):
    if "敏感词" in message["Content"]:
        await client.send_text(message["FromWxid"], "检测到敏感内容")
        return False  # 阻止后续执行
    return True  # 继续执行

定时任务

interval - 间隔触发

@schedule('interval', seconds=30)
async def periodic_task(self, client: WechatHookClient):
    """每30秒执行一次"""
    pass

@schedule('interval', minutes=5)
async def five_minutes_task(self, client: WechatHookClient):
    """每5分钟执行一次"""
    pass

cron - 定时触发

@schedule('cron', hour=8, minute=30)
async def morning_task(self, client: WechatHookClient):
    """每天早上8:30执行"""
    await client.send_text("wxid_xxx", "早安!")

@schedule('cron', day_of_week='mon-fri', hour='9-17')
async def work_time_task(self, client: WechatHookClient):
    """工作日9-17点每小时执行"""
    pass

date - 指定时间触发

@schedule('date', run_date='2024-12-31 23:59:59')
async def new_year_task(self, client: WechatHookClient):
    """在指定时间执行一次"""
    pass

消息对象结构

文本消息

{
    "FromWxid": "wxid_xxx",           # 发送者 wxid
    "ToWxid": "wxid_yyy",             # 接收者 wxid
    "Content": "消息内容",             # 文本内容
    "MsgType": 1,                     # 消息类型
    "IsGroup": False,                 # 是否群聊
    "SenderWxid": "wxid_xxx",         # 实际发送者(群聊时不同)
    "CreateTime": 1234567890,         # 创建时间戳
}

群聊消息

{
    "FromWxid": "123@chatroom",       # 群聊 ID
    "ToWxid": "wxid_bot",             # 机器人 wxid
    "Content": "消息内容",
    "IsGroup": True,                  # 是否群聊
    "SenderWxid": "wxid_xxx",         # 实际发送者
    "Ats": ["wxid_bot"],              # 被@的用户列表
}

图片消息

{
    "FromWxid": "wxid_xxx",
    "Content": "base64_image_data",   # 图片 base64 数据
    "MsgType": 3,
    "ImagePath": "/path/to/image",    # 图片路径(如果有)
}

WechatHookClient API

发送消息

# 发送文本
await client.send_text(wxid, "消息内容")

# 发送图片
await client.send_image(wxid, "/path/to/image.jpg")

# 发送文件
await client.send_file(wxid, "/path/to/file.pdf")

# 发送视频
await client.send_video(wxid, "/path/to/video.mp4")

# 发送名片
await client.send_card(wxid, card_wxid, card_nickname)

# 发送位置
await client.send_location(wxid, lat, lng, title, address)

# 发送链接
await client.send_link(wxid, title, desc, url, thumb_url)

# 发送小程序
await client.send_miniapp(wxid, appid, title, page_path, thumb_url)

# 群聊@消息
await client.send_at_message(chatroom_id, content, at_list)

# 撤回消息
await client.revoke_message(msg_id)

好友管理

# 获取好友列表
friends = await client.get_friend_list()

# 获取好友信息
info = await client.get_friend_info(wxid)

# 搜索用户
result = await client.search_user(keyword)

# 添加好友
await client.add_friend(wxid, verify_msg)

# 同意好友请求
await client.accept_friend(v3, v4, scene)

# 删除好友
await client.delete_friend(wxid)

# 修改备注
await client.set_friend_remark(wxid, remark)

# 检测好友状态
status = await client.check_friend_status(wxid)

群聊管理

# 获取群聊列表
chatrooms = await client.get_chatroom_list()

# 获取群成员
members = await client.get_chatroom_members(chatroom_id)

# 获取群信息
info = await client.get_chatroom_info(chatroom_id)

# 创建群聊
chatroom_id = await client.create_chatroom(member_list)

# 邀请进群
await client.invite_to_chatroom(chatroom_id, wxid_list)

# 踢出群成员
await client.remove_chatroom_member(chatroom_id, wxid_list)

# 退出群聊
await client.quit_chatroom(chatroom_id)

# 修改群名称
await client.set_chatroom_name(chatroom_id, name)

# 修改群公告
await client.set_chatroom_announcement(chatroom_id, announcement)

# 修改我的群昵称
await client.set_my_chatroom_nickname(chatroom_id, nickname)

数据库使用

KeyvalDB - 键值存储

from database.keyvalDB import KeyvalDB

keyval_db = KeyvalDB()

# 设置值
await keyval_db.set("key", "value")

# 获取值
value = await keyval_db.get("key")

# 删除值
await keyval_db.delete("key")

XYBotDB - 业务数据

from database.XYBotDB import XYBotDB

db = XYBotDB()

# 使用 SQLAlchemy 操作数据库
# 参考 XYBot 的数据库使用方式

配置文件

插件配置 (config.toml)

[basic]
enable = true

[settings]
api_key = "your_api_key"
timeout = 30

读取配置

import tomllib
import os

def __init__(self):
    super().__init__()

    config_path = os.path.join(os.path.dirname(__file__), "config.toml")
    with open(config_path, "rb") as f:
        config = tomllib.load(f)

    self.enable = config.get("basic", {}).get("enable", False)
    self.api_key = config.get("settings", {}).get("api_key", "")

日志系统

使用 loguru 记录日志:

from loguru import logger

# 不同级别的日志
logger.debug("调试信息")
logger.info("普通信息")
logger.success("成功信息")
logger.warning("警告信息")
logger.error("错误信息")

# 带参数的日志
logger.info("收到消息: {}", message)

异步编程

所有插件函数必须是异步函数:

# ✅ 正确
@on_text_message
async def handle_text(self, client, message):
    await client.send_text(...)

# ❌ 错误
@on_text_message
def handle_text(self, client, message):  # 缺少 async
    client.send_text(...)  # 缺少 await

使用阻塞函数

如需使用阻塞函数,使用 asyncio.run_in_executor

import asyncio

@on_text_message
async def handle_text(self, client, message):
    # 在线程池中运行阻塞函数
    result = await asyncio.to_thread(blocking_function, arg1, arg2)

完整示例

from utils.plugin_base import PluginBase
from utils.decorators import *
from WechatHook import WechatHookClient
from loguru import logger
import tomllib
import os

class ExamplePlugin(PluginBase):
    description = "示例插件"
    author = "Your Name"
    version = "1.0.0"

    def __init__(self):
        super().__init__()

        # 读取配置
        config_path = os.path.join(os.path.dirname(__file__), "config.toml")
        with open(config_path, "rb") as f:
            config = tomllib.load(f)

        self.enable = config.get("basic", {}).get("enable", False)
        self.keyword = config.get("settings", {}).get("keyword", "你好")

    async def async_init(self):
        logger.info("ExamplePlugin 初始化完成")

    @on_text_message(priority=50)
    async def handle_text(self, client: WechatHookClient, message: dict):
        if not self.enable:
            return

        content = message.get("Content", "")
        from_wxid = message.get("FromWxid", "")

        if self.keyword in content:
            await client.send_text(from_wxid, f"你说了关键词:{self.keyword}")
            logger.info(f"触发关键词回复: {from_wxid}")

    @on_at_message(priority=60)
    async def handle_at(self, client: WechatHookClient, message: dict):
        if not self.enable:
            return

        content = message.get("Content", "")
        from_wxid = message.get("FromWxid", "")

        await client.send_text(from_wxid, "你@了我!")

    @schedule('cron', hour=9, minute=0)
    async def morning_greeting(self, client: WechatHookClient):
        if not self.enable:
            return

        # 每天早上9点发送问候
        await client.send_text("wxid_xxx", "早安!新的一天开始了")

插件管理

禁用插件

main_config.toml 中添加:

[Bot]
disabled-plugins = ["ExamplePlugin", "AnotherPlugin"]

删除插件

直接删除 plugins/ 下对应的文件夹

热重载

通过 WebUI 或 ManagePlugin 插件实现热重载