要看CountDownLatch源码,你会发现其中的核心是由一个继承了AbstractQueuedSynchronizer类的静态内部类Sync。 实际上ReentrantLock,Semaphore等线程控制类的内部都是基于AbstractQueuedSynchronizer实现的。 先来了解一下AbstractQueuedSynchronizer。node
队列同步器:用来构建锁或者其它同步组件的基础框架。它内部使用一个int型成员变量state表示同步状态,经过一个内置的FIFO队列来完成资源获取线程的排队工做。 AQS就是提供了获取锁,FIFO排队,释放锁这些逻辑的基础框架。ReentrantLock,Semaphore,CountDownLatch等并发类都是基于AQS实现的。他们的内部都有一个静态内部类继承了AQS。 至关于就是说,ReentrantLock,Semaphore,CountDownLatch等并发类只是提供给使用者的门面,内部核心逻辑是由AQS实现的。
AQS中的一些核心方法:并发
# 修改线程同步状态的方法: getState(): 获取当前同步状态 setState(int newState): 设置当前同步装 compareAndSetState(int expect,int update):使用CAS乐观锁设置当前同步状态,保证原子性 # 独占式/共享式获取/释放同步状态: tryAcquire(int arg) 独占式获取同步状态 tryRelease(int arg) 独占式释放同步状态 tryAcquireShared(int arg) 共享式获取同步状态 tryReleaseShared(int arg) 共享式释放同步状态 isHeldExclusively() 当前同步器释放在独占模式下被线程占用。返回true表示被当前线程独占 # 获取同步队列的相关信息 getQueuedThreads() 获取等待在同步队列上的线程集合
同步器依赖于内部的FIFO队列来进行同步状态管理。当线程获取同步状态失败时,同步器会将当前线程构形成一个节点Node并加入同步队列,而且会阻塞当前线程。当同步状态释放时,会把队列中的第一个线程唤醒,使其获取同步状态。框架
那么首先来看看这个同步队列是什么样子的吧。 查看Node节点类:源码分析
static final class Node { /**标记这个节点是处于共享模式*/ static final Node SHARED = new Node(); /**标记这个节点是处于独占模式*/ static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /**表示线程的等待状态。 1表示cancelled, 表示这个等待的线程等待超时或者被中断,须要从同步队列中取消等待; -1表示signal,后继节点处于等待状态,而当前节点若是释放了同步状态或者被取消,将会唤醒后继节点,使其继续运行 -2表示condition,当前线程等待在condition上,当其余线程对condition调用了signal方法后,改节点就会移出等待队列,去获取同步状态; -3表示propagate,表示下一次共享式同步状态将会无条件传播下去; 0表示initial,初始状态*/ volatile int waitStatus; /**前一个节点*/ volatile Node prev; /**后一个节点*/ volatile Node next; /**也是用来表示下一个节点。若是当前节点是共享的,那么这个字段就是SHARED常量*/ Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } /**返回上一个节点*/ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
能够看出,Node节点中保存了线程引用,等待状态,是共享仍是独占,先后节点引用等信息。ui
再看AQS的三个属性:this
/**等待队列的头节点*/ private transient volatile Node head; /**等待队列的尾节点*/ private transient volatile Node tail; /**这就是表示同步状态的属性*/ private volatile int state;
在AQS中维护了同步队列的头节点和尾节点,以及同步状态。线程
因此同步队列的结构就很清楚了: code
那么线程是怎么被放入队列的呢?从acquire方法开始跟踪:blog
//这个方法就是尝试获取独占锁 public final void acquire(int arg) { //若是获取不到锁,就把当前线程加入到等待队列进行等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 打断当前线程,其实就是本身打断本身 selfInterrupt(); }
先看一下addWaiter方法,这个方法就是给当前线程构造一个Node节点。继承
private Node addWaiter(Node mode) { //mode表示当前线程是独占式仍是共享式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //将当前节点加入到等待队列的尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //没有尾节点的话就进行初始化 enq(node); return node; }
下面继续跟踪acquireQueued方法:
//这个方法就是在死循环等待锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前一个节点 final Node p = node.predecessor(); //若是前一个节点是头节点,而且获取到了锁,那么当前节点这只为头节点,并返回。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
也就是说,等待队列的头节点实际上就是当前得到锁的节点,后面的节点死循环一直等待。 可是只有头节点的后一个节点才会不断尝试获取锁,由于要保证FIFO顺序。 新进来的线程是放在队列尾部进行死循环等待的。
那么实际上等待队列是这样的流程:
下面再跟踪释放锁的过程:
//释放独占锁 public final boolean release(int arg) { //判断是否能够释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
这里当waitStatus不为0的时候,才会调用unparkSuccessor方法去唤醒后面一个方法。
什么场景下head节点的waitStatus不为0呢??
进入unparkSuccessor方法:
TODO 先略了,后面在补充。。。。。
了解的AQS的原理,那么看 CountDownLatch的源码就so easy了。由于CountDownLatch只是一个门面,核心逻辑就是AQS。