Java 并发编程(四) 线程池的建立与使用

一.前言

在项目中,避免不了要使用多线程,为了不资源浪费和线程数的不可控而出现未知的问题,咱们通常都会使用线程池;java

JDK中给咱们提供了多种能够当即使用的建立线程池的方法,其都是基于ThreadPoolExecutor建立的线程池,因此ThreadPoolExecutor是线程池的基础,咱们主要分析一下ThreadPoolExecutor的使用,以后再分析一下快速建立线程池的方法优劣,再最后分析一下多线程的更加灵活运用;数组

二.ThreadPoolExecutor

2.1 构造方法

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

上边的是AllArgsConstructor,一共有七个参数;缓存

  • int corePoolSize : 核心线程池的数量,也是线程池保持活跃的线程数量;
    • 当新的任务进来时,若执行任务中的线程数量小于corePoolSize,则建立一条新的线程来执行任务, 直至线程数量等于corePoolSize;此后线程池中活跃线程的数量始终为corePoolSize,除非设置了allowCoreThreadTimeOut参数为true;
    • 在线程池建立之初,线程数量是为0的,只有当任务进来没有闲置线程的时候才会建立新线程;
  • int maximumPoolSize : 最大线程数量;用于限制线程池的线程数量;
  • long keepAliveTime, TimeUnit unit : 保持活跃的时间,和时间单位;
    • 当线程池中线程数量大于corePoolSize ,多出的线程在任务执行完成以后继续保持活跃的时间,时间事后线程将会被销毁,直至线程数量为corePoolSize;
  • BlockingQueue<Runnable> workQueue : 阻塞队列;
    • 用于保存当线程数量达到corePoolSize时后续提交的任务;
    • BlockingQueue有几个Impl,在后面会说到;
    • 当阻塞在workQueue中的任务等于队列的长度的时候,也就是队列Full的时候,线程池会建立新的线程来处理任务,maximumPoolSize会限制线程数量的最大值;
  • ThreadFactory threadFactory : 线程工厂,因为设置线程属性;
  • RejectedExecutionHandler handler : 拒绝策略;
    • 当线程池线程数量和阻塞队列同时Full的时候,后续提交任务的处理策略,可以使用已提供的策略也能够自定义实现;后面会说到;

2.2 BlockingQueue

2.2.1 SynchronousQueue

没有任何容量的队列,能够理解为容量为0的队列,当处理任务的线程数量大于等于corePoolSize时,新进任务会直接建立线程执行,若线程数量等于maximumPoolSizes 则会抛出RejectedExecutionException异常;bash

2.2.2 LinkedBlockingQueue

无界队列,内部经过Node链表实现,若使用这种队列则线程池中线程最大数量为corePoolSize,参数maximumPoolSizes是无用的,由于超过corePoolSize的任务都会被放进queue中,且queue无界,不会触发corePoolSize以外的线程建立;服务器

无界队列并不表明真的无界,只是说明该队列可支持无线长度,该队列支持一个有参构造,可设置队列长度,这样maximumPoolSizes就不会失效了;多线程

2.2.3 ArrayBlockingQueue

有界队列,必须设置一个固定容量,所以称之为有界,内部数组实现,可定义公平与非公平策略;并发

2.3 RejectedExecutionHandler

A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.app

当队列和线程都处于Full状态时,新进任务的处理策略,有以下3个默认给出的实现策略供咱们使用,固然也能够自定义,只要实现其less

rejectedExecution(Runnable r, ThreadPoolExecutor e)方法就能够了;ide

2.3.1 ThreadPoolExecutor.AbortPolicy

查看其实现:

/**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

可知不处理任务,始终抛出一个RejectedExecutionException异常;

该策略可很好的控制服务的线程数量和队列的容量,但应该catch异常信息返回状态码,例如在app请求服务的时候返回服务繁忙请稍后再试的提示;

2.3.2 ThreadPoolExecutor.DiscardPolicy

其rejectedExecution方法实现为空,从其注释也能够看出,该策略默默的丢弃了新进任务,没有任何提示及异常;

因为会致使任务丢失且不可感知,所以应该在特定的场景下使用;

2.3.3 ThreadPoolExecutor.DiscardOldestPolicy

/**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

删除队列中最头部的任务,而后将新进任务插入队列尾部;

该策略会致使任务丢失,与2.3.2同样,除非特定场景不然我的不建议使用;

2.3.4 ThreadPoolExecutor.CallerRunsPolicy

/**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

若是线程池没有挂掉,则使用新进任务的线程直接执行任务,而非等待使用线程池中的线程;

此种策略应该在服务内线程数量可控的范围内,或在咱们很了解服务的线程使用状况下使用;

若短期内有大量的新任务产生,此策略会致使服务内线程数目飙升,与咱们使用线程池的初衷不符;

三 Executors 快速建立线程池

3.1 ExecutorService newFixedThreadPool(int nThreads)

建立固定容量的线程池:

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

经过内部实现咱们看到也是利用ThreadPoolExecutor建立,只不过是默认了部分入参而已.

corePoolSize 和 maximumPoolSize 均为int nThreads,从而限制了线程的数量,采用无界队列LinkedBlockingQueue使没有得到线程资源的任务所有进入队列等待,任务不会丢失;参数keepAliveTime默认为0,由于没有多于核心线程数的线程被建立,因此无需设置此值;

固定线程池能够很好的控制服务中线程的数量,很好的避免的线程数量激增,控制了CPU的占用率,但也会带来另外的问题,如果队列中有大量的任务阻塞,势必会致使内存飙升;所以,固定线程池适用于任务并发数量可控的,短期内不会有大量任务提交的场景;

若在短期内有大量任务并发,可是每一个任务的运算不会占用很长时间,能够考虑下面的线程池 : ExecutorService newCachedThreadPool();

3.2 ExecutorService newCachedThreadPool()

缓存线程池 :

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

corePoolSize设置为0,即没有核心线程数量,全部的线程都是用完后超过keepAliveTime时间后就销毁;maximumPoolSize 设置为Integer.MAX_VALUE,基本能够理解为无上限,阻塞队列采用同步队列SynchronousQueue,全部任务即时提交线程执行,即不会有任务被阻塞在队列中.

该线程池适用于短期内有任务并发,但任务都是在短期内能够处理完毕的;maximumPoolSize的值保证了全部任务都能被线程或新建立线程当即处理,keepAliveTime = 60L使得大量线程在处理完当下任务时能够保持活跃等待下一个任务到来,避免每次都会新建立线程带来的开销,在支持建立大量线程的状况下有保证了线程不会被浪费,当线程空闲时间到达指定时间后销毁,又避免了大量线程同时存在,控制的线程的数量;

对于任务处理时间长的场景,线程占用时间过长,每次新进任务都会建立新的线程,线程数会上升,该线程池就不适用了须要考虑其余的线程池;

3.3 ExecutorService newSingleThreadExecutor()

单线程线程池;采用无界队列的核心线程和最大线程都是1的线程池,全部任务会被串行的执行;

3.4 ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

支持延迟执行任务的线程池,maximumPoolSize为Integer.MAX_VALUE;可用于定时任务的执行;还有不少灵活的用法,详细的能够点击直接看第二节;

3.5 ExecutorService newWorkStealingPool(int parallelism)

工做窃取线程池,参数为指定并发等级,默认为服务器CPU的数量;该线程池内部基于ForkJoinPool,具体使用请点击连接跳转;

3.6 自定义线程池工具类

下面提供一个自定义线程池可直接使用,须要结合项目实际状况适当修改:

package com.river.thread;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public enum ContextThreadPool {

    /**
     * 该类的一个实例,经过枚举类实现单例模式
     */
    INSTANCE;

    public ThreadPoolExecutor getThreadPool(){
        return ThreadPoolHolder.pool;
    }

    private static class ThreadPoolHolder{
        /**
         * 阻塞队列的容量
         */
        private final static int CAPACITY = 500;

        private static ThreadPoolExecutor pool ;
        /**
         * 获取处理器数目
         */
        private static int availableProcessors = Runtime.getRuntime().availableProcessors();

        /**
         * 基于LinkedBlockingQueue的容量为{@link CAPACITY}
         */
        private static BlockingQueue queue = new LinkedBlockingQueue(CAPACITY);

        static {
            pool = new ThreadPoolExecutor(
                    availableProcessors * 2,
                    availableProcessors * 4 + 1,
                    0,
                    TimeUnit.MILLISECONDS,
                    queue,
                    new ThreadFactory() {
                        private AtomicInteger count = new AtomicInteger(0);

                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            String threadName = EnvirmentThreadPool.class.getSimpleName() + "-thread-" + count.addAndGet(1);
                            thread.setName(threadName);
                            return thread;
                        }
                    },
                    //自定义线程池FULL时的策略,新的任务阻塞在队列外面;
                    (r, executor) -> {
                        try {
                            queue.put(r);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            );
        }

    }
}

工具调用:

ContextThreadPool.INSTANCE.getThreadPool();

四. Future

如今咱们拥有了线程池,接下来就须要向线程池提交任务,目前有两种方式:

  • void execute(Runnable command)    
  • <T> Future<T> submit(Callable<T> task)

                                                                             

前者定义在Executor中,用于任务无返回值的使用,后者定义在ExecutorService中,能够拿到任务的结果Future;

咱们都知道,咱们建立线程有几种方式,其中之一之二就是继承Runnable接口和Callable接口,普通使用没有什么区别,可是在线程执行结果的获取上就体现出来了;

@FunctionalInterface
public interface Runnable {
    //void返回
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

4.1 get result

  • V get() throws InterruptedException, ExecutionException;
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

能够经过Future.上面的方法获取线程返回值,有时候任务执行的时间比较长,在咱们获取结果的时候尚未执行完毕,所以一般调用

boolean isDone();来判断任务是否执行完毕;

package com.river.thread;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool();
        //正常这里我会用lamdba表达式去写,为了明了接口实现
        Future<String> result = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "123";
            }
        });

        while (true){
            if (result.isDone()){
                System.out.println(result.get());
                break;
            }
            log.info("not finish");
        }

    }
}

日志输出:

2018-09-17 17:47:37.352 myAppName [main] INFO  com.river.thread.FutureTest - not finish
123

这里看到第一次获取结果是没有获取到的,第二次就获取到了;

接下来咱们使用待超时的get()方法获取结果:

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool();
        Future<String> result = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "123";
            }
        });

        log.info("get result");
        log.info(result.get(3, TimeUnit.SECONDS));

    }
2018-09-17 17:52:43.348 myAppName [main] INFO  com.river.thread.FutureTest - get result
2018-09-17 17:52:45.349 myAppName [main] INFO  com.river.thread.FutureTest - 123

能够看到2s中以后获取到了执行结果,若是线程执行时间超过获取时间呢?

咱们将sleep参数改为了5000,

2018-09-17 17:55:14.380 myAppName [main] INFO  com.river.thread.FutureTest - get result
Exception in thread "main" java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at com.river.thread.FutureTest.main(FutureTest.java:21)

能够看到抛出了异常;

可是当咱们不尝试get结果的时候,异常是不会被抛出来的,也就是说,Future有持有异常的能力;咱们能够经过在任务执行完毕后catch该异常,从而执行相应的处理办法;

一般状况下,咱们会向线程池提交一个任务集合,将result保存在集合中,最后在遍历集合中的执行结果来获得最终的结果;

相关文章
相关标签/搜索