Files
abot/task/task_list.py
2025-02-14 08:58:48 +08:00

158 lines
4.4 KiB
Python

import os

import dateparser
import pymysql
import schedule
# Database connection settings passed verbatim to pymysql.connect().
# NOTE(review): credentials should not live in source control. Every value can
# be overridden through an environment variable; the literals below are only
# fallbacks kept so that existing deployments keep working unchanged.
db_config = {
    'host': os.environ.get('ABOT_DB_HOST', '192.168.2.32'),  # MariaDB server address
    'user': os.environ.get('ABOT_DB_USER', 'root'),  # MariaDB user name
    'password': os.environ.get('ABOT_DB_PASSWORD', 'lw123456'),  # MariaDB password
    'database': os.environ.get('ABOT_DB_NAME', 'message_archive'),
}
def get_db_connection():
    """Open and return a new pymysql connection built from ``db_config``.

    The connection is not closed here; the caller is expected to close it.
    """
    return pymysql.connect(**db_config)
def parse_natural_language_time(text):
    """Interpret a free-form time expression using ``dateparser``.

    Args:
        text: The text to interpret as a date/time.

    Returns:
        Whatever ``dateparser.parse(text)`` returns. Callers in this file
        treat a falsy result as "could not be parsed".
    """
    return dateparser.parse(text)
def insert_task_to_db(description, reminder_time, task_type):
    """Insert one row into the ``tasks`` table and commit it.

    Args:
        description: Value stored in the ``task_description`` column.
        reminder_time: Value stored in the ``reminder_time`` column.
        task_type: Value stored in the ``task_type`` column.

    Returns:
        The cursor's ``lastrowid`` after the INSERT, i.e. the id generated
        for the new row when the table has an auto-generated key.
    """
    query = "INSERT INTO tasks (task_description, reminder_time, task_type) VALUES (%s, %s, %s)"
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (description, reminder_time, task_type))
            # The generated id is only meaningful on the connection that
            # performed the INSERT, so capture it before closing.
            new_task_id = cursor.lastrowid
        connection.commit()
        return new_task_id
    finally:
        # Release the connection even when execute()/commit() raised.
        connection.close()
def reminder_task(task_id):
    """Print the reminder text of the task with the given id.

    Looks up ``task_description`` in the ``tasks`` table. Nothing is printed
    when no row matches ``task_id``.

    Args:
        task_id: Value matched against the ``task_id`` column.
    """
    query = "SELECT task_description FROM tasks WHERE task_id = %s"
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (task_id,))
            task = cursor.fetchone()
    finally:
        # Release the connection even when the query raised.
        connection.close()

    if task:
        task_description = task[0]
        print(f"Reminder: {task_description}")  # emit the concrete task text
def schedule_single_task(reminder_time, task_id):
    """Schedule a one-shot reminder at the next occurrence of ``reminder_time``.

    Args:
        reminder_time: Time-of-day string accepted by ``schedule``'s ``.at()``
            (this file passes ``'%H:%M'``-formatted values).
        task_id: Id of the task whose description ``reminder_task`` prints.
    """
    def _remind_once():
        reminder_task(task_id=task_id)
        # Returning CancelJob makes ``schedule`` drop the job after this run;
        # without it a "single" task would fire every day, exactly like a
        # recurring one.
        return schedule.CancelJob

    schedule.every().day.at(reminder_time).do(_remind_once)
    print(f"Single task scheduled at {reminder_time}.")
def schedule_recurring_task(reminder_time, task_id):
    """Register a daily job that calls ``reminder_task`` for ``task_id``.

    Args:
        reminder_time: Time-of-day string handed to ``schedule``'s ``.at()``.
        task_id: Forwarded to ``reminder_task`` as the ``task_id`` keyword.
    """
    daily_job = schedule.every().day.at(reminder_time)
    daily_job.do(reminder_task, task_id=task_id)
    print(f"Recurring task scheduled every day at {reminder_time}.")
def handle_user_input(user_input, task_type):
    """Parse a user sentence, store it as a task, and schedule its reminder.

    Nothing is stored or scheduled when the text cannot be parsed into a time.

    Args:
        user_input: Free-form text; it is both parsed for a time and stored
            as the task description.
        task_type: ``'single'`` or ``'recurring'``. Any other value is stored
            in the database but no job is scheduled for it.
    """
    parsed_time = parse_natural_language_time(user_input)
    print(parsed_time)
    if not parsed_time:
        return

    # NOTE: only the time of day is kept; the date part of the parsed value
    # is discarded by this format.
    formatted_time = parsed_time.strftime('%H:%M')
    print(f"Scheduling task for {formatted_time}")

    # Insert the task and read its generated id on the SAME connection.
    # The last-insert id is tracked per connection, so asking a freshly
    # opened connection for it does not return the id of this row.
    query = "INSERT INTO tasks (task_description, reminder_time, task_type) VALUES (%s, %s, %s)"
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (user_input, formatted_time, task_type))
            task_id = cursor.lastrowid
        connection.commit()
    finally:
        # Release the connection even when the insert failed.
        connection.close()

    # Schedule according to the task type.
    if task_type == 'single':
        schedule_single_task(formatted_time, task_id)
    elif task_type == 'recurring':
        schedule_recurring_task(formatted_time, task_id)
def get_tasks_by_type(task_type):
    """Print and return every task whose ``task_type`` matches.

    Args:
        task_type: Value matched against the ``task_type`` column.

    Returns:
        The rows from ``cursor.fetchall()``; each row holds ``task_id``,
        ``task_description``, ``reminder_time``, ``status`` and ``task_type``
        in that order.
    """
    query = "SELECT task_id, task_description, reminder_time, status, task_type FROM tasks WHERE task_type = %s"
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (task_type,))
            tasks = cursor.fetchall()
    finally:
        # Release the connection even when the query raised.
        connection.close()

    for task in tasks:
        print(
            f"Task ID: {task[0]}, Description: {task[1]}, Reminder Time: {task[2]}, Status: {task[3]}, Type: {task[4]}")
    return tasks
def update_task_status(task_id, status):
    """Set the ``status`` column of one task and commit the change.

    Args:
        task_id: Value matched against the ``task_id`` column.
        status: New value written to the ``status`` column.
    """
    query = "UPDATE tasks SET status = %s WHERE task_id = %s"
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (status, task_id))
        connection.commit()
    finally:
        # Release the connection even when execute()/commit() raised.
        connection.close()
def get_all_recurring_tasks():
    """List all tasks of type ``'recurring'`` via ``get_tasks_by_type``."""
    get_tasks_by_type(task_type='recurring')
def get_all_single_tasks():
    """List all tasks of type ``'single'`` via ``get_tasks_by_type``."""
    get_tasks_by_type(task_type='single')
def main():
    """Feed two sample sentences through ``handle_user_input``.

    The first is handled as a single task, the second as a recurring one.
    This function does not run the scheduler itself: to actually fire the
    registered jobs, a loop calling ``schedule.run_pending()`` (with a short
    sleep between iterations) would have to be started afterwards.
    """
    examples = (
        ("提醒我明天上午9点开会", 'single'),      # one-off example
        ("每天上午9点提醒我开会", 'recurring'),   # recurring example
    )
    for sentence, kind in examples:
        handle_user_input(sentence, kind)
if __name__ == "__main__":
    # main() is currently disabled; running the file only prints how
    # dateparser interprets one sample sentence with the Asia/Shanghai
    # timezone setting.
    # main()
    parsed_time = dateparser.parse(
        '每天上午9:00提醒我开会',
        settings={'TIMEZONE': 'Asia/Shanghai'},
    )
    print(parsed_time)