Files
abot/plugins/message_sign/main.py
2025-09-19 10:00:05 +08:00

580 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import random
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
import pytz
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.sign_in import SignInDB
from db.sign_in_redis import SignInRedisDB
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import points_reward_decorator
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class MessageSignPlugin(MessagePluginInterface):
"""签到插件"""
# 功能权限常量
FEATURE_KEY = "SIGN_IN"
FEATURE_DESCRIPTION = "✅ 签到功能 [签到, 每日签到, qd, Qd, QD, 上班, 牛马]"
@property
def name(self) -> str:
return "签到系统"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供群聊签到功能,支持连续签到奖励和积分系统"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.today_signin_count = {}
self.last_reset_date = None
self.timezone = 'Asia/Shanghai'
self.sign_in_db = None
self.sign_in_redis = None
# 添加词汇表文件路径和词汇列表
self.vocab_file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"resource", "TOEFL.txt")
self.vocab_list = []
self.bot: WechatAPIClient = None
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.gbm = context.get("gbm")
self.all_contacts = context.get("all_contacts", {})
self.db_manager = DBConnectionManager.get_instance()
# 初始化数据库操作类
self.sign_in_db = SignInDB(self.db_manager)
self.sign_in_redis = SignInRedisDB(self.db_manager)
# 从配置中获取参数
sign_in_config = self._config.get("SignIn", {})
self._commands = sign_in_config.get("command", ["签到", "每日签到", "qd", "Qd", "QD", "上班", "牛马"])
# 添加补签命令
self._makeup_commands = sign_in_config.get("makeup-command", ["补签", "续签"])
self.min_point = sign_in_config.get("min-point", 3)
self.max_point = sign_in_config.get("max-point", 50)
self.streak_cycle = sign_in_config.get("streak-cycle", 1)
self.max_streak_point = sign_in_config.get("max-streak-point", 50)
self.enable = sign_in_config.get("enable", True)
# 补签消费的积分
self.makeup_cost = sign_in_config.get("makeup-cost", 10)
# 从 Redis 初始化签到数据
self.today_signin_count = self.sign_in_redis.load_signin_count()
last_reset_date = self.sign_in_redis.get_last_reset_date()
if last_reset_date:
self.last_reset_date = last_reset_date
else:
self.last_reset_date = datetime.now(tz=pytz.timezone(self.timezone)).date()
self.sign_in_redis.save_last_reset_date(self.last_reset_date)
# 加载词汇表
self.load_vocabulary()
self.LOG.info(
f"[{self.name}] 插件初始化完成,指令:{self._commands},补签指令:{self._makeup_commands},已加载 {len(self.vocab_list)} 个词汇")
return True
def load_vocabulary(self):
"""加载词汇表到内存"""
try:
if os.path.exists(self.vocab_file_path):
with open(self.vocab_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
self.vocab_list = [line.strip() for line in lines if line.strip()]
self.LOG.info(f"成功加载词汇表,共 {len(self.vocab_list)} 个单词")
else:
self.LOG.error(f"词汇表文件不存在: {self.vocab_file_path}")
except Exception as e:
self.LOG.error(f"加载词汇表出错: {e}")
def get_random_vocabulary(self) -> str:
"""从内存中获取随机词汇"""
if not self.vocab_list:
return "词汇表为空"
return random.choice(self.vocab_list)
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
# 修改 can_process 方法,保持不变
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands or command in self._makeup_commands
# 添加积分计算函数
def calculate_sign_in_points(self, message, success, response):
"""计算签到奖励积分
Args:
message: 消息内容
success: 处理结果
response: 响应内容
Returns:
int: 奖励积分数量
"""
points = 0
if success:
sender = message.get("sender")
roomid = message.get("roomid", "")
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
# 直接使用用户当前的连续签到天数,而不是重新计算
# 这个值已经在 _handle_sign_in 方法中被正确更新
if "签到成功" in response:
# 从响应中提取连续签到天数
streak = 1 # 默认为1天
if user_record:
# 使用数据库中已更新的连签天数
streak = user_record.get('signin_streak', 1)
# 如果今天刚签到,连签天数已经+1所以这里不需要再加1
# 计算积分
points = self.calculate_points(streak)
return points
# 修改 process_message 方法,作为路由分发
@plugin_stats_decorator(plugin_name="签到系统")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理补签命令
if command in self._makeup_commands:
return await self._handle_makeup_sign(message)
# 处理正常签到命令
if command in self._commands:
return await self._handle_sign_in(message)
return False, "不支持的命令"
# 添加签到处理方法,应用积分奖励装饰器
@points_reward_decorator(calculate_sign_in_points, "checkin", "每日签到奖励", FEATURE_KEY)
async def _handle_sign_in(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理签到请求"""
sender = message.get("sender")
roomid = message.get("roomid", "")
all_contacts = message.get("all_contacts", {})
revoke: MessageAutoRevoke = message.get("revoke")
try:
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
wx_nick_name = all_contacts.get(sender, sender)
# 判断用户是否已经签到过
if user_record and user_record.get('sign_stat'):
sign_stat = user_record['sign_stat']
# 确保 sign_stat 和 today_start 是同一时区对象
if isinstance(sign_stat, datetime) and sign_stat.tzinfo is None:
sign_stat = pytz.timezone(self.timezone).localize(sign_stat)
# 如果 sign_stat 已经大于或等于今天的零点,则认为用户已经签到过了
if sign_stat >= today_start:
# 获取总签到次数(不增加)
total_sign_count = self.sign_in_redis.get_user_total_sign_count(sender, roomid, self.sign_in_db)
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender), f"您今天已经签到过了!\n总签到次数:{total_sign_count}", sender)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 4)
return False, "已签到"
# 在_handle_sign_in方法中修改断签处理逻辑
# 找到约在第247行的代码块
streak = 0
streak_broken = False
old_streak = 1
if user_record and user_record['sign_stat']:
last_sign_date = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0)
# 确保 sign_stat 和 today_start 是同一时区对象
if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None:
last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date)
self.LOG.info(
f"last_sign_date: {last_sign_date}, yesterday: {yesterday}, user_streak: {user_record['signin_streak']}")
streak = user_record['signin_streak'] + 1
old_streak = streak
if last_sign_date == yesterday:
streak_broken = False
else:
# 断签时,保存之前的连签天数
streak = 1
streak_broken = True
# 保存断签前的连签天数,用于后续补签恢复
previous_streak = user_record['signin_streak']
else:
streak = 1
today_signin_rank = self.get_today_signin_count(roomid) + 1
self.today_signin_count[roomid] = today_signin_rank
self.sign_in_redis.save_signin_count(roomid, today_signin_rank)
# 使用数据库操作类更新或创建签到记录
points_to_add = self.calculate_points(streak)
if user_record:
# 保存上次签到时间
last_sign_date = user_record.get('sign_stat')
# 如果断签,保存断签前的连签天数
if streak_broken and user_record['signin_streak'] > 1:
self.sign_in_db.update_sign_record_with_previous_streak(
sender, roomid, wx_nick_name,
points_to_add,
current_time, streak,
last_sign_date, # 保存上次签到时间
user_record['signin_streak'] # 保存断签前的连签天数
)
else:
self.sign_in_db.update_sign_record_with_last_date(
sender, roomid, wx_nick_name,
points_to_add,
current_time, streak,
last_sign_date # 保存上次签到时间
)
else:
self.sign_in_db.create_sign_record_with_last_date(
sender, roomid, wx_nick_name,
points_to_add,
current_time, streak,
None # 首次签到,没有上次签到时间
)
# 记录签到历史
self.sign_in_db.create_sign_history(
sender, roomid, current_time, current_time,
False, points_to_add, streak
)
# 更新Redis中的签到总次数
total_sign_count = self.sign_in_redis.increment_user_sign_count(sender, roomid, self.sign_in_db)
# 在输出信息中添加每日词汇和签到总次数
output = f"签到成功,加[{points_to_add}]积分,第[{today_signin_rank}]个!"
if streak_broken and old_streak > 0: # 只有在真的断签且之前有签到记录时才显示
output += f"断开了 {old_streak} 天连签!"
elif streak > 1:
output += f"连签 {streak} 天!"
# 添加总签到次数信息
output += f"\n总签到次数:{total_sign_count}"
# 从内存中获取随机词汇
daily_vocab = self.get_random_vocabulary()
output += f"\n今日词汇:{daily_vocab}"
client_msg_id, create_time, new_msg_id = await self.bot.send_at_message((roomid if roomid else sender),
output, [sender])
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 60)
return True, "签到成功,已赠送积分"
except Exception as e:
self.LOG.error(f"处理签到请求出错: {e}")
client_msg_id, create_time, new_msg_id = await self.bot.send_at_message((roomid if roomid else sender),
f"签到出错:{e}",
[sender])
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 3)
return False, f"处理出错: {e}"
def reset_today_count_if_needed(self):
"""检查并重置每日签到计数"""
current_date = datetime.now(tz=pytz.timezone(self.timezone)).date()
if current_date != self.last_reset_date:
self.today_signin_count.clear()
self.sign_in_redis.reset_daily_counts()
self.last_reset_date = current_date
self.sign_in_redis.save_last_reset_date(self.last_reset_date)
self.LOG.info(f"[签到] 已重置每日签到计数,日期更新为 {current_date}")
def get_today_signin_count(self, group_id: str) -> int:
"""获取群内今日签到人数(使用缓存)"""
self.reset_today_count_if_needed()
return self.today_signin_count.get(group_id, 0)
def get_user_record(self, wx_id: str, group_id: str) -> Optional[dict]:
"""获取用户签到记录"""
return self.sign_in_db.get_user_record(wx_id, group_id)
def calculate_points(self, streak: int) -> int:
"""根据连续签到天数计算积分"""
base_points = self.min_point
extra_points = min(streak // self.streak_cycle, self.max_streak_point)
total_points = base_points + extra_points
return min(total_points, self.max_point)
# 修改_handle_makeup_sign方法实现连签恢复功能
async def _handle_makeup_sign(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理补签请求"""
sender = message.get("sender")
roomid = message.get("roomid", "")
all_contacts = message.get("all_contacts", {})
revoke: MessageAutoRevoke = message.get("revoke")
try:
# 获取当前时间,带有时区信息
current_time = datetime.now(tz=pytz.timezone(self.timezone))
today_date = current_time.date()
today_start = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today_start - timedelta(days=1)
day_before_yesterday = today_start - timedelta(days=2)
# 获取用户的签到记录
user_record = self.get_user_record(sender, roomid)
wx_nick_name = all_contacts.get(sender, sender)
# 检查用户是否有签到记录
if not user_record:
await self.bot.send_text_message(
(roomid if roomid else sender),
"❌ 您还没有签到记录,请先进行签到!", sender
)
return True, "无签到记录"
# 获取上次签到时间并规范化到零点
last_sign_date = None
if user_record.get('last_sign_date'):
last_sign_date = user_record['last_sign_date'].replace(hour=0, minute=0, second=0, microsecond=0)
elif user_record.get('sign_stat'):
last_sign_date = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0)
# 确保时区一致
if isinstance(last_sign_date, datetime) and last_sign_date.tzinfo is None:
last_sign_date = pytz.timezone(self.timezone).localize(last_sign_date)
# 获取当前签到状态并规范化到零点
sign_stat = None
if user_record.get('sign_stat'):
sign_stat = user_record['sign_stat'].replace(hour=0, minute=0, second=0, microsecond=0)
if sign_stat.tzinfo is None:
sign_stat = pytz.timezone(self.timezone).localize(sign_stat)
# 检查是否已经签到今天
if sign_stat and sign_stat >= today_start:
# 今天已经签到,检查是否需要补签昨天
if last_sign_date and last_sign_date >= day_before_yesterday and last_sign_date < yesterday:
# 上次签到在前天,今天已签到,可以补签昨天
self.LOG.info(f"今天已签到可以补签昨天last_sign_date: {last_sign_date}")
else:
# 昨天已经签到了或者断签超过一天,不需要补签
self.LOG.info(f"不符合补签条件last_sign_date: {last_sign_date}, sign_stat: {sign_stat}")
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender), "❌ 您昨天已经签到过了或断签超过一天,不符合补签条件!", sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 3)
return True, "不符合补签条件"
else:
# 今天未签到,检查是否符合补签条件(只能补签昨天)
if not last_sign_date or last_sign_date < day_before_yesterday:
self.LOG.info(f"断签超过一天last_sign_date: {last_sign_date}")
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender),
"❌ 只能补签断签一天的情况!您已断签超过一天或没有签到记录。",
sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5)
return True, "不符合补签条件"
if last_sign_date >= yesterday:
self.LOG.info(f"昨天已签到last_sign_date: {last_sign_date}")
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender),
"❌ 您昨天已经签到过了,不需要补签!",
sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5)
return True, "无需补签"
# 检查用户积分是否足够
from db.points_db import PointsDBOperator, PointSource
points_db = PointsDBOperator(self.db_manager)
user_points = points_db.get_user_points(sender, roomid)
if not user_points or user_points["total_points"] < self.makeup_cost:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender),
f"❌ 积分不足!补签需要 {self.makeup_cost} 积分,您当前只有 {user_points.get('total_points', 0)} 积分。",
sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5)
return True, "积分不足"
# 扣除积分
deduct_success, deduct_result = points_db.deduct_points(
sender, roomid, self.makeup_cost, PointSource.PLUGIN,
"签到补签消费"
)
if not deduct_success:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender),
f"❌ 扣除积分失败:{deduct_result.get('error', '未知错误')}",
sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5)
return True, "扣除积分失败"
# 在_handle_makeup_sign方法中修改计算新连签天数的逻辑
# 获取原连签天数和断签前连签天数
original_streak = user_record['signin_streak']
previous_streak = user_record.get('previous_streak', 0)
# 计算新的连签天数
new_streak = original_streak
# 如果有断签前记录,直接使用断签前连签天数+1
if previous_streak > 0:
# 恢复连签:断签前连签天数 + 1 (补签后相当于连续签到)
new_streak = previous_streak + 1
self.LOG.info(f"恢复连签: {previous_streak} + 1 = {new_streak}")
else:
# 如果没有断签前记录,则连签天数+1
new_streak = original_streak + 1
self.LOG.info(f"普通补签: {original_streak} + 1 = {new_streak}")
# 更新签到记录
yesterday_time = yesterday.replace(hour=current_time.hour, minute=current_time.minute,
second=current_time.second)
# 如果今天已经签到则更新last_sign_date为昨天保持sign_stat不变
if sign_stat and sign_stat >= today_start:
self.sign_in_db.update_makeup_sign_with_streak_recovery(
sender, roomid, wx_nick_name,
0, # 补签不增加积分
sign_stat, new_streak, # 使用恢复后的连签天数
yesterday_time, # 设置last_sign_date为昨天
0 # 清除previous_streak因为已经恢复了
)
else:
# 如果今天没签到则更新sign_stat为昨天last_sign_date为上次签到时间
self.sign_in_db.update_sign_record_with_streak_recovery(
sender, roomid, wx_nick_name,
0, # 补签不增加积分
yesterday_time, new_streak, # 使用恢复后的连签天数
last_sign_date, # 保留原来的last_sign_date
0 # 清除previous_streak因为已经恢复了
)
# 记录签到历史
self.sign_in_db.create_sign_history(
sender, roomid, yesterday_time, current_time,
True, 0, new_streak
)
# 更新Redis中的签到总次数
total_sign_count = self.sign_in_redis.increment_user_sign_count(sender, roomid, self.sign_in_db)
# 发送成功消息
success_message = f"✅ 补签成功!\n💰 消费 {self.makeup_cost} 积分\n"
# 如果恢复了连签,显示恢复信息
if new_streak > original_streak + 1:
success_message += f"🎉 恢复连签!连续签到天数:{new_streak}\n"
else:
success_message += f"🔄 连续签到天数:{new_streak}\n"
# 添加总签到次数信息
success_message += f"📊 总签到次数:{total_sign_count}\n"
success_message += f"💰 当前积分:{user_points['total_points'] - self.makeup_cost}"
await self.bot.send_text_message(
(roomid if roomid else sender),
success_message, sender
)
return True, "补签成功"
except Exception as e:
self.LOG.error(f"处理补签请求出错: {e}")
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(
(roomid if roomid else sender),
f"❌ 补签出错:{e}", sender
)
revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5)
return False, f"处理出错: {e}"