feature:加入功能,小朋友人脸识别与分类。

This commit is contained in:
liuwei
2025-04-14 11:12:47 +08:00
parent c59dacb36b
commit 2f2e0a8113
9 changed files with 1473 additions and 7 deletions

305
db/kid_photo_redis.py Normal file
View File

@@ -0,0 +1,305 @@
# -*- coding: utf-8 -*-
from datetime import datetime
import json
import os
import logging
from typing import Dict, Optional, List, Any, Set
from db.connection import DBConnectionManager
class KidPhotoRedisDB:
"""小朋友照片提取功能Redis相关操作"""
def __init__(self, db_manager: DBConnectionManager):
self.db_manager = db_manager
self.prefix = "group:kid_photo:"
self.logger = logging.getLogger("DB.KidPhotoRedis")
def get_redis_connection(self):
"""获取Redis连接"""
try:
return self.db_manager.get_redis_connection()
except Exception as e:
self.logger.error(f"获取Redis连接失败: {e}")
return None
def save_last_analysis_time(self, group_id: str) -> bool:
"""保存最后分析时间"""
try:
timestamp = int(datetime.now().timestamp())
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:last_time', str(timestamp))
return True
except Exception as e:
self.logger.error(f"保存最后分析时间失败: {e}")
return False
def get_last_analysis_time(self, group_id: str) -> Optional[int]:
"""获取最后分析时间"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
timestamp = redis_client.get(f'{self.prefix}{group_id}:last_time')
if timestamp:
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('utf-8')
return int(timestamp)
return None
except Exception as e:
self.logger.error(f"获取最后分析时间失败: {e}")
return None
def save_analysis_result(self, group_id: str, result: Dict) -> bool:
"""保存分析结果"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:results', json.dumps(result, ensure_ascii=False))
return True
except Exception as e:
self.logger.error(f"保存分析结果失败: {e}")
return False
def get_last_analysis_result(self, group_id: str) -> Optional[Dict]:
"""获取最后一次分析结果"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
result = redis_client.get(f'{self.prefix}{group_id}:results')
if result:
if isinstance(result, bytes):
result = result.decode('utf-8')
return json.loads(result)
return None
except json.JSONDecodeError as e:
self.logger.error(f"解析分析结果JSON失败: {e}")
return None
except Exception as e:
self.logger.error(f"获取最后分析结果失败: {e}")
return None
def save_processed_photo(self, group_id: str, photo_path: str) -> bool:
"""保存已处理的照片路径"""
try:
if not photo_path:
return False
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.sadd(f'{self.prefix}{group_id}:processed_photos', photo_path)
return True
except Exception as e:
self.logger.error(f"保存已处理照片失败: {e}")
return False
def save_processed_photos(self, group_id: str, photo_paths: List[str]) -> bool:
"""批量保存已处理的照片路径"""
try:
if not photo_paths:
return True
# 过滤空路径
valid_paths = [path for path in photo_paths if path]
if not valid_paths:
return True
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作提高性能
pipeline = redis_client.pipeline()
key = f'{self.prefix}{group_id}:processed_photos'
# 每次最多添加1000个元素避免命令过长
batch_size = 1000
for i in range(0, len(valid_paths), batch_size):
batch = valid_paths[i:i+batch_size]
if batch:
pipeline.sadd(key, *batch)
pipeline.execute()
return True
except Exception as e:
self.logger.error(f"批量保存已处理照片失败: {e}")
return False
def get_processed_photos(self, group_id: str) -> Set[str]:
"""获取所有已处理的照片路径"""
photos = set()
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return photos
photo_paths = redis_client.smembers(f'{self.prefix}{group_id}:processed_photos')
for path in photo_paths:
if isinstance(path, bytes):
path = path.decode('utf-8')
photos.add(path)
return photos
except Exception as e:
self.logger.error(f"获取已处理照片失败: {e}")
return set()
def is_photo_processed(self, group_id: str, photo_path: str) -> bool:
"""检查照片是否已处理"""
try:
if not photo_path:
return False
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
return redis_client.sismember(f'{self.prefix}{group_id}:processed_photos', photo_path)
except Exception as e:
self.logger.error(f"检查照片是否已处理失败: {e}")
return False
def save_photo_mapping(self, group_id: str, kid_id: str, photo_path: str) -> bool:
"""保存照片与小朋友的映射关系"""
try:
if not photo_path or not kid_id:
return False
photo_name = os.path.basename(photo_path)
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作
pipeline = redis_client.pipeline()
# 保存小朋友的照片集合
pipeline.sadd(f'{self.prefix}{group_id}:kid:{kid_id}:photos', photo_path)
# 保存照片对应的小朋友ID
pipeline.set(f'{self.prefix}{group_id}:photo:{photo_name}', kid_id)
pipeline.execute()
return True
except Exception as e:
self.logger.error(f"保存照片映射关系失败: {e}")
return False
def get_kid_photos(self, group_id: str, kid_id: str) -> List[str]:
"""获取小朋友的所有照片"""
photos = []
try:
if not kid_id:
return photos
with self.get_redis_connection() as redis_client:
if not redis_client:
return photos
photo_paths = redis_client.smembers(f'{self.prefix}{group_id}:kid:{kid_id}:photos')
for path in photo_paths:
if isinstance(path, bytes):
path = path.decode('utf-8')
photos.append(path)
return photos
except Exception as e:
self.logger.error(f"获取小朋友照片失败: {e}")
return []
def get_photo_kid(self, group_id: str, photo_name: str) -> Optional[str]:
"""获取照片对应的小朋友ID"""
try:
if not photo_name:
return None
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
kid_id = redis_client.get(f'{self.prefix}{group_id}:photo:{photo_name}')
if kid_id and isinstance(kid_id, bytes):
kid_id = kid_id.decode('utf-8')
return kid_id
except Exception as e:
self.logger.error(f"获取照片对应的小朋友ID失败: {e}")
return None
def save_last_process_time(self, group_id: str) -> bool:
"""保存最后处理时间"""
try:
timestamp = int(datetime.now().timestamp())
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:last_process_time', str(timestamp))
return True
except Exception as e:
self.logger.error(f"保存最后处理时间失败: {e}")
return False
def get_last_process_time(self, group_id: str) -> Optional[int]:
"""获取最后处理时间"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
timestamp = redis_client.get(f'{self.prefix}{group_id}:last_process_time')
if timestamp:
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('utf-8')
return int(timestamp)
return None
except Exception as e:
self.logger.error(f"获取最后处理时间失败: {e}")
return None
def clear_processed_photos(self, group_id: str) -> bool:
"""清理已处理的照片记录"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.delete(f'{self.prefix}{group_id}:processed_photos')
return True
except Exception as e:
self.logger.error(f"清理已处理照片记录失败: {e}")
return False
def clear_analysis_data(self, group_id: str) -> bool:
"""清理分析数据"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作
pipeline = redis_client.pipeline()
# 获取所有小朋友ID
kid_keys = redis_client.keys(f'{self.prefix}{group_id}:kid:*')
# 删除小朋友照片映射
if kid_keys:
pipeline.delete(*kid_keys)
# 删除照片到小朋友的映射
photo_keys = redis_client.keys(f'{self.prefix}{group_id}:photo:*')
if photo_keys:
pipeline.delete(*photo_keys)
# 删除其他数据
keys_to_delete = [
f'{self.prefix}{group_id}:last_time',
f'{self.prefix}{group_id}:results',
f'{self.prefix}{group_id}:processed_photos',
f'{self.prefix}{group_id}:last_process_time'
]
pipeline.delete(*keys_to_delete)
pipeline.execute()
return True
except Exception as e:
self.logger.error(f"清理分析数据失败: {e}")
return False

View File

@@ -436,20 +436,20 @@ class PluginManager:
display_name, plugin = self.find_plugin_by_name(name)
if not plugin:
self.LOG.debug(f"PluginManager插件 {name} 未加载")
self.LOG.info(f"PluginManager插件 {name} 未加载")
return False
if plugin.status == PluginStatus.RUNNING:
self.LOG.debug(f"PluginManager插件 {display_name} 已经在运行")
self.LOG.info(f"PluginManager插件 {display_name} 已经在运行")
return True
if plugin.start():
plugin.status = PluginStatus.RUNNING
self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为在运行")
self.LOG.info(f"PluginManager插件 {display_name} 状态变更为在运行")
return True
else:
plugin.status = PluginStatus.ERROR
self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为异常")
self.LOG.info(f"PluginManager插件 {display_name} 状态变更为异常")
return False
def stop_plugin(self, name: str) -> bool:

View File

@@ -0,0 +1,323 @@
# 需求分析与设计文档:群聊小朋友照片提取功能
## 1. 需求概述
开发一个基于 deepface 的人脸识别与分析功能,通过权限管理系统控制该功能在群聊中的启用状态。该功能可以自动识别群聊中分享的图片,提取包含小朋友人脸的照片,并按照人脸进行分类整理。
## 2. 功能需求
### 2.1 基本功能
- 通过群权限管理系统控制功能的开启与关闭
- 处理指定目录下的图片文件
- 分析图片中的人脸,识别小朋友的人脸
- 将包含小朋友人脸的图片进行分类存储
- 支持通过指令触发人脸分组功能
- 记录每个群的最后分析时间
- 分析完成后输出详细的分析结果统计
### 2.2 详细需求
1. **权限管理**
- 使用现有的 `GroupBotManager` 管理该功能的权限
- 通过指令启用/关闭该功能
2. **图片处理**
- 处理指定目录下的图片文件(不需要处理微信消息)
- 根据指令处理对应群聊的图片目录
- 支持常见图片格式jpg, png, jpeg等
- 图片预处理(调整大小、格式转换等)
3. **人脸识别**
- 识别图片中的所有人脸
- 判断人脸是否为小朋友(儿童)
- 提取小朋友人脸特征用于分组
4. **照片分类**
- 为每个识别出的小朋友创建独立文件夹
- 将包含同一小朋友的照片复制到对应文件夹
- 处理一张图片中多个小朋友的情况
5. **指令控制**
- 提供开始分析的指令
- 提供查看分析结果的指令
- 提供清理分析数据的指令
- 提供查询最后分析时间的指令
6. **分析记录**
- 记录每个群的最后分析时间
- 记录每次分析的统计结果(处理图片数、识别人脸数、分类结果等)
- 支持查询历史分析记录
7. **结果输出**
- 分析完成后自动输出详细的分析结果
- 包括处理图片总数、识别出的小朋友数量、每个小朋友的照片数量
- 提供分类后的文件夹路径信息
## 3. 技术方案
### 3.1 技术栈
- **deepface**:用于人脸检测、年龄估计和人脸识别
- **Python**:核心开发语言
- **Redis**:存储权限配置和人脸特征数据
- **OpenCV**:辅助图像处理
### 3.2 系统架构
```
+------------------+ +------------------+ +------------------+
| | | | | |
| 权限管理模块 |---->| 图片处理模块 |---->| 人脸识别模块 |
| (GroupBotManager)| | (ImageProcessor) | | (FaceAnalyzer) |
| | | | | |
+------------------+ +------------------+ +------------------+
|
v
+------------------+ +------------------+ +------------------+
| | | | | |
| 指令处理模块 |<----| 照片分类模块 |<----| 人脸分组模块 |
| (CommandHandler) | | (PhotoClassifier)| | (FaceGrouper) |
| | | | | |
+------------------+ +------------------+ +------------------+
```
### 3.3 模块设计
#### 3.3.1 权限管理模块
- 使用现有的 `GroupBotManager`
- 添加 `KID_PHOTO_EXTRACT` 功能权限(已存在)
- 提供权限检查方法
#### 3.3.2 图片处理模块
- 扫描指定目录下的图片
- 图片预处理(调整大小、格式转换等)
- 支持批量处理多张图片
#### 3.3.3 人脸识别模块
- 使用 deepface 进行人脸检测
- 年龄估计,识别小朋友(例如年龄 < 14 岁)
- 提取人脸特征向量
#### 3.3.4 人脸分组模块
- 基于人脸特征向量进行聚类
- 为每个聚类分配唯一 ID
- 维护人脸 ID 与照片的映射关系
#### 3.3.5 照片分类模块
- 创建分类目录结构
- 将照片复制到对应的分类目录
- 处理一张照片中多个小朋友的情况
#### 3.3.6 指令处理模块
- 解析用户指令
- 触发相应的功能模块
- 返回操作结果消息
## 4. 数据结构设计
### 4.1 Redis 数据结构
1. **权限配置**
```
group:{group_id}:permissions
```
- 哈希表,存储群组的功能权限配置
- 包含 KID_PHOTO_EXTRACT 的启用状态
2. **人脸特征数据**
```
group:{group_id}:kid_faces
```
- 哈希表,存储识别出的小朋友人脸特征
- 键为人脸 ID值为特征向量的序列化数据
3. **照片映射**
```
group:{group_id}:kid_photos:{face_id}
```
- 集合,存储包含特定小朋友的照片路径
4. **分析记录**
```
group:{group_id}:photo_analysis:last_time
```
- 字符串,存储最后一次分析的时间戳
5. **分析结果**
```
group:{group_id}:photo_analysis:results
```
- 列表存储历史分析结果的JSON数据
- 每次分析后将结果添加到列表头部
### 4.2 文件目录结构
```
/群聊文件目录/
├── 原始图片/ # 已下载好的图片目录
│ ├── image1.jpg
│ ├── image2.png
│ └── ...
└── kid_photos/ # 分析结果目录
├── kid_1/
│ ├── photo1.jpg
│ └── photo2.jpg
├── kid_2/
│ ├── photo1.jpg
│ └── photo3.jpg
└── analysis_report.json # 分析报告文件
```
## 5. 接口设计
### 5.1 插件接口
```python
class KidPhotoExtractor(PluginInterface):
def __init__(self, system_context):
super().__init__("kid_photo_extract", "小朋友照片提取", system_context)
self.face_analyzer = FaceAnalyzer()
self.photo_classifier = PhotoClassifier()
def handle_command(self, command, message):
# 处理用户指令
pass
def analyze_photos(self, group_id, source_dir):
# 分析指定目录下的照片
pass
def generate_report(self, group_id, analysis_result):
# 生成分析报告
pass
```
### 5.2 指令接口
1. **开始分析**
```
#开始分析照片 [可选:目录路径]
```
- 触发对指定目录下图片的分析
- 如果不指定目录,则使用默认的群聊文件目录
2. **查看结果**
```
#查看照片分析
```
- 返回最近一次分析结果统计信息
3. **清理数据**
```
#清理照片分析
```
- 清理分析数据和分类目录
4. **查询分析时间**
```
#照片分析时间
```
- 返回最后一次分析的时间信息
## 6. 实现流程
### 6.1 权限管理流程
1. 用户发送权限设置指令
2. 系统验证用户权限
3. 更新群组的 KID_PHOTO_EXTRACT 权限状态
4. 返回设置结果
### 6.2 照片分析流程
1. 用户发送开始分析指令(可指定目录)
2. 系统验证群组权限
3. 扫描指定目录中的图片
4. 对每张图片进行人脸检测
5. 识别小朋友人脸并提取特征
6. 对人脸特征进行聚类分组
7. 创建分类目录结构
8. 将照片复制到对应分类目录
9. 生成分析报告并保存
10. 记录分析时间和结果
11. 返回分析完成消息和结果统计
### 6.3 分析结果输出流程
1. 分析完成后自动生成结果报告
2. 报告包含以下内容:
- 分析时间
- 处理图片总数
- 识别出的小朋友总数
- 每个小朋友的照片数量
- 分类后的文件夹路径
3. 将报告以文本形式发送到群聊
4. 同时保存报告为JSON文件
## 7. 安全与隐私考虑
1. **数据安全**
- 图片处理完成后及时清理临时文件
- 人脸特征数据加密存储
2. **隐私保护**
- 只在功能启用的群组中处理图片
- 提供清理数据的指令
- 不上传或分享用户照片到外部服务
- 分析结果仅在群内可见
3. **权限控制**
- 只有群管理员可以启用/关闭该功能
- 只有授权用户可以触发分析指令
## 8. 性能优化
1. **图片处理优化**
- 限制处理图片的大小
- 使用多线程并行处理多张图片
2. **人脸识别优化**
- 缓存已识别的人脸特征
- 使用轻量级模型进行初步筛选
3. **存储优化**
- 定期清理长时间未访问的数据
- 对大量数据进行分片存储
## 9. 开发计划
### 9.1 阶段一:基础功能实现
- 实现权限管理集成
- 实现基本的人脸检测和年龄估计
- 实现简单的照片分类功能
### 9.2 阶段二:功能完善
- 实现人脸聚类和分组
- 优化小朋友识别准确率
- 完善指令系统
### 9.3 阶段三:性能优化与测试
- 性能优化
- 全面测试
- 文档完善
## 10. 测试计划
### 10.1 单元测试
- 测试人脸检测准确率
- 测试年龄估计准确率
- 测试人脸聚类效果
### 10.2 集成测试
- 测试与权限系统的集成
- 测试与消息系统的集成
- 测试与文件系统的集成
### 10.3 性能测试
- 测试大量图片处理性能
- 测试多人脸图片处理性能
- 测试系统资源占用情况
## 11. 总结
本功能通过 deepface 技术实现群聊中小朋友照片的自动识别与分类,帮助用户更好地管理和整理群聊中分享的照片。该功能与现有的权限管理系统无缝集成,用户可以通过简单的指令控制功能的启用和使用。
实现该功能需要解决人脸检测、年龄估计、人脸聚类等技术挑战,同时需要考虑数据安全和隐私保护问题。通过分阶段开发和全面测试,确保功能的稳定性和可靠性。

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入KidPhotoExtractorPlugin类
from .main import KidPhotoExtractorPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return KidPhotoExtractorPlugin()

View File

@@ -0,0 +1,10 @@
[KidPhotoExtractor]
enable = true
command = ["开始分析照片", "查看照片分析", "清理照片分析", "照片分析时间"]
command-format = """
📷小朋友照片提取指令:
#开始分析照片 [可选:全量] [可选:目录路径] - 开始分析指定目录下的照片(默认增量分析)
#查看照片分析 - 查看最近一次分析结果
#清理照片分析 - 清理分析数据
#照片分析时间 - 查询最后分析时间
"""

View File

@@ -0,0 +1,815 @@
import os
import time
import json
import logging
import shutil
import datetime
from typing import Dict, Any, List, Tuple, Optional
import threading
import traceback
from wcferry import Wcf
from plugin_common.plugin_interface import PluginStatus
try:
import numpy as np
import cv2
from deepface import DeepFace
from sklearn.cluster import DBSCAN
except ImportError as e:
raise ImportError(f"缺少必要的依赖库: {e}。请安装 requirements.txt 中的依赖: pip install -r requirements.txt")
from plugin_common.message_plugin_interface import MessagePluginInterface
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.plugin_decorators import plugin_stats_decorator
from db.kid_photo_redis import KidPhotoRedisDB
class FaceAnalyzer:
"""人脸分析器,负责检测和分析人脸"""
def __init__(self, kid_age_threshold=14):
self.kid_age_threshold = kid_age_threshold
self.logger = logging.getLogger("Plugin.KidPhotoExtractor.FaceAnalyzer")
def detect_faces(self, image_path):
"""检测图片中的所有人脸"""
try:
# 检查文件是否存在
if not os.path.exists(image_path):
self.logger.error(f"图片文件不存在: {image_path}")
return []
# 检查文件是否可读
try:
img = cv2.imread(image_path)
if img is None:
self.logger.error(f"无法读取图片: {image_path}")
return []
except Exception as e:
self.logger.error(f"读取图片失败: {image_path}, 错误: {e}")
return []
faces = DeepFace.extract_faces(img_path=image_path, enforce_detection=False)
return faces
except Exception as e:
self.logger.error(f"人脸检测失败: {image_path}, 错误: {e}")
return []
# 在 FaceAnalyzer 类的 analyze_face 方法中,需要增强错误处理
def analyze_face(self, image_path, face_area=None):
"""分析人脸,获取年龄和特征向量"""
try:
# 检查文件是否存在
if not os.path.exists(image_path):
self.logger.error(f"图片文件不存在: {image_path}")
return None
# 检查文件是否可读
try:
img = cv2.imread(image_path)
if img is None:
self.logger.error(f"无法读取图片: {image_path}")
return None
except Exception as e:
self.logger.error(f"读取图片失败: {image_path}, 错误: {e}")
return None
# 分析人脸属性
analysis = DeepFace.analyze(img_path=image_path,
actions=['age', 'gender'],
enforce_detection=False,
region=face_area)
# 提取人脸特征向量用于后续比对
embedding = DeepFace.represent(img_path=image_path,
model_name='Facenet',
enforce_detection=False,
region=face_area)
return {
'age': analysis[0]['age'],
'gender': analysis[0]['gender'],
'is_kid': analysis[0]['age'] < self.kid_age_threshold,
'embedding': embedding
}
except IndexError as e:
self.logger.error(f"人脸分析结果索引错误: {image_path}, 错误: {e}")
return None
except Exception as e:
self.logger.error(f"人脸分析失败: {image_path}, 错误: {e}")
return None
def is_kid(self, face_info):
"""判断是否为小朋友"""
if not face_info:
return False
return face_info.get('is_kid', False)
class FaceGrouper:
"""人脸分组器,负责对人脸进行聚类分组"""
def __init__(self, eps=0.5, min_samples=3):
self.eps = eps # DBSCAN的邻域半径
self.min_samples = min_samples # 形成核心点所需的最小样本数
self.logger = logging.getLogger("Plugin.KidPhotoExtractor.FaceGrouper")
# 在 FaceGrouper 类的 cluster_faces 方法中,优化性能和错误处理
def cluster_faces(self, face_embeddings):
"""对人脸特征向量进行聚类"""
if not face_embeddings:
return []
if len(face_embeddings) < 2:
# 如果只有一个人脸,直接返回
return [0] * len(face_embeddings)
try:
# 将特征向量转换为numpy数组
embeddings_array = np.array(face_embeddings)
# 检查数据有效性
if np.isnan(embeddings_array).any() or np.isinf(embeddings_array).any():
self.logger.error("特征向量包含无效值(NaN或Inf)")
# 清理无效值
embeddings_array = np.nan_to_num(embeddings_array)
# 使用DBSCAN进行聚类
clustering = DBSCAN(eps=self.eps, min_samples=self.min_samples, metric='euclidean').fit(embeddings_array)
# 获取聚类标签
labels = clustering.labels_
# 处理噪声点(标签为-1的点
# 将噪声点分配到最近的聚类
noise_indices = np.where(labels == -1)[0]
if len(noise_indices) > 0 and len(set(labels) - {-1}) > 0:
for idx in noise_indices:
# 计算该点到所有非噪声点的距离
distances = []
for cluster_id in set(labels) - {-1}:
cluster_points = embeddings_array[labels == cluster_id]
if len(cluster_points) > 0:
# 计算到该聚类所有点的平均距离
dist = np.mean([np.linalg.norm(embeddings_array[idx] - point) for point in cluster_points])
distances.append((cluster_id, dist))
# 分配到最近的聚类
if distances:
nearest_cluster = min(distances, key=lambda x: x[1])[0]
labels[idx] = nearest_cluster
return labels.tolist()
except MemoryError as e:
self.logger.error(f"聚类过程内存不足: {e}")
return [0] * len(face_embeddings) # 失败时,将所有人脸分到同一组
except Exception as e:
self.logger.error(f"人脸聚类失败: {e}")
self.logger.error(traceback.format_exc())
return [0] * len(face_embeddings) # 失败时,将所有人脸分到同一组
class PhotoClassifier:
"""照片分类器,负责创建分类目录并复制照片"""
def __init__(self):
self.logger = logging.getLogger("Plugin.KidPhotoExtractor.PhotoClassifier")
def create_kid_folder(self, base_dir, kid_id):
"""创建小朋友的文件夹"""
kid_folder = os.path.join(base_dir, f"kid_{kid_id}")
os.makedirs(kid_folder, exist_ok=True)
return kid_folder
def copy_photo(self, src_path, dest_folder, new_name=None):
"""复制照片到目标文件夹"""
try:
if not os.path.exists(src_path):
self.logger.error(f"源文件不存在: {src_path}")
return False
if new_name:
dest_path = os.path.join(dest_folder, new_name)
else:
dest_path = os.path.join(dest_folder, os.path.basename(src_path))
# 如果目标文件已存在,添加时间戳避免重名
if os.path.exists(dest_path):
name, ext = os.path.splitext(os.path.basename(src_path))
timestamp = int(time.time())
dest_path = os.path.join(dest_folder, f"{name}_{timestamp}{ext}")
shutil.copy2(src_path, dest_path)
return True
except Exception as e:
self.logger.error(f"复制照片失败: {e}")
return False
def save_analysis_report(self, output_dir, report_data):
"""保存分析报告"""
try:
report_path = os.path.join(output_dir, "analysis_report.json")
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report_data, f, ensure_ascii=False, indent=2)
return report_path
except Exception as e:
self.logger.error(f"保存分析报告失败: {e}")
return None
class KidPhotoExtractorPlugin(MessagePluginInterface):
"""小朋友照片提取插件"""
@property
def name(self) -> str:
return "小朋友照片提取"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供小朋友照片提取和分类功能,基于人脸识别技术"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "#" # 使用#作为命令前缀
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self._commands = []
self.face_analyzer = None
self.face_grouper = None
self.photo_classifier = None
self.analysis_tasks = {} # 存储分析任务状态
self.db_manager = None
self.kid_photo_db = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.db_manager = context.get("db_manager")
# 初始化数据库
if self.db_manager:
self.kid_photo_db = KidPhotoRedisDB(self.db_manager)
else:
self.LOG.warning("数据库管理器未提供将无法使用Redis功能")
# 初始化配置
self._commands = self._config.get("KidPhotoExtractor", {}).get("command",
["开始分析照片", "查看照片分析", "清理照片分析",
"照片分析时间"])
self.command_format = self._config.get("KidPhotoExtractor", {}).get("command-format",
"使用 #开始分析照片 [目录路径] 开始分析")
self.enable = self._config.get("KidPhotoExtractor", {}).get("enable", True)
# 初始化组件
self.face_analyzer = FaceAnalyzer()
self.face_grouper = FaceGrouper()
self.photo_classifier = PhotoClassifier()
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
# 检查是否以命令前缀开头
if not content.startswith(self.command_prefix):
return False
# 去掉前缀后检查命令
command_text = content[len(self.command_prefix):].strip()
command = command_text.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="小朋友照片提取")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
# 去掉前缀
command_text = content[len(self.command_prefix):].strip()
command_parts = command_text.split(" ")
command = command_parts[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.KID_PHOTO_EXTRACT) == PermissionStatus.DISABLED:
return False, "没有权限"
# 根据命令分发处理
if command == "开始分析照片":
return self._handle_start_analysis(command_parts, wcf, sender, roomid, gbm)
elif command == "查看照片分析":
return self._handle_view_analysis(wcf, sender, roomid, gbm)
elif command == "清理照片分析":
return self._handle_clean_analysis(wcf, sender, roomid, gbm)
elif command == "照片分析时间":
return self._handle_analysis_time(wcf, sender, roomid, gbm)
else:
wcf.send_text(f"❌未知命令!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "未知命令"
# 在 _handle_start_analysis 方法中,增加输入验证和错误处理
def _handle_start_analysis(self, command_parts, wcf, sender, roomid, gbm):
"""处理开始分析命令"""
target = roomid if roomid else sender
# 检查是否已有分析任务在进行
group_key = roomid or sender
if group_key in self.analysis_tasks and self.analysis_tasks[group_key].get("running", False):
wcf.send_text("⚠️已有分析任务正在进行,请等待完成后再试", target, sender)
return True, "任务已在进行"
# 判断是否为全量分析
is_full = False
if len(command_parts) > 1 and command_parts[1].lower() == "全量":
is_full = True
command_parts.pop(1) # 移除"全量"参数
# 获取目录路径
source_dir = None
if len(command_parts) > 1:
source_dir = " ".join(command_parts[1:])
# 验证路径安全性
if not self._is_safe_path(source_dir):
wcf.send_text("⚠️指定的路径不安全或包含非法字符", target, sender)
return True, "路径不安全"
else:
# 使用默认目录
if roomid:
# 群聊默认目录 - 使用与message_to_db.py相同的图片存储结构
image_base_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static",
"images")
source_dir = os.path.join(image_base_dir, roomid)
else:
# 暂不支持私聊
wcf.send_text("⚠️当前版本仅支持群聊图片分析", target, sender)
return True, "不支持私聊"
# 检查目录是否存在
if not os.path.exists(source_dir):
wcf.send_text(f"❌目录不存在: {source_dir}", target, sender)
return True, "目录不存在"
# 检查目录是否有图片文件
has_images = False
for root, _, files in os.walk(source_dir):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')):
has_images = True
break
if has_images:
break
if not has_images:
wcf.send_text(f"❌目录中没有图片文件: {source_dir}", target, sender)
return True, "没有图片文件"
# 创建输出目录
output_dir = os.path.join(os.path.dirname(source_dir), f"kid_photos_{roomid}")
try:
os.makedirs(output_dir, exist_ok=True)
except Exception as e:
wcf.send_text(f"❌创建输出目录失败: {str(e)}", target, sender)
return True, "创建目录失败"
# 启动分析任务
analysis_type = "全量" if is_full else "增量"
wcf.send_text(f"✅开始{analysis_type}分析照片,源目录: {source_dir}\n分析结果将保存到: {output_dir}", target,
sender)
# 记录任务状态
self.analysis_tasks[group_key] = {
"running": True,
"start_time": time.time(),
"source_dir": source_dir,
"output_dir": output_dir,
"is_full": is_full
}
# 在后台线程中执行分析
thread = threading.Thread(
target=self._run_analysis_task,
args=(group_key, source_dir, output_dir, wcf, target, sender)
)
thread.daemon = True
thread.start()
return True, f"开始{analysis_type}分析"
def _is_safe_path(self, path):
"""检查路径是否安全"""
# 检查路径是否包含可疑字符
suspicious_chars = ['..', '~', '`', '$', '|', ';', '&', '*', '>', '<', '"', "'"]
for char in suspicious_chars:
if char in path:
return False
# 检查路径是否为绝对路径
if os.path.isabs(path):
# 检查是否在允许的目录范围内
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
normalized_path = os.path.normpath(path)
if not normalized_path.startswith(base_dir):
return False
return True
def _run_analysis_task(self, group_key, source_dir, output_dir, wcf, target, sender):
"""在后台运行分析任务"""
start_time = time.time()
try:
is_full = self.analysis_tasks[group_key].get("is_full", False)
self.LOG.info(f"开始{'全量' if is_full else '增量'}分析任务: {source_dir}")
wcf.send_text("🔍正在分析照片,请稍候...", target, sender)
# 分析结果
result = {
"start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"total_images": 0,
"processed_images": 0,
"total_faces": 0,
"kid_faces": 0,
"kid_groups": 0,
"kids": {},
"is_full": is_full
}
# 获取所有图片文件
image_files = []
for root, _, files in os.walk(source_dir):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')):
image_files.append(os.path.join(root, file))
result["total_images"] = len(image_files)
if result["total_images"] == 0:
wcf.send_text("⚠️未找到任何图片文件", target, sender)
self.analysis_tasks[group_key]["running"] = False
return
# 如果是增量分析,获取已处理的照片和最后处理时间
processed_photos = set()
last_process_time = None
if not is_full and self.kid_photo_db:
processed_photos = self.kid_photo_db.get_processed_photos(group_key)
last_process_time = self.kid_photo_db.get_last_process_time(group_key)
if last_process_time:
wcf.send_text(
f"📊上次处理时间: {datetime.datetime.fromtimestamp(last_process_time).strftime('%Y-%m-%d %H:%M:%S')}\n已处理照片数: {len(processed_photos)}",
target, sender)
else:
wcf.send_text("⚠️未找到上次处理记录,将执行首次完整分析", target, sender)
# 筛选需要处理的图片
if not is_full and processed_photos:
# 只处理未处理过的文件或上次处理后修改的文件
filtered_image_files = []
for img_path in image_files:
# 如果文件不在已处理列表中,或者文件的修改时间晚于上次处理时间
if img_path not in processed_photos or (
last_process_time and os.path.getmtime(img_path) > last_process_time):
filtered_image_files.append(img_path)
image_files = filtered_image_files
wcf.send_text(f"📊本次需要处理的新增/修改照片数: {len(image_files)}", target, sender)
if len(image_files) == 0:
wcf.send_text("✅没有新增或修改的照片,无需分析", target, sender)
self.analysis_tasks[group_key]["running"] = False
return
# 进度更新
last_progress_time = time.time()
progress_interval = 5 # 每5秒更新一次进度
# 处理每张图片
kid_faces = [] # 存储所有小朋友的人脸特征
kid_face_images = [] # 存储对应的图片路径
kid_face_regions = [] # 存储人脸区域
# 记录本次处理的照片
newly_processed_photos = []
# 批量处理,避免内存溢出
batch_size = 50 # 每批处理的图片数量
total_batches = (len(image_files) + batch_size - 1) // batch_size
for batch_idx in range(total_batches):
start_idx = batch_idx * batch_size
end_idx = min((batch_idx + 1) * batch_size, len(image_files))
batch_images = image_files[start_idx:end_idx]
for i, image_path in enumerate(batch_images):
overall_idx = start_idx + i
try:
# 检测图片中的人脸
faces = self.face_analyzer.detect_faces(image_path)
for face in faces:
# 分析人脸
face_region = face.get('facial_area', None)
face_info = self.face_analyzer.analyze_face(image_path, face_region)
if face_info and self.face_analyzer.is_kid(face_info):
# 是小朋友的人脸
kid_faces.append(face_info['embedding'])
kid_face_images.append(image_path)
kid_face_regions.append(face_region)
result["kid_faces"] += 1
result["total_faces"] += 1
result["processed_images"] += 1
newly_processed_photos.append(image_path)
# 更新进度
current_time = time.time()
if current_time - last_progress_time > progress_interval:
progress = (overall_idx + 1) / len(image_files) * 100
wcf.send_text(f"📊分析进度: {progress:.1f}% ({overall_idx + 1}/{len(image_files)})", target, sender)
last_progress_time = current_time
except Exception as e:
self.LOG.error(f"处理图片失败: {image_path}, 错误: {e}")
continue
# 每批处理完成后保存进度,避免全部失败
if newly_processed_photos and self.kid_photo_db:
self.kid_photo_db.save_processed_photos(group_key, newly_processed_photos)
newly_processed_photos = [] # 清空已保存的记录
# 强制垃圾回收,释放内存
import gc
gc.collect()
# 保存最后一批已处理的照片记录
if newly_processed_photos and self.kid_photo_db:
self.kid_photo_db.save_processed_photos(group_key, newly_processed_photos)
# 更新最后处理时间
self.kid_photo_db.save_last_process_time(group_key)
# 如果没有找到小朋友人脸
if not kid_faces:
wcf.send_text("⚠️未检测到任何小朋友的人脸", target, sender)
self.analysis_tasks[group_key]["running"] = False
# 保存分析结果
result["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result["duration"] = time.time() - start_time
self._save_analysis_result(group_key, result)
return
# 对小朋友人脸进行聚类分组
wcf.send_text("🧩正在对小朋友人脸进行分组...", target, sender)
cluster_labels = self.face_grouper.cluster_faces(kid_faces)
# 统计每个分组的人脸数量
kid_groups = {}
for i, label in enumerate(cluster_labels):
if label not in kid_groups:
kid_groups[label] = []
kid_groups[label].append((kid_face_images[i], kid_face_regions[i]))
result["kid_groups"] = len(kid_groups)
# 为每个小朋友创建文件夹并复制照片
wcf.send_text(f"📁正在创建分类文件夹,共有{len(kid_groups)}个小朋友...", target, sender)
for kid_id, faces in kid_groups.items():
# 创建小朋友文件夹
kid_folder = self.photo_classifier.create_kid_folder(output_dir, kid_id)
# 复制照片
copied_photos = []
for image_path, _ in faces:
if self.photo_classifier.copy_photo(image_path, kid_folder):
copied_photos.append(os.path.basename(image_path))
# 保存照片映射关系
if self.kid_photo_db:
self.kid_photo_db.save_photo_mapping(group_key, f"kid_{kid_id}", image_path)
# 记录结果
result["kids"][f"kid_{kid_id}"] = {
"photo_count": len(copied_photos),
"photos": copied_photos
}
# 保存分析报告
report_path = self.photo_classifier.save_analysis_report(output_dir, result)
# 完成分析
result["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result["duration"] = time.time() - start_time
# 保存分析结果
self._save_analysis_result(group_key, result)
# 发送分析结果
summary = self._generate_analysis_summary(result, output_dir)
wcf.send_text(summary, target, sender)
except MemoryError as e:
self.LOG.error(f"分析任务内存不足: {e}")
wcf.send_text(f"❌分析过程中内存不足,请减少照片数量或分批处理", target, sender)
except Exception as e:
self.LOG.error(f"分析任务出错: {e}")
self.LOG.error(traceback.format_exc())
wcf.send_text(f"❌分析过程中出错: {str(e)}", target, sender)
finally:
# 标记任务完成
self.analysis_tasks[group_key]["running"] = False
# 强制垃圾回收
import gc
gc.collect()
def _save_analysis_result(self, group_key, result):
"""保存分析结果到Redis"""
try:
if self.kid_photo_db:
# 保存最后分析时间和分析结果
self.kid_photo_db.save_last_analysis_time(group_key)
self.kid_photo_db.save_analysis_result(group_key, result)
self.LOG.info(f"已保存分析结果: {group_key}")
else:
self.LOG.error("数据库未初始化")
except Exception as e:
self.LOG.error(f"保存分析结果失败: {e}")
def _get_last_analysis_result(self, group_key):
"""获取最后一次分析结果"""
try:
if self.kid_photo_db:
return self.kid_photo_db.get_last_analysis_result(group_key)
else:
self.LOG.error("数据库未初始化")
return None
except Exception as e:
self.LOG.error(f"获取分析结果失败: {e}")
return None
def _get_last_analysis_time(self, group_key):
"""获取最后一次分析时间"""
try:
if self.kid_photo_db:
return self.kid_photo_db.get_last_analysis_time(group_key)
else:
self.LOG.error("数据库未初始化")
return None
except Exception as e:
self.LOG.error(f"获取分析时间失败: {e}")
return None
def _handle_clean_analysis(self, wcf, sender, roomid, gbm):
"""处理清理分析数据命令"""
target = roomid if roomid else sender
group_key = roomid or sender
# 检查是否有分析任务在进行
if group_key in self.analysis_tasks and self.analysis_tasks[group_key].get("running", False):
wcf.send_text("⚠️当前有分析任务正在进行,无法清理数据", target, sender)
return True, "任务进行中"
# 清理数据
if self.kid_photo_db:
if self.kid_photo_db.clear_analysis_data(group_key):
wcf.send_text("✅已清理所有照片分析数据", target, sender)
return True, "清理成功"
else:
wcf.send_text("❌清理数据失败", target, sender)
return True, "清理失败"
else:
wcf.send_text("⚠️数据库未初始化,无法清理数据", target, sender)
return True, "数据库未初始化"
def _handle_view_analysis(self, wcf, sender, roomid, gbm):
"""处理查看分析结果命令"""
target = roomid if roomid else sender
group_key = roomid or sender
# 获取最近一次分析结果
result = self._get_last_analysis_result(group_key)
if not result:
wcf.send_text("⚠️未找到分析结果,请先执行照片分析", target, sender)
return True, "无分析结果"
# 生成分析摘要
output_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"static", "images", f"kid_photos_{roomid}")
summary = self._generate_analysis_summary(result, output_dir)
wcf.send_text(summary, target, sender)
return True, "查看分析结果"
def _handle_analysis_time(self, wcf, sender, roomid, gbm):
"""处理查询分析时间命令"""
target = roomid if roomid else sender
group_key = roomid or sender
# 获取最后分析时间
last_time = self._get_last_analysis_time(group_key)
last_process_time = None
if self.kid_photo_db:
last_process_time = self.kid_photo_db.get_last_process_time(group_key)
if not last_time and not last_process_time:
wcf.send_text("⚠️未找到分析记录,请先执行照片分析", target, sender)
return True, "无分析记录"
# 生成时间信息
time_info = "📊照片分析时间信息:\n"
if last_time:
time_info += f"最后分析时间: {datetime.datetime.fromtimestamp(last_time).strftime('%Y-%m-%d %H:%M:%S')}\n"
if last_process_time:
time_info += f"最后处理时间: {datetime.datetime.fromtimestamp(last_process_time).strftime('%Y-%m-%d %H:%M:%S')}"
wcf.send_text(time_info, target, sender)
return True, "查询分析时间"
def _generate_analysis_summary(self, result, output_dir):
"""生成分析结果摘要"""
summary = "📊小朋友照片分析结果:\n\n"
# 基本信息
summary += f"📷 总照片数: {result.get('total_images', 0)}\n"
summary += f"👤 处理照片数: {result.get('processed_images', 0)}\n"
summary += f"😊 检测到的人脸: {result.get('total_faces', 0)}\n"
summary += f"👶 小朋友人脸: {result.get('kid_faces', 0)}\n"
summary += f"👪 小朋友分组: {result.get('kid_groups', 0)}\n\n"
# 小朋友分组信息
kids = result.get('kids', {})
if kids:
summary += "🧒 小朋友照片统计:\n"
for kid_id, kid_info in kids.items():
summary += f" - {kid_id}: {kid_info.get('photo_count', 0)}张照片\n"
# 分析时间
start_time = result.get('start_time', '')
end_time = result.get('end_time', '')
duration = result.get('duration', 0)
summary += f"\n⏱️ 开始时间: {start_time}\n"
summary += f"⏱️ 结束时间: {end_time}\n"
summary += f"⏱️ 耗时: {duration:.2f}\n"
# 输出目录
summary += f"\n📁 照片已保存到: {output_dir}\n"
# 分析类型
is_full = result.get('is_full', False)
summary += f"📝 分析类型: {'全量' if is_full else '增量'}"
return summary

View File

@@ -42,3 +42,7 @@ PyAutoGUI~=0.9.54
psutil~=6.1.1
numpy~=1.26.4
pywin32==306
opencv-python~=4.11.0.86
deepface~=0.0.93
scikit-learn>=1.0.2

View File

@@ -409,7 +409,8 @@ class Robot(Job):
"wcf": self.wcf, # 提供wcf对象让插件可以直接发送消息
"message_util": self.message_util, # 提供消息工具类
"gbm": self.gbm, # 每次从程序变量中取,保证最新
"all_contacts": self.allContacts
"all_contacts": self.allContacts,
"full_wx_msg": msg
}
# 检查插件是否可以处理该消息
@@ -490,6 +491,7 @@ class Robot(Job):
self.send_text_msg(output, r)
except Exception as e:
self.LOG.error(f"SendRanking error{e}")
#
# # 设置定时任务
# def game_auto_tasks(self):

View File

@@ -181,10 +181,10 @@ class MessageStorage:
if result['success']:
# 修改日志输出,包含消息内容
compressed = result['content'].replace('\n', '').replace('\r', '')
logger.info(f"消息存档成功: {result['roomid']}:{result['sender']}: {compressed}")
logger.info(f"archive_success: {result['roomid']}:{result['sender']}: {compressed}")
else:
error_msg = result.get('error', '未知错误')
logger.error(f"消息存档失败: {result['roomid']}:{result['sender']} - {error_msg}")
logger.error(f"archive_fail: {result['roomid']}:{result['sender']} - {error_msg}")
except Exception as e:
logger.error(f"处理存档回调时出错: {e}")