新增拼手气功能。

This commit is contained in:
liuwei
2025-08-13 17:36:14 +08:00
parent cbceb71681
commit 07e2e35665
4 changed files with 870 additions and 0 deletions

651
plugins/lucky_pot/main.py Normal file
View File

@@ -0,0 +1,651 @@
# -*- coding: utf-8 -*-
"""
拼手气大奖池插件 - 多人积分消耗游戏
"""
import asyncio
import random
import time
import uuid
from typing import Dict, List, Any, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.points_db import PointsDBOperator, PointSource
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus
class LuckyPotRoom:
"""奖池房间类"""
def __init__(self, room_id: str, entry_fee: int, max_players: int, countdown: int, creator: str, group_id: str):
self.room_id = room_id
self.status = "waiting" # waiting | drawing | finished
self.entry_fee = entry_fee
self.max_players = max_players
self.players = [] # 玩家ID列表
self.player_nicknames = {} # 玩家昵称字典 {user_id: nickname}
self.start_time = int(time.time())
self.countdown = countdown
self.creator = creator
self.group_id = group_id
self.timer = None
self.result = None
def add_player(self, user_id: str, nickname: str) -> bool:
"""添加玩家到房间"""
if self.status != "waiting":
return False
if user_id in self.players:
return False
self.players.append(user_id)
self.player_nicknames[user_id] = nickname
return True
def is_full(self) -> bool:
"""检查房间是否已满"""
return len(self.players) >= self.max_players
def is_empty(self) -> bool:
"""检查房间是否为空"""
return len(self.players) == 0
def get_player_count(self) -> int:
"""获取玩家数量"""
return len(self.players)
def get_total_pot(self) -> int:
"""获取奖池总额"""
return self.entry_fee * len(self.players)
def get_remaining_time(self) -> int:
"""获取剩余时间(秒)"""
elapsed = int(time.time()) - self.start_time
remaining = max(0, self.countdown - elapsed)
return remaining
def cancel_timer(self):
"""取消倒计时"""
if self.timer:
self.timer.cancel()
self.timer = None
class LuckyPotPlugin(MessagePluginInterface):
"""拼手气大奖池插件"""
# 功能权限常量
FEATURE_KEY = "LUCKY_POT"
FEATURE_DESCRIPTION = "🎯 拼手气大奖池 [大奖池, 开奖池, 拼手气]"
@property
def name(self) -> str:
return "拼手气大奖池"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "多人参与的拼手气大奖池游戏,消耗积分获得随机奖励"
@property
def author(self) -> str:
return "AI助手"
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature(self) -> Optional[Feature]:
"""获取功能权限"""
return Feature.get_feature_by_key(self.FEATURE_KEY)
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
self.points_db = PointsDBOperator(self.db_manager)
# 加载配置
self._commands = self._config.get("LuckyPot", {}).get("command", ["大奖池", "开奖池", "拼手气"])
self.command_format = self._config.get("LuckyPot", {}).get("command-format", "大奖池 创建/参与/查看")
self.enable = self._config.get("LuckyPot", {}).get("enable", True)
# 游戏参数配置
self.entry_fee = self._config.get("LuckyPot", {}).get("entry_fee", 100) # 入场费
self.max_players = self._config.get("LuckyPot", {}).get("max_players", 10) # 最大玩家数
self.countdown = self._config.get("LuckyPot", {}).get("countdown", 30) # 倒计时(秒)
self.house_cut_percent = self._config.get("LuckyPot", {}).get("house_cut_percent", 20) # 抽水比例
self.lucky_bonus_chance = self._config.get("LuckyPot", {}).get("lucky_bonus_chance", 20) # 欧皇奖励概率
self.lucky_bonus_percent = self._config.get("LuckyPot", {}).get("lucky_bonus_percent", 20) # 欧皇额外奖励比例
self.cooldown = self._config.get("LuckyPot", {}).get("cooldown", 5) # 冷却时间(秒)
# 活跃房间字典 {group_id: room}
self.active_rooms = {}
# 玩家冷却时间 {user_id: timestamp}
self.player_cooldowns = {}
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
# 取消所有活跃房间的定时器
for room in self.active_rooms.values():
room.cancel_timer()
self.status = PluginStatus.STOPPED
return True
@plugin_stats_decorator(plugin_name="拼手气大奖池")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command_parts = content.split(" ")
command = command_parts[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm = message.get("gbm")
self.bot = message.get("bot")
self.revoke = message.get("revoke")
# 检查命令是否匹配
if command not in self._commands:
return False, "命令不匹配"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 获取子命令
sub_command = command_parts[1] if len(command_parts) > 1 else "help"
# 处理不同的子命令
if sub_command == "创建" or sub_command == "create":
return await self._handle_create_room(message)
elif sub_command == "参与" or sub_command == "join":
return await self._handle_join_room(message)
elif sub_command == "查看" or sub_command == "view":
return await self._handle_view_room(message)
elif sub_command == "取消" or sub_command == "cancel":
return await self._handle_cancel_room(message)
else:
# 显示帮助信息
return await self._handle_help(message)
async def _handle_create_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理创建房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否已有活跃房间
if roomid in self.active_rooms:
room = self.active_rooms[roomid]
if room.status == "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"⚠️ 已有一个活跃的大奖池房间正在等待中!\n"
f"🕒 剩余时间: {room.get_remaining_time()}\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "已有活跃房间"
# 检查用户积分是否足够
user_points = self.points_db.get_user_points(sender, roomid)
if user_points["total_points"] < self.entry_fee:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 积分不足!创建大奖池需要 {self.entry_fee} 积分,您当前只有 {user_points['total_points']} 积分。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "积分不足"
# 创建新房间
room_id = str(uuid.uuid4())[:8]
room = LuckyPotRoom(
room_id=room_id,
entry_fee=self.entry_fee,
max_players=self.max_players,
countdown=self.countdown,
creator=sender,
group_id=roomid
)
# 创建者自动加入房间
nickname = self._get_nickname(message, sender)
room.add_player(sender, nickname)
# 扣除创建者积分
deduct_success, _ = self.points_db.deduct_points(
sender, roomid, self.entry_fee, PointSource.GAME,
f"参与大奖池游戏 {room_id}"
)
if not deduct_success:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 扣除积分失败,无法创建大奖池!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "扣除积分失败"
# 设置倒计时
room.timer = asyncio.create_task(self._room_countdown(room))
# 保存房间
self.active_rooms[roomid] = room
# 发送创建成功消息
await self.bot.send_text_message(target,
f"🎯 大奖池已创建!\n"
f"🆔 房间ID: {room_id}\n"
f"💰 入场费: {self.entry_fee} 积分\n"
f"👥 人数上限: {self.max_players}\n"
f"⏳ 倒计时: {self.countdown}\n"
f"\n🎮 发送 \"{self._commands[0]} 参与\" 加入游戏!",
sender)
return True, "创建房间成功"
async def _handle_join_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理加入房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 当前没有活跃的大奖池房间!\n发送 \"{self._commands[0]} 创建\" 创建一个新的大奖池。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 检查房间状态
if room.status != "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 当前房间已经开始开奖或已结束,无法加入!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间状态不允许加入"
# 检查是否已在房间中
if sender in room.players:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "⚠️ 您已经在大奖池中了,无需重复加入!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "已在房间中"
# 检查房间是否已满
if room.is_full():
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 大奖池已满员,无法加入!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间已满"
# 检查冷却时间
if sender in self.player_cooldowns:
last_join_time = self.player_cooldowns[sender]
if time.time() - last_join_time < self.cooldown:
remaining = int(self.cooldown - (time.time() - last_join_time))
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"⏳ 操作过于频繁,请等待 {remaining} 秒后再试!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "冷却中"
# 检查用户积分是否足够
user_points = self.points_db.get_user_points(sender, roomid)
if user_points["total_points"] < room.entry_fee:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 积分不足!参与大奖池需要 {room.entry_fee} 积分,您当前只有 {user_points['total_points']} 积分。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "积分不足"
# 扣除用户积分
deduct_success, _ = self.points_db.deduct_points(
sender, roomid, room.entry_fee, PointSource.GAME,
f"参与大奖池游戏 {room.room_id}"
)
if not deduct_success:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 扣除积分失败,无法加入大奖池!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "扣除积分失败"
# 获取用户昵称
nickname = self._get_nickname(message, sender)
# 添加用户到房间
room.add_player(sender, nickname)
# 更新冷却时间
self.player_cooldowns[sender] = time.time()
# 发送加入成功消息
await self.bot.send_text_message(target,
f"{nickname} 成功加入大奖池!\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分\n"
f"⏳ 剩余时间: {room.get_remaining_time()}",
sender)
# 如果房间已满,立即开奖
if room.is_full():
room.cancel_timer()
await self._draw_pot(room)
return True, "加入房间成功"
async def _handle_view_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理查看房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 当前没有活跃的大奖池房间!\n发送 \"{self._commands[0]} 创建\" 创建一个新的大奖池。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 显示房间信息
if room.status == "waiting":
player_list = "\n".join([f"👤 {room.player_nicknames[p]}" for p in room.players])
await self.bot.send_text_message(target,
f"🎯 大奖池状态: 等待中\n"
f"🆔 房间ID: {room.room_id}\n"
f"💰 入场费: {room.entry_fee} 积分\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分\n"
f"⏳ 剩余时间: {room.get_remaining_time()}\n\n"
f"参与玩家:\n{player_list}",
sender)
elif room.status == "drawing":
await self.bot.send_text_message(target, "🎯 大奖池正在开奖中,请稍候...", sender)
elif room.status == "finished" and room.result:
# 显示结果
result_text = self._format_result_message(room)
await self.bot.send_text_message(target, result_text, sender)
return True, "查看房间成功"
async def _handle_cancel_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理取消房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 当前没有活跃的大奖池房间!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 检查是否是创建者
if room.creator != sender:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 只有房间创建者才能取消大奖池!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "不是创建者"
# 检查房间状态
if room.status != "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, "❌ 房间已经开始开奖或已结束,无法取消!", sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间状态不允许取消"
# 取消倒计时
room.cancel_timer()
# 退还所有玩家积分
for player in room.players:
self.points_db.add_points(
player, roomid, room.entry_fee, PointSource.GAME,
f"大奖池 {room.room_id} 被取消,退还积分"
)
# 移除房间
del self.active_rooms[roomid]
# 发送取消成功消息
await self.bot.send_text_message(target,
f"🚫 大奖池已被创建者取消!\n所有玩家的积分已退还。",
sender)
return True, "取消房间成功"
async def _handle_help(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理帮助命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
help_text = f"🎯 拼手气大奖池 - 游戏说明\n\n"
help_text += f"💰 入场费: {self.entry_fee} 积分\n"
help_text += f"👥 人数上限: {self.max_players}\n"
help_text += f"⏳ 倒计时: {self.countdown}\n"
help_text += f"💸 抽水比例: {self.house_cut_percent}%\n"
help_text += f"🍀 欧皇概率: {self.lucky_bonus_chance}%\n\n"
help_text += f"🎮 游戏玩法:\n"
help_text += f"1. 创建或加入大奖池,支付 {self.entry_fee} 积分入场费\n"
help_text += f"2. 人数满或倒计时结束后自动开奖\n"
help_text += f"3. 系统抽取 {self.house_cut_percent}% 后,剩余积分随机分配给所有参与者\n"
help_text += f"4. 有 {self.lucky_bonus_chance}% 概率产生欧皇,获得额外 {self.lucky_bonus_percent}% 奖励\n\n"
help_text += f"🎲 指令列表:\n"
help_text += f"{self._commands[0]} 创建 - 创建新的大奖池\n"
help_text += f"{self._commands[0]} 参与 - 加入当前大奖池\n"
help_text += f"{self._commands[0]} 查看 - 查看当前大奖池状态\n"
help_text += f"{self._commands[0]} 取消 - 取消当前大奖池(仅创建者可用)"
await self.bot.send_text_message(target, help_text, sender)
return True, "显示帮助信息"
async def _room_countdown(self, room: LuckyPotRoom):
"""房间倒计时"""
try:
await asyncio.sleep(room.countdown)
# 如果房间还在等待中且有玩家,则开奖
if room.status == "waiting" and not room.is_empty():
await self._draw_pot(room)
except asyncio.CancelledError:
# 倒计时被取消
pass
except Exception as e:
self.LOG.error(f"房间倒计时出错: {e}")
async def _draw_pot(self, room: LuckyPotRoom):
"""开奖"""
try:
# 更新房间状态
room.status = "drawing"
# 发送开奖提示
target = room.group_id if room.group_id else room.creator
await self.bot.send_text_message(target,
f"🧧 大奖池开奖中...\n💥💥💥",
room.creator)
# 等待一小段时间,增加悬念
await asyncio.sleep(2)
# 计算奖池金额
total_pot = room.get_total_pot()
# 计算抽水金额
house_cut = int(total_pot * (self.house_cut_percent / 100))
remaining_pot = total_pot - house_cut
# 决定是否有欧皇
has_lucky_winner = random.randint(1, 100) <= self.lucky_bonus_chance
lucky_winner = None
lucky_bonus = 0
if has_lucky_winner and len(room.players) > 1:
lucky_winner = random.choice(room.players)
lucky_bonus = int(remaining_pot * (self.lucky_bonus_percent / 100))
remaining_pot -= lucky_bonus
# 随机分配奖池
rewards = self._distribute_prizes(room.players, remaining_pot)
# 如果有欧皇,增加额外奖励
if lucky_winner:
for i, (player, reward) in enumerate(rewards):
if player == lucky_winner:
rewards[i] = (player, reward + lucky_bonus)
break
# 保存结果
room.result = {
"total_pot": total_pot,
"house_cut": house_cut,
"rewards": rewards,
"lucky_winner": lucky_winner,
"lucky_bonus": lucky_bonus
}
# 更新玩家积分
for player, reward in rewards:
self.points_db.add_points(
player, room.group_id, reward, PointSource.GAME,
f"大奖池 {room.room_id} 获得奖励"
)
# 更新房间状态
room.status = "finished"
# 发送结果
result_text = self._format_result_message(room)
await self.bot.send_text_message(target, result_text, room.creator)
# 延迟一段时间后清理房间
await asyncio.sleep(30)
if room.group_id in self.active_rooms and self.active_rooms[room.group_id].room_id == room.room_id:
del self.active_rooms[room.group_id]
except Exception as e:
self.LOG.error(f"开奖出错: {e}")
# 尝试恢复房间状态
room.status = "waiting"
# 重新设置倒计时
room.timer = asyncio.create_task(self._room_countdown(room))
def _distribute_prizes(self, players: List[str], total_points: int) -> List[Tuple[str, int]]:
"""随机分配奖池"""
rewards = []
remaining = total_points
player_count = len(players)
# 确保每个玩家至少获得1积分
min_points = 1
# 如果总积分不足以给每人1分则平均分配
if total_points < player_count:
points_per_player = max(1, total_points // player_count)
return [(player, points_per_player) for player in players]
# 为每个玩家随机分配积分,除了最后一个
for i in range(player_count - 1):
# 确保剩余玩家每人至少能分到1积分
max_points = remaining - (player_count - i - 1) * min_points
# 随机分配积分
share = random.randint(min_points, max_points) if max_points > min_points else min_points
rewards.append(share)
remaining -= share
# 最后一个玩家获得剩余所有积分
rewards.append(remaining)
# 随机打乱奖励顺序
random.shuffle(rewards)
# 将玩家和奖励配对
return list(zip(players, rewards))
def _format_result_message(self, room: LuckyPotRoom) -> str:
"""格式化结果消息"""
if not room.result:
return "❌ 没有结果数据"
result = room.result
total_pot = result["total_pot"]
house_cut = result["house_cut"]
rewards = result["rewards"]
lucky_winner = result["lucky_winner"]
lucky_bonus = result["lucky_bonus"]
# 按奖励金额排序
sorted_rewards = sorted(rewards, key=lambda x: x[1], reverse=True)
message = f"🎯 大奖池开奖结果\n\n"
message += f"💰 总奖池: {total_pot} 积分\n"
message += f"💸 系统抽水: {house_cut} 积分 ({self.house_cut_percent}%)\n"
message += f"🎁 实际分配: {total_pot - house_cut} 积分\n\n"
# 如果有欧皇,先显示欧皇信息
if lucky_winner:
lucky_name = room.player_nicknames.get(lucky_winner, lucky_winner)
message += f"🏆 恭喜 {lucky_name} 成为本轮欧皇!额外获得 {lucky_bonus} 积分!\n\n"
# 显示所有玩家的奖励
message += "🎮 玩家奖励:\n"
for i, (player, reward) in enumerate(sorted_rewards):
player_name = room.player_nicknames.get(player, player)
entry_fee = room.entry_fee
profit = reward - entry_fee
profit_text = f"+{profit}" if profit >= 0 else f"{profit}"
# 根据排名和盈亏添加不同的表情
if i == 0:
emoji = "🥇"
elif i == 1 and len(sorted_rewards) > 1:
emoji = "🥈"
elif i == 2 and len(sorted_rewards) > 2:
emoji = "🥉"
elif profit > 0:
emoji = "😄"
elif profit < 0:
emoji = "😭"
else:
emoji = "😐"
# 为欧皇添加特殊标记
if player == lucky_winner:
emoji = "🏆"
# 为特别惨的玩家添加特殊文案
if profit < -entry_fee * 0.9: # 亏损超过90%
message += f"{emoji} {player_name}: {reward} 积分 ({profit_text}) 💔 这也太惨了!\n"
else:
message += f"{emoji} {player_name}: {reward} 积分 ({profit_text})\n"
return message
def _get_nickname(self, message: Dict[str, Any], user_id: str) -> str:
"""获取用户昵称"""
all_contacts = message.get("all_contacts", {})
if user_id in all_contacts:
return all_contacts[user_id].get("nickname", user_id)
return user_id