Timer和ScheduledThreadPoolExecutor的定时任务

1 目录

2 调度概述

  • 1 说到调度,有最简单的Timer、ScheduledThreadPoolExecutor,又有Spring Task、quartz。数据库

  • 2 说到分布式调度,有基于数据库实现的quartz集群方案、当当网开源的基于ZooKeeper的elastic-jobapache

  • 3 说到大数据方向的调度,如apache的oozie、阿里的zeus。他们更可能是定制大数据方向的job,如MapReduce job,spark job,hive job等,还有解决了上面都没触碰的job依赖管理。数组

  • 4 说到集群资源的调度,如Yarn、Mesos。他们则是更高度的抽象,他们把调度拆分红资源调度任务调度,他们只负责资源方面的调度。微信

    资源调度:仅仅负责对集群全部机器的CPU 、内存、网络等资源进行统一规划和管理,有任务到来时,经过合理的分配(达到资源充分利用的目的)挑选出对应资源交付给任务来执行。网络

    任务调度:就要提到任务模型了,如MapReduce是一种任务模型,每一种任务模型有有对应的ApplicationMaster来负责任务调度,如MapReduce的就是MRAppMaster。MRAppMaster负责任务的分片,任务的failover,任务之间的逻辑、向Yarn申请资源来执行Map任务或者Reduce任务等等。目前Yarn已经集成了对MapReduce、Spark等任务模型的处理,若是你还想在Yarn上运行本身的任务模型,就须要实现一个本身的ApplicationMaster来负责任务调度。多线程

3 Timer

Timer是单线程的,比较简单,一个线程TimerThread 加一个任务队列TaskQueue,每个任务都是TimerTask类型的。分布式

提供了以下方式来调度任务的执行,这个就再也不说了,本身看下文档大数据

Timer的调度方法

3.1 TimerThread执行过程

见下图 TimerThread的执行过程spa

  • 1 一旦队列是空的,就进行等待,直到队列中有数据
  • 2 一旦Timer被取消,就会设置newTasksMayBeScheduled=false,而且清空队列。正常状况下走到这里队列是有数据的,没有数据则说明Timer被取消了,因此这里就退出while循环了,TimerThread执行结束了
  • 3 若是queue有任务,则取出最近要执行的任务,查看该任务是否取消了,一旦取消了,就从queue中移除,继续下一次循环
  • 4 若是该任务没有取消,则判断是否到达该任务的执行时间了,若是到达即taskFired=true,若是该任务不是周期任务,则直接从queue中删除该任务
  • 5 若是该任务时周期性任务,计算出下次执行时间,再将该任务放到queue中(实际上是重新调整该任务在队列中的位置)
  • 6 若是该任务还没到触发时间,则等待一段时间
  • 7 若是该任务触发了,就直接调用任务的run方法

存在的比较严重的问题:这里能够看到,这里并无对任务的run方法可能抛出的异常进行捕获,就会致使,一旦某个TimerTask任务抛出异常,就会致使TimerThread结束,Timer不可用。因此在使用Timer的时候,本身实现的TimerTask要对可能的异常进行捕获和处理。.net

3.2 TaskQueue对TimerTask的存储

从上面看到,是须要可以从TaskQueue中获取最近要执行的一个任务。若是对全部任务的执行时间进行排序存储也能够实现,可是该场景就没有必要,只须要知道一个最小的执行时间,因此使用二叉堆来进行存储,又分最大堆和最小堆,这里使用最小堆,这里再也不详细说明。

最小堆:就是全部的父节点的值都比左右孩子节点的值小。因此最小的值必定是在堆顶。

二叉堆通常使用数组来实现,TaskQueue实现以下:

private TimerTask[] queue = new TimerTask[128];
private int size = 0;

size用于标记该数组中任务的个数

添加一个任务的过程:

void add(TimerTask task) {
    // Grow backing store if necessary
    if (size + 1 == queue.length)
        queue = Arrays.copyOf(queue, 2*queue.length);

    queue[++size] = task;
    fixUp(size);
}

这里会先判断是否须要扩容,而后会将任务放到数组的末尾,而后调用fixUp(size)方法来调整该任务在数组中的位置,以达到二叉堆的特性。

private void fixUp(int k) {
    while (k > 1) {
        int j = k >> 1;
        if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
}

这里也很简单,就是不断的找出该任务的父节点,判断该任务节点的下一次执行时间和父节点的下一次执行时间的大小,若是小于父节点的话,则与父节点交换位置,继续往上查找父节点再重复上述比较。

4 ScheduledThreadPoolExecutor

4.1 继承ThreadPoolExecutor

Timer是单线程的,而ScheduledThreadPoolExecutor是多线程的。ScheduledThreadPoolExecutor继承了线程池ThreadPoolExecutor,总体的执行流程不变,仍是有那几个核心东西

  • int corePoolSize
  • int maximumPoolSize
  • long keepAliveTime
  • BlockingQueue<Runnable> workQueue
  • ThreadFactory threadFactory
  • RejectedExecutionHandler handler

ThreadPoolExecutor这一部分能够参考我以前的文章线程池系列分析

从ThreadPoolExecutor的原理中能够看到并无对job的定时调度功能,它里面的Worker并不会去延迟执行一个任务,由于它是通用的,而Timer中的TimerThread是专用的,能够将延迟逻辑放到TimerThread中,而让TaskQueue更轻量的专作简单的二叉堆存储操做,ScheduledThreadPoolExecutor如何来基于ThreadPoolExecutor来实现定时任务呢?

那就只能将TimerThread中作的延迟逻辑放到queue中来作,ScheduledThreadPoolExecutor为此实现了一个DelayedWorkQueue。在取任务的时候会作延迟逻辑。

4.2 实现定时功能

咱们知道ThreadPoolExecutor会调用BlockingQueue<Runnable> workQueue的offer方法添加任务,会调用task、poll方法来获取任务,来看看DelayedWorkQueue是如何实现这2个操做的

对任务的添加:也是采用二叉堆形式来存放这些任务的,和上述Timer的添加任务方法相似,存储结构以下

private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
private int size = 0;

对于任务的获取:普通BlockingQueue<Runnable> workQueue中有数据了,poll方法就能够取到数据,而DelayedWorkQueue则不是,只有当有数据,而且达到任务执行时间了,才能poll出数据,从而实现对定时任务的调度。下面来详细看下这个poll过程

DelayedWorkQueue的poll过程

  • 1 因为在多线程环境下采用数据来存储,须要使用锁来控制
  • 2 而后判断数组中第一个是否为null,若是为null,而且没有等待时间已经用完,就直接返回了,本次poll就没有拿到数据
  • 3 若是数组中第一个是null,则等待一段时间,知道时间超时或者被唤醒(在添加任务的时候会进行唤醒操做)
  • 4 若是数组中第一个任务不为null,而且到了触发时间,则直接从数组中取出该任务,而后进行必定的调整,是数组中的数据仍然知足二叉堆的性质,而后将取出的任务返回,用于去执行
  • 5 若是数组中的任务没有到触发时间,则等待到它的触发时间

5 未完待续

下一篇就来讲说quartz的简单模式,Spring Task对定时任务的封装,以及quartz基于数据库的集群模式。

欢迎关注微信公众号:乒乓狂魔

微信公众号

相关文章
相关标签/搜索