高并发系列(九)--线程池详解和参数设置(绝对干活)

1、点题html

    这篇将介绍线程池的使用,咱们时常就有一个这样的疑问,为何要使用线程池?不是有更简单的方式建立和开启线程?还有怎么使用线程池?线程池怎么与Springboot整合,意思就是怎么使用使用springBoot机制管理?带着这些疑问开启这篇文章之旅。java

    本篇的文章主要内容:程序员

    1.概念。spring

    2.线程池。缓存

    3.线程池配置。服务器

    4.Springboot管理线程池。多线程

    5.线程池隔离。并发

 

2、概念ide

 

1.为何要使用线程池技术?性能

    若是不使用线程池技术,通常咱们是使用new Thread技术进行建立线程。可是为何不推荐使用这个呢?主要是由于new Thread会带来下面的弊端:

1.每次new Thread新建对象,性能差。

2.线程缺少统一管理,可能无限制的新建线程,互相竞争,有可能占用过多系统资源致使死机或OOM。 

3.缺少更多功能,如更多执行、按期执行、线程中断。

    其实在阿里巴巴规范里也说明了,强制使用线程池建立:

2.使用线程池有什么好处呢?

    固然上面的问题都是解决了的,具体的详细,以下:

1.重用存在的线程,减小对象建立、消亡的开销,性能佳。

2.可有效控制最大并发线程数,提升系统资源利用率,同时能够避免过多资源竞争,避免阻塞。

3.提供定时执行、按期执行、单线程、并发数控制等功能。

    那么线程池的使用场景,或者说是目的又是什么呢?

线程是稀缺资源,不能频繁的建立。

解耦做用;线程的建立于执行彻底分开,方便维护。 

应当将其放入一个池子中,能够给其余任务进行复用。

 

3、线程池

 

1.线程池究竟是什么?

    java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于建立线程池多线程技术主要解决处理器单元内多个线程执行的问题,它能够显著减小处理器单元的闲置时间,增长处理器单元的吞吐能力。

 

    假设一个服务器完成一项任务所需时间为:T1 建立线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

    若是:T1 + T3 远大于 T2,则能够采用线程池,以提升服务器性能。

    一个线程池包括如下四个基本组成部分:

  一、线程池管理器(ThreadPool):用于建立并管理线程池,包括 建立线程池,销毁线程池,添加新任务; 

 二、工做线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,能够循环的执行任务;

  三、任务接口(Task):每一个任务必须实现的接口,以供工做线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工做,任务的执行状态等;

  四、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提升服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

   线程池不只调整T1,T3产生的时间段,并且它还显著减小了建立线程的数目,看一个例子:

    假设一个服务器一天要处理50000个请求,而且每一个请求须要一个单独的线程完成。在线程池中,线程数通常是固定的,因此产生线程总数不会超过线程池中线程的数目,而若是服务器不利用线程池来处理这些请求则线程总数为50000。通常线程池大小是远小于50000。因此利用线程池的服务器程序不会为了建立50000而在处理请求时浪费时间,从而提升效率。

2.Java中的线程池种类

    2.1 newSingleThreadExecutor

 

ExecutorService pool = Executors.newSingleThreadExecutor();

    一个单线程的线程池。这个线程池只有一个线程在工做,也就是至关于单线程串行执行全部任务。若是这个惟一的线程由于异常结束,那么会有一个新的线程来替代它。此线程池保证全部任务的执行顺序按照任务的提交顺序执行。



public class ThreadPoolExample3 {public static void main(String[] args) {ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.2 newFixedThreadPool

 

ExecutorService pool = Executors.newFixedThreadPool(10);

    建立固定大小的线程池。每次提交一个任务就建立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,若是某个线程由于执行异常而结束,那么线程池会补充一个新线程。



public class ThreadPoolExample2 {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3);for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.3 newCachedThreadPool

 

ExecutorService pool = Executors.newCachedThreadPool();

    建立一个可缓存的线程池。若是线程池的大小超过了处理任务所须要的线程,那么就会回收部分空闲的线程,当任务数增长时,此线程池又添加新线程来处理任务。

 



public class ThreadPoolExample1 {public static void main(String[] args) {ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int index = i;executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("t;ask:{}"+ index);}});}executorService.shutdown();}}

    2.4 newScheduledThreadPool

    此线程池支持定时以及周期性执行任务的需求。

 

ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
  •  
import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);for (int i = 0; i < 10; i++) {pool.schedule(() -> {System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");}, 10, TimeUnit.SECONDS);}}}

    上面演示的是延迟10秒执行任务,若是想要执行周期性的任务能够用下面的方式,每秒执行一次。

//pool.scheduleWithFixedDelay也能够pool.scheduleAtFixedRate(() -> {System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");}, 1, 1, TimeUnit.SECONDS);

    2.5 newWorkStealingPool

    newWorkStealingPool是jdk1.8才有的,会根据所需的并行层次来动态建立和关闭线程,经过使用多个队列减小竞争,底层用的ForkJoinPool来实现的。ForkJoinPool的优点在于,能够充分利用多cpu,多核cpu的优点,把一个任务拆分红多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成以后,再将这些执行结果合并起来便可。

 

    上面的5个线程池,Java已经提供了相对固定的模式,其实还有不足就是,很差写拒绝策略,调整核心数等大小问题。因此,这也是阿里巴巴不推荐的:

4、线程池配置

 

    竟然阿里规范都不推荐,那么咱们应该怎么写呢?,以下,所示:

​​​​​​​

public ExecutorService buildConsumerQueueThreadPool(){ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-queue-thread-%d").build();ExecutorService pool = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());return pool ;}

    并发线程池到底怎么设置呢?

 

    在具体分析以前先了解下线程池中所定义的状态,这些状态都和线程的执行密切相关:

  • RUNNING 天然是运行状态,指能够接受任务执行队列里的任务

  • SHUTDOWN 指调用了 shutdown() 方法,再也不接受新任务了,可是队列里的任务得执行完毕。

  • STOP 指调用了 shutdownNow() 方法,再也不接受新任务,同时抛弃阻塞队列里的全部任务并中断全部正在执行任务。

  • TIDYING 全部任务都执行完毕,在调用 shutdown()/shutdownNow() 中都会尝试更新为这个状态。

  • TERMINATED 终止状态,当执行 terminated() 后会更新为这个状态。

 

    在咱们平常业务开发过程当中,或多或少都会用到并发的功能。那么在用到并发功能的过程当中,就确定会碰到这个问题。

    一般有点年纪的程序员或许都据说这样一个说法 (其中 N 表明 CPU 的个数):

CPU 密集型应用,线程池大小设置为 N + 1。

IO 密集型应用,线程池大小设置为 2N。

    其实这是极不正确的。那为何呢?

    1.首先咱们从反面来看,假设这个说法是成立的,那咱们在一台服务器上部署多少个服务都无所谓了。由于线程池的大小只能服务器的核数有关,因此这个说法是不正确的。那具体应该怎么设置大小呢? 

    2.假设这个应用是二者混合型的,其中任务即有 CPU 密集,也有 IO 密集型的,那么咱们改怎么设置呢?是否是只能抛硬盘来决定呢?

    那么咱们到底该怎么设置线程池大小呢?

    咱们可使用利特尔法则(Little’s law)来断定线程池大小。咱们只需计算请求到达率和请求处理的平均时间。而后,将上述值放到利特尔法则(Little’s law)就能够算出系统平均请求数。估算公式以下:

*线程池大小 = ((线程 IO time + 线程 CPU time )/线程 CPU time ) CPU数目**

    经过公式,咱们了解到须要 3 个具体数值:

   1.一个请求所消耗的时间 (线程 IO time + 线程 CPU time)。 

   2.该请求计算时间 (线程 CPU time)。

   3.CPU 数目。

    1.请求消耗时间:Web 服务容器中,能够经过 Filter 来拦截获取该请求先后消耗的时间。

    2.CPU 计算时间 = 请求总耗时 - CPU IO time。假设该请求有一个查询 DB 的操做,只要知道这个查询 DB 的耗时(CPU IO time)。 

    3.逻辑 CPU 个数 ,设置线程池大小的时候参考的 CPU 个数:

    cat /proc/cpuinfo| grep "processor"| wc -l

    按照上诉原则,咱们再来看线程池的7个参数:

corePoolSize: 线程池中核心线程数量 。

maximumPoolSize: 最大线程数量 。

keepAliveTime :空闲时间(当线程池梳理超过核心数量时,多余的空闲时间的存活时间,即超过核心线程数量的空闲线程,在多长时间内,会被销毁) 。

unit: 时间单位 。

workQueue: 当核心线程工做已满,须要存储任务的队列。 

threadFactory: 建立线程的工厂 。

handler: 当队列满了以后的拒绝策略。

    线程池按如下行为执行任务:

    详细配置参考文章:

    https://www.cnblogs.com/syp172654682/p/9383335.html

 

    前面几个参数咱们就不讲了,很简单,主要是后面几个参数,队列,线程工厂,拒绝策略。

    咱们先看看队列,线程池默认提供了 4 个队列:

无界队列(LinkedBlockingQueue): 默认大小 int 最大值,所以可能会耗尽系统内存,引发OOM,很是危险。 

直接提交的队列 : 没有容量,不会保存,直接建立新的线程,所以须要设置很大的线程池数。不然容易执行拒绝策略,也很危险。 

有界队列(ArrayBlockingQueue):若是core满了,则存储在队列中,若是core满了且队列满了,则建立线程,直到maximumPoolSize 到了,若是队列满了且最大线程数已经到了,则执行拒绝策略。 

优先级队列:按照优先级执行任务。也能够设置大小。

    再看看拒绝策略,什么是拒绝策略呢?当队列满了,如何处理那些仍然提交的任务。JDK 默认有4种策略:

AbortPolicy :直接抛出异常,阻止系统正常工做.

CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样作不会真的丢弃任务,可是,任务提交线程的性能极有可能会急剧降低。 

DiscardOldestPolicy: 该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务. 

DiscardPolicy: 该策略默默地丢弃没法处理的任务,不予任何处理,若是容许任务丢失,我以为这是最好的方案.

    如何优雅的关闭线程池呢?   

 

    有运行任务天然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。

    其实无非就是两个方法 shutdown()/shutdownNow()

    但他们有着重要的区别:

  • shutdown() 执行后中止接受新任务,会把队列的任务执行完毕。

  • shutdownNow() 也是中止接受新任务,但会中断全部的任务,将线程池状态变为 stop。

两个方法都会中断线程,用户可自行判断是否须要响应中断。

shutdownNow() 要更简单粗暴,能够根据实际场景选择不一样的方法。

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就代表线程池已经彻底终止了。

    

    如何写个完整的一个线程池?    

 

   了解上面的参数信息后咱们就能够定义本身的线程池了,我这边用ArrayBlockingQueue替换了LinkedBlockingQueue,指定了队列的大小,当任务超出队列大小以后使用CallerRunsPolicy拒绝策略处理。

    这样作的好处是严格控制了队列的大小,不会出现一直往里面添加任务的状况,有的时候任务处理的比较慢,任务数量过多会占用大量内存,致使内存溢出。

    固然你也能够在提交到线程池的入口进行控制,好比用CountDownLatch, Semaphore等。

/*** 自定义线程池<br>* 默认的newFixedThreadPool里的LinkedBlockingQueue是一个无边界队列,若是不断的往里加任务,最终会致使内存的不可控<br> * 增长了有边界的队列,使用了CallerRunsPolicy拒绝策略**/public class FangjiaThreadPoolExecutor {private static ExecutorService executorService = newFixedThreadPool(50);private static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());}public static void execute(Runnable command) {executorService.execute(command);}public static void shutdown() {executorService.shutdown();}static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "FSH-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}}

 

5、Spring管理线程池

 

    SpringBoot 盛行;来看看在 SpringBoot 中应当怎么配置和使用线程池。        既然用了 SpringBoot ,那天然得发挥 Spring 的特性,因此须要 Spring 来帮咱们管理线程池:







@Configurationpublic class TreadPoolConfig {/*** 消费队列线程* @return*/@Bean(value = "consumerQueueThreadPool")public ExecutorService buildConsumerQueueThreadPool(){ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-queue-thread-%d").build();ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());return pool ;}}

    使用时,很方便:

 




@Resource(name = "consumerQueueThreadPool")private ExecutorService consumerQueueThreadPool;@Overridepublic void execute() {//消费队列for (int i = 0; i < 5; i++) {consumerQueueThreadPool.execute(new ConsumerQueueThread());}}

6、线程池隔离

 

线程池看似很美好,但也会带来一些问题。

    若是咱们不少业务都依赖于同一个线程池,当其中一个业务由于各类不可控的缘由消耗了全部的线程,致使线程池所有占满。    

    这样其余的业务也就不能正常运转了,这对系统的打击是巨大的。    

    好比咱们 Tomcat 接受请求的线程池,假设其中一些响应特别慢,线程资源得不到回收释放;线程池慢慢被占满,最坏的状况就是整个应用都不能提供服务。

    因此咱们须要将线程池进行隔离。

    一般的作法是按照业务进行划分:

好比下单的任务用一个线程池,获取数据的任务用另外一个线程池。这样即便其中一个出现问题把线程池耗尽,那也不会影响其余的任务运行。

    hystrix 隔离

这样的需求 Hystrix 已经帮咱们实现了。

Hystrix 是一款开源的容错插件,具备依赖隔离、系统容错降级等功能。

下面来看看 Hystrix 简单的应用:

首先须要定义两个线程池,分别用于执行订单、处理用户。

相关文章
相关标签/搜索