feat(plugin-schedule): add DB-driven plugin scheduler and xiuren scheduled push

This commit is contained in:
liuwei
2026-04-16 15:24:23 +08:00
parent 9d6609990b
commit 014985ac4a
9 changed files with 854 additions and 0 deletions

View File

@@ -0,0 +1,195 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from db.plugin_schedule_db import PluginScheduleDBOperator
from utils.decorator.async_job import async_job
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
class PluginScheduleManager:
"""插件定时任务管理器(数据库驱动)。"""
def __init__(self, plugin_manager, plugin_schedule_db: PluginScheduleDBOperator):
self.plugin_manager = plugin_manager
self.db = plugin_schedule_db
self._schedule_job_map: Dict[int, str] = {}
def init_and_load(self):
self.db.init_tables()
self.reload_from_db()
def _get_plugin_actions(self) -> List[Dict[str, Any]]:
actions = []
for plugin in self.plugin_manager.plugins.values():
if not hasattr(plugin, "get_schedule_actions"):
continue
try:
plugin_actions = plugin.get_schedule_actions() or []
except Exception as e:
logger.error(f"读取插件 {plugin.name} 调度动作失败: {e}")
continue
for action in plugin_actions:
actions.append(
{
"plugin_name": plugin.name,
"action_key": action.get("action_key"),
"action_name": action.get("name", action.get("action_key", "")),
"description": action.get("description", ""),
"trigger_type": action.get("trigger_type", "at_times"),
"trigger_config": action.get("trigger_config", {"time_list": ["09:00"]}),
"target_scope": action.get("target_scope", "all_enabled_groups"),
"target_config": action.get("target_config", {}),
"payload": action.get("payload", {}),
"enabled": bool(action.get("default_enabled", False)),
}
)
return actions
def sync_defaults(self):
for item in self._get_plugin_actions():
if not item.get("plugin_name") or not item.get("action_key"):
continue
self.db.upsert_default_schedule(item)
def _resolve_targets(self, plugin, schedule_row: Dict[str, Any]) -> List[str]:
scope = str(schedule_row.get("target_scope") or "all_enabled_groups")
target_cfg = schedule_row.get("target_config") or {}
if scope == "single_group":
gid = str(target_cfg.get("group_id") or "").strip()
return [gid] if gid else []
if scope == "group_whitelist":
group_ids = target_cfg.get("group_ids") or []
return [str(x).strip() for x in group_ids if str(x).strip()]
# 默认:所有已启用群
all_groups = GroupBotManager.get_group_list()
if not getattr(plugin, "feature", None):
return all_groups
enabled_groups = []
for gid in all_groups:
if GroupBotManager.get_group_permission(gid, plugin.feature) == PermissionStatus.ENABLED:
enabled_groups.append(gid)
return enabled_groups
async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]:
schedule_id = int(schedule_row["id"])
action_key = schedule_row.get("action_key")
plugin_name = schedule_row.get("plugin_name")
_, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not plugin:
detail = {"error": f"未找到插件: {plugin_name}"}
self.db.create_log(schedule_id, "failed", detail["error"], detail)
return {"success": False, "summary": detail["error"], "detail": detail}
if not hasattr(plugin, "run_scheduled_action"):
detail = {"error": f"插件 {plugin.name} 未实现 run_scheduled_action"}
self.db.create_log(schedule_id, "failed", detail["error"], detail)
return {"success": False, "summary": detail["error"], "detail": detail}
targets = self._resolve_targets(plugin, schedule_row)
payload = schedule_row.get("payload") or {}
ctx = {
"schedule_id": schedule_id,
"triggered_at": datetime.now().isoformat(timespec="seconds"),
"target_scope": schedule_row.get("target_scope"),
"target_config": schedule_row.get("target_config") or {},
"target_groups": targets,
"payload": payload,
"bot": getattr(plugin, "bot", None),
}
try:
res = await plugin.run_scheduled_action(action_key, ctx)
if not isinstance(res, dict):
res = {"success": bool(res), "summary": "插件返回非 dict已兼容处理", "detail": {"result": str(res)}}
except Exception as e:
res = {"success": False, "summary": f"执行异常: {e}", "detail": {"error": str(e)}}
status = "success" if res.get("success") else "failed"
summary = str(res.get("summary") or ("执行成功" if status == "success" else "执行失败"))
detail = res.get("detail") or {}
detail["target_count"] = len(targets)
self.db.create_log(schedule_id, status, summary, detail)
return {"success": status == "success", "summary": summary, "detail": detail}
def reload_from_db(self):
self.sync_defaults()
# 清理旧注册,避免重复
for job_id in list(self._schedule_job_map.values()):
async_job.remove_job(job_id)
self._schedule_job_map = {}
rows = self.db.list_enabled_schedules()
for row in rows:
schedule_id = int(row["id"])
async def _runner(_row=row):
await self._run_one_schedule(_row)
job_id = async_job.register_callable(
func=_runner,
trigger_type=row.get("trigger_type", "at_times"),
trigger_config=row.get("trigger_config", {"time_list": ["09:00"]}),
job_name=f"[插件调度]{row.get('plugin_name')}:{row.get('action_name')}",
description=row.get("description", ""),
job_key=f"plugin_schedule:{schedule_id}",
)
self._schedule_job_map[schedule_id] = job_id
def list_schedules_with_runtime(self) -> List[Dict[str, Any]]:
db_rows = self.db.list_schedules()
runtime_rows = async_job.get_jobs_snapshot()
runtime_by_key = {row.get("job_key"): row for row in runtime_rows if row.get("job_key")}
data = []
for row in db_rows:
key = f"plugin_schedule:{row['id']}"
runtime = runtime_by_key.get(key, {})
merged = dict(row)
merged["runtime_job_id"] = runtime.get("id")
merged["running"] = runtime.get("running", False)
merged["trigger_text"] = runtime.get("trigger_text", "")
merged["next_run_at"] = runtime.get("next_run_at")
merged["last_run_at"] = runtime.get("last_run_at")
merged["last_status"] = runtime.get("last_status")
merged["last_error"] = runtime.get("last_error")
merged["last_duration_ms"] = runtime.get("last_duration_ms")
merged["run_count"] = runtime.get("run_count", 0)
merged["success_count"] = runtime.get("success_count", 0)
merged["fail_count"] = runtime.get("fail_count", 0)
data.append(merged)
return data
def trigger_now(self, schedule_id: int) -> (bool, str):
job_key = f"plugin_schedule:{int(schedule_id)}"
job_id = async_job.get_job_id_by_key(job_key)
if not job_id:
self.reload_from_db()
job_id = async_job.get_job_id_by_key(job_key)
if not job_id:
return False, "该调度未启用或未加载"
return async_job.trigger_job_now(job_id, operator="dashboard")
def update_schedule(self, schedule_id: int, updates: Dict[str, Any]) -> bool:
ok = self.db.update_schedule(int(schedule_id), updates)
if ok:
self.reload_from_db()
return ok
def get_logs(self, schedule_id: int, limit: int = 100) -> List[Dict[str, Any]]:
return self.db.get_logs(int(schedule_id), limit=limit)
def get_available_plugin_actions(self) -> List[Dict[str, Any]]:
return self._get_plugin_actions()