# Files
# abot/db/levels_db.py
# 2025-11-14 15:16:56 +08:00
#
# 138 lines
# 5.1 KiB
# Python

from datetime import datetime
from typing import Dict, Optional, Tuple
import math
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class LevelsDBOperator(BaseDBOperator):
    """Read and update per-user, per-group experience/level rows.

    Rows live in ``t_user_levels`` (created on construction if missing) with
    the columns ``user_id``, ``group_id``, ``exp``, ``level``, ``last_calc``
    and ``last_active_at``; ``(user_id, group_id)`` is unique.

    NOTE(review): ``execute_query`` / ``execute_update`` come from
    ``BaseDBOperator``, which is not visible here. This class assumes
    ``execute_query(..., fetch_one=True)`` yields a dict-like row (or a falsy
    value when nothing matches) — confirm against the base class.
    """

    # Minimum exp required for level 1, 2, ... (index i -> level i + 1).
    # Must stay sorted ascending; _compute_level relies on that.
    _LEVEL_THRESHOLDS = (0, 500, 1000, 1200, 1500, 1800, 4000, 8000, 9000, 20000)

    # Display title for level 1, 2, ... (index i -> level i + 1).
    _LEVEL_TITLES = (
        "凡人",
        "炼体期",
        "筑基期",
        "结丹期",
        "元婴期",
        "化神期",
        "合体期",
        "大乘期",
        "渡劫期",
        "真仙",
    )

    # Exp is multiplied by _DECAY_RATE once per full _DECAY_PERIOD_DAYS of
    # inactivity.
    _DECAY_PERIOD_DAYS = 7
    _DECAY_RATE = 0.95

    def __init__(self, db_manager: Optional[DBConnectionManager] = None):
        """Bind to *db_manager* (or the shared instance) and ensure the table.

        Args:
            db_manager: Connection manager to use. When omitted (or falsy),
                ``DBConnectionManager.get_instance()`` is used instead.
        """
        super().__init__(db_manager or DBConnectionManager.get_instance())
        self.LOG = logger
        self._ensure_table()

    def _ensure_table(self) -> bool:
        """Create ``t_user_levels`` if it does not exist.

        Returns:
            Whatever ``execute_update`` returns for the DDL statement.
        """
        sql = """
            CREATE TABLE IF NOT EXISTS t_user_levels (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                user_id VARCHAR(100) NOT NULL,
                group_id VARCHAR(100) NOT NULL,
                exp BIGINT DEFAULT 0,
                level INT DEFAULT 1,
                last_calc DATETIME DEFAULT CURRENT_TIMESTAMP,
                last_active_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_user_group (user_id, group_id)
            ) ENGINE=InnoDB CHARACTER SET utf8mb4;
            """
        return self.execute_update(sql)

    def get_user_level(self, user_id: str, group_id: str) -> Dict:
        """Return the level row for a user in a group, creating it if absent.

        Args:
            user_id: User identifier.
            group_id: Group identifier.

        Returns:
            The stored row when one exists. Otherwise a fresh row
            (``exp=0``, ``level=1``, both timestamps set to now) is inserted
            and an equivalent dict is returned.

        NOTE(review): the SELECT-then-INSERT pair is not atomic; two
        concurrent first-time callers can both attempt the INSERT and one
        will hit the unique key. How that surfaces depends on
        ``execute_update`` — confirm.
        """
        select_sql = (
            "SELECT user_id, group_id, exp, level, last_calc, last_active_at "
            "FROM t_user_levels WHERE user_id = %s AND group_id = %s"
        )
        row = self.execute_query(select_sql, (user_id, group_id), fetch_one=True)
        if row:
            return row

        now = datetime.now()
        insert_sql = (
            "INSERT INTO t_user_levels (user_id, group_id, exp, level, last_calc, last_active_at) "
            "VALUES (%s, %s, %s, %s, %s, %s)"
        )
        self.execute_update(insert_sql, (user_id, group_id, 0, 1, now, now))
        return {
            "user_id": user_id,
            "group_id": group_id,
            "exp": 0,
            "level": 1,
            "last_calc": now,
            "last_active_at": now,
        }

    def _compute_decay(self, cur: Dict, now: datetime) -> Tuple[int, bool]:
        """Compute the inactivity-decayed exp for *cur* without touching the DB.

        Exp is multiplied by ``_DECAY_RATE`` once for every full
        ``_DECAY_PERIOD_DAYS`` between ``cur["last_active_at"]`` and *now*,
        then truncated to an int.

        Args:
            cur: Level row; reads ``exp`` and ``last_active_at``.
            now: Reference time for measuring inactivity.

        Returns:
            ``(exp, applied)``: the possibly decayed exp, and whether at
            least one decay period had elapsed.
        """
        # A NULL exp column (the DDL allows it) is treated as zero.
        exp = int(cur.get("exp") or 0)
        last_active = cur.get("last_active_at")
        if not last_active:
            return exp, False
        try:
            inactive_days = (now - last_active).days
        except (TypeError, AttributeError, OverflowError):
            # last_active_at is not a datetime compatible with `now`
            # (e.g. a string, or naive/aware mismatch): skip decay rather
            # than fail the caller.
            return exp, False

        periods = inactive_days // self._DECAY_PERIOD_DAYS
        if periods < 1:
            return exp, False
        return int(exp * (self._DECAY_RATE ** periods)), True

    def _apply_decay(self, cur: Dict, now: datetime) -> Tuple[int, bool]:
        """Compute inactivity decay for *cur* and persist it when it applies.

        Only ``exp`` and ``last_calc`` are written; ``last_active_at`` and
        ``level`` are left unchanged. Because the decay is derived from
        ``last_active_at``, calling this again on the same row before its
        activity timestamp is refreshed decays the already-decayed exp a
        second time.

        Args:
            cur: Level row; reads ``exp``, ``last_active_at``, ``user_id``
                and ``group_id``.
            now: Reference time; also stored as ``last_calc``.

        Returns:
            ``(exp, applied)`` as produced by ``_compute_decay``.
        """
        decayed, applied = self._compute_decay(cur, now)
        if applied:
            update_sql = (
                "UPDATE t_user_levels SET exp = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
            )
            self.execute_update(update_sql, (decayed, now, cur.get("user_id"), cur.get("group_id")))
        return decayed, applied

    def _compute_level(self, exp: int) -> int:
        """Map an exp total to a level (1-based, at least 1).

        The level is the number of thresholds in ``_LEVEL_THRESHOLDS`` that
        *exp* has reached; values below the first threshold still yield 1.
        """
        reached = sum(1 for threshold in self._LEVEL_THRESHOLDS if exp >= threshold)
        return max(1, reached)

    def level_title(self, level: int) -> str:
        """Return the display title for *level*.

        Levels at or below 1 map to the first title; levels beyond the last
        defined title map to the last one.
        """
        titles = self._LEVEL_TITLES
        clamped = min(max(level, 1), len(titles))
        return titles[clamped - 1]

    def add_exp(self, user_id: str, group_id: str, delta: int, reason: Optional[str] = None) -> Tuple[bool, Dict]:
        """Apply inactivity decay, add *delta* exp, and store the new level.

        Decay and the delta are combined in memory and written with a single
        UPDATE, so a failed write cannot leave a decayed exp behind with a
        stale ``last_active_at`` (which would be decayed again on retry).

        Args:
            user_id: User identifier; must be non-empty.
            group_id: Group identifier; must be non-empty.
            delta: Exp to add (may be negative; the result is floored at 0).
            reason: Accepted for callers' convenience; not used or stored
                by this method.

        Returns:
            ``(True, {"user_id", "group_id", "exp", "level"})`` on success,
            ``(False, {"error": ...})`` for empty identifiers or when an
            exception is raised while processing.
        """
        if not user_id or not group_id:
            return False, {"error": "invalid_identity"}
        try:
            cur = self.get_user_level(user_id, group_id)
            now = datetime.now()
            base_exp, _ = self._compute_decay(cur, now)
            new_exp = max(0, base_exp + int(delta))
            new_level = self._compute_level(new_exp)
            update_sql = (
                "UPDATE t_user_levels SET exp = %s, level = %s, last_calc = %s, last_active_at = %s "
                "WHERE user_id = %s AND group_id = %s"
            )
            self.execute_update(update_sql, (new_exp, new_level, now, now, user_id, group_id))
            return True, {"user_id": user_id, "group_id": group_id, "exp": new_exp, "level": new_level}
        except Exception as e:
            # API boundary: report the failure to the caller instead of raising.
            self.LOG.error(f"add_exp error: {e}")
            return False, {"error": str(e)}

    def recalc_level(self, user_id: str, group_id: str) -> Tuple[bool, Dict]:
        """Recompute and store the level from the currently stored exp.

        No decay is applied and ``last_active_at`` is not modified.

        Args:
            user_id: User identifier; must be non-empty.
            group_id: Group identifier; must be non-empty.

        Returns:
            ``(True, {"user_id", "group_id", "exp", "level"})`` on success,
            ``(False, {"error": ...})`` for empty identifiers or when an
            exception is raised while processing.
        """
        # Same guard as add_exp: without it get_user_level would insert a
        # row keyed by an empty identifier.
        if not user_id or not group_id:
            return False, {"error": "invalid_identity"}
        try:
            cur = self.get_user_level(user_id, group_id)
            new_level = self._compute_level(int(cur.get("exp") or 0))
            now = datetime.now()
            update_sql = (
                "UPDATE t_user_levels SET level = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
            )
            self.execute_update(update_sql, (new_level, now, user_id, group_id))
            return True, {"user_id": user_id, "group_id": group_id, "exp": cur.get("exp", 0), "level": new_level}
        except Exception as e:
            # API boundary: report the failure to the caller instead of raising.
            self.LOG.error(f"recalc_level error: {e}")
            return False, {"error": str(e)}