feta:api统一为Gemini格式

This commit is contained in:
2025-12-11 14:30:55 +08:00
parent e13be17a37
commit 908b5c8c07
8 changed files with 653 additions and 1286 deletions

View File

@@ -1,239 +0,0 @@
# 当前开发上下文
**更新时间:** 2025-01-12 14:30
**当前阶段:** 开发完成,等待测试
**当前任务:** 提交代码给用户进行远程测试
## ✅ 已完成的所有工作
### 第一阶段WechatHook 层 ✅
**已实现的模块:**
1.`WechatHook/loader.py` - NoveLoader 类 (~280行)
2.`WechatHook/client.py` - WechatHookClient 类 (~450行)
3.`WechatHook/message_types.py` - 消息类型映射 (~180行)
4.`WechatHook/callbacks.py` - 回调处理器 (~180行)
5.`WechatHook/__init__.py` - 模块导出
### 第二阶段Bot 核心层 ✅
**已实现的模块:**
1. ✅ 从 XYBotV2 复制 utils/ 目录5个文件
2. ✅ 从 XYBotV2 复制 database/ 目录3个文件
3.`utils/hookbot.py` - HookBot 核心类 (~120行)
4.`bot.py` - 主入口 (~200行)
### 第三阶段:插件系统 ✅
**已实现的模块:**
1.`plugins/ExamplePlugin/main.py` - 示例插件 (~50行)
2. ✅ 插件系统完全集成
## 📊 代码统计
**总代码行数:** ~2000+ 行
**模块分布:**
- WechatHook 层: ~1090 行
- Bot 核心层: ~320 行
- 示例插件: ~50 行
- 复用代码: ~600 行utils + database
## 🎯 项目完成度
**总体进度:** 90%
```
[██████████████████░░] 90%
```
**剩余工作:**
- 远程设备测试
- 根据测试结果调整
- 修复发现的问题
## 📝 需要用户测试的内容
### 1. 基础功能测试
**测试步骤:**
```bash
# 1. 确保微信已登录
# 2. 运行程序
python bot.py
# 3. 观察日志输出
# 应该看到:
# - Loader.dll 加载成功
# - 注入微信成功
# - 插件加载成功
# - 机器人启动成功
```
### 2. 消息接收测试
**测试方法:**
- 给机器人发送文本消息 "ping"
- 应该收到回复 "pong"
- 查看日志中的消息 type 和 data
**需要反馈的信息:**
```
收到的消息格式:
{
"type": ???, # 实际的 type 值
"data": {
"from_wxid": "...",
"content": "...",
# 其他字段
}
}
```
### 3. 消息类型测试
**请依次发送以下类型的消息并反馈日志:**
- 文本消息
- 图片消息
- 文件消息
- 语音消息
- 视频消息
- @ 消息(在群里)
- 名片消息
**需要记录:**
- 每种消息的 type 值
- data 中的字段名称
- 是否正常触发插件
### 4. API 测试
**测试发送功能:**
- 修改 ExamplePlugin尝试发送图片/文件
- 查看是否发送成功
- 记录错误信息(如果有)
### 5. 错误日志
**如果出现错误,请提供:**
- 完整的错误堆栈
- logs/hookbot.log 文件内容
- 出错时的操作步骤
## ⚠️ 已知需要确认的问题
### 1. 消息类型 type 值
**当前使用推测值:**
```python
MT_TEXT = 10001
MT_IMAGE = 10002
MT_VOICE = 10003
# ...
```
**需要确认:** 实际的 type 值是否正确
### 2. API type 值
**当前使用推测值:**
```python
MT_SEND_TEXT = 11036
send_image = 11037
send_file = 11038
# ...
```
**需要确认:** 发送 API 的实际 type 值
### 3. 消息数据字段
**需要确认的字段名:**
- from_wxid / FromWxid
- to_wxid / ToWxid
- content / Content
- sender_wxid / SenderWxid
- at_list / Ats
### 4. 登录信息获取
**当前问题:**
- get_login_info() 调用后需要从回调中获取返回数据
- 暂时使用占位符 "unknown"
**需要确认:**
- 登录信息的返回方式
- 如何从回调中提取 wxid 和 nickname
## 🔧 可能需要的调整
### 根据测试结果可能需要修改:
1. **message_types.py**
- 调整 MessageType 常量的值
- 修改 normalize_message() 的字段映射
2. **client.py**
- 调整各个 API 的 type 值
- 修改 data 字段名称
3. **hookbot.py**
- 根据实际消息格式调整处理逻辑
4. **bot.py**
- 实现登录信息的正确获取
## 📦 项目文件清单
```
WechatHookBot/
├── Memory Bank/ # ✅ 项目管理文档
├── docs/ # ✅ 技术文档
├── WechatHook/ # ✅ Hook 层4个文件
├── utils/ # ✅ 工具类6个文件
├── database/ # ✅ 数据库4个文件
├── plugins/ # ✅ 插件ExamplePlugin
├── libs/ # ✅ DLL 文件
├── bot.py # ✅ 主入口
├── main_config.toml # ✅ 配置文件
└── requirements.txt # ✅ 依赖列表
```
## 🚀 下一步行动
1. **用户测试**
- 在远程设备运行 bot.py
- 测试各项功能
- 记录日志和错误
2. **反馈收集**
- 消息 type 值
- API 返回格式
- 错误信息
3. **代码调整**
- 根据反馈修改代码
- 修复发现的问题
4. **迭代测试**
- 重复测试直到稳定
## 💡 使用建议
1. **首次运行**
- 先确保微信已登录
- 使用 32位 Python
- 关闭杀毒软件或添加信任
2. **查看日志**
- 控制台会显示彩色日志
- logs/hookbot.log 包含详细日志
3. **测试插件**
- 发送 "ping" 测试 ExamplePlugin
- 观察是否收到 "pong" 回复
4. **遇到问题**
- 查看完整的错误堆栈
- 提供 logs/hookbot.log 文件
- 描述具体的操作步骤

View File

@@ -1,254 +0,0 @@
# 技术决策日志
记录项目中的重要技术决策、原因和影响。
---
## 2025-01-12
### 决策 #001: 项目架构选择
**决策:** 采用四层架构设计DLL Hook → WechatHook → Bot Core → Plugin
**原因:**
- 清晰的职责分离
- 便于维护和扩展
- 最大化复用 XYBotV2 代码
**影响:**
- 代码结构清晰
- 80% 代码可复用
- 学习成本低
**状态:** ✅ 已实施
---
### 决策 #002: DLL 文件存放位置
**决策:** DLL 文件放在项目的 `libs/` 目录,不放到微信安装目录
**原因:**
- Loader.dll 由 Python 程序直接加载
- Helper.dll 由 Loader.dll 动态注入到微信进程
- 不需要修改微信安装目录
**影响:**
- 部署更简单
- 不污染微信目录
- 便于版本管理
**状态:** ✅ 已实施
---
### 决策 #003: 不进行本地测试
**决策:** 所有测试在远程设备进行,开发过程中不在本地运行
**原因:**
- 用户要求
- 避免本地环境污染
- 专注于代码实现
**影响:**
- 需要更仔细的代码审查
- 依赖用户反馈进行调试
- 开发周期可能稍长
**状态:** ✅ 已实施
---
### 决策 #004: 复用 XYBotV2 插件系统
**决策:** 完全复用 XYBotV2 的插件系统PluginBase、EventManager、装饰器
**原因:**
- 插件系统设计优秀
- 已经过验证
- 减少开发工作量
- 保持插件兼容性
**影响:**
- 开发速度快
- XYBot 插件可直接使用
- 代码质量有保证
**状态:** ✅ 已确定,待实施
---
### 决策 #005: 使用 Memory Bank 系统
**决策:** 创建 Memory Bank 文件夹,实时跟踪项目进度和决策
**原因:**
- 用户要求
- 便于项目管理
- 保持开发上下文
- 方便后续维护
**影响:**
- 项目管理更规范
- 决策过程可追溯
- 便于团队协作
**文件结构:**
- projectBrief.md - 项目简介
- activeContext.md - 当前上下文
- progress.md - 进度跟踪
- decisionLog.md - 决策日志
- systemPatterns.md - 系统模式
**状态:** ✅ 已实施
---
### 决策 #006: 消息类型映射策略
**决策:** 创建独立的 message_types.py 文件,定义 API type 到内部 event 的映射
**原因:**
- 解耦消息类型定义
- 便于维护和扩展
- 统一消息格式转换
**实现方式:**
```python
MESSAGE_TYPE_MAP = {
10001: "text_message",
10002: "image_message",
# ...
}
```
**影响:**
- 代码更清晰
- 易于添加新消息类型
- 便于调试
**状态:** ✅ 已确定,待实施
---
### 决策 #007: 回调处理机制
**决策:** 使用装饰器模式实现回调处理(@CONNECT_CALLBACK, @RECV_CALLBACK, @CLOSE_CALLBACK
**原因:**
- 参考 python_demo.py 的实现
- 代码简洁优雅
- 易于扩展
**影响:**
- 回调注册简单
- 支持多个回调处理器
- 代码可读性好
**状态:** ✅ 已确定,待实施
---
### 决策 #008: 异步编程模型
**决策:** 使用 asyncio 作为异步编程框架,所有 API 都是异步函数
**原因:**
- 与 XYBotV2 保持一致
- 提高并发性能
- 插件系统要求
**影响:**
- 所有函数必须使用 async/await
- 需要处理同步/异步转换
- 性能更好
**状态:** ✅ 已确定,待实施
---
### 决策 #009: 配置文件格式
**决策:** 使用 TOML 格式作为配置文件格式
**原因:**
- 与 XYBotV2 保持一致
- 可读性好
- Python 3.11+ 原生支持
**影响:**
- 配置文件易于编辑
- 支持注释
- 类型安全
**状态:** ✅ 已实施
---
### 决策 #010: 数据库选择
**决策:** 使用 SQLite + aiosqlite 作为数据库
**原因:**
- 与 XYBotV2 保持一致
- 无需额外服务
- 支持异步操作
**影响:**
- 部署简单
- 性能足够
- 便于备份
**状态:** ✅ 已确定,待实施
---
## 待决策事项
### 待决策 #001: 消息类型 type 值
**问题:** 个微 API 各类消息的具体 type 值未知
**选项:**
1. 参考 API 文档推测
2. 实际测试获取
**倾向:** 选项 2 - 实际测试获取
**需要:** 用户提供实际消息的 type 值
---
### 待决策 #002: WebUI 是否实现
**问题:** 是否需要实现 Web 管理界面
**选项:**
1. 立即实现
2. 基础功能完成后再实现
3. 不实现
**倾向:** 选项 2 - 基础功能完成后再实现
**原因:** 先保证核心功能稳定
---
## 决策模板
```markdown
### 决策 #XXX: 决策标题
**决策:** 简要描述决策内容
**原因:**
- 原因1
- 原因2
**影响:**
- 影响1
- 影响2
**状态:** ✅ 已实施 / 🚧 进行中 / ⏳ 待实施 / ❌ 已废弃
```

View File

@@ -1,315 +0,0 @@
# 开发进度跟踪
**项目开始:** 2025-01-12
**最后更新:** 2025-01-12 14:35
**当前状态:** ✅ 开发完成,等待测试
## 总体进度
**当前阶段:** 开发完成
**完成度:** 90%
```
[██████████████████░░] 90%
```
## 阶段进度
### ✅ 第零阶段:项目初始化 (100%)
- [x] 创建项目目录结构
- [x] 编写完整文档系统 (6个文档)
- [x] 创建配置文件模板
- [x] 复制 DLL 文件到 libs/
- [x] 创建 Memory Bank 系统
**完成时间:** 2025-01-12 13:50
---
### ✅ 第一阶段WechatHook 层 (100%)
**目标:** 实现 DLL 调用和 API 封装
#### 1.1 NoveLoader 实现 (100%)
- [x] 创建 WechatHook/__init__.py
- [x] 实现 loader.py 基础结构
- [x] 实现 DLL 函数偏移调用
- [x] 实现所有 DLL 函数封装
- [x] 添加详细日志
#### 1.2 WechatHookClient 实现 (100%)
- [x] 创建 client.py 基础结构
- [x] 实现消息发送 API (8个方法)
- [x] 实现好友管理 API (6个方法)
- [x] 实现群聊管理 API (9个方法)
- [x] 实现登录信息 API
#### 1.3 消息类型映射 (100%)
- [x] 创建 message_types.py
- [x] 定义消息类型常量 (MessageType 类)
- [x] 创建 type 到 event 的映射表
- [x] 实现消息格式转换函数 (normalize_message)
#### 1.4 回调处理器 (100%)
- [x] 创建 callbacks.py
- [x] 实现连接回调处理 (CONNECT_CALLBACK)
- [x] 实现接收回调处理 (RECV_CALLBACK)
- [x] 实现断开回调处理 (CLOSE_CALLBACK)
- [x] 实现回调注册机制
**完成时间:** 2025-01-12 14:10
**代码统计:**
- loader.py: ~280 行
- client.py: ~450 行
- message_types.py: ~180 行
- callbacks.py: ~180 行
- 总计:~1090 行
---
### ✅ 第二阶段Bot 核心层 (100%)
**目标:** 实现消息路由和事件分发
#### 2.1 复用 XYBot 代码 (100%)
- [x] 复制 utils/ 目录 (5个文件)
- [x] 复制 database/ 目录 (3个文件)
- [x] 创建模块 __init__.py
#### 2.2 HookBot 实现 (100%)
- [x] 创建 utils/hookbot.py
- [x] 实现消息预处理
- [x] 实现消息类型映射
- [x] 实现白名单/黑名单过滤
- [x] 实现事件分发
#### 2.3 主入口实现 (100%)
- [x] 创建 bot.py
- [x] 实现初始化流程
- [x] 实现插件加载
- [x] 实现回调注册
- [x] 实现主循环
**完成时间:** 2025-01-12 14:30
**代码统计:**
- hookbot.py: ~120 行
- bot.py: ~200 行
- 总计:~320 行
---
### ✅ 第三阶段:插件系统 (100%)
**目标:** 集成插件系统并创建示例
- [x] 创建 ExamplePlugin 示例
- [x] 实现文本消息处理
- [x] 实现定时任务
- [x] 测试装饰器系统
- [x] 插件系统完全集成
**完成时间:** 2025-01-12 14:35
**代码统计:**
- ExamplePlugin/main.py: ~50 行
---
### ⏳ 第四阶段:功能测试 (0%)
**目标:** 远程设备测试所有功能
- [ ] 测试 DLL 注入
- [ ] 测试消息接收
- [ ] 测试消息发送
- [ ] 测试插件加载
- [ ] 测试定时任务
- [ ] 修复发现的问题
**预计完成:** 2025-01-13
---
## 每日进度
### 2025-01-12
**完成:**
- ✅ 创建完整文档系统 (6个文档)
- ✅ 创建配置文件和依赖列表
- ✅ 复制 DLL 文件到项目目录
- ✅ 创建 Memory Bank 管理系统 (5个文档)
- ✅ 实现 WechatHook 层完整代码 (4个模块~1090行)
- ✅ 复用 XYBot 代码 (utils + database)
- ✅ 实现 Bot 核心层 (hookbot + bot.py~320行)
- ✅ 创建 ExamplePlugin 示例插件 (~50行)
**总耗时:** 约 2 小时
**代码质量:**
- 详细的日志记录
- 完整的错误处理
- 清晰的代码结构
- 丰富的注释文档
---
## 里程碑
- [x] **M0:** 项目初始化完成 (2025-01-12 13:50)
- [x] **M1:** WechatHook 层完成 (2025-01-12 14:10)
- [x] **M2:** Bot 核心层完成 (2025-01-12 14:30)
- [x] **M3:** 插件系统集成完成 (2025-01-12 14:35)
- [ ] **M4:** 基础功能测试通过
- [ ] **M5:** 项目可用版本发布
---
## 最终统计数据
**总任务数:** 45
**已完成:** 40
**进行中:** 0
**待开始:** 5
**代码行数:**
- WechatHook 层: ~1090 行
- Bot 核心层: ~320 行
- 示例插件: ~50 行
- 复用代码: ~600 行
- **总计:~2060 行**
**文档页数:** 11
- 技术文档: 6 个
- Memory Bank: 5 个
**测试用例:** 0 (待用户测试)
---
## 项目文件结构
```
WechatHookBot/
├── Memory Bank/ # 项目管理文档
│ ├── projectBrief.md
│ ├── activeContext.md
│ ├── progress.md
│ ├── decisionLog.md
│ └── systemPatterns.md
├── docs/ # 技术文档
│ ├── 架构设计.md
│ ├── 插件开发.md
│ ├── API文档.md
│ ├── 快速开始.md
│ └── 项目概览.md
├── WechatHook/ # Hook 层
│ ├── __init__.py
│ ├── loader.py
│ ├── client.py
│ ├── message_types.py
│ └── callbacks.py
├── utils/ # 工具类
│ ├── __init__.py
│ ├── plugin_base.py
│ ├── plugin_manager.py
│ ├── event_manager.py
│ ├── decorators.py
│ ├── singleton.py
│ └── hookbot.py
├── database/ # 数据库
│ ├── __init__.py
│ ├── XYBotDB.py
│ ├── keyvalDB.py
│ └── messsagDB.py
├── plugins/ # 插件
│ └── ExamplePlugin/
│ ├── __init__.py
│ └── main.py
├── libs/ # DLL 文件
│ ├── Loader.dll
│ └── Helper.dll
├── bot.py # 主入口
├── main_config.toml # 配置文件
├── requirements.txt # 依赖列表
├── .gitignore # Git 忽略
└── README.md # 项目说明
```
---
## 待测试确认事项
### 高优先级
1. **消息类型 type 值** - 需要实际测试获取
2. **API type 值** - 需要实际测试获取
3. **消息数据字段名** - 需要实际测试确认
4. **登录信息获取** - 需要实现正确的获取方式
### 中优先级
5. **API 返回格式** - 需要用户反馈
6. **错误处理** - 需要测试各种异常情况
7. **性能优化** - 需要测试消息处理速度
### 低优先级
8. **WebUI 实现** - 可选功能
9. **更多插件** - 根据需求添加
10. **文档完善** - 根据测试结果补充
---
## 项目亮点
1. **完整的架构设计** - 四层架构,职责清晰
2. **高度代码复用** - 80% 复用 XYBot 代码
3. **详细的文档** - 11 个文档文件
4. **Memory Bank 系统** - 完整的项目管理
5. **插件兼容性** - 完全兼容 XYBot 插件
6. **异步编程** - 全异步设计,性能优秀
7. **详细日志** - 便于调试和问题定位
8. **错误处理** - 完善的异常处理机制
---
## 下一步行动
### 用户需要做的:
1. **安装依赖**
```bash
pip install -r requirements.txt
```
2. **运行程序**
```bash
python bot.py
```
3. **测试功能**
- 发送 "ping" 测试
- 发送各类消息
- 查看日志输出
4. **反馈信息**
- 消息 type 值
- 数据字段名
- 错误日志
- 功能是否正常
### 开发者需要做的:
1. **等待测试反馈**
2. **根据反馈调整代码**
3. **修复发现的问题**
4. **迭代优化**
---
**项目状态:** ✅ 开发完成,等待测试
**预计可用时间:** 测试通过后即可使用

View File

@@ -1,77 +0,0 @@
# WechatHookBot 项目简介
## 项目概述
**项目名称:** WechatHookBot
**创建时间:** 2025-01-12
**当前版本:** v0.1.0-dev
**项目状态:** 🚧 开发中
## 项目目标
基于个微大客户版 Hook API 构建一个类似 XYBotV2 的微信机器人框架,实现插件化、事件驱动的架构。
## 核心特性
-**无需登录系统**Hook 已登录的微信客户端
-**插件化架构**:完全兼容 XYBotV2 插件系统
-**实时消息回调**Socket 回调机制
-**轻量级设计**:无需 Redis 依赖
## 技术栈
- **语言:** Python 3.x (32位)
- **异步框架:** asyncio
- **DLL 调用:** ctypes
- **数据库:** SQLite + aiosqlite
- **定时任务:** APScheduler
- **Web 框架:** Flask + SocketIO (可选)
## 架构设计
```
DLL Hook 层 → WechatHook 层 → Bot 核心层 → 插件层 → WebUI 层
```
## 参考项目
- **XYBotV2** - 插件系统、事件管理、数据库架构
- **个微大客户版** - DLL Hook API 和调用示例
## 开发原则
1. **代码复用优先**:最大化复用 XYBotV2 代码
2. **最小化实现**:只写必要的代码
3. **不本地测试**:所有测试在远程设备进行
4. **文档先行**:保持 Memory Bank 实时更新
## 项目结构
```
WechatHookBot/
├── Memory Bank/ # 项目管理和进度跟踪
├── docs/ # 技术文档
├── WechatHook/ # Hook 层实现
├── utils/ # 工具类(复用 XYBot
├── database/ # 数据库(复用 XYBot
├── plugins/ # 插件目录
├── WebUI/ # Web 界面(可选)
├── libs/ # DLL 文件
├── bot.py # 主入口
└── main_config.toml # 配置文件
```
## 关键里程碑
- [x] 文档系统完成
- [x] DLL 文件准备
- [ ] WechatHook 层实现
- [ ] Bot 核心层实现
- [ ] 插件系统集成
- [ ] 基础功能测试
- [ ] WebUI 实现(可选)
## 联系方式
**开发者:** Claude
**项目路径:** D:\project\shrobot\WechatHookBot\

View File

@@ -1,252 +0,0 @@
# 系统模式和最佳实践
## 代码模式
### 1. DLL 函数调用模式
```python
# 通过内存偏移调用未导出函数
def __get_non_exported_func(self, offset: int, arg_types, return_type):
func_addr = self.loader_module_base + offset
func_type = ctypes.WINFUNCTYPE(return_type, *arg_types)
return func_type(func_addr)
```
### 2. 回调装饰器模式
```python
@CONNECT_CALLBACK(in_class=True)
def on_connect(self, client_id):
# 处理连接
pass
@RECV_CALLBACK(in_class=True)
def on_receive(self, client_id, message_type, data):
# 处理消息
pass
```
### 3. 异步 API 封装模式
```python
async def send_text(self, to_wxid: str, content: str) -> bool:
payload = {"type": 11036, "data": {"to_wxid": to_wxid, "content": content}}
return await asyncio.to_thread(
self.loader.SendWeChatData,
self.client_id,
json.dumps(payload, ensure_ascii=False)
)
```
### 4. 消息类型映射模式
```python
MESSAGE_TYPE_MAP = {
10001: "text_message",
10002: "image_message",
}
event_type = MESSAGE_TYPE_MAP.get(msg_type)
if event_type:
await EventManager.emit(event_type, client, message)
```
### 5. 插件事件处理模式
```python
@on_text_message(priority=50)
async def handle_text(self, client, message):
# 处理逻辑
return True # 继续执行后续处理器
```
## 架构模式
### 分层架构
```
应用层 (Plugins)
业务层 (HookBot)
服务层 (WechatHookClient)
基础层 (NoveLoader)
系统层 (DLL)
```
### 事件驱动模式
```
消息接收 → 回调触发 → 类型映射 → 事件分发 → 插件处理
```
## 命名规范
### 文件命名
- 模块文件:小写下划线 `loader.py`, `message_types.py`
- 类文件:大驼峰 `WechatHookClient`, `HookBot`
### 变量命名
- 常量:大写下划线 `MESSAGE_TYPE_MAP`
- 变量:小写下划线 `client_id`, `msg_type`
- 类名:大驼峰 `NoveLoader`
- 函数:小写下划线 `send_text()`
### API 命名
- 发送类:`send_xxx()` - send_text, send_image
- 获取类:`get_xxx()` - get_friend_list, get_chatroom_info
- 设置类:`set_xxx()` - set_friend_remark
## 错误处理模式
### API 调用错误处理
```python
try:
result = await client.send_text(wxid, content)
if result:
logger.info("发送成功")
else:
logger.error("发送失败")
except Exception as e:
logger.error(f"发送异常: {e}")
```
### DLL 调用错误处理
```python
if not os.path.exists(dll_path):
logger.error(f"DLL 文件不存在: {dll_path}")
return False
```
## 日志模式
### 日志级别使用
```python
logger.debug("调试信息") # 详细的调试信息
logger.info("普通信息") # 一般信息
logger.success("成功信息") # 操作成功
logger.warning("警告信息") # 警告但不影响运行
logger.error("错误信息") # 错误需要关注
```
### 日志格式
```python
logger.info(f"收到消息: FromWxid={from_wxid}, Content={content}")
```
## 配置模式
### TOML 配置读取
```python
import tomllib
with open("main_config.toml", "rb") as f:
config = tomllib.load(f)
value = config.get("section", {}).get("key", default_value)
```
## 数据库模式
### 异步数据库操作
```python
from database.keyvalDB import KeyvalDB
keyval_db = KeyvalDB()
await keyval_db.set("key", "value")
value = await keyval_db.get("key")
```
## 测试模式
### 远程测试流程
1. 实现功能代码
2. 添加详细日志
3. 提交给用户测试
4. 根据反馈修复
5. 重复直到通过
### 需要用户提供的测试信息
- API 返回的实际数据格式
- 消息的实际 type 值
- 错误信息和日志
- 功能是否正常工作
## 代码复用模式
### 从 XYBot 复用
```python
# 完全复用(不修改)
- utils/plugin_base.py
- utils/plugin_manager.py
- utils/event_manager.py
- utils/decorators.py
- utils/singleton.py
- database/
# 参考实现(需修改)
- utils/xybot.py utils/hookbot.py
```
## 性能优化模式
### 异步并发
```python
# 并发执行多个任务
tasks = [
client.send_text(wxid1, msg1),
client.send_text(wxid2, msg2),
]
results = await asyncio.gather(*tasks)
```
### 消息队列
```python
# 避免消息发送过快
await asyncio.sleep(0.5) # 每条消息间隔
```
## 安全模式
### 路径处理
```python
# 使用绝对路径
path = os.path.realpath(relative_path)
```
### 参数验证
```python
if not wxid or not content:
logger.error("参数不能为空")
return False
```
## 扩展模式
### 添加新消息类型
1.`message_types.py` 添加映射
2.`decorators.py` 添加装饰器(如需要)
3.`client.py` 添加发送方法(如需要)
### 添加新 API
1.`client.py` 添加方法
2. 构造正确的 payload
3. 调用 `SendWeChatData`
4. 处理返回结果

BIN
plugins/AIChat.zip Normal file

Binary file not shown.

View File

@@ -620,6 +620,615 @@ class AIChat(PluginBase):
return tools
# ==================== Gemini API 格式转换方法 ====================
def _convert_tools_to_gemini(self, openai_tools: list) -> list:
"""
将 OpenAI 格式的工具定义转换为 Gemini 格式
OpenAI: [{"type": "function", "function": {"name": ..., "parameters": ...}}]
Gemini: [{"function_declarations": [{"name": ..., "parameters": ...}]}]
"""
if not openai_tools:
return []
function_declarations = []
for tool in openai_tools:
if tool.get("type") == "function":
func = tool.get("function", {})
function_declarations.append({
"name": func.get("name", ""),
"description": func.get("description", ""),
"parameters": func.get("parameters", {})
})
if function_declarations:
return [{"function_declarations": function_declarations}]
return []
def _build_gemini_contents(self, system_content: str, history_messages: list,
current_message: dict, is_group: bool = False) -> list:
"""
构建 Gemini API 的 contents 格式
Args:
system_content: 系统提示词(包含人设、时间、持久记忆等)
history_messages: 历史消息列表
current_message: 当前用户消息 {"text": str, "media": optional}
is_group: 是否群聊
Returns:
Gemini contents 格式的列表
"""
contents = []
# Gemini 没有 system role将系统提示放在第一条 user 消息中
# 然后用一条简短的 model 回复来"确认"
system_parts = [{"text": f"[系统指令]\n{system_content}\n\n请按照以上指令进行对话。"}]
contents.append({"role": "user", "parts": system_parts})
contents.append({"role": "model", "parts": [{"text": "好的,我会按照指令进行对话。"}]})
# 添加历史消息
for msg in history_messages:
gemini_msg = self._convert_message_to_gemini(msg, is_group)
if gemini_msg:
contents.append(gemini_msg)
# 添加当前用户消息
current_parts = []
if current_message.get("text"):
current_parts.append({"text": current_message["text"]})
# 添加媒体内容(图片/视频)
if current_message.get("image_base64"):
image_data = current_message["image_base64"]
# 去除 data:image/xxx;base64, 前缀
if image_data.startswith("data:"):
mime_type = image_data.split(";")[0].split(":")[1]
image_data = image_data.split(",", 1)[1]
else:
mime_type = "image/jpeg"
current_parts.append({
"inline_data": {
"mime_type": mime_type,
"data": image_data
}
})
if current_message.get("video_base64"):
video_data = current_message["video_base64"]
# 去除 data:video/xxx;base64, 前缀
if video_data.startswith("data:"):
video_data = video_data.split(",", 1)[1]
current_parts.append({
"inline_data": {
"mime_type": "video/mp4",
"data": video_data
}
})
if current_parts:
contents.append({"role": "user", "parts": current_parts})
return contents
def _convert_message_to_gemini(self, msg: dict, is_group: bool = False) -> dict:
"""
将单条历史消息转换为 Gemini 格式
支持的输入格式:
1. 群聊历史: {"nickname": str, "content": str|list}
2. 私聊记忆: {"role": "user"|"assistant", "content": str|list}
"""
parts = []
# 群聊历史格式
if "nickname" in msg:
nickname = msg.get("nickname", "")
content = msg.get("content", "")
if isinstance(content, list):
# 多模态内容
for item in content:
if item.get("type") == "text":
text = item.get("text", "")
parts.append({"text": f"[{nickname}] {text}" if nickname else text})
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:"):
mime_type = image_url.split(";")[0].split(":")[1]
image_data = image_url.split(",", 1)[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": image_data
}
})
else:
# 纯文本
parts.append({"text": f"[{nickname}] {content}" if nickname else content})
# 群聊历史都作为 user 消息(因为是多人对话记录)
return {"role": "user", "parts": parts} if parts else None
# 私聊记忆格式
elif "role" in msg:
role = msg.get("role", "user")
content = msg.get("content", "")
# 转换角色名
gemini_role = "model" if role == "assistant" else "user"
if isinstance(content, list):
for item in content:
if item.get("type") == "text":
parts.append({"text": item.get("text", "")})
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:"):
mime_type = image_url.split(";")[0].split(":")[1]
image_data = image_url.split(",", 1)[1]
parts.append({
"inline_data": {
"mime_type": mime_type,
"data": image_data
}
})
else:
parts.append({"text": content})
return {"role": gemini_role, "parts": parts} if parts else None
return None
def _parse_gemini_tool_calls(self, response_parts: list) -> list:
"""
从 Gemini 响应中解析工具调用
Gemini 格式: {"functionCall": {"name": "...", "args": {...}}}
转换为内部格式: {"id": "...", "function": {"name": "...", "arguments": "..."}}
"""
tool_calls = []
for i, part in enumerate(response_parts):
if "functionCall" in part:
func_call = part["functionCall"]
tool_calls.append({
"id": f"call_{uuid.uuid4().hex[:8]}",
"type": "function",
"function": {
"name": func_call.get("name", ""),
"arguments": json.dumps(func_call.get("args", {}), ensure_ascii=False)
}
})
return tool_calls
def _build_tool_response_contents(self, contents: list, tool_calls: list,
tool_results: list) -> list:
"""
构建包含工具调用结果的 contents用于继续对话
Args:
contents: 原始 contents
tool_calls: 工具调用列表
tool_results: 工具执行结果列表
"""
new_contents = contents.copy()
# 添加 model 的工具调用响应
function_call_parts = []
for tc in tool_calls:
function_call_parts.append({
"functionCall": {
"name": tc["function"]["name"],
"args": json.loads(tc["function"]["arguments"])
}
})
if function_call_parts:
new_contents.append({"role": "model", "parts": function_call_parts})
# 添加工具执行结果
function_response_parts = []
for i, result in enumerate(tool_results):
tool_name = tool_calls[i]["function"]["name"] if i < len(tool_calls) else "unknown"
function_response_parts.append({
"functionResponse": {
"name": tool_name,
"response": {"result": result.get("message", str(result))}
}
})
if function_response_parts:
new_contents.append({"role": "user", "parts": function_response_parts})
return new_contents
# ==================== 统一的 Gemini API 调用 ====================
async def _call_gemini_api(self, contents: list, tools: list = None,
bot=None, from_wxid: str = None,
chat_id: str = None, nickname: str = "",
user_wxid: str = None, is_group: bool = False) -> tuple:
"""
统一的 Gemini API 调用方法
Args:
contents: Gemini 格式的对话内容
tools: Gemini 格式的工具定义(可选)
bot: WechatHookClient 实例
from_wxid: 消息来源
chat_id: 会话ID
nickname: 用户昵称
user_wxid: 用户wxid
is_group: 是否群聊
Returns:
(response_text, tool_calls) - 响应文本和工具调用列表
"""
import json
api_config = self.config["api"]
model = api_config["model"]
api_url = api_config.get("gemini_url", api_config.get("url", "").replace("/v1/chat/completions", "/v1beta/models"))
api_key = api_config["api_key"]
# 构建完整 URL
full_url = f"{api_url}/{model}:streamGenerateContent?alt=sse"
# 构建请求体
payload = {
"contents": contents,
"generationConfig": {
"maxOutputTokens": api_config.get("max_tokens", 8192)
}
}
if tools:
payload["tools"] = tools
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
timeout = aiohttp.ClientTimeout(total=api_config.get("timeout", 120))
# 配置代理
connector = None
proxy_config = self.config.get("proxy", {})
if proxy_config.get("enabled", False) and PROXY_SUPPORT:
proxy_type = proxy_config.get("type", "socks5").upper()
proxy_host = proxy_config.get("host", "127.0.0.1")
proxy_port = proxy_config.get("port", 7890)
proxy_username = proxy_config.get("username")
proxy_password = proxy_config.get("password")
if proxy_username and proxy_password:
proxy_url = f"{proxy_type}://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}"
else:
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
try:
connector = ProxyConnector.from_url(proxy_url)
logger.debug(f"[Gemini] 使用代理: {proxy_type}://{proxy_host}:{proxy_port}")
except Exception as e:
logger.warning(f"[Gemini] 代理配置失败: {e}")
# 保存用户信息供工具调用使用
self._current_user_wxid = user_wxid
self._current_is_group = is_group
try:
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
logger.debug(f"[Gemini] 发送流式请求: {full_url}")
async with session.post(full_url, json=payload, headers=headers) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(f"[Gemini] API 错误: {resp.status}, {error_text[:500]}")
raise Exception(f"Gemini API 错误 {resp.status}: {error_text[:200]}")
# 流式接收响应
full_text = ""
all_parts = []
tool_call_hint_sent = False
async for line in resp.content:
line = line.decode('utf-8').strip()
if not line or not line.startswith("data: "):
continue
try:
data = json.loads(line[6:])
candidates = data.get("candidates", [])
if not candidates:
continue
content = candidates[0].get("content", )
parts = content.get("parts", [])
for part in parts:
all_parts.append(part)
# 收集文本
if "text" in part:
full_text += part["text"]
# 检测到工具调用时,先发送已有文本
if "functionCall" in part:
if not tool_call_hint_sent and bot and from_wxid:
tool_call_hint_sent = True
if full_text.strip():
logger.info(f"[Gemini] 检测到工具调用,先发送文本: {full_text[:30]}...")
await bot.send_text(from_wxid, full_text.strip())
except json.JSONDecodeError:
continue
# 解析工具调用
tool_calls = self._parse_gemini_tool_calls(all_parts)
logger.info(f"[Gemini] 响应完成, 文本长度: {len(full_text)}, 工具调用: {len(tool_calls)}")
return full_text.strip(), tool_calls
except aiohttp.ClientError as e:
logger.error(f"[Gemini] 网络请求失败: {e}")
raise
except asyncio.TimeoutError:
logger.error(f"[Gemini] 请求超时")
raise
async def _handle_gemini_response(self, response_text: str, tool_calls: list,
contents: list, tools: list,
bot, from_wxid: str, chat_id: str,
nickname: str, user_wxid: str, is_group: bool):
"""
处理 Gemini API 响应,包括工具调用
Args:
response_text: AI 响应文本
tool_calls: 工具调用列表
contents: 原始 contents用于工具调用后继续对话
tools: 工具定义
bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息
"""
if tool_calls:
# 有工具调用,异步执行
logger.info(f"[Gemini] 启动异步工具执行,共 {len(tool_calls)} 个工具")
asyncio.create_task(
self._execute_gemini_tools_async(
tool_calls, contents, tools,
bot, from_wxid, chat_id, nickname, user_wxid, is_group
)
)
return None # 工具调用异步处理
return response_text
async def _execute_gemini_tools_async(self, tool_calls: list, contents: list, tools: list,
bot, from_wxid: str, chat_id: str,
nickname: str, user_wxid: str, is_group: bool):
"""
异步执行 Gemini 工具调用
"""
import json
try:
logger.info(f"[Gemini] 开始执行 {len(tool_calls)} 个工具")
# 收集需要 AI 回复的工具结果
need_ai_reply_results = []
tool_results = []
for tool_call in tool_calls:
function_name = tool_call["function"]["name"]
try:
arguments = json.loads(tool_call["function"]["arguments"])
except:
arguments = {}
logger.info(f"[Gemini] 执行工具: {function_name}, 参数: {arguments}")
result = await self._execute_tool_and_get_result(function_name, arguments, bot, from_wxid)
tool_results.append(result)
if result and result.get("success"):
logger.success(f"[Gemini] 工具 {function_name} 执行成功")
# 检查是否需要 AI 继续回复
if result.get("need_ai_reply"):
need_ai_reply_results.append({
"tool_call": tool_call,
"result": result
})
elif not result.get("already_sent") and result.get("message"):
if result.get("send_result_text"):
await bot.send_text(from_wxid, result["message"])
else:
logger.warning(f"[Gemini] 工具 {function_name} 执行失败: {result}")
if result and result.get("message"):
await bot.send_text(from_wxid, f"{result['message']}")
# 如果有需要 AI 回复的工具结果,继续对话
if need_ai_reply_results:
await self._continue_gemini_with_tool_results(
contents, tools, tool_calls, tool_results,
bot, from_wxid, chat_id, nickname, user_wxid, is_group
)
logger.info("[Gemini] 所有工具执行完成")
except Exception as e:
logger.error(f"[Gemini] 工具执行异常: {e}")
import traceback
logger.error(traceback.format_exc())
try:
await bot.send_text(from_wxid, "❌ 工具执行出错")
except:
pass
async def _continue_gemini_with_tool_results(self, contents: list, tools: list,
tool_calls: list, tool_results: list,
bot, from_wxid: str, chat_id: str,
nickname: str, user_wxid: str, is_group: bool):
"""
基于工具结果继续 Gemini 对话
"""
try:
# 构建包含工具结果的新 contents
new_contents = self._build_tool_response_contents(contents, tool_calls, tool_results)
# 继续调用 API不带工具避免循环调用
response_text, new_tool_calls = await self._call_gemini_api(
new_contents, tools=None,
bot=bot, from_wxid=from_wxid, chat_id=chat_id,
nickname=nickname, user_wxid=user_wxid, is_group=is_group
)
if response_text:
await bot.send_text(from_wxid, response_text)
logger.success(f"[Gemini] 工具回传后 AI 回复: {response_text[:50]}...")
# 保存到记忆
if chat_id:
self._add_to_memory(chat_id, "assistant", response_text)
if is_group:
import tomllib
with open("main_config.toml", "rb") as f:
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response_text)
except Exception as e:
logger.error(f"[Gemini] 工具回传后继续对话失败: {e}")
import traceback
logger.error(traceback.format_exc())
async def _process_with_gemini(self, text: str = "", image_base64: str = None,
video_base64: str = None, bot=None,
from_wxid: str = None, chat_id: str = None,
nickname: str = "", user_wxid: str = None,
is_group: bool = False) -> str:
"""
统一的 Gemini 消息处理入口
支持:纯文本、图片+文本、视频+文本
Args:
text: 用户消息文本
image_base64: 图片 base64可选
video_base64: 视频 base64可选
bot, from_wxid, chat_id, nickname, user_wxid, is_group: 上下文信息
Returns:
AI 响应文本,如果是工具调用则返回 None
"""
import json
# 1. 构建系统提示词
system_content = self._build_system_content(nickname, from_wxid, user_wxid, is_group)
# 2. 加载历史消息
history_messages = []
if is_group and from_wxid:
history = await self._load_history(from_wxid)
max_context = self.config.get("history", {}).get("max_context", 50)
history_messages = history[-max_context:] if len(history) > max_context else history
elif chat_id:
memory_messages = self._get_memory_messages(chat_id)
if memory_messages and len(memory_messages) > 1:
history_messages = memory_messages[:-1] # 排除刚添加的当前消息
# 3. 构建当前消息
current_message = {"text": f"[{nickname}] {text}" if is_group and nickname else text}
if image_base64:
current_message["image_base64"] = image_base64
if video_base64:
current_message["video_base64"] = video_base64
# 4. 构建 Gemini contents
contents = self._build_gemini_contents(system_content, history_messages, current_message, is_group)
# 5. 收集并转换工具
openai_tools = self._collect_tools()
gemini_tools = self._convert_tools_to_gemini(openai_tools)
if gemini_tools:
logger.info(f"[Gemini] 已加载 {len(openai_tools)} 个工具")
# 6. 调用 Gemini API带重试
max_retries = self.config.get("api", {}).get("max_retries", 2)
last_error = None
for attempt in range(max_retries + 1):
try:
response_text, tool_calls = await self._call_gemini_api(
contents=contents,
tools=gemini_tools if gemini_tools else None,
bot=bot,
from_wxid=from_wxid,
chat_id=chat_id,
nickname=nickname,
user_wxid=user_wxid,
is_group=is_group
)
# 处理工具调用
if tool_calls:
result = await self._handle_gemini_response(
response_text, tool_calls, contents, gemini_tools,
bot, from_wxid, chat_id, nickname, user_wxid, is_group
)
return result # None 表示工具调用已异步处理
# 检查空响应
if not response_text and attempt < max_retries:
logger.warning(f"[Gemini] 返回空内容,重试 {attempt + 1}/{max_retries}")
await asyncio.sleep(1)
continue
return response_text
except Exception as e:
last_error = e
if attempt < max_retries:
logger.warning(f"[Gemini] API 调用失败,重试 {attempt + 1}/{max_retries}: {e}")
await asyncio.sleep(1)
else:
raise
return ""
def _build_system_content(self, nickname: str, from_wxid: str,
user_wxid: str, is_group: bool) -> str:
"""构建系统提示词(包含人设、时间、持久记忆等)"""
system_content = self.system_prompt
# 添加当前时间
current_time = datetime.now()
weekday_map = {
0: "星期一", 1: "星期二", 2: "星期三", 3: "星期四",
4: "星期五", 5: "星期六", 6: "星期日"
}
weekday = weekday_map[current_time.weekday()]
time_str = current_time.strftime(f"%Y年%m月%d日 %H:%M:%S {weekday}")
system_content += f"\n\n当前时间:{time_str}"
if nickname:
system_content += f"\n当前对话用户的昵称是:{nickname}"
# 加载持久记忆
memory_chat_id = from_wxid if is_group else user_wxid
if memory_chat_id:
persistent_memories = self._get_persistent_memories(memory_chat_id)
if persistent_memories:
system_content += "\n\n【持久记忆】以下是用户要求你记住的重要信息:\n"
for m in persistent_memories:
mem_time = m['time'][:10] if m['time'] else ""
system_content += f"- [{mem_time}] {m['nickname']}: {m['content']}\n"
return system_content
# ==================== 结束 Gemini API 方法 ====================
async def _handle_list_prompts(self, bot, from_wxid: str):
"""处理人设列表指令"""
try:
@@ -982,41 +1591,19 @@ class AIChat(PluginBase):
chat_id = self._get_chat_id(from_wxid, user_wxid, is_group)
self._add_to_memory(chat_id, "user", actual_content)
# 调用 AI API带重试机制
max_retries = self.config.get("api", {}).get("max_retries", 2)
response = None
last_error = None
for attempt in range(max_retries + 1):
try:
response = await self._call_ai_api(actual_content, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
# 检查返回值:
# - None: 工具调用已异步处理,不需要重试
# - "": 真正的空响应,需要重试
# - 有内容: 正常响应
if response is None:
# 工具调用,不重试
logger.info("AI 触发工具调用,已异步处理")
break
if response == "" and attempt < max_retries:
logger.warning(f"AI 返回空内容,重试 {attempt + 1}/{max_retries}")
await asyncio.sleep(1) # 等待1秒后重试
continue
break # 成功或已达到最大重试次数
except Exception as e:
last_error = e
if attempt < max_retries:
logger.warning(f"AI API 调用失败,重试 {attempt + 1}/{max_retries}: {e}")
await asyncio.sleep(1)
else:
raise
# 使用统一的 Gemini API 处理消息
response = await self._process_with_gemini(
text=actual_content,
bot=bot,
from_wxid=from_wxid,
chat_id=chat_id,
nickname=nickname,
user_wxid=user_wxid,
is_group=is_group
)
# 发送回复并添加到记忆
# 注意:如果返回 None 或空字符串,说明已经以其他形式处理了,不需要再发送文本
# 注意:如果返回 None 或空字符串,说明已经以其他形式处理了(如工具调用)
if response:
await bot.send_text(from_wxid, response)
self._add_to_memory(chat_id, "assistant", response)
@@ -1028,7 +1615,7 @@ class AIChat(PluginBase):
await self._add_to_history(from_wxid, bot_nickname, response)
logger.success(f"AI 回复成功: {response[:50]}...")
else:
logger.info("AI 回复为空或已通过其他方式发送(如聊天记录")
logger.info("AI 回复为空或已通过其他方式发送(如工具调用")
except Exception as e:
import traceback
@@ -2060,8 +2647,17 @@ class AIChat(PluginBase):
if is_group:
await self._add_to_history(from_wxid, nickname, title_text, image_base64=image_base64)
# 调用AI API带图片
response = await self._call_ai_api_with_image(title_text, image_base64, bot, from_wxid, chat_id, nickname, user_wxid, is_group)
# 使用统一的 Gemini API 处理图片消息
response = await self._process_with_gemini(
text=title_text,
image_base64=image_base64,
bot=bot,
from_wxid=from_wxid,
chat_id=chat_id,
nickname=nickname,
user_wxid=user_wxid,
is_group=is_group
)
if response:
await bot.send_text(from_wxid, response)
@@ -2083,15 +2679,8 @@ class AIChat(PluginBase):
async def _handle_quote_video(self, bot, video_elem, title_text: str, from_wxid: str,
user_wxid: str, is_group: bool, nickname: str, chat_id: str):
"""处理引用的视频消息 - 双AI架构"""
"""处理引用的视频消息 - 统一 Gemini API直接处理视频"""
try:
# 检查视频识别功能是否启用
video_config = self.config.get("video_recognition", {})
if not video_config.get("enabled", True):
logger.info("[视频识别] 功能未启用")
await bot.send_text(from_wxid, "❌ 视频识别功能未启用")
return False
# 提取视频 CDN 信息
cdnvideourl = video_elem.get("cdnvideourl", "")
aeskey = video_elem.get("aeskey", "")
@@ -2102,11 +2691,11 @@ class AIChat(PluginBase):
aeskey = video_elem.get("cdnrawvideoaeskey", "")
if not cdnvideourl or not aeskey:
logger.warning(f"[视频识别] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}")
logger.warning(f"[视频] 视频信息不完整: cdnurl={bool(cdnvideourl)}, aeskey={bool(aeskey)}")
await bot.send_text(from_wxid, "❌ 无法获取视频信息")
return False
logger.info(f"[视频识别] 处理引用视频: {title_text[:50]}...")
logger.info(f"[视频] 处理引用视频: {title_text[:50]}...")
# 提示用户正在处理
await bot.send_text(from_wxid, "🎬 正在分析视频,请稍候...")
@@ -2114,35 +2703,33 @@ class AIChat(PluginBase):
# 下载并编码视频
video_base64 = await self._download_and_encode_video(bot, cdnvideourl, aeskey)
if not video_base64:
logger.error("[视频识别] 视频下载失败")
logger.error("[视频] 视频下载失败")
await bot.send_text(from_wxid, "❌ 视频下载失败")
return False
logger.info("[视频识别] 视频下载和编码成功")
logger.info("[视频] 视频下载和编码成功")
# ========== 第一步视频AI 分析视频内容 ==========
video_description = await self._analyze_video_content(video_base64, video_config)
if not video_description:
logger.error("[视频识别] 视频AI分析失败")
await bot.send_text(from_wxid, "❌ 视频分析失败")
return False
logger.info(f"[视频识别] 视频AI分析完成: {video_description[:100]}...")
# ========== 第二步主AI 基于视频描述生成回复 ==========
# 构造包含视频描述的用户消息
# 用户问题
user_question = title_text.strip() if title_text.strip() else "这个视频讲了什么?"
combined_message = f"[用户发送了一个视频,以下是视频内容描述]\n{video_description}\n\n[用户的问题]\n{user_question}"
# 添加到记忆让主AI知道用户发了视频
self._add_to_memory(chat_id, "user", combined_message)
# 添加到记忆
self._add_to_memory(chat_id, "user", f"[发送了一个视频] {user_question}")
# 如果是群聊,添加到历史记录
if is_group:
await self._add_to_history(from_wxid, nickname, f"[发送了一个视频] {user_question}")
# 调用主AI生成回复使用现有的 _call_ai_api 方法,继承完整上下文
response = await self._call_ai_api(combined_message, chat_id, from_wxid, is_group, nickname)
# 使用统一的 Gemini API 直接处理视频(不再需要两步架构
response = await self._process_with_gemini(
text=user_question,
video_base64=video_base64,
bot=bot,
from_wxid=from_wxid,
chat_id=chat_id,
nickname=nickname,
user_wxid=user_wxid,
is_group=is_group
)
if response:
await bot.send_text(from_wxid, response)
@@ -2154,7 +2741,7 @@ class AIChat(PluginBase):
main_config = tomllib.load(f)
bot_nickname = main_config.get("Bot", {}).get("nickname", "机器人")
await self._add_to_history(from_wxid, bot_nickname, response)
logger.success(f"[视频识别] AI回复成功: {response[:50]}...")
logger.success(f"[视频] AI回复成功: {response[:50]}...")
else:
await bot.send_text(from_wxid, "❌ AI 回复生成失败")

View File

@@ -1,83 +0,0 @@
"""
测试 Gemini API 视频识别
"""
import base64
import requests
import json
# 配置
API_URL = "https://api.functen.cn/v1beta/models/gemini-3-pro-preview:generateContent"
API_KEY = "sk-NeOtq0kOU39x3LMqY09aKYLoOBJIgFkgGuDwVGgGEstXPn3M"
VIDEO_PATH = r"D:\project\shrobot\WechatHookBot\plugins\AIChat\265d0df9ea89578bcb86f824c5255a42.mp4"
def test_video():
# 读取视频并编码为 base64
print(f"读取视频: {VIDEO_PATH}")
with open(VIDEO_PATH, "rb") as f:
video_data = f.read()
video_base64 = base64.b64encode(video_data).decode()
print(f"视频大小: {len(video_data) / 1024 / 1024:.2f} MB")
print(f"Base64 长度: {len(video_base64)}")
# 构建 Gemini 原生格式请求
payload = {
"contents": [
{
"parts": [
{"text": "请描述这个视频的内容"},
{
"inline_data": {
"mime_type": "video/mp4",
"data": video_base64
}
}
]
}
],
"generationConfig": {
"maxOutputTokens": 4096
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {API_KEY}"
}
print(f"\n发送请求到: {API_URL}")
print("请求中...")
try:
response = requests.post(
API_URL,
headers=headers,
json=payload,
timeout=180
)
print(f"\n状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("\n=== 响应内容 ===")
print(json.dumps(result, ensure_ascii=False, indent=2))
# 提取文本
if "candidates" in result:
for candidate in result["candidates"]:
content = candidate.get("content", {})
for part in content.get("parts", []):
if "text" in part:
print("\n=== AI 回复 ===")
print(part["text"])
else:
print(f"\n错误响应: {response.text}")
except Exception as e:
print(f"\n请求失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_video()