Files
abot/utils/system_jobs.py

181 lines
6.8 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any, Awaitable, Callable, Dict, List
from loguru import logger
from db.system_job_db import SystemJobDBOperator
from utils.decorator.async_job import async_job
def get_system_job_definitions(robot) -> List[Dict[str, Any]]:
"""系统任务定义(业务函数映射)。
说明:这里只维护“任务 key 与业务函数”的绑定关系;
调度时间、启停状态全部从数据库 t_system_jobs 读取。
"""
return [
{
"job_key": "news_baidu_report_auto",
"name": "百度新闻日报",
"description": "每天 08:30 推送百度新闻",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["08:30"]},
"handler": robot.news_baidu_report_auto,
},
{
"job_key": "epic_free_games",
"name": "Epic 免费游戏推送",
"description": "每周五 10:00 推送 Epic 免费游戏",
"trigger_type": "every_weekday_time",
"trigger_config": {"weekday": 4, "time_str": "10:00"},
"handler": robot.send_epic_free_games,
},
{
"job_key": "message_count_to_db",
"name": "消息计数入库",
"description": "每天 02:30 将 Redis 消息计数写入 SQLite",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["02:30"]},
"handler": robot.message_count_to_db,
},
{
"job_key": "message_ranking_push",
"name": "群消息排行推送",
"description": "每天 09:30 生成并发送群消息排行",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["09:30"]},
"handler": robot.generate_and_send_ranking,
},
{
"job_key": "sehuatang_pdf_push",
"name": "涩图 PDF 推送",
"description": "每天 15:30 生成并发送涩图 PDF",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["15:30"]},
"handler": robot.generate_sehuatang_pdf,
},
{
"job_key": "xiuren_download",
"name": "秀人网下载任务",
"description": "每天 01:30 执行秀人网下载任务",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["01:30"]},
"handler": robot.xiu_ren_download_task,
},
{
"job_key": "shenshi_r15_download",
"name": "绅士 R15 下载任务",
"description": "每天 02:30 执行绅士 R15 下载任务",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["02:30"]},
"handler": robot.shen_shi_download_task,
},
{
"job_key": "login_check",
"name": "登录状态巡检",
"description": "每天 14:43 执行登录二次校验",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["14:43"]},
"handler": robot.login_twice_auto_auth,
},
{
"job_key": "update_image_cache",
"name": "图片缓存更新",
"description": "每天 05:00 扫描并更新图片缓存",
"trigger_type": "at_times",
"trigger_config": {"time_list": ["05:00"]},
"handler": _build_image_cache_handler(robot),
},
{
"job_key": "process_pending_images",
"name": "待下载图片补偿处理",
"description": "每 5 分钟处理一次待下载图片/表情,避免数据库锁竞争",
"trigger_type": "every_seconds",
"trigger_config": {"seconds": 300},
"handler": _build_process_pending_images_handler(robot),
},
]
def _build_image_cache_handler(robot) -> Callable[[], Awaitable[None]]:
async def _handler():
from plugins.xiuren_image.images_cache import ImageCacheManager
logger.info("开始执行图片缓存更新任务")
manager = ImageCacheManager("/mnt/nfs_share")
await manager.update_image_cache()
logger.info("图片缓存更新完成")
return _handler
def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]:
async def _handler():
if hasattr(robot, "message_storage") and robot.message_storage:
await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20)
return _handler
class SystemJobLoader:
"""系统任务加载器:从数据库读取调度配置并注册到 async_job。"""
def __init__(self, robot, system_job_db: SystemJobDBOperator):
self.robot = robot
self.db = system_job_db
self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)}
self._registered_job_ids: List[str] = []
def init_and_load(self):
self.db.init_tables()
self._seed_defaults()
self.reload_from_db()
def _seed_defaults(self):
for item in self._job_defs.values():
existed = self.db.get_job(item["job_key"])
if existed:
continue
self.db.upsert_job(
{
"job_key": item["job_key"],
"name": item["name"],
"description": item.get("description", ""),
"trigger_type": item["trigger_type"],
"trigger_config": item["trigger_config"],
"enabled": True,
}
)
def reload_from_db(self):
# 每次重载前先补齐默认任务,避免误删后无法恢复
self._seed_defaults()
# 先移除当前注册任务,避免重复调度
for job_id in self._registered_job_ids:
async_job.remove_job(job_id)
self._registered_job_ids = []
jobs = self.db.list_jobs()
for row in jobs:
job_key = row.get("job_key")
if not row.get("enabled", 1):
continue
definition = self._job_defs.get(job_key)
if not definition:
logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册")
continue
handler = definition["handler"]
job_id = async_job.register_callable(
func=handler,
trigger_type=row.get("trigger_type", definition["trigger_type"]),
trigger_config=row.get("trigger_config", definition["trigger_config"]),
job_name=row.get("name") or definition["name"],
description=row.get("description") or definition.get("description", ""),
job_key=job_key,
)
self._registered_job_ids.append(job_id)