"""Group-chat text cultivation ("xiuxian") game plugin.

Design notes:
- Atomic chat commands + a small player state machine + rate limiting + a
  Redis cache placed in front of the relational DB.
- Reads prefer Redis and fall back to the DB, which then refills the cache;
  writes go to the DB first (when one is configured) and then refresh Redis.
- A per-group frequency limit and per-user command cooldowns are combined to
  guard against spam and account bans.
"""
import json
import re
import time
import random
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional, Tuple

from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.xiuxian_db import XiuxianDB
from db.points_db import PointsDBOperator, PointSource

# Realm strings look like "<prefix><N>层", e.g. "炼气1层" / "筑基10层".
_REALM_RE = re.compile(r"^(.*?)(\d+)层$")


class XiuxianRedisDB:
    """Redis access wrapper for the plugin: player cache, cooldown keys, leaderboards.

    Every public method swallows Redis errors, logs them, and returns a
    neutral value (``None`` / ``False`` / ``[]``) so a Redis outage never
    raises into the command handlers.
    """

    def __init__(self, db_manager: DBConnectionManager):
        self.db_manager = db_manager
        self.player_prefix = "xiuxian:cache:player:"
        # Both leaderboard keys are injected later by the plugin's initialize().
        self.leaderboard_key: Optional[str] = None
        self.leaderboard_realm_key: Optional[str] = None

    def set_leaderboard_key(self, key: str):
        """Set the sorted-set key used for the cultivation-points leaderboard."""
        self.leaderboard_key = key

    def set_realm_leaderboard_key(self, key: str):
        """Set the sorted-set key used for the realm leaderboard."""
        self.leaderboard_realm_key = key

    def get_redis(self):
        """Return a Redis connection (used as a context manager) from the DB manager."""
        return self.db_manager.get_redis_connection()

    # ------------------------------------------------------------------ helpers
    def _player_keys(self, user_id: str, group_id: str) -> Tuple[str, str]:
        """Return ``(new_key, legacy_key)`` for a player.

        New format: ``xiuxian:cache:player:{group_id}:{user_id}``;
        legacy format: ``xiuxian:cache:player:{user_id}``.
        """
        return (
            f"{self.player_prefix}{group_id}:{user_id}",
            f"{self.player_prefix}{user_id}",
        )

    @staticmethod
    def _loads(raw: Any) -> Dict[str, Any]:
        """Decode a Redis value (``bytes`` or ``str``) holding JSON."""
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        return json.loads(raw)

    @staticmethod
    def _rate_limit_key(user_id: str, cmd: str) -> str:
        """Build the per-user, per-command cooldown key."""
        return f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}"

    def _zadd(self, key: Optional[str], user_id: str, score: float, error_label: str):
        """Upsert ``user_id`` with ``score`` into sorted set ``key`` (no-op when key is unset)."""
        try:
            if not key:
                return
            with self.get_redis() as r:
                r.zadd(key, {user_id: score})
        except Exception as e:
            logger.error(f"{error_label}: {e}")

    def _ztop(self, key: Optional[str], top_n: int, error_label: str) -> List[Tuple[str, float]]:
        """Return the ``top_n`` highest-scored members of sorted set ``key``."""
        try:
            # top_n <= 0 would turn into zrevrange(0, -1), i.e. "everything".
            if not key or top_n <= 0:
                return []
            with self.get_redis() as r:
                res = r.zrevrange(key, 0, top_n - 1, withscores=True)
            return [
                (uid if isinstance(uid, str) else uid.decode("utf-8"), score)
                for uid, score in res
            ]
        except Exception as e:
            logger.error(f"{error_label}: {e}")
            return []

    # ------------------------------------------------------------- player cache
    def get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Read a cached player, transparently migrating the legacy key format.

        Returns the player dict, or ``None`` when nothing is cached or Redis fails.
        """
        try:
            new_key, old_key = self._player_keys(user_id, group_id)
            with self.get_redis() as r:
                data = r.get(new_key)
                if data:
                    return self._loads(data)

                # Fall back to the legacy key (no group_id in the key).
                data = r.get(old_key)
                if not data:
                    return None
                player = self._loads(data)
                # Legacy records may lack group_id; adopt the requested one.
                if not player.get("group_id"):
                    player["group_id"] = group_id
                # Migrate to the new key and drop the legacy one.
                r.set(new_key, json.dumps(player, ensure_ascii=False))
                r.delete(old_key)
                return player
        except Exception as e:
            logger.error(f"读取玩家数据失败: {e}")
            return None

    def save_player(self, player: Dict[str, Any]) -> bool:
        """Write ``player`` under the new key format and remove any legacy key.

        Returns ``True`` on success, ``False`` when ``user_id`` is missing or
        Redis fails.
        """
        try:
            user_id = player.get("user_id")
            if not user_id:
                # Without a user_id the key would become "...:None".
                logger.error("保存玩家数据失败: 缺少 user_id")
                return False
            new_key, old_key = self._player_keys(user_id, player.get("group_id", ""))
            with self.get_redis() as r:
                r.set(new_key, json.dumps(player, ensure_ascii=False))
                r.delete(old_key)
            return True
        except Exception as e:
            logger.error(f"保存玩家数据失败: {e}")
            return False

    def invalidate_player(self, user_id: str, group_id: str):
        """Delete the cached player under both the new and the legacy key."""
        try:
            new_key, old_key = self._player_keys(user_id, group_id)
            with self.get_redis() as r:
                r.delete(new_key)
                r.delete(old_key)
        except Exception as e:
            logger.error(f"失效玩家缓存失败: {e}")

    # ---------------------------------------------------------------- cooldowns
    def set_rate_limit(self, user_id: str, cmd: str, seconds: int) -> bool:
        """Start a cooldown of ``seconds`` for ``(user_id, cmd)``.

        A non-positive ``seconds`` means "no cooldown" and is a successful no-op
        (SETEX rejects a non-positive TTL).
        """
        try:
            if int(seconds) <= 0:
                return True
            with self.get_redis() as r:
                r.setex(self._rate_limit_key(user_id, cmd), int(seconds), "1")
            return True
        except Exception as e:
            logger.error(f"设置限流失败: {e}")
            return False

    def check_rate_limited(self, user_id: str, cmd: str) -> bool:
        """Return ``True`` while a cooldown key exists for ``(user_id, cmd)``.

        Fails open: a Redis error is logged and reported as "not limited".
        """
        try:
            with self.get_redis() as r:
                return bool(r.exists(self._rate_limit_key(user_id, cmd)))
        except Exception as e:
            logger.error(f"检查限流失败: {e}")
            return False

    # ------------------------------------------------------------- leaderboards
    def leaderboard_add(self, user_id: str, score: float):
        """Set ``user_id``'s score on the cultivation leaderboard."""
        self._zadd(self.leaderboard_key, user_id, score, "更新排行榜失败")

    def leaderboard_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` ``(user_id, score)`` pairs by cultivation points."""
        return self._ztop(self.leaderboard_key, top_n, "读取排行榜失败")

    def leaderboard_realm_add(self, user_id: str, score: float):
        """Set ``user_id``'s score on the realm leaderboard."""
        self._zadd(self.leaderboard_realm_key, user_id, score, "更新境界排行榜失败")

    def leaderboard_realm_top(self, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the top ``top_n`` ``(user_id, score)`` pairs by realm score."""
        return self._ztop(self.leaderboard_realm_key, top_n, "读取境界排行榜失败")


class XiuxianPlugin(MessagePluginInterface):
    """Main plugin: command dispatch, game logic, and DB/Redis coordination."""

    FEATURE_KEY = "XIUXIAN"
    FEATURE_DESCRIPTION = "🧙‍♂️ 文字修仙 [注册修仙|我的状态|闭关|出关|聚灵|排行榜]"

    # Player status code -> label shown to users.
    _STATUS_CN = {
        "Idle": "空闲",
        "Cultivating": "闭关",
        "Unstable_Qi": "气息不稳",
        "Injured": "受伤保护",
    }

    # Realm progression order, used to find the realm after a breakthrough.
    _REALM_ORDER = ("炼气", "筑基", "金丹", "元婴", "化神", "合体", "大乘", "渡劫", "真仙")

    # Fallback realm table used when [Xiuxian.realms] is missing. Value format:
    # "layer_threshold,realm_score,pill_item,pill_rate,hard_rate,hard_multiplier".
    _DEFAULT_REALMS = {
        "炼气": "1000,10,筑基丹,0.4,0.1,2.0",
        "筑基": "5000,20,金元丹,0.2,0.05,2.0",
        "金丹": "50000,30,结婴丹,0.15,0.03,2.0",
        "元婴": "200000,40,化神丹,0.1,0.02,2.0",
        "化神": "1000000,50,合体丹,0.08,0.01,2.0",
        "合体": "5000000,60,大乘丹,0.05,0.005,2.0",
        "大乘": "10000000,70,渡劫丹,0.03,0.003,2.0",
        "渡劫": "50000000,80,飞升丹,0.02,0.001,2.0",
        "真仙": "0,100,,0,0,0",
    }

    # Command -> (handler method name, whether the handler takes the argument text).
    # Handlers are resolved with getattr at dispatch time, so handlers defined
    # further down the class are found as well.
    _COMMAND_HANDLERS = {
        "注册修仙": ("_cmd_register", True),
        "修仙帮助": ("_cmd_help", False),
        "帮助": ("_cmd_help", False),
        "help": ("_cmd_help", False),
        "修仙指令": ("_cmd_help", False),
        "指令": ("_cmd_help", False),
        "积分购石": ("_cmd_points_to_stone", True),
        "积分换灵石": ("_cmd_points_to_stone", True),
        "我的状态": ("_cmd_status", False),
        "闭关": ("_cmd_cultivate", False),
        "出关": ("_cmd_finish_cultivate", False),
        "聚灵": ("_cmd_gather", True),
        "排行榜": ("_cmd_leaderboard", False),
        "修仙签到": ("_cmd_signin", False),
        "坊市": ("_cmd_shop", False),
        "购买": ("_cmd_buy", True),
        "乾坤袋": ("_cmd_bag", False),
        "突破": ("_cmd_breakthrough", False),
        "强行突破": ("_cmd_force_breakthrough", False),
        "劫掠": ("_cmd_rob", True),
        "赠与": ("_cmd_give_stone", True),
        "赠送": ("_cmd_give_item", True),
        "创建门派": ("_cmd_clan_create", True),
        "加入门派": ("_cmd_clan_join", True),
        "退出门派": ("_cmd_clan_exit", False),
    }

    @property
    def name(self) -> str:
        return "群聊文字修仙"

    @property
    def version(self) -> str:
        return "0.1.0"

    @property
    def description(self) -> str:
        return "基于Redis的简化修仙玩法,含闭关与状态机"

    @property
    def author(self) -> str:
        return "AI助手"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""

    @property
    def commands(self) -> List[str]:
        # Populated by initialize() from the [Xiuxian] config section.
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.feature = self.register_feature()
        self.redis_db: Optional[XiuxianRedisDB] = None
        self.xdb: Optional[XiuxianDB] = None
        self.points_db: Optional[PointsDBOperator] = None
        # Message-revoke helper; refreshed from each incoming message.
        self.revoke = None

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load configuration and wire up the DB and Redis layers.

        Args:
            context: plugin context; only ``event_system`` is read here.

        Returns:
            Always ``True``; configuration problems are logged, not raised.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")
        self.event_system = context.get("event_system")
        self.db_manager = DBConnectionManager.get_instance()
        if self.db_manager:
            self.redis_db = XiuxianRedisDB(self.db_manager)
            # The persistence layer is only set up when a MySQL pool is available.
            if self.db_manager.mysql_pool:
                self.xdb = XiuxianDB(self.db_manager)
                self.points_db = PointsDBOperator(self.db_manager)
                try:
                    self.xdb.init_schema()
                except Exception as e:
                    self.LOG.error(f"修仙表结构初始化失败: {e}")

        cfg = self._config.get("Xiuxian", {})
        self._commands = cfg.get("command", ["注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜"])
        self.command_format = cfg.get("command-format", "注册修仙 道号 | 我的状态 | 闭关 | 出关 | 聚灵 数量 | 排行榜")
        self.enable = cfg.get("enable", True)

        status_cfg = cfg.get("status", {})
        self.unstable_qi_minutes = status_cfg.get("unstable_qi_minutes", 15)
        self.injured_minutes = status_cfg.get("injured_minutes", 60)
        self.max_cultivate_hours = status_cfg.get("max_cultivate_hours", 8)

        # Per-command cooldowns in seconds.
        rate_cfg = cfg.get("rate_limit", {})
        self.seconds_rl = {
            "我的状态": rate_cfg.get("status_seconds", 3),
            "闭关": rate_cfg.get("inout_seconds", 5),
            "出关": rate_cfg.get("inout_seconds", 5),
            "聚灵": rate_cfg.get("gather_seconds", 30),
            # NOTE(review): the leaderboard reuses "break_seconds"; this looks
            # like a copy-paste from the breakthrough entry — confirm the key.
            "排行榜": rate_cfg.get("break_seconds", 60),
            "注册修仙": 5,
            "签到": rate_cfg.get("signin_seconds", 86400),
            "坊市": rate_cfg.get("shop_seconds", 10),
            "购买": rate_cfg.get("buy_seconds", 5),
            "乾坤袋": rate_cfg.get("bag_seconds", 3),
            "突破": rate_cfg.get("break_seconds", 60),
            "强行突破": rate_cfg.get("force_break_seconds", 60),
            "劫掠": rate_cfg.get("rob_seconds", 30),
            "赠与": rate_cfg.get("gift_seconds", 10),
            "赠送": rate_cfg.get("gift_seconds", 10),
            "创建门派": 86400,
            "加入门派": 604800,
            "退出门派": 604800,
            "积分购石": rate_cfg.get("points_to_stone_seconds", 10),
            "积分换灵石": rate_cfg.get("points_to_stone_seconds", 10),
        }

        # Spirit roots are configured as "name:multiplier" strings.
        cult_cfg = cfg.get("cultivation", {})
        self.base_rate_per_hour = cult_cfg.get("base_rate_per_hour", 100)
        self.spirit_roots_cfg = cult_cfg.get("spirit_roots", ["凡灵根:1.0", "天灵根:2.0"])
        self.spirit_roots = []
        for item in self.spirit_roots_cfg:
            try:
                name, mult = item.split(":")
                self.spirit_roots.append((name, float(mult)))
            except Exception as e:
                self.LOG.warning(f"解析灵根配置失败: {item}, 错误: {e}")

        lb_cfg = cfg.get("leaderboard", {})
        leaderboard_key = lb_cfg.get("key", "xiuxian:zset:leaderboard:cultivation")
        realm_lb_key = lb_cfg.get("realm_key", "xiuxian:zset:leaderboard:realm")
        if self.redis_db:
            self.redis_db.set_leaderboard_key(leaderboard_key)
            self.redis_db.set_realm_leaderboard_key(realm_lb_key)

        # Shop items are configured as "name:type:price" strings.
        shop_cfg = cfg.get("shop", {})
        self.shop_items = []
        for s in shop_cfg.get("items", []):
            try:
                n, t, p = s.split(":")
                self.shop_items.append({"name": n, "type": t, "price": int(p)})
            except Exception as e:
                self.LOG.warning(f"解析商品配置失败: {s}, 错误: {e}")

        pts_cfg = cfg.get("points_exchange", {})
        # Clamp to >= 1 so the exchange never divides by zero.
        self.point_to_stone_rate = max(1, int(pts_cfg.get("point_to_stone_rate", 10)))

        realms_cfg = cfg.get("realms", {})
        if not realms_cfg:
            self.LOG.warning("未找到境界配置 [Xiuxian.realms],使用默认配置")
            realms_cfg = dict(self._DEFAULT_REALMS)
        self._load_realms(realms_cfg)

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def _load_realms(self, realms_cfg: Dict[str, str]):
        """Parse the realm table into thresholds, scores and breakthrough rules.

        Populates:
            ``layer_threshold_map``: realm -> cultivation points per layer.
            ``realm_score_map``: realm -> base leaderboard score.
            ``break_config``: "<realm>10层" -> {"pill": {...}, "hard": {...}}.
        Malformed entries are logged and skipped.
        """
        self.realm_score_map = {}
        self.layer_threshold_map = {}
        self.break_config = {}
        order = list(self._REALM_ORDER)

        self.LOG.info(f"开始解析境界配置,共{len(realms_cfg)}个境界")
        for realm_name, config_str in realms_cfg.items():
            try:
                parts = config_str.split(",")
                if len(parts) < 6:
                    self.LOG.warning(f"境界配置格式错误: {realm_name}={config_str}")
                    continue
                layer_threshold = int(parts[0].strip())
                realm_score = int(parts[1].strip())
                pill_item = parts[2].strip() or None
                pill_rate = float(parts[3].strip()) if parts[3].strip() else 0.0
                hard_rate = float(parts[4].strip()) if parts[4].strip() else 0.0
                hard_multiplier = float(parts[5].strip()) if parts[5].strip() else 2.0

                self.layer_threshold_map[realm_name] = layer_threshold
                self.realm_score_map[realm_name] = realm_score

                # Reaching layer 10 takes 9 x the per-layer threshold.
                breakthrough_cost = 9 * layer_threshold
                hard_breakthrough_cost = int(breakthrough_cost * hard_multiplier)

                # The last realm (and any realm not in the order list) has no successor.
                next_realm = None
                if realm_name in order:
                    idx = order.index(realm_name)
                    if idx + 1 < len(order):
                        next_realm = order[idx + 1]

                if next_realm and layer_threshold > 0:
                    stage_key = f"{realm_name}10层"
                    self.break_config[stage_key] = {}
                    # Pill path: lands on layer 1 of the next realm.
                    if pill_item and pill_rate > 0:
                        self.break_config[stage_key]["pill"] = {
                            "cost": breakthrough_cost,
                            "rate": pill_rate,
                            "target": f"{next_realm}1层",
                            "item": pill_item,
                        }
                    # Forced path: higher cost, lands on layer 2 of the next realm.
                    if hard_rate > 0:
                        self.break_config[stage_key]["hard"] = {
                            "cost": hard_breakthrough_cost,
                            "rate": hard_rate,
                            "target": f"{next_realm}2层",
                        }
            except Exception as e:
                self.LOG.warning(f"解析境界配置失败: {realm_name}={config_str}, 错误: {e}")

        self.LOG.info(f"境界配置解析完成:层级阈值{len(self.layer_threshold_map)}个,突破配置{len(self.break_config)}个")

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return ``True`` when the plugin is enabled and the text starts with a known command."""
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        return any(content.startswith(cmd) for cmd in self._commands)

    async def _send_reply_with_revoke(self, bot: WechatAPIClient, sender: str, roomid: str,
                                      text: str, revoke_delay: int):
        """Send ``text`` to the group (or to the sender in private chat) and queue it for revocation.

        Args:
            bot: API client used to send the message.
            sender: user id of the requester; also used as the private-chat target.
            roomid: group id, or empty for a private chat.
            text: message body.
            revoke_delay: delay value forwarded to ``add_message_to_revoke``.
        """
        target = roomid if roomid else sender
        client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text, sender)
        if self.revoke:
            self.revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_delay)

    @plugin_stats_decorator(plugin_name="群聊文字修仙")
    @group_feature_rate_limit(max_per_minute=6, feature_key=FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Match the incoming text against the configured commands and dispatch it.

        Returns:
            ``(handled_ok, reason)`` where ``reason`` is a short status label.
        """
        content = str(message.get("content", "")).strip()
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        self.revoke = message.get("revoke")

        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # First configured command that prefixes the text wins.
        cmd = next((c for c in self._commands if content.startswith(c)), None)
        if not cmd:
            return False, "不匹配的命令"
        content = content[len(cmd):].strip()

        if not self.redis_db:
            await self._send_reply_with_revoke(bot, sender, roomid, "❌系统未初始化Redis", 5)
            return False, "Redis未初始化"

        if self.redis_db.check_rate_limited(sender, cmd):
            await self._send_reply_with_revoke(bot, sender, roomid, "⚠️ 操作过于频繁,请稍候再试", 5)
            return False, "限流"

        entry = self._COMMAND_HANDLERS.get(cmd)
        if entry is None:
            return False, "未知命令"
        handler_name, takes_content = entry
        handler = getattr(self, handler_name)
        if takes_content:
            return await handler(bot, sender, roomid, content)
        return await handler(bot, sender, roomid)

    def _rate_set(self, user_id: str, cmd: str):
        """Start the per-user cooldown for ``cmd`` (3 seconds when not configured)."""
        seconds = self.seconds_rl.get(cmd, 3)
        self.redis_db.set_rate_limit(user_id, cmd, seconds)

    def _check_status_update(self, player: Dict[str, Any]) -> Dict[str, Any]:
        """Advance the state machine: an expired Unstable_Qi/Injured status becomes Idle.

        The player is persisted only when the status actually changes.
        """
        if player.get("status", "Idle") not in ("Unstable_Qi", "Injured"):
            return player

        until_raw = player.get("status_until")
        until = None
        if isinstance(until_raw, datetime):
            # A datetime object (rather than an ISO string) is accepted as-is.
            until = until_raw
        elif until_raw:
            try:
                until = datetime.fromisoformat(until_raw)
            except (TypeError, ValueError):
                until = None
        # Naive timestamps are interpreted as UTC.
        if until and until.tzinfo is None:
            until = until.replace(tzinfo=timezone.utc)

        if until and datetime.now(timezone.utc) >= until:
            player["status"] = "Idle"
            player["status_until"] = None
            self._save_player(player)
        return player

    async def _cmd_register(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
        """Register a new player with the given dao name and a random spirit root."""
        dao_name = content.strip()
        if not dao_name:
            # The "bad format" reply is disabled; only the status tuple is returned.
            return False, "命令格式错误"

        # Go through the cache-aside reader so a cold cache does not look like
        # "not registered" and overwrite an existing player.
        if self._get_player(sender, roomid or ""):
            # The "already registered" reply is disabled.
            return True, "已注册"

        root_name, mult = random.choice(self.spirit_roots) if self.spirit_roots else ("凡灵根", 1.0)
        player = {
            "user_id": sender,
            "group_id": roomid or "",
            "dao_name": dao_name,
            "realm": "炼气1层",
            "spirit_root": f"{root_name}",
            "spirit_root_mult": mult,
            "cultivation_points": 0,
            "spirit_stone": 0,
            "status": "Idle",
            "status_until": None,
            "last_cultivate_time": None,
        }
        if self.xdb:
            try:
                self.xdb.create_player(player)
            except Exception as e:
                logger.warning(f"创建玩家数据库记录失败: {e}, user_id={sender}")
        self._save_player(player)
        self._rate_set(sender, "注册修仙")
        # Seed the realm leaderboard with the starting realm's score.
        self.redis_db.leaderboard_realm_add(sender, float(self._realm_score(player["realm"])))
        await self._send_reply_with_revoke(
            bot, sender, roomid, f"✅ 注册成功,道号:{dao_name}\n灵根:{root_name}", 10)
        return True, "注册成功"

    async def _cmd_help(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Build the help text.

        NOTE(review): sending the help text (and its cooldown) is disabled in
        the original code, so ``msg`` is built but never sent — confirm whether
        the reply should be re-enabled.
        """
        lines = [
            "📜 修仙帮助",
            "用法提示:",
            self.command_format.strip(),
            "可用指令:",
            " | ".join(self._commands),
            "常用参数格式:",
            "注册修仙 道号",
            "聚灵 数量",
            "购买 物品 数量",
            "赠与 目标wxid 数量",
            "赠送 目标wxid 物品 数量",
            "劫掠 目标wxid",
            "创建门派 名称",
            "加入门派 名称",
            "积分购石 积分数",
        ]
        msg = "\n".join(lines)  # noqa: F841 - kept for when the reply is re-enabled
        return True, "帮助"

    async def _cmd_points_to_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
        """Exchange bot points for spirit stones at ``point_to_stone_rate`` points per stone.

        All validation happens before any points are deducted, so a request
        that cannot yield at least one stone costs nothing. Failure replies are
        disabled; only the status tuple is returned.
        """
        if not self.points_db:
            return False, "积分未初始化"
        player = self._get_player(sender, roomid or "")
        if not player:
            return False, "未注册"
        try:
            pts = int(content.strip())
        except (TypeError, ValueError):
            return False, "命令格式错误"
        if pts <= 0:
            return False, "非法数量"

        rate = max(1, int(self.point_to_stone_rate))
        # Must be checked BEFORE deducting, otherwise the points are lost for nothing.
        if pts < rate:
            return False, "积分不足"

        group_id = roomid or ""
        ok, _res = self.points_db.deduct_points(sender, group_id, pts, PointSource.PLUGIN, "修仙购买灵石")
        if not ok:
            return False, "积分不足"

        stones_gain = pts // rate
        player["spirit_stone"] = int(player.get("spirit_stone", 0)) + stones_gain
        self._save_player(player)
        # Both aliases route here; cool both down so neither bypasses the limit.
        self._rate_set(sender, "积分购石")
        self._rate_set(sender, "积分换灵石")
        await self._send_reply_with_revoke(
            bot, sender, roomid,
            f"✅ 兑换成功,消耗积分{pts},获得灵石{stones_gain}({rate}积分=1灵石)", 10)
        return True, "积分购石"

    async def _cmd_status(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Show the player's dao name, realm, cultivation points, stones and status."""
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"
        player = self._check_status_update(player)
        status_code = player.get("status")
        status_cn = self._STATUS_CN.get(status_code, str(status_code))
        msg = (
            f"📇 道号:{player.get('dao_name')}【{player.get('spirit_root')}】\n"
            f"✨ 境界:{player.get('realm')}(修为:{player.get('cultivation_points')}点)\n"
            f"💎 灵石:{player.get('spirit_stone')}枚\n"
            f"💚 状态:{status_cn}"
        )
        await self._send_reply_with_revoke(bot, sender, roomid, msg, 20)
        self._rate_set(sender, "我的状态")
        return True, "状态展示"

    async def _cmd_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Enter secluded cultivation; allowed only from the Idle or Injured status."""
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"
        player = self._check_status_update(player)
        status = player.get("status", "Idle")
        if status == "Cultivating":
            # The "already cultivating" reply is disabled.
            return False, "重复闭关"
        if status not in ("Idle", "Injured"):
            status_cn = self._STATUS_CN.get(status, str(status))
            await self._send_reply_with_revoke(bot, sender, roomid, f"当前状态[{status_cn}]不可闭关", 5)
            return False, "状态不可闭关"

        player["status"] = "Cultivating"
        player["last_cultivate_time"] = datetime.now(timezone.utc).isoformat()
        self._save_player(player)
        self._rate_set(sender, "闭关")
        await self._send_reply_with_revoke(bot, sender, roomid, "✅ 已进入闭关,期间安全不可被劫掠", 10)
        return True, "闭关成功"

    async def _cmd_finish_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Leave seclusion and settle the gained cultivation points.

        Gain = hours in seclusion (capped at ``max_cultivate_hours``)
        x ``base_rate_per_hour`` x the player's spirit-root multiplier. The
        player then becomes Unstable_Qi for ``unstable_qi_minutes``.
        """
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"
        player = self._check_status_update(player)
        if player.get("status") != "Cultivating":
            # The "not cultivating" reply is disabled.
            return False, "非闭关"

        now = datetime.now(timezone.utc)
        start_iso = player.get("last_cultivate_time")
        try:
            # A missing or unparsable start time settles as zero duration.
            start = datetime.fromisoformat(start_iso) if start_iso else now
        except (TypeError, ValueError):
            start = now
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)

        duration_hours = (now - start).total_seconds() / 3600.0
        duration_hours = max(0.0, min(duration_hours, float(self.max_cultivate_hours)))
        rate = self.base_rate_per_hour * float(player.get("spirit_root_mult", 1.0))
        gain = int(duration_hours * rate)

        player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain
        player["status"] = "Unstable_Qi"
        player["status_until"] = (now + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat()
        player["last_cultivate_time"] = None
        self._save_player(player)
        # Automatic layer-up (never crosses the layer-10 bottleneck).
        self._auto_layer_up(sender, player)
        self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
        self._rate_set(sender, "出关")
        await self._send_reply_with_revoke(
            bot, sender, roomid,
            f"✅ 出关成功,获得修为:{gain}({duration_hours:.1f}小时)\n当前修为:{player['cultivation_points']}\n状态:气息不稳 {self.unstable_qi_minutes}分钟",
            20)
        return True, "出关结算"

    async def _cmd_gather(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
        """Convert spirit stones into cultivation points at 10 points per stone."""
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"
        player = self._check_status_update(player)
        try:
            qty = int(content.strip())
        except (TypeError, ValueError):
            # The "bad format" reply is disabled.
            return False, "命令格式错误"
        stones = int(player.get("spirit_stone", 0))
        if qty <= 0 or stones < qty:
            # The "not enough stones / invalid amount" reply is disabled.
            return False, "灵石不足"

        player["spirit_stone"] = stones - qty
        player["cultivation_points"] = int(player.get("cultivation_points", 0)) + qty * 10
        self._save_player(player)
        # Automatic layer-up (never crosses the layer-10 bottleneck).
        self._auto_layer_up(sender, player)
        self.redis_db.leaderboard_add(sender, float(player["cultivation_points"]))
        self._rate_set(sender, "聚灵")
        await self._send_reply_with_revoke(
            bot, sender, roomid, f"✅ 聚灵成功,消耗灵石{qty},获得修为{qty * 10}", 10)
        return True, "聚灵成功"

    async def _cmd_leaderboard(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Show the top-10 cultivation leaderboard, marking the requester's own row."""
        lines = ["🏆 修为排行榜 Top10"]
        for rank, (uid, score) in enumerate(self.redis_db.leaderboard_top(10), start=1):
            mark = "你" if uid == sender else ""
            lines.append(f"{rank}. {uid} - {int(score)} {mark}")
        await self._send_reply_with_revoke(bot, sender, roomid, "\n".join(lines), 20)
        self._rate_set(sender, "排行榜")
        return True, "排行榜"

    def _get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a player via the cache-aside path (Redis first, then DB)."""
        return self._get_player_with_cache(user_id, group_id)

    def _save_player(self, player: Dict[str, Any]):
        """Persist the player's DB-backed fields (when a DB is configured), then refresh Redis.

        A DB failure is logged and does not prevent the cache update.
        """
        if self.xdb:
            persisted = (
                "group_id", "dao_name", "realm", "spirit_root", "cultivation_points",
                "spirit_stone", "status", "status_until", "last_cultivate_time", "clan_id",
            )
            fields = {k: player[k] for k in persisted if k in player}
            try:
                self.xdb.update_player_fields(player.get("user_id"), fields)
            except Exception as e:
                logger.warning(f"更新玩家字段失败: {e}, user_id={player.get('user_id')}")
        self.redis_db.save_player(player)

    def _get_player_with_cache(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Return the player from Redis, or load it from the DB and refill the cache.

        A DB row is only used when its ``group_id`` equals the requested one.
        Fields that live only in the cache copy (``spirit_root_mult``,
        ``inventory``) are reconstructed before caching.
        """
        cached = self.redis_db.get_player(user_id, group_id)
        if cached:
            return cached
        if not self.xdb:
            return None

        dbp = self.xdb.get_player(user_id)
        if not dbp or dbp.get("group_id", "") != group_id:
            return None

        # Derive the multiplier from the configured spirit roots by name.
        if "spirit_root_mult" not in dbp:
            root_name = dbp.get("spirit_root", "凡灵根")
            dbp["spirit_root_mult"] = next(
                (m for name, m in self.spirit_roots if name == root_name), 1.0)
        # Load the inventory from the DB.
        if "inventory" not in dbp:
            try:
                items = self.xdb.get_inventory(user_id)
                dbp["inventory"] = {item["name"]: item["quantity"] for item in items}
            except Exception as e:
                logger.warning(f"读取背包数据失败: {e}")
                dbp["inventory"] = {}
        self.redis_db.save_player(dbp)
        return dbp

    def _parse_realm(self, realm: str) -> Tuple[str, Optional[int]]:
        """Split a realm string such as "炼气1层" into ``("炼气", 1)``.

        Returns ``(realm, None)`` when the value is not a string or does not
        match the "<prefix><N>层" format.
        """
        if isinstance(realm, str):
            m = _REALM_RE.match(realm)
            if m:
                return m.group(1), int(m.group(2))
        return realm, None

    def _realm_score(self, realm: str) -> int:
        """Return the leaderboard score for a realm: configured base score + layer number."""
        prefix, layer = self._parse_realm(realm)
        base = self.realm_score_map.get(prefix, 0)
        return base + (layer or 0)

    def _set_realm(self, user_id: str, player: Dict[str, Any], new_realm: str):
        """Set the player's realm, persist it, and update the realm leaderboard.

        ``_save_player`` already writes ``realm`` to the DB and refreshes the
        cache, so no separate DB update / cache invalidation is needed.
        """
        player["realm"] = new_realm
        self._save_player(player)
        self.redis_db.leaderboard_realm_add(user_id, float(self._realm_score(new_realm)))

    def _auto_layer_up(self, user_id: str, player: Dict[str, Any]):
        """Raise the player's layer to match their cultivation points, capped at layer 10.

        With a per-layer threshold T: layer 1 covers 0..T-1 points, layer 2
        covers T..2T-1, and so on: ``layer = min(10, points // T + 1)``.
        The layer is never lowered, and realms without a parsable layer or a
        configured threshold are left untouched.
        """
        prefix, layer = self._parse_realm(player.get("realm", "凡人"))
        if layer is None or layer >= 10:
            return
        threshold = self.layer_threshold_map.get(prefix)
        if not threshold:
            return
        pts = int(player.get("cultivation_points", 0))
        calculated_layer = min(10, (pts // threshold) + 1)
        # Only ever promote, never demote.
        new_layer = max(layer, calculated_layer)
        if new_layer != layer:
            self._set_realm(user_id, player, f"{prefix}{new_layer}层")

    async def _cmd_signin(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Sign in for 50 spirit stones; repeats are blocked by the "签到" cooldown key."""
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"
        player = self._check_status_update(player)
        if self.redis_db.check_rate_limited(sender, "签到"):
            # The "already signed in" reply is disabled.
            return False, "已签到"
        reward = 50
        player["spirit_stone"] = int(player.get("spirit_stone", 0)) + reward
        self._save_player(player)
        self._rate_set(sender, "签到")
        await self._send_reply_with_revoke(bot, sender, roomid, f"✅ 签到成功,获得灵石{reward}", 10)
        return True, "签到成功"

    async def _cmd_shop(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """List the configured shop items with their type and price."""
        lines = ["🛒 坊市商品"]
        lines.extend(
            f"{item['name']} [{item['type']}] - {item['price']}灵石" for item in self.shop_items)
        await self._send_reply_with_revoke(bot, sender, roomid, "\n".join(lines), 20)
        self._rate_set(sender, "坊市")
        return True, "坊市"

    async def _cmd_buy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]:
        """Buy ``<item> <quantity>`` from the shop with spirit stones.

        Failure replies are disabled; only the status tuple is returned.
        """
        parts = content.split()
        if len(parts) < 2:
            return False, "命令格式错误"
        item_name = parts[0]
        try:
            qty = int(parts[1])
        except (TypeError, ValueError):
            return False, "命令格式错误"

        player = self._get_player(sender, roomid or "")
        if not player:
            return False, "未注册"
        player = self._check_status_update(player)

        item = next((i for i in self.shop_items if i["name"] == item_name), None)
        if not item:
            return False, "商品不存在"

        total = item["price"] * qty
        stones = int(player.get("spirit_stone", 0))
        if qty <= 0 or stones < total:
            return False, "灵石不足"

        player["spirit_stone"] = stones - total
        inv = player.get("inventory") or {}
        inv[item_name] = int(inv.get(item_name, 0)) + qty
        player["inventory"] = inv
        if self.xdb:
            try:
                self.xdb.add_item(sender, item_name, item.get("type", "other"), qty)
            except Exception as e:
                # Best effort: the purchase still completes in the cache copy.
                logger.warning(f"写入背包失败: {e}, user_id={sender}, item={item_name}")
        # Persists spirit_stone to the DB and refreshes the cached player.
        self._save_player(player)
        self._rate_set(sender, "购买")
        await self._send_reply_with_revoke(bot, sender, roomid, f"✅ 购买成功,{item_name} × {qty}", 10)
        return True, "购买成功"

    async def _cmd_bag(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        """Show the player's inventory.

        The DB is the primary source; the inventory cached on the player is
        used when no DB is configured or the DB read fails.
        """
        player = self._get_player(sender, roomid or "")
        if not player:
            # The "not registered" reply is disabled.
            return False, "未注册"

        items = None
        if self.xdb:
            try:
                items = self.xdb.get_inventory(sender)
            except Exception as e:
                logger.warning(f"读取背包数据失败: {e}")
        if items is None:
            inv = player.get("inventory") or {}
            items = [{"name": n, "quantity": q} for n, q in inv.items()]

        lines = ["🎒 背包"]
        if not items:
            lines.append("空")
        else:
            lines.extend(f"{it['name']} × {it['quantity']}" for it in items)
        await self._send_reply_with_revoke(bot, sender, roomid, "\n".join(lines), 20)
        # Must match the dispatched command name, otherwise the cooldown set
        # here is never seen by the check in process_message.
        self._rate_set(sender, "乾坤袋")
        return True, "背包"

    async def _cmd_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]:
        player = self._get_player(sender, roomid or "")
        if not player:
            # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender)
            # if self.revoke:
            #     self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5)
            return False, "未注册"
        player = self._check_status_update(player)
        points = int(player.get("cultivation_points", 0))
        # Look up the bottleneck configuration for the current realm.
        cur_realm = player.get("realm", "炼气1层")
        prefix, layer = self._parse_realm(cur_realm)
        # A breakthrough is only possible once layer 10 has been reached.
        if layer is None or layer < 10:
            client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"当前境界为{cur_realm},需达到{prefix}10层才能突破", sender)
            if self.revoke:
                self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5)
            return False, "未到瓶颈"
        stage_key = f"{prefix}10层"
        stage_conf = self.break_config.get(stage_key)
        if not stage_conf:
            client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "当前境界未配置突破路径", sender)
            if self.revoke:
                self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5)
            return False, "未配置突破"
        pill_conf = stage_conf.get("pill")
        if not pill_conf:
            client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "丹药路径未配置", sender)
if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未配置" # 从配置中获取丹药名称 pill_item_name = pill_conf.get("item") if not pill_item_name: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "突破丹药未配置", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "丹药未配置" # 优化:从数据库读取最新的背包数据,确保数据准确性 pill_item_count = 0 if self.xdb: try: items = self.xdb.get_inventory(sender) for item in items: if item["name"] == pill_item_name: pill_item_count = item["quantity"] break except Exception as e: logger.warning(f"读取背包数据失败: {e}, user_id={sender}") # 降级到使用内存中的inventory inv = player.get("inventory") or {} pill_item_count = inv.get(pill_item_name, 0) else: inv = player.get("inventory") or {} pill_item_count = inv.get(pill_item_name, 0) if points < pill_conf["cost"]: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "修为不足,无法突破", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "修为不足" if pill_item_count <= 0: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"缺少丹药:{pill_item_name}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "缺少丹药" # 更新内存中的inventory(用于缓存) inv = player.get("inventory") or {} inv[pill_item_name] = pill_item_count - 1 player["inventory"] = inv player["cultivation_points"] = points - pill_conf["cost"] if self.xdb: try: self.xdb.remove_item(sender, pill_item_name, 1) self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]}) self.redis_db.invalidate_player(sender, player.get("group_id", "")) except Exception as e: logger.warning(f"突破时更新数据库失败: {e}, user_id={sender}") 
roll = random.random() if roll < pill_conf["rate"]: # 成功,更新境界并排行榜 self._set_realm(sender, player, pill_conf["target"]) self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) self._rate_set(sender, "突破") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 突破成功,晋升至{pill_conf['target']}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "突破成功" else: # 失败时也要保存玩家数据 self._save_player(player) self._rate_set(sender, "突破") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "❌ 突破失败", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "突破失败" async def _cmd_force_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: player = self._get_player(sender, roomid or "") if not player: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" player = self._check_status_update(player) points = int(player.get("cultivation_points", 0)) # 读取当前瓶颈配置 cur_realm = player.get("realm", "炼气1层") prefix, layer = self._parse_realm(cur_realm) # 优化:检查是否真的达到10层 if layer is None or layer < 10: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"当前境界为{cur_realm},需达到{prefix}10层才能强行突破", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未到瓶颈" stage_key = f"{prefix}10层" stage_conf = self.break_config.get(stage_key) if not stage_conf: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "当前境界未配置突破路径", sender) if 
self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未配置突破" hard_conf = stage_conf.get("hard") if not hard_conf: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "强行路径未配置", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未配置" if points < hard_conf["cost"]: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "修为不足,无法强行突破", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "修为不足" player["cultivation_points"] = points - hard_conf["cost"] if self.xdb: try: self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]}) self.redis_db.invalidate_player(sender, player.get("group_id", "")) except Exception as e: logger.warning(f"强行突破时更新数据库失败: {e}, user_id={sender}") roll = random.random() if roll < hard_conf["rate"]: self._set_realm(sender, player, hard_conf["target"]) self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) self._rate_set(sender, "强行突破") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 强行突破成功,晋升至{hard_conf['target']}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "强行突破成功" else: # 失败时也要保存玩家数据 self._save_player(player) self._rate_set(sender, "强行突破") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "❌ 强行突破失败,灵气反噬!", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "强行突破失败" async def _cmd_rob(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: target = 
content.strip().lstrip("@") if not target: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:劫掠 目标wxid", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" if target == sender: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "不可劫掠自己", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "非法目标" attacker = self._get_player(sender, roomid or "") defender = self._get_player(target, roomid or "") if not attacker or not defender: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "双方需已注册", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" attacker = self._check_status_update(attacker) defender = self._check_status_update(defender) # 新手保护:不可劫掠炼气期 def_prefix, _ = self._parse_realm(defender.get("realm", "炼气1层")) if def_prefix == "炼气": client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "目标处于新手保护期(炼气),不可劫掠", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "新手保护" if defender.get("status") in ("Cultivating", "Injured"): client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "目标处于保护或闭关中", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "目标保护" if roomid and (attacker.get("group_id") != roomid or defender.get("group_id") != roomid): client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "仅限同群内劫掠", sender) if self.revoke: 
self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "跨群" if attacker.get("clan_id") and defender.get("clan_id") and attacker.get("clan_id") == defender.get("clan_id"): client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "不可劫掠同门", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "同门" a_pts = int(attacker.get("cultivation_points", 0)) d_pts = int(defender.get("cultivation_points", 0)) base = 0.5 if a_pts > d_pts: base += 0.2 elif a_pts < d_pts: base -= 0.2 roll = random.random() if roll < base: d_stones = int(defender.get("spirit_stone", 0)) gain = max(0, int(d_stones * 0.2)) defender["spirit_stone"] = d_stones - gain attacker["spirit_stone"] = int(attacker.get("spirit_stone", 0)) + gain defender["status"] = "Injured" defender["status_until"] = (datetime.now(timezone.utc) + timedelta(minutes=int(self.injured_minutes))).isoformat() if self.xdb: try: self.xdb.update_player_fields(defender.get("user_id"), {"spirit_stone": defender["spirit_stone"], "status": defender["status"], "status_until": defender["status_until"]}) self.xdb.update_player_fields(attacker.get("user_id"), {"spirit_stone": attacker["spirit_stone"]}) except Exception: pass self._save_player(defender) self._save_player(attacker) self._rate_set(sender, "劫掠") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 劫掠成功,获得灵石{gain}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) if roomid: g_client_msg_id, g_create_time, g_new_msg_id = await bot.send_text_message(roomid, f"{sender} 劫掠 {target} 成功,目标进入受伤保护", [target]) if self.revoke: self.revoke.add_message_to_revoke(roomid, g_client_msg_id, g_create_time, g_new_msg_id, 10) return True, "劫掠成功" else: self._rate_set(sender, "劫掠") 
client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "❌ 劫掠失败", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "劫掠失败" async def _cmd_give_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: parts = content.strip().split() if len(parts) < 2: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" target = parts[0].lstrip("@") try: qty = int(parts[1]) except Exception: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" giver = self._get_player_with_cache(sender, roomid or "") receiver = self._get_player_with_cache(target, roomid or "") if not giver or not receiver: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "双方需已注册", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" if not giver.get("clan_id") or giver.get("clan_id") != receiver.get("clan_id"): client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "仅同门可赠与灵石", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "不同门" stones = int(giver.get("spirit_stone", 0)) if qty <= 0 or stones < qty: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "灵石不足或数量不合法", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid 
else sender), client_msg_id, create_time, new_msg_id, 5) return False, "灵石不足" giver["spirit_stone"] = stones - qty receiver["spirit_stone"] = int(receiver.get("spirit_stone", 0)) + qty if self.xdb: try: self.xdb.update_player_fields(sender, {"spirit_stone": giver["spirit_stone"]}) self.xdb.update_player_fields(target, {"spirit_stone": receiver["spirit_stone"]}) except Exception: pass # 使用_save_player确保同时保存到Redis和MariaDB self._save_player(giver) self._save_player(receiver) self._rate_set(sender, "赠与") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠与灵石 {qty}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "赠与成功" async def _cmd_give_item(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: parts = content.strip().split() if len(parts) < 3: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" target = parts[0].lstrip("@") item_name = parts[1] try: qty = int(parts[2]) except Exception: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" giver = self._get_player(sender, roomid or "") receiver = self._get_player(target, roomid or "") if not giver or not receiver: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "双方需已注册", sender) # if self.revoke: # self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" inv_g = giver.get("inventory") or 
{} if inv_g.get(item_name, 0) < qty or qty <= 0: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "物品不足或数量不合法", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "物品不足" inv_r = receiver.get("inventory") or {} inv_g[item_name] = inv_g.get(item_name, 0) - qty inv_r[item_name] = int(inv_r.get(item_name, 0)) + qty giver["inventory"] = inv_g receiver["inventory"] = inv_r if self.xdb: ok = self.xdb.transfer_item(sender, target, item_name, qty) if not ok: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "物品转移失败", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "转移失败" self.redis_db.invalidate_player(sender, giver.get("group_id", "")) self.redis_db.invalidate_player(target, receiver.get("group_id", "")) self._save_player(giver) self._save_player(receiver) self._rate_set(sender, "赠送") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠送 {item_name} × {qty}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "赠送成功" async def _cmd_clan_create(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: name = content.strip() if not name: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:创建门派 名称", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" player = self._get_player_with_cache(sender, roomid or "") if not player: # client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) # if self.revoke: # 
self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" prefix, _ = self._parse_realm(player.get("realm", "炼气1层")) allowed = {"元婴", "化神", "合体", "大乘", "渡劫", "真仙"} if prefix not in allowed: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "创建门派需达到元婴期及以上", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "境界不足" clan_id = None if self.xdb: clan_id = self.xdb.create_clan(name, roomid or "", sender) if clan_id is None: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "门派已存在或创建失败", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "门派失败" if player and clan_id is not None: player["clan_id"] = int(clan_id) if self.xdb: try: self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]}) self.redis_db.invalidate_player(sender, player.get("group_id", "")) except Exception: pass self._save_player(player) self._rate_set(sender, "创建门派") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 门派已创建:{name}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "创建门派" async def _cmd_clan_join(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: name = content.strip() if not name: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "命令格式:加入门派 名称", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "命令格式错误" player = self._get_player_with_cache(sender, roomid or "") if not player: client_msg_id, create_time, new_msg_id = await 
bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" cid = None if self.xdb: cid = self.xdb.get_clan_id(roomid or "", name) if not cid: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "门派不存在", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "门派不存在" player["clan_id"] = int(cid) if isinstance(cid, str) else cid if self.xdb: try: self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]}) self.redis_db.invalidate_player(sender, player.get("group_id", "")) except Exception: pass self._save_player(player) self._rate_set(sender, "加入门派") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, f"✅ 已加入门派:{name}", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 10) return True, "加入门派" async def _cmd_clan_exit(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: player = self._get_player_with_cache(sender, roomid or "") if not player: client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, create_time, new_msg_id, 5) return False, "未注册" player["clan_id"] = None if self.xdb: try: self.xdb.update_player_fields(sender, {"clan_id": None}) self.redis_db.invalidate_player(sender, player.get("group_id", "")) except Exception: pass self._save_player(player) self._rate_set(sender, "退出门派") client_msg_id, create_time, new_msg_id = await bot.send_text_message(roomid or sender, "✅ 已退出门派", sender) if self.revoke: self.revoke.add_message_to_revoke((roomid if roomid else sender), client_msg_id, 
create_time, new_msg_id, 10) return True, "退出门派"