Java并发系列 — 线程池

Java中的线程池是运用场景最多的并发框架,几乎全部须要异步或并发执行任务的程序均可以使用线程池。java

在开发过程当中,合理地使用线程池可以带来3个好处:git

  • 下降资源消耗:经过重复利用已建立的线程下降线程建立和销毁形成的消耗。github

  • 提升响应速度:当任务到达时,任务能够不须要等到线程建立就能当即执行。数据库

  • 提升线程的可管理性:线程是稀缺资源,若是无限制地建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一分配、调优和监控。可是,要作到合理利用线程池,必须对其实现原理了如指掌。数组

如何建立线程池

一、使用Executors工厂类提供的静态方法来建立线程池,方法以下:服务器

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
复制代码

二、经过ThreadPoolExecutor的构造函数建立,代码以下:多线程

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
复制代码

参数说明:并发

  • 一、corePoolSize框架

    线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于 corePoolSize;若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。异步

  • 二、maximumPoolSize

    线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize。

  • 三、keepAliveTime

    线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认状况下,该参数只在线程数大于corePoolSize时才有用。

  • 四、unit

    keepAliveTime的单位

  • 五、workQueue

    用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。

    在JDK中提供了以下阻塞队列: ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

    LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;

    SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene;

    priorityBlockingQuene:具备优先级的无界阻塞队列;

  • 六、threadFactory

    建立线程的工厂,经过自定义的线程工厂能够给每一个新建的线程设置一个具备识别度的线程名。

  • 七、handler

    线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:

    1. AbortPolicy:直接抛出异常,默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;

    固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

使用示例

public class ThreadPoolExecutorExample {

    // 一、经过threadPoolExecutor的构造函数建立线程池
    private static ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
                                    new ArrayBlockingQueue<Runnable>(10));


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 二、使用execute方法执行没有返回结果的任务
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                new Task().doSomething();
            }
        });

        // 三、使用submit方法执行有返回结果的任务且须要实现Callable接口
        Future future = threadPoolExecutor.submit(new Callable<Integer>() {
              @Override
              public Integer call() throws Exception {
                  return new Task().doOtherthing();
              }
          }
        );
        System.out.println(future.get());
    }
}

class Task {

    public void doSomething() {
        System.out.println("doSomeThing ...");
    }

    public int doOtherthing() {
        System.out.println("doOtherthing ..., and return 10.");
        return 10;
    }
}
复制代码

execute方法和submit方法的区别

  • execute方法

    用于提交不须要返回值的任务,因此没法判断任务是否被线程池执行成功。

  • submit方法

    用于提交须要返回值的任务。线程池会返回一个future类型的对象,经过这个future对象能够判断任务是否执行成功。get()方法会阻塞当前线程直到任务完成;get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后当即返回,这时候有可能任务没有执行完。

ThreadPoolExecutor的实现原理

当向线程池提交一个任务以后,线程池是如何处理这个任务的呢?线程池的主要处理流程以下:

ThreadPoolExecutor的实现原理

  1. 若是当前运行的线程小于corePoolSize,则建立新线程来执行;
  2. 若是运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue;
  3. 若是没法将任务加入BlockingQueue(队列已满),则建立新的线程来处理任务;
  4. 若是建立新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用handler.rejectedExecution(command, this)方法。

ThreadPoolExecutor采起上述步骤的整体设计思路,是为了在执行executor()方法时,尽量地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热以后(当前运行的线程数大于等于corePoolSize),几乎全部的execute()方法调用都是执行步骤2,而步骤2不须要获取全局锁。

ThreadPoolExecutor源码分析

类结构图

ThreadPoolExecutor类结构图.png

核心变量与方法(状态转换)

// 初始化状态和数量,状态为RUNNING,线程数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 前3位表示状态,全部线程数占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池容量大小为 1 << 29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态
// RUNNING状态:11100000000000000000000000000000(前3位为111)
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN状态:00000000000000000000000000000000(前3位为000)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP状态:00100000000000000000000000000000(前3位为001)
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING状态:01000000000000000000000000000000(前3位为010)
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED状态:01100000000000000000000000000000(前3位为011)
private static final int TERMINATED =  3 << COUNT_BITS;

// 获得状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获得线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

复制代码

ThreadPoolExecutor线程池有5个状态,分别是:

  1. RUNNING:能够接受新的任务,也能够处理阻塞队列里的任务。
  2. SHUTDOWN:不接受新的任务,可是能够处理阻塞队列里的任务。
  3. STOP:不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。
  4. TIDYING:过渡状态,也就是说全部的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,而且将要调用terminated方法。
  5. TERMINATED:终止状态。terminated方法调用完成之后的状态。

线程池的状态转换过程:

  • RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
  • (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
  • SHUTDOWN -> TIDYING When both queue and pool are empty
  • STOP -> TIDYING When pool is empty
  • TIDYING -> TERMINATED When the terminated() hook method has completed

Threads waiting in awaitTermination() will return when the state reaches TERMINATED.

核心方法

execute方法

使用ThreadPoolExecutor执行任务的时候,可使用execute或submit方法,submit方法以下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
复制代码

经过源码可知submit方法一样也是由execute()完成的,execute()方法源码以下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 过程1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 过程2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池状态
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程池中没有可用的工做线程时 
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 过程3
        // 过程4
        reject(command);
}
复制代码

addWorker方法

addWorker方法的主要工做就是建立一个工做线程执行任务,代码以下:

/* * firstTask参数:用于指定新增的线程执行的第一个任务。 * core为true:表示在新增线程时会判断当前活动线程数是否少于corePoolSize, * false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize。 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** * 只有当下面两种状况会继续执行,其余直接返回false(添加失败) * 一、rs == RUNNING * 二、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() *(执行了shutdown方法,可是阻塞队列还有任务没有执行) */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 判断工做线程的数量是否超过线程池的限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // workerCount加1成功,跳出两层循坏。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            /** * 能执行到这里,都是由于多线程竞争,只有两种状况 * 一、workCount发生变化,compareAndIncrementWorkerCount失败, * 这种状况不须要从新获取ctl,继续for循环便可。 * 二、runState发生变化,可能执行了shutdown或者shutdownNow, * 这种状况从新走retry,取得最新的ctl并判断状态。 */
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // worker是否执行标识
    boolean workerStarted = false;
    // worker是否添加成功标识
    boolean workerAdded = false;
    // 保存建立的worker变量
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 检查线程是否建立成功
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁成功,从新检查线程池的状态 
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将w存储到workers容器中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加成功标识
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

Worker类

在分析t.start()以前,须要了解Worker类。其源码以下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    // 工做线程
    final Thread thread;
    // 初始化任务
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 禁止中断,直到runWorker
        setState(-1);
        this.firstTask = firstTask;
        // 很重要,worker实例被包装成thread执行的任务。
        // 这样t.start启动后,将运行Worker的run方法。
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // 实现AQS的相关方法
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
复制代码
runWorker方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 自旋操做,获取队列中的任务
        while (task != null || (task = getTask()) != null) {
            // 加锁的做用是线程池关闭时,防止正在执行工做线程被中断。
            w.lock();
             /* * 在执行任务以前先作一些处理。 * 1. 若是线程池已经处于STOP状态而且当前线程没有被中断,中断线程。 * 2. 若是线程池还处于RUNNING或SHUTDOWN状态,而且当前线程已经被中断了, * 从新检查一下线程池状态,若是处于* * STOP状态而且没有被中断,那么中断线程。 */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                     // 真正的开始执行任务,调用的是run方法,而不是start方法。
                    // 这里run的时候可能会被中断,好比线程池调用了shutdownNow方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // hook method
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 回收woker
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码
getTask方法
private Runnable getTask() {

    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 计算从队列获取任务的方式( poll or take)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 当工做线程超过其最大值或者timed = true时其workQueue.isEmpty()时,返回null。
        // 这意味为该worker将被回收。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

由上可知,当allowCoreThreadTimeOut为true时,若是队列长时间没有任务,工做线程最终都会被销毁。

其余知识点

关闭线程池

能够经过调用线程池的shutdownshutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。

可是它们存在必定的区别,shutdownNow首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,能够从如下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

  • 任务的优先级:高、中和低。

  • 任务的执行时间:长、中和短。

  • 任务的依赖性:是否依赖其余系统资源,如数据库链接。

性质不一样的任务能够用不一样规模的线程池分开处理:

1)CPU密集型任务应配置尽量小的线程,如配置N cpu +1个线程的线程池。

2)IO密集型任务线程并非一直在执行任务,则应配置尽量多的线程,如2*N cpu 。

3)混合型的任务,若是能够拆分,将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。若是这两个任务执行时间相差太大,则不必进行分解。

优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。

依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数

建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点,好比几千。有一次咱们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞住,任务积压在线程池里。若是当时咱们设置成无界队列,线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然咱们的系统全部的任务是用的单独的服务器部署的,而咱们使用不一样规模的线程池跑不一样类型的任务,可是出现这样问题时也会影响到其余任务。

线程池监控

若是在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,能够根据线程池的使用情况快速定位问题。能够经过线程池提供的参数进行监控,在监控线程池的时机可使用如下属性:

  • taskCount:线程池须要执行的任务数量。
  • completedTaskCount:线程池在运行过程当中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经建立过的最大线程数量。经过这个数据能够直到线程池是否曾经满过。
  • getPoolSize:线程池的线程数量。
  • getActiveCount:获取活动的线程数。

经过扩展线程池进行监控。能够经过继承线程池来自定义线程池,重写线程池的beforeExecuteafterExecuteterminated方法,也能够在任务执行前、执行后和线程池关闭以前执行一些代码来进行监控。

参考资料

  1. Java线程池ThreadPoolExecutor源码分析

  2. 【细谈Java并发】谈谈线程池:ThreadPoolExecutor


若是读完以为有收获的话,欢迎点赞、关注、加公众号【牛觅技术】,查阅更多精彩历史!!!

相关文章
相关标签/搜索