feature:加入功能,小朋友人脸识别与分类。
This commit is contained in:
@@ -30,6 +30,7 @@ class FaceAnalyzer:
|
||||
"""人脸分析器,负责检测和分析人脸"""
|
||||
|
||||
def __init__(self, kid_age_threshold=14):
|
||||
# 保留参数但不再使用
|
||||
self.kid_age_threshold = kid_age_threshold
|
||||
self.logger = logging.getLogger("Plugin.KidPhotoExtractor.FaceAnalyzer")
|
||||
|
||||
@@ -47,18 +48,32 @@ class FaceAnalyzer:
|
||||
if img is None:
|
||||
self.logger.error(f"无法读取图片: {image_path}")
|
||||
return []
|
||||
|
||||
# 打印图片信息以便调试
|
||||
self.logger.info(f"图片尺寸: {img.shape}, 类型: {img.dtype}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"读取图片失败: {image_path}, 错误: {e}")
|
||||
return []
|
||||
|
||||
faces = DeepFace.extract_faces(img_path=image_path, enforce_detection=False)
|
||||
# 使用更宽松的人脸检测参数
|
||||
faces = DeepFace.extract_faces(
|
||||
img_path=image_path,
|
||||
enforce_detection=False,
|
||||
detector_backend='opencv', # 使用更快的opencv检测器
|
||||
align=True
|
||||
)
|
||||
|
||||
# 记录检测到的人脸数量
|
||||
self.logger.info(f"在图片 {image_path} 中检测到 {len(faces)} 个人脸")
|
||||
|
||||
return faces
|
||||
except Exception as e:
|
||||
self.logger.error(f"人脸检测失败: {image_path}, 错误: {e}")
|
||||
self.logger.error(traceback.format_exc()) # 打印完整的错误堆栈
|
||||
return []
|
||||
|
||||
def analyze_face(self, image_path, face_area=None):
|
||||
"""分析人脸,获取年龄和特征向量"""
|
||||
"""分析人脸,获取特征向量"""
|
||||
try:
|
||||
# 检查文件是否存在
|
||||
if not os.path.exists(image_path):
|
||||
@@ -85,15 +100,15 @@ class FaceAnalyzer:
|
||||
self.logger.error(f"读取图片失败: {image_path}, 错误: {e}")
|
||||
return None
|
||||
|
||||
# 分析人脸属性
|
||||
analysis = DeepFace.analyze(img_path=image_path,
|
||||
actions=['age', 'gender'],
|
||||
enforce_detection=False)
|
||||
|
||||
# 提取人脸特征向量用于后续比对
|
||||
embedding = DeepFace.represent(img_path=image_path,
|
||||
model_name='Facenet',
|
||||
enforce_detection=False)
|
||||
embedding = DeepFace.represent(
|
||||
img_path=image_path,
|
||||
model_name='Facenet',
|
||||
enforce_detection=False,
|
||||
detector_backend='opencv' # 使用更快的opencv检测器
|
||||
)
|
||||
|
||||
self.logger.info(f"成功提取人脸特征向量: {image_path}")
|
||||
|
||||
# 如果使用了临时文件,删除它
|
||||
if face_area and os.path.exists(f"{image_path}.temp.jpg"):
|
||||
@@ -102,24 +117,21 @@ class FaceAnalyzer:
|
||||
except Exception as e:
|
||||
self.logger.warning(f"删除临时文件失败: {e}")
|
||||
|
||||
# 不再进行年龄判断,直接返回特征向量
|
||||
return {
|
||||
'age': analysis[0]['age'],
|
||||
'gender': analysis[0]['gender'],
|
||||
'is_kid': analysis[0]['age'] < self.kid_age_threshold,
|
||||
'embedding': embedding
|
||||
'embedding': embedding,
|
||||
'is_kid': True # 默认所有人脸都处理
|
||||
}
|
||||
except IndexError as e:
|
||||
self.logger.error(f"人脸分析结果索引错误: {image_path}, 错误: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
self.logger.error(f"人脸分析失败: {image_path}, 错误: {e}")
|
||||
self.logger.error(traceback.format_exc()) # 打印完整的错误堆栈
|
||||
return None
|
||||
|
||||
def is_kid(self, face_info):
|
||||
"""判断是否为小朋友"""
|
||||
"""判断是否为小朋友 - 现在总是返回True"""
|
||||
if not face_info:
|
||||
return False
|
||||
return face_info.get('is_kid', False)
|
||||
return True # 所有人脸都视为需要处理
|
||||
|
||||
|
||||
class FaceGrouper:
|
||||
@@ -486,9 +498,9 @@ class KidPhotoExtractorPlugin(MessagePluginInterface):
|
||||
"total_images": 0,
|
||||
"processed_images": 0,
|
||||
"total_faces": 0,
|
||||
"kid_faces": 0,
|
||||
"kid_groups": 0,
|
||||
"kids": {},
|
||||
"face_groups": 0, # 改名,不再是kid_faces
|
||||
"groups": 0, # 改名,不再是kid_groups
|
||||
"persons": {}, # 改名,不再是kids
|
||||
"is_full": is_full
|
||||
}
|
||||
|
||||
@@ -544,9 +556,9 @@ class KidPhotoExtractorPlugin(MessagePluginInterface):
|
||||
progress_interval = 5 # 每5秒更新一次进度
|
||||
|
||||
# 处理每张图片
|
||||
kid_faces = [] # 存储所有小朋友的人脸特征
|
||||
kid_face_images = [] # 存储对应的图片路径
|
||||
kid_face_regions = [] # 存储人脸区域
|
||||
all_faces = [] # 存储所有的人脸特征,不再区分小朋友
|
||||
face_images = [] # 存储对应的图片路径
|
||||
face_regions = [] # 存储人脸区域
|
||||
|
||||
# 记录本次处理的照片
|
||||
newly_processed_photos = []
|
||||
@@ -571,12 +583,12 @@ class KidPhotoExtractorPlugin(MessagePluginInterface):
|
||||
face_region = face.get('facial_area', None)
|
||||
face_info = self.face_analyzer.analyze_face(image_path, face_region)
|
||||
|
||||
if face_info and self.face_analyzer.is_kid(face_info):
|
||||
# 是小朋友的人脸
|
||||
kid_faces.append(face_info['embedding'])
|
||||
kid_face_images.append(image_path)
|
||||
kid_face_regions.append(face_region)
|
||||
result["kid_faces"] += 1
|
||||
if face_info: # 不再判断是否为小朋友
|
||||
# 保存人脸特征
|
||||
all_faces.append(face_info['embedding'])
|
||||
face_images.append(image_path)
|
||||
face_regions.append(face_region)
|
||||
result["face_groups"] += 1 # 更新计数器名称
|
||||
|
||||
result["total_faces"] += 1
|
||||
|
||||
@@ -609,49 +621,50 @@ class KidPhotoExtractorPlugin(MessagePluginInterface):
|
||||
# 更新最后处理时间
|
||||
self.kid_photo_db.save_last_process_time(group_key)
|
||||
|
||||
# 如果没有找到小朋友人脸
|
||||
if not kid_faces:
|
||||
wcf.send_text("⚠️未检测到任何小朋友的人脸", target, sender)
|
||||
# 如果没有找到人脸
|
||||
if not all_faces:
|
||||
wcf.send_text("⚠️未检测到任何人脸", target, sender)
|
||||
self.analysis_tasks[group_key]["running"] = False
|
||||
|
||||
|
||||
# 保存分析结果
|
||||
result["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
result["duration"] = time.time() - start_time
|
||||
self._save_analysis_result(group_key, result)
|
||||
|
||||
|
||||
return
|
||||
|
||||
# 对小朋友人脸进行聚类分组
|
||||
wcf.send_text("🧩正在对小朋友人脸进行分组...", target, sender)
|
||||
cluster_labels = self.face_grouper.cluster_faces(kid_faces)
|
||||
|
||||
|
||||
# 对人脸进行聚类分组
|
||||
wcf.send_text("🧩正在对人脸进行分组...", target, sender)
|
||||
cluster_labels = self.face_grouper.cluster_faces(all_faces)
|
||||
|
||||
# 统计每个分组的人脸数量
|
||||
kid_groups = {}
|
||||
face_groups = {}
|
||||
for i, label in enumerate(cluster_labels):
|
||||
if label not in kid_groups:
|
||||
kid_groups[label] = []
|
||||
kid_groups[label].append((kid_face_images[i], kid_face_regions[i]))
|
||||
if label not in face_groups:
|
||||
face_groups[label] = []
|
||||
face_groups[label].append((face_images[i], face_regions[i]))
|
||||
|
||||
result["groups"] = len(face_groups)
|
||||
|
||||
result["kid_groups"] = len(kid_groups)
|
||||
# 为每个人创建文件夹并复制照片
|
||||
wcf.send_text(f"📁正在创建分类文件夹,共有{len(face_groups)}个人...", target, sender)
|
||||
|
||||
# 为每个小朋友创建文件夹并复制照片
|
||||
wcf.send_text(f"📁正在创建分类文件夹,共有{len(kid_groups)}个小朋友...", target, sender)
|
||||
|
||||
for kid_id, faces in kid_groups.items():
|
||||
# 创建小朋友文件夹
|
||||
kid_folder = self.photo_classifier.create_kid_folder(output_dir, kid_id)
|
||||
for person_id, faces in face_groups.items():
|
||||
# 创建人物文件夹
|
||||
person_folder = os.path.join(output_dir, f"person_{person_id}")
|
||||
os.makedirs(person_folder, exist_ok=True)
|
||||
|
||||
# 复制照片
|
||||
copied_photos = []
|
||||
for image_path, _ in faces:
|
||||
if self.photo_classifier.copy_photo(image_path, kid_folder):
|
||||
if self.photo_classifier.copy_photo(image_path, person_folder):
|
||||
copied_photos.append(os.path.basename(image_path))
|
||||
# 保存照片映射关系
|
||||
if self.kid_photo_db:
|
||||
self.kid_photo_db.save_photo_mapping(group_key, f"kid_{kid_id}", image_path)
|
||||
self.kid_photo_db.save_photo_mapping(group_key, f"person_{person_id}", image_path)
|
||||
|
||||
# 记录结果
|
||||
result["kids"][f"kid_{kid_id}"] = {
|
||||
result["persons"][f"person_{person_id}"] = {
|
||||
"photo_count": len(copied_photos),
|
||||
"photos": copied_photos
|
||||
}
|
||||
@@ -794,21 +807,20 @@ class KidPhotoExtractorPlugin(MessagePluginInterface):
|
||||
|
||||
def _generate_analysis_summary(self, result, output_dir):
|
||||
"""生成分析结果摘要"""
|
||||
summary = "📊小朋友照片分析结果:\n\n"
|
||||
summary = "📊人脸照片分析结果:\n\n" # 更改标题
|
||||
|
||||
# 基本信息
|
||||
summary += f"📷 总照片数: {result.get('total_images', 0)}\n"
|
||||
summary += f"👤 处理照片数: {result.get('processed_images', 0)}\n"
|
||||
summary += f"😊 检测到的人脸: {result.get('total_faces', 0)}\n"
|
||||
summary += f"👶 小朋友人脸: {result.get('kid_faces', 0)}\n"
|
||||
summary += f"👪 小朋友分组: {result.get('kid_groups', 0)}\n\n"
|
||||
summary += f"👪 人脸分组: {result.get('groups', 0)}\n\n" # 更新字段名
|
||||
|
||||
# 小朋友分组信息
|
||||
kids = result.get('kids', {})
|
||||
if kids:
|
||||
summary += "🧒 小朋友照片统计:\n"
|
||||
for kid_id, kid_info in kids.items():
|
||||
summary += f" - {kid_id}: {kid_info.get('photo_count', 0)}张照片\n"
|
||||
# 人物分组信息
|
||||
persons = result.get('persons', {}) # 更新字段名
|
||||
if persons:
|
||||
summary += "🧒 人物照片统计:\n" # 更新描述
|
||||
for person_id, person_info in persons.items(): # 更新变量名
|
||||
summary += f" - {person_id}: {person_info.get('photo_count', 0)}张照片\n"
|
||||
|
||||
# 分析时间
|
||||
start_time = result.get('start_time', '')
|
||||
|
||||
Reference in New Issue
Block a user