feature: 数据库连接与SQL集中管理,提高代码可读性

This commit is contained in:
liuwei
2025-03-18 10:23:43 +08:00
parent 00d9a1d8eb
commit fd1676b908
4 changed files with 184 additions and 317 deletions

View File

@@ -1,79 +1,51 @@
import random
from datetime import datetime
import pymysql
from typing import Dict, List, Optional
from game_task.game_chatgpt_qa import game_question_json, game_answer_json
from db.connection import DBConnectionManager
from db.encyclopedia import EncyclopediaDB
# 数据库连接配置
db_config = {
'host': '192.168.2.32',
'user': 'root',
'password': 'lw123456',
'database': 'message_archive',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 连接数据库
def get_db_connection():
return pymysql.connect(**db_config)
# 获取数据库连接管理器的单例
db_manager = DBConnectionManager()
encyclopedia_db = EncyclopediaDB(db_manager)
# 添加群聊
def add_group(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO t_encyclopedia_groups (group_id) VALUES (%s)",
(group_id,)
)
conn.commit()
message = f"🎉 群 {group_id} 已就位,准备开燥!"
result = encyclopedia_db.add_group(group_id)
if result:
message = f"🎉 群 {group_id} 已就位,准备开燥!"
else:
message = f"🌟 群 {group_id} 早就蓄势待发啦!"
return {"message": message, "player_id": player_id}
except pymysql.err.IntegrityError:
except Exception as e:
print(f"添加群聊出错: {e}")
message = f"🌟 群 {group_id} 早就蓄势待发啦!"
return {"message": message, "player_id": player_id}
finally:
cursor.close()
conn.close()
# 获取所有群聊ID
def get_group_ids():
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT group_id FROM t_encyclopedia_groups")
return [row['group_id'] for row in cursor.fetchall()]
finally:
cursor.close()
conn.close()
return encyclopedia_db.get_all_groups()
except Exception as e:
print(f"获取群聊ID出错: {e}")
return []
# 确保游戏启动(自动初始化群聊和玩家)
def ensure_game_started(group_id, player_id, player_name="未知玩家"):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 检查并添加群聊
cursor.execute("SELECT group_id FROM t_encyclopedia_groups WHERE group_id = %s", (group_id,))
if not cursor.fetchone():
if not encyclopedia_db.check_group_exists(group_id):
add_group(group_id, player_id)
# 检查并添加玩家
cursor.execute(
"SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id)
)
existing_player = cursor.fetchone()
if not existing_player:
cursor.execute(
"INSERT INTO t_encyclopedia_players (player_id, group_id, player_name) VALUES (%s, %s, %s)",
(player_id, group_id, player_name)
)
conn.commit()
player = encyclopedia_db.get_player(player_id, group_id)
if not player:
encyclopedia_db.add_player(player_id, group_id, player_name)
message = (
f"🎉 哇塞,{player_name} 你来啦!\n"
f"🌟 群 {group_id} 瞬间燃爆!\n"
@@ -81,18 +53,17 @@ def ensure_game_started(group_id, player_id, player_name="未知玩家"):
)
return {"message": message, "player_id": player_id}
return {"message": None, "player_id": player_id}
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"确保游戏启动出错: {e}")
return {"message": None, "player_id": player_id}
# 随机分配任务
def assign_random_task(group_id, player_id=None):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT player_id, player_name FROM t_encyclopedia_players WHERE group_id = %s", (group_id,))
players = cursor.fetchall()
# 获取群内所有玩家
players = encyclopedia_db.get_all_players_in_group(group_id)
if not players:
message = (
f"😔 哎呀,群 {group_id} 静悄悄\n"
@@ -122,17 +93,14 @@ def assign_random_task(group_id, player_id=None):
score = int(task["score"])
description = task.get("description", "")
cursor.execute(
"INSERT INTO t_encyclopedia_active_tasks (group_id, question, answer, score, description, holder_id) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, question, answer, score, description, holder_id)
# 创建活跃任务
active_task_id = encyclopedia_db.create_active_task(
group_id, question, answer, score, description, holder_id
)
conn.commit()
cursor.execute(
"SELECT active_task_id FROM t_encyclopedia_active_tasks WHERE group_id = %s AND question = %s AND holder_id = %s ORDER BY assigned_at DESC LIMIT 1",
(group_id, question, holder_id)
)
active_task_id = cursor.fetchone()['active_task_id']
if not active_task_id:
message = f"😔 任务创建失败,请稍后再试!"
return {"message": message, "player_id": player_id}
if player_id:
message = (
@@ -152,27 +120,25 @@ def assign_random_task(group_id, player_id=None):
f"🌈 抢答格式:/a {active_task_id} 答案"
)
return {"message": message, "player_id": holder_id}
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"分配任务出错: {e}")
message = f"😔 任务分配出错,请稍后再试!"
return {"message": message, "player_id": player_id}
# 提交答案并计分
def submit_answer(group_id, player_id, task_id, answer):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id))
player_row = cursor.fetchone()
if not player_row:
# 获取玩家信息
player = encyclopedia_db.get_player(player_id, group_id)
if not player:
message = (
f"😅 嘿,{player_id} 你是路人甲吗?\n"
f"🌟 用 /s 先加入群 {group_id} 吧!"
)
return {"message": message, "player_id": player_id}
player_name = player_row['player_name']
player_name = player['player_name']
if not task_id.isdigit():
message = (
@@ -184,11 +150,9 @@ def submit_answer(group_id, player_id, task_id, answer):
active_task_id = int(task_id)
cursor.execute(
"SELECT question, answer, score, holder_id, status FROM t_encyclopedia_active_tasks WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
task_data = cursor.fetchone()
# 获取任务信息
task_data = encyclopedia_db.get_task_by_id(group_id, active_task_id)
if not task_data:
message = (
f"😔 哎哟,任务 task_{active_task_id} 不翼而飞啦!\n"
@@ -204,12 +168,14 @@ def submit_answer(group_id, player_id, task_id, answer):
)
return {"message": message, "player_id": player_id}
question, correct_answer_db, top_score, holder_id, _ = task_data['question'], task_data['answer'].lower(), \
task_data['score'], task_data['holder_id'], task_data['status']
question = task_data['question']
correct_answer_db = task_data['answer'].lower()
top_score = task_data['score']
holder_id = task_data['holder_id']
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, holder_id))
holder_name = cursor.fetchone()['player_name']
# 获取任务持有者信息
holder = encyclopedia_db.get_task_holder(group_id, holder_id)
holder_name = holder['player_name'] if holder else "未知玩家"
answer_json = {"question": question, "top_score": str(top_score), "answer": answer}
result = game_answer_json(answer_json)
@@ -218,14 +184,12 @@ def submit_answer(group_id, player_id, task_id, answer):
is_correct = points > 0
if is_correct:
cursor.execute(
"UPDATE t_encyclopedia_players SET points = points + %s WHERE group_id = %s AND player_id = %s",
(points, group_id, player_id)
)
cursor.execute(
"UPDATE t_encyclopedia_active_tasks SET status = 'completed' WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
# 更新玩家积分
encyclopedia_db.update_player_points(player_id, group_id, points)
# 完成任务
encyclopedia_db.complete_task(active_task_id)
if player_id == holder_id:
message = (
f"🎉 {player_name} 你是天才吗?\n"
@@ -243,10 +207,8 @@ def submit_answer(group_id, player_id, task_id, answer):
f"🎀 彩蛋:{description}"
)
else:
cursor.execute(
"UPDATE t_encyclopedia_players SET points = GREATEST(points - 1, 0) WHERE group_id = %s AND player_id = %s",
(group_id, player_id)
)
# 扣除积分
encyclopedia_db.update_player_points(player_id, group_id, -1)
points = -1
message = (
f"😅 {player_name} 你这是要笑死我吗?\n"
@@ -258,11 +220,10 @@ def submit_answer(group_id, player_id, task_id, answer):
f"🌟 任务ID: {active_task_id} 还能抢救一下!"
)
cursor.execute(
"INSERT INTO t_encyclopedia_task_history (group_id, active_task_id, player_id, answer, is_correct, points_earned) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, active_task_id, player_id, answer, is_correct, points)
# 添加任务历史记录
encyclopedia_db.add_task_history(
group_id, active_task_id, player_id, answer, is_correct, points
)
conn.commit()
return {"message": message, "player_id": player_id}
except ValueError as e:
@@ -281,54 +242,44 @@ def submit_answer(group_id, player_id, task_id, answer):
f"🎈 再试一次吧!"
)
return {"message": message, "player_id": player_id}
finally:
cursor.close()
conn.close()
# 显示排行榜
def show_rank(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT player_name, points FROM t_encyclopedia_players WHERE group_id = %s ORDER BY points DESC LIMIT 10",
(group_id,)
)
ranks = cursor.fetchall()
# 获取排行榜
ranks = encyclopedia_db.get_player_ranking(group_id, 10)
if not ranks:
message = (
f"😔 群 {group_id} 冷冷清清\n"
f"🌟 快来一起燥起来吧!"
)
return {"message": message, "player_id": player_id}
rank_text = f"🎉 群 {group_id} 排行榜Top 10来啦\n"
for i, row in enumerate(ranks, 1):
rank_text += f"🐓 {i}. {row['player_name']}: {row['points']}\n"
return {"message": rank_text, "player_id": player_id}
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"显示排行榜出错: {e}")
message = f"😔 获取排行榜出错,请稍后再试!"
return {"message": message, "player_id": player_id}
# 显示当前活跃任务
def show_active_tasks(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
# 获取活跃任务
tasks = encyclopedia_db.get_active_tasks_in_group(group_id)
if not tasks:
message = (
f"😄 群 {group_id} 现在一片祥和\n"
f"🌟 没任务?快用 /t 搞一个!"
)
return {"message": message, "player_id": player_id}
task_text = f"🎉 群 {group_id} 活跃任务速递:\n"
for task in tasks:
task_text += (
@@ -337,29 +288,25 @@ def show_active_tasks(group_id, player_id):
f"🌼 大佬:{task['player_name']}\n"
)
return {"message": task_text, "player_id": player_id}
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"显示活跃任务出错: {e}")
message = f"😔 获取活跃任务出错,请稍后再试!"
return {"message": message, "player_id": player_id}
# 列举所有未完成任务
def list_uncompleted_tasks(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
# 获取未完成任务
tasks = encyclopedia_db.get_active_tasks_in_group(group_id)
if not tasks:
message = (
f"😄 群 {group_id} 全员开挂?\n"
f"🌟 没未完成任务,快用 /t 再战!"
)
return {"message": message, "player_id": player_id}
task_text = f"🎉 群 {group_id} 未完成任务大曝光:\n"
for task in tasks:
task_text += (
@@ -368,9 +315,10 @@ def list_uncompleted_tasks(group_id, player_id):
f"🌼 主人:{task['player_name']}\n"
)
return {"message": task_text, "player_id": player_id}
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"列举未完成任务出错: {e}")
message = f"😔 获取未完成任务出错,请稍后再试!"
return {"message": message, "player_id": player_id}
# 定时任务整点触发排除23:00-08:00
@@ -477,4 +425,4 @@ if __name__ == "__main__":
# 测试用例
print(game_process_message("45317011307@chatroom", "Jyunere", "/t")) # 新用户获取任务
print(game_process_message("45317011307@chatroom", "Jyunere", "/a 18 罗马斗兽场")) # 提交答案
print(game_process_message("45317011307@chatroom", "Jyunere", "/r")) # 查看排行榜
print(game_process_message("45317011307@chatroom", "Jyunere", "/r")) # 查看排行榜