Files
abot/db/points_db.py
liuwei 3b49e340e4 添加打劫保释功能
- 打劫失败后自动关押24小时
- 其他人可以使用"保释 @用户"命令花费30积分保释
- 在押期间无法进行打劫
- 保释后立即释放
- 所有记录都会保存在数据库中
2025-04-10 10:00:54 +08:00

664 lines
24 KiB
Python

# -*- coding: utf-8 -*-
"""
积分系统数据库操作类
"""
import logging
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Tuple, Any
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class PointSource(Enum):
"""积分来源枚举"""
CHECKIN = "checkin" # 签到
GAME = "game" # 游戏
ADMIN = "admin" # 管理员操作
TRADE = "trade" # 积分交易
PLUGIN = "plugin" # 插件使用
OTHER = "other" # 其他
class PointsDBOperator(BaseDBOperator):
"""积分系统数据库操作类"""
def __init__(self, db_manager=None):
"""初始化积分数据库操作类"""
super().__init__(db_manager or DBConnectionManager.get_instance())
self.logger = logging.getLogger("PointsDBOperator")
# 确保数据库表存在,后续不需要处理了。
# self._ensure_tables_exist()
def _ensure_tables_exist(self):
"""确保积分相关的数据库表存在"""
try:
# 创建用户积分表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_user_points (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
total_points INTEGER DEFAULT 0,
checkin_points INTEGER DEFAULT 0,
game_points INTEGER DEFAULT 0,
other_points INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE(user_id, group_id)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建积分交易记录表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_point_transactions (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
transaction_type VARCHAR(20) NOT NULL,
points INTEGER NOT NULL,
source VARCHAR(50) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建功能插件积分配置表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_plugin_point_config (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
plugin_name VARCHAR(100) NOT NULL,
points_required INTEGER DEFAULT 0,
is_enabled BOOLEAN DEFAULT TRUE,
description TEXT,
UNIQUE(plugin_name)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
# 创建关禁闭记录表
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_prison_records (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(100) NOT NULL,
group_id VARCHAR(100) NOT NULL,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NOT NULL,
reason VARCHAR(255),
status TINYINT DEFAULT 1 COMMENT '1:在押 0:已释放',
bailout_user_id VARCHAR(100),
bailout_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY `idx_user_group` (user_id, group_id, status)
) ENGINE=InnoDB CHARACTER SET utf8mb4;
""")
self.logger.info("积分系统数据库表检查/创建完成")
except Exception as e:
self.logger.error(f"创建积分系统数据库表失败: {e}")
raise
def get_user_points(self, user_id: str, group_id: str) -> Dict:
"""
获取用户积分信息
Args:
user_id: 用户ID
group_id: 群组ID
Returns:
包含用户积分信息的字典
"""
try:
# 先尝试从新表获取
result = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if result:
return result
# 如果新表没有数据,尝试从旧表迁移数据
return self._migrate_user_points(user_id, group_id)
except Exception as e:
self.logger.error(f"获取用户积分失败: {e}")
return {
"user_id": user_id,
"group_id": group_id,
"total_points": 0,
"checkin_points": 0,
"game_points": 0,
"other_points": 0
}
def _migrate_user_points(self, user_id: str, group_id: str) -> Dict:
"""
从旧表迁移用户积分数据到新表
Args:
user_id: 用户ID
group_id: 群组ID
Returns:
包含用户积分信息的字典
"""
result = {
"user_id": user_id,
"group_id": group_id,
"total_points": 0,
"checkin_points": 0,
"game_points": 0,
"other_points": 0
}
try:
# 查询签到积分
sign_result = self.execute_query("""
SELECT points FROM t_sign_record
WHERE wx_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
# 查询游戏积分
game_result = self.execute_query("""
SELECT points FROM t_encyclopedia_players
WHERE player_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
# 合并积分
checkin_points = sign_result["points"] if sign_result else 0
game_points = game_result["points"] if game_result else 0
total_points = checkin_points + game_points
# 更新结果
result["checkin_points"] = checkin_points
result["game_points"] = game_points
result["total_points"] = total_points
# 插入到新表
self.execute_update("""
INSERT INTO t_user_points
(user_id, group_id, total_points, checkin_points, game_points, other_points)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, total_points, checkin_points, game_points, 0))
# 获取插入后的完整记录
return self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True) or result
except Exception as e:
self.logger.error(f"迁移用户积分失败: {e}")
return result
def add_points(self, user_id: str, group_id: str, points: int,
source: PointSource, description: str = None,
user_name: str = None) -> Tuple[bool, Dict]:
"""
增加用户积分
Args:
user_id: 用户ID
group_id: 群组ID
points: 积分数量
source: 积分来源
description: 描述
user_name: 用户名称
Returns:
(成功标志, 用户积分信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 检查用户是否存在
user_exists = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if not user_exists:
# 如果用户不存在,先迁移或创建用户
self._migrate_user_points(user_id, group_id)
# 更新积分
source_field = f"{source.value}_points" if source.value in ["checkin", "game"] else "other_points"
self.execute_update(f"""
UPDATE t_user_points
SET total_points = total_points + %s,
{source_field} = {source_field} + %s
WHERE user_id = %s AND group_id = %s
""", (points, points, user_id, group_id))
# 记录交易
self.execute_update("""
INSERT INTO t_point_transactions
(user_id, group_id, transaction_type, points, source, description)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, "earn", points, source.value, description))
# 同时更新旧表,保持兼容
if source == PointSource.CHECKIN:
self.execute_update("""
UPDATE t_sign_record
SET points = points + %s
WHERE wx_id = %s AND group__id = %s
""", (points, user_id, group_id))
elif source == PointSource.GAME:
self.execute_update("""
UPDATE t_encyclopedia_players
SET points = points + %s
WHERE player_id = %s AND group_id = %s
""", (points, user_id, group_id))
# 获取更新后的积分信息
updated_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
return True, updated_points
except Exception as e:
self.logger.error(f"增加用户积分失败: {e}")
return False, {"error": str(e)}
def deduct_points(self, user_id: str, group_id: str, points: int,
source: PointSource, description: str = None) -> Tuple[bool, Dict]:
"""
扣除用户积分
Args:
user_id: 用户ID
group_id: 群组ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
(成功标志, 用户积分信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 检查用户是否存在及积分是否足够
user_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
if not user_points:
# 如果用户不存在,先迁移或创建用户
user_points = self._migrate_user_points(user_id, group_id)
if user_points["total_points"] < points:
return False, {"error": "积分不足", "current_points": user_points["total_points"]}
# 更新积分
self.execute_update("""
UPDATE t_user_points
SET total_points = total_points - %s
WHERE user_id = %s AND group_id = %s
""", (points, user_id, group_id))
# 记录交易
self.execute_update("""
INSERT INTO t_point_transactions
(user_id, group_id, transaction_type, points, source, description)
VALUES (%s, %s, %s, %s, %s, %s)
""", (user_id, group_id, "spend", -points, source.value, description))
# 获取更新后的积分信息
updated_points = self.execute_query("""
SELECT * FROM t_user_points
WHERE user_id = %s AND group_id = %s
""", (user_id, group_id), fetch_one=True)
return True, updated_points
except Exception as e:
self.logger.error(f"扣除用户积分失败: {e}")
return False, {"error": str(e)}
def transfer_points(self, from_user_id: str, to_user_id: str, group_id: str,
points: int, description: str = None) -> Tuple[bool, Dict]:
"""
转移积分从一个用户到另一个用户
Args:
from_user_id: 转出用户ID
to_user_id: 转入用户ID
group_id: 群组ID
points: 积分数量
description: 描述
Returns:
(成功标志, 结果信息)
"""
if points <= 0:
return False, {"error": "积分必须为正数"}
try:
# 先扣除转出用户积分
success, result = self.deduct_points(
from_user_id, group_id, points,
PointSource.TRADE, f"转账给用户 {to_user_id}: {description}"
)
if not success:
return False, result
# 再增加转入用户积分
success, to_result = self.add_points(
to_user_id, group_id, points,
PointSource.TRADE, f"收到用户 {from_user_id} 的转账: {description}"
)
if not success:
# 如果增加失败,回滚扣除操作
self.add_points(
from_user_id, group_id, points,
PointSource.TRADE, f"转账失败退回: {description}"
)
return False, to_result
return True, {
"from_user": result,
"to_user": to_result
}
except Exception as e:
self.logger.error(f"转移用户积分失败: {e}")
return False, {"error": str(e)}
def get_user_transactions(self, user_id: str, group_id: str, limit: int = 10) -> List[Dict]:
"""
获取用户积分交易记录
Args:
user_id: 用户ID
group_id: 群组ID
limit: 记录数量限制
Returns:
交易记录列表
"""
try:
return self.execute_query("""
SELECT * FROM t_point_transactions
WHERE user_id = %s AND group_id = %s
ORDER BY created_at DESC
LIMIT %s
""", (user_id, group_id, limit))
except Exception as e:
self.logger.error(f"获取用户交易记录失败: {e}")
return []
def get_points_ranking(self, group_id: str, limit: int = 10) -> List[Dict]:
"""
获取群组积分排行榜
Args:
group_id: 群组ID
limit: 记录数量限制
Returns:
排行榜列表
"""
try:
return self.execute_query("""
SELECT user_id, total_points, checkin_points, game_points, other_points
FROM t_user_points
WHERE group_id = %s
ORDER BY total_points DESC
LIMIT %s
""", (group_id, limit))
except Exception as e:
self.logger.error(f"获取积分排行榜失败: {e}")
return []
def get_plugin_config(self, plugin_name: str) -> Optional[Dict]:
"""
获取插件积分配置
Args:
plugin_name: 插件名称
Returns:
插件积分配置
"""
try:
return self.execute_query("""
SELECT * FROM t_plugin_point_config
WHERE plugin_name = %s
""", (plugin_name,), fetch_one=True)
except Exception as e:
self.logger.error(f"获取插件积分配置失败: {e}")
return None
def set_plugin_config(self, plugin_name: str, points_required: int,
is_enabled: bool = True, description: str = None) -> bool:
"""
设置插件积分配置
Args:
plugin_name: 插件名称
points_required: 所需积分
is_enabled: 是否启用
description: 描述
Returns:
是否成功
"""
try:
self.execute_update("""
INSERT INTO t_plugin_point_config
(plugin_name, points_required, is_enabled, description)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
points_required = VALUES(points_required),
is_enabled = VALUES(is_enabled),
description = VALUES(description)
""", (plugin_name, points_required, is_enabled, description))
return True
except Exception as e:
self.logger.error(f"设置插件积分配置失败: {e}")
return False
def get_all_plugin_configs(self) -> List[Dict]:
"""
获取所有插件积分配置
Returns:
所有插件积分配置
"""
try:
return self.execute_query("SELECT * FROM t_plugin_point_config")
except Exception as e:
self.logger.error(f"获取所有插件积分配置失败: {e}")
return []
def check_plugin_points(self, user_id: str, group_id: str, plugin_name: str) -> Tuple[bool, Dict]:
"""
检查用户是否有足够积分使用插件
Args:
user_id: 用户ID
group_id: 群组ID
plugin_name: 插件名称
Returns:
(是否有足够积分, 结果信息)
"""
try:
# 获取插件积分配置
plugin_config = self.get_plugin_config(plugin_name)
# 如果插件未配置或未启用积分限制,直接返回成功
if not plugin_config or not plugin_config["is_enabled"]:
return True, {"message": "插件未配置积分限制"}
# 获取用户积分
user_points = self.get_user_points(user_id, group_id)
# 检查积分是否足够
if user_points["total_points"] < plugin_config["points_required"]:
return False, {
"error": "积分不足",
"current_points": user_points["total_points"],
"required_points": plugin_config["points_required"]
}
return True, {
"message": "积分充足",
"current_points": user_points["total_points"],
"required_points": plugin_config["points_required"]
}
except Exception as e:
self.logger.error(f"检查插件积分失败: {e}")
return True, {"error": str(e)} # 出错时默认允许使用
def use_plugin(self, user_id: str, group_id: str, plugin_name: str) -> Tuple[bool, Dict]:
"""
使用插件并扣除积分
Args:
user_id: 用户ID
group_id: 群组ID
plugin_name: 插件名称
Returns:
(是否成功, 结果信息)
"""
try:
# 先检查积分是否足够
can_use, result = self.check_plugin_points(user_id, group_id, plugin_name)
if not can_use:
return False, result
# 如果插件未配置积分或不需要扣除积分,直接返回成功
if "required_points" not in result or result["required_points"] <= 0:
return True, {"message": "无需扣除积分"}
# 扣除积分
return self.deduct_points(
user_id, group_id, result["required_points"],
PointSource.PLUGIN, f"使用插件: {plugin_name}"
)
except Exception as e:
self.logger.error(f"使用插件扣除积分失败: {e}")
return False, {"error": str(e)}
def get_user_points_stats(self, group_id: str) -> Dict[str, Any]:
"""
获取群组积分统计信息
Args:
group_id: 群组ID
Returns:
统计信息
"""
stats = {
"total_users": 0,
"total_points": 0,
"avg_points": 0,
"max_points": 0,
"min_points": 0,
"checkin_points_total": 0,
"game_points_total": 0,
"other_points_total": 0
}
try:
result = self.execute_query("""
SELECT
COUNT(*) as total_users,
SUM(total_points) as total_points,
AVG(total_points) as avg_points,
MAX(total_points) as max_points,
MIN(total_points) as min_points,
SUM(checkin_points) as checkin_points_total,
SUM(game_points) as game_points_total,
SUM(other_points) as other_points_total
FROM t_user_points
WHERE group_id = %s
""", (group_id,), fetch_one=True)
if result:
stats.update({k: v or 0 for k, v in result.items()})
return stats
except Exception as e:
self.logger.error(f"获取群组积分统计信息失败: {e}")
return stats
def imprison_user(self, user_id: str, group_id: str, hours: int = 24, reason: str = None) -> bool:
"""关押用户
Args:
user_id: 用户ID
group_id: 群组ID
hours: 关押时长(小时)
reason: 关押原因
"""
try:
end_time = datetime.now() + timedelta(hours=hours)
self.execute_update("""
INSERT INTO t_prison_records (user_id, group_id, end_time, reason)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
start_time=CURRENT_TIMESTAMP, end_time=%s, reason=%s, status=1
""", (user_id, group_id, end_time, reason, end_time, reason))
return True
except Exception as e:
self.logger.error(f"关押用户失败: {e}")
return False
def check_prison_status(self, user_id: str, group_id: str) -> Optional[Dict]:
"""检查用户是否在押
Returns:
None: 不在押
Dict: 在押信息
"""
try:
records = self.execute_query("""
SELECT * FROM t_prison_records
WHERE user_id = %s AND group_id = %s AND status = 1
AND end_time > CURRENT_TIMESTAMP
LIMIT 1
""", (user_id, group_id))
return records[0] if records else None
except Exception as e:
self.logger.error(f"检查用户在押状态失败: {e}")
return None
def bailout_user(self, prisoner_id: str, bailout_user_id: str, group_id: str) -> Tuple[bool, str]:
"""保释用户
Returns:
(bool, str): (是否成功, 错误信息)
"""
try:
# 检查是否在押
prison_record = self.check_prison_status(prisoner_id, group_id)
if not prison_record:
return False, "该用户未被关押"
# 扣除保释金
success, result = self.transfer_points(
bailout_user_id, "SYSTEM", group_id, 30, "保释金"
)
if not success:
return False, result.get("error", "保释失败")
# 释放用户
self.execute_update("""
UPDATE t_prison_records
SET status = 0, bailout_user_id = %s, bailout_time = CURRENT_TIMESTAMP
WHERE user_id = %s AND group_id = %s AND status = 1
""", (bailout_user_id, prisoner_id, group_id))
return True, "保释成功"
except Exception as e:
self.logger.error(f"保释用户失败: {e}")
return False, f"保释失败: {str(e)}"