5.5 KiB
5.5 KiB
技术要点
核心架构
1. DLL Hook 机制
工作流程:
1. 加载 Loader.dll
2. 创建共享内存 (关键!)
3. 注册 Socket 回调
4. 调用 InjectWeChat 注入 Helper.dll
5. 等待 Socket 连接回调
6. 使用 Socket Client ID 发送 API 请求
关键代码:
# 共享内存创建 (必须在 Loader 之前)
create_shared_memory()
# 注册回调
add_callback_handler(self)
loader = NoveLoader(loader_dll)
# 注入微信
process_id = loader.InjectWeChat(helper_dll)
# 等待 Socket 连接
# socket_client_id 从回调中获取 (通常为 1)
# 使用 Socket ID 发送请求
client = WechatHookClient(loader, socket_client_id)
2. 消息类型映射
实际测试的消息类型:
MT_DEBUG_LOG = 11024 # 调试日志
MT_USER_LOGIN = 11025 # 用户登录 (自动推送)
MT_TEXT = 11046 # 文本消息
MT_IMAGE = 11047 # 图片消息
MT_VOICE = 11048 # 语音消息
MT_VIDEO = 11049 # 视频消息
MT_EMOJI = 11050 # 表情消息
MT_LOCATION = 11051 # 位置消息
MT_LINK = 11052 # 链接消息
MT_FILE = 11053 # 文件消息
MT_MINIAPP = 11054 # 小程序消息
MT_CARD = 11055 # 名片消息
MT_FRIEND_REQUEST = 11056 # 好友请求
MT_REVOKE = 11057 # 撤回消息
MT_SYSTEM = 11058 # 系统消息
3. 消息数据结构
文本消息 (type=11046):
{
"at_user_list": [],
"from_wxid": "wxid_xxx",
"is_pc": 0,
"msg": "消息内容",
"msgid": "123456789",
"room_wxid": "",
"timestamp": 1762940000,
"to_wxid": "wxid_yyy",
"wx_type": 1
}
登录信息 (type=11025):
{
"account": "账号",
"avatar": "http://...",
"device_id": "设备ID",
"nickname": "昵称",
"phone": "手机号",
"wxid": "wxid_xxx",
"wx_user_dir": "C:\\..."
}
4. 群聊消息判断
关键字段:
room_wxid: 群聊 ID,私聊时为空from_wxid: 发送者 wxidto_wxid: 接收者 (群聊时是群 ID)
判断逻辑:
room_wxid = data.get("room_wxid", "")
if room_wxid:
# 群聊消息
message["IsGroup"] = True
message["FromWxid"] = room_wxid
message["SenderWxid"] = data.get("from_wxid", "")
else:
# 私聊消息
message["IsGroup"] = False
message["FromWxid"] = data.get("from_wxid", "")
5. 异步回调处理
问题: 回调在同步线程中执行,但需要调用异步方法
解决方案:
# 在初始化时保存事件循环
self.event_loop = asyncio.get_event_loop()
# 在回调中使用
asyncio.run_coroutine_threadsafe(
self.hookbot.process_message(msg_type, data),
self.event_loop
)
6. 插件系统
插件生命周期:
class MyPlugin(PluginBase):
def __init__(self):
super().__init__()
# 初始化成员变量
async def async_init(self):
# 异步初始化 (加载配置等)
pass
async def on_enable(self, bot=None):
# 启用时调用 (注册定时任务)
pass
async def on_disable(self):
# 禁用时调用 (清理资源)
pass
事件处理:
@on_text_message()
async def handle_message(self, bot, message: dict):
content = message.get("Content", "")
from_wxid = message.get("FromWxid", "")
# 处理消息
await bot.send_text(from_wxid, "回复内容")
常见问题
Q1: 为什么必须使用 32 位 Python?
A: Loader.dll 和 Helper.dll 是 32 位编译的,只能在 32 位 Python 中加载。
Q2: 为什么需要共享内存?
A: DLL 之间通过共享内存进行通信,必须在加载 Loader.dll 之前创建。
Q3: 为什么 Socket Client ID 和进程 ID 不同?
A:
InjectWeChat返回的是微信进程 ID- 回调中的
client_id是 Socket 连接 ID (从 1 开始) - 发送 API 请求时使用 Socket ID
Q4: 如何判断是群聊还是私聊?
A: 检查 room_wxid 字段,不为空则是群聊。
Q5: 插件配置为什么没有加载?
A: 使用 async_init 方法而不是 on_load。
Q6: 如何热重载插件?
A: 发送 /重载插件 插件名 命令。
Q7: 为什么收不到消息?
A: 检查以下几点:
- 共享内存是否创建成功
- Socket 客户端是否连接
- 消息类型是否在 MESSAGE_TYPE_MAP 中
- 消息是否被过滤 (白名单/黑名单)
Q8: 如何添加新的消息类型?
A:
- 在
message_types.py中添加常量 - 在
MESSAGE_TYPE_MAP中添加映射 - 在
normalize_message中处理特殊字段
性能优化
1. 消息处理
- 使用
asyncio.run_coroutine_threadsafe避免阻塞回调线程 - 深拷贝消息数据避免并发问题
2. 插件管理
- 按优先级排序事件处理器
- 支持插件返回 False 中断处理链
3. 错误处理
- 每个处理器独立 try-except
- 一个插件出错不影响其他插件
调试技巧
1. 查看回调数据
logger.info(f"[回调] 收到消息: type={msg_type}, data={msg_data}")
2. 查看消息处理
logger.info(f"收到消息: type={event_type}, from={from_wxid}, content={content}")
3. 查看插件加载
logger.info(f"插件 {plugin_name} 已加载")
4. 使用调试日志
logger.debug(f"详细信息: {variable}")
安全注意事项
- API 密钥: 不要将 API 密钥提交到版本控制
- 管理员权限: 只有管理员可以执行插件管理命令
- 消息过滤: 使用白名单/黑名单控制消息处理
- 错误处理: 捕获所有异常避免崩溃