Files
abot/plugins/lucky_pot/main.py
liuwei f86cbddbf1 debug
2025-08-13 17:59:20 +08:00

680 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
拼手气大奖池插件 - 多人积分消耗游戏
"""
import asyncio
import random
import time
import uuid
from typing import Dict, List, Any, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from db.points_db import PointsDBOperator, PointSource
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus
from utils.wechat.contact_manager import ContactManager
class LuckyPotRoom:
"""奖池房间类"""
def __init__(self, room_id: str, entry_fee: int, max_players: int, countdown: int, creator: str, group_id: str):
self.room_id = room_id
self.status = "waiting" # waiting | drawing | finished
self.entry_fee = entry_fee
self.max_players = max_players
self.players = [] # 玩家ID列表
self.player_nicknames = {} # 玩家昵称字典 {user_id: nickname}
self.start_time = int(time.time())
self.countdown = countdown
self.creator = creator
self.group_id = group_id
self.timer = None
self.result = None
def add_player(self, user_id: str, nickname: str) -> bool:
"""添加玩家到房间"""
if self.status != "waiting":
return False
if user_id in self.players:
return False
self.players.append(user_id)
self.player_nicknames[user_id] = nickname
return True
def is_full(self) -> bool:
"""检查房间是否已满"""
return len(self.players) >= self.max_players
def is_empty(self) -> bool:
"""检查房间是否为空"""
return len(self.players) == 0
def get_player_count(self) -> int:
"""获取玩家数量"""
return len(self.players)
def get_total_pot(self) -> int:
"""获取奖池总额"""
return self.entry_fee * len(self.players)
def get_remaining_time(self) -> int:
"""获取剩余时间(秒)"""
elapsed = int(time.time()) - self.start_time
remaining = max(0, self.countdown - elapsed)
return remaining
def cancel_timer(self):
"""取消倒计时"""
if self.timer:
self.timer.cancel()
self.timer = None
class LuckyPotPlugin(MessagePluginInterface):
"""拼手气大奖池插件"""
# 功能权限常量
FEATURE_KEY = "LUCKY_POT"
FEATURE_DESCRIPTION = "🎯 拼手气大奖池 [大奖池, 开奖池, 拼手气]"
@property
def name(self) -> str:
return "拼手气大奖池"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "多人参与的拼手气大奖池游戏,消耗积分获得随机奖励"
@property
def author(self) -> str:
return "AI助手"
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
self.points_db = PointsDBOperator(self.db_manager)
# 加载配置
self._commands = self._config.get("LuckyPot", {}).get("command", ["大奖池", "开奖池", "拼手气"])
self.command_format = self._config.get("LuckyPot", {}).get("command-format", "大奖池 创建/参与/查看")
self.enable = self._config.get("LuckyPot", {}).get("enable", True)
# 游戏参数配置
self.entry_fee = self._config.get("LuckyPot", {}).get("entry_fee", 100) # 入场费
self.max_players = self._config.get("LuckyPot", {}).get("max_players", 10) # 最大玩家数
self.countdown = self._config.get("LuckyPot", {}).get("countdown", 30) # 倒计时(秒)
self.house_cut_percent = self._config.get("LuckyPot", {}).get("house_cut_percent", 20) # 抽水比例
self.lucky_bonus_chance = self._config.get("LuckyPot", {}).get("lucky_bonus_chance", 20) # 欧皇奖励概率
self.lucky_bonus_percent = self._config.get("LuckyPot", {}).get("lucky_bonus_percent", 20) # 欧皇额外奖励比例
self.cooldown = self._config.get("LuckyPot", {}).get("cooldown", 5) # 冷却时间(秒)
# 活跃房间字典 {group_id: room}
self.active_rooms = {}
# 玩家冷却时间 {user_id: timestamp}
self.player_cooldowns = {}
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
# 取消所有活跃房间的定时器
for room in self.active_rooms.values():
room.cancel_timer()
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="拼手气大奖池")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command_parts = content.split(" ")
command = command_parts[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm = message.get("gbm")
self.bot = message.get("bot")
self.revoke = message.get("revoke")
# 检查命令是否匹配
if command not in self._commands:
return False, "命令不匹配"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 获取子命令
sub_command = command_parts[1] if len(command_parts) > 1 else "help"
# 处理不同的子命令
if sub_command == "创建" or sub_command == "create":
return await self._handle_create_room(message)
elif sub_command == "参与" or sub_command == "join":
return await self._handle_join_room(message)
elif sub_command == "查看" or sub_command == "view":
return await self._handle_view_room(message)
elif sub_command == "取消" or sub_command == "cancel":
return await self._handle_cancel_room(message)
else:
# 显示帮助信息
return await self._handle_help(message)
async def _handle_create_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理创建房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否已有活跃房间
if roomid in self.active_rooms:
room = self.active_rooms[roomid]
if room.status == "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"⚠️ 已有一个活跃的大奖池房间正在等待中!\n"
f"🕒 剩余时间: {room.get_remaining_time()}\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "已有活跃房间"
# 检查用户积分是否足够
user_points = self.points_db.get_user_points(sender, roomid)
if user_points["total_points"] < self.entry_fee:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 积分不足!创建大奖池需要 {self.entry_fee} 积分,您当前只有 {user_points['total_points']} 积分。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "积分不足"
# 创建新房间
room_id = str(uuid.uuid4())[:8]
room = LuckyPotRoom(
room_id=room_id,
entry_fee=self.entry_fee,
max_players=self.max_players,
countdown=self.countdown,
creator=sender,
group_id=roomid
)
# 创建者自动加入房间
nickname = ContactManager.get_instance().get_group_name(roomid, sender) or sender
room.add_player(sender, nickname)
# 扣除创建者积分
deduct_success, _ = self.points_db.deduct_points(
sender, roomid, self.entry_fee, PointSource.GAME,
f"参与大奖池游戏 {room_id}"
)
if not deduct_success:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 扣除积分失败,无法创建大奖池!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "扣除积分失败"
# 设置倒计时
room.timer = asyncio.create_task(self._room_countdown(room))
# 保存房间
self.active_rooms[roomid] = room
# 发送创建成功消息
await self.bot.send_text_message(target,
f"🎯 大奖池已创建!\n"
f"🆔 房间ID: {room_id}\n"
f"💰 入场费: {self.entry_fee} 积分\n"
f"👥 人数上限: {self.max_players}\n"
f"⏳ 倒计时: {self.countdown}\n"
f"\n🎮 发送 \"{self._commands[0]} 参与\" 加入游戏!",
sender)
return True, "创建房间成功"
async def _handle_join_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理加入房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 当前没有活跃的大奖池房间!\n发送 \"{self._commands[0]} 创建\" 创建一个新的大奖池。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 检查房间状态
if room.status != "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 当前房间已经开始开奖或已结束,无法加入!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间状态不允许加入"
# 检查是否已在房间中
if sender in room.players:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"⚠️ 您已经在大奖池中了,无需重复加入!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "已在房间中"
# 检查房间是否已满
if room.is_full():
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 大奖池已满员,无法加入!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间已满"
# 检查冷却时间
if sender in self.player_cooldowns:
last_join_time = self.player_cooldowns[sender]
if time.time() - last_join_time < self.cooldown:
remaining = int(self.cooldown - (time.time() - last_join_time))
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"⏳ 操作过于频繁,请等待 {remaining} 秒后再试!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "冷却中"
# 检查用户积分是否足够
user_points = self.points_db.get_user_points(sender, roomid)
if user_points["total_points"] < room.entry_fee:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 积分不足!参与大奖池需要 {room.entry_fee} 积分,您当前只有 {user_points['total_points']} 积分。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "积分不足"
# 扣除用户积分
deduct_success, _ = self.points_db.deduct_points(
sender, roomid, room.entry_fee, PointSource.GAME,
f"参与大奖池游戏 {room.room_id}"
)
if not deduct_success:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 扣除积分失败,无法加入大奖池!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "扣除积分失败"
# 获取用户昵称
nickname = ContactManager.get_instance().get_group_name(roomid, sender) or sender
# 添加用户到房间
room.add_player(sender, nickname)
# 更新冷却时间
self.player_cooldowns[sender] = time.time()
# 发送加入成功消息
await self.bot.send_text_message(target,
f"{nickname} 成功加入大奖池!\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分\n"
f"⏳ 剩余时间: {room.get_remaining_time()}",
sender)
# 如果房间已满,立即开奖
if room.is_full():
room.cancel_timer()
await self._draw_pot(room)
return True, "加入房间成功"
async def _handle_view_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理查看房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
f"❌ 当前没有活跃的大奖池房间!\n发送 \"{self._commands[0]} 创建\" 创建一个新的大奖池。",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 显示房间信息
if room.status == "waiting":
player_list = "\n".join([f"👤 {room.player_nicknames[p]}" for p in room.players])
await self.bot.send_text_message(target,
f"🎯 大奖池状态: 等待中\n"
f"🆔 房间ID: {room.room_id}\n"
f"💰 入场费: {room.entry_fee} 积分\n"
f"👥 当前人数: {room.get_player_count()}/{room.max_players}\n"
f"💰 当前奖池: {room.get_total_pot()} 积分\n"
f"⏳ 剩余时间: {room.get_remaining_time()}\n\n"
f"参与玩家:\n{player_list}",
sender)
elif room.status == "drawing":
await self.bot.send_text_message(target, "🎯 大奖池正在开奖中,请稍候...", sender)
elif room.status == "finished" and room.result:
# 显示结果
result_text = self._format_result_message(room)
await self.bot.send_text_message(target, result_text, sender)
return True, "查看房间成功"
async def _handle_cancel_room(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理取消房间命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
# 检查是否有活跃房间
if roomid not in self.active_rooms:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 当前没有活跃的大奖池房间!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "没有活跃房间"
room = self.active_rooms[roomid]
# 检查是否是创建者
if room.creator != sender:
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 只有房间创建者才能取消大奖池!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "不是创建者"
# 检查房间状态
if room.status != "waiting":
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target,
"❌ 房间已经开始开奖或已结束,无法取消!",
sender)
self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
return True, "房间状态不允许取消"
# 取消倒计时
room.cancel_timer()
# 退还所有玩家积分
for player in room.players:
self.points_db.add_points(
player, roomid, room.entry_fee, PointSource.GAME,
f"大奖池 {room.room_id} 被取消,退还积分"
)
# 移除房间
del self.active_rooms[roomid]
# 发送取消成功消息
await self.bot.send_text_message(target,
f"🚫 大奖池已被创建者取消!\n所有玩家的积分已退还。",
sender)
return True, "取消房间成功"
async def _handle_help(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理帮助命令"""
sender = message.get("sender")
roomid = message.get("roomid", "")
target = roomid if roomid else sender
help_text = f"🎯 拼手气大奖池 - 游戏说明\n\n"
help_text += f"💰 入场费: {self.entry_fee} 积分\n"
help_text += f"👥 人数上限: {self.max_players}\n"
help_text += f"⏳ 倒计时: {self.countdown}\n"
help_text += f"💸 抽水比例: {self.house_cut_percent}%\n"
help_text += f"🍀 欧皇概率: {self.lucky_bonus_chance}%\n\n"
help_text += f"🎮 游戏玩法:\n"
help_text += f"1. 创建或加入大奖池,支付 {self.entry_fee} 积分入场费\n"
help_text += f"2. 人数满或倒计时结束后自动开奖\n"
help_text += f"3. 系统抽取 {self.house_cut_percent}% 后,剩余积分随机分配给所有参与者\n"
help_text += f"4. 有 {self.lucky_bonus_chance}% 概率产生欧皇,获得额外 {self.lucky_bonus_percent}% 奖励\n\n"
help_text += f"🎲 指令列表:\n"
help_text += f"{self._commands[0]} 创建 - 创建新的大奖池\n"
help_text += f"{self._commands[0]} 参与 - 加入当前大奖池\n"
help_text += f"{self._commands[0]} 查看 - 查看当前大奖池状态\n"
help_text += f"{self._commands[0]} 取消 - 取消当前大奖池(仅创建者可用)"
await self.bot.send_text_message(target, help_text, sender)
return True, "显示帮助信息"
async def _room_countdown(self, room: LuckyPotRoom):
"""房间倒计时"""
try:
await asyncio.sleep(room.countdown)
# 如果房间还在等待中且有玩家,则开奖
if room.status == "waiting" and not room.is_empty():
await self._draw_pot(room)
except asyncio.CancelledError:
# 倒计时被取消
pass
except Exception as e:
self.LOG.error(f"房间倒计时出错: {e}")
async def _draw_pot(self, room: LuckyPotRoom):
"""开奖"""
try:
# 更新房间状态
room.status = "drawing"
# 发送开奖提示
target = room.group_id if room.group_id else room.creator
await self.bot.send_text_message(target,
f"🧧 大奖池开奖中...\n💥💥💥",
room.creator)
# 等待一小段时间,增加悬念
await asyncio.sleep(2)
# 计算奖池金额
total_pot = room.get_total_pot()
# 计算抽水金额
house_cut = int(total_pot * (self.house_cut_percent / 100))
remaining_pot = total_pot - house_cut
# 决定是否有欧皇
has_lucky_winner = random.randint(1, 100) <= self.lucky_bonus_chance
lucky_winner = None
lucky_bonus = 0
if has_lucky_winner and len(room.players) > 1:
lucky_winner = random.choice(room.players)
lucky_bonus = int(remaining_pot * (self.lucky_bonus_percent / 100))
remaining_pot -= lucky_bonus
# 随机分配奖池
rewards = self._distribute_prizes(room.players, remaining_pot)
# 如果有欧皇,增加额外奖励
if lucky_winner:
for i, (player, reward) in enumerate(rewards):
if player == lucky_winner:
rewards[i] = (player, reward + lucky_bonus)
break
# 保存结果
room.result = {
"total_pot": total_pot,
"house_cut": house_cut,
"rewards": rewards,
"lucky_winner": lucky_winner,
"lucky_bonus": lucky_bonus
}
# 更新玩家积分
for player, reward in rewards:
self.points_db.add_points(
player, room.group_id, reward, PointSource.GAME,
f"大奖池 {room.room_id} 获得奖励"
)
# 更新房间状态
room.status = "finished"
# 发送结果
result_text = self._format_result_message(room)
await self.bot.send_text_message(target, result_text, room.creator)
# 延迟一段时间后清理房间
await asyncio.sleep(30)
if room.group_id in self.active_rooms and self.active_rooms[room.group_id].room_id == room.room_id:
del self.active_rooms[room.group_id]
except Exception as e:
self.LOG.error(f"开奖出错: {e}")
# 尝试恢复房间状态
room.status = "waiting"
# 重新设置倒计时
room.timer = asyncio.create_task(self._room_countdown(room))
def _distribute_prizes(self, players: List[str], total_points: int) -> List[Tuple[str, int]]:
"""随机分配奖池"""
rewards = []
remaining = total_points
player_count = len(players)
# 确保每个玩家至少获得1积分
min_points = 1
# 如果总积分不足以给每人1分则平均分配
if total_points < player_count:
points_per_player = max(1, total_points // player_count)
return [(player, points_per_player) for player in players]
# 为每个玩家随机分配积分,除了最后一个
for i in range(player_count - 1):
# 确保剩余玩家每人至少能分到1积分
max_points = remaining - (player_count - i - 1) * min_points
# 随机分配积分
share = random.randint(min_points, max_points) if max_points > min_points else min_points
rewards.append(share)
remaining -= share
# 最后一个玩家获得剩余所有积分
rewards.append(remaining)
# 随机打乱奖励顺序
random.shuffle(rewards)
# 将玩家和奖励配对
return list(zip(players, rewards))
def _format_result_message(self, room: LuckyPotRoom) -> str:
"""格式化结果消息"""
if not room.result:
return "❌ 没有结果数据"
result = room.result
total_pot = result["total_pot"]
house_cut = result["house_cut"]
rewards = result["rewards"]
lucky_winner = result["lucky_winner"]
lucky_bonus = result["lucky_bonus"]
# 按奖励金额排序
sorted_rewards = sorted(rewards, key=lambda x: x[1], reverse=True)
message = f"🎯 大奖池开奖结果\n\n"
message += f"💰 总奖池: {total_pot} 积分\n"
message += f"💸 系统抽水: {house_cut} 积分 ({self.house_cut_percent}%)\n"
message += f"🎁 实际分配: {total_pot - house_cut} 积分\n\n"
# 如果有欧皇,先显示欧皇信息
if lucky_winner:
lucky_name = room.player_nicknames.get(lucky_winner, lucky_winner)
message += f"🏆 恭喜 {lucky_name} 成为本轮欧皇!额外获得 {lucky_bonus} 积分!\n\n"
# 显示所有玩家的奖励
message += "🎮 玩家奖励:\n"
for i, (player, reward) in enumerate(sorted_rewards):
player_name = room.player_nicknames.get(player, player)
entry_fee = room.entry_fee
profit = reward - entry_fee
profit_text = f"+{profit}" if profit >= 0 else f"{profit}"
# 根据排名和盈亏添加不同的表情
if i == 0:
emoji = "🥇"
elif i == 1 and len(sorted_rewards) > 1:
emoji = "🥈"
elif i == 2 and len(sorted_rewards) > 2:
emoji = "🥉"
elif profit > 0:
emoji = "😄"
elif profit < 0:
emoji = "😭"
else:
emoji = "😐"
# 为欧皇添加特殊标记
if player == lucky_winner:
emoji = "🏆"
# 为特别惨的玩家添加特殊文案
if profit < -entry_fee * 0.9: # 亏损超过90%
message += f"{emoji} {player_name}: {reward} 积分 ({profit_text}) 💔 这也太惨了!\n"
else:
message += f"{emoji} {player_name}: {reward} 积分 ({profit_text})\n"
return message