Files
abot/db/points_db.py

694 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
积分系统数据库操作类
"""
from loguru import logger
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Tuple, Any
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class PointSource(Enum):
"""积分来源枚举"""
CHECKIN = "checkin" # 签到
GAME = "game" # 游戏
ADMIN = "admin" # 管理员操作
TRADE = "trade" # 积分交易
PLUGIN = "plugin" # 插件使用
OTHER = "other" # 其他
class PointsDBOperator(BaseDBOperator):
"""积分系统数据库操作类"""
def __init__(self, db_manager=None):
"""初始化积分数据库操作类"""
super().__init__(db_manager or DBConnectionManager.get_instance())
self.LOG = logger
# 确保数据库表存在,后续不需要处理了。
# self._ensure_tables_exist()
def _ensure_tables_exist(self):
"""确保积分相关的数据库表存在"""
try:
# 创建用户积分表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_user_points (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
total_points INTEGER DEFAULT 0,
checkin_points INTEGER DEFAULT 0,
game_points INTEGER DEFAULT 0,
other_points INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE(user_id, group_id)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建积分交易记录表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_point_transactions (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
transaction_type VARCHAR(20) NOT NULL,
points INTEGER NOT NULL,
source VARCHAR(50) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建功能插件积分配置表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_plugin_point_config (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
plugin_name VARCHAR(100) NOT NULL,
points_required INTEGER DEFAULT 0,
is_enabled BOOLEAN DEFAULT TRUE,
description TEXT,
UNIQUE(plugin_name)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建关禁闭记录表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_prison_records (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NOT NULL,
reason VARCHAR(255),
status TINYINT DEFAULT 1 COMMENT '1:在押 0:已释放',
bailout_user_id VARCHAR(100),
bailout_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY `idx_user_group` (user_id, group_id, status)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
self.LOG.info("积分系统数据库表检查/创建完成")
except Exception as e:
self.LOG.error(f"创建积分系统数据库表失败: {e}")
raise
def get_user_points(self, user_id: str, group_id: str) -> Dict:
"""
获取用户积分信息
Args:
user_id: 用户ID
group_id: 群组ID
Returns:
包含用户积分信息的字典
"""
try:
# 先尝试从新表获取
result = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if result:
return result
# 如果新表没有数据,尝试从旧表迁移数据
return self._migrate_user_points(user_id, group_id)
except Exception as e:
self.LOG.error(f"获取用户积分失败: {e}")
return {
"user_id": user_id,
"group_id": group_id,
"total_points": 0,
"checkin_points": 0,
"game_points": 0,
"other_points": 0
}
def _migrate_user_points(self, user_id: str, group_id: str) -> Dict:
"""
从旧表迁移用户积分数据到新表
Args:
user_id: 用户ID
group_id: 群组ID
Returns:
包含用户积分信息的字典
"""
result = {
"user_id": user_id,
"group_id": group_id,
"total_points": 0,
"checkin_points": 0,
"game_points": 0,
"other_points": 0
}
try:
# 查询签到积分
sign_result = self.execute_query("""
SELECT points FROM t_sign_record
WHERE wx_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
# 查询游戏积分
game_result = self.execute_query("""
SELECT points FROM t_encyclopedia_players
WHERE player_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
# 合并积分
checkin_points = sign_result["points"] if sign_result else 0
game_points = game_result["points"] if game_result else 0
total_points = checkin_points + game_points
# 更新结果
result["checkin_points"] = checkin_points
result["game_points"] = game_points
result["total_points"] = total_points
# 插入到新表
self.execute_update("""
INSERT INTO t_user_points
(user_id, group_id, total_points, checkin_points, game_points, other_points)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, total_points, checkin_points, game_points, 0))
# 获取插入后的完整记录
return self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True) or result
except Exception as e:
self.LOG.error(f"迁移用户积分失败: {e}")
return result
def add_points(self, user_id: str, group_id: str, points: int,
source: PointSource, description: str = None,
user_name: str = None) -> Tuple[bool, Dict]:
"""
增加用户积分
Args:
user_id: 用户ID
group_id: 群组ID
points: 积分数量
source: 积分来源
description: 描述
user_name: 用户名称
Returns:
(成功标志, 用户积分信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 检查用户是否存在
user_exists = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if not user_exists:
# 如果用户不存在,先迁移或创建用户
self._migrate_user_points(user_id, group_id)
# 更新积分
source_field = f"{source.value}_points" if source.value in ["checkin", "game"] else "other_points"
self.execute_update(f"""
UPDATE t_user_points
SET total_points = total_points + %s,
{source_field} = {source_field} + %s
WHERE user_id = %s AND group_id = %s
""", (points, points, user_id, group_id))
# 记录交易
self.execute_update("""
INSERT INTO t_point_transactions
(user_id, group_id, transaction_type, points, source, description)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, "earn", points, source.value, description))
# 同时更新旧表,保持兼容
if source == PointSource.CHECKIN:
self.execute_update("""
UPDATE t_sign_record
SET points = points + %s
WHERE wx_id = %s AND group_id = %s
""", (points, user_id, group_id))
elif source == PointSource.GAME:
self.execute_update("""
UPDATE t_encyclopedia_players
SET points = points + %s
WHERE player_id = %s AND group_id = %s
""", (points, user_id, group_id))
# 获取更新后的积分信息
updated_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
return True, updated_points
except Exception as e:
self.LOG.error(f"增加用户积分失败: {e}")
return False, {"error": str(e)}
def deduct_points(self, user_id: str, group_id: str, points: int,
source: PointSource, description: str = None) -> Tuple[bool, Dict]:
"""
扣除用户积分
Args:
user_id: 用户ID
group_id: 群组ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
(成功标志, 用户积分信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 检查用户是否存在及积分是否足够
user_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if not user_points:
# 如果用户不存在,先迁移或创建用户
user_points = self._migrate_user_points(user_id, group_id)
if user_points["total_points"] < points:
return False, {"error": "积分不足", "current_points": user_points["total_points"]}
# 更新积分
self.execute_update("""
UPDATE t_user_points
SET total_points = total_points - %s
WHERE user_id = %s AND group_id = %s
""", (points, user_id, group_id))
# 记录交易
self.execute_update("""
INSERT INTO t_point_transactions
(user_id, group_id, transaction_type, points, source, description)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, "spend", -points, source.value, description))
# 获取更新后的积分信息
updated_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
return True, updated_points
except Exception as e:
self.LOG.error(f"扣除用户积分失败: {e}")
return False, {"error": str(e)}
def transfer_points(
self,
from_user_id: str,
to_user_id: str,
group_id: str,
points: int,
description: str = None,
from_user_name: str = None,
to_user_name: str = None,
) -> Tuple[bool, Dict]:
"""
转移积分从一个用户到另一个用户
Args:
from_user_id: 转出用户ID
to_user_id: 转入用户ID
group_id: 群组ID
points: 积分数量
description: 描述
from_user_name: 转出用户展示名(可选,优先用于流水描述)
to_user_name: 转入用户展示名(可选,优先用于流水描述)
Returns:
(成功标志, 结果信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 流水描述统一使用“昵称优先ID兜底”策略
# 1. 业务层可显式传入昵称;
# 2. 未传时回落到 user_id确保兼容旧调用方。
from_display_name = str(from_user_name or from_user_id)
to_display_name = str(to_user_name or to_user_id)
# 先扣除转出用户积分
success, result = self.deduct_points(
from_user_id, group_id, points,
PointSource.TRADE, f"转账给用户 {to_display_name}: {description}"
)
if not success:
return False, result
# 再增加转入用户积分
success, to_result = self.add_points(
to_user_id, group_id, points,
PointSource.TRADE, f"收到用户 {from_display_name} 的转账: {description}"
)
if not success:
# 如果增加失败,回滚扣除操作
self.add_points(
from_user_id, group_id, points,
PointSource.TRADE, f"转账失败退回: {description}",
)
return False, to_result
return True, {
"from_user": result,
"to_user": to_result
}
except Exception as e:
self.LOG.error(f"转移用户积分失败: {e}")
return False, {"error": str(e)}
def get_user_transactions(self, user_id: str, group_id: str, limit: int = 10) -> List[Dict]:
"""
获取用户积分交易记录
Args:
user_id: 用户ID
group_id: 群组ID
limit: 记录数量限制
Returns:
交易记录列表
"""
try:
return self.execute_query("""
SELECT * FROM t_point_transactions
WHERE user_id = %s AND group_id = %s
ORDER BY created_at DESC
LIMIT %s
""", (user_id, group_id, limit))
except Exception as e:
self.LOG.error(f"获取用户交易记录失败: {e}")
return []
def get_points_ranking(self, group_id: str, limit: int = 10) -> List[Dict]:
"""
获取群组积分排行榜
Args:
group_id: 群组ID
limit: 记录数量限制
Returns:
排行榜列表
"""
try:
return self.execute_query("""
SELECT user_id, total_points, checkin_points, game_points, other_points
FROM t_user_points
WHERE group_id = %s
AND user_id !='SYSTEM'
ORDER BY total_points DESC
LIMIT %s
""", (group_id, limit))
except Exception as e:
self.LOG.error(f"获取积分排行榜失败: {e}")
return []
def get_plugin_config(self, plugin_name: str) -> Optional[Dict]:
"""
获取插件积分配置
Args:
plugin_name: 插件名称
Returns:
插件积分配置
"""
try:
return self.execute_query("""
SELECT * FROM t_plugin_point_config
WHERE plugin_name = %s
""", (plugin_name,), fetch_one=True)
except Exception as e:
self.LOG.error(f"获取插件积分配置失败: {e}")
return None
def set_plugin_config(self, plugin_name: str, points_required: int,
is_enabled: bool = True, description: str = None) -> bool:
"""
设置插件积分配置
Args:
plugin_name: 插件名称
points_required: 所需积分
is_enabled: 是否启用
description: 描述
Returns:
是否成功
"""
try:
self.execute_update("""
INSERT INTO t_plugin_point_config
(plugin_name, points_required, is_enabled, description)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
points_required = VALUES(points_required),
is_enabled = VALUES(is_enabled),
description = VALUES(description)
""", (plugin_name, points_required, is_enabled, description))
return True
except Exception as e:
self.LOG.error(f"设置插件积分配置失败: {e}")
return False
def get_all_plugin_configs(self) -> List[Dict]:
"""
获取所有插件积分配置
Returns:
所有插件积分配置
"""
try:
return self.execute_query("SELECT * FROM t_plugin_point_config")
except Exception as e:
self.LOG.error(f"获取所有插件积分配置失败: {e}")
return []
def check_plugin_points(self, user_id: str, group_id: str, plugin_name: str) -> Tuple[bool, Dict]:
"""
检查用户是否有足够积分使用插件
Args:
user_id: 用户ID
group_id: 群组ID
plugin_name: 插件名称
Returns:
(是否有足够积分, 结果信息)
"""
try:
# 获取插件积分配置
plugin_config = self.get_plugin_config(plugin_name)
# 如果插件未配置或未启用积分限制,直接返回成功
if not plugin_config or not plugin_config["is_enabled"]:
return True, {"message": "插件未配置积分限制"}
# 获取用户积分
user_points = self.get_user_points(user_id, group_id)
# 检查积分是否足够
if user_points["total_points"] < plugin_config["points_required"]:
return False, {
"error": "积分不足",
"current_points": user_points["total_points"],
"required_points": plugin_config["points_required"]
}
return True, {
"message": "积分充足",
"current_points": user_points["total_points"],
"required_points": plugin_config["points_required"]
}
except Exception as e:
self.LOG.error(f"检查插件积分失败: {e}")
return True, {"error": str(e)} # 出错时默认允许使用
def use_plugin(self, user_id: str, group_id: str, plugin_name: str) -> Tuple[bool, Dict]:
"""
使用插件并扣除积分
Args:
user_id: 用户ID
group_id: 群组ID
plugin_name: 插件名称
Returns:
(是否成功, 结果信息)
"""
try:
# 先检查积分是否足够
can_use, result = self.check_plugin_points(user_id, group_id, plugin_name)
if not can_use:
return False, result
# 如果插件未配置积分或不需要扣除积分,直接返回成功
if "required_points" not in result or result["required_points"] <= 0:
return True, {"message": "无需扣除积分"}
# 扣除积分
return self.deduct_points(
user_id, group_id, result["required_points"],
PointSource.PLUGIN, f"使用插件: {plugin_name}"
)
except Exception as e:
self.LOG.error(f"使用插件扣除积分失败: {e}")
return False, {"error": str(e)}
def get_user_points_stats(self, group_id: str) -> Dict[str, Any]:
"""
获取群组积分统计信息
Args:
group_id: 群组ID
Returns:
统计信息
"""
stats = {
"total_users": 0,
"total_points": 0,
"avg_points": 0,
"max_points": 0,
"min_points": 0,
"checkin_points_total": 0,
"game_points_total": 0,
"other_points_total": 0
}
try:
result = self.execute_query("""
SELECT
COUNT(*) as total_users,
SUM(total_points) as total_points,
AVG(total_points) as avg_points,
MAX(total_points) as max_points,
MIN(total_points) as min_points,
SUM(checkin_points) as checkin_points_total,
SUM(game_points) as game_points_total,
SUM(other_points) as other_points_total
FROM t_user_points
WHERE group_id = %s
""", (group_id,), fetch_one=True)
if result:
stats.update({k: v or 0 for k, v in result.items()})
return stats
except Exception as e:
self.LOG.error(f"获取群组积分统计信息失败: {e}")
return stats
def imprison_user(self, user_id: str, group_id: str, hours: int = 24, reason: str = None) -> bool:
"""关押用户
Args:
user_id: 用户ID
group_id: 群组ID
hours: 关押时长(小时)
reason: 关押原因
"""
try:
end_time = datetime.now() + timedelta(hours=hours)
self.execute_update("""
INSERT INTO t_prison_records (user_id, group_id, end_time, reason)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
start_time=CURRENT_TIMESTAMP, end_time=%s, reason=%s, status=1
""", (user_id, group_id, end_time, reason, end_time, reason))
return True
except Exception as e:
self.LOG.error(f"关押用户失败: {e}")
return False
def check_prison_status(self, user_id: str, group_id: str) -> Optional[Dict]:
"""检查用户是否在押
Returns:
None: 不在押
Dict: 在押信息
"""
try:
records = self.execute_query("""
SELECT * FROM t_prison_records
WHERE user_id = %s AND group_id = %s AND status = 1
AND end_time > CURRENT_TIMESTAMP
LIMIT 1
""", (user_id, group_id))
return records[0] if records else None
except Exception as e:
self.LOG.error(f"检查用户在押状态失败: {e}")
return None
def bailout_user(
self,
prisoner_id: str,
bailout_user_id: str,
group_id: str,
prisoner_name: str = None,
bailout_user_name: str = None,
) -> Tuple[bool, str]:
"""保释用户
Returns:
(bool, str): (是否成功, 错误信息)
"""
try:
# 检查是否在押
prison_record = self.check_prison_status(prisoner_id, group_id)
if not prison_record:
return False, "该用户未被关押"
# 扣除保释金
success, result = self.transfer_points(
bailout_user_id,
"SYSTEM",
group_id,
30,
f"{str(prisoner_name or prisoner_id)}支付保释金",
from_user_name=bailout_user_name,
to_user_name="系统",
)
if not success:
return False, result.get("error", "保释失败")
# 释放用户
self.execute_update("""
UPDATE t_prison_records
SET status = 0, bailout_user_id = %s, bailout_time = CURRENT_TIMESTAMP
WHERE user_id = %s AND group_id = %s AND status = 1
""", (bailout_user_id, prisoner_id, group_id))
return True, "保释成功"
except Exception as e:
self.LOG.error(f"保释用户失败: {e}")
return False, f"保释失败: {str(e)}"