Java线程池二:线程池原理

最近精读Netty源码,读到NioEventLoop部分的时候,发现对Java线程&线程池有些概念还有困惑, 因此深刻总结一下
Java线程池一:线程基础
Java线程池二:线程池原理html

为何须要使用线程池

Java线程映射的是系统内核线程,是稀缺资源,使用线程池主要有如下几点好处java

  • 下降资源消耗:重复利用池中线程下降线程的建立和消耗形成的资源消耗。
  • 提升响应速度:任务到达时直接使用池总中空闲的线程,能够不用等待线程建立。
  • 提升线程的可管理性:线程是稀缺资源,不能无限制建立,使用线程池能够统一进行分配、监控、调优。

线程池框架简介

  • Executor接口:提供execute方法提交任务
  • ExecutorService接口:提供能够跟踪任务执行结果的 submit方法 & 提供线程池关闭的方法(shutdown, shutdowNow)
  • AbstractExecutorService抽象类:实现submit方法
  • ThreadPoolExecutor: 线程池实现类
  • ScheduleThreadPoolExecutor:能够执行定时任务的线程池

ThreadPoolExecutor原理

核心参数以及含义

  • corePoolSize:核心线程池大小
  • maximumPoolSize: 线程池最大大小
  • workQueue: 工做队列(任务暂时存放的地方)
  • RejectedExecutionHandler:拒绝策略(线程池没法执行该任务时的处理策略)

任务提交流程

任务提交过程见下流程图
api

线程池的状态

  • RUNNING:正常的线程池运行状态
  • SHUTDOWN:调用shutdown方法到该状态,该状态下拒绝提交新任务,但会将已提交的任务的处理完毕
  • STOP:调用shutdownNow方法到该状态,该状态下拒绝新任务的提交 & 丢弃工做队列中的任务 & 中断正在执行任务的工做线程
  • TIDYING:工做队列和线程池都为空时自动到该状态
  • TERMINATED:terminated方法返回以后自动到该状态

工做队列

核心线程池满时,任务会尝试提交到工做队列,后续工做线程会从工做队列中获取任务执行。数组

由于涉及到多个线程对工做队列的读写,因此工做队列须要是线程安全的,Java提供了如下几种线程安全的队列(BlockingQueue)安全

实现类 工做机制
ArrayBlockingQueue 底层实现是数组
LinkedBlockingDeque 底层实现是链表
PriorityBlockingQueue 优先队列,本质是个小顶堆
DelayQueue 延时队列 (优先队列 & 元素实现Delayed接口),ScheduledThreadPoolExecutor实现的关键
SynchronousQueue 同步队列

BlockingQueue 多组读写操做API

操做 描述
add/remove 队列已满/队列已空时,抛出异常
put/take 队列已满/队列已空时,阻塞等待
offer/poll 队列已满/队列已空时,返回特殊值(false/null)
offer(time) / poll(time) 超时时间内没法写入或者读取成功,返回特殊值

拒绝策略

拒绝策略是当线程池满负载时(任务队列已满 & 线程池已满)对新提交任务的处理策略,jdk提供了以下四种实现,其中AbortPolicy是默认实现。并发

实现类 工做机制
AbortPolicy 抛出RejectedExecutionException异常
CallerRunsPolicy 调用线程执行该任务
DiscardOldestPolicy 丢弃工做队列头部任务,再尝试提交该任务
DiscardPolicy 直接丢弃

固然咱们能够有自定义的实现,好比记录日志、任务实例持久化,同时发送报警到开发人员。框架

跟踪任务的执行结果

线程池提供了几个submit方法, 调用线程能够根据返回的Future对象获取任务执行结果,那么它的实现原理又是什么呐?oop

装饰模式对task的run方法进行加强this

1.提交任务前,会把task装饰成一个FutureTask对象线程

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
 }

2.FutureTask对象的run方法会存储返回的结果或者异常。调用方能够根据FutureTask获取任务的执行结果。

//省略了部分代码
public void run() {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          //执行任务
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          //存储异常
          setException(ex);
        }
        if (ran)
          //存储返回值
          set(result);
 }

线程池的关闭

shutdown

shutdown将线程池的状态设置成SHUTDOWN,同时拒绝提交新的任务,可是已提交的任务会正常执行

shutdownNow

shutdownNow将线程池的状态设置成STOP,该状态下拒绝提交新的任务 & 丢弃工做队列中的任务& 中断当前活跃的线程(尝试中止正在执行的任务)

须要注意的是shutdownNow对于正在执行的任务只是尝试中止,不保证成功(取决于任务是否监听处理中断位)

ScheduledThreadPoolExecutor 定时调度原理

ScheduledThreadPoolExecutor在ThreadPoolExecutor之上扩展实现了定时调度的能力

1.实例化时工做队列使用延时队列(DelayedWorkQueue)--- 本质是个小顶堆

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

2.提交的任务装饰成ScheduledFutureTask类型,并把任务加入到工做队列(不直接调用execute)

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        //装饰
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
  			//任务加入工做队列
        delayedExecute(t);
        return t;
    }

3.ScheduledFutureTask实现Delayed和Comparable接口

因此提交到工做队列中的任务是按照任务执行时间排序的(最先执行的任务在头部),由于工做队列是个小顶堆。

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

4.只能从工做队列中获取已到执行时间的任务

public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
      	//若是头部的任务尚未到执行时间, 直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

线程池配置

假设:CPU核心数是N,每一个任务的执行时间是T,任务的超时时间是timeout,核心线程数是corePoolSize,工做队列大小是workQueue, 最大线程数是 maxPoolSize, 任务最大并发数为maxTasks

核心线程数配置

  1. 对于CPU密集型任务:corePoolSize 大小设置成和CPU核心数接近,如N+1 或者 N+2

  2. 对于IO密集型任务:corePoolSize能够设置的比较大一些,如2N~3N;也能够经过以下逻辑进行估算

    假设80%的时间是IO操做,那么每一个任务须要占用CPU时间大概是0.2T, 每秒每一个CPU核心最大能够执行的任务数为 = (1/0.2T) = 5/T;因此理论上 80%IO的状况下corePoolSize能够设置为 5N (一个cpu能够对应5个工做线程)

工做队列大小配置

工做队列的大小取决于任务的超时时间 & 核心线程池的吞吐量

workQueue = corePoolSize * (1/T) * timeout = (corePoolSize * timeout) / T

须要注意的是: 工做队列不能使用无界队列。(无界队列异常状况下可能耗尽系统资源,形成服务不可用)

最大线程数配置

最大线程数的大小取决于最大的任务并发数 & 工做队列的大小 & 任务的执行时间

maxPoolSize = (maxTasks - workQueue) / T

拒绝策略配置

对于可有可无的任务,咱们能够直接丢弃;对于一些重要的任务须要对任务进行持久化,以便后续进行补偿和恢复。

线程池监控

咱们能够有个定时脚本将线程池的最大线程数、工做队列大小、已经执行的任务数、已经拒绝的任务数等数据推送到监控系统

这样咱们能够根据这些数据对线程池进行调优,也能够即便感知线上业务异常。

相关文章
相关标签/搜索