Files
abot/task/task_list.py

157 lines
4.3 KiB
Python

import pymysql
import schedule
import json
from task.message_task_json import message_task_json
# Connection settings passed as keyword arguments to pymysql.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
db_config = dict(
    host='192.168.2.32',        # replace with your MariaDB server address
    user='root',                # replace with your MariaDB user name
    password='lw123456',        # replace with your MariaDB password
    database='message_archive',
)
def get_db_connection():
    """Open and return a new pymysql connection built from ``db_config``.

    Each call creates a fresh connection; the caller is responsible for
    closing it.
    """
    return pymysql.connect(**db_config)
def insert_task_to_db(description, reminder_time, task_type):
    """Insert one row into the ``tasks`` table and return its generated id.

    Args:
        description: Text stored in the ``task_description`` column.
        reminder_time: Value stored in the ``reminder_time`` column.
        task_type: Value stored in the ``task_type`` column (callers in this
            file pass ``'single'`` or ``'recurring'``).

    Returns:
        The cursor's ``lastrowid`` after the INSERT. Presumably this is the
        new ``task_id`` (assumes ``task_id`` is AUTO_INCREMENT -- TODO
        confirm against the schema).

    Raises:
        Whatever pymysql raises on connection or query failure; the cursor
        and connection are closed before the exception propagates.
    """
    query = (
        "INSERT INTO tasks (task_description, reminder_time, task_type) "
        "VALUES (%s, %s, %s)"
    )
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query, (description, reminder_time, task_type))
            # Read the id on the same cursor that ran the INSERT; the value
            # is not visible from any other connection.
            task_id = cursor.lastrowid
        finally:
            cursor.close()
        connection.commit()
        return task_id
    finally:
        # Always release the connection, even when execute/commit fails.
        connection.close()
def reminder_task(task_id):
    """Print the stored description of the task identified by ``task_id``.

    Looks up ``task_description`` in the ``tasks`` table and prints it as
    ``Reminder: <description>``. Nothing is printed when no row matches.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Raises:
        Whatever pymysql raises on connection or query failure; the cursor
        and connection are closed before the exception propagates.
    """
    query = "SELECT task_description FROM tasks WHERE task_id = %s"
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query, (task_id,))
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        connection.close()

    if row:
        print(f"Reminder: {row[0]}")  # emit the task's own text
def schedule_single_task(reminder_time, task_id):
    """Schedule a one-shot reminder for ``task_id`` at ``reminder_time``.

    The job is registered on the daily schedule but cancels itself after its
    first run, so the reminder fires only once. Previously the job was
    registered exactly like a recurring task and repeated every day.

    Args:
        reminder_time: Time-of-day string passed to ``schedule``'s ``at()``
            (presumably ``"HH:MM"`` -- TODO confirm what
            ``message_task_json`` produces). Only the time of day is used,
            so the reminder fires at the next occurrence of that time.
        task_id: Id forwarded to ``reminder_task``.
    """
    def _run_once():
        reminder_task(task_id=task_id)
        # Returning CancelJob tells the scheduler to drop this job after
        # it has run.
        return schedule.CancelJob

    schedule.every().day.at(reminder_time).do(_run_once)
    print(f"Single task scheduled at {reminder_time}.")
def schedule_recurring_task(reminder_time, task_id):
    """Register a daily job that calls ``reminder_task`` for ``task_id``.

    Args:
        reminder_time: Time-of-day string passed to ``schedule``'s ``at()``.
        task_id: Id forwarded to ``reminder_task`` as a keyword argument.
    """
    daily_job = schedule.every().day.at(reminder_time)
    daily_job.do(reminder_task, task_id=task_id)
    print(f"Recurring task scheduled every day at {reminder_time}.")
def _insert_task_returning_id(description, reminder_time, task_type):
    """Insert a task row and return its generated id on a single connection."""
    query = (
        "INSERT INTO tasks (task_description, reminder_time, task_type) "
        "VALUES (%s, %s, %s)"
    )
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query, (description, reminder_time, task_type))
            # The generated id is only visible on the connection that ran
            # the INSERT, so read it here before the connection is closed.
            task_id = cursor.lastrowid
        finally:
            cursor.close()
        connection.commit()
        return task_id
    finally:
        connection.close()


def handle_user_input(user_input, task_type):
    """Parse a user request, store it as a task, and schedule its reminder.

    ``message_task_json(user_input)`` is expected to return a JSON string
    containing at least a ``"reminder_time"`` key; when it returns a falsy
    value nothing is stored or scheduled.

    The previous implementation inserted the row on one connection and then
    ran ``SELECT LAST_INSERT_ID()`` on a second, brand-new connection. That
    value is tracked per connection, so the scheduled job never received the
    real task id. The insert and the id lookup now share one connection.

    Args:
        user_input: Raw request text; stored as the task description.
        task_type: ``'single'`` or ``'recurring'``; any other value stores
            the task but schedules nothing.

    Raises:
        json.JSONDecodeError: If the returned text is not valid JSON.
        KeyError: If the parsed JSON has no ``"reminder_time"`` key.
    """
    task_json = message_task_json(user_input)
    print(task_json)
    if not task_json:
        return

    # Parse the JSON string; only the reminder time is needed here.
    data = json.loads(task_json)
    reminder_time = data["reminder_time"]

    # Store the task and obtain its id from the same connection.
    task_id = _insert_task_returning_id(user_input, reminder_time, task_type)

    # Schedule according to the task type.
    if task_type == 'single':
        schedule_single_task(reminder_time, task_id)
    elif task_type == 'recurring':
        schedule_recurring_task(reminder_time, task_id)
def get_tasks_by_type(task_type):
    """Fetch, print, and return every task whose ``task_type`` matches.

    Each row is printed on one line showing its id, description, reminder
    time, status, and type.

    Args:
        task_type: Value matched against the ``task_type`` column.

    Returns:
        The rows from ``cursor.fetchall()``, each holding
        ``(task_id, task_description, reminder_time, status, task_type)``.
        Earlier versions returned ``None``.

    Raises:
        Whatever pymysql raises on connection or query failure; the cursor
        and connection are closed before the exception propagates.
    """
    query = (
        "SELECT task_id, task_description, reminder_time, status, task_type "
        "FROM tasks WHERE task_type = %s"
    )
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query, (task_type,))
            tasks = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        connection.close()

    for task in tasks:
        print(
            f"Task ID: {task[0]}, Description: {task[1]}, Reminder Time: {task[2]}, Status: {task[3]}, Type: {task[4]}")
    return tasks
def update_task_status(task_id, status):
    """Set the ``status`` column of the task identified by ``task_id``.

    Args:
        task_id: Value matched against the ``task_id`` column.
        status: New value written to the ``status`` column.

    Raises:
        Whatever pymysql raises on connection or query failure; the cursor
        and connection are closed before the exception propagates.
    """
    query = "UPDATE tasks SET status = %s WHERE task_id = %s"
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query, (status, task_id))
        finally:
            cursor.close()
        connection.commit()
    finally:
        # Release the connection even if execute/commit fails.
        connection.close()
def get_all_recurring_tasks():
    """List every task of type ``'recurring'`` via ``get_tasks_by_type``."""
    get_tasks_by_type(task_type='recurring')
def get_all_single_tasks():
    """List every task of type ``'single'`` via ``get_tasks_by_type``."""
    get_tasks_by_type(task_type='single')
def main():
    """Feed two sample requests (one single, one recurring) through the pipeline."""
    samples = (
        ("提醒我明天上午9点开会", 'single'),      # example: one-off task
        ("每天上午9点提醒我开会", 'recurring'),   # example: recurring task
    )
    for text, kind in samples:
        handle_user_input(text, kind)

    # The scheduler loop below is intentionally left disabled. Enabling it
    # also needs the `time` module, which is not among the imports visible
    # at the top of this file.
    # while True:
    #     schedule.run_pending()
    #     time.sleep(1)
# Run the sample flow only when this file is executed as a script.
if __name__ == "__main__":
    main()