Java线程池(ThreadPoolExecutor)原理分析与使用

在咱们的开发中“池”的概念并不罕见,有数据库链接池、线程池、对象池、常量池等等。下面咱们主要针对线程池来一步一步揭开线程池的面纱。javascript

使用线程池的好处

一、下降资源消耗
能够重复利用已建立的线程下降线程建立和销毁形成的消耗。
二、提升响应速度
当任务到达时,任务能够不须要等到线程建立就能当即执行。
三、提升线程的可管理性
线程是稀缺资源,若是无限制地建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一分配、调优和监控java

线程池的工做原理

首先咱们看下当一个新的任务提交到线程池以后,线程池是如何处理的
一、线程池判断核心线程池里的线程是否都在执行任务。若是不是,则建立一个新的工做线程来执行任务。若是核心线程池里的线程都在执行任务,则执行第二步。
二、线程池判断工做队列是否已经满。若是工做队列没有满,则将新提交的任务存储在这个工做队列里进行等待。若是工做队列满了,则执行第三步
三、线程池判断线程池的线程是否都处于工做状态。若是没有,则建立一个新的工做线程来执行任务。若是已经满了,则交给饱和策略来处理这个任务web

线程池饱和策略

这里提到了线程池的饱和策略,那咱们就简单介绍下有哪些饱和策略:
AbortPolicy
为java线程池默认的阻塞策略,不执行此任务,并且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute须要try catch,不然程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最经常使用)
实现RejectedExecutionHandler,并本身定义策略模式
下咱们以ThreadPoolExecutor为例展现下线程池的工做流程图
这里写图片描述
这里写图片描述
一、若是当前运行的线程少于corePoolSize,则建立新线程来执行任务(注意,执行这一步骤须要获取全局锁)。
二、若是运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
三、若是没法将任务加入BlockingQueue(队列已满),则在非corePool中建立新的线程来处理任务(注意,执行这一步骤须要获取全局锁)。
四、若是建立新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采起上述步骤的整体设计思路,是为了在执行execute()方法时,尽量地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热以后(当前运行的线程数大于等于corePoolSize),几乎全部的execute()方法调用都是执行步骤2,而步骤2不须要获取全局锁。数据库

关键方法源码分析

咱们看看核心方法添加到线程池方法execute的源码以下:
一、若是当前运行的线程少于corePoolSize,则建立新线程来执行任务(注意,执行这一步骤须要获取全局锁)。
二、若是运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
三、若是没法将任务加入BlockingQueue(队列已满),则在非corePool中建立新的线程来处理任务(注意,执行这一步骤须要获取全局锁)。
四、若是建立新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采起上述步骤的整体设计思路,是为了在执行execute()方法时,尽量地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热以后(当前运行的线程数大于等于corePoolSize),几乎全部的execute()方法调用都是执行步骤2,而步骤2不须要获取全局锁。安全

关键方法源码分析

咱们看看核心方法添加到线程池方法execute的源码以下:服务器

//
     //Executes the given task sometime in the future. The task
     //may execute in a new thread or in an existing pooled thread.
     //
     // If the task cannot be submitted for execution, either because this
     // executor has been shutdown or because its capacity has been reached,
     // the task is handled by the current {@code RejectedExecutionHandler}.
     //
     // @param command the task to execute
     // @throws RejectedExecutionException at discretion of
     // {@code RejectedExecutionHandler}, if the task
     // cannot be accepted for execution
     // @throws NullPointerException if {@code command} is null
     //
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //
         // Proceed in 3 steps:
         //
         // 1. If fewer than corePoolSize threads are running, try to
         // start a new thread with the given command as its first
         // task. The call to addWorker atomically checks runState and
         // workerCount, and so prevents false alarms that would add
         // threads when it shouldn't, by returning false.
         // 翻译以下:
         // 判断当前的线程数是否小于corePoolSize若是是,使用入参任务经过addWord方法建立一个新的线程,
         // 若是能完成新线程建立exexute方法结束,成功提交任务
         // 2. If a task can be successfully queued, then we still need
         // to double-check whether we should have added a thread
         // (because existing ones died since last checking) or that
         // the pool shut down since entry into this method. So we
         // recheck state and if necessary roll back the enqueuing if
         // stopped, or start a new thread if there are none.
         // 翻译以下:
         // 在第一步没有完成任务提交;状态为运行而且可否成功加入任务到工做队列后,再进行一次check,若是状态
         // 在任务加入队列后变为了非运行(有多是在执行到这里线程池shutdown了),非运行状态下固然是须要
         // reject;而后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
         // 3. If we cannot queue task, then we try to add a new
         // thread. If it fails, we know we are shut down or saturated
         // and so reject the task.
         // 翻译以下:
         // 若是不能加入任务到工做队列,将尝试使用任务新增一个线程,若是失败,则是线程池已经shutdown或者线程池
         // 已经达到饱和状态,因此reject这个他任务
         //
        int c = ctl.get();
        // 工做线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 若是工做线程数大于等于核心线程数
        // 线程的的状态未RUNNING而且队列notfull
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次检查线程的运行状态,若是不是RUNNING直接从队列中移除
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 移除成功,拒绝该非运行的任务
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 防止了SHUTDOWN状态下没有活动线程了,可是队列里还有任务没执行这种特殊状况。
                // 添加一个null任务是由于SHUTDOWN状态下,线程池再也不接受新任务
                addWorker(null, false);
        }
        // 若是队列满了或者是非运行的任务都拒绝执行
        else if (!addWorker(command, false))
            reject(command);
    }

下面咱们继续看看addWorker是如何实现的:ide

private boolean addWorker(Runnable firstTask, boolean core) {
        // java标签
        retry:
        // 死循环
        for (;;) {
            int c = ctl.get();
            // 获取当前线程状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 这个逻辑判断有点绕能够改为 
            // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
            // 逻辑判断成立能够分为如下几种状况均不接受新任务
            // 一、rs > shutdown:--不接受新任务
            // 二、rs >= shutdown && firstTask != null:--不接受新任务
            // 三、rs >= shutdown && workQueue.isEmppty:--不接受新任务
            // 逻辑判断不成立
            // 一、rs==shutdown&&firstTask != null:此时不接受新任务,可是仍会执行队列中的任务
            // 二、rs==shotdown&&firstTask == null:会执行addWork(null,false)
            // 防止了SHUTDOWN状态下没有活动线程了,可是队列里还有任务没执行这种特殊状况。
            // 添加一个null任务是由于SHUTDOWN状态下,线程池再也不接受新任务
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                return false;
            // 死循环
            // 若是线程池状态为RUNNING而且队列中还有须要执行的任务
            for (;;) {
                // 获取线程池中线程数量
                int wc = workerCountOf(c);
                // 若是超出容量或者最大线程池容量不在接受新任务
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程安全增长工做线程数
                if (compareAndIncrementWorkerCount(c))
                    // 跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 若是线程池状态发生变化,从新循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 走到这里说明工做线程数增长成功
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 检查线程状态
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将新启动的线程添加到线程池中
                        workers.add(w);
                        // 更新线程池线程数且不超过最大值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新添加的线程,这个线程首先执行firstTask,而后不停的从队列中取任务执行
                if (workerAdded) {
                    //执行ThreadPoolExecutor的runWoker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败,则从wokers中移除w并递减wokerCount
            if (! workerStarted)
                // 递减wokerCount会触发tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker以后是runWorker,第一次启动会执行初始化传进来的任务firstTask;而后会从workQueue中取任务执行,若是队列为空则等待keepAliveTime这么长时间svg

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 容许中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 若是getTask返回null那么getTask中会将workerCount递减,若是异常了这个递减操做会在processWorkerExit中处理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

咱们看下getTask是如何执行的oop

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 死循环
        retry: for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 因此rs至少等于STOP,这时再也不处理队列中的任务
            // 2.rs = SHUTDOWN 因此rs>=STOP确定不成立,这时还须要处理队列中的任务除非队列为空
            // 这两种状况都会返回null让runWoker退出while循环也就是当前线程结束了,因此必需要decrement
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 递减workerCount值
                decrementWorkerCount();
                return null;
            }
            // 标记从队列中取任务时是否设置超时时间
            boolean timed; // Are workers subject to culling?
            // 1.RUNING状态
            // 2.SHUTDOWN状态,但队列中还有任务须要执行
            for (;;) {
                int wc = workerCountOf(c);
                // 1.core thread容许被超时,那么超过corePoolSize的的线程一定有超时
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize时,通常都是这种状况,core thread即便空闲也不会被回收,只要超过的线程才会
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 从addWorker能够看到通常wc不会大于maximumPoolSize,因此更关心后面半句的情形:
                // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
                // poll出异常了重试
                // 2.timeOut == true && timed ==
                // false:看后面的代码workerQueue.poll超时时timeOut才为true,
                // 而且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed确定为true)
                // 因此超时不会继续执行而是return null结束线程。
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;
                // workerCount递减,结束当前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 须要从新检查线程池状态,由于上述操做过程当中线程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
            try {
                // 1.以指定的超时时间从队列中取任务
                // 2.core thread没有超时
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超时
            } catch (InterruptedException retry) {
                timedOut = false;// 线程被中断重试
            }
        }
    }

下面咱们看下processWorkerExit是如何工做的源码分析

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的话再runWorker的getTask方法workerCount已经被减一了
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加线程的completedTasks
            completedTaskCount += w.completedTasks;
            // 从线程池中移除超时或者出现异常的线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试中止线程池
        tryTerminate();
        int c = ctl.get();
        // runState为RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,容许core thread超时就是0,不然就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 若是min == 0可是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担忧了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.线程异常退出
            // 2.线程池为空,可是队列中还有任务没执行,看addWoker方法对这种状况的处理
            addWorker(null, false);
        }
    }

tryTerminate
processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能致使线程池终止的动做后执行:好比减小wokerCount或SHUTDOWN状态下从队列中移除任务。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 如下状态直接返回:
            // 1.线程池还处于RUNNING状态
            // 2.SHUTDOWN状态可是任务队列非空
            // 3.runState >= TIDYING 线程池已经中止了或在中止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 只能是如下情形会继续下面的逻辑:结束线程池。
            // 1.SHUTDOWN状态,这时再也不接受新任务并且任务队列也空了
            // 2.STOP状态,当调用了shutdownNow方法
            // workerCount不为0则还不能中止线程池,并且这时线程都处于空闲等待的状态
            // 须要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是为了能够被中断,getTask方法也处理了中断。
                // ONLY_ONE:这里只须要中断1个线程去处理shutdown信号就能够了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 进入TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子类重载:一些资源清理工做
                        terminated();
                    } finally {
                        // TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 继续awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown这个方法会将runState置为SHUTDOWN,会终止全部空闲的线程。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止全部的线程。主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:
他们的实现以下:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 线程池状态设为SHUTDOWN,若是已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
            // 注意这里是中断全部空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
            // tryTerminate方法中会保证队列中剩余的任务获得执行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP状态:再也不接受新任务且再也不执行队列中的任务。
        advanceRunState(STOP);
        // 中断全部线程
        interruptWorkers();
        // 返回队列中尚未被执行的任务。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能获取到锁,说明该线程没有在运行,由于runWorker中执行任务会先lock,
            // 所以保证了中断的确定是空闲的线程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化时state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

线程池的使用

线程池的建立

咱们能够经过ThreadPoolExecutor来建立一个线程池

/** * @param corePoolSize 线程池基本大小,核心线程池大小,活动线程小于corePoolSize则直接建立,大于等于则先加到workQueue中, * 队列满了才建立新的线程。当提交一个任务到线程池时,线程池会建立一个线程来执行任务,即便其余空闲的基本线程可以执行新任务也会建立线程, * 等到须要执行的任务数大于线程池基本大小时就再也不建立。若是调用了线程池的prestartAllCoreThreads()方法, * 线程池会提早建立并启动全部基本线程。 * @param maximumPoolSize 最大线程数,超过就reject;线程池容许建立的最大线程数。若是队列满了, * 而且已建立的线程数小于最大线程数,则线程池会再建立新的线程执行任务 * @param keepAliveTime * 线程池的工做线程空闲后,保持存活的时间。因此,若是任务不少,而且每一个任务执行的时间比较短,能够调大时间,提升线程的利用率 * @param unit 线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、 * 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒) * @param workQueue 工做队列,线程池中的工做线程都是从这个工做队列源源不断的获取任务进行执行 */
    public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue) {
        // threadFactory用于设置建立线程的工厂,能够经过线程工厂给每一个建立出来的线程设置更有意义的名字
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }

向线程池提交任务

可使用两个方法向线程池提交任务,分别为execute()和submit()方法。execute()方法用于提交不须要返回值的任务,因此没法判断任务是否被线程池执行成功。经过如下代码可知execute()方法输入的任务是一个Runnable类的实例。

threadsPool.execute(new Runnable() {
        @Override
        public void run() {
        }
    });

submit()方法用于提交须要返回值的任务。线程池会返回一个future类型的对象,经过这个future对象能够判断任务是否执行成功,而且能够经过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后当即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
  try
    {
        Object s = future.get();
    }catch(
    InterruptedException e)
    {
        // 处理中断异常
    }catch(
    ExecutionException e)
    {
        // 处理没法执行任务异常
    }finally
    {
        // 关闭线程池
        executor.shutdown();
    }

关闭线程池

能够经过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。可是它们存在必定的区别,shutdownNow首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown方法来关闭线程池,若是任务不必定要执行完,则能够调用shutdownNow方法。

合理的配置线程池

要想合理地配置线程池,就必须首先分析任务特性,能够从如下几个角度来分析。
一、任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
二、任务的优先级:高、中和低。
三、任务的执行时间:长、中和短。
四、任务的依赖性:是否依赖其余系统资源,如数据库链接。
性质不一样的任务能够用不一样规模的线程池分开处理。CPU密集型任务应配置尽量小的线程,如配置Ncpu+1个线程的线程池。因为IO密集型任务线程并非一直在执行任务,则应配置尽量多的线程,如2*Ncpu。混合型的任务,若是能够拆分,将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。若是这两个任务执行时间相差太大,则不必进行分解。能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先执行
若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不一样的任务能够交给不一样规模的线程池来处理,或者可使用优先级队列,让
执行时间短的任务先执行。依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
建议使用有界队列。有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点儿,好比几千。有时候咱们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞,任务积压在线程池里。若是当时咱们设置成无界队列,那么线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然,咱们的系统全部的任务是用单独的服务器部署的,咱们使用不一样规模的线程池完成不一样类型的任务,可是出现这样问题时也会影响到其余任务。

线程池的监控

若是在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,能够根据线程池的使用情况快速定位问题。能够经过线程池提供的参数进行监控,在监控线程池的时候可使用如下属性 ·taskCount:线程池须要执行的任务数量。 ·completedTaskCount:线程池在运行过程当中已完成的任务数量,小于或等于taskCount。 ·largestPoolSize:线程池里曾经建立过的最大线程数量。经过这个数据能够知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。 ·getPoolSize:线程池的线程数量。若是线程池不销毁的话,线程池里的线程不会自动销毁,因此这个大小只增不减。 ·getActiveCount:获取活动的线程数。 经过扩展线程池进行监控。能够经过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也能够在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。