import json
from contextlib import contextmanager

import pymysql
import schedule

from task.message_task_json import message_task_json

# Database connection settings.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store instead of committing them.
db_config = {
    'host': '192.168.2.32',  # replace with your MariaDB server address
    'user': 'root',  # replace with your MariaDB user name
    'password': 'lw123456',  # replace with your MariaDB password
    'database': 'message_archive'
}


def get_db_connection():
    """Open and return a new pymysql connection built from ``db_config``."""
    connection = pymysql.connect(**db_config)
    return connection


@contextmanager
def _open_cursor():
    """Yield ``(connection, cursor)`` and close both even if the body raises."""
    connection = get_db_connection()
    try:
        cursor = connection.cursor()
        try:
            yield connection, cursor
        finally:
            cursor.close()
    finally:
        connection.close()


def insert_task_to_db(description, reminder_time, task_type):
    """Insert one row into ``tasks`` and return its auto-generated id.

    Args:
        description: Text stored in the ``task_description`` column.
        reminder_time: Value stored in the ``reminder_time`` column.
        task_type: Value stored in the ``task_type`` column
            (callers in this file pass ``'single'`` or ``'recurring'``).

    Returns:
        The ``lastrowid`` reported by the cursor that ran the INSERT.
    """
    query = "INSERT INTO tasks (task_description, reminder_time, task_type) VALUES (%s, %s, %s)"
    with _open_cursor() as (connection, cursor):
        cursor.execute(query, (description, reminder_time, task_type))
        connection.commit()
        # The generated id is only visible on the connection that performed
        # the INSERT, so it has to be captured here, before closing.
        return cursor.lastrowid


def reminder_task(task_id):
    """Look up the description of ``task_id`` and print it as a reminder.

    Prints nothing when no row matches ``task_id``.
    """
    query = "SELECT task_description FROM tasks WHERE task_id = %s"
    with _open_cursor() as (_, cursor):
        cursor.execute(query, (task_id,))
        row = cursor.fetchone()
        if row:
            task_description = row[0]
            print(f"Reminder: {task_description}")


def _run_single_reminder(task_id):
    """Fire the reminder once, then ask ``schedule`` to drop the job."""
    reminder_task(task_id)
    return schedule.CancelJob


def schedule_single_task(reminder_time, task_id):
    """Schedule a reminder for ``task_id`` that fires only once.

    The job is registered at ``reminder_time`` and cancels itself after its
    first run, so it does not repeat on later days.
    """
    schedule.every().day.at(reminder_time).do(_run_single_reminder, task_id=task_id)
    print(f"Single task scheduled at {reminder_time}.")


def schedule_recurring_task(reminder_time, task_id):
    """Schedule a reminder for ``task_id`` that fires every day at ``reminder_time``."""
    schedule.every().day.at(reminder_time).do(reminder_task, task_id=task_id)
    print(f"Recurring task scheduled every day at {reminder_time}.")


def handle_user_input(user_input, task_type):
    """Parse a user request, persist it as a task, and schedule its reminder.

    Args:
        user_input: Raw request text handed to ``message_task_json``.
        task_type: ``'single'`` or ``'recurring'``; any other value stores
            the task but schedules nothing.

    Raises:
        KeyError: If the parsed JSON has no ``"reminder_time"`` key.
    """
    task_json = message_task_json(user_input)
    print(task_json)
    if not task_json:
        return

    # The helper's result is treated as a JSON string and decoded to a dict.
    data = json.loads(task_json)
    reminder_time = data["reminder_time"]

    # NOTE(review): the raw user input (not the parsed "task" field) is what
    # gets stored as the description -- confirm this is intended.
    # The id comes straight from the INSERT; asking a fresh connection for
    # LAST_INSERT_ID() would not see it, because that value is per-connection.
    task_id = insert_task_to_db(user_input, reminder_time, task_type)

    if task_type == 'single':
        schedule_single_task(reminder_time, task_id)
    elif task_type == 'recurring':
        schedule_recurring_task(reminder_time, task_id)


def get_tasks_by_type(task_type):
    """Print every row of ``tasks`` whose ``task_type`` equals ``task_type``."""
    query = "SELECT task_id, task_description, reminder_time, status, task_type FROM tasks WHERE task_type = %s"
    with _open_cursor() as (_, cursor):
        cursor.execute(query, (task_type,))
        tasks = cursor.fetchall()

        for task in tasks:
            print(
                f"Task ID: {task[0]}, Description: {task[1]}, Reminder Time: {task[2]}, Status: {task[3]}, Type: {task[4]}")


def update_task_status(task_id, status):
    """Set the ``status`` column of the task identified by ``task_id``."""
    query = "UPDATE tasks SET status = %s WHERE task_id = %s"
    with _open_cursor() as (connection, cursor):
        cursor.execute(query, (status, task_id))
        connection.commit()


def get_all_recurring_tasks():
    """Print all tasks of type ``'recurring'``."""
    get_tasks_by_type('recurring')


def get_all_single_tasks():
    """Print all tasks of type ``'single'``."""
    get_tasks_by_type('single')


def main():
    """Run the two sample requests: one single task and one recurring task."""
    # Example: one-off task
    user_input_single = "提醒我明天上午9点开会"
    handle_user_input(user_input_single, 'single')

    # Example: recurring task
    user_input_recurring = "每天上午9点提醒我开会"
    handle_user_input(user_input_recurring, 'recurring')

    # The scheduler loop is left disabled; scheduled jobs only fire while a
    # loop like this is running (it also requires `import time`):
    # while True:
    #     schedule.run_pending()
    #     time.sleep(1)


if __name__ == "__main__":
    main()