Java并发编程笔记之Semaphore信号量源码分析

JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不一样在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?java

  Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不一样之处在于它内部的计数器是递增的。为了可以一览Semaphore的内部结构,咱们首先要看一下Semaphore的类图,类图,以下所示:算法

 

 如上类图能够知道Semaphoren内部仍是使用AQS来实现的,Sync只是对AQS的一个修饰,而且Sync有两个实现类,分别表明获取信号量的时候是否采起公平策略。建立Semaphore的时候会有一个变量标示是否使用公平策略,源码以下:函数

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new       
        NonfairSync(permits);
    }

   Sync(int permits) {
       setState(permits);
   }

如上面代码所示,Semaphore默认使用的是非公平策略,若是你须要公平策略,则可使用带两个参数的构造函数来构造Semaphore对象,另外和CountDownLatch同样,构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量,也就是说这里AQS的state值表示当前持有的信号量个数。ui

 

接下来咱们主要看看Semaphore实现的主要方法的源码,以下:spa

  1.void acquire() 当前线程调用该方法的时候,目的是但愿获取一个信号量资源,若是当前信号量计数个数大于 0 ,而且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减小 1 。不然会被放入AQS的阻塞队列,当前线程被挂起,直到其余线程调用了release方法释放了信号量,而且当前线程经过竞争获取到了改信号量。当前线程被其余线程调用了 interrupte()方法中断后,当前线程会抛出 InterruptedException异常返回。源码以下:线程

   public void acquire() throws InterruptedException {
        //传递参数为1,说明要获取1个信号量资源
        sync.acquireSharedInterruptibly(1);
   }
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {

        //(1)若是线程被中断,则抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();

        //(2)否者调用sync子类方法尝试获取,这里根据构造函数肯定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //若是获取失败则放入阻塞队列,而后再次尝试若是失败则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
    }

如上代码可知,acquire()内部调用了sync的acquireSharedInterruptibly  方法,后者是对中断响应的(若是当前线程被中断,则抛出中断异常),尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现,因此这里就要分公平性了,这里先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法,源码以下:code

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
} final
int nonfairTryAcquireShared(int acquires) { for (;;) { //获取当前信号量值 int available = getState(); //计算当前剩余值 int remaining = available - acquires; //若是当前剩余小于0或者CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

如上代码,先计算当前信号量值(available)减去须要获取的值(acquires) 获得剩余的信号量个数(remaining),若是剩余值小于 0 说明当前信号量个数知足不了需求,则直接返回负数,而后当前线程会被放入AQS的阻塞队列,当前线程被挂起。若是剩余值大于 0 则使用CAS操做设置当前信号量值为剩余值,而后返回剩余值。另外能够知道NonFairSync是非公平性获取的,是说先调用aquire方法获取信号量的线程不必定比后来者先获取锁。对象

 

接下来咱们要看看公平性的FairSync 类是如何保证公平性的,源码以下:blog

 protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
 }

能够知道公平性仍是靠 hasQueuedPredecessors 这个方法来作的,之前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源,若是是则本身放弃获取的权力,而后当前线程会被放入AQS阻塞队列,不然就去获取。hasQueuedPredecessors源码以下:队列

public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

如上面代码所示,若是当前线程节点有前驱节点则返回true,不然若是当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false ,其中,若是 h == t 则说明当前队列为空则直接返回 false,若是 h !=t 而且 s == null 说明有一个元素将要做为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操做,首先建立一个哨兵头节点,而后第一个元素插入到哨兵节点后面),那么返回 true,若是  h !=t 而且 s != null 而且  s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。

 

  2.void acquire(int permits) 该方法与 acquire() 不一样在与后者只须要获取一个信号量值,而前者则获取指定 permits 个,源码以下:

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) 
       throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }

 

  3.void acquireUninterruptibly() 该方法与 acquire() 相似,不一样之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源过程当中(包含被阻塞后)其它线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码以下:

public void acquireUninterruptibly() {
     sync.acquireShared(1);
}

 

  4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不一样在于该方法对中断不响应。源码如以下:

 public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
 }

 

  5.void release() 该方法做用是把当前 semaphore对象的信号量值增长 1 ,若是当前有线程由于调用 acquire 方法被阻塞放入了 AQS的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增长的信号量,源码以下:

  public void release() {
        //(1)arg=1
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {

        //(2)尝试释放资源
        if (tryReleaseShared(arg)) {

            //(3)资源释放成功则调用park唤醒AQS队列里面最早挂起的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {

            //(4)获取当前信号量值
            int current = getState();

            //(5)当前信号量值增长releases,这里为增长1
            int next = current + releases;
            if (next < current) // 移除处理
                throw new Error("Maximum permit count exceeded");

            //(6)使用cas保证更新信号量值的原子性
            if (compareAndSetState(current, next))
                return true;
        }
    }

如上面代码能够看到 release()方法中对 sync.releaseShared(1),能够知道release方法每次只会对信号量值增长 1 ,tryReleaseShared方法是无限循环,使用CAS保证了 release 方法对信号量递增 1 的原子性操做。当tryReleaseShared 方法增长信号量成功后会执行代码(3),调用AQS的方法来激活由于调用acquire方法而被阻塞的线程。

 

  6.void release(int permits) 该方法与不带参数的不一样之处在于前者每次调用会在信号量值原来基础上增长 permits,然后者每次增长 1。源码以下:

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}

另外注意到这里调用的是 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程能够同时使用CAS去更新信号量的值而不会阻塞。

 

到目前已经知道了其原理,接下来用一个例子来加深对Semaphore的理解,例子以下:

package com.hjc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by cong on 2018/7/8.
 */
public class SemaphoreTest {

    // 建立一个Semaphore实例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待子线程执行完毕,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        //关闭线程池
        executorService.shutdown();
    }
}

运行结果以下:

相似于 CountDownLatch,上面咱们的例子也是在主线程中开启两个子线程进行执行,等全部子线程执行完毕后主线程在继续向下运行。

如上代码首先首先建立了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器为 0,而后 main 函数添加两个线程任务到线程池,每一个线程内部调用了信号量的 release 方法,至关于计数值递增一,最后在 main 线程里面调用信号量的 acquire 方法,参数传递为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 时才会返回。

看到这里也就明白了,若是构造 Semaphore 时候传递的参数为 N,在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 M+N;

 

对CountDownLatch,CyclicBarrier,Semaphored这三者之间的比较总结:

  1.CountDownLatch 经过计数器提供了更灵活的控制,只要检测到计数器为 0,而无论当前线程是否结束调用 await 的线程就能够往下执行,相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。

  2.CyclicBarrier 也能够达到 CountDownLatch 的效果,可是后者当计数器变为 0 后,就不能在被复用,而前者则使用 reset 方法能够重置后复用,前者对同一个算法可是输入参数不一样的相似场景下比较适用。

  3.而 semaphore 采用了信号量递增的策略,一开始并不须要关心须要同步的线程个数,等调用 aquire 时候在指定须要同步个数,而且提供了获取信号量的公平性策略。

相关文章
相关标签/搜索