import random import schedule import time from datetime import datetime import pymysql from game_task.game_chatgpt_qa import game_question_json, game_answer_json # 数据库连接配置 db_config = { 'host': '192.168.2.32', # 替换为你的MariaDB服务器地址 'user': 'root', # 替换为你的MariaDB用户名 'password': 'lw123456', # 替换为你的MariaDB密码 'database': 'message_archive', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 连接数据库 def get_db_connection(): return pymysql.connect(**db_config) # 添加群聊 def add_group(group_id): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO t_encyclopedia_groups (group_id) VALUES (%s)", (group_id,) ) conn.commit() return f"群 {group_id} 已成功添加!" except pymysql.err.IntegrityError: return f"群 {group_id} 已存在!" finally: cursor.close() conn.close() # 获取所有群聊ID def get_group_ids(): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT group_id FROM t_encyclopedia_groups") return [row['group_id'] for row in cursor.fetchall()] finally: cursor.close() conn.close() # 开始游戏(使用 player_id) def start_game(group_id, player_id, player_name="未知玩家"): conn = get_db_connection() cursor = conn.cursor() try: # 确保群聊存在 cursor.execute("SELECT group_id FROM t_encyclopedia_groups WHERE group_id = %s", (group_id,)) if not cursor.fetchone(): return f"群 {group_id} 未注册,请先添加群聊!" cursor.execute( "INSERT INTO t_encyclopedia_players (player_id, group_id, player_name) VALUES (%s, %s, %s)", (player_id, group_id, player_name) ) conn.commit() return f"欢迎加入百科全书挑战!{player_name} (ID: {player_id}) 已加入群 {group_id},任务将随机分配,任何人可抢答。" except pymysql.err.IntegrityError: cursor.execute( "SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s", (group_id, player_id) ) existing_name = cursor.fetchone()['player_name'] return f"你已在群 {group_id} 中加入游戏!玩家ID: {player_id},名称: {existing_name}" finally: cursor.close() conn.close() # 随机分配任务 def assign_random_task(group_id): conn = get_db_connection() cursor = conn.cursor() try: # 获取当前群聊的玩家列表 cursor.execute("SELECT player_id FROM t_encyclopedia_players WHERE group_id = %s", (group_id,)) players = [row['player_id'] for row in cursor.fetchall()] if not players: return f"群 {group_id} 暂无玩家参与,无法分配任务!" # 使用内部方法获取任务 task = game_question_json("请出题!") question = task["question"] answer = task["answer"] score = int(task["score"]) description = task.get("description", "") # 分配给随机玩家 holder_id = random.choice(players) cursor.execute( "INSERT INTO t_encyclopedia_active_tasks (group_id, question, answer, score, description, holder_id) VALUES (%s, %s, %s, %s, %s, %s)", (group_id, question, answer, score, description, holder_id) ) conn.commit() # 获取分配的任务ID和持有者名称 cursor.execute( "SELECT active_task_id FROM t_encyclopedia_active_tasks WHERE group_id = %s AND question = %s AND holder_id = %s ORDER BY assigned_at DESC LIMIT 1", (group_id, question, holder_id) ) active_task_id = cursor.fetchone()['active_task_id'] cursor.execute( "SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s", (group_id, holder_id) ) holder_name = cursor.fetchone()['player_name'] return f"任务分配给 {holder_name} (ID: {holder_id}):{question}(群 {group_id} 所有人可抢答!任务ID: task_{active_task_id},积分:{score})" finally: cursor.close() conn.close() # 提交答案并计分 def submit_answer(group_id, player_id, task_id, answer): conn = get_db_connection() cursor = conn.cursor() try: # 检查玩家是否存在并获取名称 cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s", (group_id, player_id)) player_row = cursor.fetchone() if not player_row: return f"请先在群 {group_id} 输入 /start 加入游戏!玩家ID: {player_id}" player_name = player_row['player_name'] # 检查任务是否存在并获取任务详情 active_task_id = int(task_id.split('_')[1]) cursor.execute( "SELECT question, answer, score, holder_id, status FROM t_encyclopedia_active_tasks WHERE group_id = %s AND active_task_id = %s", (group_id, active_task_id) ) task_data = cursor.fetchone() if not task_data or task_data['status'] == 'completed': return f"群 {group_id} 中此任务不存在或已完成!" question, correct_answer_db, top_score, holder_id, _ = task_data['question'], task_data['answer'].lower(), \ task_data['score'], task_data['holder_id'], task_data['status'] # 获取持有者名称 cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s", (group_id, holder_id)) holder_name = cursor.fetchone()['player_name'] # 调用内部方法校验答案 answer_json = {"question": question, "top_score": str(top_score), "answer": answer} result = game_answer_json(answer_json) user_answer = answer.strip().lower() points = int(result["score"]) correct_answer = result["answer"].lower() description = result["description"] is_correct = points > 0 # 记录历史 cursor.execute( "INSERT INTO t_encyclopedia_task_history (group_id, active_task_id, player_id, answer, is_correct, points_earned) VALUES (%s, %s, %s, %s, %s, %s)", (group_id, active_task_id, player_id, answer, is_correct, points) ) if is_correct: # 更新玩家积分 cursor.execute( "UPDATE t_encyclopedia_players SET points = points + %s WHERE group_id = %s AND player_id = %s", (points, group_id, player_id) ) # 标记任务完成 cursor.execute( "UPDATE t_encyclopedia_active_tasks SET status = 'completed' WHERE group_id = %s AND active_task_id = %s", (group_id, active_task_id) ) conn.commit() if player_id == holder_id: return f"{player_name} (ID: {player_id}) 回答正确!任务:{question}\n获得 {points} 分\n描述:{description}" else: return f"{player_name} (ID: {player_id}) 抢答成功!任务:{question}(原持有者:{holder_name} (ID: {holder_id}))\n获得 {points} 分\n描述:{description}" else: conn.commit() return f"{player_name} (ID: {player_id}) 回答错误!任务:{question}\n你的答案:{answer},正确答案:{correct_answer}\n描述:{description}" finally: cursor.close() conn.close() # 显示排行榜 def show_rank(group_id): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "SELECT player_name, points FROM t_encyclopedia_players WHERE group_id = %s ORDER BY points DESC LIMIT 10", (group_id,) ) ranks = cursor.fetchall() if not ranks: return f"群 {group_id} 暂无玩家参与!" rank_text = f"群 {group_id} 排行榜(Top 10)\n" for i, row in enumerate(ranks, 1): rank_text += f"{i}. {row['player_name']}: {row['points']} 分\n" return rank_text finally: cursor.close() conn.close() # 显示当前活跃任务 def show_active_tasks(group_id): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(""" SELECT a.active_task_id, a.question, p.player_name, p.player_id FROM t_encyclopedia_active_tasks a JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id WHERE a.group_id = %s AND a.status = 'pending' """, (group_id,)) tasks = cursor.fetchall() if not tasks: return f"群 {group_id} 当前没有活跃任务!" task_text = f"群 {group_id} 当前活跃任务:\n" for task in tasks: task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(持有者:{task['player_name']} (ID: {task['player_id']}))\n" return task_text finally: cursor.close() conn.close() # 列举所有未完成任务及其所属者 def list_uncompleted_tasks(group_id): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(""" SELECT a.active_task_id, a.question, p.player_name, p.player_id FROM t_encyclopedia_active_tasks a JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id WHERE a.group_id = %s AND a.status = 'pending' """, (group_id,)) tasks = cursor.fetchall() if not tasks: return f"群 {group_id} 当前没有未完成的任务!" task_text = f"群 {group_id} 所有未完成任务列表:\n" for task in tasks: task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(所属:{task['player_name']} (ID: {task['player_id']}))\n" return task_text finally: cursor.close() conn.close() # 定时任务:整点触发,排除23:00-08:00 def run_random_task_assignment(group_id): current_hour = datetime.now().hour if current_hour >= 23 or current_hour < 9: # 排除23:00-08:00 print(f"{datetime.now()} 群 {group_id} 当前时间 {current_hour}:00 在23:00-08:00区间,跳过任务发放") return result = assign_random_task(group_id) print(f"{datetime.now()} {result}") # 处理群聊消息 def game_process_message(group_id, player_id, message, player_name="未知玩家"): if message == "/start": return start_game(group_id, player_id, player_name) elif message == "/tasks": return show_active_tasks(group_id) elif message == "/list": return list_uncompleted_tasks(group_id) elif message.startswith("/answer"): parts = message.split(" ", 2) if len(parts) < 3: return "请使用格式:/answer [任务ID] [答案],如 /answer task_1 钒(Vanadium)" task_id, answer = parts[1], parts[2] return submit_answer(group_id, player_id, task_id, answer) elif message == "/rank": return show_rank(group_id) elif message.startswith("/addgroup"): return add_group(group_id) else: return "无效命令!可用:/start, /tasks, /list, /answer [任务ID] [答案], /addgroup, /rank" # 设置定时任务 def setup_schedule(): group_ids = get_group_ids() for gid in group_ids: schedule.every().hour.at(":00").do(run_random_task_assignment, group_id=gid) # 主程序 if __name__ == "__main__": # 初始化群聊 print(add_group(1)) print(add_group(2)) # 初始化玩家 print(process_message(1, 1001, "/start", "玩家1")) print(process_message(1, 1002, "/start", "玩家2")) print(process_message(2, 2001, "/start", "玩家A")) print(process_message(2, 2002, "/start", "玩家B")) # 设置调度 setup_schedule() while True: schedule.run_pending() time.sleep(1)