多线程编程很难,难点在于多线程代码的执行不是按照咱们直觉上的执行顺序。因此多线程编程必需要创建起一个宏观的认识。java
线程池是多线程编程中的一个重要概念。为了可以更好地使用多线程,学习好线程池固然是必须的。git
平时咱们在使用多线程的时候,一般都是架构师配置好了线程池的 Bean,咱们须要使用的时候,提交一个线程便可,不须要过多关注其内部原理。github
在学习一门新的技术以前,咱们仍是先了解下为何要使用它,使用它可以解决什么问题:面试
例如:编程
记建立线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3缓存
若是T1+T3>T2,那么是否是说开启一个线程来执行这个任务太不划算了!多线程
正好,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销架构
咱们知道线程能共享系统资源,若是同时执行的线程过多,就有可能致使系统资源不足而产生阻塞的状况并发
运用线程池能有效的控制线程最大并发数,避免以上的问题异步
好比:延时执行、定时循环执行的策略等
运用线程池都能进行很好的实现
在 Java 中,新建一个线程池对象很是简单,Java 自己提供了工具类java.util.concurrent.Executors
,可使用以下代码建立一个固定数量线程的线程池:
ExecutorService service = Executors.newFixedThreadPool(10);
注意:以上代码用来测试还能够,实际使用中最好可以显示地指定相关参数。
咱们能够看下其内部源码实现:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
在阿里巴巴代码规范中,建议咱们本身指定线程池的相关参数,为的是让开发人员可以自行理解线程池建立中的每一个参数,根据实际状况,建立出合理的线程池。接下来,咱们来剖析下java.util.concurrent.ThreadPoolExecutor
的构造方法参数。
java.util.concurrent.ThreadPoolExecutor
有多个构造方法,咱们拿参数最多的构造方法来举例,如下是阿里巴巴代码规范中给出的建立线程池的范例:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
贴一张IDEA中的图更方便看:
首先最重要的几个参数,可能就是:corePoolSize
,maximumPoolSize
,workQueue
了,先看下这几个参数的解释:
因为本文是初步了解线程池,因此先理解这几个参数,上文对于这三个参数的解释,基本上跟JDK源码中的注释一致(java.util.concurrent.ThreadPoolExecutor#execute
里的代码)。
咱们编写个程序来方便理解:
// 建立线程池 ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); // 等待执行的runnable Runnable runnable = () -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; // 启动的任务数量 int counts = 1224; for (int i = 0; i < counts; i++) { service.execute(runnable); } // 监控线程池执行状况的代码 ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
线程池的容量与咱们启动的任务数量息息相关。
已知:
咱们修改同时 execute 添加到线程池的 Runnable 数量 counts:
当前排队线程数:0 当前活动线程数:3 执行完成线程数:0 总线程数:3
当前排队线程数:15 当前活动线程数:5 执行完成线程数:0 总线程数:20
当前排队线程数:1024 当前活动线程数:105 执行完成线程数:0 总线程数:1129
java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
此次的踩坑才是我写这篇文章的初衷,借此机会好好了解下线程池的各个概念。自己这段时间在研究爬虫,为了尽可能提升爬虫的效率,用到了多线程处理。因为代码写得比较随性,因此遇到了一个阻塞的问题,研究了一下才搞明白,模拟的代码以下:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); @Test public void testBlock() { Runnable runnableOuter = () -> { try { Runnable runnableInner1 = () -> { try { TimeUnit.SECONDS.sleep(3); // 模拟比较耗时的爬虫操做 } catch (InterruptedException e) { e.printStackTrace(); } }; Future<?> submit = service.submit(runnableInner1); submit.get(); // 实际业务中,runnableInner2须要用到此处返回的参数,因此必须get Runnable runnableInner2 = () -> { try { TimeUnit.SECONDS.sleep(5); // 模拟比较耗时的爬虫操做 } catch (InterruptedException e) { e.printStackTrace(); } }; Future<?> submit2 = service.submit(runnableInner2); submit2.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }; for (int i = 0; i < 20; i++) { service.execute(runnableOuter); } ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
线程池是前文的线程池,参数彻底不变。线程的监控代码也一致。当咱们运行这个单元测试的时候,会发现打印出来的结果一直是以下:
当前排队线程数:15 当前活动线程数:5 执行完成线程数:0 总线程数:20 当前排队线程数:20 当前活动线程数:5 执行完成线程数:0 总线程数:25 当前排队线程数:20 当前活动线程数:5 执行完成线程数:0 总线程数:25 ……略
根本问题是 Runnable 内部还嵌套了 Runnable ,且他们都提交到了一个线程池。下面分步骤说明问题:
runnableOuter 开始执行,runnableInner1 被提交到线程池,对 runnableInner1 的结果进行 get,致使runnableOuter 被阻塞
用图表示大概为:
既然明白了出错的缘由,那么解决起来就简单了。这个案例告诉咱们,设计一个多线程程序,必定要自顶向下有一个良好的设计,而后再开始编码,不可以盲目地使用多线程、线程池,这样只会致使程序出现莫名其妙的错误。
其实这个我没怎么关注过,曾经在一次面试中被问到过。很简单,java.util.concurrent.ThreadPoolExecutor
提供了Setter方法,能够直接设置相关参数。按我目前的实践经验,几乎没有用到过,可是知道这个聊胜于无吧。特定的复杂场景下应该颇有用。
笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。不少新手工程师每每弄不清楚这二者的区别。按笔者的浅见:
多线程是用来充分利用多核 CPU 以提升程序性能的一种开发技术,线程池能够维持一个队列保存等待处理的多线程任务,可是因为此队列是内存控制的,因此断电或系统故障后未执行的任务会丢失。
消息队列是为消息处理而生的一门技术。其根据消费者的自身消费能力进行消费的特性使其普遍用于削峰的高并发任务处理。此外利用其去耦合的特性也能够实现代码上的解耦。消息队列大多能够对其消息进行持久化,即便断电也可以恢复未被消费的任务并继续处理。
以上是笔者在学习实践以后对于多线程和消息队列的粗浅认识,初学者切莫混淆二者的做用。
参考文献: