深刻学习java线程池

咱们都是经过new Thread来建立一个线程,因为线程的建立和销毁都须要消耗必定的CPU资源,因此在高并发下这种建立线程的方式将严重影响代码执行效率。而线程池的做用就是让一个线程执行结束后不立刻销毁,继续执行新的任务,这样就节省了不断建立线程和销毁线程的开销。编程

ThreadPoolExecutor

建立Java线程池最为核心的类为ThreadPoolExecutor:并发

QQ截图20190630215357.png

它提供了四种构造函数来建立线程池,其中最为核心的构造函数以下所示:dom

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
复制代码

这7个参数的含义以下:ide

  • corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即便这些线程是空闲的,也不会被销毁,除非经过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;函数

  • maximumPoolSize 线程池中容许的最大线程个数;高并发

  • keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间尚未新任务的话,超出的线程将被销毁;this

  • unit 超时时间单位;spa

  • workQueue 线程队列。用于保存经过execute方法提交的,等待被执行的任务;线程

  • threadFactory 线程建立工程,即指定怎样建立线程;3d

  • handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。

在经过这个构造方法建立线程池的时候,这几个参数必须知足如下条件,不然将抛出IllegalArgumentException异常:

  • corePoolSize不能小于0;

  • keepAliveTime不能小于0;

  • maximumPoolSize 不能小于等于0;

  • maximumPoolSize不能小于corePoolSize;

此外,workQueue、threadFactory和handler不能为null,不然将抛出空指针异常。

下面举些例子来深刻理解这几个参数的含义。

使用上面的构造方法建立一个线程池:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池建立完毕");

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
复制代码

上面的代码建立了一个核心线程数量为1,容许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。

假如咱们经过execute方法向线程池提交1个任务,看看结果如何:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池建立完毕");

threadPoolExecutor.execute(() -> sleep(100));

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
复制代码

ThreadPoolExecutor的execute和submit方法均可以向线程池提交任务,区别是,submit方法可以返回执行结果,返回值类型为Future

sleep方法代码:

private static void sleep(long value) {
    try {
        System.out.println(Thread.currentThread().getName() + "线程执行sleep方法");
        TimeUnit.SECONDS.sleep(value);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
复制代码

启动程序,控制台输出以下:

QQ截图20190630222238.png

线程池核心线程数量为1,经过execute提交了一个任务后,因为核心线程是空闲的,因此任务被执行了。因为这个任务的逻辑是休眠100秒,因此在这100秒内,线程池的活跃线程数量为1。此外,由于提交的任务被核心线程执行了,因此并无线程须要被放到线程队列里等待,线程队列长度为0。

假如咱们经过execute方法向线程池提交2个任务,看看结果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码

QQ截图20190701183457.png

线程池核心线程数量为1,经过execute提交了2个任务后,一开始核心线程是空闲的,Thread-0被执行。因为这个任务的逻辑是休眠100秒,因此在这100秒内,线程池的活跃线程数量为1。由于核心线程数量为1,因此另一个任务在这100秒内不能被执行,因而被放到线程队列里等待,线程队列长度为1。

假如咱们经过execute方法向线程池提交3个任务,看看结果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码

QQ截图20190701184303.png

这三个任务都是休眠100秒,因此核心线程池中第一个任务正在被执行,第二个任务被放入到了线程队列。而当第三个任务被提交进来时,线程队列满了(咱们定义的长度为1),因为该线程池容许的最大线程数量为2,因此线程池还能够再建立一个线程来执行另一个任务,因而乎以前在线程队列里的线程被取出执行(FIFO),第三个任务被放入到了线程队列。

改变第二个和第三个任务的睡眠时间,观察输出:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
复制代码

QQ截图20190701185215.png

第二个任务提交5秒后,任务执行完毕,因此线程队列里的任务被执行,因而队列线程个数为0,活跃线程数量为2(第一个和第三个任务)。再过5秒后,第三个任务执行完毕,因而活跃线程数量为1(第一个100秒还没执行完毕)。

在第三个任务结束的瞬间,咱们观察线程快照:

QQ截图20190701185617.png

能够看到,线程池中有两个线程,Thread-0在执行第一个任务(休眠100秒,还没结束),Thread-1执行完第三个任务后并无立刻被销毁。过段时间后(10秒钟后)再观察线程快照:

QQ截图20190701190444.png

能够看到,Thread-1这个线程被销毁了,由于咱们在建立线程池的时候,指定keepAliveTime 为10秒,10秒后,超出核心线程池线程外的那些线程将被销毁。

假如一次性提交4个任务,看看会怎样:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
复制代码

QQ截图20190701190808.png

由于咱们设置的拒绝策略为AbortPolicy,因此最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。

关闭线程池

线程池包含如下几个状态:

QQ截图20190702100110.png

当线程池中全部任务都处理完毕后,线程并不会本身关闭。咱们能够经过调用shutdown和shutdownNow方法来关闭线程池。二者的区别在于:

shutdown方法将线程池置为shutdown状态,拒绝新的任务提交,但线程池并不会立刻关闭,而是等待全部正在折行的和线程队列里的任务都执行完毕后,线程池才会被关闭。因此这个方法是平滑的关闭线程池。

shutdownNow方法将线程池置为stop状态,拒绝新的任务提交,中断正在执行的那些任务,而且清除线程队列里的任务并返回。因此这个方法是比较“暴力”的。

举两个例子观察下二者的区别:

shutdown例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    System.out.println("已经执行了线程池shutdown方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

启动程序,控制台输出以下:

QQ截图20190702101041.png

能够看到,虽然在任务都被提交后立刻执行了shutdown方法,可是并不会立刻关闭线程池,而是等待全部被提交的任务都执行完了才关闭。

shutdownNow例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 立刻关闭,并返回还未被执行的任务
    System.out.println(runnables);

    System.out.println("已经执行了线程池shutdownNow方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

启动程序,控制台输出以下:

QQ截图20190702101355.png

能够看到,在执行shutdownNow方法后,线程池立刻就被关闭了,正在执行中的两个任务被打断,而且返回了线程队列中等待被执行的两个任务。

经过上面两个例子咱们还能够看到shutdown和shutdownNow方法都不是阻塞的。常与shutdown搭配的方法有awaitTermination。

awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,不然返回false。该方法是阻塞的:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    if (isShutdown) {
        System.out.println("线程池在3秒内成功关闭");
    } else {
        System.out.println("等了3秒还没关闭,不等了╰(‵□′)╯");
    }
    System.out.println("------------");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

启动程序输出以下:

QQ截图20190702102156.png

4大拒绝策略

当线程池没法再接收新的任务的时候,可采起以下四种策略:

QQ截图20190302111014.png

CallerRunsPolicy CallerRunsPolicy策略:由调用线程处理该任务:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.CallerRunsPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));
    threadPoolExecutor.execute(new longTask("任务2"));
    threadPoolExecutor.execute(new longTask("任务3"));
    threadPoolExecutor.execute(new shortTask("任务4"));
    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

上面的线程池最多只能一次性提交4个任务,第5个任务提交后会被拒绝策略处理。启动程序输出以下:

QQ截图20190702103818.png

能够看到,第5个提交的任务由调用线程(即main线程)处理该任务。

AbortPolicy AbortPolicy策略:丢弃任务,并抛出RejectedExecutionException异常。前面的例子就是使用该策略,因此再也不演示。

DiscardOldestPolicy DiscardOldestPolicy策略:丢弃最先被放入到线程队列的任务,将新提交的任务放入到线程队列末端:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.DiscardOldestPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));
    threadPoolExecutor.execute(new longTask("任务2"));
    threadPoolExecutor.execute(new longTask("任务3"));
    threadPoolExecutor.execute(new shortTask("任务4"));
    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

启动程序输出以下:

QQ截图20190702105646.png

能够看到最后提交的任务被执行了,而第3个任务是第一个被放到线程队列的任务,被丢弃了。

DiscardPolicy DiscardPolicy策略:直接丢弃新的任务,不抛异常:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.DiscardPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));
    threadPoolExecutor.execute(new longTask("任务2"));
    threadPoolExecutor.execute(new longTask("任务3"));
    threadPoolExecutor.execute(new shortTask("任务4"));
    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("shortTask执行过程当中被打断" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
        } catch (InterruptedException e) {
            System.err.println("longTask执行过程当中被打断" + e.getMessage());
        }
    }
}
复制代码

启动程序,输出以下:

QQ截图20190702110022.png

第5个任务直接被拒绝丢弃了,而没有抛出任何异常。

线程池工厂方法

除了使用ThreadPoolExecutor的构造方法建立线程池外,咱们也可使用Executors提供的工厂方法来建立不一样类型的线程池:

QQ截图20190702110350.png

newFixedThreadPool 查看newFixedThreadPool方法源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
复制代码

能够看到,经过newFixedThreadPool建立的是一个固定大小的线程池,大小由nThreads参数指定,它具备以下几个特色:

由于corePoolSize和maximumPoolSize的值都为nThreads,因此线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。

LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),因此这个线程池理论是能够无限的接收新的任务,这就是为何上面没有指定拒绝策略的缘由。

newCachedThreadPool 查看newCachedThreadPool方法源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

这是一个理论上无限大小的线程池:

核心线程数为0,SynchronousQueue队列是没有长度的队列,因此当有新的任务提交,若是有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,不然新增一个线程来处理该任务。

由于线程数量没有限制,理论上能够接收无限个新任务,因此这里也没有指定拒绝策略。

newSingleThreadExecutor 查看newSingleThreadExecutor源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

核心线程数和最大线程数都为1,每次只能有一个线程处理任务。

LinkedBlockingQueue队列能够接收无限个新任务。

newScheduledThreadPool 查看newScheduledThreadPool源码:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
   
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
复制代码

因此newScheduledThreadPool理论是也是能够接收无限个任务,DelayedWorkQueue也是一个无界队列。

使用newScheduledThreadPool建立的线程池除了能够处理普通的Runnable任务外,它还具备调度的功能:

1.延迟指定时间后执行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟5秒执行
executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);
复制代码

2.按指定的速率执行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟1秒执行,而后每5秒执行一次
executorService.scheduleAtFixedRate(
        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
复制代码

QQ截图20190702152117.png

3.按指定的时延执行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
复制代码

QQ截图20190702152440.png

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们仍是有区别的:

scheduleAtFixedRate按照固定速率执行任务,好比每5秒执行一个任务,即便上一个任务没有结束,5秒后也会开始处理新的任务;

scheduleWithFixedDelay按照固定的时延处理任务,好比每延迟5秒执行一个任务,不管上一个任务处理了1秒,1分钟仍是1小时,下一个任务老是在上一个任务执行完毕后5秒钟后开始执行。

对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:

QQ截图20190702153306.png

由于这几个线程池理论是均可以接收无限个任务,因此这就有内存溢出的风险。实际上只要咱们掌握了ThreadPoolExecutor构造函数7个参数的含义,咱们就能够根据不一样的业务来建立出符合需求的线程池。通常线程池的建立能够参考以下规则:

IO密集型任务,线程池线程数量能够设置为2 X CPU核心数;

计算密集型任务,线程池线程数量能够设置为CPU核心数 + 1。

一些API的用法 ThreadPoolExecutor提供了几个判断线程池状态的方法:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1, 2, 5, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    threadPoolExecutor.shutdown();
    System.out.println("线程池为shutdown状态:" + threadPoolExecutor.isShutdown());
    System.out.println("线程池正在关闭:" + threadPoolExecutor.isTerminating());
    System.out.println("线程池已经关闭:" + threadPoolExecutor.isTerminated());
    threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
    System.out.println("线程池已经关闭" + threadPoolExecutor.isTerminated());
}
复制代码

程序输出以下:

20190703205843.png

前面咱们提到,线程池核心线程即便是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了容许核心线程超时:

public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               1, 2, 3, TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
               new ThreadPoolExecutor.AbortPolicy()
       );
       threadPoolExecutor.allowCoreThreadTimeOut(true);
       threadPoolExecutor.execute(() -> {
           try {
               TimeUnit.SECONDS.sleep(5);
               System.out.println("任务执行完毕");
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });
   }
复制代码

程序输出以下所示:

asdfasdfaaaaa.gif

5秒后任务执行完毕,核心线程处于空闲的状态。由于经过allowCoreThreadTimeOut方法设置了容许核心线程超时,因此3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有做用了,因而就自动关闭了。

值得注意的是,若是一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。

ThreadPoolExecutor提供了一remove方法,查看其源码:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
复制代码

可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1, 2, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );
    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("任务执行完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Runnable r = () -> System.out.println("看看我是否会被删除");
    threadPoolExecutor.execute(r);
    threadPoolExecutor.remove(r);

    threadPoolExecutor.shutdown();
}
复制代码

执行程序,输出以下:

QQ截图20190703211746.png

可看到任务并无被执行,已经被删除,由于惟一一个核心线程已经在执行任务了,因此后提交的这个任务被放到了线程队列里,而后经过remove方法删除。

默认状况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。咱们能够经过调用prestartCoreThread方法,让核心线程即便没有任务提交,也处于等待执行任务的活跃状态:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 2, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );
    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
}
复制代码

程序输出以下所示:

QQ截图20190703213145.png

该方法返回boolean类型值,若是因此核心线程都启动了,返回false,反之返回true。

还有一个和它相似的prestartAllCoreThreads方法,它的做用是一次性启动全部核心线程,让其处于活跃地等待执行任务的状态。

ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 5, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    // 任务集合
    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());
    // 随机执行结果
    Integer result = threadPoolExecutor.invokeAny(tasks);
    System.out.println("-------------------");
    System.out.println(result);
    threadPoolExecutor.shutdownNow();
}
复制代码

启动程序,输出以下:

QQ截图20190704091530.png

ThreadPoolExecutor的invokeAll则是执行任务集合中的全部任务,返回Future集合:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 5, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());

    List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
    futureList.stream().map(f->{
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
           return null;
        }
    }).forEach(System.out::println);

    threadPoolExecutor.shutdownNow();
}
复制代码

输出以下:

QQ截图20190704091836.png

总结下这些方法:

方法 描述
allowCoreThreadTimeOut(boolean value) 是否容许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭
awaitTermination(long timeout, TimeUnit unit) 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
execute(Runnable command) 向线程池提交任务,没有返回值
submit(Runnable task) 向线程池提交任务,返回Future
isShutdown() 判断线程池是否为shutdown状态
isTerminating() 判断线程池是否正在关闭
isTerminated() 判断线程池是否已经关闭
remove(Runnable task) 移除线程队列中的指定任务
prestartCoreThread() 提早让一个核心线程处于活跃状态,等待执行任务
prestartAllCoreThreads() 提早让全部核心线程处于活跃状态,等待执行任务
getActiveCount() 获取线程池活跃线程数
getCorePoolSize() 获取线程池核心线程数
threadPoolExecutor.getQueue() 获取线程池线程队列
getMaximumPoolSize() 获取线程池最大线程数
shutdown() 让线程池处于shutdown状态,再也不接收任务,等待全部正在运行中的任务结束后,关闭线程池。
shutdownNow() 让线程池处于stop状态,再也不接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。
相关文章
相关标签/搜索