目录html
正文java
咱们知道线程池运行时,会不断从任务队列中获取任务,而后执行任务。若是咱们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不一样进行排序,延时时间越短地就排在队列的前面,先被获取执行。数组
队列是先进先出的数据结构,就是先进入队列的数据,先被获取。可是有一种特殊的队列叫作优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。安全
实现优先级队列高效经常使用的一种方式就是使用堆。数据结构
回到顶部多线程
堆一般是一个能够被看作一棵树的数组对象。并发
堆(heap)又被为优先队列(priority queue)。尽管名为优先队列,但堆并非队列。动画
由于队列中容许的操做是先进先出(FIFO),在队尾插入元素,在队头取出元素。this
而堆虽然在堆底插入元素,在堆顶取出元素,可是堆中元素的排列不是按照到来的前后顺序,而是按照必定的优先顺序排列的。spa
这里来讲明一下满二叉树的概念与彻底二叉树的概念。
除了叶子节点,全部的节点的左右孩子都不为空,就是一棵满二叉树,以下图。
能够看出:满二叉树全部的节点都拥有左孩子,又拥有右孩子。
不必定是一个满二叉树,但它不满的那部分必定在右下侧,以下图
堆老是知足下列性质:
堆中某个节点的值老是不大于或不小于其父节点的值;
堆老是一棵彻底二叉树。
堆是一个二叉树,可是它最简单的方式是经过数组去实现二叉树,并且由于堆是一个彻底二叉树,就不存在数组空间的浪费。怎么使用数组来存储二叉树呢?
就是用数组的下标来模拟二叉树的各个节点,好比说根节点就是0,第一层的左节点是1,右节点是2。由此咱们能够得出下列公式:
1 // 对于n位置的节点来讲: 2 int left = 2 * n + 1; // 左子节点 3 int right = 2 * n + 2; // 右子节点 4 int parent = (n - 1) / 2; // 父节点,固然n要大于0,根节点是没有父节点的
对于堆来讲,只有两个操做,插入insert和删除remove,无论插入仍是删除保证堆的成立条件,1.是彻底二叉树,2.父节点的值不能小于子节点的值。
1 public void insert(int value) { 2 // 第一步将插入的值,直接放在最后一个位置。并将长度加一 3 store[size++] = value; 4 // 获得新插入值所在位置。 5 int index = size - 1; 6 while(index > 0) { 7 // 它的父节点位置坐标 8 int parentIndex = (index - 1) / 2; 9 // 若是父节点的值小于子节点的值,你不知足堆的条件,那么就交换值 10 if (store[index] > store[parentIndex]) { 11 swap(store, index, parentIndex); 12 index = parentIndex; 13 } else { 14 // 不然表示这条路径上的值已经知足降序,跳出循环 15 break; 16 } 17 } 18 }
主要步骤:
直接将value插入到size位置,并将size自增,这样store数组中插入一个值了。
要保证从这个叶节点到根节点这条路径上的节点,知足父节点的值不能小于子节点。
经过int parentIndex = (index - 1) / 2获得父节点,若是比父节点值大,那么二者位置的值交换,而后再拿这个父节点和它的父父节点比较。
直到这个节点值比父节点值小,或者这个节点已是根节点就退出循环。
由于每次循环index都是除以2这种倍数递减的方式,因此它最多循环次数是(log N)次。
1 public int remove() { 2 // 将根的值记录,最后返回 3 int result = store[0]; 4 // 将最后位置的值放到根节点位置 5 store[0] = store[--size]; 6 int index = 0; 7 // 经过循环,保证父节点的值不能小于子节点。 8 while(true) { 9 int leftIndex = 2 * index + 1; // 左子节点 10 int rightIndex = 2 * index + 2; // 右子节点 11 // leftIndex >= size 表示这个子节点尚未值。 12 if (leftIndex >= size) break; 13 int maxIndex = leftIndex; 14 //找到左右节点中较大的一个节点 15 if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex; 16 //与子节点中较大的子节点比较,若是子节点更大,则交换位置 17 //为何要与较大的子节点比较呢?若是和较小的节点比较,没有交换位置,但有可能比较大的节点小 18 if (store[index] < store[maxIndex]) { 19 swap(store, index, maxIndex); 20 index = maxIndex; 21 } else { 22 //知足子节点比当前节点小,退出循环 23 break; 24 } 25 } 26 //返回最开始的第一个值 27 return result; 28 }
在堆中最大值就在根节点,因此操做步骤:
将根节点的值保存到result中。
将最后节点的值移动到根节点,再将长度减一,这样知足堆成立第一个条件,堆是一个彻底二叉树。
使用循环,来知足堆成立的第二个条件,父节点的值不能小于子节点的值。
最后返回result。
每次循环咱们都是以2的倍数递增,因此它也是最多循环次数是(log N)次。
因此经过堆这种方式能够快速实现优先级队列,它的插入和删除操做的效率都是O(log N)。
那么怎么实现堆排序?这个很简单,利用优先队列的特性:
1 private static void headSort(int[] arr) { 2 int size = arr.length; 3 Head head = new Head(size); 4 for (int i = 0; i < size; i++) { 5 head.insert(arr[i]); 6 } 7 for (int i = 0; i < size; i++) { 8 // 实现从大到小的排序 9 arr[size - 1 - i] = head.remove(); 10 } 11 }
堆排序的效率:由于每次插入数据效率是O(log N),而咱们须要进行n次循环,将数组中每一个值插入到堆中,因此它的执行时间是O(N * log N)级。
1 static class DelayedWorkQueue extends AbstractQueue<Runnable> 2 implements BlockingQueue<Runnable> {
从定义中看出DelayedWorkQueue是一个阻塞队列。而且DelayedWorkQueue是一个最小堆,最顶点的值最小,即堆中某个节点的值老是不小于其父节点的值。
1 // 初始时,数组长度大小。 2 private static final int INITIAL_CAPACITY = 16; 3 // 使用数组来储存队列中的元素。 4 private RunnableScheduledFuture<?>[] queue = 5 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 6 // 使用lock来保证多线程并发安全问题。 7 private final ReentrantLock lock = new ReentrantLock(); 8 // 队列中储存元素的大小 9 private int size = 0; 10 11 //特指队列头任务所在线程 12 private Thread leader = null; 13 14 // 当队列头的任务延时时间到了,或者有新的任务变成队列头时,用来唤醒等待线程 15 private final Condition available = lock.newCondition();
DelayedWorkQueue是用数组来储存队列中的元素,那么咱们看看它是怎么实现优先级队列的。
1 public void put(Runnable e) { 2 offer(e); 3 } 4 5 public boolean add(Runnable e) { 6 return offer(e); 7 } 8 9 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 10 return offer(e); 11 }
咱们发现与普通阻塞队列相比,这三个添加方法都是调用offer方法。那是由于它没有队列已满的条件,也就是说能够不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进行数组扩容。
1 public boolean offer(Runnable x) { 2 if (x == null) 3 throw new NullPointerException(); 4 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 5 // 使用lock保证并发操做安全 6 final ReentrantLock lock = this.lock; 7 lock.lock(); 8 try { 9 int i = size; 10 // 若是要超过数组长度,就要进行数组扩容 11 if (i >= queue.length) 12 // 数组扩容 13 grow(); 14 // 将队列中元素个数加一 15 size = i + 1; 16 // 若是是第一个元素,那么就不须要排序,直接赋值就好了 17 if (i == 0) { 18 queue[0] = e; 19 setIndex(e, 0); 20 } else { 21 // 调用siftUp方法,使插入的元素变得有序。 22 siftUp(i, e); 23 } 24 // 表示新插入的元素是队列头,更换了队列头, 25 // 那么就要唤醒正在等待获取任务的线程。 26 if (queue[0] == e) { 27 leader = null; 28 // 唤醒正在等待等待获取任务的线程 29 available.signal(); 30 } 31 } finally { 32 lock.unlock(); 33 } 34 return true; 35 }
数组扩容方法:
1 private void grow() { 2 int oldCapacity = queue.length; 3 // 每次扩容增长原来数组的一半数量。 4 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 5 if (newCapacity < 0) // overflow 6 newCapacity = Integer.MAX_VALUE; 7 // 使用Arrays.copyOf来复制一个新数组 8 queue = Arrays.copyOf(queue, newCapacity); 9 }
插入元素排序siftUp方法:
1 private void siftUp(int k, RunnableScheduledFuture<?> key) { 2 // 当k==0时,就到了堆二叉树的根节点了,跳出循环 3 while (k > 0) { 4 // 父节点位置坐标, 至关于(k - 1) / 2 5 int parent = (k - 1) >>> 1; 6 // 获取父节点位置元素 7 RunnableScheduledFuture<?> e = queue[parent]; 8 // 若是key元素大于父节点位置元素,知足条件,那么跳出循环 9 // 由于是从小到大排序的。 10 if (key.compareTo(e) >= 0) 11 break; 12 // 不然就将父节点元素存放到k位置 13 queue[k] = e; 14 // 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。 15 setIndex(e, k); 16 // 从新赋值k,寻找元素key应该插入到堆二叉树的那个节点 17 k = parent; 18 } 19 // 循环结束,k就是元素key应该插入的节点位置 20 queue[k] = key; 21 setIndex(key, k); 22 }
主要是三步:
咱们来看看动画
假设现有元素 5 须要插入,为了维持彻底二叉树的特性,新插入的元素必定是放在结点 6 的右子树;同时为了知足任一结点的值要小于左右子树的值这一特性,新插入的元素要和其父结点做比较,若是比父结点小,就要把父结点拉下来顶替当前结点的位置,本身则依次不断向上寻找,找到比本身大的父结点就拉下来,直到没有符合条件的值为止。
动画讲解:
在这里先将元素 5 插入到末尾,即放在结点 6 的右子树。
而后与父类比较, 6 > 5 ,父类数字大于子类数字,子类与父类交换。
重复此操做,直到不发生替换。
1 public RunnableScheduledFuture<?> poll() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 RunnableScheduledFuture<?> first = queue[0]; 6 // 队列头任务是null,或者任务延时时间没有到,都返回null 7 if (first == null || first.getDelay(NANOSECONDS) > 0) 8 return null; 9 else 10 // 移除队列头元素 11 return finishPoll(first); 12 } finally { 13 lock.unlock(); 14 } 15 }
1 public long getDelay(TimeUnit unit) { 2 return unit.convert(time - now(), NANOSECONDS); 3 }
当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,所以直接返回null。不然调用finishPoll方法,移除队列头元素并返回。
1 // 移除队列头元素 2 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 3 // 将队列中元素个数减一 4 int s = --size; 5 // 获取队列末尾元素x 6 RunnableScheduledFuture<?> x = queue[s]; 7 // 原队列末尾元素设置为null 8 queue[s] = null; 9 if (s != 0) 10 // 将队列最后一个元素移动到对列头元素位置,而后向下排序 11 // 由于移除了队列头元素,因此进行从新排序。 12 siftDown(0, x); 13 setIndex(f, -1); 14 return f; 15 }
这个方法与咱们在第一节中,介绍堆的删除方法同样。
- 先将队列中元素个数减一。
- 将原队列末尾元素设置成队列头元素,再将队列末尾元素设置为null。
- 调用siftDown(0, x)方法,保证按照元素的优先级排序。
移除元素排序siftDown方法:
1 private void siftDown(int k, RunnableScheduledFuture<?> key) { 2 int half = size >>> 1; 3 // 经过循环,保证父节点的值不能大于子节点。 4 while (k < half) { 5 // 左子节点, 至关于 (k * 2) + 1 6 int child = (k << 1) + 1; 7 // 左子节点位置元素 8 RunnableScheduledFuture<?> c = queue[child]; 9 // 右子节点, 至关于 (k * 2) + 2 10 int right = child + 1; 11 // 若是左子节点元素值大于右子节点元素值,那么右子节点才是较小值的子节点。 12 // 就要将c与child值从新赋值 13 if (right < size && c.compareTo(queue[right]) > 0) 14 c = queue[child = right]; 15 // 若是父节点元素值小于较小的子节点元素值,那么就跳出循环 16 if (key.compareTo(c) <= 0) 17 break; 18 // 不然,父节点元素就要和子节点进行交换 19 queue[k] = c; 20 setIndex(c, k); 21 k = child; 22 } 23 // 循环结束,k就是元素key应该插入的节点位置 24 queue[k] = key; 25 setIndex(key, k); 26 }
咱们来看看动画
核心点:将最后一个元素填充到堆顶,而后不断的下沉这个元素。
假设要从节点 1 ,也能够称为取出节点 1 ,为了维持彻底二叉树的特性 ,咱们将最后一个元素 6 去替代这个 1 ;而后比较 1 和其子树的大小关系,若是比左右子树大(若是存在的话),就要从左右子树中找一个较小的值替换它,而它能本身就要跑到对应子树的位置,再次循环这种操做,直到没有子树比它小。
经过这样的操做,堆依然是堆,总结一下:
1 public RunnableScheduledFuture<?> take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 for (;;) { 6 RunnableScheduledFuture<?> first = queue[0]; 7 // 若是没有任务,就让线程在available条件下等待。 8 if (first == null) 9 available.await(); 10 else { 11 // 获取任务的剩余延时时间 12 long delay = first.getDelay(NANOSECONDS); 13 // 若是延时时间到了,就返回这个任务,用来执行。 14 if (delay <= 0) 15 return finishPoll(first); 16 // 将first设置为null,当线程等待时,不持有first的引用 17 first = null; // don't retain ref while waiting 18 19 // 若是仍是原来那个等待队列头任务的线程, 20 // 说明队列头任务的延时时间尚未到,继续等待。 21 if (leader != null) 22 available.await(); 23 else { 24 // 记录一下当前等待队列头任务的线程 25 Thread thisThread = Thread.currentThread(); 26 leader = thisThread; 27 try { 28 // 当任务的延时时间到了时,可以自动超时唤醒。 29 available.awaitNanos(delay); 30 } finally { 31 if (leader == thisThread) 32 leader = null; 33 } 34 } 35 } 36 } 37 } finally { 38 if (leader == null && queue[0] != null) 39 // 唤醒等待任务的线程 40 available.signal(); 41 lock.unlock(); 42 } 43 }
若是队列中没有任务,那么就让当前线程在available条件下等待。若是队列头任务的剩余延时时间delay大于0,那么就让当前线程在available条件下等待delay时间。
1 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 long nanos = unit.toNanos(timeout); 4 final ReentrantLock lock = this.lock; 5 lock.lockInterruptibly(); 6 try { 7 for (;;) { 8 RunnableScheduledFuture<?> first = queue[0]; 9 // 若是没有任务。 10 if (first == null) { 11 // 超时时间已到,那么就直接返回null 12 if (nanos <= 0) 13 return null; 14 else 15 // 不然就让线程在available条件下等待nanos时间 16 nanos = available.awaitNanos(nanos); 17 } else { 18 // 获取任务的剩余延时时间 19 long delay = first.getDelay(NANOSECONDS); 20 // 若是延时时间到了,就返回这个任务,用来执行。 21 if (delay <= 0) 22 return finishPoll(first); 23 // 若是超时时间已到,那么就直接返回null 24 if (nanos <= 0) 25 return null; 26 // 将first设置为null,当线程等待时,不持有first的引用 27 first = null; // don't retain ref while waiting 28 // 若是超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。 29 // 在这里让线程等待超时时间nanos 30 if (nanos < delay || leader != null) 31 nanos = available.awaitNanos(nanos); 32 else { 33 Thread thisThread = Thread.currentThread(); 34 leader = thisThread; 35 try { 36 // 当任务的延时时间到了时,可以自动超时唤醒。 37 long timeLeft = available.awaitNanos(delay); 38 // 计算剩余的超时时间 39 nanos -= delay - timeLeft; 40 } finally { 41 if (leader == thisThread) 42 leader = null; 43 } 44 } 45 } 46 } 47 } finally { 48 if (leader == null && queue[0] != null) 49 // 唤醒等待任务的线程 50 available.signal(); 51 lock.unlock(); 52 } 53 }
与take方法相比较,就要考虑设置的超时时间,若是超时时间到了,尚未获取到有用任务,那么就返回null。其余的与take方法中逻辑同样。
http://www.javashuo.com/article/p-wpcnychj-ek.html
使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。