Files
abot/plugins/message_sign/main.py
2025-04-11 10:52:23 +08:00

474 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
import logging
import pytz
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from db.connection import DBConnectionManager
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from db.sign_in import SignInDB
from db.sign_in_redis import SignInRedisDB
import random
import os
from utils.decorator.points_decorator import points_reward_decorator
class MessageSignPlugin(MessagePluginInterface):
"""签到插件"""
@property
def name(self) -> str:
return "签到系统"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供群聊签到功能,支持连续签到奖励和积分系统"
@property
def author(self) -> str:
return "WeChatRobot Team"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.today_signin_count = {}
self.last_reset_date = None
self.timezone = 'Asia/Shanghai'
self.sign_in_db = None
self.sign_in_redis = None
# 添加词汇表文件路径和词汇列表
self.vocab_file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"resource", "6 托福-乱序.txt")
self.vocab_list = []
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util: MessageUtil = context.get("message_util")
self.gbm = context.get("gbm")
self.all_contacts = context.get("all_contacts", {})
self.db_manager = DBConnectionManager.get_instance()
# 初始化数据库操作类
self.sign_in_db = SignInDB(self.db_manager)
self.sign_in_redis = SignInRedisDB(self.db_manager)
# 从配置中获取参数
sign_in_config = self._config.get("SignIn", {})
self._commands = sign_in_config.get("command", ["签到", "每日签到", "qd", "Qd", "QD", "上班", "牛马"])
# 添加补签命令
self._makeup_commands = sign_in_config.get("makeup-command", ["补签", "续签"])
self.min_point = sign_in_config.get("min-point", 3)
self.max_point = sign_in_config.get("max-point", 50)
self.streak_cycle = sign_in_config.get("streak-cycle", 1)
self.max_streak_point = sign_in_config.get("max-streak-point", 50)
self.enable = sign_in_config.get("enable", True)
# 补签消费的积分
self.makeup_cost = sign_in_config.get("makeup-cost", 10)
# 从 Redis 初始化签到数据
self.today_signin_count = self.sign_in_redis.load_signin_count()
last_reset_date = self.sign_in_redis.get_last_reset_date()
if last_reset_date:
self.last_reset_date = last_reset_date
else:
self.last_reset_date = datetime.now(tz=pytz.timezone(self.timezone)).date()
self.sign_in_redis.save_last_reset_date(self.last_reset_date)
# 加载词汇表
self.load_vocabulary()
self.LOG.info(
f"[{self.name}] 插件初始化完成,指令:{self._commands},补签指令:{self._makeup_commands},已加载 {len(self.vocab_list)} 个词汇")
return True
def load_vocabulary(self):
"""加载词汇表到内存"""
try:
if os.path.exists(self.vocab_file_path):
with open(self.vocab_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
self.vocab_list = [line.strip() for line in lines if line.strip()]
self.LOG.info(f"成功加载词汇表,共 {len(self.vocab_list)} 个单词")
else:
self.LOG.error(f"词汇表文件不存在: {self.vocab_file_path}")
except Exception as e:
self.LOG.error(f"加载词汇表出错: {e}")
def get_random_vocabulary(self) -> str:
"""从内存中获取随机词汇"""
if not self.vocab_list:
return "词汇表为空"
return random.choice(self.vocab_list)
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
# 修改 can_process 方法,保持不变
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands or command in self._makeup_commands
# 添加积分计算函数
def calculate_sign_in_points(self, message, success, response):
"""计算签到奖励积分
Args:
message: 消息内容
success: 处理结果
response: 响应内容
Returns:
int: 奖励积分数量
"""
sender = message.get("sender")
roomid = message.get("roomid", "")
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
# 计算连续签到天数
streak = 1
if user_record and user_record['sign_stat']:
last_sign_date = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0)
# 确保 sign_stat 和 today_start 是同一时区对象
if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None:
last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date)
if last_sign_date == yesterday:
streak = user_record['signin_streak'] + 1
# 计算积分
points = self.calculate_points(streak)
return points
# 修改 process_message 方法,作为路由分发
@plugin_stats_decorator(plugin_name="签到系统")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.SIGNIN) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理补签命令
if command in self._makeup_commands:
return self._handle_makeup_sign(message)
# 处理正常签到命令
if command in self._commands:
return self._handle_sign_in(message)
return False, "不支持的命令"
# 添加签到处理方法,应用积分奖励装饰器
@points_reward_decorator(calculate_sign_in_points, "checkin", "每日签到奖励", Feature.SIGNIN)
def _handle_sign_in(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理签到请求"""
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
all_contacts = message.get("all_contacts", {})
try:
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
wx_nick_name = all_contacts.get(sender, sender)
# 判断用户是否已经签到过
if user_record and user_record.get('sign_stat'):
sign_stat = user_record['sign_stat']
# 确保 sign_stat 和 today_start 是同一时区对象
if isinstance(sign_stat, datetime) and sign_stat.tzinfo is None:
sign_stat = pytz.timezone(self.timezone).localize(sign_stat)
# 如果 sign_stat 已经大于或等于今天的零点,则认为用户已经签到过了
if sign_stat >= today_start:
self.message_util.send_text_msg(f"您今天已经签到过了!共获得积分:{user_record['points']}",
(roomid if roomid else sender), sender)
return True, "已签到"
streak = 0
streak_broken = False
old_streak = 1
if user_record and user_record['sign_stat']:
last_sign_date = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0)
# 确保 sign_stat 和 today_start 是同一时区对象
if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None:
last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date)
self.LOG.info(
f"last_sign_date: {last_sign_date}, yesterday: {yesterday}, user_streak: {user_record['signin_streak']}")
if last_sign_date == yesterday:
streak = user_record['signin_streak'] + 1
old_streak = streak
streak_broken = False
else:
streak = 1
streak_broken = True
else:
streak = 1
today_signin_rank = self.get_today_signin_count(roomid) + 1
self.today_signin_count[roomid] = today_signin_rank
self.sign_in_redis.save_signin_count(roomid, today_signin_rank)
# 使用数据库操作类更新或创建签到记录
points_to_add = self.calculate_points(streak)
if user_record:
# 保存上次签到时间
last_sign_date = user_record.get('sign_stat')
self.sign_in_db.update_sign_record_with_last_date(
sender, roomid, wx_nick_name,
points_to_add,
current_time, streak,
last_sign_date # 保存上次签到时间
)
else:
self.sign_in_db.create_sign_record_with_last_date(
sender, roomid, wx_nick_name,
points_to_add,
current_time, streak,
None # 首次签到,没有上次签到时间
)
# 在输出信息中添加每日词汇
output = f"签到成功,加[{points_to_add}]分,第[{today_signin_rank}]个!"
if streak_broken and old_streak > 0: # 只有在真的断签且之前有签到记录时才显示
output += f"断开了 {old_streak} 天连签!"
elif streak > 1:
output += f"连签 {streak} 天!"
# 从内存中获取随机词汇
daily_vocab = self.get_random_vocabulary()
output += f"\n今日词汇:{daily_vocab}"
self.message_util.send_text_msg(output, (roomid if roomid else sender), sender)
return True, "签到成功"
except Exception as e:
self.LOG.error(f"处理签到请求出错: {e}")
self.message_util.send_text_msg(f"签到出错:{e}",
(roomid if roomid else sender), sender)
return False, f"处理出错: {e}"
def reset_today_count_if_needed(self):
"""检查并重置每日签到计数"""
current_date = datetime.now(tz=pytz.timezone(self.timezone)).date()
if current_date != self.last_reset_date:
self.today_signin_count.clear()
self.sign_in_redis.reset_daily_counts()
self.last_reset_date = current_date
self.sign_in_redis.save_last_reset_date(self.last_reset_date)
self.LOG.info(f"[签到] 已重置每日签到计数,日期更新为 {current_date}")
def get_today_signin_count(self, group_id: str) -> int:
"""获取群内今日签到人数(使用缓存)"""
self.reset_today_count_if_needed()
return self.today_signin_count.get(group_id, 0)
def get_user_record(self, wx_id: str, group_id: str) -> Optional[dict]:
"""获取用户签到记录"""
return self.sign_in_db.get_user_record(wx_id, group_id)
def calculate_points(self, streak: int) -> int:
"""根据连续签到天数计算积分"""
base_points = self.min_point
extra_points = min(streak // self.streak_cycle, self.max_streak_point)
total_points = base_points + extra_points
return min(total_points, self.max_point)
def _handle_makeup_sign(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理补签请求"""
sender = message.get("sender")
roomid = message.get("roomid", "")
all_contacts = message.get("all_contacts", {})
try:
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
day_before_yesterday = today_start - timedelta(days=2)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
wx_nick_name = all_contacts.get(sender, sender)
# 检查用户是否有签到记录
if not user_record:
self.message_util.send_text_msg(
"❌ 您还没有签到记录,请先进行签到!",
(roomid if roomid else sender), sender
)
return True, "无签到记录"
# 获取上次签到时间
last_sign_date = None
if user_record.get('last_sign_date'):
# 如果有last_sign_date字段使用它
last_sign_date = user_record['last_sign_date']
elif user_record.get('sign_stat'):
# 否则使用sign_stat
last_sign_date = user_record['sign_stat']
# 确保时区一致
if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None:
last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date)
# 获取当前签到状态
sign_stat = user_record.get('sign_stat')
if isinstance(sign_stat, datetime) and sign_stat.tzinfo is None:
sign_stat = pytz.timezone(self.timezone).localize(sign_stat)
# 检查是否已经签到今天
if sign_stat and sign_stat >= today_start:
# 今天已经签到,检查是否需要补签昨天
if last_sign_date and last_sign_date >= day_before_yesterday and last_sign_date < yesterday:
# 上次签到在前天,今天已签到,可以补签昨天
pass
else:
# 昨天已经签到了或者断签超过一天,不需要补签
self.message_util.send_text_msg(
"❌ 您昨天已经签到过了或断签超过一天,不符合补签条件!",
(roomid if roomid else sender), sender
)
return True, "不符合补签条件"
else:
# 今天未签到,检查是否符合补签条件(只能补签昨天)
if not last_sign_date or last_sign_date < day_before_yesterday:
self.message_util.send_text_msg(
"❌ 只能补签断签一天的情况!您已断签超过一天或没有签到记录。",
(roomid if roomid else sender), sender
)
return True, "不符合补签条件"
if last_sign_date >= yesterday:
self.message_util.send_text_msg(
"❌ 您昨天已经签到过了,不需要补签!",
(roomid if roomid else sender), sender
)
return True, "无需补签"
# 检查用户积分是否足够
from db.points_db import PointsDBOperator, PointSource
points_db = PointsDBOperator(self.db_manager)
user_points = points_db.get_user_points(sender, roomid)
if not user_points or user_points["total_points"] < self.makeup_cost:
self.message_util.send_text_msg(
f"❌ 积分不足!补签需要 {self.makeup_cost} 积分,您当前只有 {user_points.get('total_points', 0)} 积分。",
(roomid if roomid else sender), sender
)
return True, "积分不足"
# 扣除积分
deduct_success, deduct_result = points_db.deduct_points(
sender, roomid, self.makeup_cost, PointSource.PLUGIN,
"签到补签消费"
)
if not deduct_success:
self.message_util.send_text_msg(
f"❌ 扣除积分失败:{deduct_result.get('error', '未知错误')}",
(roomid if roomid else sender), sender
)
return True, "扣除积分失败"
# 获取原连签天数
original_streak = user_record['signin_streak']
# 更新签到记录
yesterday_time = yesterday.replace(hour=current_time.hour, minute=current_time.minute,
second=current_time.second)
# 如果今天已经签到则更新last_sign_date为昨天保持sign_stat不变
if sign_stat and sign_stat >= today_start:
self.sign_in_db.update_makeup_sign(
sender, roomid, wx_nick_name,
0, # 补签不增加积分
sign_stat, original_streak + 1,
yesterday_time # 设置last_sign_date为昨天
)
else:
# 如果今天没签到则更新sign_stat为昨天last_sign_date为上次签到时间
self.sign_in_db.update_sign_record_with_last_date(
sender, roomid, wx_nick_name,
0, # 补签不增加积分
yesterday_time, original_streak + 1,
last_sign_date # 保留原来的last_sign_date
)
# 发送成功消息
self.message_util.send_text_msg(
f"✅ 补签成功!\n"
f"💰 消费 {self.makeup_cost} 积分\n"
f"🔄 连续签到天数:{original_streak + 1}\n"
f"💰 当前积分:{user_points['total_points'] - self.makeup_cost}",
(roomid if roomid else sender), sender
)
return True, "补签成功"
except Exception as e:
self.LOG.error(f"处理补签请求出错: {e}")
self.message_util.send_text_msg(
f"❌ 补签出错:{e}",
(roomid if roomid else sender), sender
)
return False, f"处理出错: {e}"