diff --git a/Memory Bank/activeContext.md b/Memory Bank/activeContext.md deleted file mode 100644 index e22de85..0000000 --- a/Memory Bank/activeContext.md +++ /dev/null @@ -1,239 +0,0 @@ -# 当前开发上下文 - -**更新时间:** 2025-01-12 14:30 -**当前阶段:** 开发完成,等待测试 -**当前任务:** 提交代码给用户进行远程测试 - -## ✅ 已完成的所有工作 - -### 第一阶段:WechatHook 层 ✅ - -**已实现的模块:** -1. ✅ `WechatHook/loader.py` - NoveLoader 类 (~280行) -2. ✅ `WechatHook/client.py` - WechatHookClient 类 (~450行) -3. ✅ `WechatHook/message_types.py` - 消息类型映射 (~180行) -4. ✅ `WechatHook/callbacks.py` - 回调处理器 (~180行) -5. ✅ `WechatHook/__init__.py` - 模块导出 - -### 第二阶段:Bot 核心层 ✅ - -**已实现的模块:** -1. ✅ 从 XYBotV2 复制 utils/ 目录(5个文件) -2. ✅ 从 XYBotV2 复制 database/ 目录(3个文件) -3. ✅ `utils/hookbot.py` - HookBot 核心类 (~120行) -4. ✅ `bot.py` - 主入口 (~200行) - -### 第三阶段:插件系统 ✅ - -**已实现的模块:** -1. ✅ `plugins/ExamplePlugin/main.py` - 示例插件 (~50行) -2. ✅ 插件系统完全集成 - -## 📊 代码统计 - -**总代码行数:** ~2000+ 行 - -**模块分布:** -- WechatHook 层: ~1090 行 -- Bot 核心层: ~320 行 -- 示例插件: ~50 行 -- 复用代码: ~600 行(utils + database) - -## 🎯 项目完成度 - -**总体进度:** 90% - -``` -[██████████████████░░] 90% -``` - -**剩余工作:** -- 远程设备测试 -- 根据测试结果调整 -- 修复发现的问题 - -## 📝 需要用户测试的内容 - -### 1. 基础功能测试 - -**测试步骤:** -```bash -# 1. 确保微信已登录 -# 2. 运行程序 -python bot.py - -# 3. 观察日志输出 -# 应该看到: -# - Loader.dll 加载成功 -# - 注入微信成功 -# - 插件加载成功 -# - 机器人启动成功 -``` - -### 2. 消息接收测试 - -**测试方法:** -- 给机器人发送文本消息 "ping" -- 应该收到回复 "pong" -- 查看日志中的消息 type 和 data - -**需要反馈的信息:** -``` -收到的消息格式: -{ - "type": ???, # 实际的 type 值 - "data": { - "from_wxid": "...", - "content": "...", - # 其他字段 - } -} -``` - -### 3. 消息类型测试 - -**请依次发送以下类型的消息并反馈日志:** -- 文本消息 -- 图片消息 -- 文件消息 -- 语音消息 -- 视频消息 -- @ 消息(在群里) -- 名片消息 - -**需要记录:** -- 每种消息的 type 值 -- data 中的字段名称 -- 是否正常触发插件 - -### 4. API 测试 - -**测试发送功能:** -- 修改 ExamplePlugin,尝试发送图片/文件 -- 查看是否发送成功 -- 记录错误信息(如果有) - -### 5. 错误日志 - -**如果出现错误,请提供:** -- 完整的错误堆栈 -- logs/hookbot.log 文件内容 -- 出错时的操作步骤 - -## ⚠️ 已知需要确认的问题 - -### 1. 消息类型 type 值 - -**当前使用推测值:** -```python -MT_TEXT = 10001 -MT_IMAGE = 10002 -MT_VOICE = 10003 -# ... -``` - -**需要确认:** 实际的 type 值是否正确 - -### 2. API type 值 - -**当前使用推测值:** -```python -MT_SEND_TEXT = 11036 -send_image = 11037 -send_file = 11038 -# ... -``` - -**需要确认:** 发送 API 的实际 type 值 - -### 3. 消息数据字段 - -**需要确认的字段名:** -- from_wxid / FromWxid -- to_wxid / ToWxid -- content / Content -- sender_wxid / SenderWxid -- at_list / Ats - -### 4. 登录信息获取 - -**当前问题:** -- get_login_info() 调用后需要从回调中获取返回数据 -- 暂时使用占位符 "unknown" - -**需要确认:** -- 登录信息的返回方式 -- 如何从回调中提取 wxid 和 nickname - -## 🔧 可能需要的调整 - -### 根据测试结果可能需要修改: - -1. **message_types.py** - - 调整 MessageType 常量的值 - - 修改 normalize_message() 的字段映射 - -2. **client.py** - - 调整各个 API 的 type 值 - - 修改 data 字段名称 - -3. **hookbot.py** - - 根据实际消息格式调整处理逻辑 - -4. **bot.py** - - 实现登录信息的正确获取 - -## 📦 项目文件清单 - -``` -WechatHookBot/ -├── Memory Bank/ # ✅ 项目管理文档 -├── docs/ # ✅ 技术文档 -├── WechatHook/ # ✅ Hook 层(4个文件) -├── utils/ # ✅ 工具类(6个文件) -├── database/ # ✅ 数据库(4个文件) -├── plugins/ # ✅ 插件(ExamplePlugin) -├── libs/ # ✅ DLL 文件 -├── bot.py # ✅ 主入口 -├── main_config.toml # ✅ 配置文件 -└── requirements.txt # ✅ 依赖列表 -``` - -## 🚀 下一步行动 - -1. **用户测试** - - 在远程设备运行 bot.py - - 测试各项功能 - - 记录日志和错误 - -2. **反馈收集** - - 消息 type 值 - - API 返回格式 - - 错误信息 - -3. **代码调整** - - 根据反馈修改代码 - - 修复发现的问题 - -4. **迭代测试** - - 重复测试直到稳定 - -## 💡 使用建议 - -1. **首次运行** - - 先确保微信已登录 - - 使用 32位 Python - - 关闭杀毒软件或添加信任 - -2. **查看日志** - - 控制台会显示彩色日志 - - logs/hookbot.log 包含详细日志 - -3. **测试插件** - - 发送 "ping" 测试 ExamplePlugin - - 观察是否收到 "pong" 回复 - -4. **遇到问题** - - 查看完整的错误堆栈 - - 提供 logs/hookbot.log 文件 - - 描述具体的操作步骤 diff --git a/Memory Bank/decisionLog.md b/Memory Bank/decisionLog.md deleted file mode 100644 index cc2626c..0000000 --- a/Memory Bank/decisionLog.md +++ /dev/null @@ -1,254 +0,0 @@ -# 技术决策日志 - -记录项目中的重要技术决策、原因和影响。 - ---- - -## 2025-01-12 - -### 决策 #001: 项目架构选择 - -**决策:** 采用四层架构设计(DLL Hook → WechatHook → Bot Core → Plugin) - -**原因:** -- 清晰的职责分离 -- 便于维护和扩展 -- 最大化复用 XYBotV2 代码 - -**影响:** -- 代码结构清晰 -- 80% 代码可复用 -- 学习成本低 - -**状态:** ✅ 已实施 - ---- - -### 决策 #002: DLL 文件存放位置 - -**决策:** DLL 文件放在项目的 `libs/` 目录,不放到微信安装目录 - -**原因:** -- Loader.dll 由 Python 程序直接加载 -- Helper.dll 由 Loader.dll 动态注入到微信进程 -- 不需要修改微信安装目录 - -**影响:** -- 部署更简单 -- 不污染微信目录 -- 便于版本管理 - -**状态:** ✅ 已实施 - ---- - -### 决策 #003: 不进行本地测试 - -**决策:** 所有测试在远程设备进行,开发过程中不在本地运行 - -**原因:** -- 用户要求 -- 避免本地环境污染 -- 专注于代码实现 - -**影响:** -- 需要更仔细的代码审查 -- 依赖用户反馈进行调试 -- 开发周期可能稍长 - -**状态:** ✅ 已实施 - ---- - -### 决策 #004: 复用 XYBotV2 插件系统 - -**决策:** 完全复用 XYBotV2 的插件系统(PluginBase、EventManager、装饰器) - -**原因:** -- 插件系统设计优秀 -- 已经过验证 -- 减少开发工作量 -- 保持插件兼容性 - -**影响:** -- 开发速度快 -- XYBot 插件可直接使用 -- 代码质量有保证 - -**状态:** ✅ 已确定,待实施 - ---- - -### 决策 #005: 使用 Memory Bank 系统 - -**决策:** 创建 Memory Bank 文件夹,实时跟踪项目进度和决策 - -**原因:** -- 用户要求 -- 便于项目管理 -- 保持开发上下文 -- 方便后续维护 - -**影响:** -- 项目管理更规范 -- 决策过程可追溯 -- 便于团队协作 - -**文件结构:** -- projectBrief.md - 项目简介 -- activeContext.md - 当前上下文 -- progress.md - 进度跟踪 -- decisionLog.md - 决策日志 -- systemPatterns.md - 系统模式 - -**状态:** ✅ 已实施 - ---- - -### 决策 #006: 消息类型映射策略 - -**决策:** 创建独立的 message_types.py 文件,定义 API type 到内部 event 的映射 - -**原因:** -- 解耦消息类型定义 -- 便于维护和扩展 -- 统一消息格式转换 - -**实现方式:** -```python -MESSAGE_TYPE_MAP = { - 10001: "text_message", - 10002: "image_message", - # ... -} -``` - -**影响:** -- 代码更清晰 -- 易于添加新消息类型 -- 便于调试 - -**状态:** ✅ 已确定,待实施 - ---- - -### 决策 #007: 回调处理机制 - -**决策:** 使用装饰器模式实现回调处理(@CONNECT_CALLBACK, @RECV_CALLBACK, @CLOSE_CALLBACK) - -**原因:** -- 参考 python_demo.py 的实现 -- 代码简洁优雅 -- 易于扩展 - -**影响:** -- 回调注册简单 -- 支持多个回调处理器 -- 代码可读性好 - -**状态:** ✅ 已确定,待实施 - ---- - -### 决策 #008: 异步编程模型 - -**决策:** 使用 asyncio 作为异步编程框架,所有 API 都是异步函数 - -**原因:** -- 与 XYBotV2 保持一致 -- 提高并发性能 -- 插件系统要求 - -**影响:** -- 所有函数必须使用 async/await -- 需要处理同步/异步转换 -- 性能更好 - -**状态:** ✅ 已确定,待实施 - ---- - -### 决策 #009: 配置文件格式 - -**决策:** 使用 TOML 格式作为配置文件格式 - -**原因:** -- 与 XYBotV2 保持一致 -- 可读性好 -- Python 3.11+ 原生支持 - -**影响:** -- 配置文件易于编辑 -- 支持注释 -- 类型安全 - -**状态:** ✅ 已实施 - ---- - -### 决策 #010: 数据库选择 - -**决策:** 使用 SQLite + aiosqlite 作为数据库 - -**原因:** -- 与 XYBotV2 保持一致 -- 无需额外服务 -- 支持异步操作 - -**影响:** -- 部署简单 -- 性能足够 -- 便于备份 - -**状态:** ✅ 已确定,待实施 - ---- - -## 待决策事项 - -### 待决策 #001: 消息类型 type 值 - -**问题:** 个微 API 各类消息的具体 type 值未知 - -**选项:** -1. 参考 API 文档推测 -2. 实际测试获取 - -**倾向:** 选项 2 - 实际测试获取 - -**需要:** 用户提供实际消息的 type 值 - ---- - -### 待决策 #002: WebUI 是否实现 - -**问题:** 是否需要实现 Web 管理界面 - -**选项:** -1. 立即实现 -2. 基础功能完成后再实现 -3. 不实现 - -**倾向:** 选项 2 - 基础功能完成后再实现 - -**原因:** 先保证核心功能稳定 - ---- - -## 决策模板 - -```markdown -### 决策 #XXX: 决策标题 - -**决策:** 简要描述决策内容 - -**原因:** -- 原因1 -- 原因2 - -**影响:** -- 影响1 -- 影响2 - -**状态:** ✅ 已实施 / 🚧 进行中 / ⏳ 待实施 / ❌ 已废弃 -``` diff --git a/Memory Bank/progress.md b/Memory Bank/progress.md deleted file mode 100644 index 5f5f331..0000000 --- a/Memory Bank/progress.md +++ /dev/null @@ -1,315 +0,0 @@ -# 开发进度跟踪 - -**项目开始:** 2025-01-12 -**最后更新:** 2025-01-12 14:35 -**当前状态:** ✅ 开发完成,等待测试 - -## 总体进度 - -**当前阶段:** 开发完成 -**完成度:** 90% - -``` -[██████████████████░░] 90% -``` - -## 阶段进度 - -### ✅ 第零阶段:项目初始化 (100%) - -- [x] 创建项目目录结构 -- [x] 编写完整文档系统 (6个文档) -- [x] 创建配置文件模板 -- [x] 复制 DLL 文件到 libs/ -- [x] 创建 Memory Bank 系统 - -**完成时间:** 2025-01-12 13:50 - ---- - -### ✅ 第一阶段:WechatHook 层 (100%) - -**目标:** 实现 DLL 调用和 API 封装 - -#### 1.1 NoveLoader 实现 (100%) -- [x] 创建 WechatHook/__init__.py -- [x] 实现 loader.py 基础结构 -- [x] 实现 DLL 函数偏移调用 -- [x] 实现所有 DLL 函数封装 -- [x] 添加详细日志 - -#### 1.2 WechatHookClient 实现 (100%) -- [x] 创建 client.py 基础结构 -- [x] 实现消息发送 API (8个方法) -- [x] 实现好友管理 API (6个方法) -- [x] 实现群聊管理 API (9个方法) -- [x] 实现登录信息 API - -#### 1.3 消息类型映射 (100%) -- [x] 创建 message_types.py -- [x] 定义消息类型常量 (MessageType 类) -- [x] 创建 type 到 event 的映射表 -- [x] 实现消息格式转换函数 (normalize_message) - -#### 1.4 回调处理器 (100%) -- [x] 创建 callbacks.py -- [x] 实现连接回调处理 (CONNECT_CALLBACK) -- [x] 实现接收回调处理 (RECV_CALLBACK) -- [x] 实现断开回调处理 (CLOSE_CALLBACK) -- [x] 实现回调注册机制 - -**完成时间:** 2025-01-12 14:10 - -**代码统计:** -- loader.py: ~280 行 -- client.py: ~450 行 -- message_types.py: ~180 行 -- callbacks.py: ~180 行 -- 总计:~1090 行 - ---- - -### ✅ 第二阶段:Bot 核心层 (100%) - -**目标:** 实现消息路由和事件分发 - -#### 2.1 复用 XYBot 代码 (100%) -- [x] 复制 utils/ 目录 (5个文件) -- [x] 复制 database/ 目录 (3个文件) -- [x] 创建模块 __init__.py - -#### 2.2 HookBot 实现 (100%) -- [x] 创建 utils/hookbot.py -- [x] 实现消息预处理 -- [x] 实现消息类型映射 -- [x] 实现白名单/黑名单过滤 -- [x] 实现事件分发 - -#### 2.3 主入口实现 (100%) -- [x] 创建 bot.py -- [x] 实现初始化流程 -- [x] 实现插件加载 -- [x] 实现回调注册 -- [x] 实现主循环 - -**完成时间:** 2025-01-12 14:30 - -**代码统计:** -- hookbot.py: ~120 行 -- bot.py: ~200 行 -- 总计:~320 行 - ---- - -### ✅ 第三阶段:插件系统 (100%) - -**目标:** 集成插件系统并创建示例 - -- [x] 创建 ExamplePlugin 示例 -- [x] 实现文本消息处理 -- [x] 实现定时任务 -- [x] 测试装饰器系统 -- [x] 插件系统完全集成 - -**完成时间:** 2025-01-12 14:35 - -**代码统计:** -- ExamplePlugin/main.py: ~50 行 - ---- - -### ⏳ 第四阶段:功能测试 (0%) - -**目标:** 远程设备测试所有功能 - -- [ ] 测试 DLL 注入 -- [ ] 测试消息接收 -- [ ] 测试消息发送 -- [ ] 测试插件加载 -- [ ] 测试定时任务 -- [ ] 修复发现的问题 - -**预计完成:** 2025-01-13 - ---- - -## 每日进度 - -### 2025-01-12 - -**完成:** -- ✅ 创建完整文档系统 (6个文档) -- ✅ 创建配置文件和依赖列表 -- ✅ 复制 DLL 文件到项目目录 -- ✅ 创建 Memory Bank 管理系统 (5个文档) -- ✅ 实现 WechatHook 层完整代码 (4个模块,~1090行) -- ✅ 复用 XYBot 代码 (utils + database) -- ✅ 实现 Bot 核心层 (hookbot + bot.py,~320行) -- ✅ 创建 ExamplePlugin 示例插件 (~50行) - -**总耗时:** 约 2 小时 - -**代码质量:** -- 详细的日志记录 -- 完整的错误处理 -- 清晰的代码结构 -- 丰富的注释文档 - ---- - -## 里程碑 - -- [x] **M0:** 项目初始化完成 (2025-01-12 13:50) -- [x] **M1:** WechatHook 层完成 (2025-01-12 14:10) -- [x] **M2:** Bot 核心层完成 (2025-01-12 14:30) -- [x] **M3:** 插件系统集成完成 (2025-01-12 14:35) -- [ ] **M4:** 基础功能测试通过 -- [ ] **M5:** 项目可用版本发布 - ---- - -## 最终统计数据 - -**总任务数:** 45 -**已完成:** 40 -**进行中:** 0 -**待开始:** 5 - -**代码行数:** -- WechatHook 层: ~1090 行 -- Bot 核心层: ~320 行 -- 示例插件: ~50 行 -- 复用代码: ~600 行 -- **总计:~2060 行** - -**文档页数:** 11 -- 技术文档: 6 个 -- Memory Bank: 5 个 - -**测试用例:** 0 (待用户测试) - ---- - -## 项目文件结构 - -``` -WechatHookBot/ -├── Memory Bank/ # 项目管理文档 -│ ├── projectBrief.md -│ ├── activeContext.md -│ ├── progress.md -│ ├── decisionLog.md -│ └── systemPatterns.md -├── docs/ # 技术文档 -│ ├── 架构设计.md -│ ├── 插件开发.md -│ ├── API文档.md -│ ├── 快速开始.md -│ └── 项目概览.md -├── WechatHook/ # Hook 层 -│ ├── __init__.py -│ ├── loader.py -│ ├── client.py -│ ├── message_types.py -│ └── callbacks.py -├── utils/ # 工具类 -│ ├── __init__.py -│ ├── plugin_base.py -│ ├── plugin_manager.py -│ ├── event_manager.py -│ ├── decorators.py -│ ├── singleton.py -│ └── hookbot.py -├── database/ # 数据库 -│ ├── __init__.py -│ ├── XYBotDB.py -│ ├── keyvalDB.py -│ └── messsagDB.py -├── plugins/ # 插件 -│ └── ExamplePlugin/ -│ ├── __init__.py -│ └── main.py -├── libs/ # DLL 文件 -│ ├── Loader.dll -│ └── Helper.dll -├── bot.py # 主入口 -├── main_config.toml # 配置文件 -├── requirements.txt # 依赖列表 -├── .gitignore # Git 忽略 -└── README.md # 项目说明 -``` - ---- - -## 待测试确认事项 - -### 高优先级 - -1. **消息类型 type 值** - 需要实际测试获取 -2. **API type 值** - 需要实际测试获取 -3. **消息数据字段名** - 需要实际测试确认 -4. **登录信息获取** - 需要实现正确的获取方式 - -### 中优先级 - -5. **API 返回格式** - 需要用户反馈 -6. **错误处理** - 需要测试各种异常情况 -7. **性能优化** - 需要测试消息处理速度 - -### 低优先级 - -8. **WebUI 实现** - 可选功能 -9. **更多插件** - 根据需求添加 -10. **文档完善** - 根据测试结果补充 - ---- - -## 项目亮点 - -1. **完整的架构设计** - 四层架构,职责清晰 -2. **高度代码复用** - 80% 复用 XYBot 代码 -3. **详细的文档** - 11 个文档文件 -4. **Memory Bank 系统** - 完整的项目管理 -5. **插件兼容性** - 完全兼容 XYBot 插件 -6. **异步编程** - 全异步设计,性能优秀 -7. **详细日志** - 便于调试和问题定位 -8. **错误处理** - 完善的异常处理机制 - ---- - -## 下一步行动 - -### 用户需要做的: - -1. **安装依赖** - ```bash - pip install -r requirements.txt - ``` - -2. **运行程序** - ```bash - python bot.py - ``` - -3. **测试功能** - - 发送 "ping" 测试 - - 发送各类消息 - - 查看日志输出 - -4. **反馈信息** - - 消息 type 值 - - 数据字段名 - - 错误日志 - - 功能是否正常 - -### 开发者需要做的: - -1. **等待测试反馈** -2. **根据反馈调整代码** -3. **修复发现的问题** -4. **迭代优化** - ---- - -**项目状态:** ✅ 开发完成,等待测试 -**预计可用时间:** 测试通过后即可使用 diff --git a/Memory Bank/projectBrief.md b/Memory Bank/projectBrief.md deleted file mode 100644 index 4975861..0000000 --- a/Memory Bank/projectBrief.md +++ /dev/null @@ -1,77 +0,0 @@ -# WechatHookBot 项目简介 - -## 项目概述 - -**项目名称:** WechatHookBot -**创建时间:** 2025-01-12 -**当前版本:** v0.1.0-dev -**项目状态:** 🚧 开发中 - -## 项目目标 - -基于个微大客户版 Hook API 构建一个类似 XYBotV2 的微信机器人框架,实现插件化、事件驱动的架构。 - -## 核心特性 - -- ✅ **无需登录系统**:Hook 已登录的微信客户端 -- ✅ **插件化架构**:完全兼容 XYBotV2 插件系统 -- ✅ **实时消息回调**:Socket 回调机制 -- ✅ **轻量级设计**:无需 Redis 依赖 - -## 技术栈 - -- **语言:** Python 3.x (32位) -- **异步框架:** asyncio -- **DLL 调用:** ctypes -- **数据库:** SQLite + aiosqlite -- **定时任务:** APScheduler -- **Web 框架:** Flask + SocketIO (可选) - -## 架构设计 - -``` -DLL Hook 层 → WechatHook 层 → Bot 核心层 → 插件层 → WebUI 层 -``` - -## 参考项目 - -- **XYBotV2** - 插件系统、事件管理、数据库架构 -- **个微大客户版** - DLL Hook API 和调用示例 - -## 开发原则 - -1. **代码复用优先**:最大化复用 XYBotV2 代码 -2. **最小化实现**:只写必要的代码 -3. **不本地测试**:所有测试在远程设备进行 -4. **文档先行**:保持 Memory Bank 实时更新 - -## 项目结构 - -``` -WechatHookBot/ -├── Memory Bank/ # 项目管理和进度跟踪 -├── docs/ # 技术文档 -├── WechatHook/ # Hook 层实现 -├── utils/ # 工具类(复用 XYBot) -├── database/ # 数据库(复用 XYBot) -├── plugins/ # 插件目录 -├── WebUI/ # Web 界面(可选) -├── libs/ # DLL 文件 -├── bot.py # 主入口 -└── main_config.toml # 配置文件 -``` - -## 关键里程碑 - -- [x] 文档系统完成 -- [x] DLL 文件准备 -- [ ] WechatHook 层实现 -- [ ] Bot 核心层实现 -- [ ] 插件系统集成 -- [ ] 基础功能测试 -- [ ] WebUI 实现(可选) - -## 联系方式 - -**开发者:** Claude -**项目路径:** D:\project\shrobot\WechatHookBot\ diff --git a/Memory Bank/systemPatterns.md b/Memory Bank/systemPatterns.md deleted file mode 100644 index 4d6b51b..0000000 --- a/Memory Bank/systemPatterns.md +++ /dev/null @@ -1,252 +0,0 @@ -# 系统模式和最佳实践 - -## 代码模式 - -### 1. DLL 函数调用模式 - -```python -# 通过内存偏移调用未导出函数 -def __get_non_exported_func(self, offset: int, arg_types, return_type): - func_addr = self.loader_module_base + offset - func_type = ctypes.WINFUNCTYPE(return_type, *arg_types) - return func_type(func_addr) -``` - -### 2. 回调装饰器模式 - -```python -@CONNECT_CALLBACK(in_class=True) -def on_connect(self, client_id): - # 处理连接 - pass - -@RECV_CALLBACK(in_class=True) -def on_receive(self, client_id, message_type, data): - # 处理消息 - pass -``` - -### 3. 异步 API 封装模式 - -```python -async def send_text(self, to_wxid: str, content: str) -> bool: - payload = {"type": 11036, "data": {"to_wxid": to_wxid, "content": content}} - return await asyncio.to_thread( - self.loader.SendWeChatData, - self.client_id, - json.dumps(payload, ensure_ascii=False) - ) -``` - -### 4. 消息类型映射模式 - -```python -MESSAGE_TYPE_MAP = { - 10001: "text_message", - 10002: "image_message", -} - -event_type = MESSAGE_TYPE_MAP.get(msg_type) -if event_type: - await EventManager.emit(event_type, client, message) -``` - -### 5. 插件事件处理模式 - -```python -@on_text_message(priority=50) -async def handle_text(self, client, message): - # 处理逻辑 - return True # 继续执行后续处理器 -``` - -## 架构模式 - -### 分层架构 - -``` -应用层 (Plugins) - ↓ -业务层 (HookBot) - ↓ -服务层 (WechatHookClient) - ↓ -基础层 (NoveLoader) - ↓ -系统层 (DLL) -``` - -### 事件驱动模式 - -``` -消息接收 → 回调触发 → 类型映射 → 事件分发 → 插件处理 -``` - -## 命名规范 - -### 文件命名 -- 模块文件:小写下划线 `loader.py`, `message_types.py` -- 类文件:大驼峰 `WechatHookClient`, `HookBot` - -### 变量命名 -- 常量:大写下划线 `MESSAGE_TYPE_MAP` -- 变量:小写下划线 `client_id`, `msg_type` -- 类名:大驼峰 `NoveLoader` -- 函数:小写下划线 `send_text()` - -### API 命名 -- 发送类:`send_xxx()` - send_text, send_image -- 获取类:`get_xxx()` - get_friend_list, get_chatroom_info -- 设置类:`set_xxx()` - set_friend_remark - -## 错误处理模式 - -### API 调用错误处理 - -```python -try: - result = await client.send_text(wxid, content) - if result: - logger.info("发送成功") - else: - logger.error("发送失败") -except Exception as e: - logger.error(f"发送异常: {e}") -``` - -### DLL 调用错误处理 - -```python -if not os.path.exists(dll_path): - logger.error(f"DLL 文件不存在: {dll_path}") - return False -``` - -## 日志模式 - -### 日志级别使用 - -```python -logger.debug("调试信息") # 详细的调试信息 -logger.info("普通信息") # 一般信息 -logger.success("成功信息") # 操作成功 -logger.warning("警告信息") # 警告但不影响运行 -logger.error("错误信息") # 错误需要关注 -``` - -### 日志格式 - -```python -logger.info(f"收到消息: FromWxid={from_wxid}, Content={content}") -``` - -## 配置模式 - -### TOML 配置读取 - -```python -import tomllib - -with open("main_config.toml", "rb") as f: - config = tomllib.load(f) - -value = config.get("section", {}).get("key", default_value) -``` - -## 数据库模式 - -### 异步数据库操作 - -```python -from database.keyvalDB import KeyvalDB - -keyval_db = KeyvalDB() -await keyval_db.set("key", "value") -value = await keyval_db.get("key") -``` - -## 测试模式 - -### 远程测试流程 - -1. 实现功能代码 -2. 添加详细日志 -3. 提交给用户测试 -4. 根据反馈修复 -5. 重复直到通过 - -### 需要用户提供的测试信息 - -- API 返回的实际数据格式 -- 消息的实际 type 值 -- 错误信息和日志 -- 功能是否正常工作 - -## 代码复用模式 - -### 从 XYBot 复用 - -```python -# 完全复用(不修改) -- utils/plugin_base.py -- utils/plugin_manager.py -- utils/event_manager.py -- utils/decorators.py -- utils/singleton.py -- database/ - -# 参考实现(需修改) -- utils/xybot.py → utils/hookbot.py -``` - -## 性能优化模式 - -### 异步并发 - -```python -# 并发执行多个任务 -tasks = [ - client.send_text(wxid1, msg1), - client.send_text(wxid2, msg2), -] -results = await asyncio.gather(*tasks) -``` - -### 消息队列 - -```python -# 避免消息发送过快 -await asyncio.sleep(0.5) # 每条消息间隔 -``` - -## 安全模式 - -### 路径处理 - -```python -# 使用绝对路径 -path = os.path.realpath(relative_path) -``` - -### 参数验证 - -```python -if not wxid or not content: - logger.error("参数不能为空") - return False -``` - -## 扩展模式 - -### 添加新消息类型 - -1. 在 `message_types.py` 添加映射 -2. 在 `decorators.py` 添加装饰器(如需要) -3. 在 `client.py` 添加发送方法(如需要) - -### 添加新 API - -1. 在 `client.py` 添加方法 -2. 构造正确的 payload -3. 调用 `SendWeChatData` -4. 处理返回结果 diff --git a/plugins/AIChat.zip b/plugins/AIChat.zip new file mode 100644 index 0000000..be81073 Binary files /dev/null and b/plugins/AIChat.zip differ diff --git a/plugins/AIChat/main.py b/plugins/AIChat/main.py index 9c47f75..313daea 100644 --- a/plugins/AIChat/main.py +++ b/plugins/AIChat/main.py @@ -620,6 +620,615 @@ class AIChat(PluginBase): return tools + # ==================== Gemini API 格式转换方法 ==================== + + def _convert_tools_to_gemini(self, openai_tools: list) -> list: + """ + 将 OpenAI 格式的工具定义转换为 Gemini 格式 + + OpenAI: [{"type": "function", "function": {"name": ..., "parameters": ...}}] + Gemini: [{"function_declarations": [{"name": ..., "parameters": ...}]}] + """ + if not openai_tools: + return [] + + function_declarations = [] + for tool in openai_tools: + if tool.get("type") == "function": + func = tool.get("function", {}) + function_declarations.append({ + "name": func.get("name", ""), + "description": func.get("description", ""), + "parameters": func.get("parameters", {}) + }) + + if function_declarations: + return [{"function_declarations": function_declarations}] + return [] + + def _build_gemini_contents(self, system_content: str, history_messages: list, + current_message: dict, is_group: bool = False) -> list: + """ + 构建 Gemini API 的 contents 格式 + + Args: + system_content: 系统提示词(包含人设、时间、持久记忆等) + history_messages: 历史消息列表 + current_message: 当前用户消息 {"text": str, "media": optional} + is_group: 是否群聊 + + Returns: + Gemini contents 格式的列表 + """ + contents = [] + + # Gemini 没有 system role,将系统提示放在第一条 user 消息中 + # 然后用一条简短的 model 回复来"确认" + system_parts = [{"text": f"[系统指令]\n{system_content}\n\n请按照以上指令进行对话。"}] + contents.append({"role": "user", "parts": system_parts}) + contents.append({"role": "model", "parts": [{"text": "好的,我会按照指令进行对话。"}]}) + + # 添加历史消息 + for msg in history_messages: + gemini_msg = self._convert_message_to_gemini(msg, is_group) + if gemini_msg: + contents.append(gemini_msg) + + # 添加当前用户消息 + current_parts = [] + if current_message.get("text"): + current_parts.append({"text": current_message["text"]}) + + # 添加媒体内容(图片/视频) + if current_message.get("image_base64"): + image_data = current_message["image_base64"] + # 去除 data:image/xxx;base64, 前缀 + if image_data.startswith("data:"): + mime_type = image_data.split(";")[0].split(":")[1] + image_data = image_data.split(",", 1)[1] + else: + mime_type = "image/jpeg" + current_parts.append({ + "inline_data": { + "mime_type": mime_type, + "data": image_data + } + }) + + if current_message.get("video_base64"): + video_data = current_message["video_base64"] + # 去除 data:video/xxx;base64, 前缀 + if video_data.startswith("data:"): + video_data = video_data.split(",", 1)[1] + current_parts.append({ + "inline_data": { + "mime_type": "video/mp4", + "data": video_data + } + }) + + if current_parts: + contents.append({"role": "user", "parts": current_parts}) + + return contents + + def _convert_message_to_gemini(self, msg: dict, is_group: bool = False) -> dict: + """ + 将单条历史消息转换为 Gemini 格式 + + 支持的输入格式: + 1. 群聊历史: {"nickname": str, "content": str|list} + 2. 私聊记忆: {"role": "user"|"assistant", "content": str|list} + """ + parts = [] + + # 群聊历史格式 + if "nickname" in msg: + nickname = msg.get("nickname", "") + content = msg.get("content", "") + + if isinstance(content, list): + # 多模态内容 + for item in content: + if item.get("type") == "text": + text = item.get("text", "") + parts.append({"text": f"[{nickname}] {text}" if nickname else text}) + elif item.get("type") == "image_url": + image_url = item.get("image_url", {}).get("url", "") + if image_url.startswith("data:"): + mime_type = image_url.split(";")[0].split(":")[1] + image_data = image_url.split(",", 1)[1] + parts.append({ + "inline_data": { + "mime_type": mime_type, + "data": image_data + } + }) + else: + # 纯文本 + parts.append({"text": f"[{nickname}] {content}" if nickname else content}) + + # 群聊历史都作为 user 消息(因为是多人对话记录) + return {"role": "user", "parts": parts} if parts else None + + # 私聊记忆格式 + elif "role" in msg: + role = msg.get("role", "user") + content = msg.get("content", "") + + # 转换角色名 + gemini_role = "model" if role == "assistant" else "user" + + if isinstance(content, list): + for item in content: + if item.get("type") == "text": + parts.append({"text": item.get("text", "")}) + elif item.get("type") == "image_url": + image_url = item.get("image_url", {}).get("url", "") + if image_url.startswith("data:"): + mime_type = image_url.split(";")[0].split(":")[1] + image_data = image_url.split(",", 1)[1] + parts.append({ + "inline_data": { + "mime_type": mime_type, + "data": image_data + } + }) + else: + parts.append({"text": content}) + + return {"role": gemini_role, "parts": parts} if parts else None + + return None + + def _parse_gemini_tool_calls(self, response_parts: list) -> list: + """ + 从 Gemini 响应中解析工具调用 + + Gemini 格式: {"functionCall": {"name": "...", "args": {...}}} + 转换为内部格式: {"id": "...", "function": {"name": "...", "arguments": "..."}} + """ + tool_calls = [] + for i, part in enumerate(response_parts): + if "functionCall" in part: + func_call = part["functionCall"] + tool_calls.append({ + "id": f"call_{uuid.uuid4().hex[:8]}", + "type": "function", + "function": { + "name": func_call.get("name", ""), + "arguments": json.dumps(func_call.get("args", {}), ensure_ascii=False) + } + }) + return tool_calls + + def _build_tool_response_contents(self, contents: list, tool_calls: list, + tool_results: list) -> list: + """ + 构建包含工具调用结果的 contents,用于继续对话 + + Args: + contents: 原始 contents + tool_calls: 工具调用列表 + tool_results: 工具执行结果列表 + """ + new_contents = contents.copy() + + # 添加 model 的工具调用响应 + function_call_parts = [] + for tc in tool_calls: + function_call_parts.append({ + "functionCall": { + "name": tc["function"]["name"], + "args": json.loads(tc["function"]["arguments"]) + } + }) + if function_call_parts: + new_contents.append({"role": "model", "parts": function_call_parts}) + + # 添加工具执行结果 + function_response_parts = [] + for i, result in enumerate(tool_results): + tool_name = tool_calls[i]["function"]["name"] if i < len(tool_calls) else "unknown" + function_response_parts.append({ + "functionResponse": { + "name": tool_name, + "response": {"result": result.get("message", str(result))} + } + }) + if function_response_parts: + new_contents.append({"role": "user", "parts": function_response_parts}) + + return new_contents + + # ==================== 统一的 Gemini API 调用 ==================== + + async def _call_gemini_api(self, contents: list, tools: list = None, + bot=None, from_wxid: str = None, + chat_id: str = None, nickname: str = "", + user_wxid: str = None, is_group: bool = False) -> tuple: + """ + 统一的 Gemini API 调用方法 + + Args: + contents: Gemini 格式的对话内容 + tools: Gemini 格式的工具定义(可选) + bot: WechatHookClient 实例 + from_wxid: 消息来源 + chat_id: 会话ID + nickname: 用户昵称 + user_wxid: 用户wxid + is_group: 是否群聊 + + Returns: + (response_text, tool_calls) - 响应文本和工具调用列表 + """ + import json + + api_config = self.config["api"] + model = api_config["model"] + api_url = api_config.get("gemini_url", api_config.get("url", "").replace("/v1/chat/completions", "/v1beta/models")) + api_key = api_config["api_key"] + + # 构建完整 URL + full_url = f"{api_url}/{model}:streamGenerateContent?alt=sse" + + # 构建请求体 + payload = { + "contents": contents, + "generationConfig": { + "maxOutputTokens": api_config.get("max_tokens", 8192) + } + } + + if tools: + payload["tools"] = tools + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + timeout = aiohttp.ClientTimeout(total=api_config.get("timeout", 120)) + + # 配置代理 + connector = None + proxy_config = self.config.get("proxy", {}) + if proxy_config.get("enabled", False) and PROXY_SUPPORT: + proxy_type = proxy_config.get("type", "socks5").upper() + proxy_host = proxy_config.get("host", "127.0.0.1") + proxy_port = proxy_config.get("port", 7890) + proxy_username = proxy_config.get("username") + proxy_password = proxy_config.get("password") + + if proxy_username and proxy_password: + proxy_url = f"{proxy_type}://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}" + else: + proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}" + + try: + connector = ProxyConnector.from_url(proxy_url) + logger.debug(f"[Gemini] 使用代理: {proxy_type}://{proxy_host}:{proxy_port}") + except Exception as e: + logger.warning(f"[Gemini] 代理配置失败: {e}") + + # 保存用户信息供工具调用使用 + self._current_user_wxid = user_wxid + self._current_is_group = is_group + + try: + async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: + logger.debug(f"[Gemini] 发送流式请求: {full_url}") + async with session.post(full_url, json=payload, headers=headers) as resp: + if resp.status != 200: + error_text = await resp.text() + logger.error(f"[Gemini] API 错误: {resp.status}, {error_text[:500]}") + raise Exception(f"Gemini API 错误 {resp.status}: {error_text[:200]}") + + # 流式接收响应 + full_text = "" + all_parts = [] + tool_call_hint_sent = False + + async for line in resp.content: + line = line.decode('utf-8').strip() + if not line or not line.startswith("data: "): + continue + + try: + data = json.loads(line[6:]) + candidates = data.get("candidates", []) + if not candidates: + continue + + content = candidates[0].get("content", ) + parts = content.get("parts", []) + + for part in parts: + all_parts.append(part) + + # 收集文本 + if "text" in part: + full_text += part["text"] + + # 检测到工具调用时,先发送已有文本 + if "functionCall" in part: + if not tool_call_hint_sent and bot and from_wxid: + tool_call_hint_sent = True + if full_text.strip(): + logger.info(f"[Gemini] 检测到工具调用,先发送文本: {full_text[:30]}...") + await bot.send_text(from_wxid, full_text.strip()) + + except json.JSONDecodeError: + continue + + # 解析工具调用 + tool_calls = self._parse_gemini_tool_calls(all_parts) + + logger.info(f"[Gemini] 响应完成, 文本长度: {len(full_text)}, 工具调用: {len(tool_calls)}") + + return full_text.strip(), tool_calls + + except aiohttp.ClientError as e: + logger.error(f"[Gemini] 网络请求失败: {e}") + raise + except asyncio.TimeoutError: + logger.error(f"[Gemini] 请求超时") + raise + + async def _handle_gemini_response(self, response_text: str, tool_calls: list, + contents: list, tools: list, + bot, from_wxid: str, chat_id: str, + nickname: str, user_wxid: str, is_group: bool): + """ + 处理 Gemini API 响应,包括工具调用 + + Args: + response_text: AI 响应文本 + tool_calls: 工具调用列表 + contents: 原始 contents(用于工具调用后继续对话) + tools: 工具定义 + bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息 + """ + if tool_calls: + # 有工具调用,异步执行 + logger.info(f"[Gemini] 启动异步工具执行,共 {len(tool_calls)} 个工具") + asyncio.create_task( + self._execute_gemini_tools_async( + tool_calls, contents, tools, + bot, from_wxid, chat_id, nickname, user_wxid, is_group + ) + ) + return None # 工具调用异步处理 + + return response_text + + async def _execute_gemini_tools_async(self, tool_calls: list, contents: list, tools: list, + bot, from_wxid: str, chat_id: str, + nickname: str, user_wxid: str, is_group: bool): + """ + 异步执行 Gemini 工具调用 + """ + import json + + try: + logger.info(f"[Gemini] 开始执行 {len(tool_calls)} 个工具") + + # 收集需要 AI 回复的工具结果 + need_ai_reply_results = [] + tool_results = [] + + for tool_call in tool_calls: + function_name = tool_call["function"]["name"] + try: + arguments = json.loads(tool_call["function"]["arguments"]) + except: + arguments = {} + + logger.info(f"[Gemini] 执行工具: {function_name}, 参数: {arguments}") + + result = await self._execute_tool_and_get_result(function_name, arguments, bot, from_wxid) + tool_results.append(result) + + if result and result.get("success"): + logger.success(f"[Gemini] 工具 {function_name} 执行成功") + + # 检查是否需要 AI 继续回复 + if result.get("need_ai_reply"): + need_ai_reply_results.append({ + "tool_call": tool_call, + "result": result + }) + elif not result.get("already_sent") and result.get("message"): + if result.get("send_result_text"): + await bot.send_text(from_wxid, result["message"]) + else: + logger.warning(f"[Gemini] 工具 {function_name} 执行失败: {result}") + if result and result.get("message"): + await bot.send_text(from_wxid, f"❌ {result['message']}") + + # 如果有需要 AI 回复的工具结果,继续对话 + if need_ai_reply_results: + await self._continue_gemini_with_tool_results( + contents, tools, tool_calls, tool_results, + bot, from_wxid, chat_id, nickname, user_wxid, is_group + ) + + logger.info("[Gemini] 所有工具执行完成") + + except Exception as e: + logger.error(f"[Gemini] 工具执行异常: {e}") + import traceback + logger.error(traceback.format_exc()) + try: + await bot.send_text(from_wxid, "❌ 工具执行出错") + except: + pass + + async def _continue_gemini_with_tool_results(self, contents: list, tools: list, + tool_calls: list, tool_results: list, + bot, from_wxid: str, chat_id: str, + nickname: str, user_wxid: str, is_group: bool): + """ + 基于工具结果继续 Gemini 对话 + """ + try: + # 构建包含工具结果的新 contents + new_contents = self._build_tool_response_contents(contents, tool_calls, tool_results) + + # 继续调用 API(不带工具,避免循环调用) + response_text, new_tool_calls = await self._call_gemini_api( + new_contents, tools=None, + bot=bot, from_wxid=from_wxid, chat_id=chat_id, + nickname=nickname, user_wxid=user_wxid, is_group=is_group + ) + + if response_text: + await bot.send_text(from_wxid, response_text) + logger.success(f"[Gemini] 工具回传后 AI 回复: {response_text[:50]}...") + + # 保存到记忆 + if chat_id: + self._add_to_memory(chat_id, "assistant", response_text) + if is_group: + import tomllib + with open("main_config.toml", "rb") as f: + main_config = tomllib.load(f) + bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") + await self._add_to_history(from_wxid, bot_nickname, response_text) + + except Exception as e: + logger.error(f"[Gemini] 工具回传后继续对话失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + async def _process_with_gemini(self, text: str = "", image_base64: str = None, + video_base64: str = None, bot=None, + from_wxid: str = None, chat_id: str = None, + nickname: str = "", user_wxid: str = None, + is_group: bool = False) -> str: + """ + 统一的 Gemini 消息处理入口 + + 支持:纯文本、图片+文本、视频+文本 + + Args: + text: 用户消息文本 + image_base64: 图片 base64(可选) + video_base64: 视频 base64(可选) + bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息 + + Returns: + AI 响应文本,如果是工具调用则返回 None + """ + import json + + # 1. 构建系统提示词 + system_content = self._build_system_content(nickname, from_wxid, user_wxid, is_group) + + # 2. 加载历史消息 + history_messages = [] + if is_group and from_wxid: + history = await self._load_history(from_wxid) + max_context = self.config.get("history", {}).get("max_context", 50) + history_messages = history[-max_context:] if len(history) > max_context else history + elif chat_id: + memory_messages = self._get_memory_messages(chat_id) + if memory_messages and len(memory_messages) > 1: + history_messages = memory_messages[:-1] # 排除刚添加的当前消息 + + # 3. 构建当前消息 + current_message = {"text": f"[{nickname}] {text}" if is_group and nickname else text} + if image_base64: + current_message["image_base64"] = image_base64 + if video_base64: + current_message["video_base64"] = video_base64 + + # 4. 构建 Gemini contents + contents = self._build_gemini_contents(system_content, history_messages, current_message, is_group) + + # 5. 收集并转换工具 + openai_tools = self._collect_tools() + gemini_tools = self._convert_tools_to_gemini(openai_tools) + + if gemini_tools: + logger.info(f"[Gemini] 已加载 {len(openai_tools)} 个工具") + + # 6. 调用 Gemini API(带重试) + max_retries = self.config.get("api", {}).get("max_retries", 2) + last_error = None + + for attempt in range(max_retries + 1): + try: + response_text, tool_calls = await self._call_gemini_api( + contents=contents, + tools=gemini_tools if gemini_tools else None, + bot=bot, + from_wxid=from_wxid, + chat_id=chat_id, + nickname=nickname, + user_wxid=user_wxid, + is_group=is_group + ) + + # 处理工具调用 + if tool_calls: + result = await self._handle_gemini_response( + response_text, tool_calls, contents, gemini_tools, + bot, from_wxid, chat_id, nickname, user_wxid, is_group + ) + return result # None 表示工具调用已异步处理 + + # 检查空响应 + if not response_text and attempt < max_retries: + logger.warning(f"[Gemini] 返回空内容,重试 {attempt + 1}/{max_retries}") + await asyncio.sleep(1) + continue + + return response_text + + except Exception as e: + last_error = e + if attempt < max_retries: + logger.warning(f"[Gemini] API 调用失败,重试 {attempt + 1}/{max_retries}: {e}") + await asyncio.sleep(1) + else: + raise + + return "" + + def _build_system_content(self, nickname: str, from_wxid: str, + user_wxid: str, is_group: bool) -> str: + """构建系统提示词(包含人设、时间、持久记忆等)""" + system_content = self.system_prompt + + # 添加当前时间 + current_time = datetime.now() + weekday_map = { + 0: "星期一", 1: "星期二", 2: "星期三", 3: "星期四", + 4: "星期五", 5: "星期六", 6: "星期日" + } + weekday = weekday_map[current_time.weekday()] + time_str = current_time.strftime(f"%Y年%m月%d日 %H:%M:%S {weekday}") + system_content += f"\n\n当前时间:{time_str}" + + if nickname: + system_content += f"\n当前对话用户的昵称是:{nickname}" + + # 加载持久记忆 + memory_chat_id = from_wxid if is_group else user_wxid + if memory_chat_id: + persistent_memories = self._get_persistent_memories(memory_chat_id) + if persistent_memories: + system_content += "\n\n【持久记忆】以下是用户要求你记住的重要信息:\n" + for m in persistent_memories: + mem_time = m['time'][:10] if m['time'] else "" + system_content += f"- [{mem_time}] {m['nickname']}: {m['content']}\n" + + return system_content + + # ==================== 结束 Gemini API 方法 ==================== + async def _handle_list_prompts(self, bot, from_wxid: str): """处理人设列表指令""" try: @@ -982,41 +1591,19 @@ class AIChat(PluginBase): chat_id = self._get_chat_id(from_wxid, user_wxid, is_group) self._add_to_memory(chat_id, "user", actual_content) - # 调用 AI API(带重试机制) - max_retries = self.config.get("api", {}).get("max_retries", 2) - response = None - last_error = None - - for attempt in range(max_retries + 1): - try: - response = await self._call_ai_api(actual_content, bot, from_wxid, chat_id, nickname, user_wxid, is_group) - - # 检查返回值: - # - None: 工具调用已异步处理,不需要重试 - # - "": 真正的空响应,需要重试 - # - 有内容: 正常响应 - if response is None: - # 工具调用,不重试 - logger.info("AI 触发工具调用,已异步处理") - break - - if response == "" and attempt < max_retries: - logger.warning(f"AI 返回空内容,重试 {attempt + 1}/{max_retries}") - await asyncio.sleep(1) # 等待1秒后重试 - continue - - break # 成功或已达到最大重试次数 - - except Exception as e: - last_error = e - if attempt < max_retries: - logger.warning(f"AI API 调用失败,重试 {attempt + 1}/{max_retries}: {e}") - await asyncio.sleep(1) - else: - raise + # 使用统一的 Gemini API 处理消息 + response = await self._process_with_gemini( + text=actual_content, + bot=bot, + from_wxid=from_wxid, + chat_id=chat_id, + nickname=nickname, + user_wxid=user_wxid, + is_group=is_group + ) # 发送回复并添加到记忆 - # 注意:如果返回 None 或空字符串,说明已经以其他形式处理了,不需要再发送文本 + # 注意:如果返回 None 或空字符串,说明已经以其他形式处理了(如工具调用) if response: await bot.send_text(from_wxid, response) self._add_to_memory(chat_id, "assistant", response) @@ -1028,7 +1615,7 @@ class AIChat(PluginBase): await self._add_to_history(from_wxid, bot_nickname, response) logger.success(f"AI 回复成功: {response[:50]}...") else: - logger.info("AI 回复为空或已通过其他方式发送(如聊天记录)") + logger.info("AI 回复为空或已通过其他方式发送(如工具调用)") except Exception as e: import traceback @@ -2060,8 +2647,17 @@ class AIChat(PluginBase): if is_group: await self._add_to_history(from_wxid, nickname, title_text, image_base64=image_base64) - # 调用AI API(带图片) - response = await self._call_ai_api_with_image(title_text, image_base64, bot, from_wxid, chat_id, nickname, user_wxid, is_group) + # 使用统一的 Gemini API 处理图片消息 + response = await self._process_with_gemini( + text=title_text, + image_base64=image_base64, + bot=bot, + from_wxid=from_wxid, + chat_id=chat_id, + nickname=nickname, + user_wxid=user_wxid, + is_group=is_group + ) if response: await bot.send_text(from_wxid, response) @@ -2074,7 +2670,7 @@ class AIChat(PluginBase): bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") await self._add_to_history(from_wxid, bot_nickname, response) logger.success(f"AI回复成功: {response[:50]}...") - + return False except Exception as e: @@ -2083,15 +2679,8 @@ class AIChat(PluginBase): async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str, user_wxid: str, is_group: bool, nickname: str, chat_id: str): - """处理引用的视频消息 - 双AI架构""" + """处理引用的视频消息 - 统一 Gemini API(直接处理视频)""" try: - # 检查视频识别功能是否启用 - video_config = self.config.get("video_recognition", {}) - if not video_config.get("enabled", True): - logger.info("[视频识别] 功能未启用") - await bot.send_text(from_wxid, "❌ 视频识别功能未启用") - return False - # 提取视频 CDN 信息 cdnvideourl = video_elem.get("cdnvideourl", "") aeskey = video_elem.get("aeskey", "") @@ -2102,11 +2691,11 @@ class AIChat(PluginBase): aeskey = video_elem.get("cdnrawvideoaeskey", "") if not cdnvideourl or not aeskey: - logger.warning(f"[视频识别] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}") + logger.warning(f"[视频] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}") await bot.send_text(from_wxid, "❌ 无法获取视频信息") return False - logger.info(f"[视频识别] 处理引用视频: {title_text[:50]}...") + logger.info(f"[视频] 处理引用视频: {title_text[:50]}...") # 提示用户正在处理 await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...") @@ -2114,35 +2703,33 @@ class AIChat(PluginBase): # 下载并编码视频 video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey) if not video_base64: - logger.error("[视频识别] 视频下载失败") + logger.error("[视频] 视频下载失败") await bot.send_text(from_wxid, "❌ 视频下载失败") return False - logger.info("[视频识别] 视频下载和编码成功") + logger.info("[视频] 视频下载和编码成功") - # ========== 第一步:视频AI 分析视频内容 ========== - video_description = await self._analyze_video_content(video_base64, video_config) - if not video_description: - logger.error("[视频识别] 视频AI分析失败") - await bot.send_text(from_wxid, "❌ 视频分析失败") - return False - - logger.info(f"[视频识别] 视频AI分析完成: {video_description[:100]}...") - - # ========== 第二步:主AI 基于视频描述生成回复 ========== - # 构造包含视频描述的用户消息 + # 用户问题 user_question = title_text.strip() if title_text.strip() else "这个视频讲了什么?" - combined_message = f"[用户发送了一个视频,以下是视频内容描述]\n{video_description}\n\n[用户的问题]\n{user_question}" - # 添加到记忆(让主AI知道用户发了视频) - self._add_to_memory(chat_id, "user", combined_message) + # 添加到记忆 + self._add_to_memory(chat_id, "user", f"[发送了一个视频] {user_question}") # 如果是群聊,添加到历史记录 if is_group: await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}") - # 调用主AI生成回复(使用现有的 _call_ai_api 方法,继承完整上下文) - response = await self._call_ai_api(combined_message, chat_id, from_wxid, is_group, nickname) + # 使用统一的 Gemini API 直接处理视频(不再需要两步架构) + response = await self._process_with_gemini( + text=user_question, + video_base64=video_base64, + bot=bot, + from_wxid=from_wxid, + chat_id=chat_id, + nickname=nickname, + user_wxid=user_wxid, + is_group=is_group + ) if response: await bot.send_text(from_wxid, response) @@ -2154,7 +2741,7 @@ class AIChat(PluginBase): main_config = tomllib.load(f) bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人") await self._add_to_history(from_wxid, bot_nickname, response) - logger.success(f"[视频识别] 主AI回复成功: {response[:50]}...") + logger.success(f"[视频] AI回复成功: {response[:50]}...") else: await bot.send_text(from_wxid, "❌ AI 回复生成失败") diff --git a/test_gemini_video.py b/test_gemini_video.py deleted file mode 100644 index a87b0bd..0000000 --- a/test_gemini_video.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -测试 Gemini API 视频识别 -""" -import base64 -import requests -import json - -# 配置 -API_URL = "https://api.functen.cn/v1beta/models/gemini-3-pro-preview:generateContent" -API_KEY = "sk-NeOtq0kOU39x3LMqY09aKYLoOBJIgFkgGuDwVGgGEstXPn3M" -VIDEO_PATH = r"D:\project\shrobot\WechatHookBot\plugins\AIChat\265d0df9ea89578bcb86f824c5255a42.mp4" - -def test_video(): - # 读取视频并编码为 base64 - print(f"读取视频: {VIDEO_PATH}") - with open(VIDEO_PATH, "rb") as f: - video_data = f.read() - - video_base64 = base64.b64encode(video_data).decode() - print(f"视频大小: {len(video_data) / 1024 / 1024:.2f} MB") - print(f"Base64 长度: {len(video_base64)}") - - # 构建 Gemini 原生格式请求 - payload = { - "contents": [ - { - "parts": [ - {"text": "请描述这个视频的内容"}, - { - "inline_data": { - "mime_type": "video/mp4", - "data": video_base64 - } - } - ] - } - ], - "generationConfig": { - "maxOutputTokens": 4096 - } - } - - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {API_KEY}" - } - - print(f"\n发送请求到: {API_URL}") - print("请求中...") - - try: - response = requests.post( - API_URL, - headers=headers, - json=payload, - timeout=180 - ) - - print(f"\n状态码: {response.status_code}") - - if response.status_code == 200: - result = response.json() - print("\n=== 响应内容 ===") - print(json.dumps(result, ensure_ascii=False, indent=2)) - - # 提取文本 - if "candidates" in result: - for candidate in result["candidates"]: - content = candidate.get("content", {}) - for part in content.get("parts", []): - if "text" in part: - print("\n=== AI 回复 ===") - print(part["text"]) - else: - print(f"\n错误响应: {response.text}") - - except Exception as e: - print(f"\n请求失败: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - test_video()