目录java
AQS是AbstractQueuedSynchronizer(抽象队列同步器)的缩写。它是多线程访问共享资源的框架,ReentrantLock、CountDownLatch、Semaphore等都是基于它来实现的。node
从图中能够看到,有两个关键的组成部分,一个是state(共享资源,也能够理解为资源占用计数器),另外一个是FIFO队列,用来保存须要得到共享资源的县城,其head节点始终指向当前真在占用共享资源的线程。多线程
进入等待队列的线程会被封装成一个Node。其主要成员以下:框架
class Node { //在同步队列中等待的线程等待超时或被中断,须要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。 static final int CANCELLED = 1; //值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。 static final int SIGNAL = -1; //与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其余线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 static final int CONDITION = -2; //与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。 static final int PROPAGATE = -3; //节点的等待状态,默认0状态:值为0,表明初始化状态。 volatile int waitStatus; //前驱结点 volatile Node prev; //后驱节点 volatile Node next; //目标线程 volatile Thread thread; //获取前驱结点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } }
对于共享资源state的修改,除了提供普通的getter以外,还提供了一个原子操做compareAndSetState()。函数
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。ui
AQS提供了几个重要的方法,参数都是state的值:this
自定义同步器主要实现这些方法便可,其余的工做AQS自己已经实现好了。线程
以ReentrantLock为例,state初始化为0,表示资源/锁未被占用。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其余线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。固然,释放锁以前,A线程本身是能够重复获取此锁的(state会累加),这就是==可重入==的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。code
今天主要分析独占式下的acquire-release。blog
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
函数步骤以下:
下面逐个方法看。
1 protected boolean tryAcquire(int arg) { 2 throw new UnsupportedOperationException(); 3 }
该方法的实现仅仅是抛出一个异常。然而该方法正是须要自定义同步器重写的方法,包括对state的操做。
因为直接获取资源失败,该方法是将线程放到等待队列尾部。
private Node addWaiter(Node mode) { //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //尝试快速方式直接放到队尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失败则经过enq入队。 enq(node); return node; }
其中compareAndSetTail(pred, node)是以原子的方式进行尾节点和当前线程节点的交换。
该方法是在快速加入尾节点失败以后执行,目的也是将node加入队尾。
1 private Node enq(final Node node) { 2 //"自旋",直到成功加入队尾 3 for (;;) { 4 Node t = tail; 5 if (t == null) { // 队列为空,建立一个空的标志结点做为head结点,并将tail也指向它。 6 if (compareAndSetHead(new Node()))//原子设置头节点 7 tail = head; 8 } else {//正常流程,放入队尾 9 node.prev = t; 10 if (compareAndSetTail(t, node)) { 11 t.next = node; 12 return t; 13 } 14 } 15 } 16 }
经过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部“休息”,直到其余线程完全释放资源后唤醒本身,再去获取共享资源。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true;//标记是否成功拿到资源 3 try { 4 boolean interrupted = false;//标记等待过程当中是否被中断过 5 6 //又是一个“自旋”! 7 for (;;) { 8 final Node p = node.predecessor();//拿到前驱 9 //若是前驱是head,则能够去获取资源 10 if (p == head && tryAcquire(arg)) { 11 setHead(node);//拿到资源后,将head指向该结点。因此head所指的标杆结点,就是当前获取到资源的那个结点或null。 12 p.next = null; // setHead中node.prev已置为null,此处再将原来的head.next置为null,就是为了方便GC回收之前的head结点。也就意味着以前拿完资源的结点出队了! 13 failed = false; 14 return interrupted;//返回等待过程当中是否被中断过 15 } 16 17 //判断是否能够休息,若是能够,就进入waiting状态,若是等待过程当中被中断过,就将interrupted标记为true 18 if (shouldParkAfterFailedAcquire(p, node) && 19 parkAndCheckInterrupt()) 20 interrupted = true; 21 } 22 } finally {//自旋过程当中超时或者被中断则从队列移除该节点 23 if (failed) 24 cancelAcquire(node); 25 } 26 }
此方法主要用于检查状态,看看本身是否能够进入waiting状态。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus;//拿到前驱的等待状态 3 if (ws == Node.SIGNAL) 4 //若是已经告诉前驱资源释放后通知本身一下,那就能够安心休息了 5 return true; 6 if (ws > 0) { 7 /* 8 * 若是前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。 9 * 注意:那些放弃的结点稍后就会被回收 10 */ 11 do { 12 node.prev = pred = pred.prev; 13 } while (pred.waitStatus > 0); 14 pred.next = node; 15 } else { 16 //若是前驱正常,那就把前驱的状态设置成SIGNAL,告诉它资源释放后通知本身一下。 17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18 } 19 return false; 20 }
休眠而且检查中断。
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this);//调用本地方法park()使线程进入waiting状态 3 return Thread.interrupted();//返回当前线程是否被中断。 4 }
从队列移除节点
private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;//跳过已经被取消的节点一直往前找,直到找到一个有效的节点,让node的前驱结点指向该节点 Node predNext = pred.next;//获取刚才找到的前驱结点的后置节点 node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) {//若是当前node就是尾节点,就以原子方式把刚才找到的前驱结点设置为新的尾节点 compareAndSetNext(pred, predNext, null);//以原子的方式将上面设置为新的尾节点的后置节点置为null } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {//前驱结点不是头节点并且成功设置了"信号状态"的以后,就把它的后置节点指向即将要取消的node节点的后置节点 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node);//不然唤醒下一个须要获取锁的节点 } node.next = node; } }
当前节点是尾节点:
当前节点既不是尾节点也不是头节点:
释放资源,若是完全释放了(即state=0),它会唤醒等待队列里的其余线程来获取资源。
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head;//找到头结点,即当前持有资源的线程对应的节点 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h);//唤醒等待队列里的下一个线程 6 return true; 7 } 8 return false; 9 }
此方法尝试去释放指定量的资源。须要自定义同步器本身实现。
1 protected boolean tryRelease(int arg) { 2 throw new UnsupportedOperationException(); 3 }
其实就是将state减去arg,若是已经完全释放资源(state=0),要返回true,不然返回false。
此方法用于唤醒等待队列中下一个线程。
1 private void unparkSuccessor(Node node) { 3 int ws = node.waitStatus; 4 if (ws < 0)//置0当前线程所在的结点状态。 5 compareAndSetWaitStatus(node, ws, 0); 6 7 Node s = node.next;//找到下一个须要唤醒的结点s 8 if (s == null || s.waitStatus > 0) {//若是为空或已取消 9 s = null; 10 for (Node t = tail; t != null && t != node; t = t.prev)//从队列尾向前找,直到找到下一个距离node最近的有效节点 11 if (t.waitStatus <= 0)//从这里能够看出,<=0的结点,都是还有效的结点。 12 s = t; 13 } 14 if (s != null) 15 LockSupport.unpark(s.thread);//唤醒 16 }