Files
abot/game_task/game_task_encyclopedia.py
2025-02-21 17:07:00 +08:00

334 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
import schedule
import time
from datetime import datetime
import pymysql
from game_task.game_chatgpt_qa import game_question_json, game_answer_json
# 数据库连接配置
db_config = {
'host': '192.168.2.32', # 替换为你的MariaDB服务器地址
'user': 'root', # 替换为你的MariaDB用户名
'password': 'lw123456', # 替换为你的MariaDB密码
'database': 'message_archive',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 连接数据库
def get_db_connection():
return pymysql.connect(**db_config)
# 添加群聊
def add_group(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO t_encyclopedia_groups (group_id) VALUES (%s)",
(group_id,)
)
conn.commit()
return f"{group_id} 已成功添加!"
except pymysql.err.IntegrityError:
return f"{group_id} 已存在!"
finally:
cursor.close()
conn.close()
# 获取所有群聊ID
def get_group_ids():
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT group_id FROM t_encyclopedia_groups")
return [row['group_id'] for row in cursor.fetchall()]
finally:
cursor.close()
conn.close()
# 开始游戏(使用 player_id
def start_game(group_id, player_id, player_name="未知玩家"):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 确保群聊存在
cursor.execute("SELECT group_id FROM t_encyclopedia_groups WHERE group_id = %s", (group_id,))
if not cursor.fetchone():
return f"{group_id} 未注册,请先添加群聊!"
cursor.execute(
"INSERT INTO t_encyclopedia_players (player_id, group_id, player_name) VALUES (%s, %s, %s)",
(player_id, group_id, player_name)
)
conn.commit()
return f"欢迎加入百科全书挑战!{player_name} (ID: {player_id}) 已加入群 {group_id},任务将随机分配,任何人可抢答。"
except pymysql.err.IntegrityError:
cursor.execute(
"SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id)
)
existing_name = cursor.fetchone()['player_name']
return f"你已在群 {group_id} 中加入游戏玩家ID: {player_id},名称: {existing_name}"
finally:
cursor.close()
conn.close()
# 随机分配任务
def assign_random_task(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 获取当前群聊的玩家列表
cursor.execute("SELECT player_id FROM t_encyclopedia_players WHERE group_id = %s", (group_id,))
players = [row['player_id'] for row in cursor.fetchall()]
if not players:
return f"{group_id} 暂无玩家参与,无法分配任务!"
# 使用内部方法获取任务
task = game_question_json("请出题!")
question = task["question"]
answer = task["answer"]
score = int(task["score"])
description = task.get("description", "")
# 分配给随机玩家
holder_id = random.choice(players)
cursor.execute(
"INSERT INTO t_encyclopedia_active_tasks (group_id, question, answer, score, description, holder_id) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, question, answer, score, description, holder_id)
)
conn.commit()
# 获取分配的任务ID和持有者名称
cursor.execute(
"SELECT active_task_id FROM t_encyclopedia_active_tasks WHERE group_id = %s AND question = %s AND holder_id = %s ORDER BY assigned_at DESC LIMIT 1",
(group_id, question, holder_id)
)
active_task_id = cursor.fetchone()['active_task_id']
cursor.execute(
"SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, holder_id)
)
holder_name = cursor.fetchone()['player_name']
return f"任务分配给 {holder_name} (ID: {holder_id}){question}(群 {group_id} 所有人可抢答任务ID: task_{active_task_id},积分:{score}"
finally:
cursor.close()
conn.close()
# 提交答案并计分错误扣1分
def submit_answer(group_id, player_id, task_id, answer):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 检查玩家是否存在并获取名称
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id))
player_row = cursor.fetchone()
if not player_row:
return f"请先在群 {group_id} 输入 /start 加入游戏玩家ID: {player_id}"
player_name = player_row['player_name']
# 检查任务是否存在并获取任务详情
active_task_id = int(task_id.split('_')[1])
cursor.execute(
"SELECT question, answer, score, holder_id, status FROM t_encyclopedia_active_tasks WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
task_data = cursor.fetchone()
if not task_data or task_data['status'] == 'completed':
return f"{group_id} 中此任务不存在或已完成!"
question, correct_answer_db, top_score, holder_id, _ = task_data['question'], task_data['answer'].lower(), \
task_data['score'], task_data['holder_id'], task_data['status']
# 获取持有者名称
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, holder_id))
holder_name = cursor.fetchone()['player_name']
# 调用内部方法校验答案
answer_json = {"question": question, "top_score": str(top_score), "answer": answer}
result = game_answer_json(answer_json)
user_answer = answer.strip().lower()
points = int(result["score"])
correct_answer = result["answer"].lower()
description = result["description"]
is_correct = points > 0
# 记录历史并更新积分
if is_correct:
cursor.execute(
"UPDATE t_encyclopedia_players SET points = points + %s WHERE group_id = %s AND player_id = %s",
(points, group_id, player_id)
)
cursor.execute(
"UPDATE t_encyclopedia_active_tasks SET status = 'completed' WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
message = (
f"{player_name} (ID: {player_id}) 回答正确!任务:{question}\n获得 {points}\n描述:{description}"
if player_id == holder_id
else f"{player_name} (ID: {player_id}) 抢答成功!任务:{question}(原持有者:{holder_name} (ID: {holder_id})\n获得 {points}\n描述:{description}"
)
else:
# 错误扣1分确保积分不低于0
cursor.execute(
"UPDATE t_encyclopedia_players SET points = GREATEST(points - 1, 0) WHERE group_id = %s AND player_id = %s",
(group_id, player_id)
)
points = -1 # 记录惩罚分
message = f"{player_name} (ID: {player_id}) 回答错误!任务:{question}\n你的答案:{answer},正确答案:{correct_answer}\n扣除 1 分\n描述:{description}"
cursor.execute(
"INSERT INTO t_encyclopedia_task_history (group_id, active_task_id, player_id, answer, is_correct, points_earned) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, active_task_id, player_id, answer, is_correct, points)
)
conn.commit()
return message
finally:
cursor.close()
conn.close()
# 显示排行榜
def show_rank(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT player_name, points FROM t_encyclopedia_players WHERE group_id = %s ORDER BY points DESC LIMIT 10",
(group_id,)
)
ranks = cursor.fetchall()
if not ranks:
return f"{group_id} 暂无玩家参与!"
rank_text = f"{group_id} 排行榜Top 10\n"
for i, row in enumerate(ranks, 1):
rank_text += f"{i}. {row['player_name']}: {row['points']}\n"
return rank_text
finally:
cursor.close()
conn.close()
# 显示当前活跃任务
def show_active_tasks(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
if not tasks:
return f"{group_id} 当前没有活跃任务!"
task_text = f"{group_id} 当前活跃任务:\n"
for task in tasks:
task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(持有者:{task['player_name']} (ID: {task['player_id']})\n"
return task_text
finally:
cursor.close()
conn.close()
# 列举所有未完成任务及其所属者
def list_uncompleted_tasks(group_id, player_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
if not tasks:
return f"{group_id} 当前没有未完成的任务!"
task_text = f"{group_id} 所有未完成任务列表:\n"
for task in tasks:
task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(所属:{task['player_name']} (ID: {task['player_id']})\n"
return task_text
finally:
cursor.close()
conn.close()
# 定时任务整点触发排除23:00-08:00
def run_random_task_assignment(group_id):
current_hour = datetime.now().hour
if current_hour >= 23 or current_hour < 9: # 排除23:00-08:00
print(f"{datetime.now()}{group_id} 当前时间 {current_hour}:00 在23:00-08:00区间跳过任务发放")
return
result = assign_random_task(group_id)
print(f"{datetime.now()} {result}")
# 处理群聊消息
def game_process_message(group_id, player_id, message, player_name="未知玩家"):
if message == "/start":
return start_game(group_id, player_id, player_name)
elif message == "/tasks":
return show_active_tasks(group_id, player_id)
elif message == "/list":
return list_uncompleted_tasks(group_id, player_id)
elif message.startswith("/answer"):
parts = message.split(" ", 2)
if len(parts) < 3:
return "请使用格式:/answer [任务ID] [答案],如 /answer task_1 钒Vanadium"
task_id, answer = parts[1], parts[2]
return submit_answer(group_id, player_id, task_id, answer)
elif message == "/rank":
return show_rank(group_id, player_id)
elif message.startswith("/addgroup"):
return add_group(group_id, player_id)
elif message == "/gettask":
current_hour = datetime.now().hour
if current_hour >= 23 or current_hour < 9:
return f"当前时间 {current_hour}:00 在23:00-08:00区间无法领取任务"
return assign_random_task(group_id)
else:
return "无效命令!可用:/start, /tasks, /list, /answer [任务ID] [答案], /addgroup, /gettask, /rank"
# 设置定时任务
def setup_schedule():
group_ids = get_group_ids()
for gid in group_ids:
schedule.every().hour.at(":00").do(run_random_task_assignment, group_id=gid)
# 主程序
if __name__ == "__main__":
# 初始化群聊
print(add_group("group1", "admin1"))
print(add_group("group2", "admin1"))
# 初始化玩家
print(game_process_message("group1", "player1001", "/start", "玩家1"))
print(game_process_message("group1", "player1002", "/start", "玩家2"))
print(game_process_message("group2", "player2001", "/start", "玩家A"))
print(game_process_message("group2", "player2002", "/start", "玩家B"))
# 设置调度
setup_schedule()
# 测试 /gettask
print(game_process_message("group1", "player1001", "/gettask"))
while True:
schedule.run_pending()
time.sleep(1)