Files
abot/db/kid_photo_redis.py
2025-04-30 13:22:33 +08:00

305 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from datetime import datetime
import json
import os
from loguru import logger
from typing import Dict, Optional, List, Any, Set
from db.connection import DBConnectionManager
class KidPhotoRedisDB:
"""小朋友照片提取功能Redis相关操作"""
def __init__(self, db_manager: DBConnectionManager):
self.db_manager = db_manager
self.prefix = "group:kid_photo:"
self.LOG = logger
def get_redis_connection(self):
"""获取Redis连接"""
try:
return self.db_manager.get_redis_connection()
except Exception as e:
self.LOG.error(f"获取Redis连接失败: {e}")
return None
def save_last_analysis_time(self, group_id: str) -> bool:
"""保存最后分析时间"""
try:
timestamp = int(datetime.now().timestamp())
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:last_time', str(timestamp))
return True
except Exception as e:
self.LOG.error(f"保存最后分析时间失败: {e}")
return False
def get_last_analysis_time(self, group_id: str) -> Optional[int]:
"""获取最后分析时间"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
timestamp = redis_client.get(f'{self.prefix}{group_id}:last_time')
if timestamp:
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('utf-8')
return int(timestamp)
return None
except Exception as e:
self.LOG.error(f"获取最后分析时间失败: {e}")
return None
def save_analysis_result(self, group_id: str, result: Dict) -> bool:
"""保存分析结果"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:results', json.dumps(result, ensure_ascii=False))
return True
except Exception as e:
self.LOG.error(f"保存分析结果失败: {e}")
return False
def get_last_analysis_result(self, group_id: str) -> Optional[Dict]:
"""获取最后一次分析结果"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
result = redis_client.get(f'{self.prefix}{group_id}:results')
if result:
if isinstance(result, bytes):
result = result.decode('utf-8')
return json.loads(result)
return None
except json.JSONDecodeError as e:
self.LOG.error(f"解析分析结果JSON失败: {e}")
return None
except Exception as e:
self.LOG.error(f"获取最后分析结果失败: {e}")
return None
def save_processed_photo(self, group_id: str, photo_path: str) -> bool:
"""保存已处理的照片路径"""
try:
if not photo_path:
return False
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.sadd(f'{self.prefix}{group_id}:processed_photos', photo_path)
return True
except Exception as e:
self.LOG.error(f"保存已处理照片失败: {e}")
return False
def save_processed_photos(self, group_id: str, photo_paths: List[str]) -> bool:
"""批量保存已处理的照片路径"""
try:
if not photo_paths:
return True
# 过滤空路径
valid_paths = [path for path in photo_paths if path]
if not valid_paths:
return True
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作提高性能
pipeline = redis_client.pipeline()
key = f'{self.prefix}{group_id}:processed_photos'
# 每次最多添加1000个元素避免命令过长
batch_size = 1000
for i in range(0, len(valid_paths), batch_size):
batch = valid_paths[i:i+batch_size]
if batch:
pipeline.sadd(key, *batch)
pipeline.execute()
return True
except Exception as e:
self.LOG.error(f"批量保存已处理照片失败: {e}")
return False
def get_processed_photos(self, group_id: str) -> Set[str]:
"""获取所有已处理的照片路径"""
photos = set()
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return photos
photo_paths = redis_client.smembers(f'{self.prefix}{group_id}:processed_photos')
for path in photo_paths:
if isinstance(path, bytes):
path = path.decode('utf-8')
photos.add(path)
return photos
except Exception as e:
self.LOG.error(f"获取已处理照片失败: {e}")
return set()
def is_photo_processed(self, group_id: str, photo_path: str) -> bool:
"""检查照片是否已处理"""
try:
if not photo_path:
return False
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
return redis_client.sismember(f'{self.prefix}{group_id}:processed_photos', photo_path)
except Exception as e:
self.LOG.error(f"检查照片是否已处理失败: {e}")
return False
def save_photo_mapping(self, group_id: str, kid_id: str, photo_path: str) -> bool:
"""保存照片与小朋友的映射关系"""
try:
if not photo_path or not kid_id:
return False
photo_name = os.path.basename(photo_path)
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作
pipeline = redis_client.pipeline()
# 保存小朋友的照片集合
pipeline.sadd(f'{self.prefix}{group_id}:kid:{kid_id}:photos', photo_path)
# 保存照片对应的小朋友ID
pipeline.set(f'{self.prefix}{group_id}:photo:{photo_name}', kid_id)
pipeline.execute()
return True
except Exception as e:
self.LOG.error(f"保存照片映射关系失败: {e}")
return False
def get_kid_photos(self, group_id: str, kid_id: str) -> List[str]:
"""获取小朋友的所有照片"""
photos = []
try:
if not kid_id:
return photos
with self.get_redis_connection() as redis_client:
if not redis_client:
return photos
photo_paths = redis_client.smembers(f'{self.prefix}{group_id}:kid:{kid_id}:photos')
for path in photo_paths:
if isinstance(path, bytes):
path = path.decode('utf-8')
photos.append(path)
return photos
except Exception as e:
self.LOG.error(f"获取小朋友照片失败: {e}")
return []
def get_photo_kid(self, group_id: str, photo_name: str) -> Optional[str]:
"""获取照片对应的小朋友ID"""
try:
if not photo_name:
return None
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
kid_id = redis_client.get(f'{self.prefix}{group_id}:photo:{photo_name}')
if kid_id and isinstance(kid_id, bytes):
kid_id = kid_id.decode('utf-8')
return kid_id
except Exception as e:
self.LOG.error(f"获取照片对应的小朋友ID失败: {e}")
return None
def save_last_process_time(self, group_id: str) -> bool:
"""保存最后处理时间"""
try:
timestamp = int(datetime.now().timestamp())
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.set(f'{self.prefix}{group_id}:last_process_time', str(timestamp))
return True
except Exception as e:
self.LOG.error(f"保存最后处理时间失败: {e}")
return False
def get_last_process_time(self, group_id: str) -> Optional[int]:
"""获取最后处理时间"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return None
timestamp = redis_client.get(f'{self.prefix}{group_id}:last_process_time')
if timestamp:
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('utf-8')
return int(timestamp)
return None
except Exception as e:
self.LOG.error(f"获取最后处理时间失败: {e}")
return None
def clear_processed_photos(self, group_id: str) -> bool:
"""清理已处理的照片记录"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
redis_client.delete(f'{self.prefix}{group_id}:processed_photos')
return True
except Exception as e:
self.LOG.error(f"清理已处理照片记录失败: {e}")
return False
def clear_analysis_data(self, group_id: str) -> bool:
"""清理分析数据"""
try:
with self.get_redis_connection() as redis_client:
if not redis_client:
return False
# 使用管道批量操作
pipeline = redis_client.pipeline()
# 获取所有小朋友ID
kid_keys = redis_client.keys(f'{self.prefix}{group_id}:kid:*')
# 删除小朋友照片映射
if kid_keys:
pipeline.delete(*kid_keys)
# 删除照片到小朋友的映射
photo_keys = redis_client.keys(f'{self.prefix}{group_id}:photo:*')
if photo_keys:
pipeline.delete(*photo_keys)
# 删除其他数据
keys_to_delete = [
f'{self.prefix}{group_id}:last_time',
f'{self.prefix}{group_id}:results',
f'{self.prefix}{group_id}:processed_photos',
f'{self.prefix}{group_id}:last_process_time'
]
pipeline.delete(*keys_to_delete)
pipeline.execute()
return True
except Exception as e:
self.LOG.error(f"清理分析数据失败: {e}")
return False