Files
abot/plugins/message_sign/main.py
2025-04-09 14:49:27 +08:00

308 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
import logging
import pytz
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from db.connection import DBConnectionManager
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from db.sign_in import SignInDB
from db.sign_in_redis import SignInRedisDB
import random
import os
from utils.decorator.points_decorator import points_reward_decorator
class MessageSignPlugin(MessagePluginInterface):
    """Daily sign-in plugin for group and private chats.

    Recognises the configured sign-in commands, tracks each user's
    consecutive-day streak and the per-chat "n-th to sign in today" counter,
    and replies with the points earned plus a random vocabulary entry.
    """

    # Response marker returned by process_message when the user has already
    # signed in today. calculate_sign_in_points uses it to award nothing.
    _ALREADY_SIGNED_RESPONSE = "已签到"

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "签到系统"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of the plugin."""
        return "提供群聊签到功能,支持连续签到奖励和积分系统"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "WeChatRobot Team"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; empty because commands are matched directly."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Commands that trigger a sign-in (empty until initialize() runs)."""
        return self._commands

    def __init__(self):
        """Create the plugin with safe defaults; real setup is in initialize()."""
        super().__init__()
        self.today_signin_count = {}
        self.last_reset_date = None
        self.timezone = 'Asia/Shanghai'
        self.sign_in_db = None
        self.sign_in_redis = None
        # Defaults mirror the fallbacks used by initialize(), so the object is
        # usable (and `commands` / `can_process` do not raise) before then.
        self.LOG = logging.getLogger(f"Plugin.{self.name}")
        self._commands = []
        self.enable = True
        self.min_point = 3
        self.max_point = 50
        self.streak_cycle = 1
        self.max_streak_point = 50
        # Vocabulary file path and the in-memory word list.
        self.vocab_file_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
            "resource", "6 托福-乱序.txt")
        self.vocab_list = []

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialise the plugin from the host-provided context.

        Args:
            context: Mapping from which "wcf", "event_system", "message_util",
                "gbm" and "all_contacts" are read.

        Returns:
            Always True.
        """
        self.LOG = logging.getLogger(f"Plugin.{self.name}")
        self.LOG.info(f"正在初始化 {self.name} 插件...")
        # Keep references to the shared context objects.
        self.wcf = context.get("wcf")
        self.event_system = context.get("event_system")
        self.message_util: MessageUtil = context.get("message_util")
        self.gbm = context.get("gbm")
        self.all_contacts = context.get("all_contacts", {})
        self.db_manager = DBConnectionManager.get_instance()
        # Database access helpers.
        self.sign_in_db = SignInDB(self.db_manager)
        self.sign_in_redis = SignInRedisDB(self.db_manager)
        # Read the "SignIn" section of the plugin configuration.
        sign_in_config = self._config.get("SignIn", {})
        self._commands = sign_in_config.get(
            "command", ["签到", "每日签到", "qd", "Qd", "QD", "上班", "牛马"])
        self.min_point = sign_in_config.get("min-point", 3)
        self.max_point = sign_in_config.get("max-point", 50)
        self.streak_cycle = sign_in_config.get("streak-cycle", 1)
        self.max_streak_point = sign_in_config.get("max-streak-point", 50)
        self.enable = sign_in_config.get("enable", True)
        # Restore the per-chat counters and the last reset date from Redis.
        self.today_signin_count = self.sign_in_redis.load_signin_count()
        # NOTE(review): the type returned by get_last_reset_date() is not
        # visible here; it is normalised so the `!=` comparison in
        # reset_today_count_if_needed() compares like with like.
        stored_reset_date = self._as_date(self.sign_in_redis.get_last_reset_date())
        if stored_reset_date:
            self.last_reset_date = stored_reset_date
        else:
            self.last_reset_date = self._now().date()
            self.sign_in_redis.save_last_reset_date(self.last_reset_date)
        # Load the vocabulary list.
        self.load_vocabulary()
        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},已加载 {len(self.vocab_list)} 个词汇")
        return True

    @staticmethod
    def _as_date(value: Any) -> Any:
        """Coerce a datetime or ISO-format string to a date.

        Other values (including None and date objects) are returned as-is;
        an unparseable string yields None.
        """
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.strip()).date()
            except ValueError:
                return None
        return value

    def _now(self) -> datetime:
        """Return the current time as an aware datetime in the plugin timezone."""
        return datetime.now(tz=pytz.timezone(self.timezone))

    def _last_sign_date(self, user_record: Optional[dict]) -> Any:
        """Return the local calendar date of the user's last sign-in, or None.

        Naive timestamps are interpreted as being in the plugin timezone;
        aware timestamps are converted to it *before* the date is taken, so a
        timestamp stored in another zone still maps to the right local day.
        """
        sign_stat = user_record.get('sign_stat') if user_record else None
        if not sign_stat:
            return None
        tz = pytz.timezone(self.timezone)
        if sign_stat.tzinfo is None:
            local_time = tz.localize(sign_stat)
        else:
            local_time = sign_stat.astimezone(tz)
        return local_time.date()

    def load_vocabulary(self):
        """Load the vocabulary file into memory, one non-empty line per entry.

        Failures are logged and leave the current list unchanged.
        """
        try:
            with open(self.vocab_file_path, 'r', encoding='utf-8') as f:
                words = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            self.LOG.error(f"词汇表文件不存在: {self.vocab_file_path}")
        except (OSError, UnicodeDecodeError) as e:
            self.LOG.error(f"加载词汇表出错: {e}")
        else:
            self.vocab_list = words
            self.LOG.info(f"成功加载词汇表,共 {len(self.vocab_list)} 个单词")

    def get_random_vocabulary(self) -> str:
        """Return a random vocabulary entry, or a placeholder if none are loaded."""
        if not self.vocab_list:
            return "词汇表为空"
        return random.choice(self.vocab_list)

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True if the plugin is enabled and the message is a sign-in command.

        The command is the text before the first space of the stripped content.
        """
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        command = content.partition(" ")[0]
        return command in self._commands

    def calculate_sign_in_points(self, message, success, response):
        """Compute the reward for a sign-in (callback for the points decorator).

        Args:
            message: The message dict ("sender" and "roomid" are read).
            success: Success flag returned by process_message.
            response: Response string returned by process_message.

        Returns:
            int: Points to award; 0 when the sign-in failed or was a duplicate.
        """
        # A failed or duplicate sign-in must not earn a reward.
        if not success or response == self._ALREADY_SIGNED_RESPONSE:
            return 0
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        today = self._now().date()
        user_record = self.get_user_record(sender, roomid)
        last_date = self._last_sign_date(user_record)
        streak = 1
        if last_date is not None:
            stored_streak = user_record.get('signin_streak') or 0
            if last_date == today - timedelta(days=1):
                streak = stored_streak + 1
            elif last_date >= today:
                # NOTE(review): presumably the decorator calls this after
                # process_message has stored today's sign-in, in which case
                # the stored streak already includes today - confirm.
                streak = max(stored_streak, 1)
        return self.calculate_points(streak)

    @plugin_stats_decorator(plugin_name="签到系统")
    @points_reward_decorator(calculate_sign_in_points, "checkin", "每日签到奖励",Feature.SIGNIN)
    def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a sign-in command.

        Args:
            message: Message dict; "content", "sender", "roomid", "gbm" and
                "all_contacts" are read.

        Returns:
            (success, response): (False, "没有权限") when sign-in is disabled for
            the group, (True, "已签到") for a duplicate sign-in, (True, "签到成功")
            on success, and (False, error text) when an exception occurred.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.info(f"插件执行: {self.name}{content}")
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        all_contacts = message.get("all_contacts", {})
        # Reply in the group when there is one, otherwise to the sender.
        receiver = roomid if roomid else sender
        # Permission check (group chats only).
        if roomid and gbm.get_group_permission(roomid, Feature.SIGNIN) == PermissionStatus.DISABLED:
            return False, "没有权限"
        try:
            current_time = self._now()
            today = current_time.date()
            yesterday = today - timedelta(days=1)
            user_record = self.get_user_record(sender, roomid)
            wx_nick_name = all_contacts.get(sender, sender)
            last_date = self._last_sign_date(user_record)

            # Already signed in today: report the balance and stop.
            if last_date is not None and last_date >= today:
                self.message_util.send_text_msg(
                    f"您今天已经签到过了!当前积分:{user_record['points']}",
                    receiver, sender)
                return True, self._ALREADY_SIGNED_RESPONSE

            # Work out the new streak and, if it was interrupted, how long
            # the interrupted streak had been.
            streak = 1
            broken_streak = 0
            if last_date is not None:
                stored_streak = user_record.get('signin_streak') or 0
                self.LOG.info(
                    f"last_sign_date: {last_date}, yesterday: {yesterday}, user_streak: {stored_streak}")
                if last_date == yesterday:
                    streak = stored_streak + 1
                else:
                    broken_streak = stored_streak

            points_to_add = self.calculate_points(streak)
            # NOTE(review): the original comments say points are credited by
            # the decorator, yet points_to_add is also passed to the DB layer.
            # Confirm against SignInDB that points are not credited twice.
            if user_record:
                self.sign_in_db.update_sign_record(
                    sender, roomid, wx_nick_name,
                    points_to_add,
                    current_time, streak
                )
            else:
                self.sign_in_db.create_sign_record(
                    sender, roomid, wx_nick_name,
                    points_to_add,
                    current_time, streak
                )

            # Bump the daily counter only after the record was stored, so a
            # failed write does not consume a rank.
            today_signin_rank = self.get_today_signin_count(roomid) + 1
            self.today_signin_count[roomid] = today_signin_rank
            self.sign_in_redis.save_signin_count(roomid, today_signin_rank)

            output = f"签到成功,加[{points_to_add}]分,第[{today_signin_rank}]个!"
            if broken_streak > 0:
                output += f"断开了 {broken_streak} 天连签!"
            elif streak > 1:
                output += f"连签 {streak} 天!"
            # Append a random vocabulary entry.
            daily_vocab = self.get_random_vocabulary()
            output += f"\n今日词汇:{daily_vocab}"
            self.message_util.send_text_msg(output, receiver, sender)
            return True, "签到成功"
        except Exception as e:
            # Top-level boundary: log with traceback and tell the user.
            self.LOG.error(f"处理签到请求出错: {e}", exc_info=True)
            self.message_util.send_text_msg(f"签到出错:{e}", receiver, sender)
            return False, f"处理出错: {e}"

    def reset_today_count_if_needed(self):
        """Clear the daily counters (memory and Redis) when the local date changed."""
        current_date = self._now().date()
        if current_date != self.last_reset_date:
            self.today_signin_count.clear()
            self.sign_in_redis.reset_daily_counts()
            self.last_reset_date = current_date
            self.sign_in_redis.save_last_reset_date(self.last_reset_date)
            self.LOG.info(f"[签到] 已重置每日签到计数,日期更新为 {current_date}")

    def get_today_signin_count(self, group_id: str) -> int:
        """Return today's cached sign-in count for a chat, resetting first if the day rolled over."""
        self.reset_today_count_if_needed()
        return self.today_signin_count.get(group_id, 0)

    def get_user_record(self, wx_id: str, group_id: str) -> Optional[dict]:
        """Fetch the user's sign-in record from the database layer."""
        return self.sign_in_db.get_user_record(wx_id, group_id)

    def calculate_points(self, streak: int) -> int:
        """Compute the points for a sign-in with the given streak length.

        The result is min_point plus one bonus point per completed
        streak_cycle (bonus capped at max_streak_point), capped at max_point.
        """
        # Guard against a zero/negative/missing cycle in the configuration,
        # which would otherwise raise ZeroDivisionError or invert the bonus.
        cycle = self.streak_cycle if self.streak_cycle and self.streak_cycle > 0 else 1
        extra_points = min(streak // cycle, self.max_streak_point)
        return min(self.min_point + extra_points, self.max_point)