Java Semaphore实现线程池任务调度

关于Semaphore举例html

以一个停车场运做为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时若是同时来了五辆车,看门人容许其中三辆不受阻碍的进入,而后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,若是又离开两辆,则又能够放入两辆,如此往复。在这个停车场系统中,车位是公共资源,每辆车比如一个线程,看门人起的就是信号量的做用。java

信号量的特性以下:多线程

信号量是一个非负整数(车位数),全部经过它的线程(车辆)都会将该整数减一(经过它固然是为了使用资源),当该整数值为零时,全部试图经过它的线程都将处于等待状态。在信号量上咱们定义两种操做: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操做时,它要么经过而后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)其实是在信号量上执行加操做,对应于车辆离开停车场,该操做之因此叫作“释放”是由于加操做其实是释放了由信号量守护的资源。并发

在java中,还能够设置该信号量是否采用公平模式,若是以公平方式执行,则线程将会按到达的顺序(FIFO)执行,若是是非公平,则能够后请求的有可能排在队列的头部。app


JDK中定义以下:
Semaphore(int permits, boolean fair)ide

建立具备给定的许可数和给定的公平设置的Semaphore。工具

Semaphore当前在多线程环境下被扩放使用,操做系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库Semaphore 能够很轻松完成信号量控制,Semaphore能够控制某个资源可被同时访问的个数,经过 acquire() 获取一个许可,若是没有就等待,而 release() 释放一个许可。好比在Windows下能够设置共享文件的最大客户端访问个数。ui

 

与Semaphore具备相同做用的调度工具备CountDownLatch、CyclicBarrierspa

 

一.多线程资源分配

1.通常模式

package com.tianwt.app;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreThreadPool {
	
	public static int numbers = 20;
    public static void main(String[] args) {
    	// 容许2个线程同时访问  
        final Semaphore semaphore = new Semaphore(2);  
        ExecutorService executorService = Executors.newCachedThreadPool();  
       
        for (int i = 0; i < 20; i++) {  
            final int index = i+1;   
            executorService.execute(new Runnable() {  
                public void run() {  
                    try {  
                        semaphore.acquire();  
                        // 这里多是业务代码  
                        numbers = numbers-1;
                        System.out.println("("+index+")."+"线程:" + Thread.currentThread().getName() + ",numbers=" + numbers);  
                        TimeUnit.SECONDS.sleep(1);  
                        semaphore.release();  
                        System.out.println("容许TASK个数:" + semaphore.availablePermits());    
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
        executorService.shutdown();  
    }
   
}

执行结果以下:操作系统

(1).线程:pool-1-thread-1,numbers=18
(2).线程:pool-1-thread-2,numbers=18
(4).线程:pool-1-thread-4,numbers=16
(3).线程:pool-1-thread-3,numbers=16
容许TASK个数:0
容许TASK个数:0
容许TASK个数:2
容许TASK个数:0
(5).线程:pool-1-thread-5,numbers=14
(6).线程:pool-1-thread-6,numbers=14
(7).线程:pool-1-thread-7,numbers=13
容许TASK个数:0
(10).线程:pool-1-thread-10,numbers=12
容许TASK个数:0
容许TASK个数:1
容许TASK个数:1
(11).线程:pool-1-thread-11,numbers=10
(9).线程:pool-1-thread-9,numbers=10
容许TASK个数:0
(8).线程:pool-1-thread-8,numbers=8
容许TASK个数:1
(12).线程:pool-1-thread-12,numbers=9
容许TASK个数:2
容许TASK个数:2
(14).线程:pool-1-thread-14,numbers=6
(13).线程:pool-1-thread-13,numbers=6
容许TASK个数:2
容许TASK个数:0
(16).线程:pool-1-thread-16,numbers=4
(15).线程:pool-1-thread-15,numbers=4
容许TASK个数:2
容许TASK个数:2
(18).线程:pool-1-thread-18,numbers=2
(17).线程:pool-1-thread-17,numbers=2
(19).线程:pool-1-thread-19,numbers=1
(20).线程:pool-1-thread-20,numbers=0
容许TASK个数:0
容许TASK个数:0
容许TASK个数:1
容许TASK个数:2

从执行结果来看,CacheThreadPool连续建立了20个线程,一样也印证了Semaphore资源分配功能。遗憾的是的Semaphore使用公平模式初始化时,并未彻底进行FIFO模式排队。另一点,从numbers咱们知道,Semaphore只是用来实现资源分配的,并不会资源同步

final Semaphore semaphore = new Semaphore(2,true);  

运行结果以下:

2.同步模式

(2).线程:pool-1-thread-2,numbers=18
(1).线程:pool-1-thread-1,numbers=18
容许TASK个数:0
(3).线程:pool-1-thread-3,numbers=16
容许TASK个数:0
(4).线程:pool-1-thread-4,numbers=17
容许TASK个数:2
容许TASK个数:0
(5).线程:pool-1-thread-5,numbers=14
(6).线程:pool-1-thread-6,numbers=14
容许TASK个数:0
(9).线程:pool-1-thread-9,numbers=12
(7).线程:pool-1-thread-7,numbers=13
容许TASK个数:0
(11).线程:pool-1-thread-11,numbers=10
容许TASK个数:0
容许TASK个数:0
(10).线程:pool-1-thread-10,numbers=10
(8).线程:pool-1-thread-8,numbers=8
容许TASK个数:2
容许TASK个数:0
(12).线程:pool-1-thread-12,numbers=8
容许TASK个数:2
(14).线程:pool-1-thread-14,numbers=6
容许TASK个数:2
(13).线程:pool-1-thread-13,numbers=6
容许TASK个数:0
容许TASK个数:0
(16).线程:pool-1-thread-16,numbers=4
(15).线程:pool-1-thread-15,numbers=5
容许TASK个数:0
(17).线程:pool-1-thread-17,numbers=3
(18).线程:pool-1-thread-18,numbers=2
容许TASK个数:0
容许TASK个数:1
容许TASK个数:2
(20).线程:pool-1-thread-20,numbers=0
(19).线程:pool-1-thread-19,numbers=1
容许TASK个数:2
容许TASK个数:2

咱们看到,pool-1-thread-8和pool-1-thread-7相距很远。

2.同步模式

如何解决3个不足呢?

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreThreadPool {
	
	public static int numbers = 20;
    public static void main(String[] args) {
    	// 容许2个线程同时访问  
        final Semaphore semaphore = new Semaphore(2,true);  
        final ExecutorService executorService = Executors.newFixedThreadPool(4);
       
        for (int i = 0; i < 20; i++) {  
            final int index = i+1;   
            executorService.execute(new Runnable() {  
                public void run() {  
                    try {  
                        semaphore.acquire();  
                        // 这里多是业务代码  
                        synchronized (executorService) {
                        numbers = numbers-1;
                        System.out.println("("+index+")."+"线程:" + Thread.currentThread().getName() + ",numbers=" + numbers);  
                        TimeUnit.SECONDS.sleep(1);  
                        }
                        semaphore.release();  
                        System.out.println("容许TASK个数:" + semaphore.availablePermits());    
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
        executorService.shutdown();  
    }
   
}

这样就能够解决线程池过多,FIFO问题以及同步问题。

二.线程等待

public static boolean isLocked = true;
    public static void main(String[] args) throws InterruptedException {
    	
    	final Semaphore semaphore = new Semaphore(0);
    	Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally{
					System.out.println(Thread.currentThread().getName()+":start");
					/**
					 *  (●'◡'●)
					 * 注意,若是是重要的变量值修改,必须在semaphore调用release以前,不然有可能形成未知问题
					 *  (●'◡'●)
					 */
					isLocked = true; 
					semaphore.release(1);
					System.out.println(Thread.currentThread().getName()+":end");
				}
			}
		});
    	
    	t.start();
    	semaphore.acquire(); //由于初始信号量为0,所以这里阻塞等待
    	System.out.println("主线程执行!");
    	semaphore.release();
    
    }

以前在iOS开发中遇到变量值修改不一样不得问题,在这里提醒开发者,若是是重要的变量值修改,必须在semaphore调用release以前,不然有可能形成未知问题

iOS dispatch信号量semaphore

相关文章
相关标签/搜索