Semaphore


JUC 高并发工具类(3文章)与高并发容器类(N文章) :

1 Semaphore是什么?

Semaphore是计数信号量。Semaphore管理一系列许可。每一个acquire方法阻塞,直到有一个许可证能够得到而后拿走一个许可证;每一个release方法增长一个许可,这可能会释放一个阻塞的acquire方法。然而,其实并无实际的许可这个对象,Semaphore只是维持了一个可得到许可证的数量。面试

好比:停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就没法进入停车场了,直到有一辆车从停车场出去为止。数据库

好比:在学生时代都去餐厅打过饭,假若有3个窗口能够打饭,同一时刻也只能有3名同窗打饭。第四我的来了以后就必须在外面等着,只要有打饭的同窗好了,就能够去相应的窗口了 。编程

1604135894267

2 怎么使用 Semaphore

2.1 构造方法

//建立具备给定的许可数和非公平的公平设置的 Semaphore。  
Semaphore(int permits)   

//建立具备给定的许可数和给定的公平设置的 Semaphore。  
Semaphore(int permits, boolean fair)

2.2 重要方法

在上面咱们使用最基本的acquire方法和release方法就能够实现Semaphore最多见的功能,不过其余方法仍是须要咱们去了解一下的。并发

一、acquire(int permits)

今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。就比如是一个学生占两个窗口。这同时也对应了相应的release方法。

二、release(int permits)

释放给定数目的许可,将其返回到信号量。这个是对应于上面的方法,一个学生占几个窗口完事以后还要释放多少

三、availablePermits()

返回此信号量中当前可用的许可数。也就是返回当前还有多少个窗口可用。

四、reducePermits(int reduction)

根据指定的缩减量减少可用许可的数目。

五、hasQueuedThreads()

查询是否有线程正在等待获取资源。

六、getQueueLength()

返回正在等待获取的线程的估计数目。该值仅是估计的数字。

七、tryAcquire(int permits, long timeout, TimeUnit unit)

若是在给定的等待时间内此信号量有可用的全部许可,而且当前线程未被中断,则今后信号量获取给定数目的许可。

八、acquireUninterruptibly(int permits)

今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。

3 使用案例

这个案例使用的就是咱们以前的小例子,也就是去餐厅打饭的案例。高并发

咱们先看Test类:工具

img

在这个代码中咱们看到,主要是new了一个Semaphore,而后赋给每一位同窗Student,接下来咱们就来好好看看Student线程是如何实现的。oop

img

在这个Student类中咱们最主要看run方法的实现,首先咱们经过acquire获取了当前窗口的许可,而后休眠3秒表明打饭,最后在finally使用release方法释放这个窗口许可证。代码很简单,原理很清楚,咱们测试一波:

img

这个结果你也看到了,基本上同一时刻只能有三个学生在窗口旁边。

在这里你可能有一个疑问了,Semaphore好像和synchronized关键字没什么区别,均可以实现同步,若是是这样那说明咱们尚未真正理解jdk的注释,他只是限制了访问某些资源的线程数,其实并无实现同步,咱们能够看一下:

img

如今咱们在获取许可前增长了一条输出语句,也就是能打印出有哪一个线程进入了,再去测试一波

img

结果很清晰,因此对于Semaphore来讲,咱们须要记住的实际上是资源的互斥而不是资源的同步,在同一时刻是没法保证同步的,可是却能够保证资源的互斥。

4 Semaphore使用场景

用于那些资源有明确访问数量限制的场景,经常使用于限流 。

  • 好比:数据库链接池,同时进行链接的线程有数量限制,链接不能超过必定的数量,当链接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库链接才能得到数据库链接。

  • 好比:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了以后只有等里面的车离开停车场外面的车才能够进入。

5 Semaphore原理

(1)、Semaphore初始化。

Semaphore semaphore=new Semaphore(2);

一、当调用new Semaphore(2) 方法时,默认会建立一个非公平的锁的同步阻塞队列。

二、把初始许可数量赋值给同步队列的state状态,state的值就表明当前所剩余的许可数量。

初始化完成后同步队列信息以下图:

img

(2)获取许可

semaphore.acquire();

一、当前线程会尝试去同步队列获取一个许可,获取许可的过程也就是使用原子的操做去修改同步队列的state ,获取一个许可则修改成state=state-1。

二、 当计算出来的state<0,则表明许可数量不足,此时会建立一个Node节点加入阻塞队列,挂起当前线程。

三、当计算出来的state>=0,则表明获取许可成功。

源码:

/**
     *  获取1个许可
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
/**
     * 共享模式下获取许可,获取成功则返回,失败则加入阻塞队列,挂起线程
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取许可,arg为获取许可个数,当可用许可数减当前许可数结果小于0,则建立一个节点加入阻塞队列,挂起当前线程。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
/**
     * 一、建立节点,加入阻塞队列,
     * 二、重双向链表的head,tail节点关系,清空无效节点
     * 三、挂起当前节点线程
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //建立节点加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //得到当前节点pre节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回锁的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重组双向链表,清空无效节点,挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

线程一、线程二、线程三、分别调用semaphore.acquire(),整个过程队列信息变化以下图:

img

(3)、释放许可

semaphore.release();

当调用semaphore.release() 方法时

一、线程会尝试释放一个许可,释放许可的过程也就是把同步队列的state修改成state=state+1的过程

二、释放许可成功以后,同时会唤醒同步队列的全部阻塞节共享节点线程

三、被唤醒的节点会从新尝试去修改state=state-1 的操做,若是state>=0则获取许可成功,不然从新进入阻塞队列,挂起线程。

源码:

/**
     * 释放许可
     */
    public void release() {
        sync.releaseShared(1);
    }
/**
     *释放共享锁,同时唤醒全部阻塞队列共享节点线程
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        //释放共享锁
        if (tryReleaseShared(arg)) {
            //唤醒全部共享节点线程
            doReleaseShared();
            return true;
        }
        return false;
    }
/**
     * 唤醒全部共享节点线程
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//是否须要唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始0
                        continue;
                    unparkSuccessor(h);//唤醒h.nex节点线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

继上面的图,当咱们线程1调用semaphore.release(); 时候整个流程以下图:

img


回到◀疯狂创客圈

疯狂创客圈 - Java高并发研习社群,为你们开启大厂之门

相关文章
相关标签/搜索