from datetime import datetime, timedelta import logging import pytz from typing import Dict, Any, List, Optional, Tuple from wcferry import Wcf, WxMsg from db.connection import DBConnectionManager from message_util import MessageUtil from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from plugins.stats_collector.decorators import plugin_stats_decorator from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from db.sign_in import SignInDB from db.sign_in_redis import SignInRedisDB class MessageSignPlugin(MessagePluginInterface): """签到插件""" @property def name(self) -> str: return "签到系统" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供群聊签到功能,支持连续签到奖励和积分系统" @property def author(self) -> str: return "WeChatRobot Team" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.today_signin_count = {} self.last_reset_date = None self.timezone = 'Asia/Shanghai' self.sign_in_db = None self.sign_in_redis = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logging.getLogger(f"Plugin.{self.name}") self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.wcf = context.get("wcf") self.event_system = context.get("event_system") self.message_util:MessageUtil = context.get("message_util") self.gbm = context.get("gbm") self.all_contacts = context.get("all_contacts", {}) self.db_manager = DBConnectionManager.get_instance() # 初始化数据库操作类 self.sign_in_db = SignInDB(self.db_manager) self.sign_in_redis = SignInRedisDB(self.db_manager) # 从配置中获取参数 sign_in_config = self._config.get("SignIn", {}) self._commands = sign_in_config.get("command", ["签到", "每日签到", "qd", "Qd", "QD", "上班", "牛马"]) self.min_point = sign_in_config.get("min-point", 3) self.max_point = sign_in_config.get("max-point", 50) self.streak_cycle = sign_in_config.get("streak-cycle", 1) self.max_streak_point = sign_in_config.get("max-streak-point", 50) self.enable = sign_in_config.get("enable", True) # 从 Redis 初始化签到数据 self.today_signin_count = self.sign_in_redis.load_signin_count() last_reset_date = self.sign_in_redis.get_last_reset_date() if last_reset_date: self.last_reset_date = last_reset_date else: self.last_reset_date = datetime.now(tz=pytz.timezone(self.timezone)).date() self.sign_in_redis.save_last_reset_date(self.last_reset_date) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="签到系统") def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.info(f"插件执行: {self.name}:{content}") command = content.split(" ")[0] sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") gbm: GroupBotManager = message.get("gbm") all_contacts = message.get("all_contacts", {}) # 检查权限 if roomid and gbm.get_group_permission(roomid, Feature.SIGNIN) == PermissionStatus.DISABLED: return False, "没有权限" try: # 获取当前时间,带有时区信息 current_time = datetime.now(tz=pytz.timezone(self.timezone)) # 获取当天零点的时间 today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0) # 获取昨天的时间 yesterday = today_start - timedelta(days=1) # 获取用户的签到记录 user_record = self.get_user_record(sender, roomid) wx_nick_name = all_contacts.get(sender, sender) # 判断用户是否已经签到过 if user_record and user_record.get('sign_stat'): sign_stat = user_record['sign_stat'] # 确保 sign_stat 和 today_start 是同一时区对象 if isinstance(sign_stat, datetime) and sign_stat.tzinfo is None: sign_stat = pytz.timezone(self.timezone).localize(sign_stat) # 如果 sign_stat 已经大于或等于今天的零点,则认为用户已经签到过了 if sign_stat >= today_start: self.message_util.send_text_msg(f"您今天已经签到过了!当前积分:{user_record['points']}", (roomid if roomid else sender), sender) return True, "已签到" streak = 0 streak_broken = False old_streak = 1 if user_record and user_record['sign_stat']: last_sign_date = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0) # 确保 sign_stat 和 today_start 是同一时区对象 if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None: last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date) self.LOG.info( f"last_sign_date: {last_sign_date}, yesterday: {yesterday}, user_streak: {user_record['signin_streak']}") if last_sign_date == yesterday: streak = user_record['signin_streak'] + 1 old_streak = streak streak_broken = False else: streak = 1 streak_broken = True else: streak = 1 today_signin_rank = self.get_today_signin_count(roomid) + 1 self.today_signin_count[roomid] = today_signin_rank self.sign_in_redis.save_signin_count(roomid, today_signin_rank) points_to_add = self.calculate_points(streak) # 使用数据库操作类更新或创建签到记录 if user_record: self.sign_in_db.update_sign_record( sender, roomid, wx_nick_name, points_to_add, current_time, streak ) else: self.sign_in_db.create_sign_record( sender, roomid, wx_nick_name, points_to_add, current_time, streak ) output = f"签到成功,加[{points_to_add}]分,第[{today_signin_rank}]个!" if streak_broken and old_streak > 0: # 只有在真的断签且之前有签到记录时才显示 output += f"断开了 {old_streak} 天连签!" elif streak > 1: output += f"连签 {streak} 天!" self.message_util.send_text_msg(output, (roomid if roomid else sender), sender) return True, "签到成功" except Exception as e: self.LOG.error(f"处理签到请求出错: {e}") self.message_util.send_text_msg(f"签到出错:{e}", (roomid if roomid else sender), sender) return True, f"处理出错: {e}" def reset_today_count_if_needed(self): """检查并重置每日签到计数""" current_date = datetime.now(tz=pytz.timezone(self.timezone)).date() if current_date != self.last_reset_date: self.today_signin_count.clear() self.sign_in_redis.reset_daily_counts() self.last_reset_date = current_date self.sign_in_redis.save_last_reset_date(self.last_reset_date) self.LOG.info(f"[签到] 已重置每日签到计数,日期更新为 {current_date}") def get_today_signin_count(self, group_id: str) -> int: """获取群内今日签到人数(使用缓存)""" self.reset_today_count_if_needed() return self.today_signin_count.get(group_id, 0) def get_user_record(self, wx_id: str, group_id: str) -> Optional[dict]: """获取用户签到记录""" return self.sign_in_db.get_user_record(wx_id, group_id) def calculate_points(self, streak: int) -> int: """根据连续签到天数计算积分""" base_points = self.min_point extra_points = min(streak // self.streak_cycle, self.max_streak_point) total_points = base_points + extra_points return min(total_points, self.max_point)