Files
abot/db/xiuxian_db.py
2025-11-24 10:00:07 +08:00

394 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import mysql.connector

from db.connection import DBConnectionManager
class XiuxianDB:
    """MariaDB persistence layer for the Xiuxian (cultivation game) system.

    Design goals:
        - Cooperate with the business layer in a cache-aside fashion: after a
          DB write the business layer should explicitly invalidate the Redis
          entry so the next read repopulates the cache.
        - Keep writes atomic and consistent; transactions and row-level locks
          (``SELECT ... FOR UPDATE``) are used where a read-modify-write
          sequence is unavoidable.

    Usage:
        - Initialise the MySQL connection pool through ``DBConnectionManager``
          at start-up and pass the manager to this class.
        - Plugins call the methods below to read/write players, items,
          inventories and clans, and invalidate Redis themselves.
        - Every method reports failure by returning ``False`` / ``None`` /
          ``[]``; the caller decides how to notify the user or fall back.

    Safety:
        - All values are passed as bound parameters.
        - Dynamically supplied column names are validated and quoted.
        - Transactional methods roll back explicitly on error.
        - Database errors are logged instead of being dropped silently.
    """

    _log = logging.getLogger(__name__)

    # Atomic "add to existing stack or create the stack" statement. It relies
    # on the uk_user_item (user_id, item_id) unique key that init_schema()
    # creates on t_xiuxian_inventory.
    _INVENTORY_UPSERT_SQL = (
        "INSERT INTO t_xiuxian_inventory (user_id, item_id, quantity) "
        "VALUES (%s,%s,%s) "
        "ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity)"
    )

    def __init__(self, db_manager: "DBConnectionManager"):
        """Bind the instance to an initialised connection manager.

        Args:
            db_manager: Global connection manager; its ``mysql_pool``
                attribute is read here and used for every query.
        """
        self.db_manager = db_manager
        self.pool = db_manager.mysql_pool

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _conn(self):
        """Fetch a connection from the pool (used as a context manager)."""
        return self.pool.get_connection()

    @staticmethod
    def _to_int(value: Any) -> Optional[int]:
        """Convert *value* to ``int``; return ``None`` if it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def _safe_rollback(self, conn) -> None:
        """Roll back *conn*, tolerating a connection that is already broken."""
        try:
            conn.rollback()
        except mysql.connector.Error:
            self._log.debug("rollback failed", exc_info=True)

    @staticmethod
    def _write_quantity(cur, row_id: int, new_qty: int) -> None:
        """Store *new_qty* on an inventory row, deleting the row at zero."""
        if new_qty == 0:
            cur.execute("DELETE FROM t_xiuxian_inventory WHERE id=%s", (row_id,))
        else:
            cur.execute(
                "UPDATE t_xiuxian_inventory SET quantity=%s WHERE id=%s",
                (new_qty, row_id),
            )

    def _adjust_column(self, column: str, user_id: str, delta: int) -> bool:
        """Add *delta* to a numeric player column in a single UPDATE.

        ``column`` is always one of this class's own literals, never caller
        input, so interpolating it into the statement is safe.
        """
        amount = self._to_int(delta)
        if amount is None:
            return False
        sql = f"UPDATE t_xiuxian_player SET {column} = {column} + %s WHERE user_id=%s"
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(sql, (amount, user_id))
                conn.commit()
            return True
        except mysql.connector.Error:
            self._log.exception("failed to adjust %s for user_id=%s", column, user_id)
            return False

    # ------------------------------------------------------------------
    # Players
    # ------------------------------------------------------------------
    def get_player(self, user_id: str, group_id: str) -> Optional[Dict[str, Any]]:
        """Read a player's core record.

        Args:
            user_id: Platform user id.
            group_id: Group id the record must belong to.

        Returns:
            The row as a dict, or ``None`` if no row matches or the query
            fails.
        """
        try:
            with self._conn() as conn:
                with conn.cursor(dictionary=True) as cur:
                    cur.execute(
                        "SELECT user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id FROM t_xiuxian_player WHERE user_id=%s AND group_id=%s",
                        (user_id, group_id),
                    )
                    return cur.fetchone()
        except mysql.connector.Error:
            self._log.exception("get_player failed for user_id=%s", user_id)
            return None

    def create_player(self, player: Dict[str, Any]) -> bool:
        """Insert a new player row.

        The business layer should delete the player's Redis key after a
        successful call so the next read repopulates the cache.

        Args:
            player: Mapping with the player columns; missing or empty
                ``cultivation_points`` / ``spirit_stone`` default to 0.

        Returns:
            ``True`` on success, ``False`` on invalid numbers or DB error.
        """
        points = self._to_int(player.get("cultivation_points") or 0)
        stones = self._to_int(player.get("spirit_stone") or 0)
        if points is None or stones is None:
            return False
        params = (
            player.get("user_id"),
            player.get("group_id"),
            player.get("dao_name"),
            player.get("realm"),
            player.get("spirit_root"),
            points,
            stones,
            player.get("status"),
            player.get("status_until"),
            player.get("last_cultivate_time"),
            player.get("clan_id"),
        )
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "INSERT INTO t_xiuxian_player (user_id, group_id, dao_name, realm, spirit_root, cultivation_points, spirit_stone, status, status_until, last_cultivate_time, clan_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                        params,
                    )
                conn.commit()
            return True
        except mysql.connector.Error:
            self._log.exception("create_player failed for user_id=%s", player.get("user_id"))
            return False

    def update_player_fields(self, user_id: str, group_id: str, fields: Dict[str, Any]) -> bool:
        """Update a subset of a player's columns.

        Args:
            user_id: Player id.
            group_id: Group id the record must belong to.
            fields: Column -> new value, e.g.
                ``{"status": "Injured", "status_until": datetime}``.

        Returns:
            ``True`` on success or when *fields* is empty; ``False`` when a
            key is not a plain ASCII identifier or the DB reports an error.

        Notes:
            Values are bound parameters. Column names cannot be bound, so
            each key is validated and backtick-quoted before it is placed in
            the statement; this keeps a crafted key from injecting SQL.
            The business layer should invalidate the Redis entry on success.
        """
        if not fields:
            return True
        assignments = []
        for column in fields:
            if not (isinstance(column, str) and column.isascii() and column.isidentifier()):
                self._log.warning("update_player_fields: rejected column name %r", column)
                return False
            assignments.append(f"`{column}`=%s")
        sql = (
            "UPDATE t_xiuxian_player SET "
            + ", ".join(assignments)
            + " WHERE user_id=%s AND group_id=%s"
        )
        params = (*fields.values(), user_id, group_id)
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(sql, params)
                conn.commit()
            return True
        except mysql.connector.Error:
            self._log.exception("update_player_fields failed for user_id=%s", user_id)
            return False

    def adjust_stone(self, user_id: str, delta: int) -> bool:
        """Add *delta* (may be negative) to the player's spirit stones."""
        return self._adjust_column("spirit_stone", user_id, delta)

    def adjust_cultivation(self, user_id: str, delta: int) -> bool:
        """Add *delta* (may be negative) to the player's cultivation points."""
        return self._adjust_column("cultivation_points", user_id, delta)

    # ------------------------------------------------------------------
    # Items and inventory
    # ------------------------------------------------------------------
    def get_item_id(self, name: str) -> Optional[int]:
        """Return the id of the item called *name*, or ``None`` if absent."""
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT item_id FROM t_xiuxian_item WHERE name=%s", (name,))
                    row = cur.fetchone()
                    return int(row[0]) if row else None
        except mysql.connector.Error:
            self._log.exception("get_item_id failed for name=%s", name)
            return None

    def ensure_item(self, name: str, type_: str, description: str = "") -> Optional[int]:
        """Return the id of item *name*, creating the definition if needed.

        Returns:
            The item id, or ``None`` if the item could not be read or
            created.
        """
        existing = self.get_item_id(name)
        if existing is not None:
            return existing
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "INSERT INTO t_xiuxian_item (name, type, description) VALUES (%s,%s,%s)",
                        (name, type_, description),
                    )
                    new_id = cur.lastrowid
                conn.commit()
        except mysql.connector.IntegrityError:
            # A concurrent caller inserted the same name between our lookup
            # and our INSERT (uk_name); the item exists now, so just read it.
            return self.get_item_id(name)
        except mysql.connector.Error:
            self._log.exception("ensure_item failed for name=%s", name)
            return None
        return int(new_id) if new_id else self.get_item_id(name)

    def get_inventory(self, user_id: str) -> List[Dict[str, Any]]:
        """List the name, type and quantity of every item a player holds.

        Returns:
            A list of dict rows; empty when the player holds nothing or the
            query fails.
        """
        try:
            with self._conn() as conn:
                with conn.cursor(dictionary=True) as cur:
                    cur.execute(
                        "SELECT i.name, i.type, inv.quantity FROM t_xiuxian_inventory inv JOIN t_xiuxian_item i ON inv.item_id=i.item_id WHERE inv.user_id=%s",
                        (user_id,),
                    )
                    return cur.fetchall() or []
        except mysql.connector.Error:
            self._log.exception("get_inventory failed for user_id=%s", user_id)
            return []

    def add_item(self, user_id: str, item_name: str, item_type: str, qty: int) -> bool:
        """Add *qty* of an item to a player's inventory.

        The item definition is created when missing. The quantity change is
        a single atomic upsert, so concurrent calls cannot lose an update or
        collide on the (user_id, item_id) unique key.

        Returns:
            ``True`` on success; ``False`` if *qty* is not numeric, the item
            cannot be resolved, or the DB reports an error.
        """
        amount = self._to_int(qty)
        if amount is None:
            return False
        item_id = self.ensure_item(item_name, item_type)
        if not item_id:
            return False
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(self._INVENTORY_UPSERT_SQL, (user_id, item_id, amount))
                conn.commit()
            return True
        except mysql.connector.Error:
            self._log.exception("add_item failed for user_id=%s item=%s", user_id, item_name)
            return False

    def remove_item(self, user_id: str, item_name: str, qty: int) -> bool:
        """Deduct *qty* of an item from a player's inventory.

        The inventory row is locked with ``FOR UPDATE`` inside a transaction
        so concurrent deductions cannot both succeed on the same stock. The
        row is deleted when the quantity reaches zero.

        Returns:
            ``True`` on success; ``False`` if *qty* is not a positive
            integer, the item or row does not exist, the player holds fewer
            than *qty*, or the DB reports an error.
        """
        amount = self._to_int(qty)
        # A non-positive amount would otherwise *increase* the stack.
        if amount is None or amount <= 0:
            return False
        item_id = self.get_item_id(item_name)
        if not item_id:
            return False
        try:
            with self._conn() as conn:
                try:
                    with conn.cursor() as cur:
                        # Explicit start so the lock is held even if the pool
                        # was configured with autocommit enabled.
                        cur.execute("START TRANSACTION")
                        cur.execute(
                            "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE",
                            (user_id, item_id),
                        )
                        row = cur.fetchone()
                        if not row or int(row[1]) < amount:
                            conn.rollback()
                            return False
                        self._write_quantity(cur, int(row[0]), int(row[1]) - amount)
                    conn.commit()
                    return True
                except mysql.connector.Error:
                    self._safe_rollback(conn)
                    raise
        except mysql.connector.Error:
            self._log.exception("remove_item failed for user_id=%s item=%s", user_id, item_name)
            return False

    def transfer_item(self, from_user: str, to_user: str, item_name: str, qty: int) -> bool:
        """Move *qty* of an item from one player to another atomically.

        Steps (all inside one transaction):
            1. Lock the sender's inventory row and deduct the amount
               (deleting the row at zero).
            2. Add the amount to the receiver with an atomic upsert.
            3. Commit; roll back on any database error.

        Returns:
            ``True`` on success; ``False`` if *qty* is not a positive
            integer, the item is unknown, the sender lacks stock, or the DB
            reports an error.
        """
        amount = self._to_int(qty)
        if amount is None or amount <= 0:
            return False
        item_id = self.get_item_id(item_name)
        if not item_id:
            return False
        try:
            with self._conn() as conn:
                try:
                    with conn.cursor() as cur:
                        cur.execute("START TRANSACTION")
                        # Deduct from the sender (row-level lock).
                        cur.execute(
                            "SELECT id, quantity FROM t_xiuxian_inventory WHERE user_id=%s AND item_id=%s FOR UPDATE",
                            (from_user, item_id),
                        )
                        source = cur.fetchone()
                        if not source or int(source[1]) < amount:
                            conn.rollback()
                            return False
                        self._write_quantity(cur, int(source[0]), int(source[1]) - amount)
                        # Credit the receiver. The upsert avoids locking a
                        # not-yet-existing row, which could deadlock two
                        # transfers creating the same stack.
                        cur.execute(self._INVENTORY_UPSERT_SQL, (to_user, item_id, amount))
                    conn.commit()
                    return True
                except mysql.connector.Error:
                    self._safe_rollback(conn)
                    raise
        except mysql.connector.Error:
            self._log.exception(
                "transfer_item failed from=%s to=%s item=%s", from_user, to_user, item_name
            )
            return False

    # ------------------------------------------------------------------
    # Clans
    # ------------------------------------------------------------------
    def create_clan(self, clan_name: str, group_id: str, leader_user_id: str) -> Optional[int]:
        """Create a clan (name unique per group) and return its id.

        If a clan with that name already exists in the group, its id is
        returned instead, including when a concurrent caller creates it
        between the lookup and the insert.

        Returns:
            The clan id, or ``None`` on database error.
        """
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s",
                        (group_id, clan_name),
                    )
                    row = cur.fetchone()
                    if row:
                        return int(row[0])
                    cur.execute(
                        "INSERT INTO t_xiuxian_clan (clan_name, group_id, leader_user_id) VALUES (%s,%s,%s)",
                        (clan_name, group_id, leader_user_id),
                    )
                    new_id = cur.lastrowid
                conn.commit()
        except mysql.connector.IntegrityError:
            # uk_group_clan_name fired: someone else created it first.
            return self.get_clan_id(group_id, clan_name)
        except mysql.connector.Error:
            self._log.exception("create_clan failed for group_id=%s name=%s", group_id, clan_name)
            return None
        return int(new_id) if new_id else self.get_clan_id(group_id, clan_name)

    def get_clan_id(self, group_id: str, clan_name: str) -> Optional[int]:
        """Look up a clan id by group id and clan name; ``None`` if absent."""
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT clan_id FROM t_xiuxian_clan WHERE group_id=%s AND clan_name=%s",
                        (group_id, clan_name),
                    )
                    row = cur.fetchone()
                    return int(row[0]) if row else None
        except mysql.connector.Error:
            self._log.exception("get_clan_id failed for group_id=%s name=%s", group_id, clan_name)
            return None

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def init_schema(self) -> bool:
        """Create the Xiuxian tables if they do not exist yet.

        Statement order matters because of the foreign keys: clan before
        player, then item, then inventory (which references player and item).

        Returns:
            ``True`` if every statement ran, ``False`` on database error.
        """
        statements = (
            """
            CREATE TABLE IF NOT EXISTS t_xiuxian_clan (
            clan_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '门派ID',
            clan_name VARCHAR(100) NOT NULL COMMENT '门派名称',
            group_id VARCHAR(100) NOT NULL COMMENT '所属群ID',
            leader_user_id VARCHAR(100) NOT NULL COMMENT '掌门ID',
            PRIMARY KEY (clan_id),
            UNIQUE KEY uk_group_clan_name (group_id, clan_name)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='门派表';
            """,
            """
            CREATE TABLE IF NOT EXISTS t_xiuxian_player (
            user_id VARCHAR(100) NOT NULL COMMENT '平台用户ID',
            group_id VARCHAR(100) NOT NULL COMMENT '主要所在群ID',
            dao_name VARCHAR(100) NOT NULL COMMENT '道号',
            realm VARCHAR(50) DEFAULT '凡人' COMMENT '境界',
            spirit_root VARCHAR(50) DEFAULT '凡灵根' COMMENT '灵根天赋',
            clan_id BIGINT DEFAULT NULL COMMENT '所属门派ID',
            cultivation_points BIGINT DEFAULT 0 COMMENT '修为',
            spirit_stone BIGINT DEFAULT 0 COMMENT '灵石',
            status VARCHAR(20) DEFAULT 'Idle' COMMENT '玩家状态',
            status_until DATETIME DEFAULT NULL COMMENT '状态到期时间',
            last_cultivate_time DATETIME DEFAULT NULL COMMENT '上次闭关开始时间',
            PRIMARY KEY (user_id),
            KEY idx_clan_id (clan_id),
            KEY idx_realm (realm),
            CONSTRAINT fk_clan_id FOREIGN KEY (clan_id) REFERENCES t_xiuxian_clan(clan_id) ON DELETE SET NULL
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家核心数据表';
            """,
            """
            CREATE TABLE IF NOT EXISTS t_xiuxian_item (
            item_id INT NOT NULL AUTO_INCREMENT COMMENT '物品ID',
            name VARCHAR(100) NOT NULL COMMENT '物品名称',
            type VARCHAR(50) NOT NULL COMMENT '物品类型',
            description TEXT COMMENT '物品描述',
            PRIMARY KEY (item_id),
            UNIQUE KEY uk_name (name)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='物品定义表';
            """,
            """
            CREATE TABLE IF NOT EXISTS t_xiuxian_inventory (
            id BIGINT NOT NULL AUTO_INCREMENT COMMENT '背包条目ID',
            user_id VARCHAR(100) NOT NULL COMMENT '玩家ID',
            item_id INT NOT NULL COMMENT '物品ID',
            quantity INT NOT NULL DEFAULT 0 COMMENT '数量',
            PRIMARY KEY (id),
            UNIQUE KEY uk_user_item (user_id, item_id),
            CONSTRAINT fk_inv_user FOREIGN KEY (user_id) REFERENCES t_xiuxian_player(user_id) ON DELETE CASCADE,
            CONSTRAINT fk_inv_item FOREIGN KEY (item_id) REFERENCES t_xiuxian_item(item_id) ON DELETE CASCADE
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='玩家背包表';
            """,
        )
        try:
            with self._conn() as conn:
                with conn.cursor() as cur:
                    for ddl in statements:
                        cur.execute(ddl)
            return True
        except mysql.connector.Error:
            self._log.exception("init_schema failed")
            return False