一看就懂的Java线程池分析详解

Java线程池

[toc]java

什么是线程池

线程池就是有N个子线程共同在运行的线程组合面试

举个容易理解的例子:有个线程组合(即线程池,咱能够比喻为一个公司),里面有3个子线程(看成3个员工吧),待命干活。
只要客户告诉他一个任务(好比搬砖),公司就会挑一个员工来作;数据库

若是不少客户都找,3个忙不过来,那公司能够再雇2我的,但本公司运营能力有限,办公室也不大,最多就雇佣5我的,若是还忙不过来,那这些送来的任务就排队了。一件一件作完。编程

ThreadPoolExecutor简介

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,所以若是要透彻地了解Java中的线程池,必须先了解这个类。下面咱们来看一下ThreadPoolExecutor类的具体实现源码:缓存

ThreadPoolExecutor类中提供了四个构造方法:服务器

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
   
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);
        
}

从上面的代码能够得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,经过观察每一个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工做。多线程

corePoolSize 线程池维护线程的最少数量。并发

须要注意的是在初建立线程池时线程不会当即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到 corePoolSize。若想一开始就建立全部核心线程需调用 prestartAllCoreThreads方法。

maximumPoolSize-池中容许的最大线程数。框架

须要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否建立新线程。

keepAliveTime - 线程池维护线程所容许的空闲时间异步

当线程数大于核心时,多于的空闲线程最多存活时间
默认状况下,只有 当线程池中的线程数大于corePoolSize时,keepAliveTime才会起做用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。可是若是调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0

unit - keepAliveTime 参数的时间单位,有7种取值。

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:有界队列,无界队列和同步移交。

ArrayBlockingQueue;  //有界队列
LinkedBlockingQueue; //无界队列
SynchronousQueue;  //同步移交
PriorityBlockingQueue; //一个具备优先级得无限阻塞队列。

threadFactory - 执行程序建立新线程时使用的工厂。

handler - 阻塞队列已满且线程数达到最大值时所采起的饱和策略。java默认提供了4种饱和策略的实现方式:停止、抛弃、抛弃最旧的、调用者运行。

ThreadPoolExecutor.AbortPolicy();  抛出java.util.concurrent.RejectedExecutionException异常 
ThreadPoolExecutor.CallerRunsPolicy();  重试添加当前的任务,他会自动重复调用execute()方法 
ThreadPoolExecutor.DiscardOldestPolicy(); 抛弃旧的任务  
ThreadPoolExecutor.DiscardPolicy();  抛弃当前的任务 
固然也能够根据应用场景须要来实现`RejectedExecutionHandler`接口自定义策略。如记录日志或持久化不能处理的任务。

向上翻源码

从上面给出的ThreadPoolExecutor类的代码能够知道,ThreadPoolExecutor继承了AbstractExecutorService,咱们来看一下AbstractExecutorService的实现:

public abstract class AbstractExecutorService implements ExecutorService {
 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

咱们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
    
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
    
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService又是继承了Executor接口,咱们看一下Executor接口的实现:

public interface Executor {
    void execute(Runnable command);
}

到这里,你们应该明白了ThreadPoolExecutorAbstractExecutorServiceExecutorServiceExecutor几个之间的关系了。

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思能够理解,就是用来执行传进去的任务的

而后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法;

在ThreadPoolExecutor类中有几个很是重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法其实是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并无对其进行重写,这个方法也是用来向线程池提交任务的,可是它和execute()方法不一样,它可以返回任务执行的结果,去看submit()方法的实现,会发现它实际上仍是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

shutdown()和shutdownNow()是用来关闭线程池的

还有不少其余的方法好比:getQueue() 、getPoolSize()、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,自行查阅API。

线程池的流程分析

线程池的主要工做流程以下图:
Alt text

从上图咱们能够看出,当提交一个新任务到线程池时,线程池的处理流程以下:

  1. 首先线程池判断基本线程池是否已满?没满,建立一个工做线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工做队列是否已满?没满,则将新提交的任务存储在工做队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则建立一个新的工做线程来执行任务,满了,则交给饱和策略来处理这个任务。

源码分析

线程池执行任务的方法以下:

public void execute(Runnable command) {

        if (command == null)
            throw new NullPointerException();

        //若是线程数小于基本线程数,则建立线程并执行当前任务
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

            //如线程数大于等于基本线程数或线程建立失败,则将当前任务放到工做队列中。
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            
            //若是线程池不处于运行中或任务没法放入队列,而且当前线程数量小于最大容许的线程数量,则建立一个线程执行任务。
            else if (!addIfUnderMaximumPoolSize(command))

                //抛出RejectedExecutionException异常
                reject(command); // is shutdown or saturated
        }

    }

工做线程。线程池建立线程时,会将线程封装成工做线程Worker,Worker在执行完任务后,还会无限循环获取工做队列里的任务来执行。咱们能够从Worker的run方法里看到这点:

public void run() {
     try {
           Runnable task = firstTask;
           firstTask = null;
            while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
            }

      } finally {
             workerDone(this);
      }
}

合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,能够从如下几个角度来进行分析:

  • 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  • 任务的优先级:高,中和低。
  • 任务的执行时间:长,中和短。
  • 任务的依赖性:是否依赖其余系统资源,如数据库链接。
  • 任务性质不一样的任务能够用不一样规模的线程池分开处理。

CPU密集型任务 配置尽量少的线程数量,如配置Ncpu+1个线程的线程池。

IO密集型任务 则因为须要等待IO操做,线程并非一直在执行任务,则配置尽量多的线程,如2*Ncpu

混合型的任务 若是能够拆分,则将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,若是这两个任务执行时间相差太大,则不必进行分解。

咱们能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。

优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。

依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU

建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点,好比几千。

别人的例子:

有一次咱们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞住,任务积压在线程池里。若是当时咱们设置成无界队列,线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然咱们的系统全部的任务是用的单独的服务器部署的,而咱们使用不一样规模的线程池跑不一样类型的任务,可是出现这样问题时也会影响到其余任务。

线程池的监控

经过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可使用

--

taskCount:线程池须要执行的任务数量。

completedTaskCount:线程池在运行过程当中已完成的任务数量。小于或等于taskCount。

largestPoolSize:线程池曾经建立过的最大线程数量。经过这个数据能够知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。

getPoolSize:线程池的线程数量。若是线程池不销毁的话,池里的线程不会自动销毁,因此这个大小只增不减

getActiveCount:获取活动的线程数。

经过扩展线程池进行监控。经过继承线程池并重写线程池的beforeExecute,afterExecute,terminated方法,咱们能够在任务执行前,执行后和线程池关闭前干一些事情。

如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }

经常使用的几种线程池

什么是 Executor 框架 ? (面试题)

Executor框架在Java 5中被引入,Executor 框架是一个根据一组执行策略调用、调度、执行和控制的异步任务的框架。

Executor 框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。

不过在java doc中,并不提倡咱们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来建立四种线程池

注意在全新的阿里编程规约里面不推荐使用 Executors提供的静态方法建立线程。
  • newCachedThreadPool 是一个可根据须要建立新线程的线程池,建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();   
   cachedThreadPool.execute(new Runnable() {  
    public void run() {  
     System.out.println("runing.....");  
    }  
   });   
 }

--

  • newSingleThreadExecutor 建立是一个单线程池,也就是该线程池只有一个线程在工做,全部的任务是串行执行的。若是这个惟一的线程由于异常结束,那么会有一个新的线程来替代它,此线程池保证全部任务的执行顺序按照任务的提交顺序执行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(););   
   singleThreadExecutor.execute(new Runnable() {  
    public void run() {  
     System.out.println("runing.....");  
    }  
   });   
 }

--

  • newFixedThreadPool 建立固定大小的线程池,每次提交一个任务就建立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,若是某个线程由于执行异常而结束,那么线程池会补充一个新线程。

定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);   
   fixedThreadPool .execute(new Runnable() {  
    public void run() {  
     System.out.println("runing.....");  
    }  
   });   
 }

--

  • newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);    
   scheduledThreadPool.schedule(new Runnable() {  
    public void run() {  
     System.out.println("runing.....");  
    }  
   }, 3, TimeUnit.SECONDS);   // 表示延迟3秒执行。
 }

使用线程池的风险

虽然线程池是构建多线程应用程序的强大机制,但使用它并非没有风险的

用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的全部并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

死锁

任何多线程应用程序都有死锁风险。当一组进程或线程中的每个都在等待一个只有该组中另外一个进程才能引发的事件时,咱们就说这组进程或线程 死锁了。

死锁的最简单情形是:线程 A 持有对象 X 的独占锁,而且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),不然死锁的线程将永远等下去。

虽然任何多线程程序中都有死锁的风险,但线程池却引入了另外一种死锁可能,在那种状况下,全部池线程都在执行已阻塞的等待队列中另外一任务的执行结果的任务,但这一任务却由于没有未被占用的线程而不能运行。
当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象能够相互发送查询,这些查询接下来做为排队的任务执行,查询对象又同步等待着响应时,会发生这种状况。

资源不足

线程池的一个优势在于:相对于其它替代调度机制言,它们一般执行得很好。但只有恰当地调整了线程池大小时才是这样的。线程消耗包括内存和其它系统资源在内的大量资源。除了 Thread 对象所需的内存以外,每一个线程都须要两个可能很大的执行调用堆栈。除此之外,JVM 可能会为每一个 Java 线程建立一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但若是有不少线程,环境切换也可能严重地影响程序的性能。

若是线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,并且使用超出比您实际须要的线程可能会引发资源匮乏问题,由于池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源之外,服务请求时所作的工做可能须要其它资源,例如 JDBC 链接、套接字或文件。
这些也都是有限资源,有太多的并发请求也可能引发失效,例如不能分配 JDBC 链接。

并发错误

线程池和其它排队机制依靠使用 wait()notify() 方法,这两个方法都难于使用。若是编码不正确,那么可能丢失通知,致使线程保持空闲状态,尽管队列中有工做要处理。使用这些方法时,必须格外当心。而最好使用现有的、已经知道能工做的实现,例如 util.concurrent 包。

线程泄漏

各类类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种状况。发生线程泄漏的一种情形出如今任务抛出一个 RuntimeException 或一个 Error 时。若是池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减小一个。当这种状况发生的次数足够多时,线程池最终就为空,并且系统将中止,由于没有可用的线程来处理任务。

有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久中止,而这些中止的任务也会引发和线程泄漏一样的问题。若是某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们本身的线程,要么只让它们等待有限的时间

请求过载

仅仅是请求就压垮了服务器,这种状况是可能的。在这种情形下,咱们可能不想将每一个到来的请求都排队到咱们的工做队列,由于排在队列中等待执行的任务可能会消耗太多的系统资源并引发资源缺少。在这种情形下决定如何作取决于您本身;在某些状况下,您能够简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也能够用一个指出服务器暂时很忙的响应来拒绝请求。

可选择的阻塞队列BlockingQueue详解

重复看一下新任务进入时线程池的执行策略:

  1. 若是运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。
  2. 若是运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
  3. 若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。

主要有3种类型的BlockingQueue

无界队列

队列大小无限制,经常使用的为无界的LinkedBlockingQueue,将致使在全部 corePoolSize 线程都忙时新任务在队列中等待。这样,建立的线程就不会超过 corePoolSize

应用场景:当每一个任务彻底独立于其余任务,即任务执行互不影响时,适合于使用无界队列。
例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略容许无界线程具备增加的可能性。

有界队列

经常使用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另外一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。

使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减小内存消耗,下降cpu使用率和上下文切换,可是可能会限制系统吞吐量。

当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,可是可能较难调整和控制。队列大小和最大池大小可能须要相互折衷,使用大型队列和小型池能够最大限度地下降 CPU 使用率、操做系统资源和上下文切换开销,可是可能致使人工下降吞吐量。

若是任务频繁阻塞(例如,若是它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列一般要求较大的池大小,CPU 使用率较高,可是可能遇到不可接受的调度开销,这样也会下降吞吐量。

同步移交

(直接提交) 若是不但愿任务在队列中等待而是但愿将任务直接移交给工做线程,可以使用SynchronousQueue做为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

工做队列的默认选项是 SynchronousQueue,此策略能够 避免在处理可能具备内部依赖性的请求集时出现锁

该Queue自己的特性,在某次添加元素后必须等待其余线程取走后才能继续添加

可选择的饱和策略RejectedExecutionHandler详解

JDK主要提供了4种饱和策略供选择。4种策略都作为静态内部类在ThreadPoolExcutor中进行实现。

AbortPolicy停止策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy抛弃策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

如代码所示,不作任何处理直接抛弃任务

DiscardOldestPolicy抛弃旧任务策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。若是此时阻塞队列使用PriorityBlockingQueue优先级队列,将会致使优先级最高的任务被抛弃,所以不建议将该种策略配合优先级队列使用。

CallerRunsPolicy调用者运行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程本身来执行任务,所以在执行任务的这段时间里主线程没法再提交新任务,从而使线程池中工做线程有时间将正在处理的任务处理完成。

相关文章
相关标签/搜索