🌑

Shawn Fux

Spring实现动态添加停止定时任务

在 Spring-Context 模块 scheduling 包下提供了一个定时任务管理工具,一般我们使用都是基于注解事先硬编码好的,但是如果想实现动态的添加和停止定时任务视乎就不是那么容易,本文通过分析源码来实现动态添加删除的需求。

@EnableScheduling @Scheduled

一般我们在使用 Spring 为我们提供的定时任务机制时,首先会在 Spring 配置类上标注一个 @EnableScheduling 注解表示开启定时任务管理,然后在需要实现定时任务的方法上标注 @Scheduled 注解。那么接下来首先先分析 @EnableScheduling 注解,依据笔者的经验来看,一般 Spring 中以 Enable 打头的注解通常会导入一些配置类来实现功能:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

而在 @EnableScheduling 注解里面果然导入了一个 SchedulingConfiguration 配置类:

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

   @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
      return new ScheduledAnnotationBeanPostProcessor();
   }

}

而在 SchedulingConfiguration 配置类里面,做的事情比较少,就是将一个 ScheduledAnnotationBeanPostProcessor 注册成为 Bean 到 Spring 容器,熟悉 Spring 生命周期的都应该知道,像 BeanPostProcessor 实现类都是在 Bean 的创建过程中,可以收集 Bean 的信息做一些特殊的处理,那其实看到这里,我们可以大胆猜测,ScheduledAnnotationBeanPostProcessor 这个 Bean 后置处理器做的工作就是收集那些带有 @Scheduled 注解的方法,然后将其封装成定时任务去帮我们提交到线程池去执行,那么其实我们只要找到它提交任务的线程池,那么我们拿到这个线程池是不是也可以自己去动态的提交任务了?

ScheduledAnnotationBeanPostProcessor

因为前面说过,ScheduledAnnotationBeanPostProcessor 是实现了 BeanPostProcessor 接口的,所以我们可以优先从该接口的实现方法查找入手,这里我就直接告诉你,它是在 postProcessAfterInitialization 这个实现方法去解析 @Scheduled 注解的,它会找到带有 @Scheduled 的方法然后遍历方法上的注解,根据注解信息,调用 processScheduled 封装成不同的定时任务实现类,一般来说 @Scheduled 注解提供了三种定时任务的时间配置:

  1. 基于 cron 表达式的
  2. fixedDelay 方式配置
  3. fixedRate 方式配置

上面是 processScheduled 方法处理解析 @Scheduled 注解的 cron 值的代码片段,如果解析成功有值,则将对应的信息封装成了 CronTask 类,最后调用 this.registrar.scheduleCronTask 完成定时任务的注册,那么看到这里,就终于找到了我想要完成动态注册所需要的对象了,接下来就看看如何拿到这个 this.registrar 对象。

ScheduledTaskRegistrar

private final ScheduledTaskRegistrar registrar;

上面就是定义在 ScheduledAnnotationBeanPostProcessor 类的一个成员变量 registrar ,通过检索整个类,并未发现有任何公共方法可以获取到这个成员变量,而且这个 registrar 成员变量对象也不是 Bean 对象也就是说并不在 Spring 的容器,虽然可以拿到 ScheduledAnnotationBeanPostProcessor 这个 Bean 对象,然后使用反射暴力获取 registrar 成员变量,但是这种方式不够优雅。最后通过分析 ScheduledAnnotationBeanPostProcessor 的构造方法,让我找到了另一种方式:

public ScheduledAnnotationBeanPostProcessor() {
   this.registrar = new ScheduledTaskRegistrar();
}

/**
 * @since 5.1
 */
public ScheduledAnnotationBeanPostProcessor(ScheduledTaskRegistrar registrar) {
   Assert.notNull(registrar, "ScheduledTaskRegistrar is required");
   this.registrar = registrar;
}

可以看到 ScheduledAnnotationBeanPostProcessor 有两个构造方法,一个是无参的构造方法在其内部自动创建了一个 ScheduledTaskRegistrar 定时任务注册器,另外一个有参构造方法可以外部传入一个 ScheduledTaskRegistrar 对象,其实在我们使用 @EnableScheduling 注解的时候,它就是使用的默认的无参构造方法去创建的一个 ScheduledAnnotationBeanPostProcessor

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

   @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
      // 默认使用无参构造创建注册的
      return new ScheduledAnnotationBeanPostProcessor();
   }

}

那么我们可以不使用 @EnableScheduling 注解,不让它帮我们去注册一个默认无参的 ScheduledAnnotationBeanPostProcessor ,我们可以自己这样做:

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        return new ScheduledAnnotationBeanPostProcessor(scheduledTaskRegistrar);
    }

    @Bean
    public ScheduledTaskRegistrar scheduledTaskRegistrar() {
        return new ScheduledTaskRegistrar();
    }
 }

我们自己手动去注册一个有参构造的,并且指定使用我们自己创建定义的 ScheduledTaskRegistrar ,在这一步只是完成了整合,接下来就是拿到 ScheduledTaskRegistrar 如何自己去注册添加定时任务了。

这里我列举了几个重点常用的方法:

schedule

上面这些方法和注解的使用都是可以对应起来的,这些注册定时任务的方法返回值都是一个 ScheduledTask ,而这个返回值其中有一个 cancel 方法就可以取消对应的定时任务,那么我们其实只需要在注册的时候将这个返回值保存下来,将来如果要取消这个定时任务,找到对应的 ScheduledTask 调用 cancel 方法即可完成定时任务的取消。

分析到这里,其实我们可以发现,真正管理注册定时任务的是 ScheduledTaskRegistrar 对象,而 ScheduledAnnotationBeanPostProcessor 的主要工作就是找到所有带有 @Scheduled 注解的 Bean 对象,然后根据注解参数封装成对应的 Task 对象,最后调用 ScheduledTaskRegistrar 提供的方法来完成定时任务的注册,那么其实如果你不需要使用注解来定义定时任务的话,你完全可以不注册 ScheduledAnnotationBeanPostProcessor ,只要有 ScheduledTaskRegistrar 就可以工作了。

封装 ScheduledTaskRegistrar

import org.springframework.scheduling.config.*;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class ScheduledTaskRegistrarCenter {

    private final Map<String, ScheduledTask> scheduledTaskMap = new ConcurrentHashMap<>();

    private final ScheduledTaskRegistrar registrar;

    public ScheduledTaskRegistrarCenter(ScheduledTaskRegistrar registrar) {
        Assert.notNull(registrar, "ScheduledTaskRegistrar is required");
        this.registrar = registrar;
    }

    public ScheduledTask addTask(String taskName, Task task) {
        ScheduledTask scheduledTask;
        if (task instanceof CronTask) {
            scheduledTask = registrar.scheduleCronTask((CronTask) task);
        } else if (task instanceof FixedDelayTask) {
            scheduledTask = registrar.scheduleFixedDelayTask((FixedDelayTask) task);
        } else if (task instanceof FixedRateTask) {
            scheduledTask = registrar.scheduleFixedRateTask((FixedRateTask) task);
        } else if (task instanceof TriggerTask) {
            scheduledTask = registrar.scheduleTriggerTask((TriggerTask) task);
        } else {
            throw new IllegalArgumentException("unsupported type: " + task.getClass());
        }
        scheduledTaskMap.put(taskName, scheduledTask);
        return scheduledTask;
    }

    public boolean stopTask(String taskName) {
        ScheduledTask scheduledTask = scheduledTaskMap.get(taskName);
        if (scheduledTask != null) {
            scheduledTask.cancel();
            return true;
        }
        return false;
    }
}

通过提供一个统一的管理定时任务的对象,在注册的时候判断 Task 的类型,然后调用相应的定时任务注册方法,最后将返回的 ScheduledTask 保存到一个 Map 集合,在将来需要停止定时任务时,只需要传入当时注册定时任务的名字即可。

接下来就简单的使用一下:

@RestController
@RequestMapping
@Slf4j
public class DemoController {

    @Autowired
    private ScheduledTaskRegistrarCenter registrar;
    private final String taskName = "demoFixedDelayTask";

    @PostConstruct
    public void init() {
        registrar.addTask(taskName, new FixedDelayTask(() -> 
                log.warn("执行一下:{}", System.currentTimeMillis()), 1000, 0));
    }

    @GetMapping("/cancel")
    public void cancel() {
        System.out.println(registrar.stopTask(taskName));
    }
}

该演示类会在初始化的时候添加一个定时任务,然后你如果请求了 /cancel API 接口,那么就可以将该定时任务删除。

自定义线程池

如果你没有为 ScheduledTaskRegistrar 指定一个线程池的话,那么它会创建一个默认的线程池,源代码如下:

protected void scheduleTasks() {
   if (this.taskScheduler == null) {
      this.localExecutor = Executors.newSingleThreadScheduledExecutor();
      this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
   }
}

该方法目前已被标注为过时的,建议你自己通过构造方法指定一个自定义的线程池。

, — Mar 30, 2023