Semaphore源码解析

一 概述
   二 源码总览
   三 acquire-请求令牌
   四 release-释放令牌
   五 总结
复制代码

一 概述

semaphore是信号的意思,在并发包中则表示持有指定数量令牌的信号量。它一般用于多线程同时请求令牌的控制。提供了acquire方法用于获取令牌,当令牌发放完后则进行阻塞等待,持有令牌的线程完成任务后须要调用release方法归还令牌。semaphore的使用很简单,如今经过学习它的源码来了解它的实现原理是怎样的。java

二 源码总览

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    // 全部机制都经过AbstractQueuedSynchronizer子类Sync来完成的
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            // 采用AQS中的state来统计令牌数
            setState(permits);
        }
        ……
    }
    // 非公平锁
    static final class NonfairSync extends Sync {……}
    // 公平锁
    static final class FairSync extends Sync{……}
    // 默认构造方法-默认非公平锁
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 提供选择公平锁仍是非公平锁的构造方法
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    // 请求一个令牌-响应中断+阻塞
    public void acquire() throws InterruptedException {……}
    // 指定获取领牌数-响应中断且阻塞
    public void acquire(int permits) throws InterruptedException {……}
    // 请求一个令牌-不响应中断+阻塞
    public void acquireUninterruptibly() {……}
    // 尝试请求一个令牌-非阻塞,失败当即返回
    public boolean tryAcquire(){……}
    // 尝试请求一个令牌,阻塞指定时间,超时后返回
    public boolean tryAcquire(long timeout, TimeUnit unit){……}
    // 释放令牌
    public void release(){……}
}    
复制代码

整体来讲,Semaphore内部有一个继承于AQS的内部类Sync,利用AQS的共享锁来实现对共享变量state进行操做,并将state做为令牌的计数, 并提供了公平和非公平锁的方式来获取令牌,总体的设计跟ReentrantLock很像。下面以最经常使用的acquire和release方法为例,详细了解他们的原理;node

三 acquire-请求令牌

先看acquire的方法体:微信

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码

里面实际调用的是AQS的请求共享锁方法:多线程

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先进行一遍尝试获取锁,当返回小于0说明令牌不足了,则须要将当前线程加入到等待队列中 
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码

接着先回调在Semaphore中重写的tryAcquireShared()方法尝试获取锁:并发

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    // 循环获取令牌
    for (;;) {
        // 获取当前可用的令牌数
        int available = getState();
        // 当前获取完后剩下的令牌数
        int remaining = available - acquires;
        // 剩下领牌数小于0或者大于等于0并CAS更新成功则直接返回剩余令牌数
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

简单尝试获取令牌失败后则再CAS尝试几回后加入同步队列中休眠等待:oop

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 在同步队列中增长等待节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 若是前驱节点为head节点,表示当前节点是同步等待队列中的第一个,故继续尝试一次获取锁
            if (p == head) {
                // 尝试获取令牌,此时会跳转到semaphore中(由于重写了该方法)
                int r = tryAcquireShared(arg);
                // 返回大于0则表示成功获取到令牌了
                if (r >= 0) {
                    // 将当前节点设为head节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 自旋几回后为避免强占CPU,则对该线程进行休眠处理
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 因中断请求则取消排队请求
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

简单总结下acquire方法流程为:学习

  1. 先循环尝试取获取令牌,若是还有令牌则直接取到并cas更新剩余令牌数;
  2. 若上一步尝试没取到令牌则将当前线程加入到AQS同步队列中,并检查当前是否为第一个等待节点,是则再尝试请求;
  3. 若上一步屡次尝试无果则阻塞等待,等待有线程释放了令牌后再唤醒等待队列中的线程从新竞争获取令牌;

四 release-释放令牌

release方法体:ui

public void release() {
    sync.releaseShared(1);
}
复制代码

实际调用的是AQS的方法:this

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码

先调用在Semaphore中重写的尝试释放令牌方法,而且释放成功后返回true:spa

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前令牌数量
        int current = getState();
        // 计算释放后的令牌数量
        int next = current + releases;
        // 若是本次释放的令牌为负数则抛出异常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS更新令牌数成功后返回true
        if (compareAndSetState(current, next))
            return true;
    }
}
复制代码

释放令牌成功后,唤醒在同步队列中等待的线程:

private void doReleaseShared() {
    /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
复制代码

简单总结下release流程:

  1. 循环尝试释放令牌,并cas更新剩余令牌数;
  2. 令牌释放成功后唤醒同步等待队列中的线程;

五 总结

Semaphore利用AQS中的共享锁来操做共享变量state,并使用state做为令牌的计数。每一个线程调用required请求获取令牌,调用release则释放领牌。当令牌取完时则剩下的线程加入AQS队列中阻塞等待,当有令牌释放时会唤醒等待的线程。


更多原创文章请关注微信公众号
👇👇👇
唠吧嗑吧

相关文章
相关标签/搜索