Java 并发编程 | 线程池详解

原文: https://chenmingyu.top/concurrent-threadpool/java

线程池

线程池用来处理异步任务或者并发执行的任务编程

优势:数组

  1. 重复利用已建立的线程,减小建立和销毁线程形成的资源消耗
  2. 直接使用线程池中的线程,提升响应速度
  3. 提升线程的可管理性,由线程池同一管理

ThreadPoolExecutor

java中线程池使用ThreadPoolExecutor实现缓存

构造函数

ThreadPoolExecutor提供了四个构造函数,其余三个构造函数最终调用的都是下面这个构造函数并发

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

入参:框架

  1. corePoolSize:线程池的核心线程数量异步

    线程池维护的核心线程数量,当线程池初始化后,核心线程数量为零,当有任务来到的时候才会建立线程去执行任务,当线程池中的工做线程数量等于核心线程数量时,新到的任务就会放到缓存队列中函数

  2. maximumPoolSize:线程池容许建立的最大线程数量oop

    当阻塞队列满了的时候,而且线程池中建立的线程数量小于maximumPoolSize,此时会建立新的线程执行任务ui

  3. keepAliveTime:线程活动保持时间

    只有当线程池数量大于核心线程数量时,keepAliveTime才会有效,若是当前线程数量大于核心线程数量时,而且线程的空闲时间达到keepAliveTime,当前线程终止,直到线程池数量等于核心线程数

  4. unit:线程活动保持时间的单位

    keepAliveTime的单位,包括:TimeUnit.DAYS天,TimeUnit.HOURS小时,TimeUnit.MINUTES分钟,TimeUnit.SECONDS秒,TimeUnit.MILLISECONDS毫秒,TimeUnit.MICROSECONDS微秒,TimeUnit.NANOSECONDS纳秒

  5. workQueue:任务队列,用来保存等待执行任务的阻塞队列

    ArrayBlockingQueue:是一个基于数组结构的有界队列

    LinkedBlockingQueue:是一个基于链表结构的阻塞队列

    SynchronousQueue:不存储元素的阻塞队列,每个插入操做必须等到下一个线程调用移除操做,不然插入操做一直阻塞

    PriorityBlockingQueue:一个具备优先级的无线阻塞队列

  6. threadFactory:用来建立线程的工厂

  7. handler:饱和策略,当线程池和队列都满了的时候,必需要采起一种策略处理新的任务,默认策略是AbortPolicy,根据本身需求选择合适的饱和策略

    AbortPolicy:直接抛出异常

    CallerRunsPolicy:用调用者所在的线程来运行当前任务

    DiscardOldestPolicy:丢弃队列里面最近的一个任务,并执行当前任务

    DiscardPolicy:不处理,丢弃掉

    固然咱们也能够经过实现RejectedExecutionHandler去自定义实现处理策略

入参不一样,线程池的运行机制也不一样,了解每一个入参的含义因为咱们更透传的理解线程池的实现原理

提交任务

线程池处理提交任务流程以下

处理流程

  1. 若是核心线程数量未满,建立线程执行任务,不然添加到阻塞队列中
  2. 若是阻塞队列中未满,将任务存到队列里
  3. 若是阻塞队列满了,看线程池数量是否达到了线程池最大数量,若是没达到,建立线程执行任务
  4. 若是已经达到线程池最大数量,根据饱和策略进行处理

ThreadPoolExecutor使用execute(Runnable command)submit(Runnable task)向线程池中提交任务,在submit(Runnable task)方法中调用了execute(Runnable command),因此咱们只要了解execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取线程池状态,而且能够经过ctl获取到当前线程池数量及线程池状态
    int c = ctl.get();
    // 若是工做线程数小于核心线程数量,则建立一个新线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 若是不符合上面条件,当前线程处于运行状态而且写入阻塞队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查,再次获取线程状态,若是当前线程状态变为非运行状态,则从队列中移除任务,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 检查工做线程数量是否为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //建立线程执行任务,若是添加失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

execute(Runnable command)方法中咱们比较关心的就是如何建立新的线程执行任务,就addWorker(command, true)方法

workQueue.offer(command)方法是用来向阻塞队列中添加任务的

reject(command)方法会根据建立线程池时传入的饱和策略对任务进行处理,例如默认的AbortPolicy,查看源码后知道就是直接抛了个RejectedExecutionException异常,其余的饱和策略的源码也是特别简单

关于线程池状态与工做线程的数量是如何表示的

ThreadPoolExecutor中使用一个AtomicInteger类型变量表示

/**
 * ctl表示两个信息,一个是线程池的状态(高3位表示),一个是当前线程池的数量(低29位表示),这个跟咱们前面   * 说过的读写锁的state变量是同样的,以一个变量记录两个信息,都是以利用int的32个字节,高十六位表述读,低十  * 六位表示写锁
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//低29位保存线程池数量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 运行状态存储在高3位
// 运行状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

addWorker(command, boolean)建立工做线程,执行任务

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 线程池状态
        int rs = runStateOf(c);
        // 判断线程池状态,以及阻塞队列是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程工做线程数量
            int wc = workerCountOf(c);
            // 判断是否大于最大容量,以及根据传入的core判断是否大于核心线程数量仍是最大线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增长工做线程数量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //若是线程池状态改变,则重试
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 建立Worker,内部建立了一个新的线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 线程池状态判断
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将建立的线程添加到线程池
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //执行任务,首先会执行Worker对象的firstTask
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //若是任务执行失败
        if (! workerStarted)
            //移除worker
            addWorkerFailed(w);
    }
    return workerStarted;
}
关闭线程池

ThreadPoolExecutor中关闭线程池使用shutdown()shutdownNow()方法,原理都是经过遍历线程池中的线程,对线程进行中断

for (Worker w : workers) {
    Thread t = w.thread;
    if (!t.isInterrupted() && w.tryLock()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
            w.unlock();
        }
    }
    if (onlyOne)
        break;
    }
Executor框架

Executor框架将任务的提交与任务的执行进行分离

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口

工厂方法:

  1. newFixedThreadPool:用于建立固定数目线程的线程池
  2. newCachedThreadPool:用于建立一个可缓存的线程池,调用execute将重用之前构造的线程,若是现有线程没有可用的,则建立一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程
  3. newSingleThreadExecutor:用于建立只有一个线程的线程池
  4. newScheduledThreadPool:用于建立一个支持定时及周期性的任务执行的线程池

在阿里巴巴手册中强制要求禁止使用Executors提供的工厂方法建立线程池

这个确实是一个很严重的问题,咱们部门曾经就出现过使用FixedThreadPool线程池,致使OOM,这是由于线程执行任务的时候被阻塞或耗时很长时间,致使阻塞队列一直在添加任务,直到内存被打满,报OOM

因此咱们在使用线程池的时候要使用ThreadPoolExecutor的构造函数去建立线程池,根据本身的任务类型来肯定核心线程数和最大线程数,选择适合阻塞队列和阻塞队列的长度

合理配置线程池

合理的配置线程池须要分析一下任务的性质(使用ThreadPoolExecutor建立线程池):

  1. CPU密集型任务应配置竟可能小的线程,好比 cpu数量+1

  2. IO密集型任务并非一直在执行任务,应该配置尽量多的线程,好比 cpu数量x2

    可经过Runtime.getRuntime().availableProcessors()获取cpu数量

  3. 执行的任务有调用外部接口比较费时的时候,这时cup空闲的时间就越长,能够将线程池数量设置大一些,这样cup空闲的时间就能够去执行别的任务

  4. 建议使用有界队列,可根据须要将长度设置大一些,防止OOM

参考:java并发编程的艺术

推荐阅读

java并发编程 | 线程详解

java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock

相关文章
相关标签/搜索