feat(schedule): move system jobs to DB-driven config and dashboard management

This commit is contained in:
liuwei
2026-04-16 14:54:23 +08:00
parent cb0d11e657
commit 66a9b7c4a3
10 changed files with 1147 additions and 208 deletions

0
AGENTS.md Normal file
View File

View File

@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
from flask import Blueprint, current_app, jsonify, render_template, request
from utils.decorator.async_job import async_job
from .auth import login_required
system_jobs_bp = Blueprint("system_jobs", __name__, url_prefix="/system_jobs")
@system_jobs_bp.route("/")
@login_required
def page_system_jobs():
return render_template("system_jobs.html")
@system_jobs_bp.route("/api/jobs", methods=["GET"])
@login_required
def api_list_jobs():
server = current_app.dashboard_server
db_rows = server.system_job_db.list_jobs()
runtime_rows = async_job.get_jobs_snapshot()
runtime_by_key = {row.get("job_key", ""): row for row in runtime_rows if row.get("job_key")}
result = []
for row in db_rows:
job_key = row.get("job_key")
runtime = runtime_by_key.get(job_key, {})
result.append(
{
"job_key": job_key,
"name": row.get("name", ""),
"description": row.get("description", ""),
"trigger_type": row.get("trigger_type", ""),
"trigger_config": row.get("trigger_config", {}),
"enabled": bool(row.get("enabled", 0)),
"runtime_job_id": runtime.get("id"),
"runtime_enabled": runtime.get("enabled"),
"running": runtime.get("running", False),
"trigger_text": runtime.get("trigger_text", ""),
"last_run_at": runtime.get("last_run_at"),
"last_status": runtime.get("last_status"),
"last_error": runtime.get("last_error"),
"last_duration_ms": runtime.get("last_duration_ms"),
"next_run_at": runtime.get("next_run_at"),
"run_count": runtime.get("run_count", 0),
"success_count": runtime.get("success_count", 0),
"fail_count": runtime.get("fail_count", 0),
}
)
return jsonify({"success": True, "data": result})
@system_jobs_bp.route("/api/jobs/<job_key>", methods=["PUT"])
@login_required
def api_update_job(job_key: str):
server = current_app.dashboard_server
payload = request.get_json(silent=True) or {}
updates = {}
for key in ("name", "description", "trigger_type", "trigger_config", "enabled"):
if key in payload:
updates[key] = payload[key]
if not updates:
return jsonify({"success": False, "message": "没有可更新字段"}), 400
ok = server.system_job_db.update_job(job_key, updates)
if not ok:
return jsonify({"success": False, "message": "数据库更新失败"}), 500
# 配置变更后立即重载调度器,确保实时生效
server.system_job_loader.reload_from_db()
return jsonify({"success": True, "message": "更新成功"})
@system_jobs_bp.route("/api/jobs/<job_key>/trigger", methods=["POST"])
@login_required
def api_trigger_job(job_key: str):
server = current_app.dashboard_server
job_id = async_job.get_job_id_by_key(job_key)
if not job_id:
server.system_job_loader.reload_from_db()
job_id = async_job.get_job_id_by_key(job_key)
if not job_id:
return jsonify({"success": False, "message": "任务未加载或已禁用"}), 404
ok, msg = async_job.trigger_job_now(job_id, operator="dashboard")
code = 200 if ok else 400
return jsonify({"success": ok, "message": msg}), code
@system_jobs_bp.route("/api/jobs/<job_key>/logs", methods=["GET"])
@login_required
def api_job_logs(job_key: str):
job_id = async_job.get_job_id_by_key(job_key)
if not job_id:
return jsonify({"success": True, "data": []})
limit = int(request.args.get("limit", 100))
logs = async_job.get_job_logs(job_id, limit=limit)
return jsonify({"success": True, "data": logs})
@system_jobs_bp.route("/api/reload", methods=["POST"])
@login_required
def api_reload_jobs():
server = current_app.dashboard_server
server.system_job_loader.reload_from_db()
return jsonify({"success": True, "message": "已按数据库配置重载系统定时任务"})

View File

@@ -46,6 +46,8 @@ class DashboardServer:
self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager)
self.member_context_db = MemberContextDBOperator(self.db_manager)
self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager)
self.system_job_db = robot_instance.system_job_db
self.system_job_loader = robot_instance.system_job_loader
# 获取联系人管理器实例
self.contact_manager = robot_instance.contact_manager
self.plugin_manager = robot_instance.plugin_manager
@@ -148,6 +150,7 @@ class DashboardServer:
from admin.dashboard.blueprints.file_browser import file_browser_bp
from admin.dashboard.blueprints.message_push import message_push_bp
from admin.dashboard.blueprints.friend_circle import friend_circle_bp
from admin.dashboard.blueprints.system_jobs import system_jobs_bp
# 在app.register_blueprint部分添加
app.register_blueprint(virtual_group_bp, url_prefix='/virtual_group')
@@ -162,6 +165,7 @@ class DashboardServer:
app.register_blueprint(file_browser_bp)
app.register_blueprint(message_push_bp)
app.register_blueprint(friend_circle_bp)
app.register_blueprint(system_jobs_bp)
self.LOG.info("所有蓝图已注册")

View File

@@ -779,7 +779,8 @@
defaultPath: '/messages',
items: [
{ label: '消息列表', path: '/messages' },
{ label: '定时推送', path: '/message_push' }
{ label: '定时推送', path: '/message_push' },
{ label: '系统定时任务', path: '/system_jobs' }
]
},
{

View File

@@ -0,0 +1,287 @@
{% extends "base.html" %}
{% block title %}系统定时任务 - 机器人管理后台{% endblock %}
{% block content %}
<div class="page-shell">
<div class="page-hero">
<div class="page-hero-copy">
<div class="page-eyebrow">System Scheduler</div>
<h1>系统定时任务</h1>
<p>任务配置存储在数据库表 `t_system_jobs`,支持在线启停、调度调整、手动触发与执行日志查看。</p>
</div>
<div class="page-hero-actions">
<el-button type="primary" plain @click="reloadFromDB">按表重载</el-button>
<el-button type="success" @click="loadJobs">刷新</el-button>
</div>
</div>
<el-card shadow="hover">
<el-table :data="jobs" style="width:100%" v-loading="loading">
<el-table-column prop="job_key" label="任务Key" min-width="180"></el-table-column>
<el-table-column prop="name" label="任务名称" min-width="160"></el-table-column>
<el-table-column prop="trigger_text" label="当前调度" min-width="180"></el-table-column>
<el-table-column label="状态" width="140" align="center">
<template slot-scope="scope">
<el-tag :type="scope.row.enabled ? 'success' : 'info'">{% raw %}{{ scope.row.enabled ? '启用' : '停用' }}{% endraw %}</el-tag>
<el-tag style="margin-left:6px" :type="scope.row.running ? 'warning' : ''">{% raw %}{{ scope.row.running ? '执行中' : '空闲' }}{% endraw %}</el-tag>
</template>
</el-table-column>
<el-table-column prop="next_run_at" label="下次执行" min-width="170"></el-table-column>
<el-table-column prop="last_run_at" label="上次执行" min-width="170"></el-table-column>
<el-table-column label="最近结果" width="120" align="center">
<template slot-scope="scope">
<el-tag :type="statusTag(scope.row.last_status)">{% raw %}{{ scope.row.last_status || 'never' }}{% endraw %}</el-tag>
</template>
</el-table-column>
<el-table-column label="操作" min-width="280">
<template slot-scope="scope">
<div class="action-row">
<el-button size="mini" type="primary" plain @click="openEdit(scope.row)">编辑</el-button>
<el-button size="mini" type="success" plain @click="triggerNow(scope.row)">立即触发</el-button>
<el-button
size="mini"
:type="scope.row.enabled ? 'warning' : 'success'"
plain
@click="toggleEnabled(scope.row)">
{% raw %}{{ scope.row.enabled ? '停用' : '启用' }}{% endraw %}
</el-button>
<el-button size="mini" type="text" @click="viewLogs(scope.row)">日志</el-button>
</div>
</template>
</el-table-column>
</el-table>
</el-card>
<el-dialog title="编辑任务调度" :visible.sync="editDialogVisible" width="520px">
<el-form :model="editForm" label-width="100px">
<el-form-item label="任务名称">
<el-input v-model="editForm.name"></el-input>
</el-form-item>
<el-form-item label="任务说明">
<el-input v-model="editForm.description"></el-input>
</el-form-item>
<el-form-item label="启用状态">
<el-switch v-model="editForm.enabled"></el-switch>
</el-form-item>
<el-form-item label="触发类型">
<el-select v-model="editForm.trigger_type" @change="onTriggerTypeChange">
<el-option label="每天固定时间" value="at_times"></el-option>
<el-option label="固定间隔(秒)" value="every_seconds"></el-option>
<el-option label="每周固定时间" value="every_weekday_time"></el-option>
</el-select>
</el-form-item>
<el-form-item v-if="editForm.trigger_type === 'at_times'" label="时间列表">
<el-input v-model="editForm.time_list_text" placeholder="例如 09:00,21:00"></el-input>
</el-form-item>
<el-form-item v-if="editForm.trigger_type === 'every_seconds'" label="间隔秒数">
<el-input-number v-model="editForm.seconds" :min="1"></el-input-number>
</el-form-item>
<el-form-item v-if="editForm.trigger_type === 'every_weekday_time'" label="星期">
<el-select v-model="editForm.weekday">
<el-option label="周一" :value="0"></el-option>
<el-option label="周二" :value="1"></el-option>
<el-option label="周三" :value="2"></el-option>
<el-option label="周四" :value="3"></el-option>
<el-option label="周五" :value="4"></el-option>
<el-option label="周六" :value="5"></el-option>
<el-option label="周日" :value="6"></el-option>
</el-select>
</el-form-item>
<el-form-item v-if="editForm.trigger_type === 'every_weekday_time'" label="时间">
<el-input v-model="editForm.time_str" placeholder="例如 10:00"></el-input>
</el-form-item>
</el-form>
<div slot="footer">
<el-button @click="editDialogVisible = false">取消</el-button>
<el-button type="primary" @click="saveEdit">保存</el-button>
</div>
</el-dialog>
<el-dialog title="执行日志" :visible.sync="logsDialogVisible" width="760px">
<el-table :data="logs" style="width:100%">
<el-table-column prop="time" label="时间" width="180"></el-table-column>
<el-table-column prop="level" label="级别" width="90"></el-table-column>
<el-table-column prop="message" label="内容"></el-table-column>
</el-table>
</el-dialog>
</div>
{% endblock %}
{% block scripts %}
<script>
new Vue({
el: '#app',
mixins: [baseApp],
data() {
return {
loading: false,
jobs: [],
editDialogVisible: false,
logsDialogVisible: false,
logs: [],
currentJobKey: '',
editForm: {
job_key: '',
name: '',
description: '',
enabled: true,
trigger_type: 'at_times',
time_list_text: '',
seconds: 60,
weekday: 0,
time_str: '09:00'
}
};
},
mounted() {
this.loadJobs();
},
methods: {
statusTag(status) {
if (status === 'success') return 'success';
if (status === 'failed') return 'danger';
if (status === 'running') return 'warning';
return 'info';
},
async loadJobs() {
this.loading = true;
try {
const resp = await axios.get('/system_jobs/api/jobs');
if (resp.data.success) {
this.jobs = resp.data.data || [];
} else {
this.$message.error(resp.data.message || '加载失败');
}
} catch (e) {
this.$message.error('加载任务失败');
}
this.loading = false;
},
onTriggerTypeChange() {},
openEdit(row) {
this.currentJobKey = row.job_key;
this.editForm.job_key = row.job_key;
this.editForm.name = row.name || '';
this.editForm.description = row.description || '';
this.editForm.enabled = !!row.enabled;
this.editForm.trigger_type = row.trigger_type || 'at_times';
const cfg = row.trigger_config || {};
this.editForm.time_list_text = (cfg.time_list || []).join(',');
this.editForm.seconds = Number(cfg.seconds || 60);
this.editForm.weekday = Number(cfg.weekday || 0);
this.editForm.time_str = cfg.time_str || '09:00';
this.editDialogVisible = true;
},
buildTriggerConfig() {
if (this.editForm.trigger_type === 'at_times') {
const list = String(this.editForm.time_list_text || '')
.split(',')
.map(s => s.trim())
.filter(Boolean);
return { time_list: list };
}
if (this.editForm.trigger_type === 'every_seconds') {
return { seconds: Number(this.editForm.seconds || 60) };
}
if (this.editForm.trigger_type === 'every_weekday_time') {
return { weekday: Number(this.editForm.weekday || 0), time_str: String(this.editForm.time_str || '09:00') };
}
return {};
},
async saveEdit() {
try {
const payload = {
name: this.editForm.name,
description: this.editForm.description,
enabled: this.editForm.enabled,
trigger_type: this.editForm.trigger_type,
trigger_config: this.buildTriggerConfig()
};
const resp = await axios.put(`/system_jobs/api/jobs/${this.currentJobKey}`, payload);
if (resp.data.success) {
this.$message.success('保存成功');
this.editDialogVisible = false;
await this.loadJobs();
} else {
this.$message.error(resp.data.message || '保存失败');
}
} catch (e) {
this.$message.error('保存失败');
}
},
async triggerNow(row) {
try {
const resp = await axios.post(`/system_jobs/api/jobs/${row.job_key}/trigger`);
if (resp.data.success) {
this.$message.success(resp.data.message || '触发成功');
} else {
this.$message.warning(resp.data.message || '触发失败');
}
await this.loadJobs();
} catch (e) {
this.$message.error('触发失败');
}
},
async toggleEnabled(row) {
try {
const payload = {
enabled: !row.enabled,
name: row.name,
description: row.description,
trigger_type: row.trigger_type,
trigger_config: row.trigger_config
};
const resp = await axios.put(`/system_jobs/api/jobs/${row.job_key}`, payload);
if (resp.data.success) {
this.$message.success(row.enabled ? '已停用' : '已启用');
await this.loadJobs();
} else {
this.$message.error(resp.data.message || '更新失败');
}
} catch (e) {
this.$message.error('更新失败');
}
},
async viewLogs(row) {
try {
const resp = await axios.get(`/system_jobs/api/jobs/${row.job_key}/logs`);
if (resp.data.success) {
this.logs = resp.data.data || [];
this.logsDialogVisible = true;
} else {
this.$message.error('加载日志失败');
}
} catch (e) {
this.$message.error('加载日志失败');
}
},
async reloadFromDB() {
try {
const resp = await axios.post('/system_jobs/api/reload');
if (resp.data.success) {
this.$message.success(resp.data.message || '重载成功');
await this.loadJobs();
} else {
this.$message.error(resp.data.message || '重载失败');
}
} catch (e) {
this.$message.error('重载失败');
}
}
}
});
</script>
{% endblock %}
{% block styles %}
<style>
.page-shell{display:flex;flex-direction:column;gap:16px}
.page-hero{display:flex;align-items:flex-end;justify-content:space-between;gap:18px;padding:24px 26px;border-radius:24px;background:linear-gradient(135deg, rgba(79,70,229,.10), rgba(59,130,246,.08), rgba(255,255,255,.9));border:1px solid rgba(148,163,184,.16);box-shadow:0 18px 40px rgba(15,23,42,.06)}
.page-hero-actions{display:flex;align-items:center;gap:12px}
.page-eyebrow{font-size:12px;text-transform:uppercase;letter-spacing:.08em;color:#6366f1;font-weight:700;margin-bottom:8px}
.page-hero-copy h1{font-size:30px;line-height:1.1;margin-bottom:10px;color:#0f172a}
.page-hero-copy p{color:#64748b;font-size:14px}
.action-row{display:flex;align-items:center;gap:8px;flex-wrap:wrap}
</style>
{% endblock %}

112
db/system_job_db.py Normal file
View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class SystemJobDBOperator(BaseDBOperator):
"""系统级定时任务配置表操作。"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
def init_tables(self) -> bool:
"""初始化系统任务配置表。"""
try:
self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_system_jobs (
job_key VARCHAR(64) PRIMARY KEY,
name VARCHAR(128) NOT NULL,
description VARCHAR(255) DEFAULT '',
trigger_type VARCHAR(64) NOT NULL,
trigger_config JSON NOT NULL,
enabled TINYINT(1) NOT NULL DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
"""
)
return True
except Exception as e:
logger.error(f"初始化 t_system_jobs 失败: {e}")
return False
def list_jobs(self) -> List[Dict[str, Any]]:
rows = self.execute_query("SELECT * FROM t_system_jobs ORDER BY created_at ASC") or []
for row in rows:
cfg = row.get("trigger_config")
if isinstance(cfg, str):
try:
row["trigger_config"] = json.loads(cfg)
except json.JSONDecodeError:
row["trigger_config"] = {}
return rows
def get_job(self, job_key: str) -> Optional[Dict[str, Any]]:
row = self.execute_query("SELECT * FROM t_system_jobs WHERE job_key = %s", (job_key,), fetch_one=True)
if not row:
return None
cfg = row.get("trigger_config")
if isinstance(cfg, str):
try:
row["trigger_config"] = json.loads(cfg)
except json.JSONDecodeError:
row["trigger_config"] = {}
return row
def upsert_job(self, data: Dict[str, Any]) -> bool:
try:
sql = """
INSERT INTO t_system_jobs (
job_key, name, description, trigger_type, trigger_config, enabled
) VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
description = VALUES(description),
trigger_type = VALUES(trigger_type),
trigger_config = VALUES(trigger_config),
enabled = VALUES(enabled)
"""
params = (
data["job_key"],
data["name"],
data.get("description", ""),
data["trigger_type"],
json.dumps(data.get("trigger_config", {}), ensure_ascii=False),
1 if data.get("enabled", True) else 0,
)
return self.execute_update(sql, params)
except Exception as e:
logger.error(f"upsert 系统任务失败: {e}, data={data}")
return False
def update_job(self, job_key: str, updates: Dict[str, Any]) -> bool:
fields = []
values: List[Any] = []
for key in ("name", "description", "trigger_type", "enabled"):
if key in updates:
fields.append(f"{key} = %s")
if key == "enabled":
values.append(1 if updates[key] else 0)
else:
values.append(updates[key])
if "trigger_config" in updates:
fields.append("trigger_config = %s")
values.append(json.dumps(updates.get("trigger_config", {}), ensure_ascii=False))
if not fields:
return True
values.append(job_key)
sql = f"UPDATE t_system_jobs SET {', '.join(fields)} WHERE job_key = %s"
return self.execute_update(sql, tuple(values))
def delete_job(self, job_key: str) -> bool:
return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,))

62
main.py
View File

@@ -6,7 +6,6 @@ import threading
from admin.GlancesMonitor import GlancesMonitor
from utils.decorator.async_job import async_job
from configuration import Config
from plugins.xiuren_image.images_cache import ImageCacheManager
from robot import Robot
from loguru import logger
@@ -114,63 +113,10 @@ def main():
def jobs(robot: Robot):
# ✅ 每天 8:30 发送百度新闻
@async_job.at_times(["08:30"])
async def news_baidu_report_auto_job():
await robot.news_baidu_report_auto()
# ✅ 每天 10:30 推送 Epic 免费游戏
@async_job.every_weekday_time(weekday=4, time_str="10:00") # 0=周一4=周五
async def epic_job():
await robot.send_epic_free_games()
# ✅ 每天 02:30 从 redis 写入 sqlite
@async_job.at_times(["02:30"])
async def msg_count_to_db_job():
await robot.message_count_to_db()
# ✅ 每天 09:30 从 sqlite 读取并发送群排行
@async_job.at_times(["09:30"])
async def msg_ranking_job():
await robot.generate_and_send_ranking()
# ✅ 每天 15:30 发涩图 PDF
@async_job.at_times(["15:30"])
async def sehuatang_pdf_job():
await robot.generate_sehuatang_pdf()
# ✅ 每天 01:30 下载秀人网帖子
@async_job.at_times(["01:30"])
async def xiuren_download_job():
await robot.xiu_ren_download_task()
# ✅ 每天 01:30 下载秀人网帖子
@async_job.at_times(["2:30"])
async def shenshiR15_download_job():
await robot.shen_shi_download_task()
# ✅ 每天 17:30 发秀人 PDF如果启用
# @async_job.at_times(["17:30"])
# async def xiuren_pdf_send_job():
# await robot.xiu_ren_pdf_send()
# ✅ 每 3 小时登录验证
@async_job.at_times(["14:43"])
async def login_check_job():
await robot.login_twice_auto_auth()
@async_job.at_times(["05:00"])
async def update_image_cache_job():
logger.info("开始执行图片缓存更新任务")
manager = ImageCacheManager("/mnt/nfs_share") # 替换为你的图片目录
await manager.update_image_cache()
logger.info("图片缓存更新完成")
# ✅ 每5分钟处理一次待下载的图片/表情消息(串行处理,避免数据库锁竞争)
@async_job.every_minutes(5)
async def process_pending_images_job():
if hasattr(robot, 'message_storage') and robot.message_storage:
await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20)
# 系统级定时任务统一改为数据库驱动,不再在 main.py 里硬编码维护。
# 这里保留入口,只负责按表配置重新加载,便于运行时刷新。
if hasattr(robot, "system_job_loader") and robot.system_job_loader:
robot.system_job_loader.reload_from_db()

View File

@@ -21,8 +21,10 @@ from base.plugin_common.plugin_registry import PluginRegistry
from configuration import Config
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from db.system_job_db import SystemJobDBOperator
from plugins.xiuren_image.meitu_dl import meitu_dowload_pub_pic
from plugins.xiuren_image.shenshi_r15 import run_daily_job
from utils.system_jobs import SystemJobLoader
from utils.email_util import EmailSender
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
@@ -73,6 +75,7 @@ class Robot:
self.redis_pool = self.db_manager.redis_pool
self.contacts_db = ContactsDBOperator(self.db_manager)
self.system_job_db = SystemJobDBOperator(self.db_manager)
# 初始化联系人管理器
self.contact_manager = ContactManager.get_instance()
self.allContacts = {} # 将在登录后填充
@@ -100,6 +103,8 @@ class Robot:
self.plugins = self.plugin_manager.load_all_plugins()
# 热加载改为低频扫描:每 60 秒检查一次插件文件变动
self.plugin_manager.start_hot_reload_watcher(interval_seconds=60.0)
self.system_job_loader = SystemJobLoader(self, self.system_job_db)
self.system_job_loader.init_and_load()
# 加载插件
self.LOG.debug("插件系统初始化完成")

View File

@@ -1,10 +1,21 @@
import asyncio
import inspect
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Callable, Awaitable, List, Dict, Optional, Any
from typing import Callable, Awaitable, List, Dict, Optional, Any, Tuple
class AsyncJob:
"""异步定时任务中心。
设计目标:
1. 保持装饰器用法兼容(原有插件和 main.py 不用重写)
2. 支持运行中动态增删改任务
3. 提供任务可观测信息(状态、下次运行、执行日志)
4. 支持后台手动触发、启停、修改调度策略
"""
def __init__(self):
self._jobs: Dict[str, Dict[str, Any]] = {}
self._running_tasks: Dict[str, asyncio.Task] = {}
@@ -35,34 +46,242 @@ class AsyncJob:
return value
return None
def _register(self, func: Callable, wrapper: Callable[[], Awaitable], trigger: str):
@staticmethod
def _safe_iso(dt: Optional[datetime]) -> Optional[str]:
return dt.isoformat(timespec="seconds") if dt else None
@staticmethod
def _normalize_time_str(time_str: str) -> str:
text = str(time_str or "").strip()
if not text:
raise ValueError("时间不能为空")
return datetime.strptime(text, "%H:%M").strftime("%H:%M")
def _normalize_trigger_config(self, trigger_type: str, trigger_config: Dict[str, Any]) -> Dict[str, Any]:
if trigger_type == "every_seconds":
seconds = int(trigger_config.get("seconds", 0))
if seconds <= 0:
raise ValueError("seconds 必须大于 0")
return {"seconds": seconds}
if trigger_type == "at_times":
time_list = trigger_config.get("time_list", [])
if not isinstance(time_list, list) or not time_list:
raise ValueError("time_list 不能为空")
normalized = sorted(set(self._normalize_time_str(t) for t in time_list))
return {"time_list": normalized}
if trigger_type == "every_weekday_time":
weekday = int(trigger_config.get("weekday", -1))
if weekday < 0 or weekday > 6:
raise ValueError("weekday 必须在 0-6")
return {"weekday": weekday, "time_str": self._normalize_time_str(trigger_config.get("time_str", ""))}
if trigger_type == "every_week_time":
weekday = int(trigger_config.get("weekday", -1))
if weekday < 0 or weekday > 6:
raise ValueError("weekday 必须在 0-6")
return {"weekday": weekday, "time_str": self._normalize_time_str(trigger_config.get("time_str", ""))}
if trigger_type == "every_month_last_day_time":
return {"time_str": self._normalize_time_str(trigger_config.get("time_str", ""))}
raise ValueError(f"不支持的触发器类型: {trigger_type}")
def _format_trigger(self, trigger_type: str, trigger_config: Dict[str, Any]) -> str:
if trigger_type == "every_seconds":
return f"{trigger_config['seconds']}"
if trigger_type == "at_times":
return "每天 " + ", ".join(trigger_config.get("time_list", []))
if trigger_type == "every_weekday_time":
return f"每周{trigger_config['weekday']} {trigger_config['time_str']}"
if trigger_type == "every_week_time":
return f"每周{trigger_config['weekday']} {trigger_config['time_str']}"
if trigger_type == "every_month_last_day_time":
return f"每月最后一天 {trigger_config['time_str']}"
return trigger_type
def _compute_next_run(self, trigger_type: str, trigger_config: Dict[str, Any], now: Optional[datetime] = None) -> datetime:
current = now or datetime.now()
if trigger_type == "every_seconds":
return current + timedelta(seconds=int(trigger_config["seconds"]))
if trigger_type == "at_times":
parsed_times = [datetime.strptime(t, "%H:%M").time() for t in trigger_config.get("time_list", [])]
targets = []
for t in parsed_times:
target = datetime.combine(current.date(), t)
if target <= current:
target += timedelta(days=1)
targets.append(target)
return min(targets)
if trigger_type in ("every_weekday_time", "every_week_time"):
weekday = int(trigger_config["weekday"])
target_time = datetime.strptime(trigger_config["time_str"], "%H:%M").time()
days_ahead = (weekday - current.weekday() + 7) % 7
target_date = current.date() + timedelta(days=days_ahead)
target_dt = datetime.combine(target_date, target_time)
if target_dt <= current:
target_dt += timedelta(days=7)
return target_dt
if trigger_type == "every_month_last_day_time":
target_time = datetime.strptime(trigger_config["time_str"], "%H:%M").time()
if current.month == 12:
next_month = datetime(current.year + 1, 1, 1)
else:
next_month = datetime(current.year, current.month + 1, 1)
last_day = next_month - timedelta(days=1)
target_dt = datetime.combine(last_day.date(), target_time)
if target_dt <= current:
if current.month == 12:
next_month = datetime(current.year + 1, 2, 1)
elif current.month == 11:
next_month = datetime(current.year + 1, 1, 1)
else:
next_month = datetime(current.year, current.month + 2, 1)
last_day = next_month - timedelta(days=1)
target_dt = datetime.combine(last_day.date(), target_time)
return target_dt
raise ValueError(f"未知触发器类型: {trigger_type}")
def _append_log(self, job: Dict[str, Any], level: str, message: str):
logs: deque = job["logs"]
logs.append(
{
"time": self._safe_iso(datetime.now()),
"level": level,
"message": message,
}
)
def _register(
self,
func: Callable,
trigger_type: str,
trigger_config: Dict[str, Any],
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
):
owner = self._infer_owner(func)
normalized_config = self._normalize_trigger_config(trigger_type, trigger_config)
job_id = self._next_job_id()
display_name = (job_name or getattr(func, "__name__", None) or job_id).strip()
owner_name = owner.__class__.__name__ if owner is not None else "system"
created_at = datetime.now()
with self._lock:
self._jobs[job_id] = {
"id": job_id,
"job_key": (job_key or "").strip(),
"name": display_name,
"description": description or "",
"func": func,
"wrapper": wrapper,
"trigger": trigger,
"trigger_type": trigger_type,
"trigger_config": normalized_config,
"trigger_text": self._format_trigger(trigger_type, normalized_config),
"owner_id": id(owner) if owner is not None else None,
"owner_name": owner.__class__.__name__ if owner is not None else None,
"owner_name": owner_name,
"enabled": True,
"running": False,
"last_run_at": None,
"last_status": "never",
"last_error": "",
"last_duration_ms": None,
"next_run_at": None,
"run_count": 0,
"success_count": 0,
"fail_count": 0,
"created_at": created_at,
"updated_at": created_at,
"logs": deque(maxlen=200),
}
self._append_log(self._jobs[job_id], "info", f"任务已注册: {display_name}")
if self._running and self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._start_job_in_loop, job_id)
return job_id
async def _execute_job(self, job_id: str, reason: str = "schedule") -> Tuple[bool, str]:
job = self._jobs.get(job_id)
if not job:
return False, "任务不存在"
if job.get("running"):
return False, "任务正在执行中"
func = job["func"]
started_at = datetime.now()
job["running"] = True
job["last_run_at"] = started_at
self._append_log(job, "info", f"开始执行,触发来源: {reason}")
try:
result = func()
if inspect.isawaitable(result):
await result
job["last_status"] = "success"
job["last_error"] = ""
job["success_count"] += 1
self._append_log(job, "success", "执行成功")
return True, "执行成功"
except asyncio.CancelledError:
job["last_status"] = "cancelled"
job["last_error"] = "任务被取消"
self._append_log(job, "warning", "任务被取消")
raise
except Exception as e:
job["last_status"] = "failed"
job["last_error"] = str(e)
job["fail_count"] += 1
self._append_log(job, "error", f"执行失败: {e}")
return False, str(e)
finally:
finished_at = datetime.now()
job["run_count"] += 1
job["running"] = False
job["last_duration_ms"] = int((finished_at - started_at).total_seconds() * 1000)
job["updated_at"] = finished_at
async def _job_loop(self, job_id: str):
while True:
job = self._jobs.get(job_id)
if not job:
return
if not job.get("enabled", True):
job["next_run_at"] = None
await asyncio.sleep(1)
continue
try:
next_run = self._compute_next_run(job["trigger_type"], job["trigger_config"])
except Exception as e:
job["last_status"] = "invalid_schedule"
job["last_error"] = str(e)
job["next_run_at"] = None
self._append_log(job, "error", f"调度配置非法: {e}")
await asyncio.sleep(5)
continue
job["next_run_at"] = next_run
wait_seconds = max((next_run - datetime.now()).total_seconds(), 0)
await asyncio.sleep(wait_seconds)
# 睡眠结束后再检查一次,避免刚被禁用/删除还执行
job = self._jobs.get(job_id)
if not job or not job.get("enabled", True):
continue
await self._execute_job(job_id, reason="schedule")
def _start_job_in_loop(self, job_id: str):
job = self._jobs.get(job_id)
if not job or job_id in self._running_tasks:
if job_id in self._running_tasks:
return
async def runner():
try:
await job["wrapper"]()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] 任务异常退出: {job_id}, trigger={job.get('trigger')}, error={e}")
task = asyncio.create_task(runner(), name=f"async_job:{job_id}")
task = asyncio.create_task(self._job_loop(job_id), name=f"async_job:{job_id}")
self._running_tasks[job_id] = task
task.add_done_callback(lambda _task, _id=job_id: self._running_tasks.pop(_id, None))
@@ -71,6 +290,11 @@ class AsyncJob:
if task:
task.cancel()
def _restart_job_in_loop(self, job_id: str):
self._cancel_job_in_loop(job_id)
if job_id in self._jobs:
self._start_job_in_loop(job_id)
def remove_job(self, job_id: str) -> bool:
with self._lock:
existed = job_id in self._jobs or job_id in self._running_tasks
@@ -97,161 +321,233 @@ class AsyncJob:
removed += 1
return removed
def every_seconds(self, seconds: int):
def decorator(func: Callable):
async def wrapper():
while True:
try:
await func()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] every_seconds 任务执行异常: {e}")
await asyncio.sleep(seconds)
def get_jobs_snapshot(self) -> List[Dict[str, Any]]:
with self._lock:
snapshots = []
for job in self._jobs.values():
snapshots.append(
{
"id": job["id"],
"job_key": job.get("job_key", ""),
"name": job["name"],
"description": job.get("description", ""),
"trigger_type": job["trigger_type"],
"trigger_config": dict(job["trigger_config"]),
"trigger_text": job.get("trigger_text", ""),
"owner_name": job.get("owner_name", "system"),
"enabled": job.get("enabled", True),
"running": job.get("running", False),
"last_run_at": self._safe_iso(job.get("last_run_at")),
"last_status": job.get("last_status", "never"),
"last_error": job.get("last_error", ""),
"last_duration_ms": job.get("last_duration_ms"),
"next_run_at": self._safe_iso(job.get("next_run_at")),
"run_count": job.get("run_count", 0),
"success_count": job.get("success_count", 0),
"fail_count": job.get("fail_count", 0),
"created_at": self._safe_iso(job.get("created_at")),
"updated_at": self._safe_iso(job.get("updated_at")),
}
)
snapshots.sort(key=lambda item: item["name"])
return snapshots
self._register(func, wrapper, f"every_seconds({seconds})")
def get_job_logs(self, job_id: str, limit: int = 100) -> List[Dict[str, Any]]:
with self._lock:
job = self._jobs.get(job_id)
if not job:
return []
logs: deque = job.get("logs", deque())
data = list(logs)
if limit > 0:
data = data[-limit:]
return data
def trigger_job_now(self, job_id: str, operator: str = "dashboard") -> Tuple[bool, str]:
with self._lock:
job = self._jobs.get(job_id)
loop = self._loop
running = self._running
if not job:
return False, "任务不存在"
if job.get("running"):
return False, "任务正在执行中"
if not running or not loop or not loop.is_running():
return False, "任务调度器未运行"
def _trigger():
asyncio.create_task(self._execute_job(job_id, reason=f"manual:{operator}"))
loop.call_soon_threadsafe(_trigger)
return True, "任务已触发"
def get_job_id_by_key(self, job_key: str) -> Optional[str]:
key = str(job_key or "").strip()
if not key:
return None
with self._lock:
for job_id, job in self._jobs.items():
if job.get("job_key") == key:
return job_id
return None
def register_callable(
self,
func: Callable,
trigger_type: str,
trigger_config: Dict[str, Any],
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
) -> str:
"""运行时注册任务(非装饰器方式)。"""
return self._register(
func=func,
trigger_type=trigger_type,
trigger_config=trigger_config,
job_name=job_name,
description=description,
job_key=job_key,
)
def set_job_enabled(self, job_id: str, enabled: bool) -> Tuple[bool, str]:
with self._lock:
job = self._jobs.get(job_id)
loop = self._loop
running = self._running
if not job:
return False, "任务不存在"
job["enabled"] = bool(enabled)
job["updated_at"] = datetime.now()
state_text = "启用" if enabled else "停用"
self._append_log(job, "info", f"任务已{state_text}")
if running and loop and loop.is_running():
loop.call_soon_threadsafe(self._restart_job_in_loop, job_id)
return True, f"任务已{state_text}"
def update_job_schedule(self, job_id: str, trigger_type: str, trigger_config: Dict[str, Any]) -> Tuple[bool, str]:
try:
normalized = self._normalize_trigger_config(trigger_type, trigger_config)
except Exception as e:
return False, f"调度参数非法: {e}"
with self._lock:
job = self._jobs.get(job_id)
loop = self._loop
running = self._running
if not job:
return False, "任务不存在"
job["trigger_type"] = trigger_type
job["trigger_config"] = normalized
job["trigger_text"] = self._format_trigger(trigger_type, normalized)
job["updated_at"] = datetime.now()
self._append_log(job, "info", f"调度已更新: {job['trigger_text']}")
if running and loop and loop.is_running():
loop.call_soon_threadsafe(self._restart_job_in_loop, job_id)
return True, "调度更新成功"
def every_seconds(self, seconds: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None):
def decorator(func: Callable):
self._register(
func=func,
trigger_type="every_seconds",
trigger_config={"seconds": seconds},
job_name=job_name,
description=description,
job_key=job_key,
)
return func
return decorator
def every_minutes(self, minutes: int):
return self.every_seconds(minutes * 60)
def every_minutes(self, minutes: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None):
return self.every_seconds(minutes * 60, job_name=job_name, description=description, job_key=job_key)
def every_hours(self, hours: int):
return self.every_seconds(hours * 3600)
def every_hours(self, hours: int, job_name: Optional[str] = None, description: str = "", job_key: Optional[str] = None):
return self.every_seconds(hours * 3600, job_name=job_name, description=description, job_key=job_key)
def at_times(self, time_list: List[str]):
def at_times(
self,
time_list: List[str],
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
):
def decorator(func: Callable):
parsed_times = [datetime.strptime(t, "%H:%M").time() for t in time_list]
async def wrapper():
while True:
now = datetime.now()
targets = []
for t in parsed_times:
target = datetime.combine(now.date(), t)
if target <= now:
target += timedelta(days=1)
targets.append(target)
next_target = min(targets)
wait_seconds = (next_target - now).total_seconds()
await asyncio.sleep(max(wait_seconds, 0))
try:
await func()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] at_times 任务执行异常: {e}")
self._register(func, wrapper, f"at_times({time_list})")
self._register(
func=func,
trigger_type="at_times",
trigger_config={"time_list": time_list},
job_name=job_name,
description=description,
job_key=job_key,
)
return func
return decorator
def every_weekday_time(self, weekday: int, time_str: str):
"""
每周 weekday0=周一) 的 time_str如10:00时间执行
"""
def every_weekday_time(
self,
weekday: int,
time_str: str,
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
):
def decorator(func: Callable):
async def wrapper():
while True:
now = datetime.now()
target_time = datetime.strptime(time_str, "%H:%M").time()
# 构造下一个执行时间
days_ahead = (weekday - now.weekday() + 7) % 7
target_date = now.date() + timedelta(days=days_ahead)
target_dt = datetime.combine(target_date, target_time)
if target_dt <= now:
target_dt += timedelta(days=7)
sleep_secs = (target_dt - now).total_seconds()
await asyncio.sleep(sleep_secs)
try:
await func()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] every_weekday_time 任务执行异常: {e}")
self._register(func, wrapper, f"every_weekday_time({weekday}, {time_str})")
self._register(
func=func,
trigger_type="every_weekday_time",
trigger_config={"weekday": weekday, "time_str": time_str},
job_name=job_name,
description=description,
job_key=job_key,
)
return func
return decorator
def every_week_time(self, weekday: int, time_str: str):
"""
每周 weekday0=周一6=周日) 的 time_str 时间执行
"""
def every_week_time(
self,
weekday: int,
time_str: str,
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
):
def decorator(func: Callable):
async def wrapper():
while True:
now = datetime.now()
target_time = datetime.strptime(time_str, "%H:%M").time()
days_ahead = (weekday - now.weekday() + 7) % 7
target_date = now.date() + timedelta(days=days_ahead)
target_dt = datetime.combine(target_date, target_time)
if target_dt <= now:
target_dt += timedelta(days=7)
sleep_secs = (target_dt - now).total_seconds()
await asyncio.sleep(sleep_secs)
try:
await func()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] every_week_time 任务执行异常: {e}")
self._register(func, wrapper, f"every_week_time({weekday}, {time_str})")
self._register(
func=func,
trigger_type="every_week_time",
trigger_config={"weekday": weekday, "time_str": time_str},
job_name=job_name,
description=description,
job_key=job_key,
)
return func
return decorator
def every_month_last_day_time(self, time_str: str):
"""
每月最后一天的 time_str 时间执行
"""
def every_month_last_day_time(
self,
time_str: str,
job_name: Optional[str] = None,
description: str = "",
job_key: Optional[str] = None,
):
def decorator(func: Callable):
async def wrapper():
while True:
now = datetime.now()
target_time = datetime.strptime(time_str, "%H:%M").time()
if now.month == 12:
next_month = datetime(now.year + 1, 1, 1)
else:
next_month = datetime(now.year, now.month + 1, 1)
last_day = next_month - timedelta(days=1)
target_dt = datetime.combine(last_day.date(), target_time)
if target_dt <= now:
if now.month == 12:
next_month = datetime(now.year + 1, 2, 1)
elif now.month == 11:
next_month = datetime(now.year + 1, 1, 1)
else:
next_month = datetime(now.year, now.month + 2, 1)
last_day = next_month - timedelta(days=1)
target_dt = datetime.combine(last_day.date(), target_time)
sleep_secs = (target_dt - now).total_seconds()
await asyncio.sleep(sleep_secs)
try:
await func()
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[AsyncJob] every_month_last_day_time 任务执行异常: {e}")
self._register(func, wrapper, f"every_month_last_day_time({time_str})")
self._register(
func=func,
trigger_type="every_month_last_day_time",
trigger_config={"time_str": time_str},
job_name=job_name,
description=description,
job_key=job_key,
)
return func
return decorator

177
utils/system_jobs.py Normal file
View File

@@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any, Awaitable, Callable, Dict, List
from loguru import logger
from db.system_job_db import SystemJobDBOperator
from utils.decorator.async_job import async_job
def get_system_job_definitions(robot) -> List[Dict[str, Any]]:
"""系统任务定义(业务函数映射)。
说明:这里只维护“任务 key 与业务函数”的绑定关系;
调度时间、启停状态全部从数据库 t_system_jobs 读取。
"""
return [
{
"job_key": "news_baidu_report_auto",
"name": "百度新闻日报",
"description": "每天 08:30 推送百度新闻",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["08:30"]},
"handler": robot.news_baidu_report_auto,
},
{
"job_key": "epic_free_games",
"name": "Epic 免费游戏推送",
"description": "每周五 10:00 推送 Epic 免费游戏",
"trigger_type": "every_weekday_time",
"trigger_config": {"weekday": 4, "time_str": "10:00"},
"handler": robot.send_epic_free_games,
},
{
"job_key": "message_count_to_db",
"name": "消息计数入库",
"description": "每天 02:30 将 Redis 消息计数写入 SQLite",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["02:30"]},
"handler": robot.message_count_to_db,
},
{
"job_key": "message_ranking_push",
"name": "群消息排行推送",
"description": "每天 09:30 生成并发送群消息排行",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["09:30"]},
"handler": robot.generate_and_send_ranking,
},
{
"job_key": "sehuatang_pdf_push",
"name": "涩图 PDF 推送",
"description": "每天 15:30 生成并发送涩图 PDF",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["15:30"]},
"handler": robot.generate_sehuatang_pdf,
},
{
"job_key": "xiuren_download",
"name": "秀人网下载任务",
"description": "每天 01:30 执行秀人网下载任务",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["01:30"]},
"handler": robot.xiu_ren_download_task,
},
{
"job_key": "shenshi_r15_download",
"name": "绅士 R15 下载任务",
"description": "每天 02:30 执行绅士 R15 下载任务",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["02:30"]},
"handler": robot.shen_shi_download_task,
},
{
"job_key": "login_check",
"name": "登录状态巡检",
"description": "每天 14:43 执行登录二次校验",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["14:43"]},
"handler": robot.login_twice_auto_auth,
},
{
"job_key": "update_image_cache",
"name": "图片缓存更新",
"description": "每天 05:00 扫描并更新图片缓存",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["05:00"]},
"handler": _build_image_cache_handler(robot),
},
{
"job_key": "process_pending_images",
"name": "待下载图片补偿处理",
"description": "每 5 分钟处理一次待下载图片/表情,避免数据库锁竞争",
"trigger_type": "every_seconds",
"trigger_config": {"seconds": 300},
"handler": _build_process_pending_images_handler(robot),
},
]
def _build_image_cache_handler(robot) -> Callable[[], Awaitable[None]]:
async def _handler():
from plugins.xiuren_image.images_cache import ImageCacheManager
logger.info("开始执行图片缓存更新任务")
manager = ImageCacheManager("/mnt/nfs_share")
await manager.update_image_cache()
logger.info("图片缓存更新完成")
return _handler
def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]:
async def _handler():
if hasattr(robot, "message_storage") and robot.message_storage:
await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20)
return _handler
class SystemJobLoader:
"""系统任务加载器:从数据库读取调度配置并注册到 async_job。"""
def __init__(self, robot, system_job_db: SystemJobDBOperator):
self.robot = robot
self.db = system_job_db
self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)}
self._registered_job_ids: List[str] = []
def init_and_load(self):
self.db.init_tables()
self._seed_defaults()
self.reload_from_db()
def _seed_defaults(self):
for item in self._job_defs.values():
existed = self.db.get_job(item["job_key"])
if existed:
continue
self.db.upsert_job(
{
"job_key": item["job_key"],
"name": item["name"],
"description": item.get("description", ""),
"trigger_type": item["trigger_type"],
"trigger_config": item["trigger_config"],
"enabled": True,
}
)
def reload_from_db(self):
# 先移除当前注册任务,避免重复调度
for job_id in self._registered_job_ids:
async_job.remove_job(job_id)
self._registered_job_ids = []
jobs = self.db.list_jobs()
for row in jobs:
job_key = row.get("job_key")
if not row.get("enabled", 1):
continue
definition = self._job_defs.get(job_key)
if not definition:
logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册")
continue
handler = definition["handler"]
job_id = async_job.register_callable(
func=handler,
trigger_type=row.get("trigger_type", definition["trigger_type"]),
trigger_config=row.get("trigger_config", definition["trigger_config"]),
job_name=row.get("name") or definition["name"],
description=row.get("description") or definition.get("description", ""),
job_key=job_key,
)
self._registered_job_ids.append(job_id)