线程间的同步与通讯(8)——Semaphore源码分析

前言

系列文章目录 java

Semaphore(信号量)也是经常使用的并发工具之一,它经常用于流量控制。一般状况下,公共的资源经常是有限的,例如数据库的链接数。使用Semaphore能够帮助咱们有效的管理这些有限资源的使用。数据库

Semaphore的结构和ReentrantLock以及CountDownLatch很像,内部采用了公平锁与非公平锁两种实现,若是你已经看过了ReentrantLock源码分析CountDownLatch源码分析,弄懂它将绝不费力。segmentfault

核心属性

CountDownLatch相似,Semaphore主要是经过AQS的共享锁机制实现的,所以它的核心属性只有一个sync,它继承自AQS:数组

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        //省略
    }

    protected final boolean tryReleaseShared(int releases) {
        //
    }

    final void reducePermits(int reductions) {
        //省略
    }

    final int drainPermits() {
        //省略
    }
}

这里的permits和CountDownLatch的count很像,它们最终都将成为AQS中的state属性的初始值。并发

构造函数

Semaphore有两个构造函数:框架

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

默认的构造函数使用的是非公平锁,另外一个构造函数经过传入的fair参数来决定使用公平锁仍是非公平锁,这一点和ReentrantLock用的是一样的套路,都是一样的代码框架。函数

公平锁和非公平锁的定义以下:工具

static final class FairSync extends Sync {
    
   FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

static final class NonfairSync extends Sync {
    
   NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

获取信号量

获取信号量的方法有4个:源码分析

acquire方法 本质调用
acquire() sync.acquireSharedInterruptibly(1)
acquire(int permits) sync.acquireSharedInterruptibly(permits)
acquireUninterruptibly() sync.acquireShared(1)
acquireUninterruptibly(int permits) sync.acquireShared(permits);

可见,acquire()方法就至关于acquire(1)acquireUninterruptibly同理,只不过一种响应中断,一种不响应中断,关于AQS的那四个方法咱们在前面的文章中都已经分析过了,除了其中的tryAcquireShared(arg)由子类实现外,其余的都由AQS实现。ui

值得注意的是,在逐行分析AQS源码(3)——共享锁的获取与释放中咱们特别提到过tryAcquireShared返回值的含义:

  • 若是该值小于0,则表明当前线程获取共享锁失败
  • 若是该值大于0,则表明当前线程获取共享锁成功,而且接下来其余线程尝试获取共享锁的行为极可能成功
  • 若是该值等于0,则表明当前线程获取共享锁成功,可是接下来其余线程尝试获取共享锁的行为会失败

这里的返回值其实表明的是剩余的信号量的值,若是为负值则说明信号量不够了。

接下来咱们就看看子类对于tryAcquireShared(arg)方法的实现:

非公平锁实现

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || 
            compareAndSetState(available, remaining))
            return remaining;
    }
}

与通常的tryAcquire逻辑不一样,Semaphore的tryAcquire逻辑是一个自旋操做,由于Semaphore是共享锁,同一时刻可能有多个线程来修改这个值,因此咱们必须使用自旋 + CAS来避免线程冲突。

该方法退出的惟一条件是成功的修改了state值,并返回state的剩余值。若是剩下的信号量不够了,则就不须要进行CAS操做,直接返回剩余值。因此其实tryAcquireShared返回的不是当前剩余的信号量的值,而是若是扣去acquires以后,当前将要剩余的信号量的值,若是这个“将要”剩余的值比0小,则是不会发生扣除操做的。这就比如我要买10个包子,包子铺如今只剩3个了,则将会返回剩余3 - 10 = -7个包子,可是事实上包子店并无将包子卖出去,实际剩余的包子仍是3个;此时若是有另外一我的来只要买1个包子,则将会返回剩余3 - 1 = 2个包子,而且包子店会将一个包子卖出,实际剩余的包子数也是2个。

非公平锁的这种获取信号量的逻辑其实和CountDownLatch的countDown方法很像:

// CountDownLatch
public void countDown() {
    sync.releaseShared(1);
}

countDown()releaseShared(1)方法中将调用tryReleaseShared

// CountDownLatch
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

对比CountDownLatch的tryReleaseShared方法和Semaphore的tryAcquireShared方法可知,它们的核心逻辑都是减小state的值,只不过CountDownLatch借用了共享锁的壳,对它而言,减小state的值是一种释放共享锁的行为,由于它的目的是将state值降为0;而在Semaphore中,减小state的值是一种获取共享锁的行为,减小成功了,则获取成功。

公平锁实现

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

经过对比能够看出,它和nonfairTryAcquireShared的惟一的差异在于:

if (hasQueuedPredecessors())
    return -1;

即在获取共享锁以前,先用hasQueuedPredecessors方法判断有没有人排在本身前面。关于hasQueuedPredecessors方法,咱们在前面的文章中已经分析过了,它就是判断当前节点是否有前驱节点,有的话直接返回获取失败,由于要让前驱节点先去获取锁。(毕竟公平锁讲究先来后到嘛)

释放信号量

释放信号量的方法有2个:

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

可见,release() 至关于调用了 release(1),它们最终都调用了tryReleaseShared(int releases)方法:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

与获取信号量的逻辑相反,释放信号量的逻辑是将获得的信号量再归还回去,所以是增长state值的操做,代码自己很容易理解,这里再也不赘述。

工具方法

除了以上获取和释放信号量所用到的方法,Semaphore还定义了一些其余方法来帮助咱们操做信号量:

tryAcquire

注意,这个tryAcquire不是给acquire方法使用的!!!咱们上面分析信号量的获取时说过,获取信号量的acquire方法调用的是AQS的acquireSharedacquireSharedInterruptibly ,而这两个方法会调用子类的tryAcquireShared方法,子类必须实现这个方法。

而这里的tryAcquire方法并无定义在AQS的子类中,即既不在NonfairSync,也不在FairSync中,对于使用共享锁的AQS的子类,也不须要定义这个方法。事实上它直接定义在Semaphore中的。

因此,在看这个方法时,脑海中必定要有一个意识,虽然它和AQS的独占锁的获取逻辑中的tryAcquire重名了,但实际上它和AQS的独占锁是没有关系的,不要被它的名字绕晕了。

那么,这个tryAcquiretryAcquireShared方法有什么不一样呢?只要有两点:

  1. 返回值不一样:tryAcquire返回boolean类型,tryAcquireShared返回int
  2. tryAcquire必定是采用非公平锁模式,而tryAcquireShared有公平和非公平两种实现。

理清楚以上几点以后,咱们再来看tryAcquire方法的源码,它有四种重载形式:
两种不带超时机制的形式:

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

两种带超时机制的形式:

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

其中,不带超时机制的tryAcquire方法实际上调用的就是nonfairTryAcquireShared(int acquires)方法,它和非公平锁的tryAcquireShared同样,只是tryAcquireShared是直接return nonfairTryAcquireShared(acquires),而tryAcquirereturn sync.nonfairTryAcquireShared(1) >= 0;,即直接返回获取锁的操做是否成功。

而带超时机制的tryAcquire方法提供了一种超时等待的方式,这是前面介绍的公平锁和非公平锁的获取锁逻辑中所没有的,它本质上调用了AQS的tryAcquireSharedNanos(int arg, long nanosTimeout)方法:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

这个方法咱们在介绍CountDownLatch源码分析await(long timeout, TimeUnit unit)方法时已经分析过了,属于老套路了,这里就不展开了。

reducePermits

reducePermits方法用来减小信号量的总数,这在debug中是颇有用的,它与前面介绍的acquire方法的不一样点在于,即便当前信号量的值不足,它也不会致使调用它的线程阻塞等待。只要须要减小的信号量的数量reductions大于0,操做最终就会成功,也就是说,即便当前的reductions大于现有的信号量的值也不要紧,因此该方法可能会致使剩余信号量为负值。

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

咱们将它和nonfairTryAcquireShared对比一下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

能够看出,二者在CAS前的判断条件并不相同,reducePermits只要剩余值不比当前值大就能够,而nonfairTryAcquireShared必需要保证剩余值不小于0才会执行CAS操做。

drainPermits

相比reducePermits,drainPermits就更简单了,它直接将剩下的信号量一次性消耗光,而且返回所消耗的信号量,这个方法在debug中也是颇有用的:

public int drainPermits() {
    return sync.drainPermits();
}
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

实战

以上咱们分析了信号量的源码,接下来咱们来分析一下官方给的一个使用的例子:

class Pool {
    private static final int MAX_AVAILABLE = 100;
    // 初始化一个信号量,设置为公平锁模式,总资源数为100个
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
        // 获取一个信号量
        available.acquire();
        return getNextAvailableItem();
    }

    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    // Not a particularly efficient data structure; just for demo

    protected Object[] items = ...whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null; // not reached
    }

    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }

}

这个例子很简单,咱们用items数组表明可用的资源,用used数组来标记已经使用的资源的,used[i]的值为true,则表明items[i]这个资源已经被使用了。

(1) 获取一个可用资源
咱们调用getItem()来获取资源,在该方法中会先调用available.acquire()方法请求一个信号量,注意,这里若是当前信号量数不够时,是会阻塞等待的;当咱们成功地获取了一个信号量以后,将会调用getNextAvailableItem方法,返回一个可用的资源。

(2) 释放一个资源
咱们调用putItem(Object x)来释放资源,在该方法中会先调用markAsUnused(Object item)将须要释放的资源标记成可用状态(即将used数组中对应的位置标记成false), 若是释放成功,咱们就调用available.release()来释放一个信号量。

总结

Semaphore是一个有效的流量控制工具,它基于AQS共享锁实现。咱们经常用它来控制对有限资源的访问。每次使用资源前,先申请一个信号量,若是资源数不够,就会阻塞等待;每次释放资源后,就释放一个信号量。

(完)

系列文章目录

相关文章
相关标签/搜索