Java并发编程(7)- 线程调度 - 线程池

线程池

平时有接触过多线程开发的小伙伴们应该都或多或少都有了解、使用过线程池,而《阿里巴巴 Java 手册》里也有一条规范:
Java并发编程(7)- 线程调度 - 线程池java

因而可知线程池的重要性,线程池对于限制应用程序中同一时刻运行的线程数颇有用。由于每启动一个新线程都会有相应的性能开销,每一个线程都须要给栈分配一些内存等等。spring

咱们能够把并发执行的任务传递给一个线程池,来替代为每一个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务而且执行它。编程

线程池常常应用在多线程服务器上。每一个经过网络到达服务器的链接都被包装成一个任务而且传递给线程池。线程池的线程会并发的处理链接上的请求。缓存

简单来讲使用线程池有如下几个目的:服务器

  • 线程是稀缺资源,不能频繁的建立。应当将其放入一个池子中,能够给其余任务进行复用,减小对象建立、消亡的开销,性能好
  • 解耦做用;线程的建立于执行彻底分开,方便维护。
  • 线程池可有效控制最大并发线程数,提升系统资源利用率,同时能够避免过多资源竞争,避免阻塞
  • 线程池可提供定时执行、按期执行、单线程以及并发数控制等功能

直接new Thread的弊端:网络

  • 每次new Thread 新建对象,性能差
  • 线程缺少统一管理,可能无限制的新建线程,相互竞争,经常会出现占用过多的系统资源致使死机或者发生OOM(out of memory 内存溢出),这种问题的缘由不是由于单纯的new一个Thread,而是可能由于程序的bug或者设计上的缺陷致使不断new Thread形成的。
  • 缺乏更多功能,如更多执行、按期执行、线程中断等

线程池原理:多线程

  • 谈到线程池就会想到池化技术,其中最核心的思想就是把宝贵的资源放到一个池子中;每次使用都从里面获取,用完以后又放回池子供其余人使用,有点吃大锅饭的意思。

线程池类图:
Java并发编程(7)- 线程调度 - 线程池并发

在上边的类图中,最上层就是Executor框架,它是一个根据一组执行策略的调用调度执行和控制异步任务的框架,目的是提供一种将任务提交与任务如何运行分离开的机制。它包含了三个executor接口:框架

  • Executor:运行新任务的简单接口
  • ExecutorService:扩展了Executor,添加了用来管理执行器生命周期和任务生命周期的方法
  • ScheduledExecutorService:扩展了ExecutorService,支持Future和按期执行任务

在类图中,咱们最常使用的是ThreadPoolExecutor和Executors,这两个类均可以建立线程池,其中ThreadPoolExecutor是可定制化的去建立线程池,而Executors则属因而工具类,该类中已经封装好了一些建立线程池的方法,直接调用相应的方法便可建立线程。异步

但《阿里巴巴 Java 手册》里有一条规范指明不容许使用Executors建立线程池,具体以下:
Java并发编程(7)- 线程调度 - 线程池

能够说线程池体系里最为核心的类是ThreadPoolExecutor,也是功能最强的,ThreadPoolExecutor共有四个构造函数,以下:
Java并发编程(7)- 线程调度 - 线程池

线程池参数

其中最多可传入七个参数,这七个参数配合起来,构成了线程池强大的功能。参数说明:

corePoolSize:核心线程数量

maximumPoolSize:线程最大线程数

workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响

keepAliveTime:线程没有任务执行时最多保持多久时间终止(当线程中的线程数量大于corePoolSize的时候,若是这时没有新的任务提交核心线程外的线程不会当即销毁,而是等待,直到等待的时间超过keepAliveTime)

unit:keepAliveTime的时间单位

threadFactory:线程工厂,用来建立线程,若不设置则使用默认的工厂来建立线程,这样新建立出来的线程会具备相同的优先级,而且是非守护的线程,同时也会设置好名称

rejectHandler:当拒绝处理任务时(阻塞队列满)的策略(AbortPolicy默认策略直接抛出异常、CallerRunsPolicy用调用者所在的线程执行任务、DiscardOldestPolicy丢弃队列中最靠前的任务并执行当前任务、DiscardPolicy直接丢弃当前任务)

拒绝策略的实现类都在TreadPoolExecutor中:
Java并发编程(7)- 线程调度 - 线程池

咱们来讲一下其中corePoolSize、maximumPoolSize、workQueue 这三个参数的关系:

若是运行的线程数量小于corePoolSize的时候,直接建立新线程来处理任务。即便线程池中的其余线程是空闲的。若是线程池中的线程数量大于corePoolSize且小于maximumPoolSize时,那么只有当workQueue满的时候才建立新的线程去处理任务。若是corePoolSize与maximumPoolSize是相同的,那么建立的线程池大小是固定的。这时若是有新任务提交,且workQueue未满时,就把请求放入workQueue中,等待空闲线程从workQueue取出任务进行处理。若是须要运行的线程数量大于maximumPoolSize时,而且此时workQueue也满了,那么就使用rejectHandler参数所指定的拒绝策略去进行处理。

而后咱们来具体介绍一下 workQueue, 它是保存待执行任务的一个阻塞队列,当咱们提交一个新的任务到线程池后,线程池会根据当前池中正在运行的线程数量来决定该任务的处理方式。处理方式总共有三种:

一、直接切换(SynchronusQueue)

二、×××队列(LinkedBlockingQueue),若使用该队列,那么线程池中可以建立的最大线程数为corePoolSize,这时maximumPoolSize就不会起做用了。当线程池中全部的核心线程都是运行状态的时候,新的任务提交就会放入等待队列中。

三、有界队列(ArrayBlockingQueue),使用该队列能够将线程池中的最大线程数量限制为maximumPoolSize参数所指定的值,这种方式可以下降资源消耗,可是这种方式使得线程池对线程调度变的更困难。由于此时线程池与队列容量都是有限的了,因此想让线程池处理任务的吞吐率达到一个合理的范围,又想使咱们的线程调度相对简单,而且还尽量下降线程池对资源的消耗,那么咱们就须要合理的设置corePoolSize和maximumPoolSize这两个参数的值

分配技巧: 若是想下降资源的消耗包括下降cpu使用率、操做系统资源的消耗、上下文切换的开销等等,能够设置一个较大的队列容量和较小的线程池容量,这样会下降线程池处理任务的吞吐量。若是咱们提交的任务常常发生阻塞,咱们能够考虑调用相关方法调整maximumPoolSize参数的值。若是咱们的队列容量较小,一般须要把线程池的容量设置得大一些,这样cpu的使用率相对来讲会高一些。可是若是线程池的容量设置的过大,提升任务的数量过多的时候,并发量会增长,那么线程之间的调度就是一个须要考虑的问题,这样反而可能会下降处理任务的吞吐量。


线程池状态

线程池有五种状态,线程池状态转换过程图以下:
Java并发编程(7)- 线程调度 - 线程池

  • running:运行状态,能接受新提交的任务,也能处理阻塞队列中的任务
  • shutdown:关闭状态,不能处理新的任务,但却能够继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程当中也会调用shutdown()方法进入该状态);
  • stop:中止状态,不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  • tidying:若是全部的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
  • terminated:最终状态,在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有作。

线程池经常使用方法:

方法名 描述
execute() 提交任务,交给线程池执行
submit() 提交任务,可以返回执行结果 execute+Future
shutdown() 关闭线程池,等待任务都执行完
shutdownNow() 马上关闭线程池,不等待任务执行完
getTaskCount() 线程池已执行和未执行的任务总数
getCompleteTaskCount() 已完成的任务数量
getPoolSize() 线程池当前的线程数量
getActiveCount() 当前线程池中正在执行任务的线程数量

使用Executors建立线程池

上文中咱们提到了可使用Executors工具类方便的建立线程,该类中提供了四种建立线程池的方法,以下:

方法名 描述
newCachedThreadPool 建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程
newFixedThreadPool 建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行
newSingleThreadExecutor 建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行

newCachedThreadPool使用示例:

@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 若需使用ThreadPoolExecutor里的方法,则须要进行强转
//        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newFixedThreadPool使用示例:

@Slf4j
public class ThreadPoolExample2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newSingleThreadExecutor使用示例:

@Slf4j
public class ThreadPoolExample3 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newScheduledThreadPool使用示例:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

        // 延迟3秒执行
        executorService.schedule(() -> log.info("Scheduled run"), 3, TimeUnit.SECONDS);

        // 以指定的速率执行任务,这里是每隔3秒执行一次任务
        executorService.scheduleAtFixedRate(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        // 以指定的延迟执行任务,这里是延迟3秒执行一次任务,使用起来和scheduleAtFixedRate基本同样
        executorService.scheduleWithFixedDelay(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        executorService.shutdown();
    }
}

关于延迟执行任务的操做,在Java中还可使用Timer类进行实现,以下:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        // 每隔3秒执行一次任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("timer task run");
            }
        }, new Date(), 3000);
    }
}

虽然可行,可是并不建议这么使用,在多线程并行处理定时任务时,Timer运行多个TimeTask的话,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。


使用ThreadPoolExecutor建立线程池

以前咱们提到了,不建议使用Executors来建立线程池,而是使用ThreadPoolExecutor进行建立。实际上Executors里建立的也就是ThreadPoolExecutor的实例,具体的看一下Executors类的源码就知道了。

接下来用一个例子演示一下如何经过ThreadPoolExecutor来建立线程池,这里使用7个参数的构造函数,示例代码以下:

package org.zero.concurrency.demo.example.threadpool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: concurrency-demo
 * @description: ThreadPoolExecutor使用示例
 * @author: 01
 * @create: 2018-10-20 16:35
 **/
@Slf4j
public class ThreadPoolExample6 {
    public static void main(String[] args) {
        // 使用ArrayBlockingQueue做为其等待队列
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(5);
        // 使用自定义的ThreadFactory,目的是设置有意义的的线程名字,方便出错时回溯
        ThreadFactory namedThreadFactory = new MyThreadFactory("test-thread");

        // 建立线程池
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.MINUTES, blockingQueue, namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        // 执行任务
        poolExecutor.execute(() -> log.info("thread run"));

        // 关闭线程池
        poolExecutor.shutdown();
    }

    private static class MyThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        private MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix + "-";
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            if (t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

线程池的建立先介绍到这,其实大部分的建立方式能够参考Executors类的源码,因此这里就不赘述了。

线程池的合理配置:

  • CPU密集型任务,就须要尽可能压榨CPU,参考值能够设置为NCPU+1,即CPU核心数量+1
  • IO密集型任务,参考值能够设置为2*NCPU,即CPU核心数量的2倍

最后须要说一句,线程池虽好但并不是放之四海皆准,咱们应当结合实际业务场景去考虑是否使用线程池。例如当线程池内须要执行的任务很小,小到执行任务的时间和任务调度的时间很接近,这时若使用线程池反而会更慢,由于任务调度和任务管理是须要耗时的。

相关文章
相关标签/搜索