我的积分+积分排行

This commit is contained in:
liuwei
2025-04-09 16:42:06 +08:00
parent edee17849f
commit d6050ff428

View File

@@ -70,8 +70,13 @@ class PointTradePlugin(MessagePluginInterface):
# 从配置中获取参数
point_trade_config = self._config.get("PointTrade", {})
self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分"])
self.command_format = point_trade_config.get("command-format", "积分转账 积分数 @用户")
self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分", "我的积分", "积分排行"])
self.command_format = point_trade_config.get("command-format", """
积分交易指令:
积分转账 积分数 @用户 - 转账给指定用户
我的积分 - 查询个人积分详情
积分排行 - 查看群内积分排行榜
""")
self.enable = point_trade_config.get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
@@ -111,26 +116,47 @@ class PointTradePlugin(MessagePluginInterface):
gbm: GroupBotManager = message.get("gbm")
xml = message.get("xml", "")
# 检查命令格式
if len(command) < 3:
wcf.send_text(f"❌命令格式错误!{self.command_format}",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理不同的命令
if command[0] in ["积分转账", "转账积分"]:
return self._handle_transfer_points(message)
elif command[0] == "我的积分":
return self._handle_my_points(message)
elif command[0] == "积分排行":
return self._handle_points_ranking(message)
else:
wcf.send_text(f"❌未知命令!{self.command_format}",
(roomid if roomid else sender), sender)
return True, "未知命令"
def _handle_transfer_points(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理积分转账命令"""
content = str(message.get("content", "")).strip()
command = content.split(" ")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
xml = message.get("xml", "")
# 检查命令格式
if len(command) < 3:
wcf.send_text(f"❌命令格式错误!积分转账 积分数 @用户",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
# 检查积分数是否为正整数
if not command[1].isdigit():
wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n{self.command_format}",
wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n积分转账 积分数 @用户",
(roomid if roomid else sender), sender)
return True, "积分无效"
# 检查@用户是否有效
at_users = self.at_list(xml)
if len(at_users) != 1:
wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n{self.command_format}",
wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n积分转账 积分数 @用户",
(roomid if roomid else sender), sender)
return True, "转账人无效"
@@ -140,7 +166,7 @@ class PointTradePlugin(MessagePluginInterface):
group_id = roomid
try:
# 使用新的积分系统进行转账
# 使用积分系统进行转账
success, result = self.points_db.transfer_points(
trader_wxid, target_wxid, group_id,
reward_points, f"积分转账命令执行"
@@ -186,6 +212,116 @@ class PointTradePlugin(MessagePluginInterface):
(roomid if roomid else sender), sender)
return True, f"处理出错: {str(e)}"
def _handle_my_points(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理查询个人积分命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
try:
# 获取用户积分信息
user_points = self.points_db.get_user_points(sender, roomid)
if not user_points:
wcf.send_text(f"❌未找到你的积分记录!请先参与积分活动[签到,答题/t]。",
(roomid if roomid else sender), sender)
return True, "未找到积分记录"
# 获取用户昵称
user_info = self._get_user_record(sender, roomid)
user_name = user_info.get('wx_nick_name', sender) if user_info else sender
# 获取用户积分交易记录
transactions = self.points_db.get_user_transactions(sender, roomid, 5)
# 统计不同来源的积分
source_stats = {}
for tx in transactions:
source = tx.get('source', '其他')
points = tx.get('points', 0)
if source not in source_stats:
source_stats[source] = 0
source_stats[source] += points
# 构建积分详情消息
source_details = "\n".join([f"- {source}: {points}" for source, points in source_stats.items()])
# 构建最近交易记录
recent_txs = ""
if transactions:
recent_txs = "\n最近交易记录:\n"
for i, tx in enumerate(transactions[:5], 1):
tx_type = "获得" if tx.get('points', 0) > 0 else "消费"
points = abs(tx.get('points', 0))
desc = tx.get('description', '无描述')
date = tx.get('created_at', '').strftime('%m-%d %H:%M') if tx.get('created_at') else '未知时间'
recent_txs += f"{i}. {date} {tx_type} {points} 积分 - {desc}\n"
output = (
f"📊 {user_name} 的积分详情\n"
f"总积分: {user_points.get('total_points', 0)}\n"
f"\n积分来源统计:\n{source_details}"
f"{recent_txs}"
)
wcf.send_text(output, (roomid if roomid else sender), sender)
return True, "查询积分成功"
except Exception as e:
self.LOG.error(f"查询积分出错: {e}")
wcf.send_text(f"❌查询积分失败!请稍后重试。错误: {str(e)}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {str(e)}"
def _handle_points_ranking(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理积分排行榜命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
if not roomid:
wcf.send_text("❌积分排行榜仅在群聊中可用!", sender, "")
return True, "非群聊环境"
try:
# 获取群内积分排行
ranking = self.points_db.get_points_ranking(roomid, 10)
if not ranking:
wcf.send_text("❌暂无积分排行数据!请先参与积分活动。", roomid, sender)
return True, "无排行数据"
# 构建排行榜消息
rank_list = []
for i, user in enumerate(ranking, 1):
user_id = user.get('user_id', '')
points = user.get('total_points', 0)
# 获取用户昵称
user_info = self._get_user_record(user_id, roomid)
user_name = user_info.get('wx_nick_name', user_id) if user_info else user_id
# 添加排名标记
rank_mark = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else f"{i}."
rank_list.append(f"{rank_mark} {user_name}: {points} 积分")
output = (
f"🏆 积分排行榜 🏆\n"
f"群组: {roomid}\n"
f"\n"
f"{chr(10).join(rank_list)}\n"
f"\n"
f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
wcf.send_text(output, roomid, sender)
return True, "查询排行榜成功"
except Exception as e:
self.LOG.error(f"查询积分排行榜出错: {e}")
wcf.send_text(f"❌查询积分排行榜失败!请稍后重试。错误: {str(e)}", roomid, sender)
return True, f"处理出错: {str(e)}"
def at_list(self, xml):
"""
解析消息中的 @用户列表