关注微信公众号 JavaStormjava
Semaphore
如今广泛翻译成 "信号量",从概念上讲信号量维护着一组 "凭证",获取到凭证的线程才能访问资源,使用完成后释放, 咱们可使用信号量来限制访问特定资源的并发线程数。数据库
就像现实生活中的停车场车位,当有空位的时候才能放车子进入,否则就只能等待,出来的车子则释放凭证。安全
能够简单的归纳为:一个计数器、一个等待队列、三个方法。 在信号量模型里,计数器和等待队列对外是透明的,只能经过信号量模型提供的三个方法访问它们,init()、acquire()、release()
。微信
这里提到的 init()、acquire()、release()
三个方法都是原子性的,而且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore
实现的,Semaphore 这个类可以保证这三个方法都是原子操做。并发
经过一个简化版的信号模型代码便于理解:app
public class Semaphore {
//计数器
private int count;
//保存线程的等待队列
private Queue queue;
/** * 初始化计数器 * @param count */
public Semaphore(int count) {
this.count = count;
}
/** * 获取凭证 */
public void acquire(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
/** * 释放凭证 */
public void release(){
this.count++;
if(this.count >= 0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}
复制代码
经过上文咱们了解到信号量模型原理,接下来则看如何在实际场景中使用。这里咱们仍是用累加器的例子来讲明信号量的使用吧。在累加器的例子里面,count++
操做是个临界区,只容许一个线程执行,也就是说要保证互斥。函数
public class TestSemaPhore {
private static int count;
//初始化信号量为 1
private static final Semaphore semaphore = new Semaphore(1);
public static void addOne() throws InterruptedException {
//使用信号量保证互斥,只有一个线程进入
semaphore.acquire();
try {
count++;
} finally {
semaphore.release();
}
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
//模拟十个线程同时访问
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
addOne();
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.countDown();
TimeUnit.SECONDS.sleep(3);
int count = getCount();
System.out.println(count);
}
}
复制代码
咱们来分析下信号量如何保证互斥的。ui
假设两个线程 T1 和 T2 同时访问 addOne()
,当他们都调用semaphore.acquire();
的时候,因为这是一个原子操做,因此只有一个线程能把信号量计数器减为 0,另一个线程 T2 则是将计数器减为 -1。对应线程 T1 计数器的值为 0 ,知足大于等于 0,因此线程 T1 会继续执行;对于线程 T2,信号量计数器的值为 -1,小于 0 ,按照咱们以前的信号量模型 acquire()
描述,线程 T2 将被阻塞进入等待队列。因此此刻只有线程 T1 进入临界区执行 count++
。this
当前信号量计数器的值为 -1 ,当线程 T1 执行 semaphore.release()
操做执行完后 计数器 +1 则变成了 0,知足小于等于 0,按照模型的定义,此刻等待队列中的 T2 将会被唤醒,因而 T2 在 T1 执行完临界区代码后才得到进入代码领截取的机会,从而保证了互斥性。spa
上面的例子咱们利用信号量实现了一个简单的互斥锁,你会不会以为奇怪,既然 Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实 Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 能够容许多个线程访问一个临界区。
常见的就是池化资源,好比链接池、对象池、线程池等。好比熟悉的数据库链接池,在同一时刻容许多个线程同时使用链接,固然在每一个链接被释放以前,是容许其余线程使用的。
如今咱们假设有一个场景,对象池需求,就是一次性建立 N 哥对象,以后全部的线程都复用这 N 个对象,在对象被释放前,是不容许其余线程使用。
/** * 对象池 * */
public class ObjectPool {
//使用 阻塞队列保存对象池
private final ArrayBlockingQueue<InputSaleMapDO> pool;
//信号量
private final Semaphore sem;
/** * 初始化对象池 * * @param size 池大小 */
public ObjectPool(int size) {
pool = new ArrayBlockingQueue<>(size);
sem = new Semaphore(size);
for (int i = 0; i < size; i++) {
InputSaleMapDO inputSaleMapDO = new InputSaleMapDO();
inputSaleMapDO.setId((long) i);
pool.add(inputSaleMapDO);
}
}
//利用对象池的对象调用 function
public Long run(Function<InputSaleMapDO, Long> function) throws InterruptedException {
InputSaleMapDO obj = null;
sem.acquire();
try {
obj = pool.poll();
return function.apply(obj);
} finally {
pool.add(obj);
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
ObjectPool objectPool = new ObjectPool(2);
//模拟十个线程同时访问
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
objectPool.run(f -> {
System.out.println(f);
return f.getId();
});
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.countDown();
TimeUnit.SECONDS.sleep(30);
}
}
复制代码
初始化线程池大小 2 ,咱们模拟 10 个线程,每次只能两个线程分配对象 InputSaleMapDO。
执行完回调函数以后,它们就会释放对象(这个释放工做是经过 pool.add(obj) 实现的),同时调用 release() 方法来更新信号量的计数器。若是此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
上面的例子中 保存对象池使用了 ArrayBlockingQueue ,是一个线程安全的容器,那么是否能够换成 ArrayList?欢迎后台给出答案。还有假设是停车场的车位做为对象池,车主停车是否是也可使用 Semaphore 实现?