加入了级别功能。

This commit is contained in:
liuwei
2025-11-14 15:16:56 +08:00
parent ceadaa03f1
commit 57c6a0d234
6 changed files with 169 additions and 17 deletions

138
db/levels_db.py Normal file
View File

@@ -0,0 +1,138 @@
from datetime import datetime
from typing import Dict, Optional, Tuple
import math
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class LevelsDBOperator(BaseDBOperator):
def __init__(self, db_manager: DBConnectionManager = None):
super().__init__(db_manager or DBConnectionManager.get_instance())
self.LOG = logger
self._ensure_table()
def _ensure_table(self) -> bool:
sql = (
"""
CREATE TABLE IF NOT EXISTS t_user_levels (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
exp BIGINT DEFAULT 0,
level INT DEFAULT 1,
last_calc DATETIME DEFAULT CURRENT_TIMESTAMP,
last_active_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uniq_user_group (user_id, group_id)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
"""
)
return self.execute_update(sql)
def get_user_level(self, user_id: str, group_id: str) -> Dict:
sql = (
"SELECT user_id, group_id, exp, level, last_calc, last_active_at "
"FROM t_user_levels WHERE user_id = %s AND group_id = %s"
)
result = self.execute_query(sql, (user_id, group_id), fetch_one=True)
if result:
return result
now = datetime.now()
insert_sql = (
"INSERT INTO t_user_levels (user_id, group_id, exp, level, last_calc, last_active_at) "
"VALUES (%s, %s, %s, %s, %s, %s)"
)
self.execute_update(insert_sql, (user_id, group_id, 0, 1, now, now))
return {
"user_id": user_id,
"group_id": group_id,
"exp": 0,
"level": 1,
"last_calc": now,
"last_active_at": now,
}
def _apply_decay(self, cur: Dict, now: datetime) -> Tuple[int, bool]:
exp = int(cur.get("exp", 0))
last_active = cur.get("last_active_at")
if not last_active:
return exp, False
try:
inactive_days = (now - last_active).days
except Exception:
return exp, False
if inactive_days < 7:
return exp, False
weeks = inactive_days // 7
if weeks <= 0:
return exp, False
rate = 0.95
decayed = int(exp * (rate ** weeks))
update_sql = (
"UPDATE t_user_levels SET exp = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
)
self.execute_update(update_sql, (decayed, now, cur.get("user_id"), cur.get("group_id")))
return decayed, True
def _compute_level(self, exp: int) -> int:
thresholds = [0, 500, 1000, 1200, 1500, 1800, 4000, 8000, 9000, 20000]
lvl = 1
for t in thresholds:
if exp >= t:
lvl += 1
else:
break
return max(1, lvl - 1)
def level_title(self, level: int) -> str:
titles = [
"凡人",
"炼体期",
"筑基期",
"结丹期",
"元婴期",
"化神期",
"合体期",
"大乘期",
"渡劫期",
"真仙",
]
if level <= 0:
return titles[0]
idx = min(level - 1, len(titles) - 1)
return titles[idx]
def add_exp(self, user_id: str, group_id: str, delta: int, reason: Optional[str] = None) -> Tuple[bool, Dict]:
if not user_id or not group_id:
return False, {"error": "invalid_identity"}
try:
cur = self.get_user_level(user_id, group_id)
now = datetime.now()
base_exp, _ = self._apply_decay(cur, now)
new_exp = max(0, int(base_exp) + int(delta))
new_level = self._compute_level(new_exp)
update_sql = (
"UPDATE t_user_levels SET exp = %s, level = %s, last_calc = %s, last_active_at = %s "
"WHERE user_id = %s AND group_id = %s"
)
self.execute_update(update_sql, (new_exp, new_level, now, now, user_id, group_id))
return True, {"user_id": user_id, "group_id": group_id, "exp": new_exp, "level": new_level}
except Exception as e:
self.LOG.error(f"add_exp error: {e}")
return False, {"error": str(e)}
def recalc_level(self, user_id: str, group_id: str) -> Tuple[bool, Dict]:
try:
cur = self.get_user_level(user_id, group_id)
new_level = self._compute_level(int(cur.get("exp", 0)))
now = datetime.now()
update_sql = (
"UPDATE t_user_levels SET level = %s, last_calc = %s WHERE user_id = %s AND group_id = %s"
)
self.execute_update(update_sql, (new_level, now, user_id, group_id))
return True, {"user_id": user_id, "group_id": group_id, "exp": cur.get("exp", 0), "level": new_level}
except Exception as e:
self.LOG.error(f"recalc_level error: {e}")
return False, {"error": str(e)}