305 lines
12 KiB
Python
305 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
||
from datetime import datetime
|
||
import json
|
||
import os
|
||
from loguru import logger
|
||
from typing import Dict, Optional, List, Any, Set
|
||
|
||
from db.connection import DBConnectionManager
|
||
|
||
|
||
class KidPhotoRedisDB:
    """Redis-backed storage for the kid-photo extraction feature.

    Every key is namespaced as ``<prefix><group_id>:<suffix>``. The suffixes
    used by this class are:

    * ``last_time``            - string, epoch seconds of the last analysis
    * ``results``              - string, JSON document of the last analysis
    * ``processed_photos``     - set of photo paths already handled
    * ``kid:<kid_id>:photos``  - set of photo paths attributed to one kid
    * ``photo:<file name>``    - string, kid id owning that photo file name
    * ``last_process_time``    - string, epoch seconds of the last processing

    All public methods swallow exceptions, log them through ``self.LOG`` and
    return a neutral value (``False``, ``None`` or an empty container), so
    callers never have to handle storage errors themselves.
    """

    # Upper bound on members/keys sent in a single SADD / DEL command, so
    # that one command never grows excessively long.
    _BATCH_SIZE = 1000

    def __init__(self, db_manager: "DBConnectionManager"):
        """Remember the connection manager and set up prefix and logger.

        Args:
            db_manager: Object exposing ``get_redis_connection()``; the value
                it returns is used as a context manager by every method here.
        """
        self.db_manager = db_manager
        self.prefix = "group:kid_photo:"
        self.LOG = logger

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _key(self, group_id: str, suffix: str) -> str:
        """Build the fully qualified key for *suffix* inside *group_id*."""
        return f'{self.prefix}{group_id}:{suffix}'

    @staticmethod
    def _decode(value):
        """Return *value* as ``str`` when it is ``bytes``; otherwise unchanged."""
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    @staticmethod
    def _escape_glob(text: str) -> str:
        """Backslash-escape Redis glob metacharacters so *text* matches literally."""
        return ''.join('\\' + ch if ch in '*?[]\\' else ch for ch in text)

    def _save_current_timestamp(self, group_id: str, suffix: str, error_label: str) -> bool:
        """Store the current epoch second under ``<group>:<suffix>``.

        *error_label* is the text logged in front of the exception message.
        """
        try:
            timestamp = int(datetime.now().timestamp())
            connection = self.get_redis_connection()
            if connection is None:
                # Connection could not be obtained; nothing can be written.
                return False
            with connection as redis_client:
                if not redis_client:
                    return False
                redis_client.set(self._key(group_id, suffix), str(timestamp))
                return True
        except Exception as e:
            self.LOG.error(f"{error_label}: {e}")
            return False

    def _load_timestamp(self, group_id: str, suffix: str, error_label: str) -> Optional[int]:
        """Read an integer timestamp from ``<group>:<suffix>``.

        Returns ``None`` when the key is missing/empty, the connection is
        unavailable, or the stored value cannot be parsed as an integer.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return None
            with connection as redis_client:
                if not redis_client:
                    return None
                raw = redis_client.get(self._key(group_id, suffix))
                if not raw:
                    return None
                return int(self._decode(raw))
        except Exception as e:
            self.LOG.error(f"{error_label}: {e}")
            return None

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------
    def get_redis_connection(self):
        """Return a Redis connection from the manager, or ``None`` on failure.

        Any exception raised by the manager is logged and converted to
        ``None`` so callers can simply test the result.
        """
        try:
            return self.db_manager.get_redis_connection()
        except Exception as e:
            self.LOG.error(f"获取Redis连接失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Analysis time / result
    # ------------------------------------------------------------------
    def save_last_analysis_time(self, group_id: str) -> bool:
        """Record "now" as the last analysis time of *group_id*.

        Returns:
            ``True`` when the value was written, ``False`` otherwise.
        """
        return self._save_current_timestamp(group_id, 'last_time', "保存最后分析时间失败")

    def get_last_analysis_time(self, group_id: str) -> Optional[int]:
        """Return the last analysis time (epoch seconds) or ``None`` if unknown."""
        return self._load_timestamp(group_id, 'last_time', "获取最后分析时间失败")

    def save_analysis_result(self, group_id: str, result: Dict) -> bool:
        """Serialize *result* to JSON and store it as the latest analysis result.

        Non-ASCII characters are written verbatim (``ensure_ascii=False``).

        Returns:
            ``True`` on success; ``False`` when the connection is unavailable
            or *result* cannot be serialized/stored.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False
                payload = json.dumps(result, ensure_ascii=False)
                redis_client.set(self._key(group_id, 'results'), payload)
                return True
        except Exception as e:
            self.LOG.error(f"保存分析结果失败: {e}")
            return False

    def get_last_analysis_result(self, group_id: str) -> Optional[Dict]:
        """Return the most recently stored analysis result.

        Returns:
            The decoded JSON value, or ``None`` when nothing is stored, the
            connection is unavailable, or the stored text is not valid JSON.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return None
            with connection as redis_client:
                if not redis_client:
                    return None
                raw = redis_client.get(self._key(group_id, 'results'))
                if not raw:
                    return None
                return json.loads(self._decode(raw))
        except json.JSONDecodeError as e:
            self.LOG.error(f"解析分析结果JSON失败: {e}")
            return None
        except Exception as e:
            self.LOG.error(f"获取最后分析结果失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Processed photos
    # ------------------------------------------------------------------
    def save_processed_photo(self, group_id: str, photo_path: str) -> bool:
        """Mark a single photo path as processed.

        Returns:
            ``False`` for an empty path or on any storage failure, else ``True``.
        """
        try:
            if not photo_path:
                return False

            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False
                redis_client.sadd(self._key(group_id, 'processed_photos'), photo_path)
                return True
        except Exception as e:
            self.LOG.error(f"保存已处理照片失败: {e}")
            return False

    def save_processed_photos(self, group_id: str, photo_paths: List[str]) -> bool:
        """Mark many photo paths as processed in one pipeline round trip.

        Empty entries are ignored. Having nothing to store counts as success.

        Returns:
            ``True`` when there was nothing to do or the write succeeded,
            ``False`` on any storage failure.
        """
        try:
            if not photo_paths:
                return True

            # Drop empty paths before talking to Redis.
            valid_paths = [path for path in photo_paths if path]
            if not valid_paths:
                return True

            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False

                # Queue the writes on a pipeline to save round trips.
                pipeline = redis_client.pipeline()
                key = self._key(group_id, 'processed_photos')

                # Add at most _BATCH_SIZE members per SADD to keep commands short.
                for start in range(0, len(valid_paths), self._BATCH_SIZE):
                    pipeline.sadd(key, *valid_paths[start:start + self._BATCH_SIZE])

                pipeline.execute()
                return True
        except Exception as e:
            self.LOG.error(f"批量保存已处理照片失败: {e}")
            return False

    def get_processed_photos(self, group_id: str) -> Set[str]:
        """Return every processed photo path of *group_id* as a set of ``str``.

        An empty set is returned when nothing is stored or on any failure.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return set()
            with connection as redis_client:
                if not redis_client:
                    return set()
                members = redis_client.smembers(self._key(group_id, 'processed_photos'))
                return {self._decode(member) for member in members}
        except Exception as e:
            self.LOG.error(f"获取已处理照片失败: {e}")
            return set()

    def is_photo_processed(self, group_id: str, photo_path: str) -> bool:
        """Tell whether *photo_path* is already in the processed set.

        Returns:
            ``False`` for an empty path, an unavailable connection or an
            error; otherwise the membership result as a real ``bool``.
        """
        try:
            if not photo_path:
                return False

            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False
                # The client may answer with 0/1; normalise to bool to match
                # the declared return type.
                return bool(
                    redis_client.sismember(self._key(group_id, 'processed_photos'), photo_path)
                )
        except Exception as e:
            self.LOG.error(f"检查照片是否已处理失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Photo <-> kid mapping
    # ------------------------------------------------------------------
    def save_photo_mapping(self, group_id: str, kid_id: str, photo_path: str) -> bool:
        """Record that *photo_path* belongs to *kid_id*.

        Two entries are written through one pipeline: the full path is added
        to the kid's photo set, and the photo's base file name is mapped back
        to the kid id.

        Returns:
            ``False`` when either argument is empty or on storage failure.
        """
        try:
            if not photo_path or not kid_id:
                return False

            photo_name = os.path.basename(photo_path)
            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False

                pipeline = redis_client.pipeline()
                # Kid -> set of photo paths.
                pipeline.sadd(self._key(group_id, f'kid:{kid_id}:photos'), photo_path)
                # Photo file name -> kid id.
                pipeline.set(self._key(group_id, f'photo:{photo_name}'), kid_id)
                pipeline.execute()
                return True
        except Exception as e:
            self.LOG.error(f"保存照片映射关系失败: {e}")
            return False

    def get_kid_photos(self, group_id: str, kid_id: str) -> List[str]:
        """Return all photo paths stored for *kid_id* (order is unspecified).

        An empty list is returned for an empty *kid_id* or on any failure.
        """
        try:
            if not kid_id:
                return []

            connection = self.get_redis_connection()
            if connection is None:
                return []
            with connection as redis_client:
                if not redis_client:
                    return []
                members = redis_client.smembers(self._key(group_id, f'kid:{kid_id}:photos'))
                return [self._decode(member) for member in members]
        except Exception as e:
            self.LOG.error(f"获取小朋友照片失败: {e}")
            return []

    def get_photo_kid(self, group_id: str, photo_name: str) -> Optional[str]:
        """Return the kid id mapped to the photo file name, or ``None``.

        *photo_name* is the base file name, i.e. the same value
        ``save_photo_mapping`` derives with ``os.path.basename``.
        """
        try:
            if not photo_name:
                return None

            connection = self.get_redis_connection()
            if connection is None:
                return None
            with connection as redis_client:
                if not redis_client:
                    return None
                kid_id = redis_client.get(self._key(group_id, f'photo:{photo_name}'))
                return self._decode(kid_id)
        except Exception as e:
            self.LOG.error(f"获取照片对应的小朋友ID失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Processing time
    # ------------------------------------------------------------------
    def save_last_process_time(self, group_id: str) -> bool:
        """Record "now" as the last processing time of *group_id*.

        Returns:
            ``True`` when the value was written, ``False`` otherwise.
        """
        return self._save_current_timestamp(group_id, 'last_process_time', "保存最后处理时间失败")

    def get_last_process_time(self, group_id: str) -> Optional[int]:
        """Return the last processing time (epoch seconds) or ``None`` if unknown."""
        return self._load_timestamp(group_id, 'last_process_time', "获取最后处理时间失败")

    # ------------------------------------------------------------------
    # Cleanup
    # ------------------------------------------------------------------
    def clear_processed_photos(self, group_id: str) -> bool:
        """Delete the processed-photo set of *group_id*.

        Returns:
            ``True`` when the delete command was issued, ``False`` on failure.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False
                redis_client.delete(self._key(group_id, 'processed_photos'))
                return True
        except Exception as e:
            self.LOG.error(f"清理已处理照片记录失败: {e}")
            return False

    def clear_analysis_data(self, group_id: str) -> bool:
        """Delete every key this class maintains for *group_id*.

        That covers the per-kid photo sets, the photo-to-kid mappings, both
        timestamps, the stored result and the processed-photo set.

        Matching keys are discovered with incremental ``SCAN`` (``scan_iter``)
        instead of ``KEYS`` so the server is not blocked on a large keyspace.
        Deletes are queued on one pipeline in chunks of ``_BATCH_SIZE`` keys.

        Returns:
            ``True`` when the deletes were executed, ``False`` on failure.
        """
        try:
            connection = self.get_redis_connection()
            if connection is None:
                return False
            with connection as redis_client:
                if not redis_client:
                    return False

                # Escape glob metacharacters so an unusual group id can only
                # ever match its own keys.
                base = self._escape_glob(f'{self.prefix}{group_id}')

                # Collect everything first; deleting while scanning could
                # make the cursor skip keys.
                doomed = []
                for section in ('kid', 'photo'):
                    doomed.extend(
                        redis_client.scan_iter(
                            match=f'{base}:{section}:*', count=self._BATCH_SIZE
                        )
                    )

                # Fixed, non-pattern keys of the group.
                doomed.extend(
                    self._key(group_id, suffix)
                    for suffix in (
                        'last_time',
                        'results',
                        'processed_photos',
                        'last_process_time',
                    )
                )

                pipeline = redis_client.pipeline()
                for start in range(0, len(doomed), self._BATCH_SIZE):
                    pipeline.delete(*doomed[start:start + self._BATCH_SIZE])
                pipeline.execute()
                return True
        except Exception as e:
            self.LOG.error(f"清理分析数据失败: {e}")
            return False