# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Dict, List, Optional

from db.base import BaseDBOperator
from db.connection import DBConnectionManager


class TasksDB(BaseDBOperator):
    """Database operations for the ``tasks`` (reminder) table."""

    def __init__(self, db_manager: DBConnectionManager):
        """Hand the connection manager over to the base operator."""
        super().__init__(db_manager)

    def initialize_table(self) -> bool:
        """Create the ``tasks`` table if it does not exist yet.

        Returns:
            Whatever the inherited ``execute_update`` returns for the DDL
            statement.
        """
        sql = """
            CREATE TABLE IF NOT EXISTS tasks (
                task_id INT AUTO_INCREMENT PRIMARY KEY,
                task_description VARCHAR(255) NOT NULL,
                reminder_time TIME NOT NULL,
                task_type ENUM('single', 'recurring') DEFAULT 'single',
                status ENUM('pending', 'completed') DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
        return self.execute_update(sql)

    def add_task(self, description: str, reminder_time: str,
                 task_type: str = 'single') -> Optional[int]:
        """Insert a new task row.

        Args:
            description: Text stored in ``task_description``.
            reminder_time: Value stored in the ``reminder_time`` TIME column.
            task_type: ``'single'`` or ``'recurring'`` (the column's ENUM
                values).

        Returns:
            The cursor's ``lastrowid`` for the inserted row, or ``None`` when
            the insert or commit raises; in that case the error is logged and
            the transaction is rolled back.
        """
        sql = """
            INSERT INTO tasks (task_description, reminder_time, task_type)
            VALUES (%s, %s, %s)
        """
        conn = self.db_manager.get_mysql_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, (description, reminder_time, task_type))
                # Read the generated id before the cursor is released.
                task_id = cursor.lastrowid
            conn.commit()
            return task_id
        except Exception as e:
            self.LOG.error(f"添加任务出错: {e}")
            conn.rollback()
            return None
        finally:
            # The connection is closed on both the success and failure path.
            conn.close()

    def get_pending_tasks(self) -> List[Dict]:
        """Return all tasks whose status is ``'pending'``, ordered by reminder time.

        Returns:
            The result of the inherited ``execute_query``, or an empty list
            when that result is falsy.
        """
        sql = """
            SELECT task_id, task_description, reminder_time, task_type,
                   status, created_at
            FROM tasks
            WHERE status = 'pending'
            ORDER BY reminder_time
        """
        return self.execute_query(sql) or []

    def complete_task(self, task_id: int) -> bool:
        """Mark the task with the given id as ``'completed'``.

        Returns:
            Whatever the inherited ``execute_update`` returns for the UPDATE.
        """
        sql = """
            UPDATE tasks
            SET status = 'completed'
            WHERE task_id = %s
        """
        return self.execute_update(sql, (task_id,))

    def get_tasks_by_time(self, current_time: str) -> List[Dict]:
        """Return the tasks whose reminder time matches ``current_time``.

        A task matches when it is pending, or when it is a recurring task that
        has already been completed.

        Args:
            current_time: Time string compared against
                ``TIME_FORMAT(reminder_time, '%H:%i')``, i.e. zero-padded
                24-hour ``HH:MM``.

        Returns:
            The result of the inherited ``execute_query``, or an empty list
            when that result is falsy.
        """
        # The TIME_FORMAT pattern is sent as a bound parameter instead of being
        # written into the statement: a literal '%H:%i' next to %s placeholders
        # breaks on drivers that apply Python %-formatting to the statement
        # (e.g. PyMySQL), while binding it is safe on drivers that only
        # substitute %s as well.
        sql = """
            SELECT task_id, task_description, reminder_time, task_type
            FROM tasks
            WHERE TIME_FORMAT(reminder_time, %s) = %s
              AND (status = 'pending'
                   OR (status = 'completed' AND task_type = 'recurring'))
        """
        return self.execute_query(sql, ('%H:%i', current_time)) or []