【本人秃顶程序员】SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)

←←←←←←←←←←←← 快!点关注

1、在JAVA开发领域,目前能够经过如下几种方式进行定时任务

一、单机部署模式

  • Timer:jdk中自带的一个定时调度类,能够简单的实现按某一频度进行任务执行。提供的功能比较单一,没法实现复杂的调度任务。
  • ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每一个调度任务都会分配到线程池中的一个线程执行,因此其任务是并发执行的,互不影响。
  • Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。
  • Quartz:一款功能强大的任务调度器,能够实现较为复杂的调度功能,如每个月一号执行、天天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

二、分布式集群模式(很少介绍,简单提一下)

问题java

  • 如何解决定时任务的屡次执行?
  • 如何解决任务的单点问题,实现任务的故障转移?

问题1的简单思考mysql

  • 固定执行定时任务的机器(能够有效避免屡次执行的状况 ,缺点就是单点故障问题)。
  • 借助Redis的过时机制和分布式锁。
  • 借助mysql的锁机制等。

成熟的解决方案spring

  • Quartz:能够去看看这篇文章Quartz分布式。
  • elastic-job:当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。
  • xxl-job:是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。
  • saturn:是惟品会提供一个分布式、容错和高可用的做业调度服务框架。

2、SpringTask实现定时任务(这里是基于springboot)

一、简单的定时任务实现

使用方式sql

使用@EnableScheduling注解开启对定时任务的支持。 使用@Scheduled 注解便可,基于corn、fixedRate、fixedDelay等一些定时策略来实现定时任务。数据库

使用缺点springboot

  • 多个定时任务使用的是同一个调度线程,因此任务是阻塞执行的,执行效率不高。
  • 其次若是出现任务阻塞,致使一些场景的定时计算没有实际意义,好比天天12点的一个计算任务被阻塞到1点去执行,会致使结果并不是咱们想要的。

使用优势bash

  • 配置简单
  • 适用于单个后台线程执行周期任务,而且保证顺序一致执行的场景

源码分析:服务器

//默认使用的调度器
if(this.taskScheduler == null) {  
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//能够看到SingleThreadScheduledExecutor指定的核心线程为1,说白了就是单线程执行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延时队列做为任务的存放队列,这样即可以实现任务延迟执行或者定时执行
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
复制代码

二、实现并发的定时任务

使用方式架构

方式一:由1中咱们知道之因此定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就好了!直接上代码:并发

@Configuration
  public class ScheduledConfig implements SchedulingConfigurer {

      @Autowired
      private TaskScheduler myThreadPoolTaskScheduler;

      @Override
      public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
          //简单粗暴的方式直接指定
          //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
          //也能够自定义的线程池,方便线程的使用与维护,这里很少说了
          scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
      }
  }

  @Bean(name = "myThreadPoolTaskScheduler")
  public TaskScheduler getMyThreadPoolTaskScheduler() {
      ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
      taskScheduler.setPoolSize(10);
      taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
      taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      //调度器shutdown被调用时等待当前被调度的任务完成
      taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
      //等待时长
      taskScheduler.setAwaitTerminationSeconds(60);
      return taskScheduler;
  }       
复制代码

方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

  • 首先使用@EnableAsync 启用异步任务
  • 而后在定时任务的方法加上@Async便可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务建立一个线程,就会不断建立大量线程,极有可能压爆服务器内存。固然它有本身的限流机制,这里就很少说了,有兴趣的本身翻翻源码~)
  • 项目中为了更好的控制线程的使用,咱们能够自定义咱们本身的线程池,使用方式@Async("myThreadPool")

废话太多,直接上代码:

@Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
  @Async("myThreadPoolTaskExecutor")
  //@Async
  public void scheduledTest02(){
      System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
  }
  //自定义线程池
  @Bean(name = "myThreadPoolTaskExecutor")
  public TaskExecutor  getMyThreadPoolTaskExecutor() {
      ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
      taskExecutor.setCorePoolSize(20);
      taskExecutor.setMaxPoolSize(200);
      taskExecutor.setQueueCapacity(25);
      taskExecutor.setKeepAliveSeconds(200);
      taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
      // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
      taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      //调度器shutdown被调用时等待当前被调度的任务完成
      taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
      //等待时长
      taskExecutor.setAwaitTerminationSeconds(60);
      taskExecutor.initialize();
      return taskExecutor;
  }
复制代码

线程池的使用心得

  • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,可是在原有的基础上增长了新的特性,在spring环境下更容易使用和控制。
  • 使用自定义的线程池可以避免一些默认线程池形成的内存溢出、阻塞等等问题,更贴合本身的服务特性
  • 使用自定义的线程池便于对项目中线程的管理、维护以及监控。
  • 即使在非spring环境下也不要使用java默认提供的那几种线程池,坑不少,阿里代码规约不说了吗,得相信大厂!!!

3、动态定时任务的实现

问题

使用@Scheduled注解来完成设置定时任务,可是有时候咱们每每须要对周期性的时间的设置会作一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,缘由在于这个注解中配置的cron表达式必须是常量,那么当咱们修改定时参数的时候,就须要中止服务,从新部署。

解决办法方式一:实现SchedulingConfigurer接口,重写configureTasks方法,从新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过须要注意的是,此种方式修改了配置值后,须要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新须要在修改配置值时额外增长相关逻辑处理。

@Configuration
  public class ScheduledConfig implements SchedulingConfigurer {

  @Autowired
  private TaskScheduler myThreadPoolTaskScheduler;

  @Override
  public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
      //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
      scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
      //能够实现动态调整定时任务的执行频率
      scheduledTaskRegistrar.addTriggerTask(
              //1.添加任务内容(Runnable)
              () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
              //2.设置执行周期(Trigger)
              triggerContext -> {
                  //2.1 从数据库动态获取执行周期
                  String cron = "0/2 * * * * ? ";
                  //2.2 合法性校验.
  //                    if (StringUtils.isEmpty(cron)) {
  //                        // Omitted Code ..
  //                    }
                      //2.3 返回执行周期(Date)
                      return new CronTrigger(cron).nextExecutionTime(triggerContext);
                  }
          );
  }
  }
复制代码

方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,固然也可实现执行频率的调整

首先,咱们要认识下这个调度类,它实际上是对java中ScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有如下几点:

  • 提供默认配置,由于是ScheduledThreadPoolExecutor,因此只有poolSize这一个默认参数。
  • 支持自定义任务,经过传入Trigger参数。
  • 对任务出错处理进行优化,若是是重复性的任务,不抛出异常,经过日志记录下来,不影响下次运行,若是是只执行一次的任务,将异常往上抛。

顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:

  • 提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其余没有默认配置
  • 实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。
  • 由于是spring的工具类,因此抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)
  • 提供默认ThreadFactory实现,直接经过参数重载配置

扯了这么多,仍是直接上代码:

@Component
  public class DynamicTimedTask {
      private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
      //利用建立好的调度类统一管理
      //@Autowired
      //@Qualifier("myThreadPoolTaskScheduler")
      //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
      //接受任务的返回结果
      private ScheduledFuture<?> future;
      @Autowired
      private ThreadPoolTaskScheduler threadPoolTaskScheduler;
      //实例化一个线程池任务调度类,可使用自定义的ThreadPoolTaskScheduler
      @Bean
      public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
          ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
          return new ThreadPoolTaskScheduler();
      }

      /**
       * 启动定时任务
       * @return
       */
      public boolean startCron() {
          boolean flag = false;
          //从数据库动态获取执行周期
          String cron = "0/2 * * * * ? ";
          future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
          if (future!=null){
              flag = true;
              logger.info - 最佳的logger 来源和相关信息。("定时check训练模型文件,任务启动成功!!!");
          }else {
              logger.info - 最佳的logger 来源和相关信息。("定时check训练模型文件,任务启动失败!!!");
          }
          return flag;
      }

      /**
       * 中止定时任务
       * @return
       */
      public boolean stopCron() {
          boolean flag = false;
          if (future != null) {
              boolean cancel = future.cancel(true);
              if (cancel){
                  flag = true;
                  logger.info - 最佳的logger 来源和相关信息。("定时check训练模型文件,任务中止成功!!!");
              }else {
                  logger.info - 最佳的logger 来源和相关信息。("定时check训练模型文件,任务中止失败!!!");
              }
          }else {
              flag = true;
              logger.info - 最佳的logger 来源和相关信息。("定时check训练模型文件,任务已经中止!!!");
          }
          return flag;
      }
      class CheckModelFile implements Runnable{
          @Override
          public void run() {
              //编写你本身的业务逻辑  
              System.out.print("模型文件检查完毕!!!")
          }
      }
  }
复制代码

4、总结

到此基于springtask下的定时任务的简单使用算是差很少了,其中难免有些错误的地方,或者理解有偏颇的地方欢迎你们提出来!

读者福利:

分享免费学习资料

针对于还会准备免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料) 为何某些人会一直比你优秀,是由于他自己就很优秀还一直在持续努力变得更优秀,而你是否是还在知足于现状心里在窃喜!但愿读到这的您能点个小赞和关注下我,之后还会更新技术干货,谢谢您的支持!

资料领取方式:加入粉丝群963944895,私信管理员便可免费领取

相关文章
相关标签/搜索