From 66a9b7c4a3fe39e0cac16969098446aea58ba4db Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 16 Apr 2026 14:54:23 +0800 Subject: [PATCH] feat(schedule): move system jobs to DB-driven config and dashboard management --- AGENTS.md | 0 admin/dashboard/blueprints/system_jobs.py | 111 ++++ admin/dashboard/server.py | 4 + admin/dashboard/templates/base.html | 3 +- admin/dashboard/templates/system_jobs.html | 287 ++++++++++ db/system_job_db.py | 112 ++++ main.py | 62 +-- robot.py | 5 + utils/decorator/async_job.py | 594 +++++++++++++++------ utils/system_jobs.py | 177 ++++++ 10 files changed, 1147 insertions(+), 208 deletions(-) create mode 100644 AGENTS.md create mode 100644 admin/dashboard/blueprints/system_jobs.py create mode 100644 admin/dashboard/templates/system_jobs.html create mode 100644 db/system_job_db.py create mode 100644 utils/system_jobs.py diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..e69de29 diff --git a/admin/dashboard/blueprints/system_jobs.py b/admin/dashboard/blueprints/system_jobs.py new file mode 100644 index 0000000..ee2a884 --- /dev/null +++ b/admin/dashboard/blueprints/system_jobs.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +from flask import Blueprint, current_app, jsonify, render_template, request + +from utils.decorator.async_job import async_job +from .auth import login_required + + +system_jobs_bp = Blueprint("system_jobs", __name__, url_prefix="/system_jobs") + + +@system_jobs_bp.route("/") +@login_required +def page_system_jobs(): + return render_template("system_jobs.html") + + +@system_jobs_bp.route("/api/jobs", methods=["GET"]) +@login_required +def api_list_jobs(): + server = current_app.dashboard_server + db_rows = server.system_job_db.list_jobs() + runtime_rows = async_job.get_jobs_snapshot() + runtime_by_key = {row.get("job_key", ""): row for row in runtime_rows if row.get("job_key")} + + result = [] + for row in db_rows: + job_key = row.get("job_key") + runtime = runtime_by_key.get(job_key, {}) + result.append( + { + "job_key": job_key, + "name": row.get("name", ""), + "description": row.get("description", ""), + "trigger_type": row.get("trigger_type", ""), + "trigger_config": row.get("trigger_config", {}), + "enabled": bool(row.get("enabled", 0)), + "runtime_job_id": runtime.get("id"), + "runtime_enabled": runtime.get("enabled"), + "running": runtime.get("running", False), + "trigger_text": runtime.get("trigger_text", ""), + "last_run_at": runtime.get("last_run_at"), + "last_status": runtime.get("last_status"), + "last_error": runtime.get("last_error"), + "last_duration_ms": runtime.get("last_duration_ms"), + "next_run_at": runtime.get("next_run_at"), + "run_count": runtime.get("run_count", 0), + "success_count": runtime.get("success_count", 0), + "fail_count": runtime.get("fail_count", 0), + } + ) + + return jsonify({"success": True, "data": result}) + + +@system_jobs_bp.route("/api/jobs/", methods=["PUT"]) +@login_required +def api_update_job(job_key: str): + server = current_app.dashboard_server + payload = request.get_json(silent=True) or {} + + updates = {} + for key in ("name", "description", "trigger_type", "trigger_config", "enabled"): + if key in payload: + updates[key] = payload[key] + + if not updates: + return jsonify({"success": False, "message": "没有可更新字段"}), 400 + + ok = server.system_job_db.update_job(job_key, updates) + if not ok: + return jsonify({"success": False, "message": "数据库更新失败"}), 500 + + # 配置变更后立即重载调度器,确保实时生效 + server.system_job_loader.reload_from_db() + return jsonify({"success": True, "message": "更新成功"}) + + +@system_jobs_bp.route("/api/jobs//trigger", methods=["POST"]) +@login_required +def api_trigger_job(job_key: str): + server = current_app.dashboard_server + job_id = async_job.get_job_id_by_key(job_key) + if not job_id: + server.system_job_loader.reload_from_db() + job_id = async_job.get_job_id_by_key(job_key) + if not job_id: + return jsonify({"success": False, "message": "任务未加载或已禁用"}), 404 + + ok, msg = async_job.trigger_job_now(job_id, operator="dashboard") + code = 200 if ok else 400 + return jsonify({"success": ok, "message": msg}), code + + +@system_jobs_bp.route("/api/jobs//logs", methods=["GET"]) +@login_required +def api_job_logs(job_key: str): + job_id = async_job.get_job_id_by_key(job_key) + if not job_id: + return jsonify({"success": True, "data": []}) + + limit = int(request.args.get("limit", 100)) + logs = async_job.get_job_logs(job_id, limit=limit) + return jsonify({"success": True, "data": logs}) + + +@system_jobs_bp.route("/api/reload", methods=["POST"]) +@login_required +def api_reload_jobs(): + server = current_app.dashboard_server + server.system_job_loader.reload_from_db() + return jsonify({"success": True, "message": "已按数据库配置重载系统定时任务"}) diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index df8aaf7..ed16d41 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -46,6 +46,8 @@ class DashboardServer: self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) self.member_context_db = MemberContextDBOperator(self.db_manager) self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) + self.system_job_db = robot_instance.system_job_db + self.system_job_loader = robot_instance.system_job_loader # 获取联系人管理器实例 self.contact_manager = robot_instance.contact_manager self.plugin_manager = robot_instance.plugin_manager @@ -148,6 +150,7 @@ class DashboardServer: from admin.dashboard.blueprints.file_browser import file_browser_bp from admin.dashboard.blueprints.message_push import message_push_bp from admin.dashboard.blueprints.friend_circle import friend_circle_bp + from admin.dashboard.blueprints.system_jobs import system_jobs_bp # 在app.register_blueprint部分添加 app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group') @@ -162,6 +165,7 @@ class DashboardServer: app.register_blueprint(file_browser_bp) app.register_blueprint(message_push_bp) app.register_blueprint(friend_circle_bp) + app.register_blueprint(system_jobs_bp) self.LOG.info("所有蓝图已注册") diff --git a/admin/dashboard/templates/base.html b/admin/dashboard/templates/base.html index a4fddde..4363749 100644 --- a/admin/dashboard/templates/base.html +++ b/admin/dashboard/templates/base.html @@ -779,7 +779,8 @@ defaultPath: '/messages', items: [ { label: '消息列表', path: '/messages' }, - { label: '定时推送', path: '/message_push' } + { label: '定时推送', path: '/message_push' }, + { label: '系统定时任务', path: '/system_jobs' } ] }, { diff --git a/admin/dashboard/templates/system_jobs.html b/admin/dashboard/templates/system_jobs.html new file mode 100644 index 0000000..2e43fdc --- /dev/null +++ b/admin/dashboard/templates/system_jobs.html @@ -0,0 +1,287 @@ +{% extends "base.html" %} + +{% block title %}系统定时任务 - 机器人管理后台{% endblock %} + +{% block content %} +
+
+
+
System Scheduler
+

系统定时任务

+

任务配置存储在数据库表 `t_system_jobs`,支持在线启停、调度调整、手动触发与执行日志查看。

+
+
+ 按表重载 + 刷新 +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ 取消 + 保存 +
+
+ + + + + + + + +
+{% endblock %} + +{% block scripts %} + +{% endblock %} + +{% block styles %} + +{% endblock %} diff --git a/db/system_job_db.py b/db/system_job_db.py new file mode 100644 index 0000000..f855760 --- /dev/null +++ b/db/system_job_db.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +import json +from typing import Any, Dict, List, Optional + +from loguru import logger + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class SystemJobDBOperator(BaseDBOperator): + """系统级定时任务配置表操作。""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + + def init_tables(self) -> bool: + """初始化系统任务配置表。""" + try: + self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_system_jobs ( + job_key VARCHAR(64) PRIMARY KEY, + name VARCHAR(128) NOT NULL, + description VARCHAR(255) DEFAULT '', + trigger_type VARCHAR(64) NOT NULL, + trigger_config JSON NOT NULL, + enabled TINYINT(1) NOT NULL DEFAULT 1, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + """ + ) + return True + except Exception as e: + logger.error(f"初始化 t_system_jobs 失败: {e}") + return False + + def list_jobs(self) -> List[Dict[str, Any]]: + rows = self.execute_query("SELECT * FROM t_system_jobs ORDER BY created_at ASC") or [] + for row in rows: + cfg = row.get("trigger_config") + if isinstance(cfg, str): + try: + row["trigger_config"] = json.loads(cfg) + except json.JSONDecodeError: + row["trigger_config"] = {} + return rows + + def get_job(self, job_key: str) -> Optional[Dict[str, Any]]: + row = self.execute_query("SELECT * FROM t_system_jobs WHERE job_key = %s", (job_key,), fetch_one=True) + if not row: + return None + cfg = row.get("trigger_config") + if isinstance(cfg, str): + try: + row["trigger_config"] = json.loads(cfg) + except json.JSONDecodeError: + row["trigger_config"] = {} + return row + + def upsert_job(self, data: Dict[str, Any]) -> bool: + try: + sql = """ + INSERT INTO t_system_jobs ( + job_key, name, description, trigger_type, trigger_config, enabled + ) VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + name = VALUES(name), + description = VALUES(description), + trigger_type = VALUES(trigger_type), + trigger_config = VALUES(trigger_config), + enabled = VALUES(enabled) + """ + params = ( + data["job_key"], + data["name"], + data.get("description", ""), + data["trigger_type"], + json.dumps(data.get("trigger_config", {}), ensure_ascii=False), + 1 if data.get("enabled", True) else 0, + ) + return self.execute_update(sql, params) + except Exception as e: + logger.error(f"upsert 系统任务失败: {e}, data={data}") + return False + + def update_job(self, job_key: str, updates: Dict[str, Any]) -> bool: + fields = [] + values: List[Any] = [] + + for key in ("name", "description", "trigger_type", "enabled"): + if key in updates: + fields.append(f"{key} = %s") + if key == "enabled": + values.append(1 if updates[key] else 0) + else: + values.append(updates[key]) + + if "trigger_config" in updates: + fields.append("trigger_config = %s") + values.append(json.dumps(updates.get("trigger_config", {}), ensure_ascii=False)) + + if not fields: + return True + + values.append(job_key) + sql = f"UPDATE t_system_jobs SET {', '.join(fields)} WHERE job_key = %s" + return self.execute_update(sql, tuple(values)) + + def delete_job(self, job_key: str) -> bool: + return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,)) diff --git a/main.py b/main.py index ab29fb3..ecdde0a 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,6 @@ import threading from admin.GlancesMonitor import GlancesMonitor from utils.decorator.async_job import async_job from configuration import Config -from plugins.xiuren_image.images_cache import ImageCacheManager from robot import Robot from loguru import logger @@ -114,63 +113,10 @@ def main(): def jobs(robot: Robot): - # ✅ 每天 8:30 发送百度新闻 - @async_job.at_times(["08:30"]) - async def news_baidu_report_auto_job(): - await robot.news_baidu_report_auto() - - # ✅ 每天 10:30 推送 Epic 免费游戏 - @async_job.every_weekday_time(weekday=4, time_str="10:00") # 0=周一,4=周五 - async def epic_job(): - await robot.send_epic_free_games() - - # ✅ 每天 02:30 从 redis 写入 sqlite - @async_job.at_times(["02:30"]) - async def msg_count_to_db_job(): - await robot.message_count_to_db() - - # ✅ 每天 09:30 从 sqlite 读取并发送群排行 - @async_job.at_times(["09:30"]) - async def msg_ranking_job(): - await robot.generate_and_send_ranking() - - # ✅ 每天 15:30 发涩图 PDF - @async_job.at_times(["15:30"]) - async def sehuatang_pdf_job(): - await robot.generate_sehuatang_pdf() - - # ✅ 每天 01:30 下载秀人网帖子 - @async_job.at_times(["01:30"]) - async def xiuren_download_job(): - await robot.xiu_ren_download_task() - - # ✅ 每天 01:30 下载秀人网帖子 - @async_job.at_times(["2:30"]) - async def shenshiR15_download_job(): - await robot.shen_shi_download_task() - - # ✅ 每天 17:30 发秀人 PDF(如果启用) - # @async_job.at_times(["17:30"]) - # async def xiuren_pdf_send_job(): - # await robot.xiu_ren_pdf_send() - - # ✅ 每 3 小时登录验证 - @async_job.at_times(["14:43"]) - async def login_check_job(): - await robot.login_twice_auto_auth() - - @async_job.at_times(["05:00"]) - async def update_image_cache_job(): - logger.info("开始执行图片缓存更新任务") - manager = ImageCacheManager("/mnt/nfs_share") # 替换为你的图片目录 - await manager.update_image_cache() - logger.info("图片缓存更新完成") - - # ✅ 每5分钟处理一次待下载的图片/表情消息(串行处理,避免数据库锁竞争) - @async_job.every_minutes(5) - async def process_pending_images_job(): - if hasattr(robot, 'message_storage') and robot.message_storage: - await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20) + # 系统级定时任务统一改为数据库驱动,不再在 main.py 里硬编码维护。 + # 这里保留入口,只负责按表配置重新加载,便于运行时刷新。 + if hasattr(robot, "system_job_loader") and robot.system_job_loader: + robot.system_job_loader.reload_from_db() diff --git a/robot.py b/robot.py index 1d71191..b58d4b3 100644 --- a/robot.py +++ b/robot.py @@ -21,8 +21,10 @@ from base.plugin_common.plugin_registry import PluginRegistry from configuration import Config from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator +from db.system_job_db import SystemJobDBOperator from plugins.xiuren_image.meitu_dl import meitu_dowload_pub_pic from plugins.xiuren_image.shenshi_r15 import run_daily_job +from utils.system_jobs import SystemJobLoader from utils.email_util import EmailSender from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus @@ -73,6 +75,7 @@ class Robot: self.redis_pool = self.db_manager.redis_pool self.contacts_db = ContactsDBOperator(self.db_manager) + self.system_job_db = SystemJobDBOperator(self.db_manager) # 初始化联系人管理器 self.contact_manager = ContactManager.get_instance() self.allContacts = {} # 将在登录后填充 @@ -100,6 +103,8 @@ class Robot: self.plugins = self.plugin_manager.load_all_plugins() # 热加载改为低频扫描:每 60 秒检查一次插件文件变动 self.plugin_manager.start_hot_reload_watcher(interval_seconds=60.0) + self.system_job_loader = SystemJobLoader(self, self.system_job_db) + self.system_job_loader.init_and_load() # 加载插件 self.LOG.debug("插件系统初始化完成") diff --git a/utils/decorator/async_job.py b/utils/decorator/async_job.py index a41a09f..8bfdb56 100644 --- a/utils/decorator/async_job.py +++ b/utils/decorator/async_job.py @@ -1,10 +1,21 @@ import asyncio +import inspect import threading +from collections import deque from datetime import datetime, timedelta -from typing import Callable, Awaitable, List, Dict, Optional, Any +from typing import Callable, Awaitable, List, Dict, Optional, Any, Tuple class AsyncJob: + """异步定时任务中心。 + + 设计目标: + 1. 保持装饰器用法兼容(原有插件和 main.py 不用重写) + 2. 支持运行中动态增删改任务 + 3. 提供任务可观测信息(状态、下次运行、执行日志) + 4. 支持后台手动触发、启停、修改调度策略 + """ + def __init__(self): self._jobs: Dict[str, Dict[str, Any]] = {} self._running_tasks: Dict[str, asyncio.Task] = {} @@ -35,34 +46,242 @@ class AsyncJob: return value return None - def _register(self, func: Callable, wrapper: Callable[[], Awaitable], trigger: str): + @staticmethod + def _safe_iso(dt: Optional[datetime]) -> Optional[str]: + return dt.isoformat(timespec="seconds") if dt else None + + @staticmethod + def _normalize_time_str(time_str: str) -> str: + text = str(time_str or "").strip() + if not text: + raise ValueError("时间不能为空") + return datetime.strptime(text, "%H:%M").strftime("%H:%M") + + def _normalize_trigger_config(self, trigger_type: str, trigger_config: Dict[str, Any]) -> Dict[str, Any]: + if trigger_type == "every_seconds": + seconds = int(trigger_config.get("seconds", 0)) + if seconds <= 0: + raise ValueError("seconds 必须大于 0") + return {"seconds": seconds} + + if trigger_type == "at_times": + time_list = trigger_config.get("time_list", []) + if not isinstance(time_list, list) or not time_list: + raise ValueError("time_list 不能为空") + normalized = sorted(set(self._normalize_time_str(t) for t in time_list)) + return {"time_list": normalized} + + if trigger_type == "every_weekday_time": + weekday = int(trigger_config.get("weekday", -1)) + if weekday < 0 or weekday > 6: + raise ValueError("weekday 必须在 0-6") + return {"weekday": weekday, "time_str": self._normalize_time_str(trigger_config.get("time_str", ""))} + + if trigger_type == "every_week_time": + weekday = int(trigger_config.get("weekday", -1)) + if weekday < 0 or weekday > 6: + raise ValueError("weekday 必须在 0-6") + return {"weekday": weekday, "time_str": self._normalize_time_str(trigger_config.get("time_str", ""))} + + if trigger_type == "every_month_last_day_time": + return {"time_str": self._normalize_time_str(trigger_config.get("time_str", ""))} + + raise ValueError(f"不支持的触发器类型: {trigger_type}") + + def _format_trigger(self, trigger_type: str, trigger_config: Dict[str, Any]) -> str: + if trigger_type == "every_seconds": + return f"每 {trigger_config['seconds']} 秒" + if trigger_type == "at_times": + return "每天 " + ", ".join(trigger_config.get("time_list", [])) + if trigger_type == "every_weekday_time": + return f"每周{trigger_config['weekday']} {trigger_config['time_str']}" + if trigger_type == "every_week_time": + return f"每周{trigger_config['weekday']} {trigger_config['time_str']}" + if trigger_type == "every_month_last_day_time": + return f"每月最后一天 {trigger_config['time_str']}" + return trigger_type + + def _compute_next_run(self, trigger_type: str, trigger_config: Dict[str, Any], now: Optional[datetime] = None) -> datetime: + current = now or datetime.now() + + if trigger_type == "every_seconds": + return current + timedelta(seconds=int(trigger_config["seconds"])) + + if trigger_type == "at_times": + parsed_times = [datetime.strptime(t, "%H:%M").time() for t in trigger_config.get("time_list", [])] + targets = [] + for t in parsed_times: + target = datetime.combine(current.date(), t) + if target <= current: + target += timedelta(days=1) + targets.append(target) + return min(targets) + + if trigger_type in ("every_weekday_time", "every_week_time"): + weekday = int(trigger_config["weekday"]) + target_time = datetime.strptime(trigger_config["time_str"], "%H:%M").time() + days_ahead = (weekday - current.weekday() + 7) % 7 + target_date = current.date() + timedelta(days=days_ahead) + target_dt = datetime.combine(target_date, target_time) + if target_dt <= current: + target_dt += timedelta(days=7) + return target_dt + + if trigger_type == "every_month_last_day_time": + target_time = datetime.strptime(trigger_config["time_str"], "%H:%M").time() + if current.month == 12: + next_month = datetime(current.year + 1, 1, 1) + else: + next_month = datetime(current.year, current.month + 1, 1) + last_day = next_month - timedelta(days=1) + target_dt = datetime.combine(last_day.date(), target_time) + if target_dt <= current: + if current.month == 12: + next_month = datetime(current.year + 1, 2, 1) + elif current.month == 11: + next_month = datetime(current.year + 1, 1, 1) + else: + next_month = datetime(current.year, current.month + 2, 1) + last_day = next_month - timedelta(days=1) + target_dt = datetime.combine(last_day.date(), target_time) + return target_dt + + raise ValueError(f"未知触发器类型: {trigger_type}") + + def _append_log(self, job: Dict[str, Any], level: str, message: str): + logs: deque = job["logs"] + logs.append( + { + "time": self._safe_iso(datetime.now()), + "level": level, + "message": message, + } + ) + + def _register( + self, + func: Callable, + trigger_type: str, + trigger_config: Dict[str, Any], + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ): owner = self._infer_owner(func) + normalized_config = self._normalize_trigger_config(trigger_type, trigger_config) job_id = self._next_job_id() + display_name = (job_name or getattr(func, "__name__", None) or job_id).strip() + owner_name = owner.__class__.__name__ if owner is not None else "system" + created_at = datetime.now() + with self._lock: self._jobs[job_id] = { + "id": job_id, + "job_key": (job_key or "").strip(), + "name": display_name, + "description": description or "", "func": func, - "wrapper": wrapper, - "trigger": trigger, + "trigger_type": trigger_type, + "trigger_config": normalized_config, + "trigger_text": self._format_trigger(trigger_type, normalized_config), "owner_id": id(owner) if owner is not None else None, - "owner_name": owner.__class__.__name__ if owner is not None else None, + "owner_name": owner_name, + "enabled": True, + "running": False, + "last_run_at": None, + "last_status": "never", + "last_error": "", + "last_duration_ms": None, + "next_run_at": None, + "run_count": 0, + "success_count": 0, + "fail_count": 0, + "created_at": created_at, + "updated_at": created_at, + "logs": deque(maxlen=200), } + self._append_log(self._jobs[job_id], "info", f"任务已注册: {display_name}") if self._running and self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._start_job_in_loop, job_id) + return job_id + + async def _execute_job(self, job_id: str, reason: str = "schedule") -> Tuple[bool, str]: + job = self._jobs.get(job_id) + if not job: + return False, "任务不存在" + + if job.get("running"): + return False, "任务正在执行中" + + func = job["func"] + started_at = datetime.now() + job["running"] = True + job["last_run_at"] = started_at + self._append_log(job, "info", f"开始执行,触发来源: {reason}") + + try: + result = func() + if inspect.isawaitable(result): + await result + job["last_status"] = "success" + job["last_error"] = "" + job["success_count"] += 1 + self._append_log(job, "success", "执行成功") + return True, "执行成功" + except asyncio.CancelledError: + job["last_status"] = "cancelled" + job["last_error"] = "任务被取消" + self._append_log(job, "warning", "任务被取消") + raise + except Exception as e: + job["last_status"] = "failed" + job["last_error"] = str(e) + job["fail_count"] += 1 + self._append_log(job, "error", f"执行失败: {e}") + return False, str(e) + finally: + finished_at = datetime.now() + job["run_count"] += 1 + job["running"] = False + job["last_duration_ms"] = int((finished_at - started_at).total_seconds() * 1000) + job["updated_at"] = finished_at + + async def _job_loop(self, job_id: str): + while True: + job = self._jobs.get(job_id) + if not job: + return + + if not job.get("enabled", True): + job["next_run_at"] = None + await asyncio.sleep(1) + continue + + try: + next_run = self._compute_next_run(job["trigger_type"], job["trigger_config"]) + except Exception as e: + job["last_status"] = "invalid_schedule" + job["last_error"] = str(e) + job["next_run_at"] = None + self._append_log(job, "error", f"调度配置非法: {e}") + await asyncio.sleep(5) + continue + + job["next_run_at"] = next_run + wait_seconds = max((next_run - datetime.now()).total_seconds(), 0) + await asyncio.sleep(wait_seconds) + + # 睡眠结束后再检查一次,避免刚被禁用/删除还执行 + job = self._jobs.get(job_id) + if not job or not job.get("enabled", True): + continue + + await self._execute_job(job_id, reason="schedule") def _start_job_in_loop(self, job_id: str): - job = self._jobs.get(job_id) - if not job or job_id in self._running_tasks: + if job_id in self._running_tasks: return - - async def runner(): - try: - await job["wrapper"]() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] 任务异常退出: {job_id}, trigger={job.get('trigger')}, error={e}") - - task = asyncio.create_task(runner(), name=f"async_job:{job_id}") + task = asyncio.create_task(self._job_loop(job_id), name=f"async_job:{job_id}") self._running_tasks[job_id] = task task.add_done_callback(lambda _task, _id=job_id: self._running_tasks.pop(_id, None)) @@ -71,6 +290,11 @@ class AsyncJob: if task: task.cancel() + def _restart_job_in_loop(self, job_id: str): + self._cancel_job_in_loop(job_id) + if job_id in self._jobs: + self._start_job_in_loop(job_id) + def remove_job(self, job_id: str) -> bool: with self._lock: existed = job_id in self._jobs or job_id in self._running_tasks @@ -97,161 +321,233 @@ class AsyncJob: removed += 1 return removed - def every_seconds(self, seconds: int): - def decorator(func: Callable): - async def wrapper(): - while True: - try: - await func() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] every_seconds 任务执行异常: {e}") - await asyncio.sleep(seconds) + def get_jobs_snapshot(self) -> List[Dict[str, Any]]: + with self._lock: + snapshots = [] + for job in self._jobs.values(): + snapshots.append( + { + "id": job["id"], + "job_key": job.get("job_key", ""), + "name": job["name"], + "description": job.get("description", ""), + "trigger_type": job["trigger_type"], + "trigger_config": dict(job["trigger_config"]), + "trigger_text": job.get("trigger_text", ""), + "owner_name": job.get("owner_name", "system"), + "enabled": job.get("enabled", True), + "running": job.get("running", False), + "last_run_at": self._safe_iso(job.get("last_run_at")), + "last_status": job.get("last_status", "never"), + "last_error": job.get("last_error", ""), + "last_duration_ms": job.get("last_duration_ms"), + "next_run_at": self._safe_iso(job.get("next_run_at")), + "run_count": job.get("run_count", 0), + "success_count": job.get("success_count", 0), + "fail_count": job.get("fail_count", 0), + "created_at": self._safe_iso(job.get("created_at")), + "updated_at": self._safe_iso(job.get("updated_at")), + } + ) + snapshots.sort(key=lambda item: item["name"]) + return snapshots - self._register(func, wrapper, f"every_seconds({seconds})") + def get_job_logs(self, job_id: str, limit: int = 100) -> List[Dict[str, Any]]: + with self._lock: + job = self._jobs.get(job_id) + if not job: + return [] + logs: deque = job.get("logs", deque()) + data = list(logs) + if limit > 0: + data = data[-limit:] + return data + + def trigger_job_now(self, job_id: str, operator: str = "dashboard") -> Tuple[bool, str]: + with self._lock: + job = self._jobs.get(job_id) + loop = self._loop + running = self._running + + if not job: + return False, "任务不存在" + if job.get("running"): + return False, "任务正在执行中" + if not running or not loop or not loop.is_running(): + return False, "任务调度器未运行" + + def _trigger(): + asyncio.create_task(self._execute_job(job_id, reason=f"manual:{operator}")) + + loop.call_soon_threadsafe(_trigger) + return True, "任务已触发" + + def get_job_id_by_key(self, job_key: str) -> Optional[str]: + key = str(job_key or "").strip() + if not key: + return None + with self._lock: + for job_id, job in self._jobs.items(): + if job.get("job_key") == key: + return job_id + return None + + def register_callable( + self, + func: Callable, + trigger_type: str, + trigger_config: Dict[str, Any], + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ) -> str: + """运行时注册任务(非装饰器方式)。""" + return self._register( + func=func, + trigger_type=trigger_type, + trigger_config=trigger_config, + job_name=job_name, + description=description, + job_key=job_key, + ) + + def set_job_enabled(self, job_id: str, enabled: bool) -> Tuple[bool, str]: + with self._lock: + job = self._jobs.get(job_id) + loop = self._loop + running = self._running + if not job: + return False, "任务不存在" + job["enabled"] = bool(enabled) + job["updated_at"] = datetime.now() + state_text = "启用" if enabled else "停用" + self._append_log(job, "info", f"任务已{state_text}") + + if running and loop and loop.is_running(): + loop.call_soon_threadsafe(self._restart_job_in_loop, job_id) + return True, f"任务已{state_text}" + + def update_job_schedule(self, job_id: str, trigger_type: str, trigger_config: Dict[str, Any]) -> Tuple[bool, str]: + try: + normalized = self._normalize_trigger_config(trigger_type, trigger_config) + except Exception as e: + return False, f"调度参数非法: {e}" + + with self._lock: + job = self._jobs.get(job_id) + loop = self._loop + running = self._running + if not job: + return False, "任务不存在" + + job["trigger_type"] = trigger_type + job["trigger_config"] = normalized + job["trigger_text"] = self._format_trigger(trigger_type, normalized) + job["updated_at"] = datetime.now() + self._append_log(job, "info", f"调度已更新: {job['trigger_text']}") + + if running and loop and loop.is_running(): + loop.call_soon_threadsafe(self._restart_job_in_loop, job_id) + return True, "调度更新成功" + + def every_seconds(self, seconds: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None): + def decorator(func: Callable): + self._register( + func=func, + trigger_type="every_seconds", + trigger_config={"seconds": seconds}, + job_name=job_name, + description=description, + job_key=job_key, + ) return func return decorator - def every_minutes(self, minutes: int): - return self.every_seconds(minutes * 60) + def every_minutes(self, minutes: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None): + return self.every_seconds(minutes * 60, job_name=job_name, description=description, job_key=job_key) - def every_hours(self, hours: int): - return self.every_seconds(hours * 3600) + def every_hours(self, hours: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None): + return self.every_seconds(hours * 3600, job_name=job_name, description=description, job_key=job_key) - def at_times(self, time_list: List[str]): + def at_times( + self, + time_list: List[str], + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ): def decorator(func: Callable): - parsed_times = [datetime.strptime(t, "%H:%M").time() for t in time_list] - - async def wrapper(): - while True: - now = datetime.now() - targets = [] - for t in parsed_times: - target = datetime.combine(now.date(), t) - if target <= now: - target += timedelta(days=1) - targets.append(target) - - next_target = min(targets) - wait_seconds = (next_target - now).total_seconds() - await asyncio.sleep(max(wait_seconds, 0)) - try: - await func() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] at_times 任务执行异常: {e}") - - self._register(func, wrapper, f"at_times({time_list})") + self._register( + func=func, + trigger_type="at_times", + trigger_config={"time_list": time_list}, + job_name=job_name, + description=description, + job_key=job_key, + ) return func return decorator - def every_weekday_time(self, weekday: int, time_str: str): - """ - 每周 weekday(0=周一) 的 time_str(如10:00)时间执行 - """ - + def every_weekday_time( + self, + weekday: int, + time_str: str, + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ): def decorator(func: Callable): - async def wrapper(): - while True: - now = datetime.now() - target_time = datetime.strptime(time_str, "%H:%M").time() - - # 构造下一个执行时间 - days_ahead = (weekday - now.weekday() + 7) % 7 - target_date = now.date() + timedelta(days=days_ahead) - target_dt = datetime.combine(target_date, target_time) - - if target_dt <= now: - target_dt += timedelta(days=7) - - sleep_secs = (target_dt - now).total_seconds() - await asyncio.sleep(sleep_secs) - try: - await func() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] every_weekday_time 任务执行异常: {e}") - - self._register(func, wrapper, f"every_weekday_time({weekday}, {time_str})") + self._register( + func=func, + trigger_type="every_weekday_time", + trigger_config={"weekday": weekday, "time_str": time_str}, + job_name=job_name, + description=description, + job_key=job_key, + ) return func return decorator - def every_week_time(self, weekday: int, time_str: str): - """ - 每周 weekday(0=周一,6=周日) 的 time_str 时间执行 - """ - + def every_week_time( + self, + weekday: int, + time_str: str, + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ): def decorator(func: Callable): - async def wrapper(): - while True: - now = datetime.now() - target_time = datetime.strptime(time_str, "%H:%M").time() - - days_ahead = (weekday - now.weekday() + 7) % 7 - target_date = now.date() + timedelta(days=days_ahead) - target_dt = datetime.combine(target_date, target_time) - - if target_dt <= now: - target_dt += timedelta(days=7) - - sleep_secs = (target_dt - now).total_seconds() - await asyncio.sleep(sleep_secs) - try: - await func() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] every_week_time 任务执行异常: {e}") - - self._register(func, wrapper, f"every_week_time({weekday}, {time_str})") + self._register( + func=func, + trigger_type="every_week_time", + trigger_config={"weekday": weekday, "time_str": time_str}, + job_name=job_name, + description=description, + job_key=job_key, + ) return func return decorator - def every_month_last_day_time(self, time_str: str): - """ - 每月最后一天的 time_str 时间执行 - """ - + def every_month_last_day_time( + self, + time_str: str, + job_name: Optional[str] = None, + description: str = "", + job_key: Optional[str] = None, + ): def decorator(func: Callable): - async def wrapper(): - while True: - now = datetime.now() - target_time = datetime.strptime(time_str, "%H:%M").time() - - if now.month == 12: - next_month = datetime(now.year + 1, 1, 1) - else: - next_month = datetime(now.year, now.month + 1, 1) - last_day = next_month - timedelta(days=1) - target_dt = datetime.combine(last_day.date(), target_time) - - if target_dt <= now: - if now.month == 12: - next_month = datetime(now.year + 1, 2, 1) - elif now.month == 11: - next_month = datetime(now.year + 1, 1, 1) - else: - next_month = datetime(now.year, now.month + 2, 1) - last_day = next_month - timedelta(days=1) - target_dt = datetime.combine(last_day.date(), target_time) - - sleep_secs = (target_dt - now).total_seconds() - await asyncio.sleep(sleep_secs) - try: - await func() - except asyncio.CancelledError: - raise - except Exception as e: - print(f"[AsyncJob] every_month_last_day_time 任务执行异常: {e}") - - self._register(func, wrapper, f"every_month_last_day_time({time_str})") + self._register( + func=func, + trigger_type="every_month_last_day_time", + trigger_config={"time_str": time_str}, + job_name=job_name, + description=description, + job_key=job_key, + ) return func return decorator diff --git a/utils/system_jobs.py b/utils/system_jobs.py new file mode 100644 index 0000000..a7755e8 --- /dev/null +++ b/utils/system_jobs.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import Any, Awaitable, Callable, Dict, List + +from loguru import logger + +from db.system_job_db import SystemJobDBOperator +from utils.decorator.async_job import async_job + + +def get_system_job_definitions(robot) -> List[Dict[str, Any]]: + """系统任务定义(业务函数映射)。 + + 说明:这里只维护“任务 key 与业务函数”的绑定关系; + 调度时间、启停状态全部从数据库 t_system_jobs 读取。 + """ + return [ + { + "job_key": "news_baidu_report_auto", + "name": "百度新闻日报", + "description": "每天 08:30 推送百度新闻", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["08:30"]}, + "handler": robot.news_baidu_report_auto, + }, + { + "job_key": "epic_free_games", + "name": "Epic 免费游戏推送", + "description": "每周五 10:00 推送 Epic 免费游戏", + "trigger_type": "every_weekday_time", + "trigger_config": {"weekday": 4, "time_str": "10:00"}, + "handler": robot.send_epic_free_games, + }, + { + "job_key": "message_count_to_db", + "name": "消息计数入库", + "description": "每天 02:30 将 Redis 消息计数写入 SQLite", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["02:30"]}, + "handler": robot.message_count_to_db, + }, + { + "job_key": "message_ranking_push", + "name": "群消息排行推送", + "description": "每天 09:30 生成并发送群消息排行", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["09:30"]}, + "handler": robot.generate_and_send_ranking, + }, + { + "job_key": "sehuatang_pdf_push", + "name": "涩图 PDF 推送", + "description": "每天 15:30 生成并发送涩图 PDF", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["15:30"]}, + "handler": robot.generate_sehuatang_pdf, + }, + { + "job_key": "xiuren_download", + "name": "秀人网下载任务", + "description": "每天 01:30 执行秀人网下载任务", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["01:30"]}, + "handler": robot.xiu_ren_download_task, + }, + { + "job_key": "shenshi_r15_download", + "name": "绅士 R15 下载任务", + "description": "每天 02:30 执行绅士 R15 下载任务", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["02:30"]}, + "handler": robot.shen_shi_download_task, + }, + { + "job_key": "login_check", + "name": "登录状态巡检", + "description": "每天 14:43 执行登录二次校验", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["14:43"]}, + "handler": robot.login_twice_auto_auth, + }, + { + "job_key": "update_image_cache", + "name": "图片缓存更新", + "description": "每天 05:00 扫描并更新图片缓存", + "trigger_type": "at_times", + "trigger_config": {"time_list": ["05:00"]}, + "handler": _build_image_cache_handler(robot), + }, + { + "job_key": "process_pending_images", + "name": "待下载图片补偿处理", + "description": "每 5 分钟处理一次待下载图片/表情,避免数据库锁竞争", + "trigger_type": "every_seconds", + "trigger_config": {"seconds": 300}, + "handler": _build_process_pending_images_handler(robot), + }, + ] + + +def _build_image_cache_handler(robot) -> Callable[[], Awaitable[None]]: + async def _handler(): + from plugins.xiuren_image.images_cache import ImageCacheManager + + logger.info("开始执行图片缓存更新任务") + manager = ImageCacheManager("/mnt/nfs_share") + await manager.update_image_cache() + logger.info("图片缓存更新完成") + + return _handler + + +def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]: + async def _handler(): + if hasattr(robot, "message_storage") and robot.message_storage: + await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20) + + return _handler + + +class SystemJobLoader: + """系统任务加载器:从数据库读取调度配置并注册到 async_job。""" + + def __init__(self, robot, system_job_db: SystemJobDBOperator): + self.robot = robot + self.db = system_job_db + self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)} + self._registered_job_ids: List[str] = [] + + def init_and_load(self): + self.db.init_tables() + self._seed_defaults() + self.reload_from_db() + + def _seed_defaults(self): + for item in self._job_defs.values(): + existed = self.db.get_job(item["job_key"]) + if existed: + continue + self.db.upsert_job( + { + "job_key": item["job_key"], + "name": item["name"], + "description": item.get("description", ""), + "trigger_type": item["trigger_type"], + "trigger_config": item["trigger_config"], + "enabled": True, + } + ) + + def reload_from_db(self): + # 先移除当前注册任务,避免重复调度 + for job_id in self._registered_job_ids: + async_job.remove_job(job_id) + self._registered_job_ids = [] + + jobs = self.db.list_jobs() + for row in jobs: + job_key = row.get("job_key") + if not row.get("enabled", 1): + continue + definition = self._job_defs.get(job_key) + if not definition: + logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册") + continue + + handler = definition["handler"] + job_id = async_job.register_callable( + func=handler, + trigger_type=row.get("trigger_type", definition["trigger_type"]), + trigger_config=row.get("trigger_config", definition["trigger_config"]), + job_name=row.get("name") or definition["name"], + description=row.get("description") or definition.get("description", ""), + job_key=job_key, + ) + self._registered_job_ids.append(job_id)