一、队列同步器是用来构建锁或者其余同步组件的基础框架,使用一个int型变量表明同步状态,经过内置的队列来完成线程的排队工做。java
二、下面是JDK8文档中对于AQS的部分介绍node
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable 提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。 该类被设计为大多数类型的同步器的有用依据,这些同步器依赖于单个原子int值来表示状
态。子类必须定义改变此状态的protected方法,以及根据该对象被获取或释放来定义该状态的含义。给定这些,这个类中的其余方法执行全部排队和阻塞机制。 子类能够保持其余状态字段,但只以
原子方式更新int使用方法操纵值getState() , setState(int)和compareAndSetState(int, int)被跟踪相对于同步。 此类支持默认独占模式和共享模式。 当以独占模式获取时,尝试经过其余线程获取不能成功。 多线程获取的共享模式可能(但不须要)成功。 除了在机械意义上,这个类不理解这些差别,当共享
模式获取成功时,下一个等待线程(若是存在)也必须肯定它是否也能够获取。 在不一样模式下等待的线程共享相同的FIFO队列。 一般,实现子类只支持这些模式之一,可是二者均可以在
ReadWriteLock中发挥做用。仅支持独占或仅共享模式的子类不须要定义支持未使用模式的方法。
总结来讲就是:安全
①子类经过继承AQS并实现其抽象方法来管理同步状态,对于同步状态的更改经过提供的getState()、setState(int state)、compareAndSetState(int expect, int update)来进行操做,由于使用CAS操做保证同步状态的改变是原子的。多线程
②子类被推荐定义为自定义同步组件的静态内部类,同步器自己并无实现任何的同步接口,仅仅是定义了若干状态获取和释放的方法来提供自定义同步组件的使用。并发
③同步器既能够支持独占式的获取同步状态,也能够支持共享式的获取同步状态(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不一样类型的同步组件)框架
三、同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义;ide
继承同步器而且重写指定的方法,而后将同步器组合在自定义同步组件的实现中,而且调用同步器提供的模板方法(这些模板方法会调用重写的方法);而重写指定的方法的时候,须要使用getState()、setState(int state)、compareAndSetState(int expect, int update)来访问或者更新同步状态。下面是源码中state变量和三个方法的定义声明实现测试
1 /** 2 * .(同步状态) 3 */ 4 private volatile int state; 5 6 /** 7 * (返回当前的同步状态) 8 * 此操做的内存语义为@code volatile read 9 */ 10 protected final int getState() { 11 return state; 12 } 13 14 /** 15 * (设置新的同步状态) 16 * 此操做的内存语义为@code volatile read 17 */ 18 protected final void setState(int newState) { 19 state = newState; 20 } 21 22 /** 23 * (若是要更新的状态和指望的状态相同,那就经过原子的方式更新状态) 24 * ( 此操做的内存语义为@code volatile read 和 write) 25 * (若是更新的状态和指望的状态不一样就返回false) 26 */ 27 protected final boolean compareAndSetState(int expect, int update) { 28 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 29 }
1 /** 2 * 独占式的获取同步状态,实现该方法须要查询当前状态并判断同步状态是否符合预期,而后再进行CAS设置同步状态 3 * 4 */ 5 protected boolean tryAcquire(int arg) { 6 throw new UnsupportedOperationException(); 7 } 8 9 /** 10 * 独占式的释放同步状态,等待获取同步状态的线程能够有机会获取同步状态 11 * 12 */ 13 protected boolean tryRelease(int arg) { 14 throw new UnsupportedOperationException(); 15 } 16 17 /** 18 * 尝试以共享模式获取。 该方法应该查询对象的状态是否容许在共享模式下获取该对象,若是是这样,就能够获取它。 该方法老是由执行获取的线程调用。 19 * 若是此方法报告失败,则获取方法可能将线程排队(若是还没有排队),直到被其余线程释放为止。 获取失败时返回负值,若是在获取成共享模式下功但没 20 * 有后续共享模式获取能够成功,则为零; 而且若是以共享模式获取成功而且随后的共享模式获取可能成功,则为正值,在这种状况下,后续等待线程必须检查可用性。 21 */ 22 protected int tryAcquireShared(int arg) { 23 throw new UnsupportedOperationException(); //若是不支持共享模式 ,会抛出该异常 24 } 25 26 /** 27 * 尝试将状态设置为以共享模式释放同步状态。 该方法老是由执行释放的线程调用。 28 */ 29 protected int tryReleaseShared(int arg) { 30 throw new UnsupportedOperationException(); //若是不支持共享模式 ,会抛出该异常 31 } 32 33 /** 34 * 当前同步器是否在独占模式下被线程占用,通常该方法表示是否被当前线程所独占 35 */ 36 protected int isHeldExclusively(int arg) { 37 throw new UnsupportedOperationException(); //若是不支持共享模式 ,会抛出该异常 38 }
在实现自定义同步组件的时候,须要重写上面的方法,而下面的模板方法会调用上面重写的方法。下面介绍同步器提供的模板方法ui
1 /** 2 * 以独占模式获取,忽略中断。 经过调用至少一次tryAcquire(int)实现,成功返回。 不然线 3 * 程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int) 4 */ 5 public final void acquire(int arg) {...} 6 7 /** 8 * 以独占方式得到,若是中断,停止。 经过首先检查中断状态,而后调用至少一次 9 * tryAcquire(int) ,成功返回。 不然线程排队,可能会重复阻塞和解除阻塞,调用 10 * tryAcquire(int)直到成功或线程中断。 11 */ 12 public final void acquireInterruptibly(int arg) throws InterruptedException {...} 13 14 /** 15 * 尝试以独占模式获取,若是中断则停止,若是给定的超时时间失败。 首先检查中断状态,然 16 * 后调用至少一次tryAcquire(int) ,成功返回。 不然,线程排队,可能会重复阻塞和解除阻 17 * 塞,调用tryAcquire(int)直到成功或线程中断或超时 18 */ 19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...} 20 21 /** 22 * 以共享模式获取,忽略中断。 经过首次调用至少一次执行 tryAcquireShared(int),成功返 23 * 回。 不然线程排队,可能会重复阻塞和解除阻塞,直到成功调用tryAcquireShared(int) 。 24 */ 25 public final void acquireShared(int arg){...} 26 27 /** 28 * 以共享方式获取,若是中断,停止。 首先检查中断状态,而后调用至少一次 29 * tryAcquireShared(int) ,成功返回。 不然线程排队,可能会重复阻塞和解除阻塞,调用 30 * tryAcquireShared(int)直到成功或线程中断。 31 */ 32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...} 33 34 /** 35 * 尝试以共享模式获取,若是中断则停止,若是给定的时间超过,则失败。 经过首先检查中断 36 * 状态,而后调用至少一次tryAcquireShared(int) ,成功返回。 不然,线程排队,可能会重 37 * 复阻塞和解除阻塞,调用tryAcquireShared(int)直到成功或线程中断或超时。 38 */ 39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...} 40 41 /** 42 * 独占式的释放同步状态,该方法会在释放同步状态以后,将同步队列中的第一个节点包含的线程唤醒 43 */ 44 public final boolean release(int arg){...} 45 46 /** 47 * 共享式的释放同步状态 48 */ 49 public final boolean releaseShared(int arg){...} 50 51 /** 52 * 获取在等待队列上的线程集合 53 */ 54 public final Collection<Thread> getQueuedThreads(){...}
a)t同步队列的实现原理this
AQS内部维护一个同步队列来完成同步状态的管理,当前线程获取同步状态失败的时候,AQS会将当前线程以及等待状态信息构形成一个结点Node并将其加入同步队列中,同时阻塞当前线程,当同步状态由持有线程释放的时候,会将同步队列中的首节点唤醒使其再次尝试获取同步状态。同步队列中的结点用来保存获取同步状态失败的线程的线程引用、等待状态以及前驱结点和后继结点。下面是Node的属性分析
1 static final class Node { 2 /** 共享模式下构造结点 */ 3 static final Node SHARED = new Node(); 4 /** 独占模式下构造结点 */ 5 static final Node EXCLUSIVE = null; 6 7 /** 用于指示线程已经取消的waitStatus值(因为在同步队列中等待的线程等待超时或者发生中断,须要从同步队列中取消等待,结点进入该状态将不会发生变化)*/ 8 static final int CANCELLED = 1; 9 /** waitstatus值指示后续线程须要取消等待(后继结点的线程处于等待状态,而当前结点的线程若是释放了同步状态或者CANCELL,将会通知后继结点的线程以运行) */ 10 static final int SIGNAL = -1; 11 /**waitStatus值表示线程正在等待条件(本来结点在等待队列中,结点线程等待在Condition上,当其余线程对Condition调用了signal()方法以后)该结点会从
等待队列中转移到同步队列中,进行同步状态的获取 */ 12 static final int CONDITION = -2; 13 /** 14 * waitStatus值表示下一个共享式同步状态的获取应该无条件传播下去 15 */ 16 static final int PROPAGATE = -3; 17 18 /** 19 * 不一样的等到状态的int值 20 */ 21 volatile int waitStatus; 22 23 /** 24 * 前驱结点,当结点加入同步队列将会被设置前驱结点信息 25 */ 26 volatile Node prev; 27 28 /** 29 * 后继结点 30 */ 31 volatile Node next; 32 33 /** 34 * 当前获取到同步状态的线程 35 */ 36 volatile Thread thread; 37 38 /** 39 * 等待队列中的后继结点,若是当前结点是共享的,那么这个字段是一个SHARED常量;也就是说结点类型(独占和共享)和等待队列中的后继结点公用一个字段 40 */ 41 Node nextWaiter; 42 43 /** 44 * 若是是共享模式下等待,那么返回true(由于上面的Node nextWaiter字段在共享模式下是一个SHARED常量) 45 */ 46 final boolean isShared() { 47 return nextWaiter == SHARED; 48 } 49 50 final Node predecessor() throws NullPointerException { 51 Node p = prev; 52 if (p == null) 53 throw new NullPointerException(); 54 else 55 return p; 56 } 57 58 Node() { // 用于创建初始头结点或SHARED标记 59 } 60 61 Node(Thread thread, Node mode) { // 用于添加到等待队列 62 this.nextWaiter = mode; 63 this.thread = thread; 64 } 65 66 Node(Thread thread, int waitStatus) { // Used by Condition 67 this.waitStatus = waitStatus; 68 this.thread = thread; 69 } 70 }
b)同步队列示意图和简单分析
①同步队列示意图:当一个线程获取了同步状态后,其余线程不能获取到该同步状态,就会被构造称为Node而后添加到同步队列之中,这个添加的过程基于CAS保证线程安全性。
②同步队列遵循先进先出(FIFO),首节点是获取到同步状态的结点,首节点的线程在释放同步状态的时候将会唤醒后继结点(而后后继结点就会变成新的首节点等待获取同步状态)
①前面说过,同步器的acquire()方法会获取同步状态,这个方法对不会响应中断,也就是说当线程获取通同步状态失败后会被构形成结点加入到同步队列中,当线程被中断时不会从同步队列中移除。
1 /** 2 * ①首先调用tryAcquire方法尝试获取同步状态,若是获取同步状态失败,就进行下面的操做 3 * ②获取失败:按照独占式的模式构造同步结点并经过addWaiter方法将结点添加到同步队列的尾部 4 * ③经过acquireQueue方法自旋获取同步状态。 5 * ④若是获取不到同步状态,就阻塞结点中的线程,而结点中的线程唤醒主要是经过前驱结点的出队或者被中断来实现 6 */ 7 public final void acquire(int arg) { 8 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 9 selfInterrupt(); 10 }
②下面是addWaiter、enq和自旋获取同步状态acquireQueue方法的实现(该方法的主要做用就是将获取同步状态失败的线程构形成结点而后添加到同步队列的队尾)
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 //尝试直接放在队尾 4 Node pred = tail; //直接获取同步器的tail结点 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 //队尾结点不为空经过原子操做将构造的结点置为队尾结点 9 pred.next = node; 10 return node; 11 } 12 } 13 //采用自旋方式保证构造的结点添加到同步队列中 14 enq(node); 15 return node; 16 } 17 private Node enq(final Node node) { 18 for (;;) { //死循环知道添加成功 19 Node t = tail; 20 if (t == null) { // Must initialize 21 if (compareAndSetHead(new Node())) 22 tail = head; 23 } else { 24 node.prev = t; 25 //经过CAS方式将结点添加到同步队列以后才会返回,不然就会不断尝试添加(这样实际上就是在并发状况下,把向同步队列添加Node变得串行化了) 26 if (compareAndSetTail(t, node)) { 27 t.next = node; 28 return t; 29 } 30 } 31 } 32 } 33 /** 34 * 经过tryAcquire()和addWaiter(),表示该线程获取同步状态已经失败,被放入同步 35 * 队列尾部了。线程阻塞等待直到其余线程(前驱结点得到同步装填或者被中断)释放同步状 36 * 态后唤醒本身,本身才能得到。 37 */ 38 final boolean acquireQueued(final Node node, int arg) { 39 boolean failed = true; 40 try { 41 boolean interrupted = false; 42 //线程在死循环的方式中尝试获取同步状态 43 for (;;) { 44 final Node p = node.predecessor(); //获取前驱结点 45 //只有前驱接待是头结点的时候才能尝试获取同步状态 46 if (p == head && tryAcquire(arg)) { 47 setHead(node); //获取到同步状态以后,就将本身设置为头结点 48 p.next = null; //前驱结点已经得到同步状态去执行本身的程序了,因此须要释放掉占用的同步队列的资源,由JVM回收 49 failed = false; 50 return interrupted; 51 } 52 //若是获取同步状态失败,应该自旋等待继续获取而且校验本身的中断标志位信息 53 if (shouldParkAfterFailedAcquire(p, node) && 54 parkAndCheckInterrupt()) 55 interrupted = true; //若是被中断,就改变本身的中断标志位状态信息 56 } 57 } finally { 58 if (failed) 59 cancelAcquire(node); 60 } 61 }
③独占式获取同步状态的整个流程
④独占式同步器的释放:release方法执行时,会唤醒头结点的后继结点线程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//头结点 //唤醒头结点的后继结点线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
①共享式获取和独占式获取最主要的区别是可否有多个线程同时获取到同步状态。如图所示简易描述两者的区别(共享式访问的时候,能够容许多个线程访问资源,可是存在独占式访问的时候,同一时刻其余的不论是共享仍是独占都会被阻塞)
②关于共享式获取同步状态的方法
1 /** 2 * 此方法是共享模式下线程获取共享同步状态的顶层入口。它会尝试去获取同步状态,获取成功则直接返回, 3 * 获取失败则进入等待队列一直尝试获取(执行doAcquireShared方法体中的内容),直到获取到资源为止(条件就是tryAcquireShared方法返回值大于等于0),整个过程忽略中断 4 */ 5 public final void acquireShared(int arg) { 6 if (tryAcquireShared(arg) < 0) 7 doAcquireShared(arg); 8 } 9 /** 10 * "自旋"尝试获取同步状态 11 */ 12 private void doAcquireShared(int arg) { 13 //首先将该线程包括线程引用、等待状态、前驱结点和后继结点的信息封装台Node中,而后添加到等待队列里面(一共享模式添加) 14 final Node node = addWaiter(Node.SHARED); 15 boolean failed = true; 16 try { 17 boolean interrupted = false; //当前线程的中断标志 18 for (;;) { 19 final Node p = node.predecessor(); //获取前驱结点 20 if (p == head) { 21 //当前驱结点是头结点的时候就会以共享的方式去尝试获取同步状态 22 int r = tryAcquireShared(arg); 23 //判断tryAcquireShared的返回值 24 if (r >= 0) { 25 //若是返回值大于等于0,表示获取同步状态成功,就修改当前的头结点并将信息传播都后续的结点队列中 26 setHeadAndPropagate(node, r); 27 p.next = null; // 释放掉已经获取到同步状态的前驱结点的资源 28 if (interrupted) 29 selfInterrupt(); //检查中断标志 30 failed = false; 31 return; 32 } 33 } 34 if (shouldParkAfterFailedAcquire(p, node) && 35 parkAndCheckInterrupt()) 36 interrupted = true; 37 } 38 } finally { 39 if (failed) 40 cancelAcquire(node); 41 } 42 }
根据源代码咱们能够了解共享式获取同步状态的整个过程
首先同步器会调用tryAcquireShared方法来尝试获取同步状态,而后根据这个返回值来判断是否获取到同步状态(当返回值大于等于0可视为获取到同步状态);若是第一次获取失败的话,就进入'自旋'状态(执行doAcquireShared方法)一直尝试去获取同步状态;在自旋获取中,若是检查到当前前驱结点是头结点的话,就会尝试获取同步状态,而一旦获取成功(tryAcquireShared方法返回值大于等于0)就能够从自旋状态退出。
另外,还有一点就是上面说到的一个处于等待队列的线程要想开始尝试去获取同步状态,须要知足的条件就是前驱结点是头结点,那么它自己就是整个队列中的第二个结点。当头结点释放掉全部的临界资源以后,咱们考虑每一个线程运行所需资源的不一样数量问题,以下图所示
③共享式同步状态的释放
对于支持共享式的同步组件(即多个线程同同时访问),它们和独占式的主要区别就是tryReleaseShared方法必须确保同步状态的释放是线程安全的(CAS的模式来释放同步状态,由于既然是多个线程可以访问,那么释放的时候也会是多个线程的,就须要保证释放时候的线程安全)
1 /** 2 * 该方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,若是成功释放且容许唤醒等待线程,它会唤醒等待队列里的其余线程来获取资源。 3 */ 4 public final boolean releaseShared(int arg) { 5 if (tryReleaseShared(arg)) { 6 doReleaseShared(); // 7 return true; 8 } 9 return false; 10 }
一、共享式锁的实现
①、自定义一个同步组件,能够容许两个线程访问(共享式同步组件),超过两个线程就会被阻塞。
②、既然是共享式同步组件,按照前面所说的,组件自己须要使用AQS提供的共享式模板方法acquireShared等;组件的内部类须要实现AQS,而且重写关于共享式获取同步状态的方法(tryAcquireShared()、tryReleaseShared()等共享模式下的方法)。
③、既然是两个线程可以同时访问的话,那么状态数的取值范围就是0、一、2了,每当一个线程获取到同步状态的时候state值减1,反之就会增长1;当state值为0的时候就会阻塞其余想要获取同步状态的线程。对于同步状态的更改须要使用CAS来进行保证原子性。
1 package cn.source.concurrent; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 import java.util.concurrent.locks.Condition; 6 import java.util.concurrent.locks.Lock; 7 8 public class TestAQS implements Lock{ 9 10 private Sync sync = new Sync(2); 11 12 private static class Sync extends AbstractQueuedSynchronizer { 13 14 Sync(int num) { 15 if(num <= 0) { 16 throw new RuntimeException("num须要大于0"); 17 } 18 setState(num); 19 } 20 21 @Override 22 protected int tryAcquireShared(int arg) { 23 for(; ;) { 24 int currentState = getState(); 25 int newState = currentState - arg; 26 if(newState < 0 || compareAndSetState(currentState, newState)) { 27 return newState; 28 } 29 } 30 } 31 32 @Override 33 protected boolean tryReleaseShared(int arg) { 34 for(; ;) { 35 int currentState = getState(); 36 int newState = currentState + arg; 37 if(compareAndSetState(currentState, newState)) { 38 return true; 39 } 40 } 41 } 42 43 44 } 45 @Override 46 public void lock() { 47 sync.acquireShared(1); 48 } 49 50 @Override 51 public void unlock() { 52 sync.releaseShared(1); 53 } 54 55 //...... 56 }
1 /** 2 * 测试结果:输出的线程名称是成对的,保证同一时刻只有两个线程可以获取到锁 3 * 4 */ 5 public class TestLockShare { 6 @Test 7 public void test() { 8 Lock lock = new TestAQS(); 9 class Worker extends Thread { 10 11 @Override 12 public void run() { 13 while(true) { 14 lock.lock(); 15 try { 16 Thread.sleep(1000); 17 System.out.println(Thread.currentThread().getName()); 18 Thread.sleep(1000); 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } finally { 22 lock.unlock(); 23 } 24 } 25 } 26 27 } 28 29 for (int i = 0; i < 8; i++) { 30 Worker worker = new Worker(); 31 worker.setDaemon(true); 32 worker.start(); 33 34 } 35 for (int i = 0; i < 8; i++) { 36 try { 37 Thread.sleep(1000); 38 } catch (InterruptedException e) { 39 // TODO Auto-generated catch block 40 e.printStackTrace(); 41 } 42 System.out.println(); 43 } 44 } 45 }
二、独占式锁的实现
1 package cn.source.concurrent; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 import java.util.concurrent.locks.Condition; 6 import java.util.concurrent.locks.Lock; 7 8 public class Mutex implements Lock{ 9 10 private Sync sync = new Sync(); 11 12 private static class Sync extends AbstractQueuedSynchronizer { 13 14 /** 15 * 尝试获取资源,当即返回。成功则返回true,不然false。 16 */ 17 @Override 18 protected boolean tryAcquire(int arg) { 19 if(compareAndSetState(0, 1)) {//state为0才设置为1,不可重入! 20 setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源 21 return true; 22 } 23 return false; 24 } 25 26 /** 27 * 尝试释放资源,当即返回。成功则为true,不然false。 28 */ 29 @Override 30 protected boolean tryRelease(int arg) { 31 if(getState() == 0) { //既然来释放,那确定就是已占有状态了。只是为了保险,多层判断! 32 throw new IllegalMonitorStateException(); 33 } 34 setExclusiveOwnerThread(null); 35 setState(0); 36 return true; 37 } 38 39 @Override 40 protected boolean isHeldExclusively() { 41 // 判断是否锁定状态 42 return getState() == 1; 43 } 44 45 } 46 47 @Override 48 public void lock() { 49 sync.acquire(1); 50 } 51 52 @Override 53 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 54 return sync.tryAcquire(1); 55 } 56 57 @Override 58 public void unlock() { 59 sync.release(1); 60 } 61 62 }
1 public class TestMutex { 2 @Test 3 public void test() { 4 Lock lock = new Mutex(); 5 class Worker extends Thread { 6 7 @Override 8 public void run() { 9 while(true) { 10 lock.lock(); 11 try { 12 Thread.sleep(1000); 13 System.out.println(Thread.currentThread().getName()); 14 Thread.sleep(1000); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 } finally { 18 lock.unlock(); 19 } 20 } 21 } 22 23 } 24 25 for (int i = 0; i < 8; i++) { 26 Worker worker = new Worker(); 27 worker.setDaemon(true); 28 worker.start(); 29 30 } 31 for (int i = 0; i < 8; i++) { 32 try { 33 Thread.sleep(1000); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 System.out.println(); 38 } 39 } 40 }