前言:
线程池,在个人理解当中,实际上是典型的消费者生产者模型
咱们实现简单的线程池,其实不难,java的并发库中,有咱们能够直接拿来用的阻塞队列使用,用来存储任务以及消费者,而不须要咱们作额外的同步跟阻塞操做,而消费者会经过自旋的形式,不断的从任务阻塞队列获取任务,若是没有获取到任务,则阻塞,直到有任务来进行消费,下面是代码
-----------------------------------我是分割线-----------------------------------java
首先,咱们定义一个接口,定义下,线程池的一些简单操做,下面是代码并发
public interface Pool { /** * 建立人:贺小五 * 建立时间:2017-08-19 00:21:53 * 描述: * 添加任务 */ void execute(Runnable runnable); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:22:03 * 描述: * 中止线程池(让线程执行完任务在中止) */ void shutDown(); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:22:39 * 描述: * 添加工做者 */ void addWorker(int num); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:22:55 * 描述: * 移除工做者 */ void removeWorker(int num); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:23:04 * 描述: * 线程池大小 */ int poolSize(); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:23:15 * 描述: * 中止线程池(无论是否有任务,都中止) */ void shutDownNow(); }
既然定义好接口了,咱们来定义一下实现ide
public class DefaultThreadPool implements Pool{ /** * 使用java并发库下的阻塞队列来作,这样咱们就不须要作额外的同步跟阻塞操做 */ private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>(); private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>(); /** * 建立人:贺小五 * 建立时间:2017-08-19 00:24:22 * 描述: * 初始化线程池大小 */ public DefaultThreadPool(int num) { initPool(num); } @Override public int poolSize() { return workers.size(); } private void initPool(int num){ for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); worker.start(); } } @Override public void execute(Runnable runnable) { if(runnable!=null){ jobs.add(runnable); } } @Override public void shutDown() { /** * 经过不断的循环来判断,任务队列是否已经清空, * 若是队列任务清空了,将工做者队列的线程中止 * 打破循环,清空工做者队列 */ while(true){ if(jobs.size()==0){ for (Worker worker : workers) { worker.stopRunning(); } break; } } workers.clear(); } @Override public void shutDownNow() { /** * 清空任务队列,而后调用中止线程池的方法 */ jobs.clear(); shutDown(); } @Override public void addWorker(int num){ /** * 添加新的工做者到工做者队列尾部 */ for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.offer(worker); worker.start(); } } @Override public void removeWorker(int num) { /** * 移除工做者阻塞队列头部的线程 */ for (int i = 0; i < num; i++) { try { workers.take().stopRunning(); } catch (InterruptedException e) { e.printStackTrace(); } } } private class Worker extends Thread{ //经过 volatile修饰的变量,保证变量的可见性,从而能让线程立刻得知状态 private volatile boolean isRunning = true; @Override public void run() { //经过自旋不停的从任务队列中获取任务, while (isRunning){ Runnable runnable = null; try { //若是工做队列为空,则紫色 runnable = jobs.take(); } catch (InterruptedException e) { e.printStackTrace(); } if(runnable!=null){ System.out.print(getName()+"-->"); runnable.run(); } // 睡眠 100毫秒,验证 shutdown 是不是在任务执行完毕后才会关闭线程池 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(getName()+"销毁..."); } public void stopRunning(){ this.isRunning = false; } } }
上面接口实现很简单,工做者经过私有内部类,继承Thread 类来当工做者,内部定义两个阻塞队列,用于存储任务跟工做者,其它线程最大数,最小数,任务队列最大数暂时不定义了,这只是一个简单的实现,各位看官有兴趣,能够本身进行扩展测试
接下来就是测试代码,来验证下接口定义的方法this
public class PoolTest { public static void main(String[] args){ //构建一个只有10个线程的线程池 Pool pool = new DefaultThreadPool(10); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //放500个任务进去,让线程池进行消费 for (int i = 0; i < 500; i++) { int finalI = i; pool.execute(new Runnable() { @Override public void run() { System.out.println("打印数字:"+ finalI); } }); } /** * 验证线程池的消费完任务中止以及不等任务队列清空就中止任务 */ System.out.println("中止线程池"); pool.shutDown(); //pool.shutDownNow(); /** * 移除2个工做者 */ //pool.removeWorker(2); //System.out.println("线程池大小:"+pool.poolSize()); /** * 添加5个工做者 */ //pool.addWorker(5); //System.out.println("线程池大小:"+pool.poolSize()); } }
到这,文章就结束了!spa
以上,均为本人我的理解,比较简单的理解,或许跟各位看官理解的有出入,欢迎指正交流线程
欢迎转载,请注明出处跟做者,谢谢!code