本身手动实现一个简单的线程池

前言:

     线程池,在个人理解当中,实际上是典型的消费者生产者模型

     咱们实现简单的线程池,其实不难,java的并发库中,有咱们能够直接拿来用的阻塞队列使用,用来存储任务以及消费者,而不须要咱们作额外的同步跟阻塞操做,而消费者会经过自旋的形式,不断的从任务阻塞队列获取任务,若是没有获取到任务,则阻塞,直到有任务来进行消费,下面是代码

 

 

 

-----------------------------------我是分割线-----------------------------------java

 

首先,咱们定义一个接口,定义下,线程池的一些简单操做,下面是代码并发

 

public interface Pool {

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:21:53
	 * 描述:
	 * 		添加任务
	 */
	void execute(Runnable runnable);

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:22:03
	 * 描述:
	 * 		中止线程池(让线程执行完任务在中止)
	 */
	void shutDown();

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:22:39
	 * 描述:
	 * 		添加工做者
	 */
	void addWorker(int num);

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:22:55
	 * 描述:
	 * 		移除工做者
	 */
	void removeWorker(int num);

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:23:04
	 * 描述:
	 * 		线程池大小
	 */
	int poolSize();

	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:23:15
	 * 描述:
	 * 	     中止线程池(无论是否有任务,都中止)
	 */
	void shutDownNow();

}

 

 

既然定义好接口了,咱们来定义一下实现ide

public class DefaultThreadPool implements Pool{

	/**
	 * 使用java并发库下的阻塞队列来作,这样咱们就不须要作额外的同步跟阻塞操做
	 */
	private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>();
	private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>();


	/**
	 * 建立人:贺小五
	 * 建立时间:2017-08-19 00:24:22
	 * 描述:
	 * 		初始化线程池大小
	 */
	public DefaultThreadPool(int num) {
		initPool(num);
	}

	@Override
	public int poolSize() {
		return workers.size();
	}


	private void initPool(int num){
		for (int i = 0; i < num; i++) {
			Worker worker = new Worker();
			workers.add(worker);
			worker.start();
		}
	}

	@Override
	public void execute(Runnable runnable) {
		if(runnable!=null){
			jobs.add(runnable);
		}
	}

	@Override
	public void shutDown() {
		/**
		 * 经过不断的循环来判断,任务队列是否已经清空,
		 * 若是队列任务清空了,将工做者队列的线程中止
		 * 打破循环,清空工做者队列
		 */
		while(true){
			if(jobs.size()==0){
				for (Worker worker : workers) {
					worker.stopRunning();
				}
				break;
			}
		}
		workers.clear();
	}

	@Override
	public void shutDownNow() {
		/**
		 * 清空任务队列,而后调用中止线程池的方法
		 */
		jobs.clear();
		shutDown();
	}

	@Override
	public void addWorker(int num){
		/**
		 *  添加新的工做者到工做者队列尾部
		 */
		for (int i = 0; i < num; i++) {
			Worker worker = new Worker();
			workers.offer(worker);
			worker.start();
		}
	}

	@Override
	public void removeWorker(int num) {
		/**
		 * 移除工做者阻塞队列头部的线程
		 */
		for (int i = 0; i < num; i++) {
			try {
				workers.take().stopRunning();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}



	private class Worker extends Thread{

		//经过 volatile修饰的变量,保证变量的可见性,从而能让线程立刻得知状态
		private volatile boolean isRunning = true;


		@Override
		public void run() {
			//经过自旋不停的从任务队列中获取任务,
			while (isRunning){
				Runnable runnable = null;

				try {
					//若是工做队列为空,则紫色
					runnable = jobs.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

				if(runnable!=null){
					System.out.print(getName()+"-->");
					runnable.run();
				}
				// 睡眠 100毫秒,验证 shutdown 是不是在任务执行完毕后才会关闭线程池
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println(getName()+"销毁...");
		}

		public void stopRunning(){
			this.isRunning = false;
		}
	}
}

 

 

上面接口实现很简单,工做者经过私有内部类,继承Thread 类来当工做者,内部定义两个阻塞队列,用于存储任务跟工做者,其它线程最大数,最小数,任务队列最大数暂时不定义了,这只是一个简单的实现,各位看官有兴趣,能够本身进行扩展测试

 

 

接下来就是测试代码,来验证下接口定义的方法this

public class PoolTest {
	public static void main(String[] args){
        //构建一个只有10个线程的线程池
		Pool pool = new DefaultThreadPool(10);

		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		//放500个任务进去,让线程池进行消费
		for (int i = 0; i < 500; i++) {
			int finalI = i;
			pool.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println("打印数字:"+ finalI);
				}
			});
		}

		/**
		 * 验证线程池的消费完任务中止以及不等任务队列清空就中止任务
		 */
		System.out.println("中止线程池");
		pool.shutDown();
		//pool.shutDownNow();

		/**
		 * 移除2个工做者
		 */
		//pool.removeWorker(2);
		//System.out.println("线程池大小:"+pool.poolSize());

		/**
		 * 添加5个工做者
		 */
		//pool.addWorker(5);
		//System.out.println("线程池大小:"+pool.poolSize());

	}
}

 

 

 

以上,就是本身动手实现的一个简单线程池,经过自旋锁,让工做线程不停的从任务队列获取任务,很是简单的实现,若是不想使用java提供的阻塞队列,想本身作,可使用 java关键字 synchronize 配合 wait() notify()方法来实现或者是 lock接口配合 condition的 await()跟signal() 方法来实现一个简单的阻塞队列

 

 

 

到这,文章就结束了!spa

以上,均为本人我的理解,比较简单的理解,或许跟各位看官理解的有出入,欢迎指正交流线程

欢迎转载,请注明出处跟做者,谢谢!code

相关文章
相关标签/搜索