167 lines
6.3 KiB
Python
167 lines
6.3 KiB
Python
from datetime import datetime
|
|
from typing import Dict, Optional, Tuple
|
|
import math
|
|
|
|
from loguru import logger
|
|
|
|
from db.base import BaseDBOperator
|
|
from db.connection import DBConnectionManager
|
|
|
|
|
|
class LevelsDBOperator(BaseDBOperator):
    """Store and compute per-user, per-group experience points and levels.

    Rows live in the ``t_user_levels`` table, keyed by ``(user_id, group_id)``.
    The table is created on construction if it does not exist. SQL is issued
    through ``execute_query`` / ``execute_update``, which this class does not
    define (presumably inherited from ``BaseDBOperator`` -- confirm there for
    their exact return values and error handling).
    """

    # Exp floor of each level: entry ``i`` is the minimum exp for level ``i + 1``.
    # Must stay sorted ascending; ``_compute_level`` stops at the first floor
    # that has not been reached.
    _LEVEL_THRESHOLDS: Tuple[int, ...] = (
        0,
        300, 800, 1500, 2500, 4000,
        6000, 9000, 13000, 18000, 24000,
        32000, 42000, 54000, 70000, 90000,
        115000, 145000, 180000, 220000,
    )

    # Realm names follow the xiuxian plugin's realm definitions and are meant
    # to stay consistent with ``realm_score`` in plugins/xiuxian/config.toml.
    # The number in each comment is the tuple index (level - 1).
    _LEVEL_TITLES: Tuple[str, ...] = (
        "凡人",  # 0: Mortal
        "炼气",  # 1: Qi Refining (formerly the "Qi Practice stage")
        "筑基",  # 2: Foundation Establishment (merges former "Body Tempering" and "Foundation" stages)
        "金丹",  # 3: Golden Core (formerly the "Core Formation stage")
        "元婴",  # 4: Nascent Soul
        "化神",  # 5: Spirit Transformation
        "合体",  # 6: Body Integration (merges former "Void Refining" and "Body Integration" stages)
        "大乘",  # 7: Great Vehicle
        "渡劫",  # 8: Tribulation Crossing
        "真仙",  # 9: True Immortal (merges former Loose / Earth / Heaven / True Immortal)
        "金仙",  # 10: Golden Immortal
        "玄仙",  # 11: Mystic Immortal
        "太乙金仙",  # 12: Taiyi Golden Immortal
        "大罗金仙",  # 13: Daluo Golden Immortal
        "圣人",  # 14: Sage
    )

    # Inactivity decay: exp is multiplied by _DECAY_RATE once per full
    # _DECAY_PERIOD_DAYS of inactivity.
    _DECAY_RATE = 0.95
    _DECAY_PERIOD_DAYS = 7

    def __init__(self, db_manager: Optional[DBConnectionManager] = None):
        """Bind to a connection manager and make sure the levels table exists.

        Args:
            db_manager: Connection manager to use. When omitted (or falsy),
                ``DBConnectionManager.get_instance()`` is used instead.
        """
        super().__init__(db_manager or DBConnectionManager.get_instance())
        self.LOG = logger
        self._ensure_table()

    def _ensure_table(self) -> bool:
        """Create ``t_user_levels`` if it is missing.

        Returns:
            Whatever ``execute_update`` returns for the DDL statement.
        """
        sql = (
            """
            CREATE TABLE IF NOT EXISTS t_user_levels (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                user_id VARCHAR(100) NOT NULL,
                group_id VARCHAR(100) NOT NULL,
                exp BIGINT DEFAULT 0,
                level INT DEFAULT 1,
                last_calc DATETIME DEFAULT CURRENT_TIMESTAMP,
                last_active_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_user_group (user_id, group_id)
            ) ENGINE=InnoDB CHARACTER SET utf8mb4;
            """
        )
        return self.execute_update(sql)

    def get_user_level(self, user_id: str, group_id: str) -> Dict:
        """Fetch the level row for a user in a group, creating it if absent.

        Args:
            user_id: User identifier.
            group_id: Group identifier.

        Returns:
            The stored row when one exists. Otherwise a new row is inserted
            with exp 0 and level 1, and a dict holding those defaults is
            returned. The insert's result is not checked, so the returned
            defaults describe the intended row rather than a confirmed one.
        """
        sql = (
            "SELECT user_id, group_id, exp, level, last_calc, last_active_at "
            "FROM t_user_levels WHERE user_id = %s AND group_id = %s"
        )
        result = self.execute_query(sql, (user_id, group_id), fetch_one=True)
        if result:
            return result

        now = datetime.now()
        insert_sql = (
            "INSERT INTO t_user_levels (user_id, group_id, exp, level, last_calc, last_active_at) "
            "VALUES (%s, %s, %s, %s, %s, %s)"
        )
        self.execute_update(insert_sql, (user_id, group_id, 0, 1, now, now))
        return {
            "user_id": user_id,
            "group_id": group_id,
            "exp": 0,
            "level": 1,
            "last_calc": now,
            "last_active_at": now,
        }

    def _apply_decay(self, cur: Dict, now: datetime) -> Tuple[int, bool]:
        """Decay exp for inactivity and persist the decayed value.

        Exp is multiplied by ``_DECAY_RATE`` for every full
        ``_DECAY_PERIOD_DAYS`` elapsed since ``cur["last_active_at"]`` and
        truncated to an int.

        Args:
            cur: Level row as returned by ``get_user_level``.
            now: Reference time for measuring inactivity.

        Returns:
            ``(exp, applied)``. ``applied`` is ``False`` (and ``exp`` is the
            unchanged stored value) when there is no usable
            ``last_active_at`` or less than one full period has elapsed.
            Otherwise the decayed exp is written to the row together with
            ``last_calc`` and ``(decayed, True)`` is returned.
        """
        # The exp column is nullable, so treat a NULL/missing value as 0.
        exp = int(cur.get("exp") or 0)
        last_active = cur.get("last_active_at")
        if not last_active:
            return exp, False

        try:
            inactive_days = (now - last_active).days
        except (TypeError, AttributeError) as e:
            # Non-datetime value or naive/aware mismatch: decay is best-effort,
            # so skip it rather than fail the caller.
            self.LOG.warning(f"skip exp decay, unusable last_active_at {last_active!r}: {e}")
            return exp, False

        # Floor division keeps negative spans (timestamp in the future)
        # at <= 0 as well, so one guard covers "too recent" and "future".
        weeks = inactive_days // self._DECAY_PERIOD_DAYS
        if weeks <= 0:
            return exp, False

        decayed = int(exp * (self._DECAY_RATE ** weeks))
        update_sql = (
            "UPDATE t_user_levels SET exp = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
        )
        self.execute_update(update_sql, (decayed, now, cur.get("user_id"), cur.get("group_id")))
        return decayed, True

    def _thresholds(self):
        """Return the exp floors of all levels as a fresh list.

        Entry ``i`` is the minimum exp required for level ``i + 1``. A new
        list is returned on every call so callers may mutate it freely.
        """
        return list(self._LEVEL_THRESHOLDS)

    def _compute_level(self, exp: int) -> int:
        """Map an exp total to a level.

        The level is the number of leading thresholds that ``exp`` has
        reached, never less than 1 (so negative exp still yields level 1).
        """
        reached = 0
        for floor in self._thresholds():
            if exp < floor:
                break
            reached += 1
        return max(1, reached)

    def level_title(self, level: int) -> str:
        """Return the realm title for a level.

        Levels at or below 0 map to the first title; levels beyond the last
        defined title are clamped to the last one.
        """
        titles = self._LEVEL_TITLES
        if level <= 0:
            return titles[0]
        return titles[min(level - 1, len(titles) - 1)]

    def get_progress(self, exp: int) -> Tuple[int, int, Optional[int], float, int]:
        """Describe how far ``exp`` is through its current level.

        Returns:
            ``(level, current_floor, next_floor, percent, remaining)`` where
            ``percent`` is clamped to ``[0.0, 1.0]`` and ``remaining`` is the
            exp still needed for the next level. At the top level there is no
            next floor: ``(level, current_floor, None, 1.0, 0)`` is returned.
        """
        thresholds = self._thresholds()
        level = self._compute_level(exp)
        idx = max(0, min(level - 1, len(thresholds) - 1))
        current = thresholds[idx]

        if idx + 1 >= len(thresholds):
            return level, current, None, 1.0, 0

        next_t = thresholds[idx + 1]
        denom = max(1, next_t - current)  # guard against equal adjacent floors
        percent = max(0.0, min(1.0, (exp - current) / denom))
        remaining = max(0, next_t - exp)
        return level, current, next_t, percent, remaining

    def add_exp(self, user_id: str, group_id: str, delta: int, reason: Optional[str] = None) -> Tuple[bool, Dict]:
        """Add (or subtract) exp for a user and refresh their level.

        Inactivity decay is applied first, then ``delta`` is added, the total
        is floored at 0, and the row's exp, level, ``last_calc`` and
        ``last_active_at`` are updated.

        Args:
            user_id: User identifier; must be non-empty.
            group_id: Group identifier; must be non-empty.
            delta: Exp change; may be negative.
            reason: Accepted for callers' bookkeeping; this method neither
                stores nor logs it.

        Returns:
            ``(True, {"user_id", "group_id", "exp", "level"})`` on success,
            ``(False, {"error": "invalid_identity"})`` for an empty id, or
            ``(False, {"error": <message>})`` when any step raises.
        """
        if not user_id or not group_id:
            return False, {"error": "invalid_identity"}
        try:
            cur = self.get_user_level(user_id, group_id)
            now = datetime.now()
            base_exp, _ = self._apply_decay(cur, now)
            new_exp = max(0, int(base_exp) + int(delta))
            new_level = self._compute_level(new_exp)
            update_sql = (
                "UPDATE t_user_levels SET exp = %s, level = %s, last_calc = %s, last_active_at = %s "
                "WHERE user_id = %s AND group_id = %s"
            )
            self.execute_update(update_sql, (new_exp, new_level, now, now, user_id, group_id))
            return True, {"user_id": user_id, "group_id": group_id, "exp": new_exp, "level": new_level}
        except Exception as e:
            # Boundary method: report the failure to the caller instead of raising.
            self.LOG.error(f"add_exp error: {e}")
            return False, {"error": str(e)}

    def recalc_level(self, user_id: str, group_id: str) -> Tuple[bool, Dict]:
        """Recompute a user's level from their stored exp and persist it.

        No decay is applied and ``last_active_at`` is left untouched; only
        ``level`` and ``last_calc`` are written.

        Returns:
            ``(True, {"user_id", "group_id", "exp", "level"})`` on success,
            ``(False, {"error": "invalid_identity"})`` for an empty id, or
            ``(False, {"error": <message>})`` when any step raises.
        """
        # Same identity guard as add_exp: without it get_user_level would
        # create a row for an empty user or group id.
        if not user_id or not group_id:
            return False, {"error": "invalid_identity"}
        try:
            cur = self.get_user_level(user_id, group_id)
            # The exp column is nullable, so treat a NULL/missing value as 0.
            exp = int(cur.get("exp") or 0)
            new_level = self._compute_level(exp)
            now = datetime.now()
            update_sql = (
                "UPDATE t_user_levels SET level = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
            )
            self.execute_update(update_sql, (new_level, now, user_id, group_id))
            return True, {"user_id": user_id, "group_id": group_id, "exp": exp, "level": new_level}
        except Exception as e:
            # Boundary method: report the failure to the caller instead of raising.
            self.LOG.error(f"recalc_level error: {e}")
            return False, {"error": str(e)}