diff --git a/db/xiuxian_db.py b/db/xiuxian_db.py new file mode 100644 index 0000000..39a7b2c --- /dev/null +++ b/db/xiuxian_db.py @@ -0,0 +1,393 @@ +from typing import Optional, Dict, Any, List, Tuple +from datetime import datetime +import mysql.connector + +from db.connection import DBConnectionManager + + +class XiuxianDB: + """ + 修仙系统的 MariaDB 持久化访问层。 + + 设计目标: + - 与业务层采用 Cache-Aside 模式协同:业务写 DB 后应显式失效 Redis,再由读取路径回填最新缓存; + - 保持所有写入操作的原子性与一致性:必要处使用事务与行级锁(SELECT ... FOR UPDATE)。 + + 使用方式: + - 在系统启动时通过 DBConnectionManager 初始化 MySQL 连接池; + - 插件层调用本类方法进行玩家/物品/背包/门派的读写,配合 Redis 缓存失效; + - 所有方法失败返回 False/None,业务层自行处理提示与回退逻辑。 + + 安全与稳定性: + - 所有 SQL 使用参数化,避免 SQL 注入; + - 事务方法在异常时显式回滚; + - 仅对必要的行使用 FOR UPDATE,降低锁粒度与阻塞风险。 + """ + + def __init__(self, db_manager: DBConnectionManager): + """构造函数 + + Args: + db_manager: 全局数据库连接管理器,需已初始化 MySQL 连接池 + """ + self.db_manager = db_manager + self.pool = db_manager.mysql_pool + + def _conn(self): + """从连接池获取连接。业务层无需手动关闭,with 语法自动释放。""" + return self.pool.get_connection() + + def get_player(self, user_id: str) -> Optional[Dict[str, Any]]: + """读取玩家核心信息。 + + Args: + user_id: 平台用户ID + + Returns: + dict 或 None:玩家记录,字段包含玩家基础信息、资源与状态机字段 + """ + try: + with self._conn() as conn: + with conn.cursor(dictionary=True) as cur: + cur.execute( + "SELECT user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id FROM t_xiuxian_player WHERE user_id=%s", + (user_id,) + ) + return cur.fetchone() + except mysql.connector.Error: + return None + + def create_player(self, player: Dict[str, Any]) -> bool: + """创建玩家记录。 + + 注意:业务层创建成功后需删除 Redis 玩家缓存键以便下次读取回填最新数据。 + """ + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO t_xiuxian_player (user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ( + player.get("user_id"), + player.get("group_id"), + player.get("dao_name"), + player.get("realm"), + player.get("spirit_root"), + int(player.get("cultivation_points", 0)), + int(player.get("spirit_stone", 0)), + player.get("status"), + player.get("status_until"), + player.get("last_cultivate_time"), + player.get("clan_id") + ) + ) + conn.commit() + return True + except mysql.connector.Error: + return False + + def update_player_fields(self, user_id: str, fields: Dict[str, Any]) -> bool: + """增量更新玩家字段。 + + Args: + user_id: 玩家ID + fields: 需更新的字段字典,如 {"status": "Injured", "status_until": datetime} + + Returns: + bool: 成功/失败 + + 说明: + - 动态拼接 SET 子句,值统一参数化; + - 业务层应在成功后失效 Redis 玩家缓存。 + """ + if not fields: + return True + cols = [] + vals = [] + for k, v in fields.items(): + cols.append(f"{k}=%s") + vals.append(v) + vals.append(user_id) + sql = f"UPDATE t_xiuxian_player SET {', '.join(cols)} WHERE user_id=%s" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute(sql, tuple(vals)) + conn.commit() + return True + except mysql.connector.Error: + return False + + def adjust_stone(self, user_id: str, delta: int) -> bool: + """调整玩家灵石(增量)。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "UPDATE t_xiuxian_player SET spirit_stone = spirit_stone + %s WHERE user_id=%s", + (int(delta), user_id) + ) + conn.commit() + return True + except mysql.connector.Error: + return False + + def adjust_cultivation(self, user_id: str, delta: int) -> bool: + """调整玩家修为(增量)。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "UPDATE t_xiuxian_player SET cultivation_points = cultivation_points + %s WHERE user_id=%s", + (int(delta), user_id) + ) + conn.commit() + return True + except mysql.connector.Error: + return False + + def get_item_id(self, name: str) -> Optional[int]: + """获取物品ID(若不存在返回 None)。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT item_id FROM t_xiuxian_item WHERE name=%s", (name,)) + row = cur.fetchone() + return int(row[0]) if row else None + except mysql.connector.Error: + return None + + def ensure_item(self, name: str, type_: str, description: str = "") -> Optional[int]: + """确保物品存在,不存在则创建并返回其ID。""" + iid = self.get_item_id(name) + if iid: + return iid + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO t_xiuxian_item (name, type, description) VALUES (%s,%s,%s)", + (name, type_, description) + ) + conn.commit() + return self.get_item_id(name) + except mysql.connector.Error: + return None + + def get_inventory(self, user_id: str) -> List[Dict[str, Any]]: + """读取玩家背包的物品名称、类型与数量。""" + try: + with self._conn() as conn: + with conn.cursor(dictionary=True) as cur: + cur.execute( + "SELECT i.name, i.type, inv.quantity FROM t_xiuxian_inventory inv JOIN t_xiuxian_item i ON inv.item_id=i.item_id WHERE inv.user_id=%s", + (user_id,) + ) + return cur.fetchall() or [] + except mysql.connector.Error: + return [] + + def add_item(self, user_id: str, item_name: str, item_type: str, qty: int) -> bool: + """向玩家背包增加物品数量(不存在则创建记录)。""" + iid = self.ensure_item(item_name, item_type) + if not iid: + return False + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s", + (user_id, iid) + ) + row = cur.fetchone() + if row: + cur.execute( + "UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", + (int(row[1]) + int(qty), int(row[0])) + ) + else: + cur.execute( + "INSERT INTO t_xiuxian_inventory (user_id, item_id, quantity) VALUES (%s,%s,%s)", + (user_id, iid, int(qty)) + ) + conn.commit() + return True + except mysql.connector.Error: + return False + + def remove_item(self, user_id: str, item_name: str, qty: int) -> bool: + """从玩家背包扣减物品数量(数量减至0则删除记录)。""" + iid = self.get_item_id(item_name) + if not iid: + return False + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s", + (user_id, iid) + ) + row = cur.fetchone() + if not row: + return False + new_qty = int(row[1]) - int(qty) + if new_qty < 0: + return False + if new_qty == 0: + cur.execute("DELETE FROM t_xiuxian_inventory WHERE id=%s", (int(row[0]),)) + else: + cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (new_qty, int(row[0]))) + conn.commit() + return True + except mysql.connector.Error: + return False + + def transfer_item(self, from_user: str, to_user: str, item_name: str, qty: int) -> bool: + """在两个玩家之间转移物品(事务 + 行级锁)。 + + 流程: + 1) 对出方(from_user)背包记录加锁读取并扣减; + 2) 对入方(to_user)背包记录加锁读取并增加或插入; + 3) 成功则提交,异常则回滚。 + """ + iid = self.get_item_id(item_name) + if not iid: + return False + try: + with self._conn() as conn: + try: + with conn.cursor() as cur: + cur.execute("START TRANSACTION") + # 扣减出方库存(行级锁) + cur.execute( + "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE", + (from_user, iid) + ) + row = cur.fetchone() + if not row or int(row[1]) < int(qty) or qty <= 0: + cur.execute("ROLLBACK") + return False + new_qty = int(row[1]) - int(qty) + if new_qty == 0: + cur.execute("DELETE FROM t_xiuxian_inventory WHERE id=%s", (int(row[0]),)) + else: + cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (new_qty, int(row[0]))) + # 增加入方库存(行级锁) + cur.execute( + "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE", + (to_user, iid) + ) + row2 = cur.fetchone() + if row2: + cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (int(row2[1]) + int(qty), int(row2[0]))) + else: + cur.execute("INSERT INTO t_xiuxian_inventory (user_id, item_id, quantity) VALUES (%s,%s,%s)", (to_user, iid, int(qty))) + cur.execute("COMMIT") + return True + except mysql.connector.Error: + try: + conn.rollback() + except Exception: + pass + return False + except mysql.connector.Error: + return False + + def create_clan(self, clan_name: str, group_id: str, leader_user_id: str) -> Optional[int]: + """创建门派(同群名唯一),返回 clan_id。若已存在返回现有ID。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s", (group_id, clan_name)) + row = cur.fetchone() + if row: + return int(row[0]) + cur.execute( + "INSERT INTO t_xiuxian_clan (clan_name, group_id, leader_user_id) VALUES (%s,%s,%s)", + (clan_name, group_id, leader_user_id) + ) + conn.commit() + cur.execute("SELECT LAST_INSERT_ID()") + rid = cur.fetchone() + return int(rid[0]) if rid else None + except mysql.connector.Error: + return None + + def get_clan_id(self, group_id: str, clan_name: str) -> Optional[int]: + """根据群ID和门派名查询 clan_id。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s", (group_id, clan_name)) + row = cur.fetchone() + return int(row[0]) if row else None + except mysql.connector.Error: + return None + + def init_schema(self) -> bool: + """初始化修仙相关表结构(若不存在则创建)。""" + try: + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS t_xiuxian_clan ( + clan_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '门派ID', + clan_name VARCHAR(100) NOT NULL COMMENT '门派名称', + group_id VARCHAR(100) NOT NULL COMMENT '所属群ID', + leader_user_id VARCHAR(100) NOT NULL COMMENT '掌门ID', + PRIMARY KEY (clan_id), + UNIQUE KEY uk_group_clan_name (group_id, clan_name) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='门派表'; + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS t_xiuxian_player ( + user_id VARCHAR(100) NOT NULL COMMENT '平台用户ID', + group_id VARCHAR(100) NOT NULL COMMENT '主要所在群ID', + dao_name VARCHAR(100) NOT NULL COMMENT '道号', + realm VARCHAR(50) DEFAULT '凡人' COMMENT '境界', + spirit_root VARCHAR(50) DEFAULT '凡灵根' COMMENT '灵根天赋', + clan_id BIGINT DEFAULT NULL COMMENT '所属门派ID', + cultivation_points BIGINT DEFAULT 0 COMMENT '修为', + spirit_stone BIGINT DEFAULT 0 COMMENT '灵石', + status VARCHAR(20) DEFAULT 'Idle' COMMENT '玩家状态', + status_until DATETIME DEFAULT NULL COMMENT '状态到期时间', + last_cultivate_time DATETIME DEFAULT NULL COMMENT '上次闭关开始时间', + PRIMARY KEY (user_id), + KEY idx_clan_id (clan_id), + KEY idx_realm (realm), + CONSTRAINT fk_clan_id FOREIGN KEY (clan_id) REFERENCES t_xiuxian_clan(clan_id) ON DELETE SET NULL + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家核心数据表'; + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS t_xiuxian_item ( + item_id INT NOT NULL AUTO_INCREMENT COMMENT '物品ID', + name VARCHAR(100) NOT NULL COMMENT '物品名称', + type VARCHAR(50) NOT NULL COMMENT '物品类型', + description TEXT COMMENT '物品描述', + PRIMARY KEY (item_id), + UNIQUE KEY uk_name (name) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='物品定义表'; + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS t_xiuxian_inventory ( + id BIGINT NOT NULL AUTO_INCREMENT COMMENT '背包条目ID', + user_id VARCHAR(100) NOT NULL COMMENT '玩家ID', + item_id INT NOT NULL COMMENT '物品ID', + quantity INT NOT NULL DEFAULT 0 COMMENT '数量', + PRIMARY KEY (id), + UNIQUE KEY uk_user_item (user_id, item_id), + CONSTRAINT fk_inv_user FOREIGN KEY (user_id) REFERENCES t_xiuxian_player(user_id) ON DELETE CASCADE, + CONSTRAINT fk_inv_item FOREIGN KEY (item_id) REFERENCES t_xiuxian_item(item_id) ON DELETE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家背包表'; + """ + ) + return True + except mysql.connector.Error: + return False \ No newline at end of file diff --git a/plugins/point_trade/config.toml b/plugins/point_trade/config.toml index 41e5225..525718b 100644 --- a/plugins/point_trade/config.toml +++ b/plugins/point_trade/config.toml @@ -1,6 +1,6 @@ [PointTrade] enable = true -command = ["积分交易", "积分转账", "转账积分", "积分赠送", "赠送积分", "积分转移", "转移积分", "送积分", "积分送人", "送人积分", "积分赠予", "赠予", "我的积分", "积分排行", "打劫","保释"] +command = ["积分交易", "积分转账", "转账积分", "积分赠送", "赠送积分", "积分转移", "转移积分", "送积分", "积分送人", "送人积分", "积分赠予", "我的积分", "积分排行", "打劫","保释"] command-format = """ 🔄积分交易指令: 积分转账 积分数 @用户 diff --git a/plugins/xiuxian/README.md b/plugins/xiuxian/README.md new file mode 100644 index 0000000..364b84b --- /dev/null +++ b/plugins/xiuxian/README.md @@ -0,0 +1,373 @@ +好的,我将把我们讨论过的**所有细节**,包括灵根类型、双轨制突破、所有保护机制、数据库设计、Redis 方案和防封策略,全部汇总成一份详尽的、可直接用于开发的 Markdown 研发文档。 + +----- + +# 🚀 (终稿) 群聊文字修仙Bot - 综合研发文档 + +## 1\. 核心设计原则 + +本项目旨在实现一个功能完整、可玩性高、同时严格遵守平台规则的文字修仙 Bot。 + +1. **简化交互 (防封/防骚扰):** 杜绝一切“问答式”或“多步骤”指令。所有指令都必须是原子操作(一次性完成)。通过强制冷却 (Cooldown) 和清晰的玩家状态机来防止刷屏和恶意PVP。 +2. **高性能:** 核心热数据(玩家状态、排行榜)必须由 Redis 管理,数据库 (MariaDB) 仅作为持久化存储。 +3. **策略深度:** 玩家有明确的“策略分支”,例如【丹药 vs 强行】突破,【PVE vs PVP】资源获取,【灵根天赋】带来的个体差异。 +好的,道友。修行之路漫漫,境界划分乃是天道之基石。 + +这里为你拟定一份详尽的、以“每阶10层”为基础的修炼级别体系。此体系平衡了前期“凡人”到“修士”的过渡,以及中后期“成仙”的漫长道路。 + +--- + +### 凡人篇 (Mortal Phase) + +* **凡人 (Mortal)** + * (无层级之分,尚未感应灵气) + +### 人仙篇 (Human Immortal Path) + +#### 第 1 境:练气 (Lianqi / Qi Refining) +* *描述:* 感应灵气,引气入体,淬炼肉身,开辟丹田。此为修行之始。 +* *层级:* **练气 1 层 ~ 练气 10 层 (大圆满)** +* *瓶颈:* 10层大圆满后,需冲击 **筑基** 瓶颈。 + +#### 第 2 境:筑基 (Zhuji / Foundation Establishment) +* *描述:* 灵气化液,在丹田内筑下道基。神识诞生,寿命大涨。 +* *层级:* **筑基 1 层 ~ 筑基 10 层 (大圆满)** +* *瓶颈:* 10层大圆满后,需碎丹田道基,凝结 **金丹**。 + +#### 第 3 境:金丹 (Jindan / Golden Core) +* *描述:* 灵液归一,结成金丹。丹成无悔,自此踏入修仙坦途,可御空飞行。 +* *层级:* **金丹 1 层 ~ 金丹 10 层 (大圆满)** +* *瓶颈:* 10层大圆满后,需碎丹,使丹中“真灵”化为 **元婴**。 + +#### 第 4 境:元婴 (Yuanying / Nascent Soul) +* *描述:* 金丹破碎,元婴出世。元婴为修士“第二性命”,可神魂出窍,夺舍重生。 +* *层级:* **元婴 1 层 ~ 元婴 10 层 (大圆满)** +* *瓶颈:* 10层大圆满后,需元婴与神魂合一,领悟天地法则,是为 **化神**。 + +#### 第 5 境:化神 (Huashang / Spirit Transformation) +* *描述:* 神游太虚,领悟法则。元神合一,开始真正掌控天地灵气,法力无边。 +* *层级:* **化神 1 层 ~ 化神 10 层 (大圆满)** +* *瓶颈:* 10层大圆满后,需元神与肉身彻底融合,达 **合体** 之境。 + +--- + +### 地仙篇 (Earthly Immortal Path) + +*(后续境界通常更为艰难,可作为游戏的后期内容)* + +#### 第 6 境:合体 (Heti / Integration) +* *描述:* 元神与肉身完美融合,不分彼此。举手投足引动天地之力,肉身不朽。 +* *层级:* **合体 1 层 ~ 合体 10 层 (大圆满)** + +#### 第 7 境:大乘 (Dacheng / Mahayana) +* *描述:* 法则大成,法力无边,已是人界巅峰。开始为飞升仙界做最后准备,积累底蕴。 +* *层级:* **大乘 1 层 ~ 大乘 10 层 (大圆满)** + +#### 第 8 境:渡劫 (Dujie / Tribulation) +* *描述:* 积累圆满,引动天劫(雷劫、心魔劫等)。此为成仙最后一道门槛,九死一生。 +* *层级:* **渡劫 1 层 ~ 渡劫 10 层 (大圆满)** + *(注:此处 10 层可理解为承受“十重天劫”,或积累底蕴的十个阶段)* + +### 真仙 (True Immortal) +* *描述:* 成功渡过天劫,飞升上界。 +* *(游戏终点或新篇章起点)* + +## 2\. 🛠️ 技术栈 + +| 类别 | 技术 | 备注 | +| :--- | :--- | :--- | +| 语言 | **Python 3.10+** | 异步 (asyncio) | +| Bot框架 | **NoneBot2** / **python-telegram-bot** | | +| 数据库 | **MariaDB 10.5+** | (InnoDB, utf8mb4) | +| ORM | **SQLAlchemy 2.0 (Async)** | | +| 缓存/限流 | **Redis** | 性能与防封的核心 | + +## 3\. 💾 数据库设计 (MariaDB) + +### 表 1: `t_xiuxian_player` (玩家核心表) + +```sql +CREATE TABLE t_xiuxian_player ( + -- 核心ID + user_id VARCHAR(100) NOT NULL COMMENT '平台用户ID', + group_id VARCHAR(100) NOT NULL COMMENT '主要所在群ID', + + -- 基础信息 + dao_name VARCHAR(100) NOT NULL COMMENT '道号', + realm VARCHAR(50) DEFAULT '凡人' COMMENT '境界 (如: 炼气1层, 筑基2层)', + spirit_root VARCHAR(50) DEFAULT '凡灵根' COMMENT '灵根天赋', + clan_id BIGINT DEFAULT NULL COMMENT '所属门派ID (外键)', + + -- 资源 + cultivation_points BIGINT DEFAULT 0 COMMENT '修为', + spirit_stone BIGINT DEFAULT 0 COMMENT '灵石 (对接你的积分系统)', + + -- 状态机 + status VARCHAR(20) DEFAULT 'Idle' COMMENT '玩家状态: Idle, Cultivating, Unstable_Qi, Injured', + status_until DATETIME DEFAULT NULL COMMENT '状态到期时间', + last_cultivate_time DATETIME DEFAULT NULL COMMENT '上次闭关开始时间', + + PRIMARY KEY (user_id), + KEY idx_clan_id (clan_id), + KEY idx_realm (realm), + CONSTRAINT fk_clan_id FOREIGN KEY (clan_id) REFERENCES t_xiuxian_clan(clan_id) ON DELETE SET NULL +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家核心数据表'; +``` + +### 表 2: `t_xiuxian_clan` (门派表) + +```sql +CREATE TABLE t_xiuxian_clan ( + clan_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '门派ID', + clan_name VARCHAR(100) NOT NULL COMMENT '门派名称', + group_id VARCHAR(100) NOT NULL COMMENT '所属群ID', + leader_user_id VARCHAR(100) NOT NULL COMMENT '掌门ID', + + PRIMARY KEY (clan_id), + UNIQUE KEY uk_group_clan_name (group_id, clan_name) COMMENT '同一群内门派名唯一' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='门派表'; +``` + +### 表 3: `t_xiuxian_item` (物品表 - 静态) + +```sql +CREATE TABLE t_xiuxian_item ( + item_id INT NOT NULL AUTO_INCREMENT COMMENT '物品ID', + name VARCHAR(100) NOT NULL COMMENT '物品名称', + type VARCHAR(50) NOT NULL COMMENT '物品类型 (丹药, 材料)', + description TEXT COMMENT '物品描述', + + PRIMARY KEY (item_id), + UNIQUE KEY uk_name (name) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='物品定义表 (静态数据)'; +``` + +*注:开发初期,此表内容可硬编码在 `Config` 中。* + +### 表 4: `t_xiuxian_inventory` (玩家背包表) + +```sql +CREATE TABLE t_xiuxian_inventory ( + id BIGINT NOT NULL AUTO_INCREMENT COMMENT '背包条目ID', + user_id VARCHAR(100) NOT NULL COMMENT '玩家ID (外键)', + item_id INT NOT NULL COMMENT '物品ID (外键)', + quantity INT NOT NULL DEFAULT 0 COMMENT '数量', + + PRIMARY KEY (id), + UNIQUE KEY uk_user_item (user_id, item_id) COMMENT '同一玩家同种物品只有一条记录', + CONSTRAINT fk_inv_user FOREIGN KEY (user_id) REFERENCES t_xiuxian_player(user_id) ON DELETE CASCADE, + CONSTRAINT fk_inv_item FOREIGN KEY (item_id) REFERENCES t_xiuxian_item(item_id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家背包表'; +``` + +----- + +## 4\. ⚙️ 核心系统配置 (Config) + +这是游戏平衡性的核心,**必须**以配置文件 (如 `config.py`) 存在,**严禁**硬编码在逻辑中。 + +### 4.1. 灵根配置 (`SPIRIT_ROOT_CONFIG`) + +用于 `/注册修仙` 和 `/出关`。 + +```python +# 灵根名称: (出现权重, 修为加成乘数) +SPIRIT_ROOT_CONFIG = { + "废灵根": (40, 0.8), # 40% 概率抽到, 修为效率 0.8x + "凡灵根": (30, 1.0), # 30% 概率, 基础效率 1.0x + "真灵根": (15, 1.2), # 15% 概率, 效率 1.2x + "异灵根": (10, 1.5), # 10% 概率, 效率 1.5x + "天灵根": (5, 2.0), # 5% 概率, 效率 2.0x +} + +# 基础修为速率 (每小时) +BASE_CULTIVATION_RATE = 100 +``` + +### 4.2. 突破配置 (`BREAKTHROUGH_CONFIG`) + +用于 `/突破` 和 `/强行突破`。这是**双轨制**设计的核心。 + +```python +# Key 是玩家当前境界 +BREAKTHROUGH_CONFIG = { + "炼气10层": { + "path_pill": { + "name": "丹药突破", + "required_item_id": 101, # 物品ID (筑基丹) + "required_item_name": "筑基丹", + "cultivation_cost": 5000, # 消耗修为 (低) + "base_success_rate": 0.4, # 成功率 (高) + "target_realm": "筑基1层" # 成功后的境界 + }, + "path_hard": { + "name": "强行突破", + "cultivation_cost": 20000, # 消耗修为 (非常高) + "base_success_rate": 0.1, # 成功率 (非常低) + "target_realm": "筑基2层" # 成功后的境界 (奖励更高!) + } + }, + "筑基10层": { + "path_pill": { + "name": "丹药突破", + "required_item_id": 102, # 物品ID (金元丹) + "required_item_name": "金元丹", + "cultivation_cost": 50000, + "base_success_rate": 0.2, + "target_realm": "金丹1层" + }, + "path_hard": { + "name": "强行突破", + "cultivation_cost": 200000, + "base_success_rate": 0.05, + "target_realm": "金丹2层" + } + } + # ... 更多境界 +} +``` + +## 5\. 🚀 核心系统设计 + +### 5.1. Redis 集成 (性能 & 防封) + +1. **玩家数据缓存 (Cache-Aside):** + + * **Key:** `xiuxian:cache:player:{user_id}` + * **Value:** `t_xiuxian_player` 数据的 JSON 序列化。 + * **读取:** 99% 的指令(如 `/我的状态`, `/劫掠` 检查)**必须**先读 Redis。 + * **写入:** 所有修改 `Player` 数据的操作(如 `/出关`),在写入 **MariaDB** 成功后,**必须** `DEL` 对应的 Redis 缓存键 (`...player:{user_id}`),实现“写后失效”。 + +2. **指令冷却 (Rate Limiting) - 防封核心:** + + * **Key:** `xiuxian:rate_limit:user:{user_id}:cmd:{command_name}` + * **逻辑:** 在执行**任何**指令前,检查此 Key。如果存在,静默失败或回复“操作过于频繁”。如果不存在,执行指令,并 `SET` 此 Key,设置 `EXPIRE`。 + * *( cooldown 时间见下方指令清单)* + +3. **排行榜 (Leaderboard):** + + * **Key:** `xiuxian:zset:leaderboard:realm` (Sorted Set) + * **逻辑:** 玩家 `/突破` 成功时,更新其 `Score` (可用境界等级量化)。 + +### 5.2. 玩家状态机 (防骚扰核心) + +这是 `t_xiuxian_player.status` 字段的流转设计,用于平衡 PVP 和 PVE。 + +| 状态 | 描述 | 能否被劫掠? | 能否 `/闭关`? | +| :--- | :--- | :--- | :--- | +| **`Idle`** (空闲) | 默认状态。 | ✅ **是** (危险) | ✅ **是** | +| **`Cultivating`** (闭关中)| `/闭关` 后进入。 | ❌ **否** (安全) | ❌ 否 | +| **`Unstable_Qi`** (气息不稳)| `/出关` 后进入,持续15分钟。 | ✅ **是** (强制暴露) | ❌ **否** (强制CD) | +| **`Injured`** (受伤中) | 被 `/劫掠` 成功后进入,持续1小时。| ❌ **否** (PVP保护) | ✅ **是** | + +**状态自动流转 (重要):** +所有指令在执行前,都必须调用一个 `check_status_update(user_id)` 函数。 +此函数检查玩家 `status` 是否为 `Unstable_Qi` 或 `Injured`,并检查 `status_until` 是否已过期。如果已过期,自动将其 `status` 改回 `Idle` (并更新DB和缓存)。 + +### 5.3. 玩家PVE循环 (成长) + + * **`/注册修仙 [道号]`** + 1. 检查 `user_id` 是否已存在。 + 2. 根据 `SPIRIT_ROOT_CONFIG` 的**权重**,为玩家随机抽取一个 `spirit_root`。 + 3. `INSERT` 到 `t_xiuxian_player`。 + 4. 回复:"一道灵光注入你的体内...经检测,你的灵根为:**【天灵根】**!" + * **`/闭关`** + 1. 调用 `check_status_update()`。 + 2. 检查 `status` 必须是 `Idle` 或 `Injured`。 + 3. `UPDATE Player SET status='Cultivating', last_cultivate_time=now()`。 + 4. 回复:"你已进入闭关,修行期间无法被劫掠。" + * **`/出关`** + 1. 检查 `status` 必须是 `Cultivating`。 + 2. **[平衡点1]** `duration = min(now() - last_cultivate_time, 8 hours)` (8小时收益上限)。 + 3. **[平衡点2]** `rate = BASE_CULTIVATION_RATE * spirit_root_multiplier` (灵根加成)。 + 4. `total_gain = duration * rate`。 + 5. **[平衡点3]** `UPDATE Player SET cultivation_points += total_gain, status='Unstable_Qi', status_until=now()+15 minutes`。 + 6. 回复:"你结束了闭关,获得了 X 点修为。你刚出关气息不稳,15分钟内无法再次闭关。" + * **`/聚灵 [数量]`** + 1. 检查 `spirit_stone` (灵石) 是否 \> `[数量]`。 + 2. `UPDATE Player SET spirit_stone -= [数量], cultivation_points += ([数量] * 10)` (比例可调)。 + * **`/突破` (丹药路径)** + 1. 从 `BREAKTHROUGH_CONFIG` 获取当前 `realm` 的 `path_pill` 配置。 + 2. 检查 `cultivation_points` 和 `背包` (是否有 `required_item_id`)。 + 3. (不满足) 回复:"丹药突破需:[筑基丹]x1, 修为x5000。你条件不足。" + 4. (满足) 扣除资源。Roll点 (按 `base_success_rate`)。 + 5. (成功) `UPDATE Player SET realm = '筑基1层'`。回复:"【突破成功!】..." + 6. (失败) 回复:"【突破失败!】..." + * **`/强行突破` (无丹药路径)** + 1. 从 `BREAKTHROUGH_CONFIG` 获取 `path_hard` 配置。 + 2. 检查 `cultivation_points`。 + 3. (不满足) 回复:"强行突破需:修为x20000。你条件不足。" + 4. (满足) 扣除修为。Roll点 (按 `base_success_rate`)。 + 5. (成功) `UPDATE Player SET realm = '筑基2层'`。回复:"【天道酬勤!】你强行突破成功,直达 **筑基2层**!" + 6. (失败) 回复:"【突破失败!】灵气反噬,你身受重伤!" (可选惩罚:`status` -\> `Injured`) + +### 5.4. 经济与PVP/GvG循环 + + * **`/签到`** + * (对接你的积分系统) 奖励 `spirit_stone`。 + * **`/坊市`, `/购买`, `/背包`** + * 常规的 `Item` 和 `Inventory` 表的 `CRUD` 操作。 + * **`/劫掠 @某人`** + 1. (调用 `check_status_update()` 检查自己和目标的状态)。 + 2. **[保护1]** 检查目标 `realm` (例如:必须 "筑基" 以上,新手保护)。 + 3. **[保护2]** 检查目标 `status` (必须是 `Idle` 或 `Unstable_Qi`)。 + 4. **[保护3]** 检查 `clan_id` (双方 `clan_id` 相同且不为 `NULL` 则禁止同门相残)。 + 5. (通过) 判定:根据双方 `realm` 计算成功率。 + 6. (成功) 转移 `spirit_stone`。`UPDATE Target SET status='Injured', status_until=now()+1 hour`。 + * **`/赠与 @某人 [数量]` (灵石)** + 1. **限制:** 双方 `clan_id` 必须相同且不为 `NULL`。 + 2. **操作:** 转移 `t_xiuxian_player.spirit_stone`。 + * **`/赠送 @某人 [物品] [数量]` (丹药/物品)** + 1. **限制:** 无限制(自由交易)。 + 2. **操作:** 转移 `t_xiuxian_inventory` 记录 (**必须使用数据库事务**)。 + * **`/创建门派`, `/加入门派`, `/退出门派`** + * 常规的 `Clan` 表和 `Player.clan_id` 的 `CRUD` 操作。 + +----- + +## 6\. 📋 最终指令清单 (附防封冷却) + +| 类别 | 指令 | 冷却 (建议) | 备注 | +| :--- | :--- | :--- | :--- | +| **核心** | `/注册修仙 [道号]` | 1次/天 | 随机灵根 | +| | `/我的状态` | 5 秒 | 显示核心数据 (含灵根) | +| **修行** | `/闭关` | 10 秒 | `status` -\> `Cultivating` (安全) | +| | `/出关` | 10 秒 | 结算修为 (8h上限)。`status` -\> `Unstable_Qi` (危险) | +| | `/聚灵 [数量]` | 30 秒 | (消耗灵石) 灵石 -\> 修为 | +| **突破** | `/突破` | 60 秒 | **[简化]** 丹药路径 (稳妥) | +| | `/强行突破` | 60 秒 | **[简化]** 无丹药路径 (高风险高回报) | +| **经济** | `/签到` | 1次/天 | 获得灵石 | +| | `/坊市` | 10 秒 | 查看商店 | +| | `/购买 [物品] [数量]`| 5 秒 | 灵石 -\> 物品 | +| | `/背包` | 5 秒 | 查看物品 | +| **社交** | `/劫掠 @某人` | 30 秒 | 抢夺灵石 (PVP) | +| | `/赠与 @某人 [数量]` | 10 秒 | **(同门)** 赠送灵石 | +| | `/赠送 @某人 [物品] [数量]`| 10 秒 | **(自由)** 赠送物品/丹药 | +| **门派** | `/创建门派 [名称]` | 1次/天 | | +| | `/加入门派 [名称]` | 1次/周 | | +| | `/退出门派` | 1次/周 | | +| **信息** | `/排行榜` | 60 秒 | 从 Redis 读取 | + +----- + +## 7\. 🛣️ 建议的开发路线图 + +1. **Step 1: 基础建设 (V0.1)** + * 搭建数据库 (4个表)。 + * 创建 `Config` 文件 (灵根, 突破)。 + * 实现 Redis 缓存 `Player` 数据的 `getter/setter`。 +2. **Step 2: PVE 核心循环 (V0.5)** + * 实现 `/注册修仙`, `/我的状态`。 + * 实现 `/闭关`, `/出关` (包含灵根加成 和 8小时上限)。 + * 实现 `check_status_update()` 函数和 `Unstable_Qi` 状态流转。 +3. **Step 3: 经济与突破 (V0.8)** + * 实现 `/坊市`, `/购买`, `/背包`。 + * 实现 `/聚灵`。 + * 实现 `/突破` 和 `/强行突破` (双轨制)。 +4. **Step 4: PVP 与社交 (V1.0)** + * 对接 `/签到` 和 `/劫掠` (包含所有保护机制)。 + * 实现 `/创建门派`, `/加入门派`。 + * 实现 `/赠与` (灵石) 和 `/赠送` (物品)。 +5. **Step 5: 优化 (V1.1)** + * 实现所有指令的 Redis 冷却 (防封)。 + * 实现 `/排行榜` (Redis Sorted Set)。 \ No newline at end of file diff --git a/plugins/xiuxian/__init__.py b/plugins/xiuxian/__init__.py new file mode 100644 index 0000000..1bda271 --- /dev/null +++ b/plugins/xiuxian/__init__.py @@ -0,0 +1,4 @@ +from .main import XiuxianPlugin + +def get_plugin(): + return XiuxianPlugin() \ No newline at end of file diff --git a/plugins/xiuxian/config.toml b/plugins/xiuxian/config.toml new file mode 100644 index 0000000..b9e9177 --- /dev/null +++ b/plugins/xiuxian/config.toml @@ -0,0 +1,122 @@ +# 修仙插件配置文件 +# 说明:定义指令集、状态机时长、限流策略、修为速率、坊市商品与突破参数。 +[Xiuxian] +enable = true + # 指令集合(无需前缀),按核心/经济/信息/社交/门派分类 + command = ["修仙帮助", "注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜", "修仙签到", "坊市", "购买", "乾坤袋", "突破", "强行突破", "劫掠", "赠与", "赠送", "创建门派", "加入门派", "退出门派"] + # 用法提示:命令格式错误时的反馈文本 + command-format = """ +📜修仙指令: +注册修仙 道号 +我的状态 +闭关 +出关 +聚灵 数量 +排行榜 +修仙签到 +坊市 +购买 +乾坤袋 +突破 - 需要丹药 +强行突破 - 不需要丹药 +劫掠 - 抢劫其他门派弟子 +赠与 - 赠送灵石 +赠送 - 赠送丹药物品 + +""" + +[Xiuxian.status] +# 状态机参数:气息不稳/受伤保护时长(分钟),闭关收益计算上限(小时) +[Xiuxian.status] +unstable_qi_minutes = 15 +injured_minutes = 60 +max_cultivate_hours = 8 + +[Xiuxian.rate_limit] +# Redis 限流窗口(秒):用于防骚扰与防封,每个用户每条指令独立冷却 +[Xiuxian.rate_limit] +status_seconds = 3 +bag_seconds = 3 +rob_seconds = 30 +gather_seconds = 30 +break_seconds = 60 +force_break_seconds = 60 +inout_seconds = 5 +signin_seconds = 86400 +shop_seconds = 10 +buy_seconds = 5 +gift_seconds = 10 + +[Xiuxian.cultivation] +# 修为结算参数:基础速率(每小时),灵根乘数(名称:倍率) +[Xiuxian.cultivation] +base_rate_per_hour = 100 +spirit_roots = [ + "废灵根:0.8", + "凡灵根:1.0", + "地灵根:1.2", + "天灵根:2.0" +] + +[Xiuxian.shop] +# 坊市商品:名称:类型:价格(价格单位为灵石)。类型用于展示与分类,不影响逻辑。 +[Xiuxian.shop] +items = [ + "筑基丹:丹药:500", + "回气丹:丹药:200", + "聚灵符:材料:100" +] + +[Xiuxian.breakthrough] +# 双轨制突破参数:丹药路径与强行路径 +# pill_threshold:丹药突破修为需求;pill_item:消耗的丹药名;pill_success:成功率 +# force_threshold:强行突破修为需求;force_success:成功率;force_next_realm:成功后的境界 +[Xiuxian.breakthrough] +pill_threshold = 5000 +pill_item = "筑基丹" +pill_success = 0.4 +force_threshold = 20000 +force_success = 0.1 +force_next_realm = "筑基2层" + +[Xiuxian.leaderboard] +# 排行榜键(Redis Sorted Set)。默认使用修为分数,也支持按境界分值映射。 +key = "xiuxian:zset:leaderboard:cultivation" +realm_key = "xiuxian:zset:leaderboard:realm" + +[Xiuxian.realm_score] +# 境界分值映射:用于 realm 排行榜(基础值 + 层数),真仙为固定高分 +stages = [ + "凡人:0", + "炼气:10", + "筑基:20", + "金丹:30", + "元婴:40", + "化神:50", + "合体:60", + "大乘:70", + "渡劫:80", + "真仙:100" +] + +[Xiuxian.layer_up] +# 每提升一层的修为阈值(简化规则):按当前境界前缀应用固定阈值 +thresholds = [ + "炼气:1000", + "筑基:5000", + "金丹:50000", + "元婴:200000", + "化神:1000000", + "合体:5000000", + "大乘:10000000", + "渡劫:50000000" +] + +[Xiuxian.breakthrough_stages] +# 瓶颈突破定义:当前境界(10层):路径:修为消耗:成功率:目标境界 +paths = [ + "炼气10层:pill:5000:0.4:筑基1层", + "炼气10层:hard:20000:0.1:筑基2层", + "筑基10层:pill:50000:0.2:金丹1层", + "筑基10层:hard:200000:0.05:金丹2层" +] \ No newline at end of file diff --git a/plugins/xiuxian/main.py b/plugins/xiuxian/main.py new file mode 100644 index 0000000..14b6783 --- /dev/null +++ b/plugins/xiuxian/main.py @@ -0,0 +1,990 @@ +""" +群聊文字修仙插件 + +说明: +- 原子指令 + 状态机 + 限流 + Cache-Aside +- 读优先 Redis;写 DB 后删除缓存,由读取路径回填 +- 群频次限制与用户冷却共同防骚扰/防封 +""" +import json +import time +import random +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Tuple + +from loguru import logger + +from base.plugin_common.message_plugin_interface import MessagePluginInterface +from base.plugin_common.plugin_interface import PluginStatus +from utils.decorator.plugin_decorators import plugin_stats_decorator +from utils.decorator.points_decorator import plugin_points_cost +from utils.decorator.rate_limit_decorator import group_feature_rate_limit +from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager +from wechat_ipad import WechatAPIClient +from db.connection import DBConnectionManager +from db.xiuxian_db import XiuxianDB +from datetime import timezone + +class XiuxianRedisDB: + """修仙插件的 Redis 访问封装:玩家缓存、限流键与排行榜。""" + def __init__(self, db_manager: DBConnectionManager): + self.db_manager = db_manager + self.player_prefix = "xiuxian:cache:player:" + self.leaderboard_key = None + self.leaderboard_realm_key = None + + def set_leaderboard_key(self, key: str): + self.leaderboard_key = key + + def set_realm_leaderboard_key(self, key: str): + self.leaderboard_realm_key = key + + def get_redis(self): + return self.db_manager.get_redis_connection() + + def get_player(self, user_id: str) -> Optional[Dict[str, Any]]: + try: + with self.get_redis() as r: + data = r.get(f"{self.player_prefix}{user_id}") + if data: + if isinstance(data, bytes): + data = data.decode("utf-8") + return json.loads(data) + return None + except Exception as e: + logger.error(f"读取玩家数据失败: {e}") + return None + + def save_player(self, player: Dict[str, Any]) -> bool: + try: + with self.get_redis() as r: + r.set(f"{self.player_prefix}{player['user_id']}", json.dumps(player, ensure_ascii=False)) + return True + except Exception as e: + logger.error(f"保存玩家数据失败: {e}") + return False + + def invalidate_player(self, user_id: str): + try: + with self.get_redis() as r: + r.delete(f"{self.player_prefix}{user_id}") + except Exception as e: + logger.error(f"失效玩家缓存失败: {e}") + + def set_rate_limit(self, user_id: str, cmd: str, seconds: int) -> bool: + try: + with self.get_redis() as r: + key = f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}" + r.setex(key, seconds, "1") + return True + except Exception as e: + logger.error(f"设置限流失败: {e}") + return False + + def check_rate_limited(self, user_id: str, cmd: str) -> bool: + try: + with self.get_redis() as r: + key = f"xiuxian:rate_limit:user:{user_id}:cmd:{cmd}" + return r.exists(key) == 1 + except Exception as e: + logger.error(f"检查限流失败: {e}") + return False + + def leaderboard_add(self, user_id: str, score: float): + try: + if not self.leaderboard_key: + return + with self.get_redis() as r: + r.zadd(self.leaderboard_key, {user_id: score}) + except Exception as e: + logger.error(f"更新排行榜失败: {e}") + + def leaderboard_top(self, top_n: int = 10) -> List[Tuple[str, float]]: + try: + if not self.leaderboard_key: + return [] + with self.get_redis() as r: + res = r.zrevrange(self.leaderboard_key, 0, top_n - 1, withscores=True) + return [(uid if isinstance(uid, str) else uid.decode("utf-8"), score) for uid, score in res] + except Exception as e: + logger.error(f"读取排行榜失败: {e}") + return [] + + def leaderboard_realm_add(self, user_id: str, score: float): + try: + if not self.leaderboard_realm_key: + return + with self.get_redis() as r: + r.zadd(self.leaderboard_realm_key, {user_id: score}) + except Exception as e: + logger.error(f"更新境界排行榜失败: {e}") + + def leaderboard_realm_top(self, top_n: int = 10) -> List[Tuple[str, float]]: + try: + if not self.leaderboard_realm_key: + return [] + with self.get_redis() as r: + res = r.zrevrange(self.leaderboard_realm_key, 0, top_n - 1, withscores=True) + return [(uid if isinstance(uid, str) else uid.decode("utf-8"), score) for uid, score in res] + except Exception as e: + logger.error(f"读取境界排行榜失败: {e}") + return [] + + +class XiuxianPlugin(MessagePluginInterface): + """修仙主插件:指令分发、业务执行与 DB/Redis 协作。""" + FEATURE_KEY = "XIUXIAN" + FEATURE_DESCRIPTION = "🧙‍♂️ 文字修仙 [注册修仙|我的状态|闭关|出关|聚灵|排行榜]" + + @property + def name(self) -> str: + return "群聊文字修仙" + + @property + def version(self) -> str: + return "0.1.0" + + @property + def description(self) -> str: + return "基于Redis的简化修仙玩法,含闭关与状态机" + + @property + def author(self) -> str: + return "AI助手" + + @property + def command_prefix(self) -> Optional[str]: + return "" + + @property + def commands(self) -> List[str]: + return self._commands + + @property + def feature_key(self) -> Optional[str]: + return self.FEATURE_KEY + + @property + def feature_description(self) -> Optional[str]: + return self.FEATURE_DESCRIPTION + + def __init__(self): + super().__init__() + self.feature = self.register_feature() + self.redis_db: Optional[XiuxianRedisDB] = None + self.xdb: Optional[XiuxianDB] = None + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件:加载配置,接入 DB 与 Redis,并注册功能权限。""" + self.LOG = logger + self.LOG.info(f"正在初始化 {self.name} 插件...") + + self.event_system = context.get("event_system") + self.db_manager = DBConnectionManager.get_instance() + if self.db_manager: + self.redis_db = XiuxianRedisDB(self.db_manager) + # 初始化持久化层(如连接池可用) + if self.db_manager.mysql_pool: + self.xdb = XiuxianDB(self.db_manager) + try: + self.xdb.init_schema() + except Exception as e: + self.LOG.error(f"修仙表结构初始化失败: {e}") + + cfg = self._config.get("Xiuxian", {}) + self._commands = cfg.get("command", ["注册修仙", "我的状态", "闭关", "出关", "聚灵", "排行榜"]) + self.command_format = cfg.get("command-format", "注册修仙 道号 | 我的状态 | 闭关 | 出关 | 聚灵 数量 | 排行榜") + self.enable = cfg.get("enable", True) + + status_cfg = cfg.get("status", {}) + self.unstable_qi_minutes = status_cfg.get("unstable_qi_minutes", 15) + self.injured_minutes = status_cfg.get("injured_minutes", 60) + self.max_cultivate_hours = status_cfg.get("max_cultivate_hours", 8) + + rate_cfg = cfg.get("rate_limit", {}) + self.seconds_rl = { + "我的状态": rate_cfg.get("status_seconds", 3), + "闭关": rate_cfg.get("inout_seconds", 5), + "出关": rate_cfg.get("inout_seconds", 5), + "聚灵": rate_cfg.get("gather_seconds", 30), + "排行榜": rate_cfg.get("break_seconds", 60), + "注册修仙": 5, + "签到": rate_cfg.get("signin_seconds", 86400), + "坊市": rate_cfg.get("shop_seconds", 10), + "购买": rate_cfg.get("buy_seconds", 5), + "乾坤袋": rate_cfg.get("bag_seconds", 3), + "突破": rate_cfg.get("break_seconds", 60), + "强行突破": rate_cfg.get("force_break_seconds", 60), + "劫掠": rate_cfg.get("rob_seconds", 30), + "赠与": rate_cfg.get("gift_seconds", 10), + "赠送": rate_cfg.get("gift_seconds", 10), + "创建门派": 86400, + "加入门派": 604800, + "退出门派": 604800, + } + + cult_cfg = cfg.get("cultivation", {}) + self.base_rate_per_hour = cult_cfg.get("base_rate_per_hour", 100) + self.spirit_roots_cfg = cult_cfg.get("spirit_roots", ["凡灵根:1.0", "天灵根:2.0"]) + self.spirit_roots = [] + for item in self.spirit_roots_cfg: + try: + name, mult = item.split(":") + self.spirit_roots.append((name, float(mult))) + except Exception: + pass + + lb_cfg = cfg.get("leaderboard", {}) + leaderboard_key = lb_cfg.get("key", "xiuxian:zset:leaderboard:cultivation") + realm_lb_key = lb_cfg.get("realm_key", "xiuxian:zset:leaderboard:realm") + if self.redis_db: + self.redis_db.set_leaderboard_key(leaderboard_key) + self.redis_db.set_realm_leaderboard_key(realm_lb_key) + + shop_cfg = cfg.get("shop", {}) + self.shop_items = [] + for s in shop_cfg.get("items", []): + try: + n, t, p = s.split(":") + self.shop_items.append({"name": n, "type": t, "price": int(p)}) + except Exception: + pass + + bt_cfg = cfg.get("breakthrough", {}) + self.bt_pill_threshold = int(bt_cfg.get("pill_threshold", 5000)) + self.bt_pill_item = bt_cfg.get("pill_item", "筑基丹") + self.bt_pill_success = float(bt_cfg.get("pill_success", 0.4)) + self.bt_force_threshold = int(bt_cfg.get("force_threshold", 20000)) + self.bt_force_success = float(bt_cfg.get("force_success", 0.1)) + self.bt_force_next = bt_cfg.get("force_next_realm", "筑基2层") + + # 解析境界分值与层级提升阈值 + realm_score_cfg = self._config.get("Xiuxian", {}).get("realm_score", {}) + self.realm_score_map = {} + for s in realm_score_cfg.get("stages", []): + try: + n, v = s.split(":") + self.realm_score_map[n] = int(v) + except Exception: + pass + + layer_up_cfg = self._config.get("Xiuxian", {}).get("layer_up", {}) + self.layer_threshold_map = {} + for s in layer_up_cfg.get("thresholds", []): + try: + n, v = s.split(":") + self.layer_threshold_map[n] = int(v) + except Exception: + pass + + # 解析突破阶段配置 + stage_cfg = self._config.get("Xiuxian", {}).get("breakthrough_stages", {}) + self.break_config = {} + for p in stage_cfg.get("paths", []): + try: + cur, path, cost, rate, target = p.split(":") + if cur not in self.break_config: + self.break_config[cur] = {} + self.break_config[cur][path] = { + "cost": int(cost), + "rate": float(rate), + "target": target + } + except Exception: + pass + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + if not self.enable: + return False + content = str(message.get("content", "")).strip() + for cmd in self._commands: + if content.startswith(cmd): + return True + return False + + @plugin_stats_decorator(plugin_name="群聊文字修仙") + @group_feature_rate_limit(max_per_minute=6, feature_key=FEATURE_KEY) + async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + content = str(message.get("content", "")).strip() + sender = message.get("sender") + roomid = message.get("roomid", "") + gbm: GroupBotManager = message.get("gbm") + bot: WechatAPIClient = message.get("bot") + + if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: + return False, "没有权限" + + cmd = None + for c in self._commands: + if content.startswith(c): + cmd = c + content = content[len(c):].strip() + break + if not cmd: + return False, "不匹配的命令" + + if not self.redis_db: + await bot.send_text_message((roomid if roomid else sender), "❌系统未初始化Redis", sender) + return False, "Redis未初始化" + + if self.redis_db.check_rate_limited(sender, cmd): + await bot.send_text_message((roomid if roomid else sender), "⚠️ 操作过于频繁,请稍候再试", sender) + return False, "限流" + + if cmd == "注册修仙": + return await self._cmd_register(bot, sender, roomid, content) + if cmd in ("修仙帮助"): + return await self._cmd_help(bot, sender, roomid) + if cmd == "我的状态": + return await self._cmd_status(bot, sender, roomid) + if cmd == "闭关": + return await self._cmd_cultivate(bot, sender, roomid) + if cmd == "出关": + return await self._cmd_finish_cultivate(bot, sender, roomid) + if cmd == "聚灵": + return await self._cmd_gather(bot, sender, roomid, content) + if cmd == "排行榜": + return await self._cmd_leaderboard(bot, sender, roomid) + if cmd == "修仙签到": + return await self._cmd_signin(bot, sender, roomid) + if cmd == "坊市": + return await self._cmd_shop(bot, sender, roomid) + if cmd == "购买": + return await self._cmd_buy(bot, sender, roomid, content) + if cmd == "乾坤袋": + return await self._cmd_bag(bot, sender, roomid) + if cmd == "突破": + return await self._cmd_breakthrough(bot, sender, roomid) + if cmd == "强行突破": + return await self._cmd_force_breakthrough(bot, sender, roomid) + if cmd == "劫掠": + return await self._cmd_rob(bot, sender, roomid, content) + if cmd == "赠与": + return await self._cmd_give_stone(bot, sender, roomid, content) + if cmd == "赠送": + return await self._cmd_give_item(bot, sender, roomid, content) + if cmd == "创建门派": + return await self._cmd_clan_create(bot, sender, roomid, content) + if cmd == "加入门派": + return await self._cmd_clan_join(bot, sender, roomid, content) + if cmd == "退出门派": + return await self._cmd_clan_exit(bot, sender, roomid) + + return False, "未知命令" + + def _rate_set(self, user_id: str, cmd: str): + """设置用户维度 Redis 冷却键,用于防骚扰与防封。""" + seconds = self.seconds_rl.get(cmd, 3) + self.redis_db.set_rate_limit(user_id, cmd, seconds) + + def _check_status_update(self, player: Dict[str, Any]) -> Dict[str, Any]: + """状态机自动流转:过期的 Unstable_Qi/Injured 恢复为 Idle。""" + now = datetime.now(timezone.utc) + status = player.get("status", "Idle") + until_str = player.get("status_until") + until = None + if until_str: + try: + until = datetime.fromisoformat(until_str) + except Exception: + until = None + if until and until.tzinfo is None: + until = until.replace(tzinfo=timezone.utc) + if status in ("Unstable_Qi", "Injured") and until and now >= until: + player["status"] = "Idle" + player["status_until"] = None + self.redis_db.save_player(player) + return player + + async def _cmd_register(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + dao_name = content.strip() + if not dao_name: + await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", sender) + return False, "命令格式错误" + player = self.redis_db.get_player(sender) + if player: + await bot.send_text_message((roomid if roomid else sender), "⚠️ 已注册,无需重复注册", sender) + self._rate_set(sender, "注册修仙") + return True, "已注册" + root_name, mult = random.choice(self.spirit_roots) if self.spirit_roots else ("凡灵根", 1.0) + player = { + "user_id": sender, + "group_id": roomid or "", + "dao_name": dao_name, + "realm": "炼气1层", + "spirit_root": f"{root_name}", + "spirit_root_mult": mult, + "cultivation_points": 0, + "spirit_stone": 0, + "status": "Idle", + "status_until": None, + "last_cultivate_time": None, + } + self.redis_db.save_player(player) + self._rate_set(sender, "注册修仙") + # 初始化境界排行榜分值 + self.redis_db.leaderboard_realm_add(sender, float(self._realm_score(player["realm"])) ) + await bot.send_text_message((roomid if roomid else sender), f"✅ 注册成功,道号:{dao_name}\n灵根:{root_name}", sender) + return True, "注册成功" + + async def _cmd_help(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + lines = ["📜 修仙帮助"] + lines.append("用法提示:") + lines.append(self.command_format.strip()) + lines.append("可用指令:") + lines.append(" | ".join(self._commands)) + lines.append("常用参数格式:") + lines.append("注册修仙 道号") + lines.append("聚灵 数量") + lines.append("购买 物品 数量") + lines.append("赠与 目标wxid 数量") + lines.append("赠送 目标wxid 物品 数量") + lines.append("劫掠 目标wxid") + lines.append("创建门派 名称") + lines.append("加入门派 名称") + msg = "\n".join(lines) + await bot.send_text_message((roomid if roomid else sender), msg, sender) + self._rate_set(sender, "帮助") + return True, "帮助" + + async def _cmd_status(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self.redis_db.get_player(sender) + if not player: + await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + player = self._check_status_update(player) + msg = ( + f"🧙‍♂️ 我的状态\n" + f"道号:{player.get('dao_name')}\n" + f"境界:{player.get('realm')}\n" + f"灵根:{player.get('spirit_root')}\n" + f"修为:{player.get('cultivation_points')}\n" + f"灵石:{player.get('spirit_stone')}\n" + f"状态:{player.get('status')}\n" + ) + await bot.send_text_message((roomid if roomid else sender), msg, sender) + self._rate_set(sender, "我的状态") + return True, "状态展示" + + async def _cmd_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self.redis_db.get_player(sender) + if not player: + await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + player = self._check_status_update(player) + status = player.get("status", "Idle") + if status == "Cultivating": + await bot.send_text_message((roomid if roomid else sender), "⚠️ 已在闭关中", sender) + return False, "重复闭关" + if status not in ("Idle", "Injured"): + await bot.send_text_message((roomid if roomid else sender), f"当前状态[{status}]不可闭关", sender) + return False, "状态不可闭关" + player["status"] = "Cultivating" + player["last_cultivate_time"] = datetime.now(timezone.utc).isoformat() + self.redis_db.save_player(player) + self._rate_set(sender, "闭关") + await bot.send_text_message((roomid if roomid else sender), "✅ 已进入闭关,期间安全不可被劫掠", sender) + return True, "闭关成功" + + async def _cmd_finish_cultivate(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self.redis_db.get_player(sender) + if not player: + await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + player = self._check_status_update(player) + if player.get("status") != "Cultivating": + await bot.send_text_message((roomid if roomid else sender), "⚠️ 非闭关状态,无需出关", sender) + return False, "非闭关" + start_iso = player.get("last_cultivate_time") + + try: + start = datetime.fromisoformat(start_iso) if start_iso else datetime.now(timezone.utc) + except Exception: + start = datetime.now(timezone.utc) + if start.tzinfo is None: + start = start.replace(tzinfo=timezone.utc) + now = datetime.now(timezone.utc) + duration_hours = (now - start).total_seconds() / 3600.0 + duration_hours = max(0.0, min(duration_hours, float(self.max_cultivate_hours))) + rate = self.base_rate_per_hour * float(player.get("spirit_root_mult", 1.0)) + gain = int(duration_hours * rate) + player["cultivation_points"] = int(player.get("cultivation_points", 0)) + gain + player["status"] = "Unstable_Qi" + player["status_until"] = (now + timedelta(minutes=int(self.unstable_qi_minutes))).isoformat() + player["last_cultivate_time"] = None + self.redis_db.save_player(player) + # 自动层级提升(不跨瓶颈) + self._auto_layer_up(sender, player) + self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) + self._rate_set(sender, "出关") + await bot.send_text_message((roomid if roomid else sender), f"✅ 出关成功,获得修为:{gain}({duration_hours:.1f}小时)\n当前修为:{player['cultivation_points']}\n状态:气息不稳 {self.unstable_qi_minutes}分钟", sender) + return True, "出关结算" + + async def _cmd_gather(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + player = self.redis_db.get_player(sender) + if not player: + await bot.send_text_message((roomid if roomid else sender), "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + try: + qty = int(content.strip()) + except Exception: + await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", sender) + return False, "命令格式错误" + stones = int(player.get("spirit_stone", 0)) + if qty <= 0 or stones < qty: + await bot.send_text_message((roomid if roomid else sender), "⚠️ 灵石不足或数量不合法", sender) + return False, "灵石不足" + player["spirit_stone"] = stones - qty + player["cultivation_points"] = int(player.get("cultivation_points", 0)) + qty * 10 + self.redis_db.save_player(player) + # 自动层级提升(不跨瓶颈) + self._auto_layer_up(sender, player) + self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) + self._rate_set(sender, "聚灵") + await bot.send_text_message((roomid if roomid else sender), f"✅ 聚灵成功,消耗灵石{qty},获得修为{qty * 10}", sender) + return True, "聚灵成功" + + async def _cmd_leaderboard(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + top = self.redis_db.leaderboard_top(10) + lines = ["🏆 修为排行榜 Top10"] + rank = 1 + for uid, score in top: + mark = "你" if uid == sender else "" + lines.append(f"{rank}. {uid} - {int(score)} {mark}") + rank += 1 + await bot.send_text_message((roomid if roomid else sender), "\n".join(lines), sender) + self._rate_set(sender, "排行榜") + return True, "排行榜" + + def _get_player(self, user_id: str) -> Optional[Dict[str, Any]]: + return self.redis_db.get_player(user_id) + + def _save_player(self, player: Dict[str, Any]): + self.redis_db.save_player(player) + + def _get_player_with_cache(self, user_id: str) -> Optional[Dict[str, Any]]: + """读取玩家:优先 Redis,未命中则读 DB 并回填缓存。""" + p = self.redis_db.get_player(user_id) + if p: + return p + if self.xdb: + dbp = self.xdb.get_player(user_id) + if dbp: + dbp.setdefault("spirit_root_mult", 1.0) + dbp.setdefault("inventory", {}) + self.redis_db.save_player(dbp) + return dbp + return None + + def _parse_realm(self, realm: str) -> Tuple[str, Optional[int]]: + try: + if "层" in realm: + prefix, layer_str = realm.split("层")[0], realm.split(" ")[-1] if " " in realm else None + # expected format: 前缀N层,例如 炼气1层/筑基10层 + import re + m = re.match(r"^(.*?)?(\d+)层$", realm) + if m: + prefix = m.group(1) + layer = int(m.group(2)) + return prefix, layer + except Exception: + pass + return realm, None + + def _realm_score(self, realm: str) -> int: + prefix, layer = self._parse_realm(realm) + base = self.realm_score_map.get(prefix, 0) + return base + (layer or 0) + + def _set_realm(self, user_id: str, player: Dict[str, Any], new_realm: str): + player["realm"] = new_realm + if self.xdb: + try: + self.xdb.update_player_fields(user_id, {"realm": new_realm}) + self.redis_db.invalidate_player(user_id) + except Exception: + pass + self._save_player(player) + # 更新境界排行榜 + self.redis_db.leaderboard_realm_add(user_id, float(self._realm_score(new_realm))) + + def _auto_layer_up(self, user_id: str, player: Dict[str, Any]): + prefix, layer = self._parse_realm(player.get("realm", "凡人")) + if layer is None or layer >= 10: + return + threshold = self.layer_threshold_map.get(prefix) + if not threshold: + return + pts = int(player.get("cultivation_points", 0)) + new_layer = max(layer, min(10, pts // threshold)) + if new_layer != layer: + self._set_realm(user_id, player, f"{prefix}{new_layer}层") + + async def _cmd_signin(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self._get_player(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + if self.redis_db.check_rate_limited(sender, "签到"): + await bot.send_text_message(roomid or sender, "今日已签到,请明日再来", sender) + return False, "已签到" + reward = 50 + player["spirit_stone"] = int(player.get("spirit_stone", 0)) + reward + self._save_player(player) + self._rate_set(sender, "签到") + await bot.send_text_message(roomid or sender, f"✅ 签到成功,获得灵石{reward}", sender) + return True, "签到成功" + + async def _cmd_shop(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + lines = ["🛒 坊市商品"] + for item in self.shop_items: + lines.append(f"{item['name']} [{item['type']}] - {item['price']}灵石") + await bot.send_text_message(roomid or sender, "\n".join(lines), sender) + self._rate_set(sender, "坊市") + return True, "坊市" + + async def _cmd_buy(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + parts = content.split() + if len(parts) < 2: + await bot.send_text_message(roomid or sender, f"❌命令格式错误!\n购买 物品 数量", sender) + return False, "命令格式错误" + item_name = parts[0] + try: + qty = int(parts[1]) + except Exception: + await bot.send_text_message(roomid or sender, f"❌命令格式错误!\n购买 物品 数量", sender) + return False, "命令格式错误" + player = self._get_player(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + item = next((i for i in self.shop_items if i["name"] == item_name), None) + if not item: + await bot.send_text_message(roomid or sender, "商品不存在", sender) + return False, "商品不存在" + total = item["price"] * qty + stones = int(player.get("spirit_stone", 0)) + if qty <= 0 or stones < total: + await bot.send_text_message(roomid or sender, "灵石不足", sender) + return False, "灵石不足" + player["spirit_stone"] = stones - total + inv = player.get("inventory") or {} + inv[item_name] = int(inv.get(item_name, 0)) + qty + player["inventory"] = inv + self._save_player(player) + self._rate_set(sender, "购买") + await bot.send_text_message(roomid or sender, f"✅ 购买成功,{item_name} × {qty}", sender) + return True, "购买成功" + + async def _cmd_bag(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self._get_player(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + inv = player.get("inventory") or {} + lines = ["🎒 背包"] + if not inv: + lines.append("空") + else: + for k, v in inv.items(): + lines.append(f"{k} × {v}") + await bot.send_text_message(roomid or sender, "\n".join(lines), sender) + self._rate_set(sender, "背包") + return True, "背包" + + async def _cmd_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self._get_player(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + points = int(player.get("cultivation_points", 0)) + inv = player.get("inventory") or {} + # 读取当前瓶颈配置 + cur_realm = player.get("realm", "炼气1层") + prefix, layer = self._parse_realm(cur_realm) + stage_key = f"{prefix}10层" + stage_conf = self.break_config.get(stage_key) + if not stage_conf: + await bot.send_text_message(roomid or sender, "当前境界未到瓶颈或未配置突破路径", sender) + return False, "未到瓶颈" + pill_conf = stage_conf.get("pill") + if not pill_conf: + await bot.send_text_message(roomid or sender, "丹药路径未配置", sender) + return False, "未配置" + if points < pill_conf["cost"]: + await bot.send_text_message(roomid or sender, "修为不足,无法突破", sender) + return False, "修为不足" + if inv.get(self.bt_pill_item, 0) <= 0: + await bot.send_text_message(roomid or sender, f"缺少丹药:{self.bt_pill_item}", sender) + return False, "缺少丹药" + inv[self.bt_pill_item] = inv.get(self.bt_pill_item, 0) - 1 + player["inventory"] = inv + player["cultivation_points"] = points - pill_conf["cost"] + if self.xdb: + try: + self.xdb.remove_item(sender, self.bt_pill_item, 1) + self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]}) + self.redis_db.invalidate_player(sender) + except Exception: + pass + roll = random.random() + if roll < pill_conf["rate"]: + # 成功,更新境界并排行榜 + self._set_realm(sender, player, pill_conf["target"]) + self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) + self._rate_set(sender, "突破") + await bot.send_text_message(roomid or sender, f"✅ 突破成功,晋升至{pill_conf['target']}", sender) + return True, "突破成功" + else: + self._save_player(player) + self._rate_set(sender, "突破") + await bot.send_text_message(roomid or sender, "❌ 突破失败", sender) + return False, "突破失败" + + async def _cmd_force_breakthrough(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self._get_player(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + points = int(player.get("cultivation_points", 0)) + # 读取当前瓶颈配置 + cur_realm = player.get("realm", "炼气1层") + prefix, layer = self._parse_realm(cur_realm) + stage_key = f"{prefix}10层" + stage_conf = self.break_config.get(stage_key) + if not stage_conf: + await bot.send_text_message(roomid or sender, "当前境界未到瓶颈或未配置突破路径", sender) + return False, "未到瓶颈" + hard_conf = stage_conf.get("hard") + if not hard_conf: + await bot.send_text_message(roomid or sender, "强行路径未配置", sender) + return False, "未配置" + if points < hard_conf["cost"]: + await bot.send_text_message(roomid or sender, "修为不足,无法强行突破", sender) + return False, "修为不足" + player["cultivation_points"] = points - hard_conf["cost"] + if self.xdb: + try: + self.xdb.update_player_fields(sender, {"cultivation_points": player["cultivation_points"]}) + self.redis_db.invalidate_player(sender) + except Exception: + pass + roll = random.random() + if roll < hard_conf["rate"]: + self._set_realm(sender, player, hard_conf["target"]) + self.redis_db.leaderboard_add(sender, float(player["cultivation_points"])) + self._rate_set(sender, "强行突破") + await bot.send_text_message(roomid or sender, f"✅ 强行突破成功,晋升至{hard_conf['target']}", sender) + return True, "强行突破成功" + else: + self._save_player(player) + self._rate_set(sender, "强行突破") + await bot.send_text_message(roomid or sender, "❌ 强行突破失败,灵气反噬!", sender) + return False, "强行突破失败" + + async def _cmd_rob(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + target = content.strip().lstrip("@") + if not target: + await bot.send_text_message(roomid or sender, "命令格式:劫掠 目标wxid", sender) + return False, "命令格式错误" + if target == sender: + await bot.send_text_message(roomid or sender, "不可劫掠自己", sender) + return False, "非法目标" + attacker = self._get_player(sender) + defender = self._get_player(target) + if not attacker or not defender: + await bot.send_text_message(roomid or sender, "双方需已注册", sender) + return False, "未注册" + attacker = self._check_status_update(attacker) + defender = self._check_status_update(defender) + # 新手保护:不可劫掠炼气期 + def_prefix, _ = self._parse_realm(defender.get("realm", "炼气1层")) + if def_prefix == "炼气": + await bot.send_text_message(roomid or sender, "目标处于新手保护期(炼气),不可劫掠", sender) + return False, "新手保护" + if defender.get("status") in ("Cultivating", "Injured"): + await bot.send_text_message(roomid or sender, "目标处于保护或闭关中", sender) + return False, "目标保护" + if roomid and (attacker.get("group_id") != roomid or defender.get("group_id") != roomid): + await bot.send_text_message(roomid or sender, "仅限同群内劫掠", sender) + return False, "跨群" + if attacker.get("clan_id") and defender.get("clan_id") and attacker.get("clan_id") == defender.get("clan_id"): + await bot.send_text_message(roomid or sender, "不可劫掠同门", sender) + return False, "同门" + a_pts = int(attacker.get("cultivation_points", 0)) + d_pts = int(defender.get("cultivation_points", 0)) + base = 0.5 + if a_pts > d_pts: + base += 0.2 + elif a_pts < d_pts: + base -= 0.2 + roll = random.random() + if roll < base: + d_stones = int(defender.get("spirit_stone", 0)) + gain = max(0, int(d_stones * 0.2)) + defender["spirit_stone"] = d_stones - gain + attacker["spirit_stone"] = int(attacker.get("spirit_stone", 0)) + gain + defender["status"] = "Injured" + defender["status_until"] = (datetime.now(timezone.utc) + timedelta(minutes=int(self.injured_minutes))).isoformat() + self._save_player(defender) + self._save_player(attacker) + self._rate_set(sender, "劫掠") + await bot.send_text_message(roomid or sender, f"✅ 劫掠成功,获得灵石{gain}", sender) + if roomid: + await bot.send_text_message(roomid, f"{sender} 劫掠 {target} 成功,目标进入受伤保护", [target]) + return True, "劫掠成功" + else: + self._rate_set(sender, "劫掠") + await bot.send_text_message(roomid or sender, "❌ 劫掠失败", sender) + return False, "劫掠失败" + + async def _cmd_give_stone(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + parts = content.strip().split() + if len(parts) < 2: + await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender) + return False, "命令格式错误" + target = parts[0].lstrip("@") + try: + qty = int(parts[1]) + except Exception: + await bot.send_text_message(roomid or sender, "命令格式:赠与 目标wxid 数量", sender) + return False, "命令格式错误" + giver = self._get_player_with_cache(sender) + receiver = self._get_player_with_cache(target) + if not giver or not receiver: + await bot.send_text_message(roomid or sender, "双方需已注册", sender) + return False, "未注册" + if not giver.get("clan_id") or giver.get("clan_id") != receiver.get("clan_id"): + await bot.send_text_message(roomid or sender, "仅同门可赠与灵石", sender) + return False, "不同门" + stones = int(giver.get("spirit_stone", 0)) + if qty <= 0 or stones < qty: + await bot.send_text_message(roomid or sender, "灵石不足或数量不合法", sender) + return False, "灵石不足" + giver["spirit_stone"] = stones - qty + receiver["spirit_stone"] = int(receiver.get("spirit_stone", 0)) + qty + self._save_player(giver) + self._save_player(receiver) + self._rate_set(sender, "赠与") + await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠与灵石 {qty}", sender) + return True, "赠与成功" + + async def _cmd_give_item(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + parts = content.strip().split() + if len(parts) < 3: + await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender) + return False, "命令格式错误" + target = parts[0].lstrip("@") + item_name = parts[1] + try: + qty = int(parts[2]) + except Exception: + await bot.send_text_message(roomid or sender, "命令格式:赠送 目标wxid 物品 数量", sender) + return False, "命令格式错误" + giver = self._get_player(sender) + receiver = self._get_player(target) + if not giver or not receiver: + await bot.send_text_message(roomid or sender, "双方需已注册", sender) + return False, "未注册" + inv_g = giver.get("inventory") or {} + if inv_g.get(item_name, 0) < qty or qty <= 0: + await bot.send_text_message(roomid or sender, "物品不足或数量不合法", sender) + return False, "物品不足" + inv_r = receiver.get("inventory") or {} + inv_g[item_name] = inv_g.get(item_name, 0) - qty + inv_r[item_name] = int(inv_r.get(item_name, 0)) + qty + giver["inventory"] = inv_g + receiver["inventory"] = inv_r + if self.xdb: + ok = self.xdb.transfer_item(sender, target, item_name, qty) + if not ok: + await bot.send_text_message(roomid or sender, "物品转移失败", sender) + return False, "转移失败" + self.redis_db.invalidate_player(sender) + self.redis_db.invalidate_player(target) + self._save_player(giver) + self._save_player(receiver) + self._rate_set(sender, "赠送") + await bot.send_text_message(roomid or sender, f"✅ 已向 {target} 赠送 {item_name} × {qty}", sender) + return True, "赠送成功" + + async def _cmd_clan_create(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + name = content.strip() + if not name: + await bot.send_text_message(roomid or sender, "命令格式:创建门派 名称", sender) + return False, "命令格式错误" + player = self._get_player_with_cache(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + prefix, _ = self._parse_realm(player.get("realm", "炼气1层")) + allowed = {"元婴", "化神", "合体", "大乘", "渡劫", "真仙"} + if prefix not in allowed: + await bot.send_text_message(roomid or sender, "创建门派需达到元婴期及以上", sender) + return False, "境界不足" + clan_id = None + if self.xdb: + clan_id = self.xdb.create_clan(name, roomid or "", sender) + if clan_id is None: + await bot.send_text_message(roomid or sender, "门派已存在或创建失败", sender) + return False, "门派失败" + if player and clan_id is not None: + player["clan_id"] = int(clan_id) + if self.xdb: + self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]}) + self.redis_db.invalidate_player(sender) + self._save_player(player) + self._rate_set(sender, "创建门派") + await bot.send_text_message(roomid or sender, f"✅ 门派已创建:{name}", sender) + return True, "创建门派" + + async def _cmd_clan_join(self, bot: WechatAPIClient, sender: str, roomid: str, content: str) -> Tuple[bool, str]: + name = content.strip() + if not name: + await bot.send_text_message(roomid or sender, "命令格式:加入门派 名称", sender) + return False, "命令格式错误" + player = self._get_player_with_cache(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + cid = None + if self.xdb: + cid = self.xdb.get_clan_id(roomid or "", name) + if not cid: + await bot.send_text_message(roomid or sender, "门派不存在", sender) + return False, "门派不存在" + player["clan_id"] = int(cid) if isinstance(cid, str) else cid + if self.xdb: + self.xdb.update_player_fields(sender, {"clan_id": player["clan_id"]}) + self.redis_db.invalidate_player(sender) + self._save_player(player) + self._rate_set(sender, "加入门派") + await bot.send_text_message(roomid or sender, f"✅ 已加入门派:{name}", sender) + return True, "加入门派" + + async def _cmd_clan_exit(self, bot: WechatAPIClient, sender: str, roomid: str) -> Tuple[bool, str]: + player = self._get_player_with_cache(sender) + if not player: + await bot.send_text_message(roomid or sender, "未注册,请先发送:注册修仙 道号", sender) + return False, "未注册" + player["clan_id"] = None + if self.xdb: + self.xdb.update_player_fields(sender, {"clan_id": None}) + self.redis_db.invalidate_player(sender) + self._save_player(player) + self._rate_set(sender, "退出门派") + await bot.send_text_message(roomid or sender, "✅ 已退出门派", sender) + return True, "退出门派" \ No newline at end of file