ScheduledThreadPoolExecutor/Timer最小堆原理

java中的ScheduledThreadPoolExecutor用于定时任务, 它使用最小堆来存放定时任务, Timer也是采用最小堆, 它们之间的区别:java

  1. ScheduledThreadPoolExecutor是多线程的, 多个任务能够同时执行; 可是Timer是单线程的, 不会出现两个任务同时执行的问题, 可是若是某个任务卡顿了太长时间, 会形成其它的任务延迟相应的时间。若是ScheduledThreadPoolExecutor的线程数设置成1,那么和Timer是同样的.
  2. ScheduledThreadPoolExecutor取消任务以后, 会将定时任务从最小堆中移除; 可是Timer的最小堆实现, 没有删除节点的功能, 取消定时器时, 只是将这个任务标记为CANCELLED状态, 等触发时间到达时, 才会移除. 所以Timer的被取消的定时任务没法及时被GC回收.

因此, 尽可能用ScheduledThreadPoolExecutor来代替Timer算法

定义

本文主要讲ScheduledThreadPoolExecutor中的DelayedWorkQueue的最小堆实现。先看最小堆的含义: 一种彻底二叉树, 父结点的值小于或等于它的左子节点和右子节点. 以下图 数组

存储方式

DelayedWorkQueue使用数组来存储定时任务, 按照从上到下, 从左到右的顺序依次排列: 多线程

第K个节点的左子节点位于2K+1的位置, 右子节点位于2K+2的位置, 父结点位于(K-1)/2的位置this

插入节点

依然使用上图的例子, 如今要插入数据**0**
第1步, 插入末尾的位置 线程

第2步, 0比19要小, 须要向上移动 code

第3步, 0比2要小, 再上移 对象

第4步, 0比1要小, 再上移
至此结束.blog

删除节点

以上面的最后一图为例, 假设删除**1** 索引

使用最后一个结点来填充被移除的位置

19与较小的子节点(2)交换位置
至此结束

算法复杂度

N个数据的最小堆, 共有logN层, 最坏的状况下, 须要移动logN次.

源代码解读(jdk8)

ScheduledThreadPoolExecutor的最小堆实现为DelayedWorkQueue

插入任务

public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow(); //数组长度不够了, 增大一倍
                size = i + 1;
                if (i == 0) {
					//空列队, 直接放在首位置
                    queue[0] = e;
					//index用于快速定位对象在数组中的位置
                    setIndex(e, 0);
                } else {
				    //i为最末尾的位置, 从i开始上移
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

上移

private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;//计算节点k的父节点
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0) //子节点大于等于父节点, 父节点不须要再移动, 结束
                    break;
                queue[k] = e;  //父节点大于子节点, 将父节点移到下面
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key; //最终找到的合适位置
            setIndex(key, k);
        }

删除

public boolean remove(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = indexOf(x); //索引已经存放在了x的成员变量中, 直接取出
                if (i < 0)  //若是定时任务已经被取消了, index会被置成-1
                    return false;

                setIndex(queue[i], -1);
                int s = --size;
                RunnableScheduledFuture<?> replacement = queue[s]; //最末尾的对象
                queue[s] = null;
                if (s != i) { //若是s == i, 则表示x是最末尾的对象, 不须要更多的操做
                    siftDown(i, replacement);  //将最末尾的对象, 从位置i开始下移.(由siftDown来判断须要不须要下移)
                    if (queue[i] == replacement) //若是并无下移, 那确定就是须要上移了.
                        siftUp(i, replacement);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

下移

private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;  //左子节点位置
                RunnableScheduledFuture<?> c = queue[child]; //左子节点对象
                int right = child + 1; //右子节点位置
                if (right < size && c.compareTo(queue[right]) > 0) //左右子节点中, 选出较小的那个子节点
                    c = queue[child = right];
                if (key.compareTo(c) <= 0) //key比子节点都小, 不须要再移动
                    break;
                queue[k] = c; //较小的子节点上移
                setIndex(c, k);
                k = child;
            }
            queue[k] = key; //为key找到的最终位置
            setIndex(key, k);
        }
相关文章
相关标签/搜索