CountDownLatch是同步工具类之一,能够指定一个计数值,在并发环境下由线程进行减1操做,当计数值变为0以后,被await方法阻塞的线程将会唤醒,实现线程间的同步。java
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,以后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。node
public static void main(String[] args) { int threadNum = 10; final CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i < threadNum; i++) { final int tNum = i + 1; new Thread(() -> { System.out.println("thread " + tNum + " start"); Random random = new Random(); try { Thread.sleep(random.nextInt(10000) + 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread " + tNum + " finish"); countDownLatch.countDown();//每个线程执行完就会减一 }).start(); } //主线程启动10个子线程后阻塞在await方法,须要等子线程都执行完毕,主线程才能唤醒继续执行。 try { countDownLatch.await();//当前线程被阻塞 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadNum + " thread finish"); }
CountDownLatch和ReentrantLock同样,内部使用Sync继承AQS。Sync的构造函数接收了计数值并设置为state。咱们知道AQS的state是一个由子类决定含义的“状态”。对于ReentrantLock来讲,state是线程获取锁的次数;对于CountDownLatch来讲,则表示计数值的大小。并发
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
Sync(int count) { setState(count);//设置state,表示计数值的大小 }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//判断当前线程是否被中断 throw new InterruptedException(); //tryAcquireShared:是否须要阻塞当前线程 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);//由CountDownLatch实现阻塞当前线程的逻辑。 }
/** * tryAcquireShared方法其实就是判断一下当前计数器的值,是否为0。 * 为0则返回1:表示当前线程不会阻塞在countDownLatch.await() */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//建立共享的node节点,并加入等待队列 boolean failed = true; try { for (;;) {//死循环 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//判断state是否为0 if (r >= 0) { setHeadAndPropagate(node, r);//设置本身为head,并通知其余等待的线程 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
以上对于countDownLatch.await()方法是怎么“阻塞”当前线程的咱们已经很明白了,即当咱们调用了countDownLatch.await()方法后,你当前线程就会进入了一个死循环当中,在这个死循环里面,会不断的进行判断,经过调用tryAcquireShared方法,不断判断咱们上面说的那个计数器,看看它的值是否为0了(为0的时候,其实就是咱们调用了足够屡次数的countDownLatch.countDown()方法的时候),若是是为0的话,tryAcquireShared就会返回1,代码会执行if(r>=0){...},而后跳出了循环,也就再也不“阻塞”当前线程了。须要注意的是,说是在不停的循环,其实也并不是在不停的执行for循环里面的内容,由于在后面调用parkAndCheckInterrupt()方法时,在这个方法里面是会调用 LockSupport.park(this);,来禁用当前线程的。dom
//释放锁的操做,每调用一次,计数值减小1 public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//尝试释放锁,具体实如今CountDownLatch中 doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//state的减1操做 return nextc == 0;//当计数值等于0,表明全部子线程都执行完毕,被await阻塞的线程就能够被唤醒 } }
//唤醒被await阻塞的线程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //头节点状态若是SIGNAL,则状态重置为0, if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//唤醒下个节点 } //被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态, //表明状态要向后传播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//设置非CANCELLED节点的等待状态为0 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒节点 }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())//若是节点为共享类型 doReleaseShared();//释放,下个节点被唤醒后,再次唤醒后续的等待节点,达到共享状态向后传播。 } }
参考地址:函数