Files
abot/db/plugin_schedule_db.py
liuwei 9652c2594e 系统业务任务插件化迁移:下沉7项非刚需任务并接入平滑迁移
- 系统任务保留刚需三项:登录巡检、消息计数入库、媒体补偿处理;移除新闻/Epic/排行/PDF/秀人维护等业务型系统任务定义\n- 新增 daily_news、epic_free、daily_ranking、sehuatang_push 四个插件,将原系统业务任务改为插件可调度动作\n- 扩展 xiuren_image 插件调度动作,新增秀人下载、绅士R15下载、图片缓存更新三项维护任务\n- 新增系统任务到插件任务的幂等迁移逻辑:按旧 job_key 映射到插件 action,同步 trigger_type/trigger_config/enabled,并通过 payload 标记防止反复覆盖\n- 在 Robot 启动流程中接入迁移执行与重载,并清理已迁移的历史系统任务记录,避免后台双份维护\n- 扩展插件调度数据库操作:支持按 plugin_name + action_key 精确查询,便于迁移与对账
2026-04-16 16:05:59 +08:00

202 lines
7.5 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class PluginScheduleDBOperator(BaseDBOperator):
    """Data-access helper for plugin schedule configuration and run logs.

    Operates on two tables created by :meth:`init_tables` (MySQL-style SQL:
    ``AUTO_INCREMENT``, ``ON DUPLICATE KEY UPDATE``):

    * ``t_plugin_schedules`` - one row per ``(plugin_name, action_key)`` pair
      holding trigger/target configuration, a payload and the enabled flag.
    * ``t_plugin_schedule_logs`` - one row per recorded trigger of a schedule.

    JSON columns are decoded into Python objects on read and serialized with
    ``json.dumps`` on write.  All SQL goes through ``self.execute_query`` and
    ``self.execute_update`` (presumably provided by ``BaseDBOperator``).
    """

    # JSON columns of t_plugin_schedules; decoded on every read and
    # serialized on every write.
    _SCHEDULE_JSON_FIELDS = ("trigger_config", "target_config", "payload")

    # Plain (non-JSON) columns that update_schedule may modify.
    _UPDATABLE_SCALAR_FIELDS = (
        "action_name",
        "description",
        "trigger_type",
        "target_scope",
        "enabled",
    )

    def __init__(self, db_manager: DBConnectionManager):
        """Pass ``db_manager`` through to the base-class initializer."""
        super().__init__(db_manager)

    def init_tables(self) -> bool:
        """Create the schedule and schedule-log tables if they do not exist.

        Returns:
            True when both statements were issued without raising, False if
            an exception was raised (the error is logged).
        """
        try:
            self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_plugin_schedules (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    plugin_name VARCHAR(128) NOT NULL,
                    action_key VARCHAR(64) NOT NULL,
                    action_name VARCHAR(128) NOT NULL,
                    description VARCHAR(255) DEFAULT '',
                    trigger_type VARCHAR(64) NOT NULL,
                    trigger_config JSON NOT NULL,
                    target_scope VARCHAR(64) NOT NULL DEFAULT 'all_enabled_groups',
                    target_config JSON DEFAULT NULL,
                    payload JSON DEFAULT NULL,
                    enabled TINYINT(1) NOT NULL DEFAULT 0,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uk_plugin_action (plugin_name, action_key)
                )
                """
            )
            self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_plugin_schedule_logs (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    schedule_id BIGINT NOT NULL,
                    triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    status VARCHAR(32) NOT NULL,
                    summary VARCHAR(255) DEFAULT '',
                    detail_json JSON DEFAULT NULL,
                    INDEX idx_schedule_time (schedule_id, triggered_at)
                )
                """
            )
            return True
        except Exception as e:
            logger.error(f"初始化插件调度表失败: {e}")
            return False

    @staticmethod
    def _parse_json_field(row: Dict[str, Any], key: str) -> None:
        """Decode ``row[key]`` from JSON text in place.

        * Missing key or ``None`` -> ``{}``.
        * ``str`` / ``bytes`` / ``bytearray`` -> result of ``json.loads``;
          undecodable text, or text that decodes to JSON ``null``, becomes
          ``{}`` so that readers never see ``None`` for a JSON column.
        * Anything else (e.g. a value that is already a dict) is left as is.
        """
        raw = row.get(key)
        if raw is None:
            row[key] = {}
            return
        if not isinstance(raw, (str, bytes, bytearray)):
            return
        try:
            decoded = json.loads(raw)
        except ValueError:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError that json.loads can raise for byte input.
            decoded = None
        # A stored JSON ``null`` literal is treated like SQL NULL.
        row[key] = {} if decoded is None else decoded

    @staticmethod
    def _dump_json(value: Any) -> str:
        """Serialize ``value`` for a JSON column, storing ``{}`` for ``None``."""
        return json.dumps({} if value is None else value, ensure_ascii=False)

    def _decode_schedule_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Decode every JSON column of a ``t_plugin_schedules`` row in place."""
        for field in self._SCHEDULE_JSON_FIELDS:
            self._parse_json_field(row, field)
        return row

    def list_schedules(self) -> List[Dict[str, Any]]:
        """Return all schedules ordered by plugin name, then action name."""
        rows = self.execute_query(
            "SELECT * FROM t_plugin_schedules ORDER BY plugin_name, action_name"
        ) or []
        for row in rows:
            self._decode_schedule_row(row)
        return rows

    def list_enabled_schedules(self) -> List[Dict[str, Any]]:
        """Return schedules with ``enabled = 1``, ordered like :meth:`list_schedules`."""
        rows = self.execute_query(
            "SELECT * FROM t_plugin_schedules WHERE enabled = 1 ORDER BY plugin_name, action_name"
        ) or []
        for row in rows:
            self._decode_schedule_row(row)
        return rows

    def get_schedule(self, schedule_id: int) -> Optional[Dict[str, Any]]:
        """Return the schedule with the given primary key, or None if absent."""
        row = self.execute_query(
            "SELECT * FROM t_plugin_schedules WHERE id = %s",
            (schedule_id,),
            fetch_one=True,
        )
        if not row:
            return None
        return self._decode_schedule_row(row)

    def get_schedule_by_plugin_action(self, plugin_name: str, action_key: str) -> Optional[Dict[str, Any]]:
        """Return the schedule for a ``(plugin_name, action_key)`` pair, or None."""
        row = self.execute_query(
            """
            SELECT * FROM t_plugin_schedules
            WHERE plugin_name = %s AND action_key = %s
            LIMIT 1
            """,
            (plugin_name, action_key),
            fetch_one=True,
        )
        if not row:
            return None
        return self._decode_schedule_row(row)

    def upsert_default_schedule(self, data: Dict[str, Any]) -> bool:
        """Insert a schedule, or refresh its display fields if it already exists.

        When a row with the same ``(plugin_name, action_key)`` exists, only
        ``action_name`` and ``description`` are overwritten; the stored
        trigger, target, payload and enabled values are left untouched.

        Args:
            data: Must contain ``plugin_name``, ``action_key``, ``action_name``
                and ``trigger_type``.  Optional keys: ``description``,
                ``trigger_config``, ``target_scope``, ``target_config``,
                ``payload``, ``enabled``.

        Returns:
            The result of ``execute_update``, or False if anything raised
            (including a missing required key); the error is logged.
        """
        try:
            sql = """
                INSERT INTO t_plugin_schedules (
                    plugin_name, action_key, action_name, description,
                    trigger_type, trigger_config, target_scope, target_config, payload, enabled
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    action_name = VALUES(action_name),
                    description = VALUES(description)
            """
            params = (
                data["plugin_name"],
                data["action_key"],
                data["action_name"],
                data.get("description", ""),
                data["trigger_type"],
                # _dump_json maps a missing key or an explicit None to "{}".
                self._dump_json(data.get("trigger_config")),
                data.get("target_scope", "all_enabled_groups"),
                self._dump_json(data.get("target_config")),
                self._dump_json(data.get("payload")),
                1 if data.get("enabled", False) else 0,
            )
            return self.execute_update(sql, params)
        except Exception as e:
            logger.error(f"upsert 插件默认调度失败: {e}, data={data}")
            return False

    def update_schedule(self, schedule_id: int, updates: Dict[str, Any]) -> bool:
        """Update selected columns of one schedule.

        Only keys listed in ``_UPDATABLE_SCALAR_FIELDS`` and
        ``_SCHEDULE_JSON_FIELDS`` are honoured; any other key in ``updates``
        is ignored.  ``enabled`` is coerced to 0/1 and JSON columns are
        serialized (``None`` is stored as ``{}``).

        Returns:
            True when there is nothing to update, otherwise the result of
            ``execute_update``.
        """
        assignments: List[str] = []
        params: List[Any] = []

        for column in self._UPDATABLE_SCALAR_FIELDS:
            if column not in updates:
                continue
            value = updates[column]
            if column == "enabled":
                value = 1 if value else 0
            assignments.append(f"{column} = %s")
            params.append(value)

        for column in self._SCHEDULE_JSON_FIELDS:
            if column in updates:
                assignments.append(f"{column} = %s")
                params.append(self._dump_json(updates[column]))

        if not assignments:
            return True

        params.append(schedule_id)
        # Column names come from the fixed tuples above, never from caller
        # input, so interpolating them into the statement is safe.
        sql = f"UPDATE t_plugin_schedules SET {', '.join(assignments)} WHERE id = %s"
        return self.execute_update(sql, tuple(params))

    def create_log(self, schedule_id: int, status: str, summary: str, detail: Dict[str, Any]) -> bool:
        """Append one log row for a schedule.

        ``detail`` is stored as JSON; any falsy value is stored as ``{}``.
        ``triggered_at`` is filled in by the column default.
        """
        sql = """
            INSERT INTO t_plugin_schedule_logs (schedule_id, status, summary, detail_json)
            VALUES (%s, %s, %s, %s)
        """
        params = (
            schedule_id,
            status,
            summary,
            json.dumps(detail or {}, ensure_ascii=False),
        )
        return self.execute_update(sql, params)

    def get_logs(self, schedule_id: int, limit: int = 100) -> List[Dict[str, Any]]:
        """Return the most recent log rows of a schedule, newest first.

        Args:
            schedule_id: Primary key of the schedule.
            limit: Maximum number of rows; negative values are treated as 0
                because a negative ``LIMIT`` is not valid SQL here.
        """
        row_limit = max(int(limit), 0)
        rows = self.execute_query(
            """
            SELECT * FROM t_plugin_schedule_logs
            WHERE schedule_id = %s
            ORDER BY triggered_at DESC
            LIMIT %s
            """,
            (schedule_id, row_limit),
        ) or []
        for row in rows:
            self._parse_json_field(row, "detail_json")
        return rows