feature:新增群百科游戏,用于促进群活跃。加入了新的指令 可用:/start, /tasks, /list, /answer [任务ID] [答案], /addgroup, /rank

This commit is contained in:
liuwei
2025-02-21 16:32:51 +08:00
parent 84f8a88ba4
commit 08e71722e8
5 changed files with 404 additions and 5 deletions

View File

@@ -0,0 +1,67 @@
import requests
import json
# 解析JSON
def extract_content(data_string):
try:
data = json.loads(data_string)
# 提取content字段
content = data["choices"][0]["message"].get("content", "")
return content
except json.JSONDecodeError:
print("Invalid JSON")
return None
def message_task_json(prompt, content):
# 设置Authorization和URL
authorization = "46a5674a-e978-491b-a810-5d54605f2c36" # 请替换为真实的Authorization token
url = 'http://127.0.0.1:8080/v1/chat/completions'
data = {
# "stream": True,
"model": "windsurf/gpt4o",
"messages": [
{
"role": "system",
"content": f"{prompt}"
},
{
"role": "user",
"content": f"{content}"
}
]
}
# 设置请求头
headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": authorization
}
# 发送POST请求
response = requests.post(url, headers=headers, data=json.dumps(data), )
response.encoding = 'utf-8'
# 输出响应内容
print(response.status_code)
print(response.text)
return extract_content(response.text)
def game_question_json(question):
prompt = "你是一个益智百科问答大师可以随时提出百科类的问题问题需要有一定的难度回答完毕之后用户能有所收获并且对问题进行打分同时根据问题难度告知答对之后给多少分1-10请只返回JSON格式的内容格式要求如下{\"question\": \"哪个国家最早将玫瑰与爱情联系起来?\", \"score\":\"1\", \"answer\": \"波斯\",\"description\":\"描述问题答案的原因\"}"
return message_task_json(prompt, question)
def game_answer_json(answar):
prompt = "你是一个益智百科问答大师,可以根据用户回答的答案进行判断,并且对问题(question)答案(answer)进行打分,打分时请参考最高分要求(top_score)告知用户能获得多少分请在description中描述打分理由请只返回JSON格式的内容格式要求如下{\"question\": \"哪个国家最早将玫瑰与爱情联系起来?\", \"score\":\"1\", \"answer\": \"波斯\",\"description\":\"描述问题答案的原因\"}"
return message_task_json(prompt, answar)
if __name__ == '__main__':
print(game_question_json('请出题!'))
print(game_answer_json('question:哪个国家的节日与裸体狂欢有关?,answer:古罗马,top_score:3'))

View File

@@ -0,0 +1,321 @@
import random
import schedule
import time
from datetime import datetime
import pymysql
from game_task.game_chatgpt_qa import game_question_json, game_answer_json
# 数据库连接配置
db_config = {
'host': '192.168.2.32', # 替换为你的MariaDB服务器地址
'user': 'root', # 替换为你的MariaDB用户名
'password': 'lw123456', # 替换为你的MariaDB密码
'database': 'message_archive',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 连接数据库
def get_db_connection():
return pymysql.connect(**db_config)
# 添加群聊
def add_group(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO t_encyclopedia_groups (group_id) VALUES (%s)",
(group_id,)
)
conn.commit()
return f"{group_id} 已成功添加!"
except pymysql.err.IntegrityError:
return f"{group_id} 已存在!"
finally:
cursor.close()
conn.close()
# 获取所有群聊ID
def get_group_ids():
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT group_id FROM t_encyclopedia_groups")
return [row['group_id'] for row in cursor.fetchall()]
finally:
cursor.close()
conn.close()
# 开始游戏(使用 player_id
def start_game(group_id, player_id, player_name="未知玩家"):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 确保群聊存在
cursor.execute("SELECT group_id FROM t_encyclopedia_groups WHERE group_id = %s", (group_id,))
if not cursor.fetchone():
return f"{group_id} 未注册,请先添加群聊!"
cursor.execute(
"INSERT INTO t_encyclopedia_players (player_id, group_id, player_name) VALUES (%s, %s, %s)",
(player_id, group_id, player_name)
)
conn.commit()
return f"欢迎加入百科全书挑战!{player_name} (ID: {player_id}) 已加入群 {group_id},任务将随机分配,任何人可抢答。"
except pymysql.err.IntegrityError:
cursor.execute(
"SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id)
)
existing_name = cursor.fetchone()['player_name']
return f"你已在群 {group_id} 中加入游戏玩家ID: {player_id},名称: {existing_name}"
finally:
cursor.close()
conn.close()
# 随机分配任务
def assign_random_task(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 获取当前群聊的玩家列表
cursor.execute("SELECT player_id FROM t_encyclopedia_players WHERE group_id = %s", (group_id,))
players = [row['player_id'] for row in cursor.fetchall()]
if not players:
return f"{group_id} 暂无玩家参与,无法分配任务!"
# 使用内部方法获取任务
task = game_question_json("请出题!")
question = task["question"]
answer = task["answer"]
score = int(task["score"])
description = task.get("description", "")
# 分配给随机玩家
holder_id = random.choice(players)
cursor.execute(
"INSERT INTO t_encyclopedia_active_tasks (group_id, question, answer, score, description, holder_id) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, question, answer, score, description, holder_id)
)
conn.commit()
# 获取分配的任务ID和持有者名称
cursor.execute(
"SELECT active_task_id FROM t_encyclopedia_active_tasks WHERE group_id = %s AND question = %s AND holder_id = %s ORDER BY assigned_at DESC LIMIT 1",
(group_id, question, holder_id)
)
active_task_id = cursor.fetchone()['active_task_id']
cursor.execute(
"SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, holder_id)
)
holder_name = cursor.fetchone()['player_name']
return f"任务分配给 {holder_name} (ID: {holder_id}){question}(群 {group_id} 所有人可抢答任务ID: task_{active_task_id},积分:{score}"
finally:
cursor.close()
conn.close()
# 提交答案并计分
def submit_answer(group_id, player_id, task_id, answer):
conn = get_db_connection()
cursor = conn.cursor()
try:
# 检查玩家是否存在并获取名称
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, player_id))
player_row = cursor.fetchone()
if not player_row:
return f"请先在群 {group_id} 输入 /start 加入游戏玩家ID: {player_id}"
player_name = player_row['player_name']
# 检查任务是否存在并获取任务详情
active_task_id = int(task_id.split('_')[1])
cursor.execute(
"SELECT question, answer, score, holder_id, status FROM t_encyclopedia_active_tasks WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
task_data = cursor.fetchone()
if not task_data or task_data['status'] == 'completed':
return f"{group_id} 中此任务不存在或已完成!"
question, correct_answer_db, top_score, holder_id, _ = task_data['question'], task_data['answer'].lower(), \
task_data['score'], task_data['holder_id'], task_data['status']
# 获取持有者名称
cursor.execute("SELECT player_name FROM t_encyclopedia_players WHERE group_id = %s AND player_id = %s",
(group_id, holder_id))
holder_name = cursor.fetchone()['player_name']
# 调用内部方法校验答案
answer_json = {"question": question, "top_score": str(top_score), "answer": answer}
result = game_answer_json(answer_json)
user_answer = answer.strip().lower()
points = int(result["score"])
correct_answer = result["answer"].lower()
description = result["description"]
is_correct = points > 0
# 记录历史
cursor.execute(
"INSERT INTO t_encyclopedia_task_history (group_id, active_task_id, player_id, answer, is_correct, points_earned) VALUES (%s, %s, %s, %s, %s, %s)",
(group_id, active_task_id, player_id, answer, is_correct, points)
)
if is_correct:
# 更新玩家积分
cursor.execute(
"UPDATE t_encyclopedia_players SET points = points + %s WHERE group_id = %s AND player_id = %s",
(points, group_id, player_id)
)
# 标记任务完成
cursor.execute(
"UPDATE t_encyclopedia_active_tasks SET status = 'completed' WHERE group_id = %s AND active_task_id = %s",
(group_id, active_task_id)
)
conn.commit()
if player_id == holder_id:
return f"{player_name} (ID: {player_id}) 回答正确!任务:{question}\n获得 {points}\n描述:{description}"
else:
return f"{player_name} (ID: {player_id}) 抢答成功!任务:{question}(原持有者:{holder_name} (ID: {holder_id})\n获得 {points}\n描述:{description}"
else:
conn.commit()
return f"{player_name} (ID: {player_id}) 回答错误!任务:{question}\n你的答案:{answer},正确答案:{correct_answer}\n描述:{description}"
finally:
cursor.close()
conn.close()
# 显示排行榜
def show_rank(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT player_name, points FROM t_encyclopedia_players WHERE group_id = %s ORDER BY points DESC LIMIT 10",
(group_id,)
)
ranks = cursor.fetchall()
if not ranks:
return f"{group_id} 暂无玩家参与!"
rank_text = f"{group_id} 排行榜Top 10\n"
for i, row in enumerate(ranks, 1):
rank_text += f"{i}. {row['player_name']}: {row['points']}\n"
return rank_text
finally:
cursor.close()
conn.close()
# 显示当前活跃任务
def show_active_tasks(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
if not tasks:
return f"{group_id} 当前没有活跃任务!"
task_text = f"{group_id} 当前活跃任务:\n"
for task in tasks:
task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(持有者:{task['player_name']} (ID: {task['player_id']})\n"
return task_text
finally:
cursor.close()
conn.close()
# 列举所有未完成任务及其所属者
def list_uncompleted_tasks(group_id):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.active_task_id, a.question, p.player_name, p.player_id
FROM t_encyclopedia_active_tasks a
JOIN t_encyclopedia_players p ON a.holder_id = p.player_id AND a.group_id = p.group_id
WHERE a.group_id = %s AND a.status = 'pending'
""", (group_id,))
tasks = cursor.fetchall()
if not tasks:
return f"{group_id} 当前没有未完成的任务!"
task_text = f"{group_id} 所有未完成任务列表:\n"
for task in tasks:
task_text += f"任务ID: task_{task['active_task_id']} - {task['question']}(所属:{task['player_name']} (ID: {task['player_id']})\n"
return task_text
finally:
cursor.close()
conn.close()
# 定时任务整点触发排除23:00-08:00
def run_random_task_assignment(group_id):
current_hour = datetime.now().hour
if current_hour >= 23 or current_hour < 9: # 排除23:00-08:00
print(f"{datetime.now()}{group_id} 当前时间 {current_hour}:00 在23:00-08:00区间跳过任务发放")
return
result = assign_random_task(group_id)
print(f"{datetime.now()} {result}")
# 处理群聊消息
def process_message(group_id, player_id, message, player_name="未知玩家"):
if message == "/start":
return start_game(group_id, player_id, player_name)
elif message == "/tasks":
return show_active_tasks(group_id)
elif message == "/list":
return list_uncompleted_tasks(group_id)
elif message.startswith("/answer"):
parts = message.split(" ", 2)
if len(parts) < 3:
return "请使用格式:/answer [任务ID] [答案],如 /answer task_1 钒Vanadium"
task_id, answer = parts[1], parts[2]
return submit_answer(group_id, player_id, task_id, answer)
elif message == "/rank":
return show_rank(group_id)
elif message.startswith("/addgroup"):
return add_group(group_id)
else:
return "无效命令!可用:/start, /tasks, /list, /answer [任务ID] [答案], /addgroup, /rank"
# 设置定时任务
def setup_schedule():
group_ids = get_group_ids()
for gid in group_ids:
schedule.every().hour.at(":00").do(run_random_task_assignment, group_id=gid)
# 主程序
if __name__ == "__main__":
# 初始化群聊
print(add_group(1))
print(add_group(2))
# 初始化玩家
print(process_message(1, 1001, "/start", "玩家1"))
print(process_message(1, 1002, "/start", "玩家2"))
print(process_message(2, 2001, "/start", "玩家A"))
print(process_message(2, 2002, "/start", "玩家B"))
# 设置调度
setup_schedule()
while True:
schedule.run_pending()
time.sleep(1)