Files
abot/plugins/xiuxian/main.py
2025-11-25 14:00:48 +08:00

1920 lines
96 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
群聊文字修仙插件
说明:
- 原子指令 + 状态机 + 限流 + Cache-Aside
- 读优先 Redis写 DB 后删除缓存,由读取路径回填
- 群频次限制与用户冷却共同防骚扰/防封
"""
import json
import time
import random
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import user_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.xiuxian_db import XiuxianDB
from db.points_db import PointsDBOperator, PointSource
from datetime import timezone
class XiuxianRedisDB:
    """Redis access layer for the xiuxian plugin: player cache, cooldown keys, leaderboards.

    Every public method obtains a connection from ``db_manager`` and logs
    Redis errors instead of raising, returning a neutral value
    (``None`` / ``False`` / ``[]``) on failure.
    """

    # A cooldown of at least this many seconds is aligned to UTC midnight.
    DAILY_THRESHOLD_SECONDS = 86400

    def __init__(self, db_manager: DBConnectionManager):
        self.db_manager = db_manager
        self.player_prefix = "xiuxian:cache:player:"
        # Leaderboard key prefixes stay unset until the plugin configures them;
        # while unset, the leaderboard methods are no-ops.
        self.leaderboard_key = None
        self.leaderboard_realm_key = None

    def set_leaderboard_key(self, key: str):
        """Set the key prefix of the cultivation-points leaderboard."""
        self.leaderboard_key = key

    def set_realm_leaderboard_key(self, key: str):
        """Set the key prefix of the realm leaderboard."""
        self.leaderboard_realm_key = key

    def get_redis(self):
        """Return a Redis connection from the manager (used as a context manager here)."""
        return self.db_manager.get_redis_connection()

    # ------------------------------------------------------------------ helpers

    def _player_keys(self, user_id: str, group_id: str) -> Tuple[str, str]:
        """Return ``(new_key, legacy_key)``; the legacy key has no group component."""
        return (
            f"{self.player_prefix}{group_id}:{user_id}",
            f"{self.player_prefix}{user_id}",
        )

    @staticmethod
    def _as_text(raw):
        """Decode a Redis reply to ``str`` when the client returned bytes."""
        return raw.decode("utf-8") if isinstance(raw, bytes) else raw

    @staticmethod
    def _rate_key(user_id: str, group_id: str, cmd: str) -> str:
        """Build the undated cooldown key for one user/group/command."""
        return f"xiuxian:rate_limit:group:{group_id}:user:{user_id}:cmd:{cmd}"

    # ------------------------------------------------------------ player cache

    def get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Read a cached player, migrating a legacy (group-less) record on the fly.

        Returns the decoded player dict, or ``None`` when nothing is cached or
        Redis fails.
        """
        try:
            with self.get_redis() as r:
                new_key, old_key = self._player_keys(user_id, group_id)
                raw = r.get(new_key)
                if raw:
                    return json.loads(self._as_text(raw))
                raw = r.get(old_key)
                if not raw:
                    return None
                player = json.loads(self._as_text(raw))
                # Legacy records may lack group_id; adopt the requested one.
                if not player.get("group_id"):
                    player["group_id"] = group_id
                # Move the record to the new key layout and drop the legacy key.
                r.set(new_key, json.dumps(player, ensure_ascii=False))
                r.delete(old_key)
                return player
        except Exception as e:
            logger.error(f"读取玩家数据失败: {e}")
            return None

    def save_player(self, player: Dict[str, Any]) -> bool:
        """Write the player under the new key layout and delete any legacy key.

        Returns ``True`` on success, ``False`` when Redis fails.
        """
        try:
            with self.get_redis() as r:
                new_key, old_key = self._player_keys(
                    player.get("user_id"), player.get("group_id", "")
                )
                r.set(new_key, json.dumps(player, ensure_ascii=False))
                r.delete(old_key)
                return True
        except Exception as e:
            logger.error(f"保存玩家数据失败: {e}")
            return False

    def invalidate_player(self, user_id: str, group_id: str):
        """Delete both the new-layout and legacy cache entries of a player."""
        try:
            with self.get_redis() as r:
                for key in self._player_keys(user_id, group_id):
                    r.delete(key)
        except Exception as e:
            logger.error(f"失效玩家缓存失败: {e}")

    # --------------------------------------------------------------- cooldowns

    def set_rate_limit(self, user_id: str, group_id: str, cmd: str, seconds: int) -> bool:
        """Start a cooldown for ``cmd``.

        * ``seconds`` below one day: rolling cooldown of exactly ``seconds``.
        * one day up to (not including) two days: dated key that expires at
          the next UTC midnight (natural-day reset).
        * two days or more: undated key that expires at UTC midnight
          ``seconds // 86400`` days ahead, so a multi-day cooldown is no
          longer cut short at the first midnight.

        Returns ``True`` when a key was written; ``False`` when ``seconds`` is
        not positive (nothing to write) or Redis fails.
        """
        try:
            if seconds <= 0:
                # SETEX rejects non-positive TTLs; skip the round trip.
                return False
            base_key = self._rate_key(user_id, group_id, cmd)
            with self.get_redis() as r:
                if seconds >= self.DAILY_THRESHOLD_SECONDS:
                    now = datetime.now(timezone.utc)
                    days = seconds // self.DAILY_THRESHOLD_SECONDS
                    reset_at = (now + timedelta(days=days)).replace(
                        hour=0, minute=0, second=0, microsecond=0
                    )
                    # Truncation can yield 0 in the last second of the day.
                    ttl = max(1, int((reset_at - now).total_seconds()))
                    if days == 1:
                        key = f"{base_key}:date:{now.strftime('%Y-%m-%d')}"
                    else:
                        # check_rate_limited also probes the undated key, so a
                        # multi-day cooldown is visible on later days.
                        key = base_key
                    r.setex(key, ttl, "1")
                else:
                    r.setex(base_key, seconds, "1")
                return True
        except Exception as e:
            logger.error(f"设置限流失败: {e}")
            return False

    def check_rate_limited(self, user_id: str, group_id: str, cmd: str) -> bool:
        """Return ``True`` when today's dated key or the undated key exists.

        Redis failures are logged and reported as "not limited".
        """
        try:
            with self.get_redis() as r:
                base_key = self._rate_key(user_id, group_id, cmd)
                today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
                candidates = (f"{base_key}:date:{today}", base_key)
                return any(r.exists(key) == 1 for key in candidates)
        except Exception as e:
            logger.error(f"检查限流失败: {e}")
            return False

    # ------------------------------------------------------------ leaderboards

    def _board_add(self, board_key: Optional[str], label: str,
                   group_id: str, user_id: str, score: float):
        """Upsert ``user_id`` with ``score`` into the per-group sorted set."""
        try:
            if not board_key:
                return
            with self.get_redis() as r:
                r.zadd(f"{board_key}:{group_id}", {user_id: score})
        except Exception as e:
            logger.error(f"更新{label}失败: {e}")

    def _board_top(self, board_key: Optional[str], label: str,
                   group_id: str, top_n: int) -> List[Tuple[str, float]]:
        """Return up to ``top_n`` ``(user_id, score)`` pairs, highest score first."""
        try:
            # A stop index of -1 means "to the end" in ZREVRANGE, so top_n <= 0
            # must be answered locally instead of returning the whole board.
            if not board_key or top_n <= 0:
                return []
            with self.get_redis() as r:
                rows = r.zrevrange(f"{board_key}:{group_id}", 0, top_n - 1, withscores=True)
                return [(self._as_text(uid), score) for uid, score in rows]
        except Exception as e:
            logger.error(f"读取{label}失败: {e}")
            return []

    def leaderboard_add(self, group_id: str, user_id: str, score: float):
        """Record a cultivation-points score for ``user_id`` in ``group_id``."""
        self._board_add(self.leaderboard_key, "排行榜", group_id, user_id, score)

    def leaderboard_top(self, group_id: str, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` entries of the cultivation-points leaderboard."""
        return self._board_top(self.leaderboard_key, "排行榜", group_id, top_n)

    def leaderboard_realm_add(self, group_id: str, user_id: str, score: float):
        """Record a realm score for ``user_id`` in ``group_id``."""
        self._board_add(self.leaderboard_realm_key, "境界排行榜", group_id, user_id, score)

    def leaderboard_realm_top(self, group_id: str, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` entries of the realm leaderboard."""
        return self._board_top(self.leaderboard_realm_key, "境界排行榜", group_id, top_n)
class XiuxianPlugin(MessagePluginInterface):
"""修仙主插件:指令分发、业务执行与 DB/Redis 协作。"""
FEATURE_KEY = "XIUXIAN"
FEATURE_DESCRIPTION = "🧙‍♂️ 文字修仙 [注册修仙|我的状态|闭关|出关|聚灵|排行榜]"
@property
def name(self) -> str:
    """Display name of the plugin."""
    plugin_name = "群聊文字修仙"
    return plugin_name
@property
def version(self) -> str:
    """Plugin version string."""
    plugin_version = "0.1.0"
    return plugin_version
@property
def description(self) -> str:
    """One-line summary of what the plugin offers."""
    summary = "基于Redis的简化修仙玩法含闭关与状态机"
    return summary
@property
def author(self) -> str:
    """Author label reported for the plugin."""
    plugin_author = "AI助手"
    return plugin_author
@property
def command_prefix(self) -> Optional[str]:
    """Command prefix; this plugin uses the empty string."""
    prefix = ""
    return prefix
@property
def commands(self) -> List[str]:
    """Command words stored in ``self._commands``."""
    registered = self._commands
    return registered
@property
def feature_key(self) -> Optional[str]:
    """Feature identifier, taken from the ``FEATURE_KEY`` attribute."""
    key = self.FEATURE_KEY
    return key
@property
def feature_description(self) -> Optional[str]:
    """Feature description, taken from the ``FEATURE_DESCRIPTION`` attribute."""
    text = self.FEATURE_DESCRIPTION
    return text
def __init__(self):
    """Register the feature and leave every backend handle unset.

    ``redis_db``, ``xdb`` and ``points_db`` are attached by ``initialize``;
    ``revoke`` is refreshed from each incoming message.
    """
    super().__init__()
    self.feature = self.register_feature()
    # Placeholders only; nothing is connected at construction time.
    self.revoke = None
    self.points_db: Optional[PointsDBOperator] = None
    self.xdb: Optional[XiuxianDB] = None
    self.redis_db: Optional[XiuxianRedisDB] = None
def initialize(self, context: Dict[str, Any]) -> bool:
    """Attach the storage backends and parse the ``Xiuxian`` config section.

    Populates the command list, cooldown table, spirit roots, shop items,
    materials, recipes, point exchange rate and the realm/breakthrough
    tables. Malformed config entries are logged as warnings and skipped.

    Args:
        context: Plugin context; only ``event_system`` is read from it.

    Returns:
        ``True`` once all sections have been processed.
    """
    self.LOG = logger
    self.LOG.info(f"正在初始化 {self.name} 插件...")
    self.event_system = context.get("event_system")

    # --- storage backends ---------------------------------------------------
    self.db_manager = DBConnectionManager.get_instance()
    manager = self.db_manager
    if manager:
        self.redis_db = XiuxianRedisDB(manager)
        # The persistent layer is only wired when a MySQL pool is present.
        if manager.mysql_pool:
            self.xdb = XiuxianDB(manager)
            self.points_db = PointsDBOperator(manager)
            try:
                self.xdb.init_schema()
            except Exception as e:
                self.LOG.error(f"修仙表结构初始化失败: {e}")

    # --- basic settings -----------------------------------------------------
    cfg = self._config.get("Xiuxian", {})
    self._commands = cfg.get("command", ["注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜"])
    self.command_format = cfg.get("command-format", "注册修仙 道号 | 我的状态 | 闭关 | 出关 | 聚灵 数量 | 排行榜")
    self.enable = cfg.get("enable", True)

    status_cfg = cfg.get("status", {})
    self.unstable_qi_minutes = status_cfg.get("unstable_qi_minutes", 15)
    self.injured_minutes = status_cfg.get("injured_minutes", 60)
    self.max_cultivate_hours = status_cfg.get("max_cultivate_hours", 8)

    # --- per-command cooldowns, in seconds ---------------------------------
    rate_cfg = cfg.get("rate_limit", {})
    limit = rate_cfg.get
    self.seconds_rl = {
        "我的状态": limit("status_seconds", 3),
        "闭关": limit("inout_seconds", 5),
        "出关": limit("inout_seconds", 5),
        "聚灵": limit("gather_seconds", 30),
        "排行榜": limit("break_seconds", 60),
        "注册修仙": 5,
        "签到": limit("signin_seconds", 86400),
        "坊市": limit("shop_seconds", 10),
        "购买": limit("buy_seconds", 5),
        "出售": limit("buy_seconds", 5),
        "乾坤袋": limit("bag_seconds", 3),
        "突破": limit("break_seconds", 60),
        "强行突破": limit("force_break_seconds", 60),
        "劫掠": limit("rob_seconds", 30),
        "赠与": limit("gift_seconds", 10),
        "赠送": limit("gift_seconds", 10),
        "创建门派": 86400,
        "加入门派": 604800,
        "退出门派": 604800,
        "积分购石": limit("points_to_stone_seconds", 10),
        "积分换灵石": limit("points_to_stone_seconds", 10),
        "出门历练": limit("expedition_seconds", 1800),
        "炼丹": limit("alchemy_seconds", 600),
    }

    # --- cultivation: base rate and "name:multiplier" spirit roots ----------
    cult_cfg = cfg.get("cultivation", {})
    self.base_rate_per_hour = cult_cfg.get("base_rate_per_hour", 100)
    self.spirit_roots_cfg = cult_cfg.get("spirit_roots", ["凡灵根:1.0", "天灵根:2.0"])
    self.spirit_roots = []
    for entry in self.spirit_roots_cfg:
        try:
            root_name, multiplier = entry.split(":")
            self.spirit_roots.append((root_name, float(multiplier)))
        except Exception as e:
            self.LOG.warning(f"解析灵根配置失败: {entry}, 错误: {e}")

    # --- leaderboard key prefixes -------------------------------------------
    lb_cfg = cfg.get("leaderboard", {})
    leaderboard_key = lb_cfg.get("key", "xiuxian:zset:leaderboard:cultivation")
    realm_lb_key = lb_cfg.get("realm_key", "xiuxian:zset:leaderboard:realm")
    if self.redis_db:
        self.redis_db.set_leaderboard_key(leaderboard_key)
        self.redis_db.set_realm_leaderboard_key(realm_lb_key)

    # --- shop items: "name:type:price" --------------------------------------
    shop_cfg = cfg.get("shop", {})
    self.shop_items = []
    for spec in shop_cfg.get("items", []):
        try:
            item_name, item_type, price = spec.split(":")
            self.shop_items.append({"name": item_name, "type": item_type, "price": int(price)})
        except Exception as e:
            self.LOG.warning(f"解析商品配置失败: {spec}, 错误: {e}")

    # --- materials: "name:tier" ---------------------------------------------
    materials_cfg = cfg.get("materials", {})
    self.material_tier = {}
    self.materials_by_tier = {"T1": [], "T2": [], "T3": []}
    for spec in materials_cfg.get("items", []):
        try:
            mat_name, tier = spec.split(":")
            self.material_tier[mat_name] = tier
            # Tiers outside T1-T3 are recorded per material but not grouped.
            if tier in self.materials_by_tier:
                self.materials_by_tier[tier].append(mat_name)
        except Exception as e:
            self.LOG.warning(f"解析材料配置失败: {spec}, 错误: {e}")

    # --- recipes: "name:mat*qty,mat x qty,...;stone;rate" -------------------
    recipes_cfg = cfg.get("recipes", {})
    self.recipes = {}
    for spec in recipes_cfg.get("items", []):
        try:
            recipe_name, detail = spec.split(":")
            mats_part, stone_part, rate_part = detail.split(";")
            needed = {}
            for token in mats_part.split(","):
                token = token.strip()
                # "*" takes precedence over "x"; a bare name means quantity 1.
                for sep in ("*", "x"):
                    if sep in token:
                        mat_name, qty = token.split(sep)
                        break
                else:
                    mat_name, qty = token, "1"
                needed[mat_name.strip()] = int(qty.strip())
            self.recipes[recipe_name] = {
                "materials": needed,
                "stone": int(stone_part.strip()),
                "rate": float(rate_part.strip()),
            }
        except Exception as e:
            self.LOG.warning(f"解析配方失败: {spec}, 错误: {e}")

    # --- points -> spirit stone exchange rate -------------------------------
    pts_cfg = cfg.get("points_exchange", {})
    self.point_to_stone_rate = int(pts_cfg.get("point_to_stone_rate", 10))

    # --- realms: per-layer threshold, realm score, breakthrough rules -------
    realms_cfg = self._config.get("Xiuxian", {}).get("realms", {})
    if not realms_cfg:
        self.LOG.warning("未找到境界配置 [Xiuxian.realms],使用默认配置")
        # Built-in fallback table.
        realms_cfg = {
            "炼气": "1000,10,筑基丹,0.4,0.1,2.0",
            "筑基": "5000,20,金元丹,0.2,0.05,2.0",
            "金丹": "50000,30,结婴丹,0.15,0.03,2.0",
            "元婴": "200000,40,化神丹,0.1,0.02,2.0",
            "化神": "1000000,50,合体丹,0.08,0.01,2.0",
            "合体": "5000000,60,大乘丹,0.05,0.005,2.0",
            "大乘": "10000000,70,渡劫丹,0.03,0.003,2.0",
            "渡劫": "50000000,80,飞升丹,0.02,0.001,2.0",
            "真仙": "0,100,,0,0,0"
        }
    self.realm_score_map = {}      # realm -> ranking score
    self.layer_threshold_map = {}  # realm -> cultivation needed per layer
    self.break_config = {}         # "<realm>10层" -> {"pill": {...}, "hard": {...}}
    # Realm progression order, used to find the realm after the current one.
    realm_order = ["炼气", "筑基", "金丹", "元婴", "化神", "合体", "大乘", "渡劫", "真仙"]
    self.LOG.info(f"开始解析境界配置,共{len(realms_cfg)}个境界")
    for realm_name, config_str in realms_cfg.items():
        try:
            # Fields: layer threshold, realm score, pill item, pill success
            # rate, forced success rate, forced cost multiplier.
            parts = config_str.split(",")
            if len(parts) < 6:
                self.LOG.warning(f"境界配置格式错误: {realm_name}={config_str}")
                continue
            fields = [part.strip() for part in parts[:6]]
            layer_threshold = int(fields[0])
            realm_score = int(fields[1])
            pill_item = fields[2] if fields[2] else None
            pill_rate = float(fields[3]) if fields[3] else 0.0
            hard_rate = float(fields[4]) if fields[4] else 0.0
            hard_multiplier = float(fields[5]) if fields[5] else 2.0

            self.layer_threshold_map[realm_name] = layer_threshold
            self.realm_score_map[realm_name] = realm_score

            # Breakthrough cost is nine layers' worth of the per-layer threshold.
            breakthrough_cost = 9 * layer_threshold
            hard_breakthrough_cost = int(breakthrough_cost * hard_multiplier)

            # The last realm in the order has no successor.
            next_realm = None
            if realm_name in realm_order[:-1]:
                next_realm = realm_order[realm_order.index(realm_name) + 1]

            if not (next_realm and layer_threshold > 0):
                continue
            stage_key = f"{realm_name}10层"
            paths = {}
            self.break_config[stage_key] = paths
            # Pill-assisted path lands on layer 1 of the next realm.
            if pill_item and pill_rate > 0:
                paths["pill"] = {
                    "cost": breakthrough_cost,
                    "rate": pill_rate,
                    "target": f"{next_realm}1层",
                    "item": pill_item
                }
            # Forced path costs more and lands on layer 2 of the next realm.
            if hard_rate > 0:
                paths["hard"] = {
                    "cost": hard_breakthrough_cost,
                    "rate": hard_rate,
                    "target": f"{next_realm}2层"
                }
        except Exception as e:
            self.LOG.warning(f"解析境界配置失败: {realm_name}={config_str}, 错误: {e}")
    self.LOG.info(f"境界配置解析完成:层级阈值{len(self.layer_threshold_map)}个,突破配置{len(self.break_config)}")
    self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
    return True
def start(self) -> bool:
    """Log the start, switch the status to RUNNING and return ``True``."""
    banner = f"[{self.name}] 插件已启动"
    self.LOG.info(banner)
    self.status = PluginStatus.RUNNING
    return True
def stop(self) -> bool:
    """Log the stop, switch the status to STOPPED and return ``True``."""
    banner = f"[{self.name}] 插件已停止"
    self.LOG.info(banner)
    self.status = PluginStatus.STOPPED
    return True
def can_process(self, message: Dict[str, Any]) -> bool:
    """Tell whether the message text starts with one of the configured commands.

    Always ``False`` while the plugin is disabled.
    """
    if not self.enable:
        return False
    text = str(message.get("content", "")).strip()
    return any(text.startswith(word) for word in self._commands)
@plugin_stats_decorator(plugin_name="群聊文字修仙")
@user_feature_rate_limit(max_per_minute=6, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Match the incoming text to a command, enforce cooldowns and dispatch it.

    Args:
        message: Incoming message dict; reads ``content``, ``sender``,
            ``roomid``, ``gbm``, ``bot`` and ``revoke``.

    Returns:
        The ``(ok, message)`` pair of the command handler, or ``(False, reason)``
        when the command is rejected before dispatch.
    """
    content = str(message.get("content", "")).strip()
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    gbm: GroupBotManager = message.get("gbm")
    bot: WechatAPIClient = message.get("bot")
    self.revoke = message.get("revoke")

    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    # First configured command that prefixes the text wins; the remainder of
    # the text becomes the argument string.
    cmd = None
    for candidate in self._commands:
        if content.startswith(candidate):
            cmd = candidate
            content = content[len(candidate):].strip()
            break
    if not cmd:
        return False, "不匹配的命令"

    target = roomid if roomid else sender

    if not self.redis_db:
        # Send the bare notice: composing the status footer needs the Redis
        # cache and would raise AttributeError here, hiding the notice.
        text = "❌ 天道失序仙府未初始化Redis"
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
        return False, "Redis未初始化"

    # Cooldowns are keyed per user + group. Cooldowns written by
    # _rate_set_global live under the "__global__" group and must be probed
    # separately, otherwise they are never enforced.
    limited = (
        self.redis_db.check_rate_limited(sender, roomid or "", cmd)
        or self.redis_db.check_rate_limited(sender, "__global__", cmd)
    )
    if limited:
        text = "⚠️ 道友莫急,天道有序,稍候再试" + self._compose_status_text(sender, roomid)
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
        return False, "限流"

    # command -> (handler method name, whether the handler takes the argument
    # string). Handlers are resolved lazily so a missing handler only affects
    # its own command.
    routes = {
        "注册修仙": ("_cmd_register", True),
        "修仙帮助": ("_cmd_help", False),
        "帮助": ("_cmd_help", False),
        "help": ("_cmd_help", False),
        "修仙指令": ("_cmd_help", False),
        "指令": ("_cmd_help", False),
        "积分购石": ("_cmd_points_to_stone", True),
        "积分换灵石": ("_cmd_points_to_stone", True),
        "我的状态": ("_cmd_status", False),
        "闭关": ("_cmd_cultivate", False),
        "出关": ("_cmd_finish_cultivate", False),
        "聚灵": ("_cmd_gather", True),
        "排行榜": ("_cmd_leaderboard", False),
        "修仙签到": ("_cmd_signin", False),
        "坊市": ("_cmd_shop", False),
        "购买": ("_cmd_buy", True),
        "出售": ("_cmd_sell", True),
        "乾坤袋": ("_cmd_bag", False),
        "使用": ("_cmd_use", True),
        "服用": ("_cmd_use", True),
        "出门历练": ("_cmd_expedition", False),
        "炼丹": ("_cmd_alchemy", True),
        "突破": ("_cmd_breakthrough", False),
        "强行突破": ("_cmd_force_breakthrough", False),
        "劫掠": ("_cmd_rob", True),
        "赠与": ("_cmd_give_stone", True),
        "赠送": ("_cmd_give_item", True),
        "创建门派": ("_cmd_clan_create", True),
        "加入门派": ("_cmd_clan_join", True),
        "退出门派": ("_cmd_clan_exit", False),
    }
    route = routes.get(cmd)
    if route is None:
        return False, "未知命令"
    handler_name, takes_argument = route
    handler = getattr(self, handler_name)
    if takes_argument:
        ok, msg = await handler(bot, sender, roomid, content)
    else:
        ok, msg = await handler(bot, sender, roomid)
    return ok, msg
def _rate_set(self, user_id: str, group_id: str, cmd: str):
"""设置用户维度 Redis 冷却键,用于防骚扰与防封。"""
seconds = self.seconds_rl.get(cmd, 3)
self.redis_db.set_rate_limit(user_id, group_id, cmd, seconds)
def _rate_set_global(self, user_id: str, cmd: str):
seconds = self.seconds_rl.get(cmd, 3)
self.redis_db.set_rate_limit(user_id, "__global__", cmd, seconds)
def _check_status_update(self, player: Dict[str, Any]) -> Dict[str, Any]:
"""状态机自动流转:过期的 Unstable_Qi/Injured 恢复为 Idle。"""
now = datetime.now(timezone.utc)
status = player.get("status", "Idle")
until_str = player.get("status_until")
until = None
if until_str:
try:
until = datetime.fromisoformat(until_str)
except Exception:
until = None
if until and until.tzinfo is None:
until = until.replace(tzinfo=timezone.utc)
if status in ("Unstable_Qi", "Injured", "Expedition") and until and now >= until:
player["status"] = "Idle"
player["status_until"] = None
self._save_player(player)
return player
async def _cmd_register(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Register the sender as a new cultivator named by ``content``.

    Rejects an empty name and reports an existing player without sending a
    message. Otherwise picks a spirit root at random, stores the player,
    starts the command cooldown, seeds the realm leaderboard and sends a
    confirmation that is scheduled for revocation after 10 seconds.
    """
    dao_name = content.strip()
    if not dao_name:
        # No reply is sent for this case; only the result tuple is returned.
        return False, "道友施法有误,指令格式不对"

    group_id = roomid or ""
    if self._get_player_with_cache(sender, group_id):
        return True, "已注册"

    if self.spirit_roots:
        root_name, mult = random.choice(self.spirit_roots)
    else:
        root_name, mult = "凡灵根", 1.0

    player = {
        "user_id": sender,
        "group_id": group_id,
        "dao_name": dao_name,
        "realm": "炼气1层",
        "spirit_root": f"{root_name}",
        "spirit_root_mult": mult,
        "cultivation_points": 0,
        "spirit_stone": 0,
        "status": "Idle",
        "status_until": None,
        "last_cultivate_time": None,
    }
    if self.xdb:
        # A failed DB insert is only logged; registration continues.
        try:
            self.xdb.create_player(player)
        except Exception as e:
            logger.warning(f"创建玩家数据库记录失败: {e}, user_id={sender}")
    self._save_player(player)
    self._rate_set(sender, group_id, "注册修仙")
    # Seed the realm leaderboard with the starting realm's score.
    starting_score = float(self._realm_score(player["realm"]))
    self.redis_db.leaderboard_realm_add(group_id, sender, starting_score)

    target = roomid if roomid else sender
    welcome = f"✅ 入道已定,道号:{dao_name}\n灵根:{root_name}"
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, welcome, sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "注册成功"
async def _cmd_help(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Assemble the help text and return ``(True, "帮助")``.

    The text is built but not sent from this method.
    """
    usage_examples = (
        "注册修仙 道号",
        "聚灵 数量",
        "购买 物品 数量",
        "出售 物品 数量",
        "出门历练",
        "炼丹 物品 数量",
        "使用 物品名",
        "服用 回气丹",
        "赠与 目标wxid 数量",
        "赠送 目标wxid 物品 数量",
        "劫掠 目标wxid",
        "创建门派 名称",
        "加入门派 名称",
        "积分购石 积分数",
    )
    lines = [
        "📜 修仙帮助",
        "用法提示:",
        self.command_format.strip(),
        "可用指令:",
        " | ".join(self._commands),
        "常用参数格式:",
    ]
    lines.extend(usage_examples)
    msg = "\n".join(lines)
    return True, "帮助"
async def _cmd_points_to_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[
    bool, str]:
    """Convert the sender's points into spirit stones.

    ``content`` is the number of points offered. Every check runs before any
    points are deducted, and only the points that actually convert
    (``stones * rate``) are charged, so a rejected or partially convertible
    request no longer costs points without a return.

    Returns:
        ``(True, "积分购石")`` on success, otherwise ``(False, reason)``.
    """
    if not self.points_db:
        return False, "积分体系未开,暂不可化石"
    group_id = roomid or ""
    player = self._get_player(sender, group_id)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    try:
        pts = int(content.strip())
    except (TypeError, ValueError):
        return False, "道友施法有误,指令格式不对"
    if pts <= 0:
        return False, "道友,数量需为正整数"

    rate = int(self.point_to_stone_rate)
    if rate <= 0:
        # A non-positive rate would divide by zero or mint negative stones.
        logger.warning(f"积分兑换比例配置无效: {rate}")
        return False, "积分体系未开,暂不可化石"
    if pts < rate:
        # Not enough for a single stone: reject before touching the balance.
        return False, "道友积分不足"

    stones_gain = pts // rate
    cost = stones_gain * rate  # leftover points stay with the user
    ok, _res = self.points_db.deduct_points(sender, group_id, cost, PointSource.PLUGIN, "修仙购买灵石")
    if not ok:
        return False, "道友积分不足"

    player["spirit_stone"] = int(player.get("spirit_stone", 0)) + stones_gain
    self._save_player(player)
    self._rate_set(sender, group_id, "积分购石")
    await self._send_text_with_status(bot, sender, roomid,
                                      f"✅ 积分化石,消耗{cost},得灵石{stones_gain}({rate}分=1石)", 10)
    return True, "积分购石"
async def _cmd_status(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the sender's profile card and start the command cooldown.

    The reply is scheduled for revocation after 20 seconds. Unregistered
    senders get ``(False, ...)`` and no message is sent.
    """
    group_id = roomid or ""
    player = self._get_player_with_cache(sender, group_id)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)

    labels = {
        "Idle": "空闲",
        "Cultivating": "闭关",
        "Unstable_Qi": "气息不稳",
        "Injured": "受伤保护",
        "Expedition": "历练"
    }
    status_code = player.get("status")
    # Unknown codes are shown verbatim.
    status_cn = labels.get(status_code, str(status_code))
    card = "\n".join((
        f"📇 道号:{player.get('dao_name')}{player.get('spirit_root')}",
        f"✨ 境界:{player.get('realm')}(修为:{player.get('cultivation_points')}点)",
        f"💎 灵石:{player.get('spirit_stone')}",
        f"💚 状态:{status_cn}",
    ))

    target = roomid if roomid else sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, card, sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 20)
    self._rate_set(sender, group_id, "我的状态")
    return True, "状态展示"
def _compose_status_text(self, sender: str, roomid: str) -> str:
player = self._get_player_with_cache(sender, roomid or "")
if not player:
return ""
player = self._check_status_update(player)
status_code = player.get("status")
status_cn = {
"Idle": "空闲",
"Cultivating": "闭关",
"Unstable_Qi": "气息不稳",
"Injured": "受伤保护",
"Expedition": "历练"
}.get(status_code, str(status_code))
return (
f"\n—— 当前状态 ——\n"
f"📇 道号:{player.get('dao_name')}{player.get('spirit_root')}\n"
f"✨ 境界:{player.get('realm')}(修为:{player.get('cultivation_points')}点)\n"
f"💎 灵石:{player.get('spirit_stone')}\n"
f"💚 状态:{status_cn}"
)
async def _send_text_with_status(self, bot: WechatAPIClient, sender: str, roomid: str, main_text: str,
                                 revoke_seconds: int = 10):
    """Send ``main_text`` followed by the status footer.

    The message goes to the room when there is one, otherwise to the sender,
    and is scheduled for revocation after ``revoke_seconds`` when a revoke
    helper is available.
    """
    footer = self._compose_status_text(sender, roomid)
    text = main_text + (footer or "")
    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text, sender)
    if not self.revoke:
        return
    self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
async def _cmd_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Put the sender into secluded cultivation.

    Only allowed from the ``Idle`` or ``Injured`` status. Records the start
    time, saves the player, starts the command cooldown and sends a
    confirmation.
    """
    group_id = roomid or ""
    player = self._get_player_with_cache(sender, group_id)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)

    status = player.get("status", "Idle")
    if status == "Cultivating":
        # Already cultivating: reported through the result tuple only.
        return False, "重复闭关"
    if status not in ("Idle", "Injured"):
        labels = {
            "Idle": "空闲",
            "Cultivating": "闭关",
            "Unstable_Qi": "气息不稳",
            "Injured": "受伤保护",
            "Expedition": "历练"
        }
        status_cn = labels.get(status, str(status))
        await self._send_text_with_status(bot, sender, roomid, f"当前状态[{status_cn}]不宜闭关", 10)
        return False, "状态不可闭关"

    started_at = datetime.now(timezone.utc).isoformat()
    player["status"] = "Cultivating"
    player["last_cultivate_time"] = started_at
    self._save_player(player)
    self._rate_set(sender, group_id, "闭关")
    await self._send_text_with_status(bot, sender, roomid, "✅ 已入静闭关,期间天道护持,不可被劫", 10)
    return True, "闭关成功"
async def _cmd_finish_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """End secluded cultivation and settle the cultivation points gained.

    The settled duration is capped at ``max_cultivate_hours``. Gain is
    ``hours * base_rate_per_hour * spirit_root_mult * realm multiplier``,
    boosted once by ``cultivate_bonus_next`` when that is positive. The
    player then enters ``Unstable_Qi`` for ``unstable_qi_minutes``.

    Returns:
        ``(True, "出关结算")`` on success, otherwise ``(False, reason)``.
    """
    group_id = roomid or ""
    player = self._get_player_with_cache(sender, group_id)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    if player.get("status") != "Cultivating":
        return False, "非闭关"

    now = datetime.now(timezone.utc)
    # A missing or unparseable start time settles as zero duration.
    start_iso = player.get("last_cultivate_time")
    try:
        start = datetime.fromisoformat(start_iso) if start_iso else now
    except (TypeError, ValueError):
        start = now
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)  # naive timestamps are read as UTC
    elapsed_hours = (now - start).total_seconds() / 3600.0
    duration_hours = max(0.0, min(elapsed_hours, float(self.max_cultivate_hours)))

    # A stored null multiplier falls back to 1.0 instead of crashing float().
    raw_root_mult = player.get("spirit_root_mult")
    root_mult = 1.0 if raw_root_mult is None else float(raw_root_mult)
    realm_mult = self._yield_multiplier(player.get("realm", "炼气1层"))
    rate = self.base_rate_per_hour * root_mult * realm_mult
    cbn = float(player.get("cultivate_bonus_next", 0.0) or 0.0)
    if cbn > 0:
        # One-shot bonus: consumed by this settlement.
        rate = rate * (1.0 + cbn)
        player["cultivate_bonus_next"] = 0.0

    gain = int(duration_hours * rate)
    player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain
    player["status"] = "Unstable_Qi"
    player["status_until"] = (now + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat()
    player["last_cultivate_time"] = None
    self._save_player(player)
    # Automatic layer-up (does not cross a bottleneck).
    self._auto_layer_up(sender, player)
    self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
    self._rate_set(sender, group_id, "出关")
    await self._send_text_with_status(
        bot, sender, roomid,
        f"✅ 出关一朝,修为进益:{gain}({duration_hours:.1f}小时)\n现有修为:{player['cultivation_points']}\n状态:气息未平 {self.unstable_qi_minutes}分钟",
        10)
    return True, "出关结算"
async def _cmd_gather(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Spend spirit stones to gain cultivation points.

    ``content`` is the number of stones to spend. Each stone is worth 10
    base points, scaled by ``max(0.1, 1 + gather_bonus + gather_debuff)`` and
    the realm multiplier. A positive bonus is consumed by this call; an
    expired or unparseable debuff is cleared.

    Returns:
        ``(True, "聚灵成功")`` on success, otherwise ``(False, reason)``.
    """
    group_id = roomid or ""
    player = self._get_player_with_cache(sender, group_id)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    try:
        qty = int(content.strip())
    except (TypeError, ValueError):
        return False, "道友施法有误,指令格式不对"
    stones = int(player.get("spirit_stone", 0))
    if qty <= 0 or stones < qty:
        return False, "灵石不足"
    player["spirit_stone"] = stones - qty

    # Both modifiers tolerate a stored null (treated as 0.0).
    bonus = float(player.get("gather_bonus", 0.0) or 0.0)
    debuff = float(player.get("gather_debuff", 0.0) or 0.0)
    gd_until = player.get("gather_debuff_until")
    if gd_until:
        try:
            expires_at = datetime.fromisoformat(gd_until)
            if expires_at.tzinfo is None:
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            expired = datetime.now(timezone.utc) >= expires_at
        except (TypeError, ValueError):
            # An unreadable expiry is treated as already expired.
            expired = True
        if expired:
            debuff = 0.0
            player["gather_debuff"] = 0.0
            player["gather_debuff_until"] = None

    base_gain = qty * 10
    mult = self._yield_multiplier(player.get("realm", "炼气1层"))
    factor = max(0.1, 1.0 + bonus + debuff)  # never below 10% of the base gain
    gain = int(base_gain * factor * mult)
    player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain
    if bonus > 0:
        player["gather_bonus"] = 0.0  # one-shot bonus
    self._save_player(player)
    # Automatic layer-up (does not cross a bottleneck).
    self._auto_layer_up(sender, player)
    self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
    self._rate_set(sender, group_id, "聚灵")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 灵气入体,消耗灵石{qty},修为涨{gain}", 10)
    return True, "聚灵成功"
async def _cmd_leaderboard(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Post the group's cultivation leaderboard (first ten entries).

    Up to 50 candidates are read from the Redis leaderboard of the group,
    each one is resolved to a player record, and the result is ranked by
    realm score first and cultivation points second (both descending).

    Returns:
        ``(True, "排行榜")`` once the message has been sent.
    """
    group_key = roomid or ""
    ranked = []
    for uid, score in self.redis_db.leaderboard_top(group_key, 50):
        record = self._get_player_with_cache(uid, group_key)
        if not record:
            # Leaderboard entry without a loadable player record: skip it.
            continue
        # The leaderboard score is only the fallback when the record has no points.
        points = int(record.get("cultivation_points", int(score)))
        realm = record.get("realm", "炼气1层")
        ranked.append((
            uid,
            record.get("dao_name") or uid,
            record.get("spirit_root", "凡灵根"),
            realm,
            points,
            self._realm_score(realm),
        ))
    # Realm score dominates; cultivation points break ties.
    ordered = sorted(ranked, key=lambda entry: (entry[5], entry[4]), reverse=True)
    lines = ["🏆 修仙榜 Top10"]
    lines.extend(
        f"{pos}. {nick}[{root}]-{realm}[{cp}]"
        for pos, (_, nick, root, realm, cp, _) in enumerate(ordered[:10], start=1)
    )
    await self._send_text_with_status(bot, sender, roomid, "\n".join(lines), 90)
    self._rate_set(sender, group_key, "排行榜")
    return True, "排行榜"
def _get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
    """Return the player record of ``user_id`` in ``group_id``.

    Thin alias of ``_get_player_with_cache``; the result of that call is
    returned unchanged.
    """
    record = self._get_player_with_cache(user_id, group_id)
    return record
def _save_player(self, player: Dict[str, Any]):
    """Write a player record to the database (when configured) and to Redis.

    Only a fixed set of columns is forwarded to the database, and keys that
    are absent from ``player`` are left out. A database failure is logged
    and does not prevent the Redis write.
    """
    db = self.xdb
    if db:
        persisted_columns = (
            "group_id", "dao_name", "realm", "spirit_root", "cultivation_points",
            "spirit_stone", "status", "status_until", "last_cultivate_time", "clan_id",
        )
        changes = {col: player[col] for col in persisted_columns if col in player}
        try:
            db.update_player_fields(player.get("user_id"), player.get("group_id", ""), changes)
        except Exception as e:
            logger.warning(f"更新玩家字段失败: {e}, user_id={player.get('user_id')}")
    self.redis_db.save_player(player)
def _get_player_with_cache(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
    """Load a player from Redis, falling back to the database on a miss.

    A database row is accepted only when its ``group_id`` equals the
    requested one. Before it is written back to Redis the row is completed
    with ``spirit_root_mult`` (looked up by root name in ``self.spirit_roots``,
    1.0 when not found) and ``inventory`` (name -> quantity, empty when the
    inventory query fails).

    Returns:
        The player dict, or ``None`` when neither store yields a match.
    """
    cached = self.redis_db.get_player(user_id, group_id)
    if cached:
        return cached
    if not self.xdb:
        return None
    row = self.xdb.get_player(user_id, group_id)
    if not row or row.get("group_id", "") != group_id:
        return None
    if "spirit_root_mult" not in row:
        # Derive the multiplier from the spirit root's name.
        root_name = row.get("spirit_root", "凡灵根")
        row["spirit_root_mult"] = next(
            (factor for name, factor in self.spirit_roots if name == root_name),
            1.0,
        )
    if "inventory" not in row:
        # The inventory lives in its own table; attach it to the cached record.
        try:
            row["inventory"] = {
                entry["name"]: entry["quantity"] for entry in self.xdb.get_inventory(user_id)
            }
        except Exception as e:
            logger.warning(f"读取背包数据失败: {e}")
            row["inventory"] = {}
    self.redis_db.save_player(row)
    return row
def _parse_realm(self, realm: str) -> Tuple[str, Optional[int]]:
    """Split a realm name such as ``炼气1层`` into ``(prefix, layer)``.

    The expected format is ``<prefix><N>层`` (for example ``炼气1层`` or
    ``筑基10层``). Anything that does not match, including non-string input,
    is returned unchanged with a layer of ``None``.

    Args:
        realm: Realm name to parse.

    Returns:
        ``(prefix, layer)`` when the name ends in ``<digits>层``, otherwise
        ``(realm, None)``.
    """
    # Function-scope import keeps this block self-contained.
    import re

    if not isinstance(realm, str):
        return realm, None
    # The previous pre-check split on an empty separator, which raises
    # ValueError; the blanket except swallowed it and the regex never ran.
    m = re.match(r"^(.*?)(\d+)层$", realm)
    if m is None:
        return realm, None
    return m.group(1), int(m.group(2))
def _realm_score(self, realm: str) -> int:
    """Return the ranking score of a realm name.

    The score is the ``realm_score_map`` entry of the realm prefix (0 when
    the prefix is unknown) plus the layer number (0 when there is none).
    """
    prefix, layer = self._parse_realm(realm)
    layer_part = layer if layer else 0
    return self.realm_score_map.get(prefix, 0) + layer_part
def _yield_multiplier(self, realm: str) -> float:
    """Return the gain multiplier for a realm name.

    The result is the product of a realm factor (``1 + score / 100`` with
    the score taken from ``realm_score_map``) and a layer factor
    (``1 + 0.02`` per layer above the first; a missing layer counts as 1).
    """
    prefix, layer = self._parse_realm(realm)
    realm_points = self.realm_score_map.get(prefix, 0)
    extra_layers = max(0, (layer or 1) - 1)
    realm_mult = 1.0 + (float(realm_points) / 100.0)
    layer_bonus = 1.0 + (extra_layers * 0.02)
    return realm_mult * layer_bonus
def _set_realm(self, user_id: str, player: Dict[str, Any], new_realm: str):
    """Assign ``new_realm`` to the player and propagate it to every store.

    The realm is written to the database (when configured), the cached
    record is invalidated, the whole player is saved via ``_save_player``,
    and the realm leaderboard receives the new realm score.

    Args:
        user_id: Id of the player being updated.
        player: Player record; its ``realm`` key is updated in place.
        new_realm: Realm name to store.
    """
    player["realm"] = new_realm
    group_id = player.get("group_id", "")
    if self.xdb:
        try:
            self.xdb.update_player_fields(user_id, group_id, {"realm": new_realm})
            self.redis_db.invalidate_player(user_id, group_id)
        except Exception as e:
            # Best effort: _save_player below writes the realm again, but the
            # failure should not vanish without a trace.
            logger.warning(f"更新境界失败: {e}, user_id={user_id}")
    self._save_player(player)
    # Keep the realm leaderboard in step with the new realm.
    self.redis_db.leaderboard_realm_add(group_id, user_id, float(self._realm_score(new_realm)))
def _auto_layer_up(self, user_id: str, player: Dict[str, Any]):
    """Raise the player's layer inside the current realm to match their points.

    The layer implied by the points is ``min(10, points // threshold + 1)``,
    where ``threshold`` is the per-layer cost of the realm prefix taken from
    ``layer_threshold_map``. With a threshold of 1000, 0-999 points is layer
    1, 1000-1999 is layer 2, and 9000 or more is layer 10. The layer is only
    ever raised, and never past 10: crossing into the next realm is left to
    the breakthrough commands.

    Args:
        user_id: Id of the player.
        player: Player record; updated through ``_set_realm`` when the layer
            changes.
    """
    prefix, layer = self._parse_realm(player.get("realm", "凡人"))
    if layer is None or layer >= 10:
        return
    threshold = self.layer_threshold_map.get(prefix)
    if not threshold:
        return
    pts = int(player.get("cultivation_points", 0))
    calculated_layer = min(10, (pts // threshold) + 1)
    # Promote only; a player is never demoted.
    new_layer = max(layer, calculated_layer)
    if new_layer != layer:
        # The "层" suffix is part of the realm format that _parse_realm and
        # the breakthrough stage keys rely on; without it the layer is lost.
        self._set_realm(user_id, player, f"{prefix}{new_layer}层")
async def _cmd_signin(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Handle the sign-in command: grant 50 spirit stones when not rate limited.

    Returns:
        ``(True, "签到成功")`` when the reward was granted, otherwise
        ``(False, reason)`` for an unregistered player or a sign-in that is
        still rate limited.
    """
    group_key = roomid or ""
    player = self._get_player(sender, group_key)
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    already_signed = self.redis_db.check_rate_limited(sender, group_key, "签到")
    if already_signed:
        return False, "已签到"
    reward = 50
    balance = int(player.get("spirit_stone", 0))
    player["spirit_stone"] = balance + reward
    self._save_player(player)
    self._rate_set(sender, group_key, "签到")
    notice = f"✅ 灵运昌盛,获灵石{reward}"
    await self._send_text_with_status(bot, sender, roomid, notice, 10)
    return True, "签到成功"
async def _cmd_shop(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the shop listing: one line per entry of ``self.shop_items``.

    Returns:
        ``(True, "坊市")`` after the listing has been sent.
    """
    header = "🛒 坊市陈列"
    rows = [
        f"{item['name']} [{item['type']}] - {item['price']}灵石"
        for item in self.shop_items
    ]
    await self._send_text_with_status(bot, sender, roomid, "\n".join([header, *rows]), 10)
    self._rate_set(sender, roomid or "", "坊市")
    return True, "坊市"
async def _cmd_buy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Buy ``<item> <quantity>`` from the shop with spirit stones.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the buyer.
        roomid: Group id the command came from (may be empty).
        content: Command arguments, expected as ``"<item name> <quantity>"``.

    Returns:
        ``(True, "购买成功")`` on success, otherwise ``(False, reason)``.
    """
    parts = content.split()
    if len(parts) < 2:
        return False, "道友施法有误,指令格式不对"
    item_name = parts[0]
    try:
        qty = int(parts[1])
    except ValueError:
        return False, "道友施法有误,指令格式不对"
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    item = next((i for i in self.shop_items if i["name"] == item_name), None)
    if not item:
        return False, "坊市中无此物"
    total = item["price"] * qty
    stones = int(player.get("spirit_stone", 0))
    if qty <= 0 or stones < total:
        return False, "灵石不够,修行不可强行为之"
    player["spirit_stone"] = stones - total
    inv = player.get("inventory") or {}
    inv[item_name] = int(inv.get(item_name, 0)) + qty
    player["inventory"] = inv
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"spirit_stone": player["spirit_stone"]})
            self.xdb.add_item(sender, item_name, item.get("type", "other"), qty)
        except Exception as e:
            # Still best effort, but a purchase whose item row was not
            # written must be visible in the log.
            logger.warning(f"购买时更新数据库失败: {e}, user_id={sender}, item={item_name}, qty={qty}")
    self._save_player(player)
    self._rate_set(sender, roomid or "", "购买")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 已购得 {item_name} × {qty}", 10)
    return True, "购买成功"
async def _cmd_sell(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Sell ``<item> <quantity>`` from the inventory at half the shop price.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the seller.
        roomid: Group id the command came from (may be empty).
        content: Command arguments, expected as ``"<item name> <quantity>"``.

    Returns:
        ``(True, "出售成功")`` on success, otherwise ``(False, reason)``.
    """
    parts = content.split()
    if len(parts) < 2:
        return False, "道友施法有误,指令格式不对"
    item_name = parts[0]
    try:
        qty = int(parts[1])
    except ValueError:
        return False, "道友施法有误,指令格式不对"
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    inv = player.get("inventory") or {}
    have = int(inv.get(item_name, 0))
    if qty <= 0 or have < qty:
        return False, "乾坤袋中此物不足"
    item = next((i for i in self.shop_items if i["name"] == item_name), None)
    if not item:
        return False, "坊市中无此物"
    sell_ratio = 0.5  # items sell back at half their shop price
    revenue = int(item["price"] * qty * sell_ratio)
    inv[item_name] = have - qty
    player["inventory"] = inv
    player["spirit_stone"] = int(player.get("spirit_stone", 0)) + revenue
    if self.xdb:
        try:
            self.xdb.remove_item(sender, item_name, qty)
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"spirit_stone": player["spirit_stone"]})
        except Exception as e:
            # Still best effort, but an item that was paid for and not
            # removed from the database must be visible in the log.
            logger.warning(f"出售时更新数据库失败: {e}, user_id={sender}, item={item_name}, qty={qty}")
    self._save_player(player)
    self._rate_set(sender, roomid or "", "出售")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 已出手 {item_name} × {qty},入账灵石{revenue}", 10)
    return True, "出售成功"
async def _cmd_bag(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the player's inventory listing.

    The database is the preferred source. When no database is configured or
    the inventory query fails, the listing falls back to the ``inventory``
    dict of the player record (entries with a positive quantity only)
    instead of reporting an empty bag.

    Returns:
        ``(True, "背包")`` after the listing is sent, or ``(False, reason)``
        when the sender is not registered.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    entries = None  # list of (name, quantity); None until a source has answered
    if self.xdb:
        try:
            entries = [(it["name"], it["quantity"]) for it in self.xdb.get_inventory(sender)]
        except Exception as e:
            logger.warning(f"读取背包数据失败: {e}, user_id={sender}")
    if entries is None:
        # Same fallback as the breakthrough command: use the player record.
        inv = player.get("inventory") or {}
        entries = [(name, count) for name, count in inv.items() if int(count) > 0]
    lines = ["🎒 背包"]
    if not entries:
        lines.append("")
    else:
        lines.extend(f"{name} × {count}" for name, count in entries)
    await self._send_text_with_status(bot, sender, roomid, "\n".join(lines), 10)
    self._rate_set(sender, roomid or "", "背包")
    return True, "背包"
async def _cmd_expedition(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Run an expedition: draw random events and apply their outcome.

    The player must be registered, not rate limited under the global
    "出门历练" key, and not in the Cultivating / Injured / Expedition state.
    The player is first put into the Expedition state for 20 minutes, then
    1 to 3 events (depending on the realm prefix) are drawn with
    realm-dependent weights. Events may change spirit stones, cultivation
    points, materials, inventory, temporary gather modifiers, and the final
    status. An item stolen during an ambush is removed from the database as
    well as from the cached inventory.

    Returns:
        ``(True, "历练成功")`` after the result message is sent, otherwise
        ``(False, reason)``.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    if self.redis_db.check_rate_limited(sender, "__global__", "出门历练"):
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(
            (roomid if roomid else sender),
            "⚠️ 天机不可泄,今日行程已定,稍候再试",
            sender)
        if self.revoke:
            self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time,
                                              new_msg_id, 5)
        return False, "限流"
    player = self._check_status_update(player)
    st = player.get("status", "Idle")
    if st in ("Cultivating", "Injured", "Expedition"):
        return False, "状态不可历练"
    now = datetime.now(timezone.utc)
    player["status"] = "Expedition"
    player["status_until"] = (now + timedelta(minutes=20)).isoformat()
    self._save_player(player)
    self._rate_set_global(sender, "出门历练")
    prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
    rs = self.realm_score_map.get(prefix, 0)
    mult = self._yield_multiplier(player.get("realm", "炼气1层"))
    orig_stones = int(player.get("spirit_stone", 0))
    stones = orig_stones
    mats_gain: Dict[str, int] = {}
    lost_items: Dict[str, int] = {}  # items stolen during this run, to mirror into the DB
    cult_gain = 0
    final_status = "Expedition"
    final_until = player.get("status_until")
    events = [
        "洞府遗址",
        "古老传承",
        "天材地宝",
        "大妖横行",
        "魔头埋伏",
        "天劫试炼",
        "禁地余威",
        "灵脉枯竭",
        "灵潮涌动",
    ]
    # One weight per event, same order as ``events``; several grow with the realm score.
    weights = [
        30 + rs,
        20 + rs,
        25 + max(rs - 10, 0),
        25,
        20,
        15 + max(rs - 20, 0),
        15,
        10,
        10 + rs,
    ]
    count = 1
    if prefix in ("筑基", "金丹"):
        count = 2
    elif prefix in ("元婴", "化神", "合体", "大乘", "渡劫", "真仙"):
        count = random.choice([2, 3])
    happened = []
    for _ in range(count):
        idx = random.choices(range(len(events)), weights=weights, k=1)[0]
        name = events[idx]
        happened.append(name)
        if name == "洞府遗址":
            stones += int(random.uniform(80, 200) * mult)
            n = random.randint(2, 4)
            tiers = ["T3", "T2"] if rs >= 40 else ["T2", "T1"]
            for _m in range(n):
                tier = tiers[0] if random.random() < 0.5 else tiers[1]
                pool = self.materials_by_tier.get(tier, [])
                if pool:
                    mk = random.choice(pool)
                    mats_gain[mk] = mats_gain.get(mk, 0) + 1
            if random.random() < 0.25:
                final_status = "Unstable_Qi"
                final_until = (datetime.now(timezone.utc) + timedelta(minutes=20)).isoformat()
        elif name == "古老传承":
            gain = int(random.uniform(100, 500) * mult)
            cult_gain += gain
            player["cultivate_bonus_next"] = float(player.get("cultivate_bonus_next", 0.0) or 0.0) + 0.1
        elif name == "天材地宝":
            stones += int(random.uniform(30, 80) * mult)
            n = random.randint(1, 3)
            tiers = ["T2", "T1"]
            for _m in range(n):
                tier = tiers[0] if random.random() < min(0.3 * mult, 0.7) else tiers[1]
                pool = self.materials_by_tier.get(tier, [])
                if pool:
                    mk = random.choice(pool)
                    mats_gain[mk] = mats_gain.get(mk, 0) + 1
            if rs >= 50 and random.random() < 0.3:
                pool = self.materials_by_tier.get("T3", [])
                if pool:
                    mk = random.choice(pool)
                    mats_gain[mk] = mats_gain.get(mk, 0) + 1
        elif name == "大妖横行":
            p = min(0.5 + rs / 200.0, 0.8)
            if random.random() < p:
                gain = int(max(0, stones) * random.uniform(0.2, 0.5))
                stones += gain
                n = random.randint(2, 5)
                pool = self.materials_by_tier.get("T2", [])
                for _m in range(n):
                    if pool:
                        mk = random.choice(pool)
                        mats_gain[mk] = mats_gain.get(mk, 0) + 1
                if rs >= 50 and random.random() < 0.2:
                    pool3 = self.materials_by_tier.get("T3", [])
                    if pool3:
                        mk = random.choice(pool3)
                        mats_gain[mk] = mats_gain.get(mk, 0) + 1
            else:
                loss = int(max(0, stones) * random.uniform(0.1, 0.3))
                stones = max(0, stones - loss)
                final_status = "Injured"
                final_until = (datetime.now(timezone.utc) + timedelta(minutes=30)).isoformat()
        elif name == "魔头埋伏":
            loss = int(max(0, stones) * random.uniform(0.2, 0.4))
            stones = max(0, stones - loss)
            if random.random() < 0.3:
                inv = player.get("inventory") or {}
                keys = [k for k, v in inv.items() if int(v) > 0]
                if keys:
                    rk = random.choice(keys)
                    inv[rk] = max(0, int(inv.get(rk, 0)) - 1)
                    player["inventory"] = inv
                    # Remember the loss so it is also removed from the database below.
                    lost_items[rk] = lost_items.get(rk, 0) + 1
            player["gather_debuff"] = -random.uniform(0.1, 0.2)
            player["gather_debuff_until"] = (datetime.now(timezone.utc) + timedelta(minutes=30)).isoformat()
        elif name == "天劫试炼":
            if rs >= 40 and random.random() < 0.4:
                gain = int(random.uniform(200, 800) * mult)
                cult_gain += gain
            else:
                final_status = "Injured"
                final_until = (datetime.now(timezone.utc) + timedelta(minutes=30)).isoformat()
        elif name == "禁地余威":
            final_status = "Unstable_Qi"
            final_until = (datetime.now(timezone.utc) + timedelta(minutes=random.randint(20, 30))).isoformat()
        elif name == "灵脉枯竭":
            pass
        elif name == "灵潮涌动":
            player["gather_bonus"] = float(player.get("gather_bonus", 0.0) or 0.0) + 0.2
    player["spirit_stone"] = stones
    if cult_gain > 0:
        player["cultivation_points"] = int(player.get("cultivation_points", 0)) + cult_gain
        self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
    inv = player.get("inventory") or {}
    for k, v in mats_gain.items():
        inv[k] = int(inv.get(k, 0)) + v
    player["inventory"] = inv
    if final_status in ("Injured", "Unstable_Qi"):
        player["status"] = final_status
        player["status_until"] = final_until
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"spirit_stone": player["spirit_stone"],
                                           "cultivation_points": player.get("cultivation_points", 0),
                                           "status": player.get("status"),
                                           "status_until": player.get("status_until")})
            for k, v in mats_gain.items():
                self.xdb.add_item(sender, k, "材料", v)
            # Without this the stolen item survives in the database and
            # comes back as soon as the cached record is rebuilt.
            for k, v in lost_items.items():
                self.xdb.remove_item(sender, k, v)
        except Exception as e:
            logger.warning(f"历练时更新数据库失败: {e}, user_id={sender}")
    self._save_player(player)
    # NOTE(review): the rate key was already set before the events were drawn;
    # this second call is kept as in the original, presumably to refresh it.
    self._rate_set_global(sender, "出门历练")
    mats_text = ", ".join([f"{k}×{v}" for k, v in mats_gain.items()]) if mats_gain else ""
    total_stones_delta = player["spirit_stone"] - orig_stones
    # Separate the event names so that consecutive events stay readable.
    happened_text = "、".join(happened)
    if cult_gain > 0:
        msg = f"✅ 行走四方所得:灵石{total_stones_delta},修为{cult_gain},材料:{mats_text}\n遭遇:{happened_text}"
    else:
        msg = f"✅ 行走四方所得:灵石{total_stones_delta},材料:{mats_text}\n遭遇:{happened_text}"
    await self._send_text_with_status(bot, sender, roomid, msg, 10)
    return True, "历练成功"
async def _cmd_use(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Use one item from the inventory.

    Supported items:
      * ``聚灵符``: adds 0.2 to ``gather_bonus``.
      * ``回气丹``: resets an Unstable_Qi / Injured status to Idle.
      * ``洗髓丹`` / ``改灵丹`` / ``天灵露``: try to upgrade the spirit root by
        one step; on failure the player becomes Injured for the configured
        number of minutes. The pill is consumed either way.

    Args:
        bot: Client used to send the result message.
        sender: User id of the player.
        roomid: Group id the command came from (may be empty).
        content: Item name.

    Returns:
        ``(True, label)`` when the item took effect, otherwise
        ``(False, reason)``.
    """
    item_name = content.strip()
    if not item_name:
        return False, "道友施法有误,指令格式不对"
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    inv = player.get("inventory") or {}
    qty = int(inv.get(item_name, 0))
    if qty <= 0:
        return False, "物品不足"

    def consume_one() -> None:
        # Take one unit out of the cached inventory and, best effort, the database.
        inv[item_name] = qty - 1
        player["inventory"] = inv
        if self.xdb:
            try:
                self.xdb.remove_item(sender, item_name, 1)
            except Exception as e:
                logger.warning(f"使用物品时扣减背包失败: {e}, user_id={sender}, item={item_name}")

    if item_name == "聚灵符":
        consume_one()
        # ``or 0.0`` guards against a stored None, as the expedition code does.
        player["gather_bonus"] = float(player.get("gather_bonus", 0.0) or 0.0) + 0.2
        self._save_player(player)
        self._rate_set(sender, roomid or "", "使用")
        await self._send_text_with_status(bot, sender, roomid, "✅ 符文已启,本次聚灵更为顺畅", 10)
        return True, "使用聚灵符"
    if item_name == "回气丹":
        status = player.get("status", "Idle")
        if status not in ("Unstable_Qi", "Injured"):
            return False, "当前状态无需服用"
        consume_one()
        player["status"] = "Idle"
        player["status_until"] = None
        self._save_player(player)
        self._rate_set(sender, roomid or "", "使用")
        await self._send_text_with_status(bot, sender, roomid, "✅ 服下回气丹,气息平稳,恢复空闲", 10)
        return True, "服用回气丹"
    # Spirit-root upgrade pills: required root, resulting root, success rate,
    # and minutes of injury on failure.
    upgrade_map = {
        "洗髓丹": {"from": "废灵根", "to": "凡灵根", "rate": 0.6, "injured_min": 30},
        "改灵丹": {"from": "凡灵根", "to": "地灵根", "rate": 0.3, "injured_min": 60},
        "天灵露": {"from": "地灵根", "to": "天灵根", "rate": 0.1, "injured_min": 120},
    }
    if item_name in upgrade_map:
        cur_root = player.get("spirit_root", "凡灵根")
        rule = upgrade_map[item_name]
        if cur_root != rule["from"]:
            return False, "当前灵根不适用"
        consume_one()
        roll = random.random()
        if roll < rule["rate"]:
            # Success: store the new root together with its multiplier.
            target = rule["to"]
            player["spirit_root"] = target
            player["spirit_root_mult"] = next(
                (m for name, m in self.spirit_roots if name == target), 1.0)
            self._save_player(player)
            self._rate_set(sender, roomid or "", "使用")
            await self._send_text_with_status(bot, sender, roomid, f"✅ 灵根蜕变,当前灵根:{target}", 10)
            return True, "灵根提升成功"
        # Failure: the player is injured for the configured duration.
        player["status"] = "Injured"
        player["status_until"] = (
            datetime.now(timezone.utc) + timedelta(minutes=rule["injured_min"])).isoformat()
        self._save_player(player)
        self._rate_set(sender, roomid or "", "使用")
        await self._send_text_with_status(bot, sender, roomid,
                                          f"❌ 灵根淬炼失利,灵气反噬,受伤{rule['injured_min']}分钟", 10)
        return False, "灵根提升失败"
    return False, "不可使用的物品"
async def _cmd_alchemy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Craft ``<item> <quantity>`` from a recipe, spending stones and materials.

    Each attempt succeeds with the recipe's ``rate``. Stones and materials
    are spent for every attempt; any failed attempt leaves the player in the
    Unstable_Qi state for ``self.unstable_qi_minutes`` minutes.

    Args:
        bot: Client used to send the result message.
        sender: User id of the player.
        roomid: Group id the command came from (may be empty).
        content: Command arguments, expected as ``"<item name> <quantity>"``.

    Returns:
        ``(True, "炼丹完成")`` once the attempts were rolled, otherwise
        ``(False, reason)``.
    """
    parts = content.split()
    if len(parts) < 2:
        return False, "道友施法有误,指令格式不对"
    item_name = parts[0]
    try:
        qty = int(parts[1])
    except ValueError:
        return False, "道友施法有误,指令格式不对"
    if qty <= 0:
        # A negative quantity would make every cost negative and credit
        # stones and materials instead of spending them.
        return False, "道友施法有误,指令格式不对"
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    recipe = self.recipes.get(item_name)
    if not recipe:
        return False, "此丹方不传外人,尚未掌握"
    total_stone = recipe["stone"] * qty
    if int(player.get("spirit_stone", 0)) < total_stone:
        return False, "灵石不够,修行不可强行为之"
    inv = player.get("inventory") or {}
    need = {mk: mv * qty for mk, mv in recipe["materials"].items()}
    # Check every material before spending any of them.
    for mk, nv in need.items():
        if int(inv.get(mk, 0)) < nv:
            return False, "乾坤袋中材料不足"
    for mk, nv in need.items():
        inv[mk] = int(inv.get(mk, 0)) - nv
    item_type = next((i["type"] for i in self.shop_items if i["name"] == item_name), "丹药")
    rate = float(recipe["rate"])
    success = sum(1 for _ in range(qty) if random.random() < rate)
    fail = qty - success
    player["spirit_stone"] = int(player.get("spirit_stone", 0)) - total_stone
    inv[item_name] = int(inv.get(item_name, 0)) + success
    player["inventory"] = inv
    if fail > 0:
        player["status"] = "Unstable_Qi"
        player["status_until"] = (
            datetime.now(timezone.utc) + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat()
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"spirit_stone": player["spirit_stone"], "status": player.get("status"),
                                           "status_until": player.get("status_until")})
            if success > 0:
                self.xdb.add_item(sender, item_name, item_type, success)
            for mk, nv in need.items():
                if nv > 0:
                    self.xdb.remove_item(sender, mk, nv)
        except Exception as e:
            logger.warning(f"炼丹时更新数据库失败: {e}, user_id={sender}, item={item_name}")
    self._save_player(player)
    self._rate_set(sender, roomid or "", "炼丹")
    msg = f"✅ 丹炉开盖,成丹{success},走丹{fail}"
    await self._send_text_with_status(bot, sender, roomid, msg, 10)
    return True, "炼丹完成"
async def _cmd_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Attempt a pill-assisted breakthrough from layer 10 to the next realm.

    The stage is looked up in ``self.break_config`` under ``"<prefix>10层"``
    and its ``pill`` entry supplies the pill name, point cost, success rate
    and target realm. One pill and the point cost are spent on every
    attempt. The points leaderboard is refreshed on success and on failure,
    because the points drop in both cases.

    Returns:
        ``(True, "突破成功")`` on success, otherwise ``(False, reason)``.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    points = int(player.get("cultivation_points", 0))
    cur_realm = player.get("realm", "炼气1层")
    prefix, layer = self._parse_realm(cur_realm)
    # Only layer 10 of a realm is a bottleneck.
    if layer is None or layer < 10:
        await self._send_text_with_status(bot, sender, roomid, f"当前境界:{cur_realm},唯有至{prefix}10层方可冲关",
                                          10)
        return False, "未到瓶颈"
    stage_key = f"{prefix}10层"
    stage_conf = self.break_config.get(stage_key)
    if not stage_conf:
        await self._send_text_with_status(bot, sender, roomid, "此境瓶颈暂未开示,无法突破", 10)
        return False, "未配置突破"
    pill_conf = stage_conf.get("pill")
    if not pill_conf:
        await self._send_text_with_status(bot, sender, roomid, "丹道途径未明,暂不可试", 10)
        return False, "未配置"
    pill_item_name = pill_conf.get("item")
    if not pill_item_name:
        await self._send_text_with_status(bot, sender, roomid, "缺少突破丹药的指引", 10)
        return False, "丹药未配置"
    # Prefer the database for the pill count; fall back to the player record
    # when there is no database or the query fails.
    pill_item_count = 0
    if self.xdb:
        try:
            items = self.xdb.get_inventory(sender)
            for item in items:
                if item["name"] == pill_item_name:
                    pill_item_count = item["quantity"]
                    break
        except Exception as e:
            logger.warning(f"读取背包数据失败: {e}, user_id={sender}")
            inv = player.get("inventory") or {}
            pill_item_count = inv.get(pill_item_name, 0)
    else:
        inv = player.get("inventory") or {}
        pill_item_count = inv.get(pill_item_name, 0)
    if points < pill_conf["cost"]:
        await self._send_text_with_status(bot, sender, roomid, "底蕴未满,暂不可冲关", 10)
        return False, "修为不足"
    if pill_item_count <= 0:
        await self._send_text_with_status(bot, sender, roomid, f"丹药未备:{pill_item_name}", 10)
        return False, "缺少丹药"
    # Mirror the spend in the player record that goes back into the cache.
    inv = player.get("inventory") or {}
    inv[pill_item_name] = pill_item_count - 1
    player["inventory"] = inv
    player["cultivation_points"] = points - pill_conf["cost"]
    if self.xdb:
        try:
            self.xdb.remove_item(sender, pill_item_name, 1)
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"cultivation_points": player["cultivation_points"]})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception as e:
            logger.warning(f"突破时更新数据库失败: {e}, user_id={sender}")
    roll = random.random()
    if roll < pill_conf["rate"]:
        # Success: store the new realm and refresh the points leaderboard.
        self._set_realm(sender, player, pill_conf["target"])
        self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
        self._rate_set(sender, roomid or "", "突破")
        await self._send_text_with_status(bot, sender, roomid, f"✅ 雷霆破关,晋至{pill_conf['target']}", 10)
        return True, "突破成功"
    # Failure: the cost is spent all the same, so save the player and push
    # the reduced points to the leaderboard; otherwise its score stays high.
    self._save_player(player)
    self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
    self._rate_set(sender, roomid or "", "突破")
    await self._send_text_with_status(bot, sender, roomid, "❌ 闭关未成,功亏一篑", 10)
    return False, "突破失败"
async def _cmd_force_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Attempt a breakthrough from layer 10 without a pill.

    The stage is looked up in ``self.break_config`` under ``"<prefix>10层"``
    and its ``hard`` entry supplies the point cost, success rate and target
    realm. The cost is spent on every attempt. The points leaderboard is
    refreshed on success and on failure, because the points drop in both
    cases.

    Returns:
        ``(True, "强行突破成功")`` on success, otherwise ``(False, reason)``.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    player = self._check_status_update(player)
    points = int(player.get("cultivation_points", 0))
    cur_realm = player.get("realm", "炼气1层")
    prefix, layer = self._parse_realm(cur_realm)
    # Only layer 10 of a realm is a bottleneck.
    if layer is None or layer < 10:
        await self._send_text_with_status(bot, sender, roomid,
                                          f"当前境界:{cur_realm},至{prefix}10层方能冒险强行冲关", 10)
        return False, "未到瓶颈"
    stage_key = f"{prefix}10层"
    stage_conf = self.break_config.get(stage_key)
    if not stage_conf:
        await self._send_text_with_status(bot, sender, roomid, "此境瓶颈暂未开示,无法强行冲关", 10)
        return False, "未配置突破"
    hard_conf = stage_conf.get("hard")
    if not hard_conf:
        await self._send_text_with_status(bot, sender, roomid, "强行之法未载,难以施为", 10)
        return False, "未配置"
    if points < hard_conf["cost"]:
        await self._send_text_with_status(bot, sender, roomid, "底蕴不足,强行为之只会贻害", 10)
        return False, "修为不足"
    player["cultivation_points"] = points - hard_conf["cost"]
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, player.get("group_id", ""),
                                          {"cultivation_points": player["cultivation_points"]})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception as e:
            logger.warning(f"强行突破时更新数据库失败: {e}, user_id={sender}")
    roll = random.random()
    if roll < hard_conf["rate"]:
        self._set_realm(sender, player, hard_conf["target"])
        self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
        self._rate_set(sender, roomid or "", "强行突破")
        await self._send_text_with_status(bot, sender, roomid, f"✅ 硬撼瓶颈,勉强晋至{hard_conf['target']}", 10)
        return True, "强行突破成功"
    # Failure: the cost is spent all the same, so save the player and push
    # the reduced points to the leaderboard; otherwise its score stays high.
    self._save_player(player)
    self.redis_db.leaderboard_add(player.get("group_id", ""), sender, float(player["cultivation_points"]))
    self._rate_set(sender, roomid or "", "强行突破")
    await self._send_text_with_status(bot, sender, roomid, "❌ 强行冲关失败,灵气反噬!", 10)
    return False, "强行突破失败"
async def _cmd_rob(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Try to rob spirit stones from another player.

    The attempt is refused for an empty target, for the sender themselves,
    for unregistered players, for defenders in the 炼气 realm or in the
    Cultivating / Injured state, for players of another group, and for
    members of the same clan. The base success chance is 0.5, shifted by 0.2
    towards the side with more cultivation points. On success the attacker
    takes 20% of the defender's stones and the defender becomes Injured for
    ``self.injured_minutes`` minutes.

    Returns:
        ``(True, "劫掠成功")`` on success, otherwise ``(False, reason)``.
    """
    target = content.strip().lstrip("@")
    if not target:
        return False, "命令格式错误"
    if target == sender:
        return False, "非法目标"
    group_key = roomid or ""
    attacker = self._get_player(sender, group_key)
    defender = self._get_player(target, group_key)
    if not (attacker and defender):
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    attacker = self._check_status_update(attacker)
    defender = self._check_status_update(defender)
    # Newcomer protection: players still in the first realm cannot be robbed.
    def_prefix, _ = self._parse_realm(defender.get("realm", "炼气1层"))
    if def_prefix == "炼气":
        await self._send_text_with_status(bot, sender, roomid, "对方新入仙途,天道庇护,暂不可劫", 10)
        return False, "新手保护"
    if defender.get("status") in ("Cultivating", "Injured"):
        await self._send_text_with_status(bot, sender, roomid, "对方正受天道护持或闭关参悟,切莫打扰", 10)
        return False, "目标保护"
    if roomid and (attacker.get("group_id") != roomid or defender.get("group_id") != roomid):
        await self._send_text_with_status(bot, sender, roomid, "只可在同一仙门之境内行劫", 10)
        return False, "跨群"
    attacker_clan = attacker.get("clan_id")
    defender_clan = defender.get("clan_id")
    if attacker_clan and defender_clan and attacker_clan == defender_clan:
        await self._send_text_with_status(bot, sender, roomid, "同门手足,不可相互劫掠", 10)
        return False, "同门"
    a_pts = int(attacker.get("cultivation_points", 0))
    d_pts = int(defender.get("cultivation_points", 0))
    success_chance = 0.5
    if a_pts > d_pts:
        success_chance += 0.2
    elif a_pts < d_pts:
        success_chance -= 0.2
    roll = random.random()
    if roll >= success_chance:
        # Failed attempt: only the rate limit is recorded.
        self._rate_set(sender, group_key, "劫掠")
        await self._send_text_with_status(bot, sender, roomid, "❌ 劫掠受挫,风声紧,暂且退去", 10)
        return False, "劫掠失败"
    d_stones = int(defender.get("spirit_stone", 0))
    gain = max(0, int(d_stones * 0.2))
    defender["spirit_stone"] = d_stones - gain
    attacker["spirit_stone"] = int(attacker.get("spirit_stone", 0)) + gain
    injured_until = datetime.now(timezone.utc) + timedelta(minutes=int(self.injured_minutes))
    defender["status"] = "Injured"
    defender["status_until"] = injured_until.isoformat()
    if self.xdb:
        try:
            self.xdb.update_player_fields(defender.get("user_id"), defender.get("group_id", ""),
                                          {"spirit_stone": defender["spirit_stone"],
                                           "status": defender["status"],
                                           "status_until": defender["status_until"]})
            self.xdb.update_player_fields(attacker.get("user_id"), attacker.get("group_id", ""),
                                          {"spirit_stone": attacker["spirit_stone"]})
        except Exception:
            # Best effort; _save_player below writes the same fields again.
            pass
    self._save_player(defender)
    self._save_player(attacker)
    self._rate_set(sender, group_key, "劫掠")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 劫掠得手,入账灵石{gain}", 10)
    if roomid:
        # Announce the robbery in the group, passing the target in the third argument.
        g_client_msg_id, g_create_time, g_new_msg_id = await bot.send_text_message(
            roomid,
            f"{sender} 劫掠 {target} 得手,目标陷入重创,暂受天道庇护",
            [target])
        if self.revoke:
            self.revoke.add_message_to_revoke(roomid, g_client_msg_id, g_create_time, g_new_msg_id, 10)
    return True, "劫掠成功"
async def _cmd_give_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Give spirit stones to a member of the same clan.

    Args:
        bot: Client used to send the result message.
        sender: User id of the giver.
        roomid: Group id the command came from (may be empty).
        content: Command arguments, expected as ``"<target> <quantity>"``; a
            leading ``@`` on the target is ignored.

    Returns:
        ``(True, "赠与成功")`` on success, otherwise ``(False, reason)``.
    """
    parts = content.strip().split()
    if len(parts) < 2:
        await self._send_text_with_status(bot, sender, roomid, "格式:赠与 目标 数量", 10)
        return False, "命令格式错误"
    target = parts[0].lstrip("@")
    try:
        qty = int(parts[1])
    except ValueError:
        await self._send_text_with_status(bot, sender, roomid, "格式:赠与 目标 数量", 10)
        return False, "命令格式错误"
    if target == sender:
        # Giver and receiver would be two copies of the same record; the
        # credited copy is saved last, which would mint stones from nothing.
        await self._send_text_with_status(bot, sender, roomid, "不可赠与自己", 10)
        return False, "非法目标"
    giver = self._get_player_with_cache(sender, roomid or "")
    receiver = self._get_player_with_cache(target, roomid or "")
    if not giver or not receiver:
        await self._send_text_with_status(bot, sender, roomid, "需双方皆踏入仙途", 10)
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    if not giver.get("clan_id") or giver.get("clan_id") != receiver.get("clan_id"):
        await self._send_text_with_status(bot, sender, roomid, "灵石只可同门相赠", 10)
        return False, "不同门"
    stones = int(giver.get("spirit_stone", 0))
    if qty <= 0 or stones < qty:
        await self._send_text_with_status(bot, sender, roomid, "灵石不足或数量有误", 10)
        return False, "灵石不够,修行不可强行为之"
    giver["spirit_stone"] = stones - qty
    receiver["spirit_stone"] = int(receiver.get("spirit_stone", 0)) + qty
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, giver.get("group_id", ""),
                                          {"spirit_stone": giver["spirit_stone"]})
            self.xdb.update_player_fields(target, receiver.get("group_id", ""),
                                          {"spirit_stone": receiver["spirit_stone"]})
        except Exception as e:
            logger.warning(f"赠与灵石时更新数据库失败: {e}, from={sender}, to={target}")
    # _save_player writes both the database and the Redis copy.
    self._save_player(giver)
    self._save_player(receiver)
    self._rate_set(sender, roomid or "", "赠与")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 已向 {target} 相赠灵石 {qty}", 10)
    return True, "赠与成功"
async def _cmd_give_item(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Handle the "赠送" command: move ``qty`` of an inventory item to another player.

    ``content`` is expected as ``<target> <item> <amount>``; a leading ``@`` on
    the target is stripped. Format errors and unregistered players return a
    failure tuple without sending a chat reply; the remaining failures notify
    the sender.

    Gifting to oneself is rejected: giver and receiver may be two separate
    copies of the same record, in which case saving the receiver copy last
    would add items without removing any.

    Returns:
        ``(ok, reason)`` where ``ok`` tells whether the transfer happened and
        ``reason`` is a short status string.
    """
    bad_format = (False, "道友施法有误,指令格式不对")
    parts = content.strip().split()
    if len(parts) < 3:
        return bad_format
    target = parts[0].lstrip("@")
    item_name = parts[1]
    try:
        qty = int(parts[2])
    except ValueError:
        return bad_format

    group_key = roomid or ""
    giver = self._get_player(sender, group_key)
    receiver = self._get_player(target, group_key)
    if not giver or not receiver:
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"
    if target == sender:
        # Guard against the self-transfer duplication described in the docstring.
        await self._send_text_with_status(bot, sender, roomid, "物品不可赠送自身", 10)
        return False, "不可自赠"

    inv_g = giver.get("inventory") or {}
    # Coerce the stored count the same way the receiver side does below.
    held = int(inv_g.get(item_name, 0) or 0)
    if qty <= 0 or held < qty:
        await self._send_text_with_status(bot, sender, roomid, "乾坤袋物品不足或数量有误", 10)
        return False, "物品不足"

    # Run the DB transfer before touching the in-memory records so a failed
    # transfer leaves both dicts exactly as they were loaded.
    if self.xdb:
        ok = self.xdb.transfer_item(sender, target, item_name, qty)
        if not ok:
            await self._send_text_with_status(bot, sender, roomid, "物品转移不成", 10)
            return False, "转移失败"
        self.redis_db.invalidate_player(sender, giver.get("group_id", ""))
        self.redis_db.invalidate_player(target, receiver.get("group_id", ""))

    inv_r = receiver.get("inventory") or {}
    inv_g[item_name] = held - qty
    inv_r[item_name] = int(inv_r.get(item_name, 0)) + qty
    giver["inventory"] = inv_g
    receiver["inventory"] = inv_r

    self._save_player(giver)
    self._save_player(receiver)
    self._rate_set(sender, group_key, "赠送")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 已向 {target} 赠送 {item_name} × {qty}", 10)
    return True, "赠送成功"
async def _cmd_clan_create(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Handle the "创建门派" command: found a new clan named by ``content``.

    The sender must be registered and their realm prefix (as returned by
    ``_parse_realm``) must be one of the allowed high realms. The clan is
    created through ``self.xdb.create_clan``; when no clan id is obtained
    (no DB handle, or ``create_clan`` returned ``None``) the command fails
    instead of announcing a clan that was never stored.

    Returns:
        ``(ok, reason)`` where ``ok`` tells whether the clan was created and
        ``reason`` is a short status string.
    """
    name = content.strip()
    if not name:
        await self._send_text_with_status(bot, sender, roomid, "格式:创建门派 名称", 10)
        return False, "命令格式错误"

    group_key = roomid or ""
    player = self._get_player_with_cache(sender, group_key)
    if not player:
        # No chat reply on this path; only the failure tuple is returned.
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"

    prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
    allowed = {"元婴", "化神", "合体", "大乘", "渡劫", "真仙"}
    if prefix not in allowed:
        await self._send_text_with_status(bot, sender, roomid, "唯元婴以上方可立宗开派", 10)
        return False, "境界不足"

    clan_id = self.xdb.create_clan(name, group_key, sender) if self.xdb else None
    if clan_id is None:
        await self._send_text_with_status(bot, sender, roomid, "此门已在或因故未成", 10)
        return False, "门派失败"

    player["clan_id"] = int(clan_id)
    try:
        self.xdb.update_player_fields(sender, player.get("group_id", ""), {"clan_id": player["clan_id"]})
        self.redis_db.invalidate_player(sender, player.get("group_id", ""))
    except Exception:
        # Best effort: the save below still runs, but the failure is recorded.
        logger.exception("xiuxian clan_create: failed to persist clan_id, sender={} clan_id={}",
                         sender, player["clan_id"])
    self._save_player(player)
    self._rate_set(sender, group_key, "创建门派")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 山门立成:{name}", 10)
    return True, "创建门派"
async def _cmd_clan_join(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Handle the "加入门派" command: attach the sender to the clan named by ``content``.

    The clan id is looked up with ``self.xdb.get_clan_id`` for the current
    group; a missing DB handle or a falsy lookup result is reported as
    "clan does not exist". On success the player's ``clan_id`` is updated
    and saved.

    Returns:
        ``(ok, reason)`` where ``ok`` tells whether the player joined and
        ``reason`` is a short status string.
    """
    name = content.strip()
    if not name:
        await self._send_text_with_status(bot, sender, roomid, "格式:加入门派 名称", 10)
        return False, "命令格式错误"

    group_key = roomid or ""
    player = self._get_player_with_cache(sender, group_key)
    if not player:
        await self._send_text_with_status(bot, sender, roomid, "道友尚未踏入仙途,请先发送:注册修仙 道号", 10)
        return False, "道友尚未踏入仙途,请先发送:注册修仙 道号"

    cid = self.xdb.get_clan_id(group_key, name) if self.xdb else None
    if not cid:
        await self._send_text_with_status(bot, sender, roomid, "世间无此门派", 10)
        return False, "门派不存在"

    # The lookup may hand back the id as a string; store it as an int.
    player["clan_id"] = int(cid) if isinstance(cid, str) else cid
    try:
        self.xdb.update_player_fields(sender, player.get("group_id", ""), {"clan_id": player["clan_id"]})
        self.redis_db.invalidate_player(sender, player.get("group_id", ""))
    except Exception:
        # Best effort: the save below still runs, but the failure is recorded.
        logger.exception("xiuxian clan_join: failed to persist clan_id, sender={} clan_id={}",
                         sender, player["clan_id"])
    self._save_player(player)
    self._rate_set(sender, group_key, "加入门派")
    await self._send_text_with_status(bot, sender, roomid, f"✅ 已拜入:{name}", 10)
    return True, "加入门派"
async def _cmd_clan_exit(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Handle the "退出门派" command: clear the sender's ``clan_id``.

    The sender must be registered. The field is reset to ``None`` in the DB
    (when a DB handle is present), the cached record is invalidated, and the
    player is saved.

    Returns:
        ``(ok, reason)`` where ``ok`` tells whether the player record was
        updated and ``reason`` is a short status string.
    """
    group_key = roomid or ""
    player = self._get_player_with_cache(sender, group_key)
    if not player:
        await self._send_text_with_status(bot, sender, roomid, "道友尚未踏入仙途,请先发送:注册修仙 道号", 10)
        return False, "未注册"

    player["clan_id"] = None
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, player.get("group_id", ""), {"clan_id": None})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception:
            # Best effort: the save below still runs, but the failure is recorded.
            logger.exception("xiuxian clan_exit: failed to clear clan_id, sender={}", sender)
    self._save_player(player)
    self._rate_set(sender, group_key, "退出门派")
    await self._send_text_with_status(bot, sender, roomid, "✅ 已离出山门", 10)
    return True, "退出门派"