Java并发框架AbstractQueuedSynchronizer(AQS)

1.前言

  本文介绍一下Java并发框架AQS,这是大神Doug Lea在JDK5的时候设计的一个抽象类,主要用于并发方面,功能强大。在新增的并发包中,不少工具类都能看到这个的影子,好比:CountDownLatch、Semaphore、ReentrantLock等,其内部基本都有一个Sync对象是实现了这个AQS这个抽象类,只是实现的过程不一样而已,造就了这些不一样功能特色的并发工具类,不得不说这个并发框架的强大。理解了这个抽象类,咱们能够设计本身的并发工具类,达到相关的目的。html

2.AbstractQueuedSynchronizer

  其父类AbstractOwnableSynchronizer很简单,就一个成员变量exclusiveOwnerThread,用于展现当前全部者的线程。要理解这个框架的用途,最快的方法就是看代码上的注释了。这里对注释进行简要复述:java

  基于先入先出队列提供了一个用于实现阻塞锁和相关同步器(信号量和事件等)的一个框架。这个类被设计成一个对于实现大部分同步器很是有效的基本类,基于一个原子的int值,表示状态state。子类必须定义protected方法,来改变这个state。这个state意味着这个对象被获取或者是释放。除了这些方法,类中的其余方法实现了全部的排队和阻塞机制。子类能够维护其余状态字段,可是只能经过相关方法进行原子更新(compareAndSetState),以便同步过程被追踪。node

  子类应该被定义成一个非public的内部类,被外部类用来实现特性。这个类提供了默认的排他和共享模式。在排他模式中,其余线程请求锁都不会成功。共享模式多线程请求可能成功但不是必须。这个类并不清楚共享模式请求成功的意义,下一个等待线程也须要再次判断是否可以请求。一般只能实现一种模式,可是也能够同时实现共享和排他模式,能够参考ReadWriteLock。子类若是只支持一种模式,能够不须要实现不适用的模式的方法。设计模式

  这个类定义了一个嵌套类ConditionObject能够用于子类支持排他模式的实现,isHeldExclusively方法用于判断当前线程是不是持有排他锁的线程,release方法只有在当前state彻底释放了这个对象才被调用,acquire方法给定保存的状态值,最终恢复到acquire以前的状态。AQS中没有方法建立这样的一个condition,因此若是不能知足这些约束,不要使用它。condtionObject的行为取决于它的同步器语义的实现。安全

  该类为内部队列提供检查、插装和监视方法,condition object也有的相似方法。序列化这个类只会保存状态,相关线程会丢失。多线程

  若是须要使用这个类做为同步器的基本实现,须要实现下面的方法,经过检查或者修改同步器的状态(方法有getState、setState、compareAndSetState):并发

    1.tryAcquire框架

    2.tryReleaseide

    3.tryAcquireShared工具

    4.tryReleaseShared

    5.isHeldExclusively

  实现的时候必须确保这些方法的内部线程安全,而且运行时间要短,非阻塞。定义这些方法是使用这个类惟一支持的事情,其余方法都被声明了,他们不能独立变化。

2.1 Queued

  上面的解释很清楚了,首先是一个FIFO队列。这个队列是CLH(Craig,Landin,Hagersten)锁队列的变种。CLH锁常常用于自旋锁,在这里用于锁同步器,同时使用一些基本策略保存控制信息。status字段用于跟踪一个线程是否应该被锁。一个节点在它的前置节点释放时被触发。每个节点都做为一个特定触发风格的监视器,并持有一个等待中的线程。状态字段不控制线程是否被授予锁等等。一个在队列中的首节点会尝试获取,可是不保证第一个就会成功,因此它可能从新等待。

  入队列就是放在tail,出队列就是设置head。下面看看节点的定义:

  首先说明一下几个常量,这些都是用于status:

    CANCELLED:这个节点因为中断或者超时,任务被取消了,为这个状态的节点状态再也不改变,固然也不会被阻塞。

    SIGNAL:当前节点的后继节点是阻塞的,因此当前节点在释放或者取消的时候必须唤醒后继节点。为了不竞争,acquire方法必须首先代表它们须要一个signal,而后再尝试原子获取锁,最终失败阻塞。

    CONDITION:这个节点是一个条件队列,它不会用做同步队列节点知道被调用,此时status应该被设置成0(这个值与字段其余用途无关,简化告终构)

    PROPAGATE:释放共享锁的时候,须要传播通知其它节点。仅设置在头节点上,即使有其余操做干扰也要确保传播的持续。

  其余的字段比较好理解,waitStatus就是状态,prev前节点,next后节点,thread当前等待线程,nextWaiter是用于condition的,或者这个值是SHARED,代表是共享模式,isShared()方法就是经过nextWaiter==SHARED来进行判断的。

2.2 ConditionObject

  这个类很简单,就两个节点firstWaiter、lastWaiter,都是2.1中的Node类型。结合上面的内容也能够很容易明白这里的做用了,将同一个条件的等待者构形成双端链表放置在了一块儿,能够触发相关方法,通知全部条件等待者。

  addConditionWaiter():判断尾节点是否被取消,取消移除全部被取消的节点,添加到链表尾。private方法,非线程安全。

  doSignal():不断的经过CAS操做将firstWaiter的status设置成0,成功后将该节点CAS入同步器的队列尾,返回上一个队列节点,判断其等待状态若是大于0,或者是设置成signal失败,LockSupport,unpark当前节点的线程,完成通知操做。private方法。这个操做含义就是设置Condition的头个等待节点的等待状态是condition,设置失败,意味着任务取消找到能够设置的头个节点。成功后就要将这个节点加入到同步器的队列中,而且要保证前一个节点指示这个condition分支节点是等待状态,因此前一个节点不能是cancelled或者设置成signal起始信号失败,出现了这种状况要释放线程,从新进行同步。

  signal():public final方法,先判断是不是持有排他锁。以前说过condition通常是排他模式,这个方法只有占用锁的线程才能发起signal信号,通知其它线程。最后调用的就是private的doSignal方法。

  awaitUninterruptibly():这个实现了uninterruptible的条件等待,将线程添加到condition的等待链表中,而后尝试释放一下当前同步器,执行tryRelease,传入当前同步器的状态。若是成功,释放头节点的后继节点,返回当前的同步器状态。若是添加的这个节点不在同步队列中,阻塞这个节点,直到这个节点被添加到队列中。方法acquireQueued的主要工做就是:若是当该节点的前置节点成为队列的头节点时,tryAcquire判断是否能获取排他锁,前置节点获取了排他锁将该节点设置成头节点,这是个自旋的过程,返回这个过程当中线程是否中断的判断,或者线程在此期间被中断,标记线程中断。

  await():这类方法的实现差很少,都是先添加到condition的等待列表,而后尝试根据同步器状态判断是否能够释放当前头节点的后继阻塞节点。后面同样判断这个condition是否保存在了FIFO队列中,没有阻塞这个节点,知道唤醒后进入了排队等待队列。后续也是自旋等待到本身这个节点进行操做。最后也就是判断是否中断,中断模式执行相应的操做。

  其余方法不进行介绍,condition相关的主要就是介绍一下signal和await都干了什么。若是上面理解不了,能够结合2.3中对主要方法的说明,来理解一下这几个condition的操做。

2.3 AQS

  AQS里面的内容十分简单,就5个内容:unsafe用于操做CAS,head队列的头节点,tail队列的尾节点,state同步器当前状态,spinForTimeoutThreshold自旋超时时间。里面的方法在condition那里已经介绍了一些,主要分为:1.操做state的,2.CAS操做同步器状态的,包括head、tail和state,3.查询某个condition等待的线程状况的,4.操做队列的方法。

  enq(Node):等待节点入队列尾,返回其前驱节点。循环CAS操做,线程安全。

  addWaiter(Node mode):用所给的模式,设置本线程的节点。尝试通常执行入队列,成功则没有竞争安全,不成功调用enq方法入队列,保证竞争中安全。

  unparkSuccessor(Node node):唤醒后置节点,若是本节点状态是负数,则尝试修改其为0,失败也不要紧,关注的主要是后继节点。这里会选择一个未取消的后继节点,而后执行LockSupport.unpark(s.thread),唤醒这个线程。

  doReleaseShared():share模式下的release操做。先是肯定head的状态是signal,将其转换成0,并调用unparkSuccessor方法唤醒后继节点。成功后若是状态是0,将其状态改成传播状态propagate。所有设置好了,就退出循环的CAS操做。

  setHeadAndPropagate(Node node):设置头节点,并传播。要求参数大于0,或者节点为null,或者节点状态为负,另外要求后继节点若是存在必须是share模式,以后就调用doReleaseShared方法。

  cancelAcquire(Node node):取消一个正在请求获取的节点。找到该节点的前置非取消的第一个节点(往前数),将这个pred设置为它的前置节点。节点的状态变成取状态,若是该节点正好在尾部,移除掉,将pred设置为队列的尾节点,另外将pred.next设置成其后继节点。若是这个pred节点不是头节点,状态是signal或者小于0并修改为了signal,将此节点的next节点设置成pred的next。若是都不是,调用unparkSuccessor方法,直接唤醒后继节点便可。

  shouldParkAfterFailedAcquire(Node pred, Node node):检查并更新获取失败的节点的状态,若是该线程须要阻塞,返回true。要求pred = node.prev。pred的状态原本就是signal,返回true。若是pred取消了,继续找node的前一个未取消的节点,设置相关依赖,不然,将其改为signal,最后返回false。

  acquireQueued(Node node, int arg):这个以前介绍过,用于排他模式下的uninterruptible。主要做用就是找到该节点的前置节点,若是是head,就尝试获取锁,若是成功了,将这个设置为头节点。不是head,就调用shouldParkAfterFailedAcquire方法,做为一个失败的后继节点,设置状态成须要阻塞。

   其余方法再也不过多描述,基本看明白这几个方法干了些什么事情就能够了,下面结合具体的使用,来从全局的角度看一下这个并发框架是如何使用的。

3.具体实现

3.1 ReentrantLock

    private static class Test {

        private int n;

        public int getN() {
            return n;
        }

        public void inc() {
            n++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int threadN = 20; int cycle = 10000;
        final Test test = new Test();
        for(int i = 0; i < threadN; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i = 0; i < cycle; i++) {
                        test.inc();
                    }
                    System.out.println("ok");
                }
            }).start();
        }
        Thread.sleep(3000);
        System.out.println(test.getN());
    }

  上面是一段测试代码,很显然是线程不安全的,这个时候使用ReentrantLock修改一下Test类就能够保证其线程安全了。

    private static class Test {

        private ReentrantLock lock = new ReentrantLock();

        private int n;

        public int getN() {
            return n;
        }

        public void inc() {
            try {
                lock.lock();
                n++;
            } finally {
                lock.unlock();
            }
        }
    }

  ReentrantLock毫无疑问是实现了一个锁的问题,而且其实现的仍是可重入锁。下面着重关注lock和unlock的实现,来看待AQS的运行。

  ReentrantLock实现了两种锁,公平锁和非公平锁,公平就是按照阻塞顺序进行锁的获取,非公平就是竞争获取锁,默认采起的是非公平锁,公平锁会有额外的开销。

  先看这个非公平锁的lock方法:

    1.lock方法先是尝试无锁的状况下,获取锁,成功了就设置当前持有锁的线程。失败了调用acquire方法,传入获取1。

    2.acquire方法主要是先调用了tryAcquire(1)方法请求获取锁,失败了就先建立一个排他模式的等待节点,执行acquireQueued方法,这个方法上面说过,会让当前节点的前置节点若是是head进行获取锁尝试,成功了,那么该节点就变成了第一个节点,失败了这个节点就要变成一个阻塞节点,并检测节点是否取消。

    3.tryAcquire尝试获取锁的方法在这里就是调用了nonfairTryAcquire方法,这个方法中先判断当前同步器的状态,若是是0,意味着这期间锁被释放了,马上尝试获取一下,成功同样是设置此线程是锁的全部者线程,返回true,这样2步骤的阻塞就不须要了,直接执行便可。若是当前线程就是排他线程,则state状态累加,注意这就是可重入锁的含义,lock了多少次,就必须unlock多少次。更新这个状态。都不是天然不可能获取锁,执行2后面的阻塞动做。

  unlock方法就是release方法:

    1.release是AQS的方法,其调用tryRelease(arg)方法进行尝试释放锁,成功了以后会唤醒后继节点,调用的就是unparkSuccessor方法。

    2.和上面的同样,unlock就是要state扣除了,若是调用unlock方法的线程不是持有锁的线程会抛出异常,若是state变成0了,就意味着没有线程拥有锁,设置state,清楚ownThread。

  看到这里就很清楚了,首先就是state应该算是记录了多少次调用锁lock的方法的次数,为0的时候意味着没有竞争,不须要阻塞,但状态仍是要设置的,大于0就意味的被持有了,排他的话就须要放入等待队列了。unlock的时候就须要减去目标的state的值了,并唤醒等待队列中后一个节点来执行后续步骤。

  这里有个很奇怪的事情,非公平锁不是说不以请求顺序决定的吗,唤醒下一个等待节点那不是有序的?问题就在于构建等待队列的时候并无保证顺序,非公平的步骤中咱们能够看到其在判断state为0的时候直接进行尝试获取了锁,可是颇有可能刚刚有个线程释放了锁,原本是其后继节点得到锁的,这样就插了一个队。

  看到这里咱们就能明白公平锁之因此公平,问题就出在其获取锁的实现方式了:

  这里能够看到,公平锁和非公平锁惟一的区别就在于公平锁在state为0的时候并无马上获取锁,而是判断队列中是否有等待节点,没有才能直接获取。这就是ReentrantLock中的公平与非公平锁的区别。可是公平锁必定公平吗?这个就很差说了,由于两个线程在插入等待队列中的时候依旧存在竞争,顺序可能打乱,可是从本意上讲那也是赛跑看谁先排上队,至少排队肯定了以后就是公平的了。

3.2 CountDownLatch

  上面讲的ReentrantLock无疑是一个排他模式的AQS的运用,这里讲的CountDownLatch就是一个共享模式的运用了。这个类的做用颇有意思,比如倒计时,主要目标就是全部子线程任务都完成了,计数减一,最终主线程才能执行下去。下面给一个使用例子:

    public static void main(String[] args) throws InterruptedException {
        int thread = 5;
        CountDownLatch latch = new CountDownLatch(thread);
        for(int i = 0; i < thread; i++) {
            final int n = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(n*1000);
                        System.out.println("ok");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        latch.await();
        System.out.println("over");
    }

  若是没有latch,开启多个线程执行会直接退出,由于主线程已经执行完了,有了这个以后就须要等待最初设置数量的子线程执行完毕才可。

  能够看到代码中子线程并无任何阻塞,想象一下大体就能明白CountDownLatch的实现过程了。先传入一个设置的state,这个就是new建立时传入的整数了。

  countdown方法调用了AQS的releaseShared(1)方法,实际上就回调了一些本身须要覆写的tryReleaseShared(方法),因为没有实质上生成等待节点,因此doReleaseShared是没有太大意义的。tryReleaseShared方法主要就是将state的计数经过循环CAS进行减1了,若是是state是0直接返回false,意味着多调用了一次,不正确。

  上面的说明基本能搞明白CountDownLatch的一个基本实现,可是主线程是如何实现阻塞的呢?继续看代码:await调用了AQS的acquireSharedInterruptibly的方法,反过来回调了tryAcquireShared方法,判断是是否小于0,小于0才会执行后续操做。因此在tryAcquireShared的方法中,若是state为0,意味着全部子线程执行完了,那还等啥,返回一个1,不须要执行等待操做了。反之返回一个-1,执行AQS的doAcquireSharedInterruptibly方法吧。这个方法会建立一个SHARE模式的等待节点,并不断循环查看tryAcquireShared,这个方法前面说了,判断的是当前是否全部子线程执行完了,因此当返回大于0的值时,就调用share模式的传播方法,实际上主线程根本不会阻塞,只是在不断的循环而已。直到调用setHeadAndPropagate方法,执行完毕后就返回了。

3.3 ReentrantReadWriteLock

  这个锁也是以前提到的,是一个很神奇的类,读写锁并存(即共享模式和排他模式)。更神奇的是读锁后接了写锁,锁即升级了,再也不容许读操做,排队等写完成,全部的读完成后就切换成写锁了。写操做完成后,读操做又能够进行了。这是怎么实现的呢?

  实际上读写锁的内部实现了3个内容,读写锁,和Sync锁。也分为公平和非公平。读写锁对象内部持有的锁就是父类的Sync锁,这样他们就创建起了联系。

  首先看读锁,先调用的就是sync的acquireShared(1)方法,这个毫无疑问。以后就会回调tryAcquireShared,尝试获取锁,具体步骤以下:

  经过exclusiveCount方法咱们能够明白这里面的门道了。state是一个int类型,读写锁将state设计成前16位存储写锁对象,后16位设计成存放读锁对象。写锁不为0且同步器当前全部线程不是这个线程,返回-1,执行后续的doAcquireShared(1)方法,进入等待。这就是上面说的,写锁存在的时候,读锁须要等待写锁完成。若是能够加读锁,分红首次等不一样状况进行处理。

  读锁的unlock方法同样调用了releaseShared(1)方法,最终调用了就是tryReleaseShared的方法。

  过程大体上就是读锁的一个逆过程。至于写锁的特性继续看,锁住的逻辑基本一致,主要关系tryAcquire的方法:

  获取状态和写锁的数量,若是存在锁,写锁为0,但当前的线程不是持有锁的线程,不能获取锁,即有读锁不能加写锁,有写锁不是本线程也不行。达到锁上限也不行,都经过了就能够获取写锁了。失败了天然去了阻塞队列,等待写锁完成,或其余写锁完成释放锁了。没有设置过锁就进行设置。

  写锁解锁的方法很简单,就是一个普通的锁去除而已。

3.4 Semaphore

  信号量也是一个有特色的设计类,简单的理解为其控制最大并发数。它维持着一个凭证数量,每一个线程须要获取一个凭证,执行完归还。凭证借完了以后就没有了,新的线程只能等待其余线程归还凭证了。下面一个例子能够运行看看。

    public static void main(String[] args) throws InterruptedException {
        final Semaphore semaphore = new Semaphore(2);
        for(int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        Thread.sleep(1000);
                        System.out.println(System.currentTimeMillis());
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(4000);
    }

  上面代码就能看到信号量的一个使用方法了,先获取一个凭证,执行后续代码,执行完成以后释放凭证。固然上述例子很不严谨,抛出异常后释放动做就没法执行了。

  看到这里根据以前所说的,就能猜想一下信号量的实现过程了。首先因为容许多个线程同时容许,因此必定是共享模式。另外构造方法中的参数必定是同步器的state。acquire方法就应该是反过来的-1操做了,减到零就阻塞。release操做就是反过来的+1操做了。

  简单看一下果真是这么作的,另外要注意的就是release操做若是乱用是会超过最初设置的容许大小的,好比acquire(1),可是release(2),Semaphore是不会关心这个的。能够将上面的例子改为这个试试。

  以前我看到一道题,两个线程如何交替打印1,2,3...。第一反应就是使用notify和wait来实现了,可是说Semaphore能够实现。看到这里你会怎么使用semaphore来实现呢?可能想使用凭证为1的信号量,公平锁就能够了啊。其实是不行的,缘由以前说过,这里的公平含义和你所想的是有出入的,不必定是交替执行的。那么不能实现了吗?也不是,这里有个颇有意思的操做,直接看代码便可:

    public static void test() {
        final Semaphore[] semaphores = new Semaphore[]{new Semaphore(1), new Semaphore(0)};
        for(int i = 0; i < 2; i++) {
            final int n = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int index = n;
                    Semaphore aim = semaphores[index];
                    Semaphore other = semaphores[1-index];
                    for(int i = 0; i < 10; i++) {
                        try {
                            aim.acquire();
                            System.out.println(Thread.currentThread().getName()+":"+SemaphoreTest.getAndInc());
                            other.release();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "Thread-" + i).start();
        }
    }

  其中有个方法就是自增操做,不列出来了。

3.5 CyclicBarrier

  这个类虽然和AQS没有直接的关系,可是其实现使用到的是ReentrantLock,也不是彻底无关,另外就是此类的功能也有特色。这个是一个栏闩,能够看做是屏障或者是起跑线,做用就是只有知足最初设置数量的线程在等待,才会开始运行,就像有个栏杆把全部线程拦住了,必须等人到齐,才能运行。这个类的例子有不少,好比:这里。本文就不给出了。

  主要方法就一个dowait方法,parties表示总参与者数量,count表示未就位的参与者数量。generation表示一个周期,就像连接中的例子给出来的,能够重复使用,进行二段赛跑。breakBarrier和nextGeneration都是结束本轮,进行下一轮的相关方法。

  最后仍是关注一下dowait方法的实现,首先就是加锁,一个个来。1.查看本轮是否结束,结束抛出异常。2.查看线程是否中断,中断结束本轮。3.count未参与者数量减1,若是减到零说明全部的人准备齐了,进入下一轮nextGeneration,返回。4.使用condition的await方法。这样在有唤醒操做的时候就能够全部线程继续运行下去了,这里唤醒动做就在breakBarrier和nextGenenration里面了,步骤2中减到零,全部的对象都到齐,就进入下一轮顺带唤醒本轮的全部线程。这样就达到了这个类的目的了。

4.总结

  AQS的使用很简单,只须要实现2中所说的那几个方法就能够了。同步器以state做为判断依据,怎么定义state就看相关人员怎么设计了,同步器采用了模板方法设计模式,实现了排他锁和共享锁的相关逻辑,只须要实现方法,判断是否须要阻塞添加到等待队列便可,全部其余过程均已被封装。

  对于封装的方法不理解,能够参考这篇文章,可能更详细一点:这里

  本文主要仍是讲了一下JDK中几个用到了AQS的类,他们的特色以及实现过程,旨在学习如何使用AQS定义出本身想要的同步工具。

相关文章
相关标签/搜索