10 KiB
10 KiB
WechatHookBot 插件开发指南
插件系统
WechatHookBot 的插件系统完全兼容 XYBotV2,所有 XYBot 插件可以直接使用。
插件结构
plugins/
PluginName/
__init__.py # 可选,可为空
main.py # 必需,包含插件类
config.toml # 可选,插件配置
README.md # 可选,插件说明
基本插件模板
from utils.plugin_base import PluginBase
from utils.decorators import *
from WechatHook import WechatHookClient
class MyPlugin(PluginBase):
description = "插件描述"
author = "作者名"
version = "1.0.0"
def __init__(self):
super().__init__()
# 同步初始化
self.data = {}
async def async_init(self):
# 异步初始化
pass
@on_text_message(priority=50)
async def handle_text(self, client: WechatHookClient, message: dict):
"""处理文本消息"""
content = message.get("Content", "")
from_wxid = message.get("FromWxid", "")
if content == "你好":
await client.send_text(from_wxid, "你好!我是机器人")
return False # 阻止后续处理
return True # 继续执行后续处理器
事件装饰器
消息事件
| 装饰器 | 触发条件 | 参数 |
|---|---|---|
@on_text_message |
文本消息 | priority (0-99) |
@on_image_message |
图片消息 | priority |
@on_voice_message |
语音消息 | priority |
@on_video_message |
视频消息 | priority |
@on_file_message |
文件消息 | priority |
@on_at_message |
@消息 | priority |
@on_quote_message |
引用消息 | priority |
@on_card_message |
名片消息 | priority |
@on_location_message |
位置消息 | priority |
@on_link_message |
链接消息 | priority |
@on_miniapp_message |
小程序消息 | priority |
@on_emoji_message |
表情消息 | priority |
@on_revoke_message |
撤回消息 | priority |
@on_friend_request |
好友请求 | priority |
优先级机制
- 优先级范围:0-99
- 数值越大,优先级越高
- 默认优先级:50
- 按优先级从高到低执行
@on_text_message(priority=80) # 高优先级
async def handle_important(self, client, message):
pass
@on_text_message(priority=20) # 低优先级
async def handle_normal(self, client, message):
pass
阻塞机制
- 返回
False:阻止后续处理器执行 - 返回
True或不返回:继续执行
@on_text_message
async def handle_sensitive(self, client, message):
if "敏感词" in message["Content"]:
await client.send_text(message["FromWxid"], "检测到敏感内容")
return False # 阻止后续执行
return True # 继续执行
定时任务
interval - 间隔触发
@schedule('interval', seconds=30)
async def periodic_task(self, client: WechatHookClient):
"""每30秒执行一次"""
pass
@schedule('interval', minutes=5)
async def five_minutes_task(self, client: WechatHookClient):
"""每5分钟执行一次"""
pass
cron - 定时触发
@schedule('cron', hour=8, minute=30)
async def morning_task(self, client: WechatHookClient):
"""每天早上8:30执行"""
await client.send_text("wxid_xxx", "早安!")
@schedule('cron', day_of_week='mon-fri', hour='9-17')
async def work_time_task(self, client: WechatHookClient):
"""工作日9-17点每小时执行"""
pass
date - 指定时间触发
@schedule('date', run_date='2024-12-31 23:59:59')
async def new_year_task(self, client: WechatHookClient):
"""在指定时间执行一次"""
pass
消息对象结构
文本消息
{
"FromWxid": "wxid_xxx", # 发送者 wxid
"ToWxid": "wxid_yyy", # 接收者 wxid
"Content": "消息内容", # 文本内容
"MsgType": 1, # 消息类型
"IsGroup": False, # 是否群聊
"SenderWxid": "wxid_xxx", # 实际发送者(群聊时不同)
"CreateTime": 1234567890, # 创建时间戳
}
群聊消息
{
"FromWxid": "123@chatroom", # 群聊 ID
"ToWxid": "wxid_bot", # 机器人 wxid
"Content": "消息内容",
"IsGroup": True, # 是否群聊
"SenderWxid": "wxid_xxx", # 实际发送者
"Ats": ["wxid_bot"], # 被@的用户列表
}
图片消息
{
"FromWxid": "wxid_xxx",
"Content": "base64_image_data", # 图片 base64 数据
"MsgType": 3,
"ImagePath": "/path/to/image", # 图片路径(如果有)
}
WechatHookClient API
发送消息
# 发送文本
await client.send_text(wxid, "消息内容")
# 发送图片
await client.send_image(wxid, "/path/to/image.jpg")
# 发送文件
await client.send_file(wxid, "/path/to/file.pdf")
# 发送视频
await client.send_video(wxid, "/path/to/video.mp4")
# 发送名片
await client.send_card(wxid, card_wxid, card_nickname)
# 发送位置
await client.send_location(wxid, lat, lng, title, address)
# 发送链接
await client.send_link(wxid, title, desc, url, thumb_url)
# 发送小程序
await client.send_miniapp(wxid, appid, title, page_path, thumb_url)
# 群聊@消息
await client.send_at_message(chatroom_id, content, at_list)
# 撤回消息
await client.revoke_message(msg_id)
好友管理
# 获取好友列表
friends = await client.get_friend_list()
# 获取好友信息
info = await client.get_friend_info(wxid)
# 搜索用户
result = await client.search_user(keyword)
# 添加好友
await client.add_friend(wxid, verify_msg)
# 同意好友请求
await client.accept_friend(v3, v4, scene)
# 删除好友
await client.delete_friend(wxid)
# 修改备注
await client.set_friend_remark(wxid, remark)
# 检测好友状态
status = await client.check_friend_status(wxid)
群聊管理
# 获取群聊列表
chatrooms = await client.get_chatroom_list()
# 获取群成员
members = await client.get_chatroom_members(chatroom_id)
# 获取群信息
info = await client.get_chatroom_info(chatroom_id)
# 创建群聊
chatroom_id = await client.create_chatroom(member_list)
# 邀请进群
await client.invite_to_chatroom(chatroom_id, wxid_list)
# 踢出群成员
await client.remove_chatroom_member(chatroom_id, wxid_list)
# 退出群聊
await client.quit_chatroom(chatroom_id)
# 修改群名称
await client.set_chatroom_name(chatroom_id, name)
# 修改群公告
await client.set_chatroom_announcement(chatroom_id, announcement)
# 修改我的群昵称
await client.set_my_chatroom_nickname(chatroom_id, nickname)
数据库使用
KeyvalDB - 键值存储
from database.keyvalDB import KeyvalDB
keyval_db = KeyvalDB()
# 设置值
await keyval_db.set("key", "value")
# 获取值
value = await keyval_db.get("key")
# 删除值
await keyval_db.delete("key")
XYBotDB - 业务数据
from database.XYBotDB import XYBotDB
db = XYBotDB()
# 使用 SQLAlchemy 操作数据库
# 参考 XYBot 的数据库使用方式
配置文件
插件配置 (config.toml)
[basic]
enable = true
[settings]
api_key = "your_api_key"
timeout = 30
读取配置
import tomllib
import os
def __init__(self):
super().__init__()
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
with open(config_path, "rb") as f:
config = tomllib.load(f)
self.enable = config.get("basic", {}).get("enable", False)
self.api_key = config.get("settings", {}).get("api_key", "")
日志系统
使用 loguru 记录日志:
from loguru import logger
# 不同级别的日志
logger.debug("调试信息")
logger.info("普通信息")
logger.success("成功信息")
logger.warning("警告信息")
logger.error("错误信息")
# 带参数的日志
logger.info("收到消息: {}", message)
异步编程
所有插件函数必须是异步函数:
# ✅ 正确
@on_text_message
async def handle_text(self, client, message):
await client.send_text(...)
# ❌ 错误
@on_text_message
def handle_text(self, client, message): # 缺少 async
client.send_text(...) # 缺少 await
使用阻塞函数
如需使用阻塞函数,使用 asyncio.run_in_executor:
import asyncio
@on_text_message
async def handle_text(self, client, message):
# 在线程池中运行阻塞函数
result = await asyncio.to_thread(blocking_function, arg1, arg2)
完整示例
from utils.plugin_base import PluginBase
from utils.decorators import *
from WechatHook import WechatHookClient
from loguru import logger
import tomllib
import os
class ExamplePlugin(PluginBase):
description = "示例插件"
author = "Your Name"
version = "1.0.0"
def __init__(self):
super().__init__()
# 读取配置
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
with open(config_path, "rb") as f:
config = tomllib.load(f)
self.enable = config.get("basic", {}).get("enable", False)
self.keyword = config.get("settings", {}).get("keyword", "你好")
async def async_init(self):
logger.info("ExamplePlugin 初始化完成")
@on_text_message(priority=50)
async def handle_text(self, client: WechatHookClient, message: dict):
if not self.enable:
return
content = message.get("Content", "")
from_wxid = message.get("FromWxid", "")
if self.keyword in content:
await client.send_text(from_wxid, f"你说了关键词:{self.keyword}")
logger.info(f"触发关键词回复: {from_wxid}")
@on_at_message(priority=60)
async def handle_at(self, client: WechatHookClient, message: dict):
if not self.enable:
return
content = message.get("Content", "")
from_wxid = message.get("FromWxid", "")
await client.send_text(from_wxid, "你@了我!")
@schedule('cron', hour=9, minute=0)
async def morning_greeting(self, client: WechatHookClient):
if not self.enable:
return
# 每天早上9点发送问候
await client.send_text("wxid_xxx", "早安!新的一天开始了")
插件管理
禁用插件
在 main_config.toml 中添加:
[Bot]
disabled-plugins = ["ExamplePlugin", "AnotherPlugin"]
删除插件
直接删除 plugins/ 下对应的文件夹
热重载
通过 WebUI 或 ManagePlugin 插件实现热重载