J.U.C 之Semaphore

简介

信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 同样,其本质上是一个“共享锁”。java

一个计数信号量。从概念上讲,信号量维护了一个许可集。node

若有必要,在许可可用前会阻塞每个 acquire,而后再获取该许可。
每一个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。
复制代码

可是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采起相应的行动。安全

Semaphore 一般用于限制能够访问某些资源(物理或逻辑的)的线程数目。并发

下面咱们就一个停车场的简单例子来阐述 Semaphore :dom

  1. 为了简单起见咱们假设停车场仅有 5 个停车位。一开始停车场没有车辆全部车位所有空着,而后前后到来三辆车,停车场车位够,安排进去停车,而后又来三辆,这个时候因为只有两个停车位,全部只能停两辆,其他一辆必须在外面候着,直到停车场有空车位。固然,之后每来一辆都须要在外面候着。当停车场有车开出去,里面有空位了,则安排一辆车进去(至因而哪辆,要看选择的机制是公平仍是非公平)。ide

  2. 从程序角度看,停车场就至关于信号量 Semaphore ,其中许可数为 5 ,车辆就相对线程。当来一辆车时,许可数就会减 1 。当停车场没有车位了(许可数 == 0 ),其余来的车辆须要在外面等候着。若是有一辆车开出停车场,许可数 + 1,而后放进来一辆车。函数

  3. 信号量 Semaphore 是一个非负整数( >=1 )。当一个线程想要访问某个共享资源时,它必需要先获取 Semaphore。当 Semaphore > 0 时,获取该资源并使 Semaphore – 1 。若是S emaphore 值 = 0,则表示所有的共享资源已经被其余线程所有占用,线程必需要等待其余线程释放资源。当线程释放资源时,Semaphore 则 +1 。ui

实现分析

java.util.concurrent.Semaphore 结构以下图:this

从上图能够看出,Semaphore 内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类 Sync ,其中 Sync 继承 AQS(再一次阐述 AQS 的重要性)。spa

Semaphore 提供了两个构造函数:

Semaphore(int permits) :建立具备给定的许可数和非公平的公平设置的 Semaphore 。
Semaphore(int permits, boolean fair) :建立具备给定的许可数和给定的公平设置的 Semaphore 。
复制代码

实现以下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码

Semaphore 默认选择非公平锁。

当信号量 Semaphore = 1 时,它能够看成互斥锁使用。其中 0、1 就至关于它的状态:

  1. 当 =1 时表示,其余线程能够获取;
  2. 当 =0 时,排他,即其余线程必需要等待。

Semaphore 的代码实现结构,和 ReentrantLock 相似。

信号量获取

Semaphore 提供了 #acquire() 方法,来获取一个许可。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码

内部调用 AQS 的 #acquireSharedInterruptibly(int arg) 方法,该方法以共享模式获取同步状态。代码以下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码

在 #acquireSharedInterruptibly(int arg) 方法中,会调用 #tryAcquireShared(int arg) 方法。而 #tryAcquireShared(int arg) 方法,由子类来实现。对于 Semaphore 而言,若是咱们选择非公平模式,则调用 NonfairSync 的#tryAcquireShared(int arg) 方法,不然调用 FairSync 的 #tryAcquireShared(int arg) 方法。若 #tryAcquireShared(int arg) 方法返回 < 0 时,则会阻塞等待,从而实现 Semaphore 信号量不足时的阻塞,代码以下:

// AQS.java
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            /** * 对于 Semaphore 而言,若是 tryAcquireShared 返回小于 0 时,则会阻塞等待。 */
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

另外,这也是为何 Semaphore 在使用 AQS 时,state 表明的是,剩余可获取的许可数,而不是已经使用的许可数。咱们假设 state 表明的是已经使用的许可数,那么 #tryAcquireShared(int arg) 返回的结果 = 原始许可数 - state ,这个操做在并发状况下,会存在线程不安全的问题。因此,state 表明的是,剩余可获取的许可数,而不是已经使用的许可数。

公平状况的 FairSync 的方法实现,代码以下:

// FairSync.java
@Override
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //判断该线程是否位于CLH队列的列头,从而实现公平锁
        if (hasQueuedPredecessors())
            return -1;
        //获取当前的信号量许可
        int available = getState();

        //设置“得到acquires个信号量许可以后,剩余的信号量许可数”
        int remaining = available - acquires;

        //CAS设置信号量
        if (remaining < 0 ||
                compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

经过 #hasQueuedPredecessors() 方法,判断该线程是否位于 CLH 队列的列头,从而实现公平锁。 非公平状况的 NonfairSync 的方法实现,代码以下:

// NonfairSync.java
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

// Sync.java
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

对于非公平而言,由于它不须要判断当前线程是否位于 CLH 同步队列列头,因此相对而言会简单些。

信号量释放

获取了许可,当用完以后就须要释放,Semaphore 提供 #release() 方法,来释放许可。代码以下:

public void release() {
    sync.releaseShared(1);
}
复制代码

内部调用 AQS 的 #releaseShared(int arg) 方法,释放同步状态。

// AQS.java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码

releaseShared(int arg) 方法,会调用 Semaphore 内部类 Sync 的 #tryReleaseShared(int arg) 方法,释放同步状态。

// Sync.java
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //设置可获取的信号许可数为next
        if (compareAndSetState(current, next))
            return true;
    }
}
复制代码

如该方法返回 true 时,表明释放同步状态成功,从而在 #releaseShared(int args) 方法中,调用 #doReleaseShared() 方法,可唤醒阻塞等待 Semaphore 的许可的线程。

应用示例

咱们已停车为示例:

public class SemaphoreTest {

    static class Parking {
    
        //信号量
        private Semaphore semaphore;

        Parking(int count) {
            semaphore = new Semaphore(count);
        }

        public void park() {
            try {
                //获取信号量
                semaphore.acquire();
                long time = (long) (Math.random() * 10);
                System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒..." );
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName() + "开出停车场...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }


    static class Car extends Thread {
        Parking parking ;

        Car(Parking parking){
            this.parking = parking;
        }

        @Override
        public void run() {
            parking.park();     //进入停车场
        }
    }

    public static void main(String[] args){
        Parking parking = new Parking(3);

        for(int i = 0 ; i < 5 ; i++){
            new Car(parking).start();
        }
    }
}
复制代码

运行结果以下:

相关文章
相关标签/搜索