修仙功能开发上线

This commit is contained in:
liuwei
2025-11-18 11:19:59 +08:00
parent ca6969303d
commit eddc013182
6 changed files with 1883 additions and 1 deletions

990
plugins/xiuxian/main.py Normal file
View File

@@ -0,0 +1,990 @@
"""
群聊文字修仙插件
说明:
- 原子指令 + 状态机 + 限流 + Cache-Aside
- 读优先 Redis写 DB 后删除缓存,由读取路径回填
- 群频次限制与用户冷却共同防骚扰/防封
"""
import json
import time
import random
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.xiuxian_db import XiuxianDB
from datetime import timezone
class XiuxianRedisDB:
"""修仙插件的 Redis 访问封装:玩家缓存、限流键与排行榜。"""
def __init__(self, db_manager: DBConnectionManager):
self.db_manager = db_manager
self.player_prefix = "xiuxian:cache:player:"
self.leaderboard_key = None
self.leaderboard_realm_key = None
def set_leaderboard_key(self, key: str):
self.leaderboard_key = key
def set_realm_leaderboard_key(self, key: str):
self.leaderboard_realm_key = key
def get_redis(self):
return self.db_manager.get_redis_connection()
def get_player(self, user_id: str) -> Optional[Dict[str, Any]]:
try:
with self.get_redis() as r:
data = r.get(f"{self.player_prefix}{user_id}")
if data:
if isinstance(data, bytes):
data = data.decode("utf-8")
return json.loads(data)
return None
except Exception as e:
logger.error(f"读取玩家数据失败: {e}")
return None
def save_player(self, player: Dict[str, Any]) -> bool:
try:
with self.get_redis() as r:
r.set(f"{self.player_prefix}{player['user_id']}", json.dumps(player, ensure_ascii=False))
return True
except Exception as e:
logger.error(f"保存玩家数据失败: {e}")
return False
def invalidate_player(self, user_id: str):
try:
with self.get_redis() as r:
r.delete(f"{self.player_prefix}{user_id}")
except Exception as e:
logger.error(f"失效玩家缓存失败: {e}")
def set_rate_limit(self, user_id: str, cmd: str, seconds: int) -> bool:
try:
with self.get_redis() as r:
key = f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}"
r.setex(key, seconds, "1")
return True
except Exception as e:
logger.error(f"设置限流失败: {e}")
return False
def check_rate_limited(self, user_id: str, cmd: str) -> bool:
try:
with self.get_redis() as r:
key = f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}"
return r.exists(key) == 1
except Exception as e:
logger.error(f"检查限流失败: {e}")
return False
def leaderboard_add(self, user_id: str, score: float):
try:
if not self.leaderboard_key:
return
with self.get_redis() as r:
r.zadd(self.leaderboard_key, {user_id: score})
except Exception as e:
logger.error(f"更新排行榜失败: {e}")
def leaderboard_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
try:
if not self.leaderboard_key:
return []
with self.get_redis() as r:
res = r.zrevrange(self.leaderboard_key, 0, top_n - 1, withscores=True)
return [(uid if isinstance(uid, str) else uid.decode("utf-8"), score) for uid, score in res]
except Exception as e:
logger.error(f"读取排行榜失败: {e}")
return []
def leaderboard_realm_add(self, user_id: str, score: float):
try:
if not self.leaderboard_realm_key:
return
with self.get_redis() as r:
r.zadd(self.leaderboard_realm_key, {user_id: score})
except Exception as e:
logger.error(f"更新境界排行榜失败: {e}")
def leaderboard_realm_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
try:
if not self.leaderboard_realm_key:
return []
with self.get_redis() as r:
res = r.zrevrange(self.leaderboard_realm_key, 0, top_n - 1, withscores=True)
return [(uid if isinstance(uid, str) else uid.decode("utf-8"), score) for uid, score in res]
except Exception as e:
logger.error(f"读取境界排行榜失败: {e}")
return []
class XiuxianPlugin(MessagePluginInterface):
"""修仙主插件:指令分发、业务执行与 DB/Redis 协作。"""
FEATURE_KEY = "XIUXIAN"
FEATURE_DESCRIPTION = "🧙‍♂️ 文字修仙 [注册修仙|我的状态|闭关|出关|聚灵|排行榜]"
@property
def name(self) -> str:
return "群聊文字修仙"
@property
def version(self) -> str:
return "0.1.0"
@property
def description(self) -> str:
return "基于Redis的简化修仙玩法含闭关与状态机"
@property
def author(self) -> str:
return "AI助手"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.redis_db: Optional[XiuxianRedisDB] = None
self.xdb: Optional[XiuxianDB] = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件:加载配置,接入 DB 与 Redis并注册功能权限。"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
if self.db_manager:
self.redis_db = XiuxianRedisDB(self.db_manager)
# 初始化持久化层(如连接池可用)
if self.db_manager.mysql_pool:
self.xdb = XiuxianDB(self.db_manager)
try:
self.xdb.init_schema()
except Exception as e:
self.LOG.error(f"修仙表结构初始化失败: {e}")
cfg = self._config.get("Xiuxian", {})
self._commands = cfg.get("command", ["注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜"])
self.command_format = cfg.get("command-format", "注册修仙 道号 | 我的状态 | 闭关 | 出关 | 聚灵 数量 | 排行榜")
self.enable = cfg.get("enable", True)
status_cfg = cfg.get("status", {})
self.unstable_qi_minutes = status_cfg.get("unstable_qi_minutes", 15)
self.injured_minutes = status_cfg.get("injured_minutes", 60)
self.max_cultivate_hours = status_cfg.get("max_cultivate_hours", 8)
rate_cfg = cfg.get("rate_limit", {})
self.seconds_rl = {
"我的状态": rate_cfg.get("status_seconds", 3),
"闭关": rate_cfg.get("inout_seconds", 5),
"出关": rate_cfg.get("inout_seconds", 5),
"聚灵": rate_cfg.get("gather_seconds", 30),
"排行榜": rate_cfg.get("break_seconds", 60),
"注册修仙": 5,
"签到": rate_cfg.get("signin_seconds", 86400),
"坊市": rate_cfg.get("shop_seconds", 10),
"购买": rate_cfg.get("buy_seconds", 5),
"乾坤袋": rate_cfg.get("bag_seconds", 3),
"突破": rate_cfg.get("break_seconds", 60),
"强行突破": rate_cfg.get("force_break_seconds", 60),
"劫掠": rate_cfg.get("rob_seconds", 30),
"赠与": rate_cfg.get("gift_seconds", 10),
"赠送": rate_cfg.get("gift_seconds", 10),
"创建门派": 86400,
"加入门派": 604800,
"退出门派": 604800,
}
cult_cfg = cfg.get("cultivation", {})
self.base_rate_per_hour = cult_cfg.get("base_rate_per_hour", 100)
self.spirit_roots_cfg = cult_cfg.get("spirit_roots", ["凡灵根:1.0", "天灵根:2.0"])
self.spirit_roots = []
for item in self.spirit_roots_cfg:
try:
name, mult = item.split(":")
self.spirit_roots.append((name, float(mult)))
except Exception:
pass
lb_cfg = cfg.get("leaderboard", {})
leaderboard_key = lb_cfg.get("key", "xiuxian:zset:leaderboard:cultivation")
realm_lb_key = lb_cfg.get("realm_key", "xiuxian:zset:leaderboard:realm")
if self.redis_db:
self.redis_db.set_leaderboard_key(leaderboard_key)
self.redis_db.set_realm_leaderboard_key(realm_lb_key)
shop_cfg = cfg.get("shop", {})
self.shop_items = []
for s in shop_cfg.get("items", []):
try:
n, t, p = s.split(":")
self.shop_items.append({"name": n, "type": t, "price": int(p)})
except Exception:
pass
bt_cfg = cfg.get("breakthrough", {})
self.bt_pill_threshold = int(bt_cfg.get("pill_threshold", 5000))
self.bt_pill_item = bt_cfg.get("pill_item", "筑基丹")
self.bt_pill_success = float(bt_cfg.get("pill_success", 0.4))
self.bt_force_threshold = int(bt_cfg.get("force_threshold", 20000))
self.bt_force_success = float(bt_cfg.get("force_success", 0.1))
self.bt_force_next = bt_cfg.get("force_next_realm", "筑基2层")
# 解析境界分值与层级提升阈值
realm_score_cfg = self._config.get("Xiuxian", {}).get("realm_score", {})
self.realm_score_map = {}
for s in realm_score_cfg.get("stages", []):
try:
n, v = s.split(":")
self.realm_score_map[n] = int(v)
except Exception:
pass
layer_up_cfg = self._config.get("Xiuxian", {}).get("layer_up", {})
self.layer_threshold_map = {}
for s in layer_up_cfg.get("thresholds", []):
try:
n, v = s.split(":")
self.layer_threshold_map[n] = int(v)
except Exception:
pass
# 解析突破阶段配置
stage_cfg = self._config.get("Xiuxian", {}).get("breakthrough_stages", {})
self.break_config = {}
for p in stage_cfg.get("paths", []):
try:
cur, path, cost, rate, target = p.split(":")
if cur not in self.break_config:
self.break_config[cur] = {}
self.break_config[cur][path] = {
"cost": int(cost),
"rate": float(rate),
"target": target
}
except Exception:
pass
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
if not self.enable:
return False
content = str(message.get("content", "")).strip()
for cmd in self._commands:
if content.startswith(cmd):
return True
return False
@plugin_stats_decorator(plugin_name="群聊文字修仙")
@group_feature_rate_limit(max_per_minute=6, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
content = str(message.get("content", "")).strip()
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
cmd = None
for c in self._commands:
if content.startswith(c):
cmd = c
content = content[len(c):].strip()
break
if not cmd:
return False, "不匹配的命令"
if not self.redis_db:
await bot.send_text_message((roomid if roomid else sender), "❌系统未初始化Redis", sender)
return False, "Redis未初始化"
if self.redis_db.check_rate_limited(sender, cmd):
await bot.send_text_message((roomid if roomid else sender), "⚠️ 操作过于频繁,请稍候再试", sender)
return False, "限流"
if cmd == "注册修仙":
return await self._cmd_register(bot, sender, roomid, content)
if cmd in ("修仙帮助"):
return await self._cmd_help(bot, sender, roomid)
if cmd == "我的状态":
return await self._cmd_status(bot, sender, roomid)
if cmd == "闭关":
return await self._cmd_cultivate(bot, sender, roomid)
if cmd == "出关":
return await self._cmd_finish_cultivate(bot, sender, roomid)
if cmd == "聚灵":
return await self._cmd_gather(bot, sender, roomid, content)
if cmd == "排行榜":
return await self._cmd_leaderboard(bot, sender, roomid)
if cmd == "修仙签到":
return await self._cmd_signin(bot, sender, roomid)
if cmd == "坊市":
return await self._cmd_shop(bot, sender, roomid)
if cmd == "购买":
return await self._cmd_buy(bot, sender, roomid, content)
if cmd == "乾坤袋":
return await self._cmd_bag(bot, sender, roomid)
if cmd == "突破":
return await self._cmd_breakthrough(bot, sender, roomid)
if cmd == "强行突破":
return await self._cmd_force_breakthrough(bot, sender, roomid)
if cmd == "劫掠":
return await self._cmd_rob(bot, sender, roomid, content)
if cmd == "赠与":
return await self._cmd_give_stone(bot, sender, roomid, content)
if cmd == "赠送":
return await self._cmd_give_item(bot, sender, roomid, content)
if cmd == "创建门派":
return await self._cmd_clan_create(bot, sender, roomid, content)
if cmd == "加入门派":
return await self._cmd_clan_join(bot, sender, roomid, content)
if cmd == "退出门派":
return await self._cmd_clan_exit(bot, sender, roomid)
return False, "未知命令"
def _rate_set(self, user_id: str, cmd: str):
"""设置用户维度 Redis 冷却键,用于防骚扰与防封。"""
seconds = self.seconds_rl.get(cmd, 3)
self.redis_db.set_rate_limit(user_id, cmd, seconds)
def _check_status_update(self, player: Dict[str, Any]) -> Dict[str, Any]:
"""状态机自动流转:过期的 Unstable_Qi/Injured 恢复为 Idle。"""
now = datetime.now(timezone.utc)
status = player.get("status", "Idle")
until_str = player.get("status_until")
until = None
if until_str:
try:
until = datetime.fromisoformat(until_str)
except Exception:
until = None
if until and until.tzinfo is None:
until = until.replace(tzinfo=timezone.utc)
if status in ("Unstable_Qi", "Injured") and until and now >= until:
player["status"] = "Idle"
player["status_until"] = None
self.redis_db.save_player(player)
return player
async def _cmd_register(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
dao_name = content.strip()
if not dao_name:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", sender)
return False, "命令格式错误"
player = self.redis_db.get_player(sender)
if player:
await bot.send_text_message((roomid if roomid else sender), "⚠️ 已注册,无需重复注册", sender)
self._rate_set(sender, "注册修仙")
return True, "已注册"
root_name, mult = random.choice(self.spirit_roots) if self.spirit_roots else ("凡灵根", 1.0)
player = {
"user_id": sender,
"group_id": roomid or "",
"dao_name": dao_name,
"realm": "炼气1层",
"spirit_root": f"{root_name}",
"spirit_root_mult": mult,
"cultivation_points": 0,
"spirit_stone": 0,
"status": "Idle",
"status_until": None,
"last_cultivate_time": None,
}
self.redis_db.save_player(player)
self._rate_set(sender, "注册修仙")
# 初始化境界排行榜分值
self.redis_db.leaderboard_realm_add(sender, float(self._realm_score(player["realm"])) )
await bot.send_text_message((roomid if roomid else sender), f"✅ 注册成功,道号:{dao_name}\n灵根:{root_name}", sender)
return True, "注册成功"
async def _cmd_help(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
lines = ["📜 修仙帮助"]
lines.append("用法提示:")
lines.append(self.command_format.strip())
lines.append("可用指令:")
lines.append(" | ".join(self._commands))
lines.append("常用参数格式:")
lines.append("注册修仙 道号")
lines.append("聚灵 数量")
lines.append("购买 物品 数量")
lines.append("赠与 目标wxid 数量")
lines.append("赠送 目标wxid 物品 数量")
lines.append("劫掠 目标wxid")
lines.append("创建门派 名称")
lines.append("加入门派 名称")
msg = "\n".join(lines)
await bot.send_text_message((roomid if roomid else sender), msg, sender)
self._rate_set(sender, "帮助")
return True, "帮助"
async def _cmd_status(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self.redis_db.get_player(sender)
if not player:
await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
player = self._check_status_update(player)
msg = (
f"🧙‍♂️ 我的状态\n"
f"道号:{player.get('dao_name')}\n"
f"境界:{player.get('realm')}\n"
f"灵根:{player.get('spirit_root')}\n"
f"修为:{player.get('cultivation_points')}\n"
f"灵石:{player.get('spirit_stone')}\n"
f"状态:{player.get('status')}\n"
)
await bot.send_text_message((roomid if roomid else sender), msg, sender)
self._rate_set(sender, "我的状态")
return True, "状态展示"
async def _cmd_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self.redis_db.get_player(sender)
if not player:
await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
player = self._check_status_update(player)
status = player.get("status", "Idle")
if status == "Cultivating":
await bot.send_text_message((roomid if roomid else sender), "⚠️ 已在闭关中", sender)
return False, "重复闭关"
if status not in ("Idle", "Injured"):
await bot.send_text_message((roomid if roomid else sender), f"当前状态[{status}]不可闭关", sender)
return False, "状态不可闭关"
player["status"] = "Cultivating"
player["last_cultivate_time"] = datetime.now(timezone.utc).isoformat()
self.redis_db.save_player(player)
self._rate_set(sender, "闭关")
await bot.send_text_message((roomid if roomid else sender), "✅ 已进入闭关,期间安全不可被劫掠", sender)
return True, "闭关成功"
async def _cmd_finish_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self.redis_db.get_player(sender)
if not player:
await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
player = self._check_status_update(player)
if player.get("status") != "Cultivating":
await bot.send_text_message((roomid if roomid else sender), "⚠️ 非闭关状态,无需出关", sender)
return False, "非闭关"
start_iso = player.get("last_cultivate_time")
try:
start = datetime.fromisoformat(start_iso) if start_iso else datetime.now(timezone.utc)
except Exception:
start = datetime.now(timezone.utc)
if start.tzinfo is None:
start = start.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
duration_hours = (now - start).total_seconds() / 3600.0
duration_hours = max(0.0, min(duration_hours, float(self.max_cultivate_hours)))
rate = self.base_rate_per_hour * float(player.get("spirit_root_mult", 1.0))
gain = int(duration_hours * rate)
player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain
player["status"] = "Unstable_Qi"
player["status_until"] = (now + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat()
player["last_cultivate_time"] = None
self.redis_db.save_player(player)
# 自动层级提升(不跨瓶颈)
self._auto_layer_up(sender, player)
self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
self._rate_set(sender, "出关")
await bot.send_text_message((roomid if roomid else sender), f"✅ 出关成功,获得修为:{gain}{duration_hours:.1f}小时)\n当前修为:{player['cultivation_points']}\n状态:气息不稳 {self.unstable_qi_minutes}分钟", sender)
return True, "出关结算"
async def _cmd_gather(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
player = self.redis_db.get_player(sender)
if not player:
await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
try:
qty = int(content.strip())
except Exception:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", sender)
return False, "命令格式错误"
stones = int(player.get("spirit_stone", 0))
if qty <= 0 or stones < qty:
await bot.send_text_message((roomid if roomid else sender), "⚠️ 灵石不足或数量不合法", sender)
return False, "灵石不足"
player["spirit_stone"] = stones - qty
player["cultivation_points"] = int(player.get("cultivation_points", 0)) + qty * 10
self.redis_db.save_player(player)
# 自动层级提升(不跨瓶颈)
self._auto_layer_up(sender, player)
self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
self._rate_set(sender, "聚灵")
await bot.send_text_message((roomid if roomid else sender), f"✅ 聚灵成功,消耗灵石{qty},获得修为{qty * 10}", sender)
return True, "聚灵成功"
async def _cmd_leaderboard(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
top = self.redis_db.leaderboard_top(10)
lines = ["🏆 修为排行榜 Top10"]
rank = 1
for uid, score in top:
mark = "" if uid == sender else ""
lines.append(f"{rank}. {uid} - {int(score)} {mark}")
rank += 1
await bot.send_text_message((roomid if roomid else sender), "\n".join(lines), sender)
self._rate_set(sender, "排行榜")
return True, "排行榜"
def _get_player(self, user_id: str) -> Optional[Dict[str, Any]]:
return self.redis_db.get_player(user_id)
def _save_player(self, player: Dict[str, Any]):
self.redis_db.save_player(player)
def _get_player_with_cache(self, user_id: str) -> Optional[Dict[str, Any]]:
"""读取玩家:优先 Redis未命中则读 DB 并回填缓存。"""
p = self.redis_db.get_player(user_id)
if p:
return p
if self.xdb:
dbp = self.xdb.get_player(user_id)
if dbp:
dbp.setdefault("spirit_root_mult", 1.0)
dbp.setdefault("inventory", {})
self.redis_db.save_player(dbp)
return dbp
return None
def _parse_realm(self, realm: str) -> Tuple[str, Optional[int]]:
try:
if "" in realm:
prefix, layer_str = realm.split("")[0], realm.split(" ")[-1] if " " in realm else None
# expected format: 前缀N层例如 炼气1层/筑基10层
import re
m = re.match(r"^(.*?)?(\d+)层$", realm)
if m:
prefix = m.group(1)
layer = int(m.group(2))
return prefix, layer
except Exception:
pass
return realm, None
def _realm_score(self, realm: str) -> int:
prefix, layer = self._parse_realm(realm)
base = self.realm_score_map.get(prefix, 0)
return base + (layer or 0)
def _set_realm(self, user_id: str, player: Dict[str, Any], new_realm: str):
player["realm"] = new_realm
if self.xdb:
try:
self.xdb.update_player_fields(user_id, {"realm": new_realm})
self.redis_db.invalidate_player(user_id)
except Exception:
pass
self._save_player(player)
# 更新境界排行榜
self.redis_db.leaderboard_realm_add(user_id, float(self._realm_score(new_realm)))
def _auto_layer_up(self, user_id: str, player: Dict[str, Any]):
prefix, layer = self._parse_realm(player.get("realm", "凡人"))
if layer is None or layer >= 10:
return
threshold = self.layer_threshold_map.get(prefix)
if not threshold:
return
pts = int(player.get("cultivation_points", 0))
new_layer = max(layer, min(10, pts // threshold))
if new_layer != layer:
self._set_realm(user_id, player, f"{prefix}{new_layer}")
async def _cmd_signin(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self._get_player(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
if self.redis_db.check_rate_limited(sender, "签到"):
await bot.send_text_message(roomid or sender, "今日已签到,请明日再来", sender)
return False, "已签到"
reward = 50
player["spirit_stone"] = int(player.get("spirit_stone", 0)) + reward
self._save_player(player)
self._rate_set(sender, "签到")
await bot.send_text_message(roomid or sender, f"✅ 签到成功,获得灵石{reward}", sender)
return True, "签到成功"
async def _cmd_shop(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
lines = ["🛒 坊市商品"]
for item in self.shop_items:
lines.append(f"{item['name']} [{item['type']}] - {item['price']}灵石")
await bot.send_text_message(roomid or sender, "\n".join(lines), sender)
self._rate_set(sender, "坊市")
return True, "坊市"
async def _cmd_buy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
parts = content.split()
if len(parts) < 2:
await bot.send_text_message(roomid or sender, f"❌命令格式错误!\n购买 物品 数量", sender)
return False, "命令格式错误"
item_name = parts[0]
try:
qty = int(parts[1])
except Exception:
await bot.send_text_message(roomid or sender, f"❌命令格式错误!\n购买 物品 数量", sender)
return False, "命令格式错误"
player = self._get_player(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
item = next((i for i in self.shop_items if i["name"] == item_name), None)
if not item:
await bot.send_text_message(roomid or sender, "商品不存在", sender)
return False, "商品不存在"
total = item["price"] * qty
stones = int(player.get("spirit_stone", 0))
if qty <= 0 or stones < total:
await bot.send_text_message(roomid or sender, "灵石不足", sender)
return False, "灵石不足"
player["spirit_stone"] = stones - total
inv = player.get("inventory") or {}
inv[item_name] = int(inv.get(item_name, 0)) + qty
player["inventory"] = inv
self._save_player(player)
self._rate_set(sender, "购买")
await bot.send_text_message(roomid or sender, f"✅ 购买成功,{item_name} × {qty}", sender)
return True, "购买成功"
async def _cmd_bag(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self._get_player(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
inv = player.get("inventory") or {}
lines = ["🎒 背包"]
if not inv:
lines.append("")
else:
for k, v in inv.items():
lines.append(f"{k} × {v}")
await bot.send_text_message(roomid or sender, "\n".join(lines), sender)
self._rate_set(sender, "背包")
return True, "背包"
async def _cmd_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self._get_player(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
points = int(player.get("cultivation_points", 0))
inv = player.get("inventory") or {}
# 读取当前瓶颈配置
cur_realm = player.get("realm", "炼气1层")
prefix, layer = self._parse_realm(cur_realm)
stage_key = f"{prefix}10层"
stage_conf = self.break_config.get(stage_key)
if not stage_conf:
await bot.send_text_message(roomid or sender, "当前境界未到瓶颈或未配置突破路径", sender)
return False, "未到瓶颈"
pill_conf = stage_conf.get("pill")
if not pill_conf:
await bot.send_text_message(roomid or sender, "丹药路径未配置", sender)
return False, "未配置"
if points < pill_conf["cost"]:
await bot.send_text_message(roomid or sender, "修为不足,无法突破", sender)
return False, "修为不足"
if inv.get(self.bt_pill_item, 0) <= 0:
await bot.send_text_message(roomid or sender, f"缺少丹药:{self.bt_pill_item}", sender)
return False, "缺少丹药"
inv[self.bt_pill_item] = inv.get(self.bt_pill_item, 0) - 1
player["inventory"] = inv
player["cultivation_points"] = points - pill_conf["cost"]
if self.xdb:
try:
self.xdb.remove_item(sender, self.bt_pill_item, 1)
self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]})
self.redis_db.invalidate_player(sender)
except Exception:
pass
roll = random.random()
if roll < pill_conf["rate"]:
# 成功,更新境界并排行榜
self._set_realm(sender, player, pill_conf["target"])
self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
self._rate_set(sender, "突破")
await bot.send_text_message(roomid or sender, f"✅ 突破成功,晋升至{pill_conf['target']}", sender)
return True, "突破成功"
else:
self._save_player(player)
self._rate_set(sender, "突破")
await bot.send_text_message(roomid or sender, "❌ 突破失败", sender)
return False, "突破失败"
async def _cmd_force_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self._get_player(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
points = int(player.get("cultivation_points", 0))
# 读取当前瓶颈配置
cur_realm = player.get("realm", "炼气1层")
prefix, layer = self._parse_realm(cur_realm)
stage_key = f"{prefix}10层"
stage_conf = self.break_config.get(stage_key)
if not stage_conf:
await bot.send_text_message(roomid or sender, "当前境界未到瓶颈或未配置突破路径", sender)
return False, "未到瓶颈"
hard_conf = stage_conf.get("hard")
if not hard_conf:
await bot.send_text_message(roomid or sender, "强行路径未配置", sender)
return False, "未配置"
if points < hard_conf["cost"]:
await bot.send_text_message(roomid or sender, "修为不足,无法强行突破", sender)
return False, "修为不足"
player["cultivation_points"] = points - hard_conf["cost"]
if self.xdb:
try:
self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]})
self.redis_db.invalidate_player(sender)
except Exception:
pass
roll = random.random()
if roll < hard_conf["rate"]:
self._set_realm(sender, player, hard_conf["target"])
self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
self._rate_set(sender, "强行突破")
await bot.send_text_message(roomid or sender, f"✅ 强行突破成功,晋升至{hard_conf['target']}", sender)
return True, "强行突破成功"
else:
self._save_player(player)
self._rate_set(sender, "强行突破")
await bot.send_text_message(roomid or sender, "❌ 强行突破失败,灵气反噬!", sender)
return False, "强行突破失败"
async def _cmd_rob(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
target = content.strip().lstrip("@")
if not target:
await bot.send_text_message(roomid or sender, "命令格式:劫掠 目标wxid", sender)
return False, "命令格式错误"
if target == sender:
await bot.send_text_message(roomid or sender, "不可劫掠自己", sender)
return False, "非法目标"
attacker = self._get_player(sender)
defender = self._get_player(target)
if not attacker or not defender:
await bot.send_text_message(roomid or sender, "双方需已注册", sender)
return False, "未注册"
attacker = self._check_status_update(attacker)
defender = self._check_status_update(defender)
# 新手保护:不可劫掠炼气期
def_prefix, _ = self._parse_realm(defender.get("realm", "炼气1层"))
if def_prefix == "炼气":
await bot.send_text_message(roomid or sender, "目标处于新手保护期(炼气),不可劫掠", sender)
return False, "新手保护"
if defender.get("status") in ("Cultivating", "Injured"):
await bot.send_text_message(roomid or sender, "目标处于保护或闭关中", sender)
return False, "目标保护"
if roomid and (attacker.get("group_id") != roomid or defender.get("group_id") != roomid):
await bot.send_text_message(roomid or sender, "仅限同群内劫掠", sender)
return False, "跨群"
if attacker.get("clan_id") and defender.get("clan_id") and attacker.get("clan_id") == defender.get("clan_id"):
await bot.send_text_message(roomid or sender, "不可劫掠同门", sender)
return False, "同门"
a_pts = int(attacker.get("cultivation_points", 0))
d_pts = int(defender.get("cultivation_points", 0))
base = 0.5
if a_pts > d_pts:
base += 0.2
elif a_pts < d_pts:
base -= 0.2
roll = random.random()
if roll < base:
d_stones = int(defender.get("spirit_stone", 0))
gain = max(0, int(d_stones * 0.2))
defender["spirit_stone"] = d_stones - gain
attacker["spirit_stone"] = int(attacker.get("spirit_stone", 0)) + gain
defender["status"] = "Injured"
defender["status_until"] = (datetime.now(timezone.utc) + timedelta(minutes=int(self.injured_minutes))).isoformat()
self._save_player(defender)
self._save_player(attacker)
self._rate_set(sender, "劫掠")
await bot.send_text_message(roomid or sender, f"✅ 劫掠成功,获得灵石{gain}", sender)
if roomid:
await bot.send_text_message(roomid, f"{sender} 劫掠 {target} 成功,目标进入受伤保护", [target])
return True, "劫掠成功"
else:
self._rate_set(sender, "劫掠")
await bot.send_text_message(roomid or sender, "❌ 劫掠失败", sender)
return False, "劫掠失败"
async def _cmd_give_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
parts = content.strip().split()
if len(parts) < 2:
await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender)
return False, "命令格式错误"
target = parts[0].lstrip("@")
try:
qty = int(parts[1])
except Exception:
await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender)
return False, "命令格式错误"
giver = self._get_player_with_cache(sender)
receiver = self._get_player_with_cache(target)
if not giver or not receiver:
await bot.send_text_message(roomid or sender, "双方需已注册", sender)
return False, "未注册"
if not giver.get("clan_id") or giver.get("clan_id") != receiver.get("clan_id"):
await bot.send_text_message(roomid or sender, "仅同门可赠与灵石", sender)
return False, "不同门"
stones = int(giver.get("spirit_stone", 0))
if qty <= 0 or stones < qty:
await bot.send_text_message(roomid or sender, "灵石不足或数量不合法", sender)
return False, "灵石不足"
giver["spirit_stone"] = stones - qty
receiver["spirit_stone"] = int(receiver.get("spirit_stone", 0)) + qty
self._save_player(giver)
self._save_player(receiver)
self._rate_set(sender, "赠与")
await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠与灵石 {qty}", sender)
return True, "赠与成功"
async def _cmd_give_item(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
parts = content.strip().split()
if len(parts) < 3:
await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender)
return False, "命令格式错误"
target = parts[0].lstrip("@")
item_name = parts[1]
try:
qty = int(parts[2])
except Exception:
await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender)
return False, "命令格式错误"
giver = self._get_player(sender)
receiver = self._get_player(target)
if not giver or not receiver:
await bot.send_text_message(roomid or sender, "双方需已注册", sender)
return False, "未注册"
inv_g = giver.get("inventory") or {}
if inv_g.get(item_name, 0) < qty or qty <= 0:
await bot.send_text_message(roomid or sender, "物品不足或数量不合法", sender)
return False, "物品不足"
inv_r = receiver.get("inventory") or {}
inv_g[item_name] = inv_g.get(item_name, 0) - qty
inv_r[item_name] = int(inv_r.get(item_name, 0)) + qty
giver["inventory"] = inv_g
receiver["inventory"] = inv_r
if self.xdb:
ok = self.xdb.transfer_item(sender, target, item_name, qty)
if not ok:
await bot.send_text_message(roomid or sender, "物品转移失败", sender)
return False, "转移失败"
self.redis_db.invalidate_player(sender)
self.redis_db.invalidate_player(target)
self._save_player(giver)
self._save_player(receiver)
self._rate_set(sender, "赠送")
await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠送 {item_name} × {qty}", sender)
return True, "赠送成功"
async def _cmd_clan_create(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
name = content.strip()
if not name:
await bot.send_text_message(roomid or sender, "命令格式:创建门派 名称", sender)
return False, "命令格式错误"
player = self._get_player_with_cache(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
allowed = {"元婴", "化神", "合体", "大乘", "渡劫", "真仙"}
if prefix not in allowed:
await bot.send_text_message(roomid or sender, "创建门派需达到元婴期及以上", sender)
return False, "境界不足"
clan_id = None
if self.xdb:
clan_id = self.xdb.create_clan(name, roomid or "", sender)
if clan_id is None:
await bot.send_text_message(roomid or sender, "门派已存在或创建失败", sender)
return False, "门派失败"
if player and clan_id is not None:
player["clan_id"] = int(clan_id)
if self.xdb:
self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]})
self.redis_db.invalidate_player(sender)
self._save_player(player)
self._rate_set(sender, "创建门派")
await bot.send_text_message(roomid or sender, f"✅ 门派已创建:{name}", sender)
return True, "创建门派"
async def _cmd_clan_join(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
name = content.strip()
if not name:
await bot.send_text_message(roomid or sender, "命令格式:加入门派 名称", sender)
return False, "命令格式错误"
player = self._get_player_with_cache(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
cid = None
if self.xdb:
cid = self.xdb.get_clan_id(roomid or "", name)
if not cid:
await bot.send_text_message(roomid or sender, "门派不存在", sender)
return False, "门派不存在"
player["clan_id"] = int(cid) if isinstance(cid, str) else cid
if self.xdb:
self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]})
self.redis_db.invalidate_player(sender)
self._save_player(player)
self._rate_set(sender, "加入门派")
await bot.send_text_message(roomid or sender, f"✅ 已加入门派:{name}", sender)
return True, "加入门派"
async def _cmd_clan_exit(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
player = self._get_player_with_cache(sender)
if not player:
await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
return False, "未注册"
player["clan_id"] = None
if self.xdb:
self.xdb.update_player_fields(sender, {"clan_id": None})
self.redis_db.invalidate_player(sender)
self._save_player(player)
self._rate_set(sender, "退出门派")
await bot.send_text_message(roomid or sender, "✅ 已退出门派", sender)
return True, "退出门派"