java中的ScheduledThreadPoolExecutor用于定时任务, 它使用最小堆来存放定时任务, Timer也是采用最小堆, 它们之间的区别:java
因此, 尽可能用ScheduledThreadPoolExecutor来代替Timer算法
本文主要讲ScheduledThreadPoolExecutor中的DelayedWorkQueue的最小堆实现。先看最小堆的含义: 一种彻底二叉树, 父结点的值小于或等于它的左子节点和右子节点. 以下图 数组
DelayedWorkQueue使用数组来存储定时任务, 按照从上到下, 从左到右的顺序依次排列: 多线程
第K个节点的左子节点位于2K+1的位置, 右子节点位于2K+2的位置, 父结点位于(K-1)/2的位置this
依然使用上图的例子, 如今要插入数据**0
**
第1步, 插入末尾的位置 线程
第2步, 0比19要小, 须要向上移动 code
第3步, 0比2要小, 再上移 对象
第4步, 0比1要小, 再上移
至此结束.blog
以上面的最后一图为例, 假设删除**1
** 索引
使用最后一个结点来填充被移除的位置
19与较小的子节点(2)交换位置
至此结束
N个数据的最小堆, 共有logN层, 最坏的状况下, 须要移动logN次.
ScheduledThreadPoolExecutor的最小堆实现为DelayedWorkQueue
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); //数组长度不够了, 增大一倍 size = i + 1; if (i == 0) { //空列队, 直接放在首位置 queue[0] = e; //index用于快速定位对象在数组中的位置 setIndex(e, 0); } else { //i为最末尾的位置, 从i开始上移 siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1;//计算节点k的父节点 RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) //子节点大于等于父节点, 父节点不须要再移动, 结束 break; queue[k] = e; //父节点大于子节点, 将父节点移到下面 setIndex(e, k); k = parent; } queue[k] = key; //最终找到的合适位置 setIndex(key, k); }
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); //索引已经存放在了x的成员变量中, 直接取出 if (i < 0) //若是定时任务已经被取消了, index会被置成-1 return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture<?> replacement = queue[s]; //最末尾的对象 queue[s] = null; if (s != i) { //若是s == i, 则表示x是最末尾的对象, 不须要更多的操做 siftDown(i, replacement); //将最末尾的对象, 从位置i开始下移.(由siftDown来判断须要不须要下移) if (queue[i] == replacement) //若是并无下移, 那确定就是须要上移了. siftUp(i, replacement); } return true; } finally { lock.unlock(); } }
private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; //左子节点位置 RunnableScheduledFuture<?> c = queue[child]; //左子节点对象 int right = child + 1; //右子节点位置 if (right < size && c.compareTo(queue[right]) > 0) //左右子节点中, 选出较小的那个子节点 c = queue[child = right]; if (key.compareTo(c) <= 0) //key比子节点都小, 不须要再移动 break; queue[k] = c; //较小的子节点上移 setIndex(c, k); k = child; } queue[k] = key; //为key找到的最终位置 setIndex(key, k); }