68 lines
2.2 KiB
Python
68 lines
2.2 KiB
Python
import sqlite3
|
|
import datetime
|
|
import schedule
|
|
import time
|
|
|
|
# SQLite数据库文件名
|
|
DB_FILE = 'tasks.db'
|
|
|
|
|
|
# 初始化数据库
|
|
def init_db():
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
c.execute('''CREATE TABLE IF NOT EXISTS tasks
|
|
(id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
description TEXT NOT NULL,
|
|
reminder_time TEXT NOT NULL)''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# 添加任务
|
|
def add_task(description, time_str):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
time_obj = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
|
|
c.execute("INSERT INTO tasks (des cription, reminder_time) VALUES (?, ?)",
|
|
(description, time_obj.isoformat()))
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"Task '{description}' added for {time_str}")
|
|
|
|
|
|
# 检查并执行任务
|
|
def check_tasks():
|
|
now = datetime.datetime.now().isoformat()
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM tasks WHERE reminder_time <= ?", (now,))
|
|
tasks = c.fetchall()
|
|
for task in tasks:
|
|
print(f"Executing task: {task[1]}")
|
|
# 如果只想执行一次,则删除任务
|
|
c.execute("DELETE FROM tasks WHERE id = ?", (task[0],))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# 定时检查任务
|
|
schedule.every(1).minute.do(check_tasks) # 每分钟检查一次任务
|
|
|
|
# 示例使用
|
|
if __name__ == "__main__":
|
|
init_db() # 初始化数据库(如果尚未初始化)
|
|
|
|
# 添加示例任务(注意:时间应该设置为未来的时间以测试)
|
|
add_task("Buy groceries", "2024-12-20 14:02:50") # 修改为未来时间进行测试
|
|
add_task("Attend meeting", "2024-12-20 14:03:50") # 修改为未来时间进行测试
|
|
|
|
# 由于我们设置了未来的时间,因此需要使用一个足够长的时间循环来等待任务执行
|
|
# 在实际应用中,你可能会有一个更优雅的方式来停止循环(例如监听特定事件)
|
|
print("Starting task scheduler...")
|
|
try:
|
|
while True:
|
|
schedule.run_pending()
|
|
time.sleep(1) # 等待下一分钟
|
|
except KeyboardInterrupt:
|
|
print("Scheduler stopped manually.") |