线程池参数原理及应用

线程池原理

    Java建立一个线程很方便,只需new Thread()就能够, 可是当有多个任务须要进行进行处理时,频繁的进行建立和启用线程一样须要系统开销,也不利于管理,因而同mysql的链接池同样,天然有对线程的管理池即线程池。java

    作个比喻,线程池比如一个公司,那么线程自己就是一个个的员工,来对线程的建立和销毁进行管理,最大化的进行资源的合理调度。mysql

    Java的线程池建立也很简单,concurrent这个并发包下有Executors能够很方便的进行四种经常使用线程的建立:spring

    newFixedThreadPool:建立固定数量的线程的线程池,能够控制最大并发数,经常使用于知道具体任务的数量,须要进行多线程的操做,如批量插入数据库任务,须要进行10万条数据分页,每1万条数据一页,配置一个线程处理,一共配置10个线程,进行并行批量插入,就可使用这个线程池来进行,大大减小响应时间sql

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    newCachedThreadPool: 建立可一段时间内重复利用的线程池,经常使用于不知道具体的任务数量,可是还须要进行并行处理的状况,如springboot @Aysnc就能够指定使用这个线程池,来进行一些埋点等的各类业务的异步处理数据库

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    newSingleThreadExecutor: 建立单个线程的线程池,这个线程池能够在线程死后(或发生异常时)从新启动一个线程来替代原来的线程继续执行下去!编程

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    newScheduledThreadPool: 建立一个能够定时和重复执行的线程池,经常使用于定时任务和延时任务的执行线程池数组

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    固然线程池还能够自定义,Java只是提供了几种经常使用的静态线程池的建立方法,以上也已经将4种线程池的建立源码显示出来了,能够发现线程池的建立都是经过new ThreadPoolExecutor()来实现的,如今主要介绍下几个重要的参数和接口:缓存

    首先ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,AbstractExecutorService又实现了ExecutorService接口,ExecutorService接口继承了只有一个方法execute的Executor。springboot

   下面解释下一下构造器中各个参数的含义:多线程

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有很是大的关系。在建立了线程池后,默认状况下,线程池中并无任何线程,而是等待有任务到来才建立线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就能够看出,是预建立线程的意思,即在没有任务到来以前就建立corePoolSize个线程或者一个线程。默认状况下,在建立了线程池后,线程池中的线程数为0,当有任务来以后,就会建立一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个很是重要的参数,它表示在线程池中最多能建立多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起做用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。可是若是调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,分别表明一种时间的单位,秒,分,小时等:
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,通常来讲,这里的阻塞队列有如下几种选择:  
ArrayBlockingQueue; 有界阻塞队列,由数组实现,须要指定数组大小
LinkedBlockingQueue; 无界阻塞队列,由链表实现,最大值是Integer的最大值 
SynchronousQueue; 这个队列不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
  • threadFactory:线程工厂,主要用来建立线程;
  • handler:表示当拒绝处理任务时的策略,有如下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    其中注意这几个参数都是volatile修饰的,用来保证多线程下的可见性,咱们也能够根据这些参数的不一样配置,来产生咱们须要的线程池。

    有了线程池后,咱们须要关注几个线程池的状态:

        

    下图代表几个状态之间的转化关系:

        

    接下来就是举个栗子来代表如何使用:

    ExecutorService executorService = Executors.newFixedThreadPool(15);

    在执行完上述代码后,咱们其实就建立了一个有15个核心线程数量,最大也是15个线程数量,空闲线程保存时间为1分钟,采用无限阻塞队列,任务拒绝采用AbortPolicy:丢弃任务并抛出RejectedExecutionException异常的线程池。在建立后,并无进行活跃的线程工人产生,可用线程数为0,好比接下来有10个任务进来,就会建立10个线程工人来进行工做,而且工做完不会销毁,以后又来了10个任务,以前的10个线程尚未处理完他们本身的任务,这个时候就又会建立5个线程工人来进行任务的处理,有小伙伴有疑问了,那剩下的5个任务怎么办呢,对了,还有阻塞队列,这些没有工人处理的任务会进入待办事项般的阻塞队列,先进先出,待15个工人将手头的活办完以后进行依次处理,由于阻塞队列是无界阻塞队列,所以,任务会不断的丢到这个队列中,因此,并不会建立由于队列过小,而不得已建立几个个临时工来处理,这个几个数量即在最大线程和核心线程之间的差值数量,这些临时线程的有效时间只有keepAliveTime的时间,此外在来了多个任务以后,若是队列是有界的,且任务数超过了最大可以建立的线程数,即工人不能再招了,待办事项列表也满了,这个时候公司旧不干了,抛出异常,任务拒绝策略。

    接下了是实战,结合CompletableFuture进行展现:

    简单介绍下CompletableFuture:CompletableFuture提供了很是强大的Future的扩展功能,能够帮助咱们简化异步编程的复杂性,而且提供了函数式编程的能力,能够经过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法,结合线程池能够达到并发编程的目的    

package cn.chinotan;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.*;

/**
 * @program: test
 * @description: 多线程测试
 * @author: xingcheng
 * @create: 2019-03-23 17:27
 **/
@Slf4j
public class ExecutorTest {

    @Test
    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(15);

        CompletableFuture[] completableFutures = new CompletableFuture[15];
        List<Integer> integers = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 15; i++) {
            int finalI = i;
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> costMethod(finalI), executorService)
                    .whenComplete((r, e) -> {
                        if (null != e) {
                           e.printStackTrace(); 
                        } else {
                            integers.add(r);
                        }
                    });

            completableFutures[i] = integerCompletableFuture;
        }

        CompletableFuture.allOf(completableFutures).join();
        long count = integers.stream().count();
        log.info("一共处理成功:{}", count);
    }

    /**
     * 耗时的操做
     *
     * @param i
     * @return
     */
    public int costMethod(int i) {
        try {
            TimeUnit.SECONDS.sleep(5);
            log.info("耗时的操做 {}", i);
            return 1;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return 0;
        }
    }
}

    运行结果:

    能够看到15个耗时的操做很快就并行执行完成,而且还能返回执行的成功结果数

    以上就是我对线程池的理解和应用,欢迎你们关注和浏览提问,谢谢你们

相关文章
相关标签/搜索