394 lines
18 KiB
Python
394 lines
18 KiB
Python
from typing import Optional, Dict, Any, List, Tuple
|
||
from datetime import datetime
|
||
import mysql.connector
|
||
|
||
from db.connection import DBConnectionManager
|
||
|
||
|
||
class XiuxianDB:
|
||
"""
|
||
修仙系统的 MariaDB 持久化访问层。
|
||
|
||
设计目标:
|
||
- 与业务层采用 Cache-Aside 模式协同:业务写 DB 后应显式失效 Redis,再由读取路径回填最新缓存;
|
||
- 保持所有写入操作的原子性与一致性:必要处使用事务与行级锁(SELECT ... FOR UPDATE)。
|
||
|
||
使用方式:
|
||
- 在系统启动时通过 DBConnectionManager 初始化 MySQL 连接池;
|
||
- 插件层调用本类方法进行玩家/物品/背包/门派的读写,配合 Redis 缓存失效;
|
||
- 所有方法失败返回 False/None,业务层自行处理提示与回退逻辑。
|
||
|
||
安全与稳定性:
|
||
- 所有 SQL 使用参数化,避免 SQL 注入;
|
||
- 事务方法在异常时显式回滚;
|
||
- 仅对必要的行使用 FOR UPDATE,降低锁粒度与阻塞风险。
|
||
"""
|
||
|
||
def __init__(self, db_manager: DBConnectionManager):
|
||
"""构造函数
|
||
|
||
Args:
|
||
db_manager: 全局数据库连接管理器,需已初始化 MySQL 连接池
|
||
"""
|
||
self.db_manager = db_manager
|
||
self.pool = db_manager.mysql_pool
|
||
|
||
def _conn(self):
|
||
"""从连接池获取连接。业务层无需手动关闭,with 语法自动释放。"""
|
||
return self.pool.get_connection()
|
||
|
||
def get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
|
||
"""读取玩家核心信息。
|
||
|
||
Args:
|
||
user_id: 平台用户ID
|
||
|
||
Returns:
|
||
dict 或 None:玩家记录,字段包含玩家基础信息、资源与状态机字段
|
||
"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor(dictionary=True) as cur:
|
||
cur.execute(
|
||
"SELECT user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id FROM t_xiuxian_player WHERE user_id=%s AND group_id=%s",
|
||
(user_id, group_id)
|
||
)
|
||
return cur.fetchone()
|
||
except mysql.connector.Error:
|
||
return None
|
||
|
||
def create_player(self, player: Dict[str, Any]) -> bool:
|
||
"""创建玩家记录。
|
||
|
||
注意:业务层创建成功后需删除 Redis 玩家缓存键以便下次读取回填最新数据。
|
||
"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO t_xiuxian_player (user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
|
||
(
|
||
player.get("user_id"),
|
||
player.get("group_id"),
|
||
player.get("dao_name"),
|
||
player.get("realm"),
|
||
player.get("spirit_root"),
|
||
int(player.get("cultivation_points", 0)),
|
||
int(player.get("spirit_stone", 0)),
|
||
player.get("status"),
|
||
player.get("status_until"),
|
||
player.get("last_cultivate_time"),
|
||
player.get("clan_id")
|
||
)
|
||
)
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def update_player_fields(self, user_id: str, group_id: str, fields: Dict[str, Any]) -> bool:
|
||
"""增量更新玩家字段。
|
||
|
||
Args:
|
||
user_id: 玩家ID
|
||
fields: 需更新的字段字典,如 {"status": "Injured", "status_until": datetime}
|
||
|
||
Returns:
|
||
bool: 成功/失败
|
||
|
||
说明:
|
||
- 动态拼接 SET 子句,值统一参数化;
|
||
- 业务层应在成功后失效 Redis 玩家缓存。
|
||
"""
|
||
if not fields:
|
||
return True
|
||
cols = []
|
||
vals = []
|
||
for k, v in fields.items():
|
||
cols.append(f"{k}=%s")
|
||
vals.append(v)
|
||
vals.append(user_id)
|
||
vals.append(group_id)
|
||
sql = f"UPDATE t_xiuxian_player SET {', '.join(cols)} WHERE user_id=%s AND group_id=%s"
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, tuple(vals))
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def adjust_stone(self, user_id: str, delta: int) -> bool:
|
||
"""调整玩家灵石(增量)。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"UPDATE t_xiuxian_player SET spirit_stone = spirit_stone + %s WHERE user_id=%s",
|
||
(int(delta), user_id)
|
||
)
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def adjust_cultivation(self, user_id: str, delta: int) -> bool:
|
||
"""调整玩家修为(增量)。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"UPDATE t_xiuxian_player SET cultivation_points = cultivation_points + %s WHERE user_id=%s",
|
||
(int(delta), user_id)
|
||
)
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def get_item_id(self, name: str) -> Optional[int]:
|
||
"""获取物品ID(若不存在返回 None)。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT item_id FROM t_xiuxian_item WHERE name=%s", (name,))
|
||
row = cur.fetchone()
|
||
return int(row[0]) if row else None
|
||
except mysql.connector.Error:
|
||
return None
|
||
|
||
def ensure_item(self, name: str, type_: str, description: str = "") -> Optional[int]:
|
||
"""确保物品存在,不存在则创建并返回其ID。"""
|
||
iid = self.get_item_id(name)
|
||
if iid:
|
||
return iid
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO t_xiuxian_item (name, type, description) VALUES (%s,%s,%s)",
|
||
(name, type_, description)
|
||
)
|
||
conn.commit()
|
||
return self.get_item_id(name)
|
||
except mysql.connector.Error:
|
||
return None
|
||
|
||
def get_inventory(self, user_id: str) -> List[Dict[str, Any]]:
|
||
"""读取玩家背包的物品名称、类型与数量。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor(dictionary=True) as cur:
|
||
cur.execute(
|
||
"SELECT i.name, i.type, inv.quantity FROM t_xiuxian_inventory inv JOIN t_xiuxian_item i ON inv.item_id=i.item_id WHERE inv.user_id=%s",
|
||
(user_id,)
|
||
)
|
||
return cur.fetchall() or []
|
||
except mysql.connector.Error:
|
||
return []
|
||
|
||
def add_item(self, user_id: str, item_name: str, item_type: str, qty: int) -> bool:
|
||
"""向玩家背包增加物品数量(不存在则创建记录)。"""
|
||
iid = self.ensure_item(item_name, item_type)
|
||
if not iid:
|
||
return False
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s",
|
||
(user_id, iid)
|
||
)
|
||
row = cur.fetchone()
|
||
if row:
|
||
cur.execute(
|
||
"UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s",
|
||
(int(row[1]) + int(qty), int(row[0]))
|
||
)
|
||
else:
|
||
cur.execute(
|
||
"INSERT INTO t_xiuxian_inventory (user_id, item_id, quantity) VALUES (%s,%s,%s)",
|
||
(user_id, iid, int(qty))
|
||
)
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def remove_item(self, user_id: str, item_name: str, qty: int) -> bool:
|
||
"""从玩家背包扣减物品数量(数量减至0则删除记录)。"""
|
||
iid = self.get_item_id(item_name)
|
||
if not iid:
|
||
return False
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s",
|
||
(user_id, iid)
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
return False
|
||
new_qty = int(row[1]) - int(qty)
|
||
if new_qty < 0:
|
||
return False
|
||
if new_qty == 0:
|
||
cur.execute("DELETE FROM t_xiuxian_inventory WHERE id=%s", (int(row[0]),))
|
||
else:
|
||
cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (new_qty, int(row[0])))
|
||
conn.commit()
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def transfer_item(self, from_user: str, to_user: str, item_name: str, qty: int) -> bool:
|
||
"""在两个玩家之间转移物品(事务 + 行级锁)。
|
||
|
||
流程:
|
||
1) 对出方(from_user)背包记录加锁读取并扣减;
|
||
2) 对入方(to_user)背包记录加锁读取并增加或插入;
|
||
3) 成功则提交,异常则回滚。
|
||
"""
|
||
iid = self.get_item_id(item_name)
|
||
if not iid:
|
||
return False
|
||
try:
|
||
with self._conn() as conn:
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("START TRANSACTION")
|
||
# 扣减出方库存(行级锁)
|
||
cur.execute(
|
||
"SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE",
|
||
(from_user, iid)
|
||
)
|
||
row = cur.fetchone()
|
||
if not row or int(row[1]) < int(qty) or qty <= 0:
|
||
cur.execute("ROLLBACK")
|
||
return False
|
||
new_qty = int(row[1]) - int(qty)
|
||
if new_qty == 0:
|
||
cur.execute("DELETE FROM t_xiuxian_inventory WHERE id=%s", (int(row[0]),))
|
||
else:
|
||
cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (new_qty, int(row[0])))
|
||
# 增加入方库存(行级锁)
|
||
cur.execute(
|
||
"SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE",
|
||
(to_user, iid)
|
||
)
|
||
row2 = cur.fetchone()
|
||
if row2:
|
||
cur.execute("UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s", (int(row2[1]) + int(qty), int(row2[0])))
|
||
else:
|
||
cur.execute("INSERT INTO t_xiuxian_inventory (user_id, item_id, quantity) VALUES (%s,%s,%s)", (to_user, iid, int(qty)))
|
||
cur.execute("COMMIT")
|
||
return True
|
||
except mysql.connector.Error:
|
||
try:
|
||
conn.rollback()
|
||
except Exception:
|
||
pass
|
||
return False
|
||
except mysql.connector.Error:
|
||
return False
|
||
|
||
def create_clan(self, clan_name: str, group_id: str, leader_user_id: str) -> Optional[int]:
|
||
"""创建门派(同群名唯一),返回 clan_id。若已存在返回现有ID。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s", (group_id, clan_name))
|
||
row = cur.fetchone()
|
||
if row:
|
||
return int(row[0])
|
||
cur.execute(
|
||
"INSERT INTO t_xiuxian_clan (clan_name, group_id, leader_user_id) VALUES (%s,%s,%s)",
|
||
(clan_name, group_id, leader_user_id)
|
||
)
|
||
conn.commit()
|
||
cur.execute("SELECT LAST_INSERT_ID()")
|
||
rid = cur.fetchone()
|
||
return int(rid[0]) if rid else None
|
||
except mysql.connector.Error:
|
||
return None
|
||
|
||
def get_clan_id(self, group_id: str, clan_name: str) -> Optional[int]:
|
||
"""根据群ID和门派名查询 clan_id。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s", (group_id, clan_name))
|
||
row = cur.fetchone()
|
||
return int(row[0]) if row else None
|
||
except mysql.connector.Error:
|
||
return None
|
||
|
||
def init_schema(self) -> bool:
|
||
"""初始化修仙相关表结构(若不存在则创建)。"""
|
||
try:
|
||
with self._conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS t_xiuxian_clan (
|
||
clan_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '门派ID',
|
||
clan_name VARCHAR(100) NOT NULL COMMENT '门派名称',
|
||
group_id VARCHAR(100) NOT NULL COMMENT '所属群ID',
|
||
leader_user_id VARCHAR(100) NOT NULL COMMENT '掌门ID',
|
||
PRIMARY KEY (clan_id),
|
||
UNIQUE KEY uk_group_clan_name (group_id, clan_name)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='门派表';
|
||
"""
|
||
)
|
||
cur.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS t_xiuxian_player (
|
||
user_id VARCHAR(100) NOT NULL COMMENT '平台用户ID',
|
||
group_id VARCHAR(100) NOT NULL COMMENT '主要所在群ID',
|
||
dao_name VARCHAR(100) NOT NULL COMMENT '道号',
|
||
realm VARCHAR(50) DEFAULT '凡人' COMMENT '境界',
|
||
spirit_root VARCHAR(50) DEFAULT '凡灵根' COMMENT '灵根天赋',
|
||
clan_id BIGINT DEFAULT NULL COMMENT '所属门派ID',
|
||
cultivation_points BIGINT DEFAULT 0 COMMENT '修为',
|
||
spirit_stone BIGINT DEFAULT 0 COMMENT '灵石',
|
||
status VARCHAR(20) DEFAULT 'Idle' COMMENT '玩家状态',
|
||
status_until DATETIME DEFAULT NULL COMMENT '状态到期时间',
|
||
last_cultivate_time DATETIME DEFAULT NULL COMMENT '上次闭关开始时间',
|
||
PRIMARY KEY (user_id),
|
||
KEY idx_clan_id (clan_id),
|
||
KEY idx_realm (realm),
|
||
CONSTRAINT fk_clan_id FOREIGN KEY (clan_id) REFERENCES t_xiuxian_clan(clan_id) ON DELETE SET NULL
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家核心数据表';
|
||
"""
|
||
)
|
||
cur.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS t_xiuxian_item (
|
||
item_id INT NOT NULL AUTO_INCREMENT COMMENT '物品ID',
|
||
name VARCHAR(100) NOT NULL COMMENT '物品名称',
|
||
type VARCHAR(50) NOT NULL COMMENT '物品类型',
|
||
description TEXT COMMENT '物品描述',
|
||
PRIMARY KEY (item_id),
|
||
UNIQUE KEY uk_name (name)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='物品定义表';
|
||
"""
|
||
)
|
||
cur.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS t_xiuxian_inventory (
|
||
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '背包条目ID',
|
||
user_id VARCHAR(100) NOT NULL COMMENT '玩家ID',
|
||
item_id INT NOT NULL COMMENT '物品ID',
|
||
quantity INT NOT NULL DEFAULT 0 COMMENT '数量',
|
||
PRIMARY KEY (id),
|
||
UNIQUE KEY uk_user_item (user_id, item_id),
|
||
CONSTRAINT fk_inv_user FOREIGN KEY (user_id) REFERENCES t_xiuxian_player(user_id) ON DELETE CASCADE,
|
||
CONSTRAINT fk_inv_item FOREIGN KEY (item_id) REFERENCES t_xiuxian_item(item_id) ON DELETE CASCADE
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家背包表';
|
||
"""
|
||
)
|
||
return True
|
||
except mysql.connector.Error:
|
||
return False |