调试定时装饰器,优化代码

This commit is contained in:
liuwei
2025-03-20 15:08:04 +08:00
parent cf78facbad
commit 27220e0357
3 changed files with 89 additions and 36 deletions

View File

@@ -1,65 +1,110 @@
import functools import functools
import inspect import inspect
import logging import logging
import weakref
from typing import Callable, Optional, Dict, List, Any from typing import Callable, Optional, Dict, List, Any
LOG = logging.getLogger("JobDecorator") LOG = logging.getLogger("JobDecorator")
# 存储所有被装饰的任务 # 存储所有被装饰的任务
scheduled_tasks = [] scheduled_tasks = []
# 存储所有实例的弱引用
instances = []
def scheduled_job(cron: str, name: Optional[str] = None, enabled: bool = True): def scheduled_job(cron: str, name: Optional[str] = None, enabled: bool = True):
""" """
定时任务装饰器,用于标记需要定时执行的方法 定时任务装饰器,用于标记需要定时执行的方法
:param cron: cron表达式例如 "0 0 * * * *" 表示每小时执行一次 :param cron: cron表达式例如 "0 0 * * *" 表示每天0点执行一次
:param name: 任务名称,默认使用方法名 :param name: 任务名称,默认使用方法名
:param enabled: 是否启用该任务默认为True :param enabled: 是否启用该任务默认为True
""" """
def decorator(func): def decorator(func):
task_name = name or func.__name__ task_name = name or func.__name__
# 记录任务信息 # 记录任务信息
task_info = { task_info = {
"func": func, "func": func,
"cron": cron, "cron": cron,
"name": task_name, "name": task_name,
"enabled": enabled "enabled": enabled,
"method_name": func.__name__,
"class_name": func.__qualname__.split('.')[0] if '.' in func.__qualname__ else None
} }
scheduled_tasks.append(task_info) scheduled_tasks.append(task_info)
LOG.info(f"注册定时任务: {task_name}, cron: {cron}, enabled: {enabled}") LOG.info(f"注册定时任务: {task_name}, cron: {cron}, enabled: {enabled}")
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
# 如果是实例方法的第一次调用,注册实例
if args and not isinstance(args[0], type) and hasattr(args[0], '__class__'):
register_instance(args[0])
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper return wrapper
return decorator return decorator
# 修改 register_scheduled_jobs 函数
def register_scheduled_jobs(job_instance): def register_instance(instance):
"""注册所有标记了 @scheduled_job 装饰器的方法 """注册实例到全局实例列表"""
# 检查实例是否已注册
Args: for ref in instances:
job_instance: 包含定时任务的实例 if ref() is instance:
""" return
if not hasattr(job_instance, 'scheduled_jobs'):
return # 添加实例的弱引用
instances.append(weakref.ref(instance))
for task in job_instance.scheduled_jobs:
method = getattr(job_instance, task["method_name"]) # 清理已失效的弱引用
# 使用正确的方法添加任务 global instances
if hasattr(job_instance, 'add_job'): instances = [ref for ref in instances if ref() is not None]
job_instance.add_job(method, task["cron"], task["name"])
elif hasattr(job_instance, 'add_scheduled_job'): # 尝试为这个实例注册任务
job_instance.add_scheduled_job(method, task["cron"], task["name"]) register_tasks_for_instance(instance)
else:
# 如果没有合适的方法,记录错误
import logging def register_tasks_for_instance(instance):
logging.getLogger(__name__).error( """为指定实例注册任务"""
f"无法为 {job_instance.__class__.__name__} 添加定时任务," class_name = instance.__class__.__name__
f"缺少 add_job 或 add_scheduled_job 方法"
) for task in scheduled_tasks:
# 检查任务是否属于这个类
if task["class_name"] == class_name and hasattr(instance, task["method_name"]):
method = getattr(instance, task["method_name"])
# 注册任务
if hasattr(instance, 'add_job'):
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, cron: {task['cron']}")
instance.add_job(method, task["cron"], task["name"])
elif hasattr(instance, 'onEveryTime'):
# 将cron表达式转换为时间字符串 (简化处理,仅支持每天固定时间)
cron_parts = task["cron"].split()
if len(cron_parts) >= 2:
minute, hour = cron_parts[0], cron_parts[1]
# 移除可能的前导0
if minute.startswith('0') and len(minute) > 1:
minute = minute[1:]
if hour.startswith('0') and len(hour) > 1:
hour = hour[1:]
time_str = f"{hour}:{minute}"
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, 时间: {time_str}")
instance.onEveryTime(time_str, method)
def initialize_job_system():
"""初始化任务系统,扫描所有已创建的实例并注册任务"""
LOG.info("初始化任务系统")
# 清理已失效的弱引用
global instances
instances = [ref for ref in instances if ref() is not None]
# 为所有实例注册任务
for ref in instances:
instance = ref()
if instance:
register_tasks_for_instance(instance)

View File

@@ -8,7 +8,8 @@ from configuration import Config
from constants import ChatType from constants import ChatType
from robot import Robot, __version__ from robot import Robot, __version__
from wcferry import Wcf from wcferry import Wcf
# 在文件顶部导入装饰器
from job_decorators import initialize_job_system
def main(chat_type: int): def main(chat_type: int):
config = Config() config = Config()
@@ -53,6 +54,9 @@ def main(chat_type: int):
# 秀人网每天自动发pdf # 秀人网每天自动发pdf
robot.onEveryTime("17:30", robot.xiu_ren_pdf_send) robot.onEveryTime("17:30", robot.xiu_ren_pdf_send)
# 初始化任务系统,自动注册所有任务
initialize_job_system()
# 让机器人一直跑 # 让机器人一直跑
robot.keep_running_and_block_process() robot.keep_running_and_block_process()

View File

@@ -23,6 +23,7 @@ from base.func_claude import Claude
from configuration import Config from configuration import Config
from constants import ChatType from constants import ChatType
from game_task.game_task_encyclopedia import game_process_message, get_group_ids, run_random_task_assignment from game_task.game_task_encyclopedia import game_process_message, get_group_ids, run_random_task_assignment
from job_decorators import scheduled_job
from message_storage.message_to_db import MessageStorage from message_storage.message_to_db import MessageStorage
from plugin_common.event_system import EventType, EventSystem from plugin_common.event_system import EventType, EventSystem
from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.message_plugin_interface import MessagePluginInterface
@@ -43,8 +44,6 @@ from xiuren.xiuren_pdf import generate_pdf_from_images
from db.connection import DBConnectionManager from db.connection import DBConnectionManager
from message_util import MessageUtil from message_util import MessageUtil
# 在文件顶部导入装饰器
from job_decorators import register_scheduled_jobs
class Robot(Job): class Robot(Job):
@@ -105,8 +104,6 @@ class Robot(Job):
# 消息存档模块初始化,自动完成入库动作 # 消息存档模块初始化,自动完成入库动作
self.message_storage = MessageStorage() self.message_storage = MessageStorage()
# 在初始化结束时注册所有被装饰的定时任务
register_scheduled_jobs(self)
if ChatType.is_in_chat_types(chat_type): if ChatType.is_in_chat_types(chat_type):
if chat_type == ChatType.TIGER_BOT.value and TigerBot.value_check(self.config.TIGERBOT): if chat_type == ChatType.TIGER_BOT.value and TigerBot.value_check(self.config.TIGERBOT):
self.chat = TigerBot(self.config.TIGERBOT) self.chat = TigerBot(self.config.TIGERBOT)
@@ -546,3 +543,10 @@ class Robot(Job):
self.wcf.send_file(pub_path, "45317011307@chatroom") self.wcf.send_file(pub_path, "45317011307@chatroom")
except Exception as e: except Exception as e:
self.LOG.error(f"xiu_ren_pdf_send error{e}") self.LOG.error(f"xiu_ren_pdf_send error{e}")
@scheduled_job("*/1 * * * *", name="test_job")
def send_message(self):
try:
self.wcf.send_text("定时任务执行!", "filehelper")
except Exception as e:
self.LOG.error(f"send_message error{e}")