Files
abot/plugins/xiuxian/main.py
2025-11-19 08:59:58 +08:00

1301 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
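Group-chat text cultivation (xiuxian) game plugin.
Notes:
- Atomic commands + state machine + rate limiting + Cache-Aside
- Reads go to Redis first; writes go to the DB and then delete the cache entry, which the read path refills
- A per-group frequency limit and per-user cooldowns together guard against spam and account bans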
"""
import json
import time
import random
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.xiuxian_db import XiuxianDB
from db.points_db import PointsDBOperator, PointSource
from datetime import timezone
class XiuxianRedisDB:
    """Redis access layer for the xiuxian plugin: player cache, cooldown keys, leaderboards.

    Every Redis-touching method logs any exception and returns a neutral
    value (``None``, ``False`` or ``[]``) instead of raising.
    """

    def __init__(self, db_manager: DBConnectionManager):
        self.db_manager = db_manager
        self.player_prefix = "xiuxian:cache:player:"
        # Sorted-set names; they stay None until the setters below are called.
        self.leaderboard_key = None
        self.leaderboard_realm_key = None

    def set_leaderboard_key(self, key: str):
        """Set the sorted-set name used for the cultivation leaderboard."""
        self.leaderboard_key = key

    def set_realm_leaderboard_key(self, key: str):
        """Set the sorted-set name used for the realm leaderboard."""
        self.leaderboard_realm_key = key

    def get_redis(self):
        """Return a Redis connection from the shared connection manager."""
        return self.db_manager.get_redis_connection()

    def _player_keys(self, user_id, group_id) -> Tuple[str, str]:
        """Return ``(current_key, legacy_key)`` for a player record."""
        return (
            f"{self.player_prefix}{group_id}:{user_id}",
            f"{self.player_prefix}{user_id}",
        )

    @staticmethod
    def _load_json(raw):
        """Decode a Redis value (bytes or str) and parse it as JSON."""
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        return json.loads(text)

    @staticmethod
    def _cooldown_key(user_id: str, cmd: str) -> str:
        """Build the per-user, per-command cooldown key."""
        return f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}"

    def get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Read a cached player, accepting both key layouts.

        The group-scoped key is tried first.  A record found only under the
        legacy (user-only) key is given the passed ``group_id`` when it has
        none, rewritten under the group-scoped key, and the legacy key is
        deleted.  Returns ``None`` on a miss or on any error.
        """
        current_key, legacy_key = self._player_keys(user_id, group_id)
        try:
            with self.get_redis() as conn:
                raw = conn.get(current_key)
                if raw:
                    return self._load_json(raw)
                raw = conn.get(legacy_key)
                if not raw:
                    return None
                record = self._load_json(raw)
                if not record.get("group_id"):
                    record["group_id"] = group_id
                # Migrate the record to the group-scoped layout.
                conn.set(current_key, json.dumps(record, ensure_ascii=False))
                conn.delete(legacy_key)
                return record
        except Exception as exc:
            logger.error(f"读取玩家数据失败: {exc}")
            return None

    def save_player(self, player: Dict[str, Any]) -> bool:
        """Store a player under the group-scoped key and drop the legacy key.

        Returns ``True`` on success and ``False`` when anything fails.
        """
        try:
            with self.get_redis() as conn:
                current_key, legacy_key = self._player_keys(
                    player.get("user_id"), player.get("group_id", "")
                )
                conn.set(current_key, json.dumps(player, ensure_ascii=False))
                # Remove any legacy copy so only one record remains.
                conn.delete(legacy_key)
            return True
        except Exception as exc:
            logger.error(f"保存玩家数据失败: {exc}")
            return False

    def invalidate_player(self, user_id: str, group_id: str):
        """Delete the cached player under both key layouts."""
        current_key, legacy_key = self._player_keys(user_id, group_id)
        try:
            with self.get_redis() as conn:
                conn.delete(current_key)
                conn.delete(legacy_key)
        except Exception as exc:
            logger.error(f"失效玩家缓存失败: {exc}")

    def set_rate_limit(self, user_id: str, cmd: str, seconds: int) -> bool:
        """Create the cooldown key for ``cmd`` with a TTL of ``seconds``."""
        try:
            with self.get_redis() as conn:
                conn.setex(self._cooldown_key(user_id, cmd), seconds, "1")
            return True
        except Exception as exc:
            logger.error(f"设置限流失败: {exc}")
            return False

    def check_rate_limited(self, user_id: str, cmd: str) -> bool:
        """Return True while the cooldown key for ``cmd`` exists; False on error."""
        try:
            with self.get_redis() as conn:
                return conn.exists(self._cooldown_key(user_id, cmd)) == 1
        except Exception as exc:
            logger.error(f"检查限流失败: {exc}")
            return False

    def _zset_add(self, zset_key, user_id: str, score: float, failure_label: str):
        """Write ``score`` for ``user_id`` into ``zset_key``; no-op when the key is unset."""
        if not zset_key:
            return
        try:
            with self.get_redis() as conn:
                conn.zadd(zset_key, {user_id: score})
        except Exception as exc:
            logger.error(f"{failure_label}: {exc}")

    def _zset_top(self, zset_key, top_n: int, failure_label: str) -> List[Tuple[str, float]]:
        """Return the highest-scored ``top_n`` members of ``zset_key`` as ``(user_id, score)``."""
        if not zset_key:
            return []
        try:
            with self.get_redis() as conn:
                rows = conn.zrevrange(zset_key, 0, top_n - 1, withscores=True)
            ranked = []
            for member, score in rows:
                # Members may come back as bytes depending on the client setup.
                if not isinstance(member, str):
                    member = member.decode("utf-8")
                ranked.append((member, score))
            return ranked
        except Exception as exc:
            logger.error(f"{failure_label}: {exc}")
            return []

    def leaderboard_add(self, user_id: str, score: float):
        """Record a cultivation score on the cultivation leaderboard."""
        self._zset_add(self.leaderboard_key, user_id, score, "更新排行榜失败")

    def leaderboard_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` entries of the cultivation leaderboard."""
        return self._zset_top(self.leaderboard_key, top_n, "读取排行榜失败")

    def leaderboard_realm_add(self, user_id: str, score: float):
        """Record a realm score on the realm leaderboard."""
        self._zset_add(self.leaderboard_realm_key, user_id, score, "更新境界排行榜失败")

    def leaderboard_realm_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` entries of the realm leaderboard."""
        return self._zset_top(self.leaderboard_realm_key, top_n, "读取境界排行榜失败")
class XiuxianPlugin(MessagePluginInterface):
"""修仙主插件:指令分发、业务执行与 DB/Redis 协作。"""
FEATURE_KEY = "XIUXIAN"
FEATURE_DESCRIPTION = "🧙‍♂️ 文字修仙 [注册修仙|我的状态|闭关|出关|聚灵|排行榜]"
# Human-readable plugin name.
@property
def name(self) -> str:
return "群聊文字修仙"
# Plugin version string.
@property
def version(self) -> str:
return "0.1.0"
# One-line description of the plugin.
@property
def description(self) -> str:
return "基于Redis的简化修仙玩法含闭关与状态机"
# Plugin author label.
@property
def author(self) -> str:
return "AI助手"
# Commands carry no prefix: an empty string is returned.
@property
def command_prefix(self) -> Optional[str]:
return ""
# Command list; reads self._commands, which is assigned in initialize().
@property
def commands(self) -> List[str]:
return self._commands
# Feature key taken from the class constant FEATURE_KEY.
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
# Feature description taken from the class constant FEATURE_DESCRIPTION.
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
# Construct the plugin with all backends unset; initialize() wires them up later.
def __init__(self):
super().__init__()
# register_feature() is not defined in the visible part of this file (presumably inherited);
# its result is compared against group permissions in process_message().
self.feature = self.register_feature()
# Redis wrapper, persistence layer and points backend stay None until initialize().
self.redis_db: Optional[XiuxianRedisDB] = None
self.xdb: Optional[XiuxianDB] = None
self.points_db: Optional[PointsDBOperator] = None
# Overwritten per message in process_message() with the message's "revoke" entry.
self.revoke = None
def initialize(self, context: Dict[str, Any]) -> bool:
    """Initialize the plugin: attach DB/Redis backends and load configuration.

    Reads the ``Xiuxian`` section of ``self._config`` and fills in commands,
    status durations, cooldown seconds, cultivation rates, leaderboard keys,
    shop items, points exchange rate and breakthrough tables.  Malformed
    ``a:b`` style config entries are skipped with a warning instead of
    being dropped silently.

    Args:
        context: Host-provided context; only ``event_system`` is read here.

    Returns:
        Always ``True``.
    """
    self.LOG = logger
    self.LOG.info(f"正在初始化 {self.name} 插件...")
    self.event_system = context.get("event_system")
    self.db_manager = DBConnectionManager.get_instance()
    if self.db_manager:
        self.redis_db = XiuxianRedisDB(self.db_manager)
        # The persistence layer is only wired when a MySQL pool is available.
        if self.db_manager.mysql_pool:
            self.xdb = XiuxianDB(self.db_manager)
            self.points_db = PointsDBOperator(self.db_manager)
            try:
                self.xdb.init_schema()
            except Exception as e:
                self.LOG.error(f"修仙表结构初始化失败: {e}")

    cfg = self._config.get("Xiuxian", {})
    self._commands = cfg.get("command", ["注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜"])
    self.command_format = cfg.get("command-format", "注册修仙 道号 | 我的状态 | 闭关 | 出关 | 聚灵 数量 | 排行榜")
    self.enable = cfg.get("enable", True)

    status_cfg = cfg.get("status", {})
    self.unstable_qi_minutes = status_cfg.get("unstable_qi_minutes", 15)
    self.injured_minutes = status_cfg.get("injured_minutes", 60)
    self.max_cultivate_hours = status_cfg.get("max_cultivate_hours", 8)

    rate_cfg = cfg.get("rate_limit", {})
    # Per-command cooldown in seconds, consumed by _rate_set().
    self.seconds_rl = {
        "我的状态": rate_cfg.get("status_seconds", 3),
        "闭关": rate_cfg.get("inout_seconds", 5),
        "出关": rate_cfg.get("inout_seconds", 5),
        "聚灵": rate_cfg.get("gather_seconds", 30),
        # NOTE(review): the leaderboard reuses the "break_seconds" setting — confirm this is intended.
        "排行榜": rate_cfg.get("break_seconds", 60),
        "注册修仙": 5,
        "签到": rate_cfg.get("signin_seconds", 86400),
        "坊市": rate_cfg.get("shop_seconds", 10),
        "购买": rate_cfg.get("buy_seconds", 5),
        "乾坤袋": rate_cfg.get("bag_seconds", 3),
        "突破": rate_cfg.get("break_seconds", 60),
        "强行突破": rate_cfg.get("force_break_seconds", 60),
        "劫掠": rate_cfg.get("rob_seconds", 30),
        "赠与": rate_cfg.get("gift_seconds", 10),
        "赠送": rate_cfg.get("gift_seconds", 10),
        "创建门派": 86400,
        "加入门派": 604800,
        "退出门派": 604800,
        "积分购石": rate_cfg.get("points_to_stone_seconds", 10),
        "积分换灵石": rate_cfg.get("points_to_stone_seconds", 10),
    }

    cult_cfg = cfg.get("cultivation", {})
    self.base_rate_per_hour = cult_cfg.get("base_rate_per_hour", 100)
    self.spirit_roots_cfg = cult_cfg.get("spirit_roots", ["凡灵根:1.0", "天灵根:2.0"])
    self.spirit_roots = self._parse_colon_entries(
        self.spirit_roots_cfg,
        lambda name, mult: (name, float(mult)),
        "spirit_roots",
    )

    lb_cfg = cfg.get("leaderboard", {})
    leaderboard_key = lb_cfg.get("key", "xiuxian:zset:leaderboard:cultivation")
    realm_lb_key = lb_cfg.get("realm_key", "xiuxian:zset:leaderboard:realm")
    if self.redis_db:
        self.redis_db.set_leaderboard_key(leaderboard_key)
        self.redis_db.set_realm_leaderboard_key(realm_lb_key)

    shop_cfg = cfg.get("shop", {})
    self.shop_items = self._parse_colon_entries(
        shop_cfg.get("items", []),
        lambda n, t, p: {"name": n, "type": t, "price": int(p)},
        "shop.items",
    )

    pts_cfg = cfg.get("points_exchange", {})
    self.point_to_stone_rate = int(pts_cfg.get("point_to_stone_rate", 10))

    bt_cfg = cfg.get("breakthrough", {})
    self.bt_pill_threshold = int(bt_cfg.get("pill_threshold", 5000))
    self.bt_pill_item = bt_cfg.get("pill_item", "筑基丹")
    self.bt_pill_success = float(bt_cfg.get("pill_success", 0.4))
    self.bt_force_threshold = int(bt_cfg.get("force_threshold", 20000))
    self.bt_force_success = float(bt_cfg.get("force_success", 0.1))
    self.bt_force_next = bt_cfg.get("force_next_realm", "筑基2层")

    # Realm base scores and per-realm layer-up thresholds ("name:value" entries).
    realm_score_cfg = cfg.get("realm_score", {})
    self.realm_score_map = dict(self._parse_colon_entries(
        realm_score_cfg.get("stages", []),
        lambda n, v: (n, int(v)),
        "realm_score.stages",
    ))
    layer_up_cfg = cfg.get("layer_up", {})
    self.layer_threshold_map = dict(self._parse_colon_entries(
        layer_up_cfg.get("thresholds", []),
        lambda n, v: (n, int(v)),
        "layer_up.thresholds",
    ))

    # Breakthrough paths: "current:path:cost:rate:target" entries grouped by current stage.
    stage_cfg = cfg.get("breakthrough_stages", {})
    self.break_config = {}
    parsed_paths = self._parse_colon_entries(
        stage_cfg.get("paths", []),
        lambda cur, path, cost, rate, target: (
            cur,
            path,
            {"cost": int(cost), "rate": float(rate), "target": target},
        ),
        "breakthrough_stages.paths",
    )
    for cur, path, conf in parsed_paths:
        self.break_config.setdefault(cur, {})[path] = conf

    self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
    return True

def _parse_colon_entries(self, entries, convert, label):
    """Parse colon-separated config strings, skipping and logging malformed ones.

    Args:
        entries: Iterable of strings such as ``"name:value"``; ``None`` is
            treated as empty.
        convert: Callable receiving the split parts as positional arguments;
            a wrong number of parts or a failed conversion skips the entry.
        label: Config path used in the warning message.

    Returns:
        List of the values returned by ``convert`` for the valid entries.
    """
    parsed = []
    for raw in entries or []:
        try:
            parsed.append(convert(*raw.split(":")))
        except (AttributeError, TypeError, ValueError) as e:
            self.LOG.warning(f"[{self.name}] 忽略无效的配置项 {label}: {raw!r} ({e})")
    return parsed
# Log the start, mark the plugin as RUNNING and report success (always True).
def start(self) -> bool:
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
# Log the stop, mark the plugin as STOPPED and report success (always True).
def stop(self) -> bool:
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
    """Tell whether this plugin should handle ``message``.

    Returns True only when the plugin is enabled and the stripped message
    content starts with one of the configured commands.
    """
    if not self.enable:
        return False
    text = str(message.get("content", "")).strip()
    return any(text.startswith(prefix) for prefix in self._commands)
@plugin_stats_decorator(plugin_name="群聊文字修仙")
@group_feature_rate_limit(max_per_minute=6, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Match the message against the configured commands and run its handler.

    Order of checks: group permission, command match, Redis availability,
    per-user cooldown.  The first configured command that prefixes the
    content wins; the remainder of the content is passed to handlers that
    take arguments.

    Returns:
        ``(handled, note)`` as produced by the handler, or ``(False, reason)``
        when a check fails.
    """
    text = str(message.get("content", "")).strip()
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    gbm: GroupBotManager = message.get("gbm")
    bot: WechatAPIClient = message.get("bot")
    self.revoke = message.get("revoke")

    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    cmd = next((c for c in self._commands if text.startswith(c)), None)
    if not cmd:
        return False, "不匹配的命令"
    args = text[len(cmd):].strip()

    # Replies go to the room when there is one, otherwise to the sender.
    target = roomid or sender

    if not self.redis_db:
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "❌系统未初始化Redis", sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
        return False, "Redis未初始化"

    if self.redis_db.check_rate_limited(sender, cmd):
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "⚠️ 操作过于频繁,请稍候再试", sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
        return False, "限流"

    # Handlers are stored by name and resolved only for the matched command.
    handlers_with_args = {
        "注册修仙": "_cmd_register",
        "积分购石": "_cmd_points_to_stone",
        "积分换灵石": "_cmd_points_to_stone",
        "聚灵": "_cmd_gather",
        "购买": "_cmd_buy",
        "劫掠": "_cmd_rob",
        "赠与": "_cmd_give_stone",
        "赠送": "_cmd_give_item",
        "创建门派": "_cmd_clan_create",
        "加入门派": "_cmd_clan_join",
    }
    handlers_without_args = {
        "修仙帮助": "_cmd_help",
        "帮助": "_cmd_help",
        "help": "_cmd_help",
        "修仙指令": "_cmd_help",
        "指令": "_cmd_help",
        "我的状态": "_cmd_status",
        "闭关": "_cmd_cultivate",
        "出关": "_cmd_finish_cultivate",
        "排行榜": "_cmd_leaderboard",
        "修仙签到": "_cmd_signin",
        "坊市": "_cmd_shop",
        "乾坤袋": "_cmd_bag",
        "突破": "_cmd_breakthrough",
        "强行突破": "_cmd_force_breakthrough",
        "退出门派": "_cmd_clan_exit",
    }
    if cmd in handlers_with_args:
        return await getattr(self, handlers_with_args[cmd])(bot, sender, roomid, args)
    if cmd in handlers_without_args:
        return await getattr(self, handlers_without_args[cmd])(bot, sender, roomid)
    return False, "未知命令"
def _rate_set(self, user_id: str, cmd: str):
"""设置用户维度 Redis 冷却键,用于防骚扰与防封。"""
seconds = self.seconds_rl.get(cmd, 3)
self.redis_db.set_rate_limit(user_id, cmd, seconds)
def _check_status_update(self, player: Dict[str, Any]) -> Dict[str, Any]:
"""状态机自动流转:过期的 Unstable_Qi/Injured 恢复为 Idle。"""
now = datetime.now(timezone.utc)
status = player.get("status", "Idle")
until_str = player.get("status_until")
until = None
if until_str:
try:
until = datetime.fromisoformat(until_str)
except Exception:
until = None
if until and until.tzinfo is None:
until = until.replace(tzinfo=timezone.utc)
if status in ("Unstable_Qi", "Injured") and until and now >= until:
player["status"] = "Idle"
player["status_until"] = None
self._save_player(player)
return player
async def _cmd_register(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Register the sender as a new player with the given dao name.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the registering player.
        roomid: Group id; empty for a private chat.
        content: Text after the command, used as the dao name.

    Returns:
        ``(False, "命令格式错误")`` when no dao name is given,
        ``(True, "已注册")`` when the player already exists, otherwise
        ``(True, "注册成功")``.
    """
    dao_name = content.strip()
    if not dao_name:
        # No reply is sent for this case.
        return False, "命令格式错误"

    group_id = roomid or ""
    # Look in the cache and then the DB: checking Redis alone would let a
    # player whose cache entry is missing register again and overwrite the
    # stored progress with a fresh record.
    if self._get_player(sender, group_id):
        return True, "已注册"

    if self.spirit_roots:
        root_name, mult = random.choice(self.spirit_roots)
    else:
        root_name, mult = "凡灵根", 1.0

    player = {
        "user_id": sender,
        "group_id": group_id,
        "dao_name": dao_name,
        "realm": "炼气1层",
        "spirit_root": f"{root_name}",
        "spirit_root_mult": mult,
        "cultivation_points": 0,
        "spirit_stone": 0,
        "status": "Idle",
        "status_until": None,
        "last_cultivate_time": None,
    }
    if self.xdb:
        try:
            self.xdb.create_player(player)
        except Exception as e:
            # Best effort: registration continues, but the failure is recorded.
            self.LOG.warning(f"创建修仙玩家记录失败: {e}")
    self._save_player(player)
    self._rate_set(sender, "注册修仙")
    # Seed the realm leaderboard with the starting realm's score.
    self.redis_db.leaderboard_realm_add(sender, float(self._realm_score(player["realm"])))

    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"✅ 注册成功,道号:{dao_name}\n灵根:{root_name}", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "注册成功"
async def _cmd_help(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Assemble the help text and report success.

    The text is built but not sent: no send call is made in this handler.

    Returns:
        Always ``(True, "帮助")``.
    """
    help_lines = [
        "📜 修仙帮助",
        "用法提示:",
        self.command_format.strip(),
        "可用指令:",
        " | ".join(self._commands),
        "常用参数格式:",
        "注册修仙 道号",
        "聚灵 数量",
        "购买 物品 数量",
        "赠与 目标wxid 数量",
        "赠送 目标wxid 物品 数量",
        "劫掠 目标wxid",
        "创建门派 名称",
        "加入门派 名称",
        "积分购石 积分数",
    ]
    # NOTE(review): msg is currently unused because nothing is sent.
    msg = "\n".join(help_lines)
    return True, "帮助"
async def _cmd_points_to_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Exchange the sender's points for spirit stones.

    All validation happens before any points are deducted, so a request
    that cannot yield at least one stone never costs the user anything.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the player.
        roomid: Group id; empty for a private chat.
        content: Text after the command; must be a positive integer.

    Returns:
        ``(True, "积分购石")`` on success, otherwise ``(False, reason)``.
    """
    if not self.points_db:
        return False, "积分未初始化"
    group_id = roomid or ""
    player = self._get_player(sender, group_id)
    if not player:
        return False, "未注册"
    try:
        pts = int(content.strip())
    except (AttributeError, TypeError, ValueError):
        return False, "命令格式错误"
    if pts <= 0:
        return False, "非法数量"

    rate = int(self.point_to_stone_rate)
    if rate <= 0:
        # A non-positive rate would divide by zero or grant negative stones.
        self.LOG.error(f"积分兑换比例配置无效: {rate}")
        return False, "兑换比例无效"
    if pts < rate:
        # Not enough for a single stone: reject before touching the balance.
        return False, "积分不足"

    ok, res = self.points_db.deduct_points(sender, group_id, pts, PointSource.PLUGIN, "修仙购买灵石")
    if not ok:
        return False, "积分不足"

    # NOTE(review): the full pts amount is charged even when it is not a
    # multiple of rate; the remainder buys nothing — confirm this is intended.
    stones_gain = pts // rate
    player["spirit_stone"] = int(player.get("spirit_stone", 0)) + stones_gain
    self._save_player(player)
    self._rate_set(sender, "积分购石")

    target = roomid or sender
    # The rate hint is parenthesised so the stone count and the rate no longer run together.
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"✅ 兑换成功,消耗积分{pts},获得灵石{stones_gain}({rate}积分=1灵石)", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "积分购石"
async def _cmd_status(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the sender's profile: dao name, realm, cultivation, stones, status.

    Returns:
        ``(False, "未注册")`` when the player does not exist, otherwise
        ``(True, "状态展示")``.
    """
    # Cache-then-DB lookup, consistent with the other handlers; reading
    # Redis alone reports a stored player as unregistered after a cache miss.
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)

    status_code = player.get("status")
    status_cn = {
        "Idle": "空闲",
        "Cultivating": "闭关",
        "Unstable_Qi": "气息不稳",
        "Injured": "受伤保护",
    }.get(status_code, str(status_code))
    msg = (
        f"📇 道号:{player.get('dao_name')}{player.get('spirit_root')}\n"
        f"✨ 境界:{player.get('realm')}(修为:{player.get('cultivation_points')}点)\n"
        f"💎 灵石:{player.get('spirit_stone')}\n"
        f"💚 状态:{status_cn}"
    )

    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 20)
    self._rate_set(sender, "我的状态")
    return True, "状态展示"
async def _cmd_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Put the sender into the ``Cultivating`` status and record the start time.

    Only players whose status is ``Idle`` or ``Injured`` may start; the
    start time is stored as a UTC ISO timestamp.

    Returns:
        ``(True, "闭关成功")`` on success, otherwise ``(False, reason)``.
    """
    # Cache-then-DB lookup, consistent with the other handlers; reading
    # Redis alone reports a stored player as unregistered after a cache miss.
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)

    target = roomid or sender
    status = player.get("status", "Idle")
    if status == "Cultivating":
        return False, "重复闭关"
    if status not in ("Idle", "Injured"):
        status_cn = {
            "Idle": "空闲",
            "Cultivating": "闭关",
            "Unstable_Qi": "气息不稳",
            "Injured": "受伤保护",
        }.get(status, str(status))
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"当前状态[{status_cn}]不可闭关", sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 5)
        return False, "状态不可闭关"

    player["status"] = "Cultivating"
    player["last_cultivate_time"] = datetime.now(timezone.utc).isoformat()
    self._save_player(player)
    self._rate_set(sender, "闭关")
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "✅ 已进入闭关,期间安全不可被劫掠", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "闭关成功"
async def _cmd_finish_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """End the sender's cultivation session and settle the gained points.

    Gain is ``hours * base_rate_per_hour * spirit_root_mult`` with the hours
    clamped to ``[0, max_cultivate_hours]``.  The player then enters
    ``Unstable_Qi`` for ``unstable_qi_minutes`` minutes.

    Returns:
        ``(True, "出关结算")`` on success, otherwise ``(False, reason)``.
    """
    # Cache-then-DB lookup, consistent with the other handlers; reading
    # Redis alone reports a stored player as unregistered after a cache miss.
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)
    if player.get("status") != "Cultivating":
        return False, "非闭关"

    now = datetime.now(timezone.utc)
    start_iso = player.get("last_cultivate_time")
    try:
        start = datetime.fromisoformat(start_iso) if start_iso else datetime.now(timezone.utc)
    except Exception:
        # An unreadable start time counts as "started just now" (zero gain).
        start = datetime.now(timezone.utc)
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)

    duration_hours = (now - start).total_seconds() / 3600.0
    duration_hours = max(0.0, min(duration_hours, float(self.max_cultivate_hours)))
    rate = self.base_rate_per_hour * float(player.get("spirit_root_mult", 1.0))
    gain = int(duration_hours * rate)

    player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain
    player["status"] = "Unstable_Qi"
    player["status_until"] = (now + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat()
    player["last_cultivate_time"] = None
    self._save_player(player)
    # Automatic layer promotion (capped at layer 10 by _auto_layer_up).
    self._auto_layer_up(sender, player)
    self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
    self._rate_set(sender, "出关")

    target = roomid or sender
    # The duration is wrapped in a matching pair of parentheses (the opening one was missing).
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"✅ 出关成功,获得修为:{gain}({duration_hours:.1f}小时)\n当前修为:{player['cultivation_points']}\n状态:气息不稳 {self.unstable_qi_minutes}分钟", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 20)
    return True, "出关结算"
async def _cmd_gather(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Convert spirit stones into cultivation points at 10 points per stone.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the player.
        roomid: Group id; empty for a private chat.
        content: Text after the command; the number of stones to spend.

    Returns:
        ``(True, "聚灵成功")`` on success, otherwise ``(False, reason)``.
    """
    # Cache-then-DB lookup, consistent with the other handlers; reading
    # Redis alone reports a stored player as unregistered after a cache miss.
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)
    try:
        qty = int(content.strip())
    except (AttributeError, TypeError, ValueError):
        return False, "命令格式错误"

    stones = int(player.get("spirit_stone", 0))
    if qty <= 0 or stones < qty:
        return False, "灵石不足"

    gained = qty * 10
    player["spirit_stone"] = stones - qty
    player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gained
    self._save_player(player)
    # Automatic layer promotion (capped at layer 10 by _auto_layer_up).
    self._auto_layer_up(sender, player)
    self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
    self._rate_set(sender, "聚灵")

    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"✅ 聚灵成功,消耗灵石{qty},获得修为{gained}", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "聚灵成功"
async def _cmd_leaderboard(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the top-10 cultivation leaderboard and start the command cooldown.

    Returns:
        Always ``(True, "排行榜")``.
    """
    board = ["🏆 修为排行榜 Top10"]
    for position, (uid, score) in enumerate(self.redis_db.leaderboard_top(10), start=1):
        # NOTE(review): both branches yield an empty marker, so the sender's
        # own row is not highlighted — confirm whether a marker was intended.
        mark = "" if uid == sender else ""
        board.append(f"{position}. {uid} - {int(score)} {mark}")

    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "\n".join(board), sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 20)
    self._rate_set(sender, "排行榜")
    return True, "排行榜"
# Look up a player record; thin alias that delegates to _get_player_with_cache().
def _get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
return self._get_player_with_cache(user_id, group_id)
def _save_player(self, player: Dict[str, Any]):
if self.xdb:
fields = {}
for k in ("group_id","dao_name","realm","spirit_root","cultivation_points","spirit_stone","status","status_until","last_cultivate_time","clan_id"):
if k in player:
fields[k] = player[k]
try:
self.xdb.update_player_fields(player.get("user_id"), fields)
except Exception:
pass
self.redis_db.save_player(player)
def _get_player_with_cache(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
p = self.redis_db.get_player(user_id, group_id)
if p:
return p
if self.xdb:
dbp = self.xdb.get_player(user_id)
if dbp and dbp.get("group_id", "") == group_id:
dbp.setdefault("spirit_root_mult", 1.0)
dbp.setdefault("inventory", {})
self.redis_db.save_player(dbp)
return dbp
return None
def _parse_realm(self, realm: str) -> Tuple[str, Optional[int]]:
try:
if "" in realm:
prefix, layer_str = realm.split("")[0], realm.split(" ")[-1] if " " in realm else None
# expected format: 前缀N层例如 炼气1层/筑基10层
import re
m = re.match(r"^(.*?)?(\d+)层$", realm)
if m:
prefix = m.group(1)
layer = int(m.group(2))
return prefix, layer
except Exception:
pass
return realm, None
def _realm_score(self, realm: str) -> int:
prefix, layer = self._parse_realm(realm)
base = self.realm_score_map.get(prefix, 0)
return base + (layer or 0)
def _set_realm(self, user_id: str, player: Dict[str, Any], new_realm: str):
player["realm"] = new_realm
if self.xdb:
try:
self.xdb.update_player_fields(user_id, {"realm": new_realm})
self.redis_db.invalidate_player(user_id, player.get("group_id", ""))
except Exception:
pass
self._save_player(player)
# 更新境界排行榜
self.redis_db.leaderboard_realm_add(user_id, float(self._realm_score(new_realm)))
def _auto_layer_up(self, user_id: str, player: Dict[str, Any]):
prefix, layer = self._parse_realm(player.get("realm", "凡人"))
if layer is None or layer >= 10:
return
threshold = self.layer_threshold_map.get(prefix)
if not threshold:
return
pts = int(player.get("cultivation_points", 0))
new_layer = max(layer, min(10, pts // threshold))
if new_layer != layer:
self._set_realm(user_id, player, f"{prefix}{new_layer}")
async def _cmd_signin(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Grant the sign-in reward of 50 spirit stones.

    Repeat sign-ins are blocked while the "签到" cooldown key exists.

    Returns:
        ``(True, "签到成功")`` on success, ``(False, "未注册")`` or
        ``(False, "已签到")`` otherwise.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)
    already_signed = self.redis_db.check_rate_limited(sender, "签到")
    if already_signed:
        return False, "已签到"

    reward = 50
    stones_before = int(player.get("spirit_stone", 0))
    player["spirit_stone"] = stones_before + reward
    self._save_player(player)
    self._rate_set(sender, "签到")

    sent = await bot.send_text_message(roomid or sender, f"✅ 签到成功,获得灵石{reward}", sender)
    client_msg_id, create_time, new_msg_id = sent
    if self.revoke:
        self.revoke.add_message_to_revoke(roomid or sender, client_msg_id, create_time, new_msg_id, 10)
    return True, "签到成功"
async def _cmd_shop(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the list of shop items with type and price.

    Returns:
        Always ``(True, "坊市")``.
    """
    listing = ["🛒 坊市商品"]
    listing.extend(
        f"{entry['name']} [{entry['type']}] - {entry['price']}灵石"
        for entry in self.shop_items
    )
    sent = await bot.send_text_message(roomid or sender, "\n".join(listing), sender)
    client_msg_id, create_time, new_msg_id = sent
    if self.revoke:
        self.revoke.add_message_to_revoke(roomid or sender, client_msg_id, create_time, new_msg_id, 20)
    self._rate_set(sender, "坊市")
    return True, "坊市"
async def _cmd_buy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Buy ``quantity`` of a shop item with spirit stones.

    Args:
        bot: Client used to send the confirmation message.
        sender: User id of the buyer.
        roomid: Group id; empty for a private chat.
        content: Text after the command, expected as ``"<item> <quantity>"``.

    Returns:
        ``(True, "购买成功")`` on success, otherwise ``(False, reason)``.
    """
    parts = content.split()
    if len(parts) < 2:
        return False, "命令格式错误"
    item_name = parts[0]
    try:
        qty = int(parts[1])
    except (TypeError, ValueError):
        return False, "命令格式错误"

    player = self._get_player(sender, roomid or "")
    if not player:
        return False, "未注册"
    player = self._check_status_update(player)

    item = next((i for i in self.shop_items if i["name"] == item_name), None)
    if not item:
        return False, "商品不存在"

    total = item["price"] * qty
    stones = int(player.get("spirit_stone", 0))
    if qty <= 0 or stones < total:
        return False, "灵石不足"

    player["spirit_stone"] = stones - total
    inv = player.get("inventory") or {}
    inv[item_name] = int(inv.get(item_name, 0)) + qty
    player["inventory"] = inv
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, {"spirit_stone": player["spirit_stone"]})
            self.xdb.add_item(sender, item_name, item.get("type", "other"), qty)
        except Exception as e:
            # Best effort: the purchase still lands in the cache, but a DB
            # failure is now logged instead of being swallowed silently.
            self.LOG.warning(f"购买写入数据库失败: {e}")
    self._save_player(player)
    self._rate_set(sender, "购买")

    target = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, f"✅ 购买成功,{item_name} × {qty}", sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 10)
    return True, "购买成功"
async def _cmd_bag(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Send the caller's inventory listing (the "背包" command).

    Items are read from ``self.xdb.get_inventory`` when a database handle is
    configured.  When there is no handle, or the query raises or yields
    ``None``, the listing falls back to the ``inventory`` mapping stored on
    the loaded player record, so the bag is not reported as empty merely
    because the database is unavailable.

    Args:
        bot: Client used to send the reply.
        sender: User id of the player issuing the command.
        roomid: Group id; when falsy the reply is sent to ``sender``.

    Returns:
        ``(False, "未注册")`` when no player record is found (no reply is
        sent in that case), otherwise ``(True, "背包")``.
    """
    player = self._get_player(sender, roomid or "")
    if not player:
        # The reply for unregistered users is disabled in this handler.
        return False, "未注册"

    items = None
    if self.xdb:
        try:
            items = self.xdb.get_inventory(sender)
        except Exception as exc:
            # Best-effort read: keep serving the command, but leave a trace.
            logger.warning(f"[xiuxian] get_inventory failed for {sender}: {exc}")
    if items is None:
        cached = player.get("inventory") or {}
        # Shape the cached {name: quantity} mapping like the DB rows and
        # hide entries that have been used up.
        items = [
            {"name": item_name, "quantity": count}
            for item_name, count in cached.items()
            if int(count or 0) > 0
        ]

    lines = ["🎒 背包"]
    if items:
        lines.extend(f"{it['name']} × {it['quantity']}" for it in items)
    else:
        # Unchanged from the original: an empty bag renders as a blank line.
        lines.append("")

    destination = roomid or sender
    client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, "\n".join(lines), sender)
    if self.revoke:
        self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, 20)
    self._rate_set(sender, "背包")
    return True, "背包"
async def _cmd_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Attempt a pill-assisted realm breakthrough (the "突破" command).

    The handler looks up ``self.break_config["<prefix>10层"]`` for the
    player's current realm prefix and uses its ``"pill"`` entry, which must
    provide ``cost``, ``rate`` and ``target``.  One ``self.bt_pill_item`` and
    ``cost`` cultivation points are consumed before the random roll, so they
    are spent whether or not the roll succeeds.

    Args:
        bot: Client used to send replies.
        sender: User id of the player issuing the command.
        roomid: Group id; when falsy replies are sent to ``sender``.

    Returns:
        ``(True, "突破成功")`` on success; otherwise ``False`` with a short
        reason (``"未注册"``, ``"未到瓶颈"``, ``"未配置"``, ``"修为不足"``,
        ``"缺少丹药"`` or ``"突破失败"``).
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    player = self._get_player(sender, roomid or "")
    if not player:
        # The reply for unregistered users is disabled in this handler.
        return False, "未注册"
    player = self._check_status_update(player)
    points = int(player.get("cultivation_points", 0))
    inv = player.get("inventory") or {}

    # NOTE(review): only the realm prefix selects the config; the parsed
    # layer is not checked, so any layer of a configured prefix can attempt
    # the breakthrough -- confirm this is intended.
    prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
    stage_conf = self.break_config.get(f"{prefix}10层")
    if not stage_conf:
        await reply("当前境界未到瓶颈或未配置突破路径", 5)
        return False, "未到瓶颈"
    pill_conf = stage_conf.get("pill")
    if not pill_conf:
        await reply("丹药路径未配置", 5)
        return False, "未配置"
    if points < pill_conf["cost"]:
        await reply("修为不足,无法突破", 5)
        return False, "修为不足"
    pills = int(inv.get(self.bt_pill_item, 0) or 0)
    if pills <= 0:
        await reply(f"缺少丹药:{self.bt_pill_item}", 5)
        return False, "缺少丹药"

    # Pay the price up front.
    inv[self.bt_pill_item] = pills - 1
    player["inventory"] = inv
    player["cultivation_points"] = points - pill_conf["cost"]
    if self.xdb:
        try:
            self.xdb.remove_item(sender, self.bt_pill_item, 1)
            self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception as exc:
            # Persistence stays best-effort, but the failure is no longer silent.
            logger.warning(f"[xiuxian] breakthrough persistence failed for {sender}: {exc}")

    if random.random() < pill_conf["rate"]:
        self._set_realm(sender, player, pill_conf["target"])
        self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
        self._rate_set(sender, "突破")
        await reply(f"✅ 突破成功,晋升至{pill_conf['target']}", 10)
        return True, "突破成功"

    # A failed roll still has to store the spent pill and points.
    self._save_player(player)
    self._rate_set(sender, "突破")
    await reply("❌ 突破失败", 5)
    return False, "突破失败"
async def _cmd_force_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Attempt a breakthrough without a pill (the "强行突破" command).

    Uses the ``"hard"`` entry of ``self.break_config["<prefix>10层"]`` for
    the player's current realm prefix; the entry must provide ``cost``,
    ``rate`` and ``target``.  ``cost`` cultivation points are deducted before
    the random roll and are therefore spent on failure as well.

    Args:
        bot: Client used to send replies.
        sender: User id of the player issuing the command.
        roomid: Group id; when falsy replies are sent to ``sender``.

    Returns:
        ``(True, "强行突破成功")`` on success; otherwise ``False`` with a
        short reason (``"未注册"``, ``"未到瓶颈"``, ``"未配置"``,
        ``"修为不足"`` or ``"强行突破失败"``).
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    player = self._get_player(sender, roomid or "")
    if not player:
        # The reply for unregistered users is disabled in this handler.
        return False, "未注册"
    player = self._check_status_update(player)
    points = int(player.get("cultivation_points", 0))

    # NOTE(review): only the realm prefix selects the config; the parsed
    # layer is not checked -- confirm this is intended.
    prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
    stage_conf = self.break_config.get(f"{prefix}10层")
    if not stage_conf:
        await reply("当前境界未到瓶颈或未配置突破路径", 5)
        return False, "未到瓶颈"
    hard_conf = stage_conf.get("hard")
    if not hard_conf:
        await reply("强行路径未配置", 5)
        return False, "未配置"
    if points < hard_conf["cost"]:
        await reply("修为不足,无法强行突破", 5)
        return False, "修为不足"

    # Pay the price up front.
    player["cultivation_points"] = points - hard_conf["cost"]
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception as exc:
            # Persistence stays best-effort, but the failure is no longer silent.
            logger.warning(f"[xiuxian] force-breakthrough persistence failed for {sender}: {exc}")

    if random.random() < hard_conf["rate"]:
        self._set_realm(sender, player, hard_conf["target"])
        self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
        self._rate_set(sender, "强行突破")
        await reply(f"✅ 强行突破成功,晋升至{hard_conf['target']}", 10)
        return True, "强行突破成功"

    # A failed roll still has to store the spent points.
    self._save_player(player)
    self._rate_set(sender, "强行突破")
    await reply("❌ 强行突破失败,灵气反噬!", 5)
    return False, "强行突破失败"
async def _cmd_rob(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Try to rob another player's spirit stones (the "劫掠" command).

    ``content`` is the target's user id, optionally prefixed with ``@``.
    The attempt is refused when the target is the caller, either side is
    unregistered, the target's realm prefix is "炼气", the target's status
    is ``Cultivating`` or ``Injured``, either record belongs to a different
    group than ``roomid``, or both share a clan.

    The success chance is 0.5, raised to 0.7 when the attacker has more
    cultivation points than the defender and lowered to 0.3 when fewer.  On
    success 20% (rounded down) of the defender's stones move to the attacker
    and the defender becomes ``Injured`` for ``self.injured_minutes``.

    Args:
        bot: Client used to send replies.
        sender: User id of the attacker.
        roomid: Group id; when falsy replies are sent to ``sender`` and the
            same-group check and the group announcement are skipped.
        content: Command argument holding the target id.

    Returns:
        ``(True, "劫掠成功")`` on success, otherwise ``False`` with a short
        reason.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    # Replies for the three validation failures below are disabled.
    target = content.strip().lstrip("@")
    if not target:
        return False, "命令格式错误"
    if target == sender:
        return False, "非法目标"
    attacker = self._get_player(sender, roomid or "")
    defender = self._get_player(target, roomid or "")
    if not attacker or not defender:
        return False, "未注册"
    attacker = self._check_status_update(attacker)
    defender = self._check_status_update(defender)

    # Newcomer protection: players whose realm prefix is "炼气" cannot be robbed.
    def_prefix, _ = self._parse_realm(defender.get("realm", "炼气1层"))
    if def_prefix == "炼气":
        await reply("目标处于新手保护期(炼气),不可劫掠", 5)
        return False, "新手保护"
    if defender.get("status") in ("Cultivating", "Injured"):
        await reply("目标处于保护或闭关中", 5)
        return False, "目标保护"
    if roomid and (attacker.get("group_id") != roomid or defender.get("group_id") != roomid):
        await reply("仅限同群内劫掠", 5)
        return False, "跨群"
    attacker_clan = attacker.get("clan_id")
    if attacker_clan and defender.get("clan_id") and attacker_clan == defender.get("clan_id"):
        await reply("不可劫掠同门", 5)
        return False, "同门"

    a_pts = int(attacker.get("cultivation_points", 0))
    d_pts = int(defender.get("cultivation_points", 0))
    chance = 0.5
    if a_pts > d_pts:
        chance += 0.2
    elif a_pts < d_pts:
        chance -= 0.2

    if random.random() >= chance:
        self._rate_set(sender, "劫掠")
        await reply("❌ 劫掠失败", 5)
        return False, "劫掠失败"

    d_stones = int(defender.get("spirit_stone", 0))
    gain = max(0, int(d_stones * 0.2))
    defender["spirit_stone"] = d_stones - gain
    attacker["spirit_stone"] = int(attacker.get("spirit_stone", 0)) + gain
    defender["status"] = "Injured"
    protected_until = datetime.now(timezone.utc) + timedelta(minutes=int(self.injured_minutes))
    defender["status_until"] = protected_until.isoformat()
    if self.xdb:
        try:
            # Fall back to the ids from the command if a record lacks user_id.
            self.xdb.update_player_fields(
                defender.get("user_id") or target,
                {
                    "spirit_stone": defender["spirit_stone"],
                    "status": defender["status"],
                    "status_until": defender["status_until"],
                },
            )
            self.xdb.update_player_fields(
                attacker.get("user_id") or sender,
                {"spirit_stone": attacker["spirit_stone"]},
            )
        except Exception as exc:
            # Persistence stays best-effort, but the failure is no longer silent.
            logger.warning(f"[xiuxian] rob persistence failed ({sender} -> {target}): {exc}")
    self._save_player(defender)
    self._save_player(attacker)
    self._rate_set(sender, "劫掠")
    await reply(f"✅ 劫掠成功,获得灵石{gain}", 10)
    if roomid:
        # Group-wide announcement; the last argument is passed as a list here.
        g_client_msg_id, g_create_time, g_new_msg_id = await bot.send_text_message(
            roomid, f"{sender} 劫掠 {target} 成功,目标进入受伤保护", [target]
        )
        if self.revoke:
            self.revoke.add_message_to_revoke(roomid, g_client_msg_id, g_create_time, g_new_msg_id, 10)
    return True, "劫掠成功"
async def _cmd_give_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Transfer spirit stones to a clan mate (the "赠与" command).

    ``content`` is ``"<target> <quantity>"``; the target may be prefixed with
    ``@``.  Both players must be registered and share a non-empty
    ``clan_id``, and the giver must own at least ``quantity`` stones.

    Giving to oneself is rejected.  Without that guard, when the player
    loader returns two separate records for the same user, the "receiver"
    copy is written last and the player ends up with ``quantity`` extra
    stones.

    Args:
        bot: Client used to send replies.
        sender: User id of the giver.
        roomid: Group id; when falsy replies are sent to ``sender``.
        content: Command arguments.

    Returns:
        ``(True, "赠与成功")`` on success, otherwise ``False`` with a short
        reason.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    parts = content.strip().split()
    if len(parts) < 2:
        await reply("命令格式:赠与 目标wxid 数量", 5)
        return False, "命令格式错误"
    target = parts[0].lstrip("@")
    try:
        qty = int(parts[1])
    except ValueError:
        await reply("命令格式:赠与 目标wxid 数量", 5)
        return False, "命令格式错误"
    if target == sender:
        # Same short reason the rob handler uses for a self-target.
        await reply("不可赠与自己", 5)
        return False, "非法目标"

    giver = self._get_player_with_cache(sender, roomid or "")
    receiver = self._get_player_with_cache(target, roomid or "")
    if not giver or not receiver:
        await reply("双方需已注册", 5)
        return False, "未注册"
    if not giver.get("clan_id") or giver.get("clan_id") != receiver.get("clan_id"):
        await reply("仅同门可赠与灵石", 5)
        return False, "不同门"
    stones = int(giver.get("spirit_stone", 0))
    if qty <= 0 or stones < qty:
        await reply("灵石不足或数量不合法", 5)
        return False, "灵石不足"

    giver["spirit_stone"] = stones - qty
    receiver["spirit_stone"] = int(receiver.get("spirit_stone", 0)) + qty
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, {"spirit_stone": giver["spirit_stone"]})
            self.xdb.update_player_fields(target, {"spirit_stone": receiver["spirit_stone"]})
        except Exception as exc:
            # Persistence stays best-effort, but the failure is no longer silent.
            logger.warning(f"[xiuxian] give-stone persistence failed ({sender} -> {target}): {exc}")
    self._save_player(giver)
    self._save_player(receiver)
    self._rate_set(sender, "赠与")
    await reply(f"✅ 已向 {target} 赠与灵石 {qty}", 10)
    return True, "赠与成功"
async def _cmd_give_item(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Transfer an inventory item to another player (the "赠送" command).

    ``content`` is ``"<target> <item> <quantity>"``; the target may be
    prefixed with ``@``.  Malformed input, a self-target and unregistered
    players are rejected without a reply.

    Giving to oneself is rejected because, when the player loader returns
    two separate records for the same user, the "receiver" copy is saved
    last and the item count grows.  The in-memory inventories are only
    modified after ``self.xdb.transfer_item`` has reported success, so a
    failed database transfer leaves the loaded records untouched.

    Args:
        bot: Client used to send replies.
        sender: User id of the giver.
        roomid: Group id; when falsy replies are sent to ``sender``.
        content: Command arguments.

    Returns:
        ``(True, "赠送成功")`` on success, otherwise ``False`` with a short
        reason.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    parts = content.strip().split()
    if len(parts) < 3:
        return False, "命令格式错误"
    target = parts[0].lstrip("@")
    item_name = parts[1]
    try:
        qty = int(parts[2])
    except ValueError:
        return False, "命令格式错误"
    if target == sender:
        # Same short reason the rob handler uses for a self-target.
        return False, "非法目标"

    giver = self._get_player(sender, roomid or "")
    receiver = self._get_player(target, roomid or "")
    if not giver or not receiver:
        return False, "未注册"

    inv_g = giver.get("inventory") or {}
    owned = int(inv_g.get(item_name, 0) or 0)
    if qty <= 0 or owned < qty:
        await reply("物品不足或数量不合法", 5)
        return False, "物品不足"

    if self.xdb:
        # The database decides; nothing in memory has been changed yet.
        if not self.xdb.transfer_item(sender, target, item_name, qty):
            await reply("物品转移失败", 5)
            return False, "转移失败"
        self.redis_db.invalidate_player(sender, giver.get("group_id", ""))
        self.redis_db.invalidate_player(target, receiver.get("group_id", ""))

    inv_r = receiver.get("inventory") or {}
    inv_g[item_name] = owned - qty
    inv_r[item_name] = int(inv_r.get(item_name, 0) or 0) + qty
    giver["inventory"] = inv_g
    receiver["inventory"] = inv_r
    self._save_player(giver)
    self._save_player(receiver)
    self._rate_set(sender, "赠送")
    await reply(f"✅ 已向 {target} 赠送 {item_name} × {qty}", 10)
    return True, "赠送成功"
async def _cmd_clan_create(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Create a clan and make the caller a member (the "创建门派" command).

    ``content`` is the clan name.  The caller must be registered and their
    realm prefix must be one of 元婴, 化神, 合体, 大乘, 渡劫 or 真仙.  The
    clan is created through ``self.xdb.create_clan``; a ``None`` result, or
    the absence of a database handle, is reported as a failure.

    Args:
        bot: Client used to send replies.
        sender: User id of the founder.
        roomid: Group id; when falsy replies are sent to ``sender`` and an
            empty group id is passed to ``create_clan``.
        content: Command argument holding the clan name.

    Returns:
        ``(True, "创建门派")`` on success, otherwise ``False`` with a short
        reason.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    name = content.strip()
    if not name:
        await reply("命令格式:创建门派 名称", 5)
        return False, "命令格式错误"
    player = self._get_player_with_cache(sender, roomid or "")
    if not player:
        # The reply for unregistered users is disabled in this handler.
        return False, "未注册"
    prefix, _ = self._parse_realm(player.get("realm", "炼气1层"))
    if prefix not in {"元婴", "化神", "合体", "大乘", "渡劫", "真仙"}:
        await reply("创建门派需达到元婴期及以上", 5)
        return False, "境界不足"

    # NOTE(review): an existing clan_id on the player is not checked before
    # creating a new clan -- confirm whether that is intended.
    clan_id = self.xdb.create_clan(name, roomid or "", sender) if self.xdb else None
    if clan_id is None:
        await reply("门派已存在或创建失败", 5)
        return False, "门派失败"

    player["clan_id"] = int(clan_id)
    try:
        # self.xdb is necessarily set here, since it produced clan_id.
        self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]})
        self.redis_db.invalidate_player(sender, player.get("group_id", ""))
    except Exception as exc:
        # Persistence stays best-effort, but the failure is no longer silent.
        logger.warning(f"[xiuxian] clan-create persistence failed for {sender}: {exc}")
    self._save_player(player)
    self._rate_set(sender, "创建门派")
    await reply(f"✅ 门派已创建:{name}", 10)
    return True, "创建门派"
async def _cmd_clan_join(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
    """Join an existing clan by name (the "加入门派" command).

    ``content`` is the clan name, resolved with ``self.xdb.get_clan_id`` for
    the current group.  A falsy id, or the absence of a database handle, is
    reported as "clan not found".

    Args:
        bot: Client used to send replies.
        sender: User id of the joining player.
        roomid: Group id; when falsy replies are sent to ``sender`` and an
            empty group id is used for the lookup.
        content: Command argument holding the clan name.

    Returns:
        ``(True, "加入门派")`` on success, otherwise ``False`` with a short
        reason.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    name = content.strip()
    if not name:
        await reply("命令格式:加入门派 名称", 5)
        return False, "命令格式错误"
    player = self._get_player_with_cache(sender, roomid or "")
    if not player:
        await reply("未注册,请先发送:注册修仙 道号", 5)
        return False, "未注册"

    cid = self.xdb.get_clan_id(roomid or "", name) if self.xdb else None
    if not cid:
        await reply("门派不存在", 5)
        return False, "门派不存在"

    # The lookup may hand the id back as a string; store it as an int then.
    player["clan_id"] = int(cid) if isinstance(cid, str) else cid
    try:
        # self.xdb is necessarily set here, since it produced cid.
        self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]})
        self.redis_db.invalidate_player(sender, player.get("group_id", ""))
    except Exception as exc:
        # Persistence stays best-effort, but the failure is no longer silent.
        logger.warning(f"[xiuxian] clan-join persistence failed for {sender}: {exc}")
    self._save_player(player)
    self._rate_set(sender, "加入门派")
    await reply(f"✅ 已加入门派:{name}", 10)
    return True, "加入门派"
async def _cmd_clan_exit(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
    """Clear the caller's clan membership (the "退出门派" command).

    The player's ``clan_id`` is set to ``None`` and persisted.  The handler
    does not check whether the player was in a clan beforehand.

    Args:
        bot: Client used to send replies.
        sender: User id of the leaving player.
        roomid: Group id; when falsy replies are sent to ``sender``.

    Returns:
        ``(False, "未注册")`` when no player record is found, otherwise
        ``(True, "退出门派")``.
    """

    async def reply(text: str, ttl: int) -> None:
        # Send one reply and, if a revoke manager is set, queue it for recall.
        destination = roomid or sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(destination, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(destination, client_msg_id, create_time, new_msg_id, ttl)

    player = self._get_player_with_cache(sender, roomid or "")
    if not player:
        await reply("未注册,请先发送:注册修仙 道号", 5)
        return False, "未注册"

    player["clan_id"] = None
    if self.xdb:
        try:
            self.xdb.update_player_fields(sender, {"clan_id": None})
            self.redis_db.invalidate_player(sender, player.get("group_id", ""))
        except Exception as exc:
            # Persistence stays best-effort, but the failure is no longer silent.
            logger.warning(f"[xiuxian] clan-exit persistence failed for {sender}: {exc}")
    self._save_player(player)
    self._rate_set(sender, "退出门派")
    await reply("✅ 已退出门派", 10)
    return True, "退出门派"