JUC 中提供的限流利器-Semaphore(信号量)

在 JUC 包下,有一个 Semaphore 类,翻译成信号量,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。Semaphore 跟锁(synchronized、Lock)有点类似,不一样的地方是,锁同一时刻只容许一个线程访问某一资源,而 Semaphore 则能够控制同一时刻多个线程访问某一资源。java

Semaphore(信号量)并非 Java 语言特有的,几乎全部的并发语言都有。因此也就存在一个信号量模型的概念,以下图所示:数据库

信号量模型

信号量模型比较简单,能够归纳为:一个计数器、一个队列、三个方法并发

计数器:记录当前还能够运行多少个资源访问资源。框架

队列:待访问资源的线程ide

三个方法学习

  • init():初始化计数器的值,可就是容许多少线程同时访问资源。
  • up():计数器加1,有线程归还资源时,若是计数器的值大于或者等于 0 时,从等待队列中唤醒一个线程
  • down():计数器减 1,有线程占用资源时,若是此时计数器的值小于 0 ,线程将被阻塞。

这三个方法都是原子性的,由实现方保证原子性。例如在 Java 语言中,JUC 包下的 Semaphore 实现了信号量模型,因此 Semaphore 保证了这三个方法的原子性。ui

Semaphore 是基于 AbstractQueuedSynchronizer 接口实现信号量模型的。AbstractQueuedSynchronizer 提供了一个基于 FIFO 队列,能够用于构建锁或者其余相关同步装置的基础框架,利用了一个 int 来表示状态,经过相似 acquire 和 release 的方式来操纵状态。关于 AbstractQueuedSynchronizer 更多的介绍,能够点击连接:spa

ifeve.com/introduce-a…线程

AbstractQueuedSynchronizer 在 Semaphore 类中的实现类以下:翻译

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
复制代码

在 Semaphore 类中,实现了两种信号量:公平的信号量和非公平的信号量,公平的信号量就是你们排好队,先到先进,非公平的信号量就是不必定先到先进,容许插队。非公平的信号量效率会高一些,因此默认使用的是非公平信号量。具体的能够查看 Semaphore 类实现源码。

Semaphore 类中,主要有如下方法:

// 构造方法,参数表示许可证数量,用来建立信号量
public Semaphore(int permits);
// 从信号量中获取许可,至关于获取到执行权
public void acquire() throws InterruptedException;
// 尝试获取1个许可,无论是否可以获取成功,都当即返回,true表示获取成功,false表示获取失败
public boolean tryAcquire();
// 将许可还给信号量
public void release();
复制代码

Semaphore 类的实现就了解的差很少了。可能你会有疑问 Semaphore 的应用场景是什么?Semaphore 能够用来限流(流量控制),在一些公共资源有限的场景下,Semaphore 能够派上用场。好比在作日志清洗时,可能有几十个线程在并发清洗,可是将清洗的数据存入到数据库时,可能只给数据库分配了 10 个链接池,这样两边的线程数就不对等了,咱们必须保证同时只能有 10 个线程获取数据库连接,不然就会存在大量线程没法连接上数据库。

用 Semaphore 信号量来模拟这操做,代码以下:

public class SemaphoreDemo {
    /** * semaphore 信号量,能够限流 * * 模拟并发数据库操做,同时有三十个请求,可是系统每秒只能处理 5 个 */

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);
	// 初始化信号量,个数为 5 
    private static Semaphore s = new Semaphore(5);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 获取许可
                        s.acquire();
                        System.out.println(Thread.currentThread().getName()+" 完成数据库操做 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date()));
                        // 休眠两秒钟,效果更直观
                        Thread.sleep(2000);
                        // 释放许可
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
		// 关闭链接池
        threadPool.shutdown();
    }
}
复制代码

运行效果以下:

图片描述

从结果中,能够看出,每秒只有 5 个线程在执行,这符合咱们的预期。

好了,关于 Semaphore 的内容就结束了,更加详细的还请您查阅相关资料和阅读 Semaphore 源码。但愿这篇文章对您的学习或者工做有所帮助。

感谢您的阅读,祝好。

最后

目前互联网上不少大佬都有 Semaphore(信号量) 相关文章,若有雷同,请多多包涵了。原创不易,码字不易,还但愿你们多多支持。若文中有所错误之处,还望提出,谢谢。

互联网平头哥
相关文章
相关标签/搜索