From 7be1811c1c55cb2908c162d8eeeaacfe588cf1c4 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 15 Apr 2025 17:14:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20=E4=BA=BA=E5=83=8F?= =?UTF-8?q?=E8=AF=86=E5=88=AB=E6=8F=92=E4=BB=B6=20=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E4=BD=BF=E7=94=A8=E5=9C=BA=E6=99=AF=E6=AF=94=E8=BE=83?= =?UTF-8?q?=E5=BC=B1=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/kid_photo_extractor/README.md | 327 -------- plugins/kid_photo_extractor/__init__.py | 7 - plugins/kid_photo_extractor/config.toml | 10 - plugins/kid_photo_extractor/main.py | 971 ------------------------ 4 files changed, 1315 deletions(-) delete mode 100644 plugins/kid_photo_extractor/README.md delete mode 100644 plugins/kid_photo_extractor/__init__.py delete mode 100644 plugins/kid_photo_extractor/config.toml delete mode 100644 plugins/kid_photo_extractor/main.py diff --git a/plugins/kid_photo_extractor/README.md b/plugins/kid_photo_extractor/README.md deleted file mode 100644 index 7d7f70f..0000000 --- a/plugins/kid_photo_extractor/README.md +++ /dev/null @@ -1,327 +0,0 @@ -# 需求分析与设计文档:群聊小朋友照片提取功能 - -## 1. 需求概述 - -开发一个基于 deepface 的人脸识别与分析功能,通过权限管理系统控制该功能在群聊中的启用状态。该功能可以自动识别群聊中分享的图片,提取包含小朋友人脸的照片,并按照人脸进行分类整理。 - -## 2. 功能需求 - -### 2.1 基本功能 -- 通过群权限管理系统控制功能的开启与关闭 -- 处理指定目录下的图片文件 -- 分析图片中的人脸,识别小朋友的人脸 -- 将包含小朋友人脸的图片进行分类存储 -- 支持通过指令触发人脸分组功能 -- 记录每个群的最后分析时间 -- 分析完成后输出详细的分析结果统计 - -### 2.2 详细需求 -1. **权限管理**: - - 使用现有的 `GroupBotManager` 管理该功能的权限 - - 通过指令启用/关闭该功能 - -2. **图片处理**: - - 处理指定目录下的图片文件(不需要处理微信消息) - - 根据指令处理对应群聊的图片目录 - - 支持常见图片格式(jpg, png, jpeg等) - - 图片预处理(调整大小、格式转换等) - -3. **人脸识别**: - - 识别图片中的所有人脸 - - 判断人脸是否为小朋友(儿童) - - 提取小朋友人脸特征用于分组 - -4. **照片分类**: - - 为每个识别出的小朋友创建独立文件夹 - - 将包含同一小朋友的照片复制到对应文件夹 - - 处理一张图片中多个小朋友的情况 - -5. **指令控制**: - - 提供开始分析的指令 - - 提供查看分析结果的指令 - - 提供清理分析数据的指令 - - 提供查询最后分析时间的指令 - -6. **分析记录**: - - 记录每个群的最后分析时间 - - 记录每次分析的统计结果(处理图片数、识别人脸数、分类结果等) - - 支持查询历史分析记录 - -7. **结果输出**: - - 分析完成后自动输出详细的分析结果 - - 包括处理图片总数、识别出的小朋友数量、每个小朋友的照片数量 - - 提供分类后的文件夹路径信息 - -## 3. 技术方案 - -### 3.1 技术栈 -- **deepface**:用于人脸检测、年龄估计和人脸识别 -- **Python**:核心开发语言 -- **Redis**:存储权限配置和人脸特征数据 -- **OpenCV**:辅助图像处理 - -### 3.2 系统架构 - -``` -+------------------+ +------------------+ +------------------+ -| | | | | | -| 权限管理模块 |---->| 图片处理模块 |---->| 人脸识别模块 | -| (GroupBotManager)| | (ImageProcessor) | | (FaceAnalyzer) | -| | | | | | -+------------------+ +------------------+ +------------------+ - | - v -+------------------+ +------------------+ +------------------+ -| | | | | | -| 指令处理模块 |<----| 照片分类模块 |<----| 人脸分组模块 | -| (CommandHandler) | | (PhotoClassifier)| | (FaceGrouper) | -| | | | | | -+------------------+ +------------------+ +------------------+ -``` - -### 3.3 模块设计 - -#### 3.3.1 权限管理模块 -- 使用现有的 `GroupBotManager` 类 -- 添加 `KID_PHOTO_EXTRACT` 功能权限(已存在) -- 提供权限检查方法 - -#### 3.3.2 图片处理模块 -- 扫描指定目录下的图片 -- 图片预处理(调整大小、格式转换等) -- 支持批量处理多张图片 - -#### 3.3.3 人脸识别模块 -- 使用 deepface 进行人脸检测 -- 年龄估计,识别小朋友(例如年龄 < 14 岁) -- 提取人脸特征向量 - -#### 3.3.4 人脸分组模块 -- 基于人脸特征向量进行聚类 -- 为每个聚类分配唯一 ID -- 维护人脸 ID 与照片的映射关系 - -#### 3.3.5 照片分类模块 -- 创建分类目录结构 -- 将照片复制到对应的分类目录 -- 处理一张照片中多个小朋友的情况 - -#### 3.3.6 指令处理模块 -- 解析用户指令 -- 触发相应的功能模块 -- 返回操作结果消息 - -## 4. 数据结构设计 - -### 4.1 Redis 数据结构 - -1. **权限配置**: - ``` - group:{group_id}:permissions - ``` - - 哈希表,存储群组的功能权限配置 - - 包含 KID_PHOTO_EXTRACT 的启用状态 - -2. **人脸特征数据**: - ``` - group:{group_id}:kid_faces - ``` - - 哈希表,存储识别出的小朋友人脸特征 - - 键为人脸 ID,值为特征向量的序列化数据 - -3. **照片映射**: - ``` - group:{group_id}:kid_photos:{face_id} - ``` - - 集合,存储包含特定小朋友的照片路径 - -4. **分析记录**: - ``` - group:{group_id}:photo_analysis:last_time - ``` - - 字符串,存储最后一次分析的时间戳 - -5. **分析结果**: - ``` - group:{group_id}:photo_analysis:results - ``` - - 列表,存储历史分析结果的JSON数据 - - 每次分析后将结果添加到列表头部 - -### 4.2 文件目录结构 - -``` -/群聊文件目录/ - ├── 原始图片/ # 已下载好的图片目录 - │ ├── image1.jpg - │ ├── image2.png - │ └── ... - └── kid_photos/ # 分析结果目录 - ├── kid_1/ - │ ├── photo1.jpg - │ └── photo2.jpg - ├── kid_2/ - │ ├── photo1.jpg - │ └── photo3.jpg - └── analysis_report.json # 分析报告文件 -``` - -## 5. 接口设计 - -### 5.1 插件接口 - -```python -class KidPhotoExtractor(PluginInterface): - def __init__(self, system_context): - super().__init__("kid_photo_extract", "小朋友照片提取", system_context) - self.face_analyzer = FaceAnalyzer() - self.photo_classifier = PhotoClassifier() - - def handle_command(self, command, message): - # 处理用户指令 - pass - - def analyze_photos(self, group_id, source_dir): - # 分析指定目录下的照片 - pass - - def generate_report(self, group_id, analysis_result): - # 生成分析报告 - pass -``` - -### 5.2 指令接口 - -1. **开始分析**: - ``` - #开始分析照片 [可选:目录路径] - ``` - - 触发对指定目录下图片的分析 - - 如果不指定目录,则使用默认的群聊文件目录 - -2. **查看结果**: - ``` - #查看照片分析 - ``` - - 返回最近一次分析结果统计信息 - -3. **清理数据**: - ``` - #清理照片分析 - ``` - - 清理分析数据和分类目录 - -4. **查询分析时间**: - ``` - #照片分析时间 - ``` - - 返回最后一次分析的时间信息 - -## 6. 实现流程 - -### 6.1 权限管理流程 - -1. 用户发送权限设置指令 -2. 系统验证用户权限 -3. 更新群组的 KID_PHOTO_EXTRACT 权限状态 -4. 返回设置结果 - -### 6.2 照片分析流程 - -1. 用户发送开始分析指令(可指定目录) -2. 系统验证群组权限 -3. 扫描指定目录中的图片 -4. 对每张图片进行人脸检测 -5. 识别小朋友人脸并提取特征 -6. 对人脸特征进行聚类分组 -7. 创建分类目录结构 -8. 将照片复制到对应分类目录 -9. 生成分析报告并保存 -10. 记录分析时间和结果 -11. 返回分析完成消息和结果统计 - -### 6.3 分析结果输出流程 - -1. 分析完成后自动生成结果报告 -2. 报告包含以下内容: - - 分析时间 - - 处理图片总数 - - 识别出的小朋友总数 - - 每个小朋友的照片数量 - - 分类后的文件夹路径 -3. 将报告以文本形式发送到群聊 -4. 同时保存报告为JSON文件 - -## 7. 安全与隐私考虑 - -1. **数据安全**: - - 图片处理完成后及时清理临时文件 - - 人脸特征数据加密存储 - -2. **隐私保护**: - - 只在功能启用的群组中处理图片 - - 提供清理数据的指令 - - 不上传或分享用户照片到外部服务 - - 分析结果仅在群内可见 - -3. **权限控制**: - - 只有群管理员可以启用/关闭该功能 - - 只有授权用户可以触发分析指令 - -## 8. 性能优化 - -1. **图片处理优化**: - - 限制处理图片的大小 - - 使用多线程并行处理多张图片 - -2. **人脸识别优化**: - - 缓存已识别的人脸特征 - - 使用轻量级模型进行初步筛选 - -3. **存储优化**: - - 定期清理长时间未访问的数据 - - 对大量数据进行分片存储 - -## 9. 开发计划 - -### 9.1 阶段一:基础功能实现 -- 实现权限管理集成 -- 实现基本的人脸检测和年龄估计 -- 实现简单的照片分类功能 - -### 9.2 阶段二:功能完善 -- 实现人脸聚类和分组 -- 优化小朋友识别准确率 -- 完善指令系统 - -### 9.3 阶段三:性能优化与测试 -- 性能优化 -- 全面测试 -- 文档完善 - -## 10. 测试计划 - -### 10.1 单元测试 -- 测试人脸检测准确率 -- 测试年龄估计准确率 -- 测试人脸聚类效果 - -### 10.2 集成测试 -- 测试与权限系统的集成 -- 测试与消息系统的集成 -- 测试与文件系统的集成 - -### 10.3 性能测试 -- 测试大量图片处理性能 -- 测试多人脸图片处理性能 -- 测试系统资源占用情况 - -## 11. 总结 - -本功能通过 deepface 技术实现群聊中小朋友照片的自动识别与分类,帮助用户更好地管理和整理群聊中分享的照片。该功能与现有的权限管理系统无缝集成,用户可以通过简单的指令控制功能的启用和使用。 - -实现该功能需要解决人脸检测、年龄估计、人脸聚类等技术挑战,同时需要考虑数据安全和隐私保护问题。通过分阶段开发和全面测试,确保功能的稳定性和可靠性。 - - -接下来的内容: -1.需要支持人脸人工打标,打标之后,记录到数据库里面,作为分析的对比源头。 \ No newline at end of file diff --git a/plugins/kid_photo_extractor/__init__.py b/plugins/kid_photo_extractor/__init__.py deleted file mode 100644 index 0d2af41..0000000 --- a/plugins/kid_photo_extractor/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# 从当前包的main模块导入KidPhotoExtractorPlugin类 -from .main import KidPhotoExtractorPlugin - -# 提供get_plugin函数,返回插件实例 -def get_plugin(): - """获取插件实例""" - return KidPhotoExtractorPlugin() \ No newline at end of file diff --git a/plugins/kid_photo_extractor/config.toml b/plugins/kid_photo_extractor/config.toml deleted file mode 100644 index b504626..0000000 --- a/plugins/kid_photo_extractor/config.toml +++ /dev/null @@ -1,10 +0,0 @@ -[KidPhotoExtractor] -enable = false -command = ["开始分析照片", "查看照片分析", "清理照片分析", "照片分析时间"] -command-format = """ -📷小朋友照片提取指令: -#开始分析照片 [可选:全量] [可选:目录路径] - 开始分析指定目录下的照片(默认增量分析) -#查看照片分析 - 查看最近一次分析结果 -#清理照片分析 - 清理分析数据 -#照片分析时间 - 查询最后分析时间 -""" \ No newline at end of file diff --git a/plugins/kid_photo_extractor/main.py b/plugins/kid_photo_extractor/main.py deleted file mode 100644 index e02229b..0000000 --- a/plugins/kid_photo_extractor/main.py +++ /dev/null @@ -1,971 +0,0 @@ -import os -import time -import json -import logging -import shutil -import datetime -from typing import Dict, Any, List, Tuple, Optional -import threading -import traceback - -from wcferry import Wcf - -from db.connection import DBConnectionManager -from plugin_common.plugin_interface import PluginStatus - -try: - import numpy as np - import cv2 - from deepface import DeepFace - from sklearn.cluster import DBSCAN -except ImportError as e: - raise ImportError(f"缺少必要的依赖库: {e}。请安装 requirements.txt 中的依赖: pip install -r requirements.txt") - -from plugin_common.message_plugin_interface import MessagePluginInterface -from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager -from utils.decorator.plugin_decorators import plugin_stats_decorator -from db.kid_photo_redis import KidPhotoRedisDB - - -class FaceAnalyzer: - """人脸分析器,负责检测和分析人脸""" - - def __init__(self, kid_age_threshold=14): - # 保留参数但不再使用 - self.kid_age_threshold = kid_age_threshold - self.logger = logging.getLogger("Plugin.KidPhotoExtractor.FaceAnalyzer") - - def detect_faces(self, image_path): - """检测图片中的所有人脸""" - try: - # 检查文件是否存在 - if not os.path.exists(image_path): - self.logger.error(f"图片文件不存在: {image_path}") - return [] - - # 检查文件是否可读 - try: - img = cv2.imread(image_path) - if img is None: - self.logger.error(f"无法读取图片: {image_path}") - return [] - - # 打印图片信息以便调试 - self.logger.info(f"图片尺寸: {img.shape}, 类型: {img.dtype}") - except Exception as e: - self.logger.error(f"读取图片失败: {image_path}, 错误: {e}") - return [] - - # 使用更精确的人脸检测参数,移除不支持的threshold参数 - faces = DeepFace.extract_faces( - img_path=image_path, - enforce_detection=False, - detector_backend='retinaface', # 使用更精确的RetinaFace检测器 - align=True - ) - - # 过滤掉可能的误检 - valid_faces = [] - for face in faces: - # 检查置信度,过滤掉低置信度的检测结果 - confidence = face.get('confidence', 0) - if confidence < 0.8: # 提高置信度阈值到0.8 - self.logger.info(f"过滤掉低置信度人脸: {confidence}") - continue - - # 检查人脸区域的大小,过滤掉太小的区域(可能是图标) - if 'facial_area' in face: - area = face['facial_area'] - face_width = area['w'] - face_height = area['h'] - - # 过滤条件:人脸必须足够大(通常图标较小) - min_face_size = 60 # 增加最小人脸尺寸(像素) - if face_width > min_face_size and face_height > min_face_size: - # 检查人脸宽高比,过滤掉不合理的比例 - aspect_ratio = face_width / face_height - if 0.7 <= aspect_ratio <= 1.5: # 缩小人脸宽高比范围,更接近真实人脸 - # 计算人脸区域占图片的比例,过滤掉太小的区域 - img_height, img_width = img.shape[:2] - face_area_ratio = (face_width * face_height) / (img_width * img_height) - if 0.01 <= face_area_ratio <= 0.9: # 人脸区域应该在合理范围内 - valid_faces.append(face) - else: - self.logger.info(f"过滤掉不合理区域比例的人脸: {face_area_ratio}") - else: - self.logger.info(f"过滤掉不合理宽高比的人脸: {aspect_ratio}") - else: - self.logger.info(f"过滤掉过小的人脸: {face_width}x{face_height}") - - # 记录检测到的人脸数量 - self.logger.info(f"在图片 {image_path} 中检测到 {len(faces)} 个人脸,有效人脸 {len(valid_faces)} 个") - - return valid_faces - except Exception as e: - self.logger.error(f"人脸检测失败: {image_path}, 错误: {e}") - self.logger.error(traceback.format_exc()) # 打印完整的错误堆栈 - return [] - - def analyze_face(self, image_path, face_area=None): - """分析人脸,获取特征向量""" - temp_path = None - try: - # 检查文件是否存在 - if not os.path.exists(image_path): - self.logger.error(f"图片文件不存在: {image_path}") - return None, None - - # 检查文件是否可读 - try: - img = cv2.imread(image_path) - if img is None: - self.logger.error(f"无法读取图片: {image_path}") - return None, None - - # 如果指定了人脸区域,裁剪图片 - if face_area: - x, y, w, h = face_area['x'], face_area['y'], face_area['w'], face_area['h'] - img = img[y:y+h, x:x+w] - # 保存临时裁剪图片 - temp_path = f"{image_path}.temp.jpg" - cv2.imwrite(temp_path, img) - - # 使用人脸检测来验证这是否是一张人脸,而不是使用verify - try: - # 使用DeepFace的detect_face函数检测是否包含人脸 - detection = DeepFace.extract_faces( - img_path=temp_path, - enforce_detection=True, # 强制检测 - detector_backend='retinaface' - ) - - # 如果没有检测到人脸,则返回None - if not detection or len(detection) == 0: - self.logger.info(f"二次验证失败,可能不是真实人脸: {image_path}") - if os.path.exists(temp_path): - os.remove(temp_path) - return None, None - except Exception as e: - # 如果强制检测失败,说明可能不是人脸 - self.logger.warning(f"人脸二次验证失败: {e}") - if os.path.exists(temp_path): - os.remove(temp_path) - return None, None - - image_path = temp_path - - except Exception as e: - self.logger.error(f"读取图片失败: {image_path}, 错误: {e}") - return None, None - - # 提取人脸特征向量用于后续比对 - embedding_result = DeepFace.represent( - img_path=image_path, - model_name='ArcFace', # 使用ArcFace模型,对不同年龄段人脸效果更好 - enforce_detection=True, # 改为True,确保是人脸 - detector_backend='retinaface' # 使用更精确的检测器 - ) - - # 处理embedding结果,确保它是一个数值数组 - embedding = None - if isinstance(embedding_result, list) and len(embedding_result) > 0: - if isinstance(embedding_result[0], dict) and 'embedding' in embedding_result[0]: - embedding = embedding_result[0]['embedding'] - else: - embedding = embedding_result[0] - elif isinstance(embedding_result, dict) and 'embedding' in embedding_result: - embedding = embedding_result['embedding'] - else: - embedding = embedding_result - - # 确保embedding是数值列表 - if embedding is not None: - try: - # 尝试转换为浮点数列表 - embedding = [float(x) for x in embedding] - except (TypeError, ValueError): - self.logger.error(f"无法将嵌入向量转换为浮点数列表: {image_path}") - return None, temp_path - - self.logger.info(f"成功提取人脸特征向量: {image_path}") - - # 不再进行年龄判断,直接返回特征向量和临时文件路径 - return { - 'embedding': embedding, - 'is_kid': True # 默认所有人脸都处理 - }, temp_path - - except Exception as e: - self.logger.error(f"人脸分析失败: {image_path}, 错误: {e}") - self.logger.error(traceback.format_exc()) # 打印完整的错误堆栈 - return None, temp_path - - def is_kid(self, face_info): - """判断是否为小朋友 - 现在总是返回True""" - if not face_info: - return False - return True # 所有人脸都视为需要处理 - - -class FaceGrouper: - """人脸分组器,负责对人脸进行聚类分组""" - - def __init__(self, eps=0.4, min_samples=4): # 减小eps值,增加min_samples值 - self.eps = eps # DBSCAN的邻域半径 - self.min_samples = min_samples # 形成核心点所需的最小样本数 - self.logger = logging.getLogger("Plugin.KidPhotoExtractor.FaceGrouper") - - def cluster_faces(self, face_embeddings): - """对人脸特征向量进行聚类""" - if not face_embeddings: - return [] - - if len(face_embeddings) < 2: - # 如果只有一个人脸,直接返回 - return [0] * len(face_embeddings) - - try: - # 将特征向量转换为numpy数组,确保是浮点数类型 - # 首先提取实际的嵌入向量数据 - processed_embeddings = [] - for emb in face_embeddings: - # DeepFace.represent() 可能返回字典或列表,需要提取实际的向量 - if isinstance(emb, dict) and 'embedding' in emb: - processed_embeddings.append(emb['embedding']) - elif isinstance(emb, list) and len(emb) > 0: - # 如果是列表,取第一个元素 - if isinstance(emb[0], dict) and 'embedding' in emb[0]: - processed_embeddings.append(emb[0]['embedding']) - else: - processed_embeddings.append(emb) - else: - processed_embeddings.append(emb) - - # 转换为numpy数组并确保是浮点数类型 - embeddings_array = np.array(processed_embeddings, dtype=np.float64) - - # 安全地检查无效值 - try: - has_nan = np.isnan(embeddings_array).any() - has_inf = np.isinf(embeddings_array).any() - if has_nan or has_inf: - self.logger.error("特征向量包含无效值(NaN或Inf)") - # 清理无效值 - embeddings_array = np.nan_to_num(embeddings_array) - except TypeError: - # 如果仍然无法检查NaN/Inf,记录警告并继续 - self.logger.warning("无法检查特征向量中的无效值,将直接进行聚类") - - # 使用DBSCAN进行聚类 - clustering = DBSCAN(eps=self.eps, min_samples=self.min_samples, metric='euclidean').fit(embeddings_array) - - # 获取聚类标签 - labels = clustering.labels_ - - # 处理噪声点(标签为-1的点) - # 将噪声点分配到最近的聚类 - noise_indices = np.where(labels == -1)[0] - if len(noise_indices) > 0 and len(set(labels) - {-1}) > 0: - for idx in noise_indices: - # 计算该点到所有非噪声点的距离 - distances = [] - for cluster_id in set(labels) - {-1}: - cluster_points = embeddings_array[labels == cluster_id] - if len(cluster_points) > 0: - # 计算到该聚类所有点的平均距离 - dist = np.mean([np.linalg.norm(embeddings_array[idx] - point) for point in cluster_points]) - distances.append((cluster_id, dist)) - - # 分配到最近的聚类 - if distances: - nearest_cluster = min(distances, key=lambda x: x[1])[0] - labels[idx] = nearest_cluster - - return labels.tolist() - except MemoryError as e: - self.logger.error(f"聚类过程内存不足: {e}") - return [0] * len(face_embeddings) # 失败时,将所有人脸分到同一组 - except Exception as e: - self.logger.error(f"人脸聚类失败: {e}") - self.logger.error(traceback.format_exc()) - return [0] * len(face_embeddings) # 失败时,将所有人脸分到同一组 - - -class PhotoClassifier: - """照片分类器,负责创建分类目录并复制照片""" - - def __init__(self): - self.logger = logging.getLogger("Plugin.KidPhotoExtractor.PhotoClassifier") - - def create_kid_folder(self, base_dir, kid_id): - """创建小朋友的文件夹""" - kid_folder = os.path.join(base_dir, f"kid_{kid_id}") - os.makedirs(kid_folder, exist_ok=True) - return kid_folder - - def copy_photo(self, src_path, dest_folder, new_name=None): - """复制照片到目标文件夹""" - try: - if not os.path.exists(src_path): - self.logger.error(f"源文件不存在: {src_path}") - return False - - if new_name: - dest_path = os.path.join(dest_folder, new_name) - else: - dest_path = os.path.join(dest_folder, os.path.basename(src_path)) - - # 如果目标文件已存在,添加时间戳避免重名 - if os.path.exists(dest_path): - name, ext = os.path.splitext(os.path.basename(src_path)) - timestamp = int(time.time()) - dest_path = os.path.join(dest_folder, f"{name}_{timestamp}{ext}") - - shutil.copy2(src_path, dest_path) - return True - except Exception as e: - self.logger.error(f"复制照片失败: {e}") - return False - - def save_analysis_report(self, output_dir, report_data): - """保存分析报告""" - try: - report_path = os.path.join(output_dir, "analysis_report.json") - with open(report_path, 'w', encoding='utf-8') as f: - json.dump(report_data, f, ensure_ascii=False, indent=2) - return report_path - except Exception as e: - self.logger.error(f"保存分析报告失败: {e}") - return None - - -class KidPhotoExtractorPlugin(MessagePluginInterface): - """小朋友照片提取插件""" - - @property - def name(self) -> str: - return "小朋友照片提取" - - @property - def version(self) -> str: - return "0.0.1" - - @property - def description(self) -> str: - return "未完成-提供小朋友照片提取和分类功能,基于人脸识别技术" - - @property - def author(self) -> str: - return "Trae AI" - - @property - def command_prefix(self) -> Optional[str]: - return "#" # 使用#作为命令前缀 - - @property - def commands(self) -> List[str]: - return self._commands - - def __init__(self): - super().__init__() - self._commands = [] - self.face_analyzer = None - self.face_grouper = None - self.photo_classifier = None - self.analysis_tasks = {} # 存储分析任务状态 - self.db_manager = None - self.kid_photo_db = None - - def initialize(self, context: Dict[str, Any]) -> bool: - """初始化插件""" - self.LOG = logging.getLogger(f"Plugin.{self.name}") - self.LOG.info(f"正在初始化 {self.name} 插件...") - - # 保存上下文对象 - self.wcf = context.get("wcf") - self.event_system = context.get("event_system") - self.message_util = context.get("message_util") - self.db_manager = DBConnectionManager.get_instance() - - # 初始化数据库 - if self.db_manager: - self.kid_photo_db = KidPhotoRedisDB(self.db_manager) - else: - self.LOG.warning("数据库管理器未提供,将无法使用Redis功能") - - # 初始化配置 - self._commands = self._config.get("KidPhotoExtractor", {}).get("command", - ["开始分析照片", "查看照片分析", "清理照片分析", - "照片分析时间"]) - self.command_format = self._config.get("KidPhotoExtractor", {}).get("command-format", - "使用 #开始分析照片 [目录路径] 开始分析") - self.enable = self._config.get("KidPhotoExtractor", {}).get("enable", True) - # 只在插件启用时初始化数据库和其他组件 TODO,现在过于复杂,暂时不启用 - if not self.enable: - self.LOG.info(f"[{self.name}] 插件已禁用,跳过组件初始化") - return True - # 初始化组件 - self.face_analyzer = FaceAnalyzer() - self.face_grouper = FaceGrouper() - self.photo_classifier = PhotoClassifier() - - self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") - return True - - def start(self) -> bool: - """启动插件""" - if not self.enable: - self.LOG.info(f"[{self.name}] 插件已启动") - self.status = PluginStatus.RUNNING - return False - return True - - def stop(self) -> bool: - """停止插件""" - self.LOG.info(f"[{self.name}] 插件已停止") - self.status = PluginStatus.STOPPED - return True - - def can_process(self, message: Dict[str, Any]) -> bool: - """检查是否可以处理该消息""" - if not self.enable: - return False - - content = str(message.get("content", "")).strip() - - # 检查是否以命令前缀开头 - if not content.startswith(self.command_prefix): - return False - - # 去掉前缀后检查命令 - command_text = content[len(self.command_prefix):].strip() - command = command_text.split(" ")[0] - - return command in self._commands - - @plugin_stats_decorator(plugin_name="小朋友照片提取") - def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """处理消息""" - content = str(message.get("content", "")).strip() - self.LOG.info(f"插件执行: {self.name}:{content}") - - # 去掉前缀 - command_text = content[len(self.command_prefix):].strip() - command_parts = command_text.split(" ") - command = command_parts[0] - - sender = message.get("sender") - roomid = message.get("roomid", "") - wcf: Wcf = message.get("wcf") - gbm: GroupBotManager = message.get("gbm") - - # 检查权限 - if roomid and gbm.get_group_permission(roomid, Feature.KID_PHOTO_EXTRACT) == PermissionStatus.DISABLED: - return False, "没有权限" - - # 根据命令分发处理 - if command == "开始分析照片": - return self._handle_start_analysis(command_parts, wcf, sender, roomid, gbm) - elif command == "查看照片分析": - return self._handle_view_analysis(wcf, sender, roomid, gbm) - elif command == "清理照片分析": - return self._handle_clean_analysis(wcf, sender, roomid, gbm) - elif command == "照片分析时间": - return self._handle_analysis_time(wcf, sender, roomid, gbm) - else: - wcf.send_text(f"❌未知命令!\n{self.command_format}", - (roomid if roomid else sender), sender) - return True, "未知命令" - - # 在 _handle_start_analysis 方法中,增加输入验证和错误处理 - def _handle_start_analysis(self, command_parts, wcf, sender, roomid, gbm): - """处理开始分析命令""" - target = roomid if roomid else sender - - # 检查是否已有分析任务在进行 - group_key = roomid or sender - if group_key in self.analysis_tasks and self.analysis_tasks[group_key].get("running", False): - wcf.send_text("⚠️已有分析任务正在进行,请等待完成后再试", target, sender) - return True, "任务已在进行" - - # 判断是否为全量分析 - is_full = False - if len(command_parts) > 1 and command_parts[1].lower() == "全量": - is_full = True - command_parts.pop(1) # 移除"全量"参数 - - # 获取目录路径 - source_dir = None - if len(command_parts) > 1: - source_dir = " ".join(command_parts[1:]) - # 验证路径安全性 - if not self._is_safe_path(source_dir): - wcf.send_text("⚠️指定的路径不安全或包含非法字符", target, sender) - return True, "路径不安全" - else: - # 使用默认目录 - if roomid: - # 群聊默认目录 - 使用与message_to_db.py相同的图片存储结构 - image_base_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", - "images") - source_dir = os.path.join(image_base_dir, roomid) - else: - # 暂不支持私聊 - wcf.send_text("⚠️当前版本仅支持群聊图片分析", target, sender) - return True, "不支持私聊" - - # 检查目录是否存在 - if not os.path.exists(source_dir): - wcf.send_text(f"❌目录不存在: {source_dir}", target, sender) - return True, "目录不存在" - - # 检查目录是否有图片文件 - has_images = False - for root, _, files in os.walk(source_dir): - for file in files: - if file.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')): - has_images = True - break - if has_images: - break - - if not has_images: - wcf.send_text(f"❌目录中没有图片文件: {source_dir}", target, sender) - return True, "没有图片文件" - - # 创建输出目录 - output_dir = os.path.join(os.path.dirname(source_dir), f"kid_photos_{roomid}") - try: - os.makedirs(output_dir, exist_ok=True) - except Exception as e: - wcf.send_text(f"❌创建输出目录失败: {str(e)}", target, sender) - return True, "创建目录失败" - - # 启动分析任务 - analysis_type = "全量" if is_full else "增量" - wcf.send_text(f"✅开始{analysis_type}分析照片,源目录: {source_dir}\n分析结果将保存到: {output_dir}", target, - sender) - - # 记录任务状态 - self.analysis_tasks[group_key] = { - "running": True, - "start_time": time.time(), - "source_dir": source_dir, - "output_dir": output_dir, - "is_full": is_full - } - - # 在后台线程中执行分析 - thread = threading.Thread( - target=self._run_analysis_task, - args=(group_key, source_dir, output_dir, wcf, target, sender) - ) - thread.daemon = True - thread.start() - - return True, f"开始{analysis_type}分析" - - def _is_safe_path(self, path): - """检查路径是否安全""" - # 检查路径是否包含可疑字符 - suspicious_chars = ['..', '~', '`', '$', '|', ';', '&', '*', '>', '<', '"', "'"] - for char in suspicious_chars: - if char in path: - return False - - # 检查路径是否为绝对路径 - if os.path.isabs(path): - # 检查是否在允许的目录范围内 - base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) - normalized_path = os.path.normpath(path) - if not normalized_path.startswith(base_dir): - return False - - return True - - def _run_analysis_task(self, group_key, source_dir, output_dir, wcf, target, sender): - """在后台运行分析任务""" - start_time = time.time() - temp_files = [] # 用于跟踪所有创建的临时文件 - - try: - is_full = self.analysis_tasks[group_key].get("is_full", False) - self.LOG.info(f"开始{'全量' if is_full else '增量'}分析任务: {source_dir}") - wcf.send_text("🔍正在分析照片,请稍候...", target, sender) - - # 分析结果 - result = { - "start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "total_images": 0, - "processed_images": 0, - "total_faces": 0, - "face_groups": 0, # 改名,不再是kid_faces - "groups": 0, # 改名,不再是kid_groups - "persons": {}, # 改名,不再是kids - "is_full": is_full - } - - # 获取所有图片文件 - image_files = [] - for root, _, files in os.walk(source_dir): - for file in files: - if file.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')): - image_files.append(os.path.join(root, file)) - - result["total_images"] = len(image_files) - - if result["total_images"] == 0: - wcf.send_text("⚠️未找到任何图片文件", target, sender) - self.analysis_tasks[group_key]["running"] = False - return - - # 如果是增量分析,获取已处理的照片和最后处理时间 - processed_photos = set() - last_process_time = None - - if not is_full and self.kid_photo_db: - processed_photos = self.kid_photo_db.get_processed_photos(group_key) - last_process_time = self.kid_photo_db.get_last_process_time(group_key) - - if last_process_time: - wcf.send_text( - f"📊上次处理时间: {datetime.datetime.fromtimestamp(last_process_time).strftime('%Y-%m-%d %H:%M:%S')}\n已处理照片数: {len(processed_photos)}", - target, sender) - else: - wcf.send_text("⚠️未找到上次处理记录,将执行首次完整分析", target, sender) - - # 筛选需要处理的图片 - if not is_full and processed_photos: - # 只处理未处理过的文件或上次处理后修改的文件 - filtered_image_files = [] - for img_path in image_files: - # 如果文件不在已处理列表中,或者文件的修改时间晚于上次处理时间 - if img_path not in processed_photos or ( - last_process_time and os.path.getmtime(img_path) > last_process_time): - filtered_image_files.append(img_path) - - image_files = filtered_image_files - wcf.send_text(f"📊本次需要处理的新增/修改照片数: {len(image_files)}", target, sender) - - if len(image_files) == 0: - wcf.send_text("✅没有新增或修改的照片,无需分析", target, sender) - self.analysis_tasks[group_key]["running"] = False - return - - # 进度更新 - last_progress_time = time.time() - progress_interval = 10 # 每5秒更新一次进度 - - # 处理每张图片 - all_faces = [] # 存储所有的人脸特征,不再区分小朋友 - face_images = [] # 存储对应的图片路径 - face_regions = [] # 存储人脸区域 - - # 记录本次处理的照片 - newly_processed_photos = [] - - # 批量处理,避免内存溢出 - batch_size = 50 # 每批处理的图片数量 - total_batches = (len(image_files) + batch_size - 1) // batch_size - - for batch_idx in range(total_batches): - start_idx = batch_idx * batch_size - end_idx = min((batch_idx + 1) * batch_size, len(image_files)) - batch_images = image_files[start_idx:end_idx] - - for i, image_path in enumerate(batch_images): - overall_idx = start_idx + i - try: - # 检测图片中的人脸 - faces = self.face_analyzer.detect_faces(image_path) - - for face in faces: - # 分析人脸 - face_region = face.get('facial_area', None) - face_info, temp_file = self.face_analyzer.analyze_face(image_path, face_region) - - # 如果创建了临时文件,添加到跟踪列表 - if temp_file: - temp_files.append(temp_file) - - if face_info: - # 保存人脸特征 - all_faces.append(face_info['embedding']) - face_images.append(image_path) - face_regions.append(face_region) - result["face_groups"] += 1 # 更新计数器名称 - - result["total_faces"] += 1 - - result["processed_images"] += 1 - newly_processed_photos.append(image_path) - - # 更新进度 - current_time = time.time() - if current_time - last_progress_time > progress_interval: - progress = (overall_idx + 1) / len(image_files) * 100 - wcf.send_text(f"📊分析进度: {progress:.1f}% ({overall_idx + 1}/{len(image_files)})", target, sender) - last_progress_time = current_time - - except Exception as e: - self.LOG.error(f"处理图片失败: {image_path}, 错误: {e}") - continue - - # 每批处理完成后保存进度,避免全部失败 - if newly_processed_photos and self.kid_photo_db: - self.kid_photo_db.save_processed_photos(group_key, newly_processed_photos) - newly_processed_photos = [] # 清空已保存的记录 - - # 强制垃圾回收,释放内存 - import gc - gc.collect() - - # 保存最后一批已处理的照片记录 - if newly_processed_photos and self.kid_photo_db: - self.kid_photo_db.save_processed_photos(group_key, newly_processed_photos) - # 更新最后处理时间 - self.kid_photo_db.save_last_process_time(group_key) - - # 如果没有找到人脸 - if not all_faces: - wcf.send_text("⚠️未检测到任何人脸", target, sender) - self.analysis_tasks[group_key]["running"] = False - - # 保存分析结果 - result["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - result["duration"] = time.time() - start_time - self._save_analysis_result(group_key, result) - - return - - # 对人脸进行聚类分组 - wcf.send_text("🧩正在对人脸进行分组...", target, sender) - cluster_labels = self.face_grouper.cluster_faces(all_faces) - - # 统计每个分组的人脸数量 - face_groups = {} - for i, label in enumerate(cluster_labels): - if label not in face_groups: - face_groups[label] = [] - face_groups[label].append((face_images[i], face_regions[i])) - - result["groups"] = len(face_groups) - - # 为每个人创建文件夹并复制照片 - wcf.send_text(f"📁正在创建分类文件夹,共有{len(face_groups)}个人...", target, sender) - - for person_id, faces in face_groups.items(): - # 创建人物文件夹 - person_folder = os.path.join(output_dir, f"person_{person_id}") - os.makedirs(person_folder, exist_ok=True) - - # 复制照片 - 修改为只复制原始照片,不复制人脸区域 - copied_photos = [] - processed_paths = set() # 用于跟踪已处理的照片路径,避免重复复制 - - for image_path, _ in faces: - # 跳过临时文件 - if ".temp." in image_path: - continue - - # 避免重复复制同一张照片 - if image_path in processed_paths: - continue - - processed_paths.add(image_path) - - if self.photo_classifier.copy_photo(image_path, person_folder): - copied_photos.append(os.path.basename(image_path)) - # 保存照片映射关系 - if self.kid_photo_db: - self.kid_photo_db.save_photo_mapping(group_key, f"person_{person_id}", image_path) - - # 记录结果 - result["persons"][f"person_{person_id}"] = { - "photo_count": len(copied_photos), - "photos": copied_photos - } - - # 保存分析报告 - report_path = self.photo_classifier.save_analysis_report(output_dir, result) - - # 完成分析 - result["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - result["duration"] = time.time() - start_time - - # 保存分析结果 - self._save_analysis_result(group_key, result) - - # 发送分析结果 - summary = self._generate_analysis_summary(result, output_dir) - wcf.send_text(summary, target, sender) - - except MemoryError as e: - self.LOG.error(f"分析任务内存不足: {e}") - wcf.send_text(f"❌分析过程中内存不足,请减少照片数量或分批处理", target, sender) - except Exception as e: - self.LOG.error(f"分析任务出错: {e}") - self.LOG.error(traceback.format_exc()) - wcf.send_text(f"❌分析过程中出错: {str(e)}", target, sender) - finally: - # 清理所有临时文件 - self._cleanup_temp_files(temp_files) - - # 标记任务完成 - self.analysis_tasks[group_key]["running"] = False - - # 强制垃圾回收 - import gc - gc.collect() - - def _cleanup_temp_files(self, temp_files): - """清理所有临时文件""" - for temp_file in temp_files: - try: - if os.path.exists(temp_file): - os.remove(temp_file) - self.LOG.info(f"已删除临时文件: {temp_file}") - except Exception as e: - self.LOG.error(f"删除临时文件失败: {temp_file}, 错误: {e}") - - def _save_analysis_result(self, group_key, result): - """保存分析结果到Redis""" - try: - if self.kid_photo_db: - # 保存最后分析时间和分析结果 - self.kid_photo_db.save_last_analysis_time(group_key) - self.kid_photo_db.save_analysis_result(group_key, result) - self.LOG.info(f"已保存分析结果: {group_key}") - else: - self.LOG.error("数据库未初始化") - except Exception as e: - self.LOG.error(f"保存分析结果失败: {e}") - - def _get_last_analysis_result(self, group_key): - """获取最后一次分析结果""" - try: - if self.kid_photo_db: - return self.kid_photo_db.get_last_analysis_result(group_key) - else: - self.LOG.error("数据库未初始化") - return None - except Exception as e: - self.LOG.error(f"获取分析结果失败: {e}") - return None - - def _get_last_analysis_time(self, group_key): - """获取最后一次分析时间""" - try: - if self.kid_photo_db: - return self.kid_photo_db.get_last_analysis_time(group_key) - else: - self.LOG.error("数据库未初始化") - return None - except Exception as e: - self.LOG.error(f"获取分析时间失败: {e}") - return None - - def _handle_clean_analysis(self, wcf, sender, roomid, gbm): - """处理清理分析数据命令""" - target = roomid if roomid else sender - group_key = roomid or sender - - # 检查是否有分析任务在进行 - if group_key in self.analysis_tasks and self.analysis_tasks[group_key].get("running", False): - wcf.send_text("⚠️当前有分析任务正在进行,无法清理数据", target, sender) - return True, "任务进行中" - - # 清理数据 - if self.kid_photo_db: - if self.kid_photo_db.clear_analysis_data(group_key): - wcf.send_text("✅已清理所有照片分析数据", target, sender) - return True, "清理成功" - else: - wcf.send_text("❌清理数据失败", target, sender) - return True, "清理失败" - else: - wcf.send_text("⚠️数据库未初始化,无法清理数据", target, sender) - return True, "数据库未初始化" - - def _handle_view_analysis(self, wcf, sender, roomid, gbm): - """处理查看分析结果命令""" - target = roomid if roomid else sender - group_key = roomid or sender - - # 获取最近一次分析结果 - result = self._get_last_analysis_result(group_key) - - if not result: - wcf.send_text("⚠️未找到分析结果,请先执行照片分析", target, sender) - return True, "无分析结果" - - # 生成分析摘要 - output_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), - "static", "images", f"kid_photos_{roomid}") - summary = self._generate_analysis_summary(result, output_dir) - wcf.send_text(summary, target, sender) - - return True, "查看分析结果" - - def _handle_analysis_time(self, wcf, sender, roomid, gbm): - """处理查询分析时间命令""" - target = roomid if roomid else sender - group_key = roomid or sender - - # 获取最后分析时间 - last_time = self._get_last_analysis_time(group_key) - last_process_time = None - - if self.kid_photo_db: - last_process_time = self.kid_photo_db.get_last_process_time(group_key) - - if not last_time and not last_process_time: - wcf.send_text("⚠️未找到分析记录,请先执行照片分析", target, sender) - return True, "无分析记录" - - # 生成时间信息 - time_info = "📊照片分析时间信息:\n" - - if last_time: - time_info += f"最后分析时间: {datetime.datetime.fromtimestamp(last_time).strftime('%Y-%m-%d %H:%M:%S')}\n" - - if last_process_time: - time_info += f"最后处理时间: {datetime.datetime.fromtimestamp(last_process_time).strftime('%Y-%m-%d %H:%M:%S')}" - - wcf.send_text(time_info, target, sender) - return True, "查询分析时间" - - def _generate_analysis_summary(self, result, output_dir): - """生成分析结果摘要""" - summary = "📊人脸照片分析结果:\n\n" # 更改标题 - - # 基本信息 - summary += f"📷 总照片数: {result.get('total_images', 0)}\n" - summary += f"👤 处理照片数: {result.get('processed_images', 0)}\n" - summary += f"😊 检测到的人脸: {result.get('total_faces', 0)}\n" - summary += f"👪 人脸分组: {result.get('groups', 0)}\n\n" # 更新字段名 - - # 人物分组信息 - persons = result.get('persons', {}) # 更新字段名 - if persons: - summary += "🧒 人物照片统计:\n" # 更新描述 - for person_id, person_info in persons.items(): # 更新变量名 - summary += f" - {person_id}: {person_info.get('photo_count', 0)}张照片\n" - - # 分析时间 - start_time = result.get('start_time', '') - end_time = result.get('end_time', '') - duration = result.get('duration', 0) - - summary += f"\n⏱️ 开始时间: {start_time}\n" - summary += f"⏱️ 结束时间: {end_time}\n" - summary += f"⏱️ 耗时: {duration:.2f}秒\n" - - # 输出目录 - summary += f"\n📁 照片已保存到: {output_dir}\n" - - # 分析类型 - is_full = result.get('is_full', False) - summary += f"📝 分析类型: {'全量' if is_full else '增量'}" - - return summary