10问10答:你真的了解线程池吗?

简介: 《Java开发手册》中强调,线程资源必须经过线程池提供,而建立线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时形成OOM。可是若是参数配置错误,仍是会引起上面的两个问题。因此本节咱们主要是讨论ThreadPoolExecutor的一些技术细节,而且给出几个经常使用的最佳实践建议。缓存

image.png

做者 | 风楼
来源 | 阿里技术公众号服务器

《Java开发手册》中强调,线程资源必须经过线程池提供,而建立线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时形成OOM。可是若是参数配置错误,仍是会引起上面的两个问题。因此本节咱们主要是讨论ThreadPoolExecutor的一些技术细节,而且给出几个经常使用的最佳实践建议。多线程

我在查找资料的过程当中,发现有些问题存在争议。后面发现,一部分缘由是由于不一样JDK版本的现实是有差别的。所以,下面的分析是基于当下最经常使用的版本JDK1.8,而且对于存在争议的问题,咱们分析源码,源码才是最准确的。并发

1 corePoolSize=0会怎么样

这是一个争议点。我发现大部分博文,不管是国内的仍是国外的,都是这样回答这个问题的:框架

  • 提交任务后,先判断当前池中线程数是否小于corePoolSize,若是小于,则建立新线程执行这个任务。
  • 否者,判断等待队列是否已满,若是没有满,则添加到等待队列。
  • 否者,判断当前池中线程数是否大于maximumPoolSize,若是大于则拒绝。
  • 否者,建立一个新的线程执行这个任务。

按照上面的描述,若是corePoolSize=0,则会判断等待队列的容量,若是还有容量,则排队,而且不会建立新的线程。异步

—— 但其实,这是老版本的实现方式,从1.6以后,实现方式就变了。咱们直接看execute的源码(submit也依赖它),我备注出了关键一行:ide

int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,若是是则建立一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
  • 线程池提交任务后,首先判断当前池中线程数是否小于corePoolSize。
  • 若是小于则尝试建立新的线程执行该任务;不然尝试添加到等待队列。
  • 若是添加队列成功,判断当前池内线程数是否为0,若是是则建立一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
  • 若是添加到等待队列失败,通常是队列已满,才会再尝试建立新的线程。
  • 但在建立以前须要与maximumPoolSize比较,若是小于则建立成功。
  • 不然执行拒绝策略。

函数

上述问题需区分JDK版本。在1.6版本以后,若是corePoolSize=0,提交任务时若是线程池为空,则会当即建立一个线程来执行任务(先排队再获取);若是提交任务的时候,线程池不为空,则先在等待队列中排队,只有队列满了才会建立新线程。工具

因此,优化在于,在队列没有满的这段时间内,会有一个线程在消费提交的任务;1.6以前的实现是,必须等队列满了以后,才开始消费。优化

2 线程池建立以后,会当即建立核心线程么

以前有人问过我这个问题,由于他发现应用中有些Bean建立了线程池,可是这个Bean通常状况下用不到,因此咨询我是否须要把这个线程池注释掉,以减小应用运行时的线程数(该应用运行时线程过多。)

不会。从上面的源码能够看出,在刚刚建立ThreadPoolExecutor的时候,线程并不会当即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。

  • prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
  • prestartAllCoreThreads:Starts all core threads.

3 核心线程永远不会销毁么

这个问题有点tricky。首先咱们要明确一下概念,虽然在JavaDoc中也使用了“core/non-core threads”这样的描述,但其实这是一个动态的概念,JDK并无给一部分线程打上“core”的标记,作什么特殊化的处理。这个问题我认为想要探讨的是闲置线程终结策略的问题。

在JDK1.6以前,线程池会尽可能保持corePoolSize个核心线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,因此从JDK1.6开始,提供了方法allowsCoreThreadTimeOut,若是传参为true,则容许闲置的核心线程被终止。

请注意这种策略和corePoolSize=0的区别。我总结的区别是:

  • corePoolSize=0:在通常状况下只使用一个线程消费任务,只有当并发请求特别多、等待队列都满了以后,才开始用多线程。
  • allowsCoreThreadTimeOut=true && corePoolSize>1:在通常状况下就开始使用多线程(corePoolSize个),当并发请求特别多,等待队列都满了以后,继续加大线程数。可是当请求没有的时候,容许核心线程也终止。

因此corePoolSize=0的效果,基本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不一样。

在JDK1.6以后,若是allowsCoreThreadTimeOut=true,核心线程也能够被终止。

4 如何保证线程不被销毁

首先咱们要明确一下线程池模型。线程池有个内部类Worker,它实现了Runnable接口,首先,它本身要run起来。而后它会在合适的时候获取咱们提交的Runnable任务,而后调用任务的run()接口。一个Worker不终止的话能够不断执行任务。

咱们前面说的“线程池中的线程”,其实就是Worker;等待队列中的元素,是咱们提交的Runnable任务。

每个Worker在建立出来的时候,会调用它自己的run()方法,实现是runWorker(this),这个实现的核心是一个while循环,这个循环不结束,Worker线程就不会终止,就是这个基本逻辑。

  • 在这个while条件中,有个getTask()方法是核心中的核心,它所作的事情就是从等待队列中取出任务来执行:
  • 若是没有达到corePoolSize,则建立的Worker在执行完它承接的任务后,会用workQueue.take()取任务、注意,这个接口是阻塞接口,若是取不到任务,Worker线程一直阻塞。
  • 若是超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在空闲了以后,会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务。注意,这个接口只阻塞等待keepAliveTime时间,超过这个时间返回null,则Worker的while循环执行结束,则被终止了。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 看这里,核心逻辑在这里
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 注意,核心中的核心在这里
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

实现方式很是巧妙,核心线程(Worker)即便一直空闲也不终止,是经过workQueue.take()实现的,它会一直阻塞到从等待队列中取到新的任务。非核心线程空闲指定时间后终止是经过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个空闲的Worker只等待keepAliveTime,若是尚未取到任务则循环终止,线程也就运行结束了。

引伸思考

Worker自己就是个线程,它再调用咱们传入的Runnable.run(),会启动一个子线程么?若是你尚未答案,再回想一下Runnable和Thread的关系。

5 空闲线程过多会有什么问题

笼统地回答是会占用内存,咱们分析一下占用了哪些内存。首先,比较普通的一部分,一个线程的内存模型:

  • 虚拟机栈
  • 本地方法栈
  • 程序计数器

我想额外强调是下面这几个内存占用,须要当心:

  • ThreadLocal:业务代码是否使用了ThreadLocal?就算没有,Spring框架中也大量使用了ThreadLocal,你所在公司的框架可能也是同样。
  • 局部变量:线程处于阻塞状态,确定还有栈帧没有出栈,栈帧中有局部变量表,凡是被局部变量表引用的内存都不能回收。因此若是这个线程建立了比较大的局部变量,那么这一部份内存没法GC。
  • TLAB机制:若是你的应用线程数处于高位,那么新的线程初始化可能由于Eden没有足够的空间分配TLAB而触发YoungGC。

线程池保持空闲的核心线程是它的默认配置,通常来说是没有问题的,由于它占用的内存通常不大。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理。

若是你的应用线程数处于高位,那么须要观察一下YoungGC的状况,估算一下Eden大小是否足够。若是不够的话,可能要谨慎地建立新线程,而且让空闲的线程终止;必要的时候,可能须要对JVM进行调参。

6 keepAliveTime=0会怎么样

这也是个争议点。有的博文说等于0表示空闲线程永远不会终止,有的说表示执行完马上终止。还有的说等于-1表示空闲线程永远不会终止。其实稍微看一下源码知道了,这里我直接抛出答案。

在JDK1.8中,keepAliveTime=0表示非核心线程执行完马上终止。

默认状况下,keepAliveTime小于0,初始化的时候才会报错;但若是allowsCoreThreadTimeOut,keepAliveTime必须大于0,否则初始化报错。

7 怎么进行异常处理

不少代码的写法,咱们都习惯按照常见范式去编写,而没有去思考为何。好比:

  • 若是咱们使用execute()提交任务,咱们通常要在Runable任务的代码加上try-catch进行异常处理。
  • 若是咱们使用submit()提交任务,咱们通常要在主线程中,对Future.get()进行try-catch进行异常处理。

—— 可是在上面,我提到过,submit()底层实现依赖execute(),二者应该统一呀,为何有差别呢?下面再扒一扒submit()的源码,它的实现蛮有意思。

首先,ThreadPoolExecutor中没有submit的代码,而是在它的父类AbstractExecutorService中,有三个submit的重载方法,代码很是简单,关键代码就两行:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

正是由于这三个重载方法,都调用了execute,因此我才说submit底层依赖execute。经过查看这里execute的实现,咱们不难发现,它就是ThreadPoolExecutor中的实现,因此,形成submit和execute的差别化的代码,不在这。那么形成差别的必定在newTaskFor方法中。这个方法也就new了一个FutureTask而已,FutureTask实现RunnableFuture接口,RunnableFuture接口继承Runnable接口和Future接口。而Callable只是FutureTask的一个成员变量。

因此讲到这里,就有另外一个Java基础知识点:Callable和Future的关系。咱们通常用Callable编写任务代码,Future是异步返回对象,经过它的get方法,阻塞式地获取结果。FutureTask的核心代码就是实现了Future接口,也就是get方法的实现:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 核心代码
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 死循环
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 只有任务的状态是’已完成‘,才会跳出死循环
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

get的核心实现是有个awaitDone方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;不然会依赖UNSAFE包下的LockSupport.park原语进行阻塞,等待LockSupport.unpark信号量。而这个信号量只有当运行结束得到结果、或者出现异常的状况下,才会发出来。分别对应方法set和setException。这就是异步执行、阻塞获取的原理,扯得有点远了。

回到最初咱们的疑问,为何submit以后,经过get方法能够获取到异常?缘由是FutureTask有一个Object类型的outcome成员变量,用来记录执行结果。这个结果能够是传入的泛型,也能够是Throwable异常:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

// get方法中依赖的,报告执行结果

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

FutureTask的另外一个巧妙的地方就是借用RunnableAdapter内部类,将submit的Runnable封装成Callable。因此就算你submit的是Runnable,同样能够用get获取到异常。

  • 不管是用execute仍是submit,均可以本身在业务代码上加try-catch进行异常处理。我通常喜欢使用这种方式,由于我喜欢对不一样业务场景的异常进行差别化处理,至少打不同的日志吧。
  • 若是是execute,还能够自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。
  • 或者实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory。
  • 可是注意,afterExecute和UncaughtExceptionHandler都不适用submit。由于经过上面的FutureTask.run()不难发现,它本身对Throwable进行了try-catch,封装到了outcome属性,因此底层方法execute的Worker是拿不到异常信息的。

8 线程池需不须要关闭

通常来说,线程池的生命周期跟随服务的生命周期。若是一个服务(Service)中止服务了,那么须要调用shutdown方法进行关闭。因此ExecutorService.shutdown在Java以及一些中间件的源码中,是封装在Service的shutdown方法内的。

若是是Server端不重启就不中止提供服务,我认为是不须要特殊处理的。

9 shutdown和shutdownNow的区别

shutdown => 平缓关闭,等待全部已添加到线程池中的任务执行完再关闭。
shutdownNow => 马上关闭,中止正在执行的任务,并返回队列中未执行的任务。
原本想分析一下二者的源码的,可是发现本文的篇幅已通过长了,源码也贴了很多。感兴趣的朋友本身看一下便可。

10 Spring中有哪些和ThreadPoolExecutor相似的工具

image.png

这里我想着重强调的就是SimpleAsyncTaskExecutor,Spring中使用的@Async注解,底层就是基于SimpleAsyncTaskExecutor去执行任务,只不过它不是线程池,而是每次都新开一个线程。

另外想要强调的是Executor接口。Java初学者容易想固然的觉得Executor结尾的类就是一个线程池,而上面的都是反例。咱们能够在JDK的execute方法上看到这个注释:

/**

  • Executes the given command at some time in the future. The command
  • may execute in a new thread, in a pooled thread, or in the calling
  • thread, at the discretion of the {@code Executor} implementation.
    */
    因此,它的职责并非提供一个线程池的接口,而是提供一个“未来执行命令”的接口。真正能表明线程池意义的,是ThreadPoolExecutor类,而不是Executor接口。

最佳实践总结
【强制】使用ThreadPoolExecutor的构造函数声明线程池,避免使用Executors类的 newFixedThreadPool和newCachedThreadPool。
【强制】 建立线程或线程池时请指定有意义的线程名称,方便出错时回溯。即threadFactory参数要构造好。
【建议】建议不一样类别的业务用不一样的线程池。
【建议】CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,能够将线程数设置为N(CPU核心数)+1,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它缘由致使的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种状况下多出来的一个线程就能够充分利用CPU的空闲时间。
【建议】I/O密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间段内不会占用CPU来处理,这时就能够将CPU交出给其它线程使用。所以在I/O密集型任务的应用中,咱们能够多配置一些线程,具体的计算方法是2N。
【建议】workQueue不要使用无界队列,尽可能使用有界队列。避免大量任务等待,形成OOM。
【建议】若是是资源紧张的应用,使用allowsCoreThreadTimeOut能够提升资源利用率。
【建议】虽然使用线程池有多种异常处理的方式,但在任务代码中,使用try-catch最通用,也能给不一样任务的异常处理作精细化。
【建议】对于资源紧张的应用,若是担忧线程池资源使用不当,能够利用ThreadPoolExecutor的API实现简单的监控,而后进行分析和优化。
image.png

线程池初始化示例:

private static final ThreadPoolExecutor pool;

static {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
    pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
        threadFactory, new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
}
  • threadFactory:给出带业务语义的线程命名。
  • corePoolSize:快速启动4个线程处理该业务,是足够的。
  • maximumPoolSize:IO密集型业务,个人服务器是4C8G的,因此4*2=8。
  • keepAliveTime:服务器资源紧张,让空闲的线程快速释放。
  • pool.allowCoreThreadTimeOut(true):也是为了在能够的时候,让线程释放,释放资源。
  • workQueue:一个任务的执行时长在100~300ms,业务高峰期8个线程,按照10s超时(已经很高了)。10s钟,8个线程,能够处理10 1000ms / 200ms 8 = 400个任务左右,往上再取一点,512已经不少了。
  • handler:极端状况下,一些任务只能丢弃,保护服务端。

原文连接

本文为阿里云原创内容,未经容许不得转载。

相关文章
相关标签/搜索