【Java并发工具类】Semaphore

前言

1965年,荷兰计算机科学家Dijkstra提出的信号量机制成为一种高效的进程同步机制。这以后的15年,信号量一直都是并发编程领域的终结者。1980年,管程被提出,成为继信号量以后的在并发编程领域的第二个选择。目前几乎全部的语言都支持信号量机制,Java也不例外。Java中提供了Semaphore并发工具类来支持信号量机制。下面咱们就来了解Java实现的信号量机制。
首先介绍信号量模型,而后介绍如何使用,最后使用信号量来实现一个限流器。html

信号量模型

信号量模型图(图来自参考[1]):java

image-20200215222026469

信号量模型总结为:一个计数器、一个等待队列和三个对外调用的方法。
计数器和等待队列时对外透明的,全部咱们只能经过三个对外方法来访问计数器和等待队列。
init():设置计数器的初始值。
down():计数器的值减一。若是此时计数器的值小于0,则当前线程插入等待队列并阻塞,不然当前线程能够继续执行。
up():计数器的值加一。若是此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。数据库

这三个方法都是原子性的,由实现信号量模型的方法保证。在Java SDK中,信号量模型是由java.util.concurrent.Semaphore实现。编程

信号量模型代码化大体相似以下:安全

class Semaphore{
    int count; // 计数器
    Queue queue; // 等待队列
    
    // 初始化操做
    Semaphore(int c){
        this.count=c;
    }
    
    void down(){
        this.count--; // 计数器值减一
        if(this.count < 0){
            // 将当前线程插入等待队列
            // 阻塞当前线程
        }
    }
    
    void up(){
        this.count++; // 计数器值加一
        if(this.count <= 0) {
            // 移除等待队列中的某个线程T
            // 唤醒线程T
        }
    }
}

在信号量模型中,down()up()这两个操做也被成为P操做(荷兰语proberen,测试)和V操做(荷荷兰语verhogen,增长)。在我学的操做系统教材中(C语言实现),P操做对应wait(),V操做对应singal()。虽然叫法不一样,可是语义都是相同的。在Java SDK并发包中,down()up()分别对应于Semaphore中的acquire()release()并发

如何使用信号量

信号量有时也被称为红绿灯,咱们想一想红绿灯时怎么控制交通的,就知道该如何使用信号量。车辆路过十字路时,须要先检查是否为绿灯,若是是则通行,不然就等待。想一想和加锁机制有点类似,都是同样的操做,先检查是否符合条件(“尝试获取”),符合(“获取到”)则线程继续运行,不然阻塞线程。app

下面使用累加器的例子来讲明如何使用信号量。编程语言

count+=1操做是个临界区,只容许一个线程执行,即要保证互斥。因而咱们在进入临界区以前,使用down()即Java中的acquire(),在退出以后使用up()即Java中的release()。ide

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1); // 构造函数参数为1,表示只容许一个线程进行临界区。可实现一个互斥锁的功能。
//用信号量保证互斥    
static void addOne() {
    s.acquire(); // 获取一个许可(可看做加锁机制中加锁)
    try {
        count+=1;
    } finally {
        s.release(); // 归还许可(可看作加锁机制中解锁)
    }
}

完整代码以下:函数

package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
    static int count;
    static final Semaphore s = new Semaphore(1);
    static void addOne() throws InterruptedException {
        //只会有一个线程将信号量中的计数器减为1,而另一个线程只能将信号量中计数器减为-1,致使被阻塞
        s.acquire();  
        try {
            count +=1;
            System.out.println("Now thread is " + Thread.currentThread() + "   and count is " + count);
        }finally {
            //进入临界区的线程在执行完临界区代码后将信号量中计数器的值加1而后,此时信号量中计数器的值为0,则从阻塞队列中唤醒被阻塞的进程
            s.release();   
        }
    }

    public static void main(String[] args) {
        // 建立两个线程运行
        MyThread thread1 = new MyThread();
        MyThread thread2 = new MyThread();

        thread1.start();
        thread2.start();
        System.out.println("main thread");

    }
}
class MyThread extends Thread{
    @Override
    public void run() {
        super.run();
        for(int i=0; i<10; i++) {                   
            try {
                SemaphoreTest.addOne();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

若是Semaphore的构造函数参数(许可数量,内置计数器的值)修改一下:

static final Semaphore s = new Semaphore(2);

计数器值的为2,那么就容许有两个线程进入临界区,咱们的count值就会出现问题

快速实现一个限流器

当设置信号量的计数器为1时,可实现一个简单的互斥锁功能。可是,咱们前面刚介绍过Java SDK中的Lock,Semaphore的用途显然不会与Lock一致,否则就重复造轮子了。Semaphore最重要的一个功能即是:能够容许多个线程访问一个临界区。(上述例子咱们就设置了计数器的值为2,可发现thread1和thread2均可进入临界区。)

咱们会在什么地方碰见这种需求呢?
各类池化资源,例如链接池、对象池、线程池等等。例如,数据库链接池,在同一时刻,必定是容许多个线程同时使用链接池,固然,每一个链接在被释放以前,是不容许其余线程使用的。

咱们设计以下能够容许N个线程使用的对象池,咱们将信号量的计数器值设为N,就可让N个线程同时进行临界区,多余的就会被阻塞。(代码来自参考[1])

class ObjPool<T, R> {
    final List<T> pool;    //使用List保存实例对象
    // 用信号量实现限流器
    final Semaphore sem;
    
    // 构造函数
    ObjPool(int size, T t){
        pool = new Vector<T>(){}; 
        for(int i=0; i<size; i++){
            pool.add(t);
        }
        sem = new Semaphore(size);
    }
    
    // 获取对象池的对象,调用 func
    R exec(Function<T,R> func) {
        T t = null;
        sem.acquire();    //容许N个进程同时进入临界区
        try {
            //咱们须要注意,由于多个进行能够进入临界区,因此Vector的remove方法是线程安全的
            t = pool.remove(0);    
            return func.apply(t);    //获取对象池汇中的一个对象后,调用func函数
        } finally {
            pool.add(t);    //离开临界区以前,将以前获取的对象放回到池中
            sem.release();    //使得计数器加1,若是信号量中计数器小于等于0,那么说明有线程在等待,此时就会自动唤醒等待线程
        }
    }
}
// 建立对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);

// 经过对象池获取 t,以后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

小结

记得学习操做系统时,信号量类型分为了好几种整型信号量、记录型信号量、AND信号量以及“信号量集”(具体了解可戳参考[2])。我认为Java SDK中Semaphore应该是记录型信号量的实现。不禁想起,编程语言是对OS层面操做的一种抽象描述。这句话须要品须要细细品。

参考: [1] 极客时间专栏王宝令《Java并发编程实战》 [2] 静水深流.操做系统之信号量机制总结.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html

相关文章
相关标签/搜索