# -*- coding: utf-8 -*- from datetime import datetime from typing import Dict, Optional from db.connection import DBConnectionManager class SignInRedisDB: """签到系统Redis相关操作""" def __init__(self, db_manager: DBConnectionManager): self.db_manager = db_manager self.prefix = "group:sign_in:" def get_redis_connection(self): """获取Redis连接""" return self.db_manager.get_redis_connection() def load_signin_count(self) -> Dict[str, int]: """加载签到人数数据""" signin_count = {} with self.get_redis_connection() as redis_client: keys = redis_client.keys(f'{self.prefix}*') for key in keys: if isinstance(key, bytes): key = key.decode('utf-8') if key == f'{self.prefix}last_reset_date': continue group_id = key.replace(self.prefix, '') count = redis_client.get(key) if count is not None: if isinstance(count, bytes): count = count.decode('utf-8') signin_count[group_id] = int(count) return signin_count def save_signin_count(self, group_id: str, count: int) -> bool: """保存签到人数""" try: with self.get_redis_connection() as redis_client: redis_client.set(f'{self.prefix}{group_id}', count) return True except Exception: return False def get_last_reset_date(self) -> Optional[datetime.date]: """获取最后重置日期""" with self.get_redis_connection() as redis_client: date_str = redis_client.get(f'{self.prefix}last_reset_date') if date_str: if isinstance(date_str, bytes): date_str = date_str.decode('utf-8') return datetime.strptime(date_str, '%Y-%m-%d').date() return None def save_last_reset_date(self, date: datetime.date) -> bool: """保存最后重置日期""" try: with self.get_redis_connection() as redis_client: redis_client.set(f'{self.prefix}last_reset_date', date.strftime('%Y-%m-%d')) return True except Exception: return False def reset_daily_counts(self) -> bool: """重置每日签到计数""" try: with self.get_redis_connection() as redis_client: keys = redis_client.keys(f'{self.prefix}*') for key in keys: if isinstance(key, bytes): key = key.decode('utf-8') # 只删除每日签到计数键,保留用户总签到次数和重置日期 if key != f'{self.prefix}last_reset_date' and not key.startswith(f'{self.prefix}user_total:'): redis_client.delete(key) return True except Exception: return False def get_user_total_sign_count_key(self, wx_id: str, group_id: str) -> str: """获取用户总签到次数的Redis键""" return f"{self.prefix}user_total:{wx_id}:{group_id}" def increment_user_sign_count(self, wx_id: str, group_id: str, sign_in_db=None) -> int: """增加用户签到总次数""" key = self.get_user_total_sign_count_key(wx_id, group_id) with self.get_redis_connection() as redis_client: # 先检查是否存在,如果不存在则初始化 if not redis_client.exists(key) and sign_in_db: user_record = sign_in_db.get_user_record(wx_id, group_id) if user_record and user_record.get('signin_streak', 0) > 0: # 使用连签天数作为初始值 initial_count = user_record['signin_streak'] redis_client.set(key, initial_count) # 然后增加1 count = redis_client.incr(key) return count # 如果已存在或初始化失败,直接增加 count = redis_client.incr(key) return count def get_user_total_sign_count(self, wx_id: str, group_id: str, sign_in_db=None) -> int: """获取用户签到总次数,如果Redis中没有数据则使用连签天数初始化""" key = self.get_user_total_sign_count_key(wx_id, group_id) with self.get_redis_connection() as redis_client: count = redis_client.get(key) if count: return int(count) # Redis中没有数据,尝试从数据库初始化 if sign_in_db: user_record = sign_in_db.get_user_record(wx_id, group_id) if user_record and user_record.get('signin_streak', 0) > 0: # 使用连签天数作为初始总签到次数 initial_count = user_record['signin_streak'] redis_client.set(key, initial_count) return initial_count # 如果数据库也没有记录,返回0 return 0 def set_user_total_sign_count(self, wx_id: str, group_id: str, count: int) -> bool: """设置用户签到总次数""" key = self.get_user_total_sign_count_key(wx_id, group_id) try: with self.get_redis_connection() as redis_client: redis_client.set(key, count) return True except Exception: return False