Java 中提供的线程池 Api
Executors 里面提供了几个线程池的工厂方法,使用Executors 的工厂方法,就能够使用线程池:
newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则当即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
newSingleThreadExecutor: 建立一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
newCachedThreadPool:返回一个可根据实际状况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不建立线程。而且每个空闲线程会在 60 秒后自动回收
newScheduledThreadPool: 建立一个能够指定线程的数量的线程池,可是这个线程池还带有延迟和周期性执行任务的功能,相似定时器。数组
public ThreadPoolExecutor (int corePoolSize, //核心线程数量 int maximumPoolSize, //最大线程数 long keepAliveTime, // 超时时间,超出核心线程数量之外的线程空余存活时间 TimeUnit unit, // 存活时间单位 BlockingQueue<Runnable> workQueue, // 保存执行任务的队列 ThreadFactory threadFactory, // 建立新线程使用的工厂 RejectedExecutionHandler handler // 当任务没法执行的时候的处理方式)
线程池原理分析(FixedThreadPool)
拒绝策略
当核心线程数等于设置的核心线程数,任务队列已满,非核心线程数已经等于达到最大线程数量,拒绝任务。缓存
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
- 根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务
线程池的大小
CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,假如 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会致使上下文切换反而使得效率下降。
线程池的最大线程数能够配置为 cpu 核心数+1 IO 密集型,主要是进行 IO 操做,执行 IO 操做的时间较长,这时 cpu 出于空闲状态,致使 cpu 的利用率不高,这种状况下能够增长线程池的大小。这种状况下能够结合线程的等待时长来作判断,等待时间越高,那么线程数也相对越多。通常能够配置 cpu 核心数的 2 倍。
公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/线程 CPU 时间 )* CPU 数目
这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间服务器
线程初始化
默认状况下,建立线程池以后,线程池中是没有线程的,须要提交任务以后才会建立线程。在实际中若是须要线程池建立以后当即建立线程,能够经过如下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化全部核心线程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();函数
线程池的关闭
ThreadPoolExecutor 提供了两个方法,用于线程池的关闭, 分别是shutdown()和shutdownNow() shutdown():不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务 shutdownNow():当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务线程
线程池容量的动态调整
ThreadPoolExecutor 提 供 了 动 态 调 整 线 程 池 容 量 大 小 的 方 法 : setCorePoolSize() 和setMaximumPoolSize()
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能建立的线程数目大小
任务缓存队列及排队策略
任务缓存队列,即 workQueue,用来存放等待执行的任务。
workQueue 的类型为 BlockingQueue,一般能够取下面三种类型:rest
- ArrayBlockingQueue:基于数组的先进先出队列,此队列建立时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,若是建立时没有指定此队列大小,则默认为 Integer.MAX_VALUE;
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
线程池的监控
线程池提供了相应的扩展方法,咱们经过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就能够实现对线程的监控日志
Callable/Future
线程池的执行任务有两种方法,一种是 submit、一种是 execute;code
execute 和 submit 区别
execute | submit |
---|---|
只能够接收一个 Runnable 的参数 | 能够接收 Runable 和 Callable 这两种类型的参数 |
execute 若是出现异常会抛出 | 调用不会抛异常,除非调用 Future.get |
execute 没有返回值 | 若是传入一个 Callable,能够获得一个 Future 的返回值 |
Callable/Future 原理分析
Callable 是一个函数式接口,里面就只有一个 call 方法。 Future 提供了get() get 方法就是阻塞获取线程执行结果,这里主要作了两个事情blog
- 判断当前的状态,若是状态小于等于 COMPLETING,表示 FutureTask 任务尚未完结,因此调用 awaitDone 方法,让当前线程等待。
- report 返回结果值或者抛出异常
public V get() throws InterruptedException, ExecutionException { int s = state; if ( s <= COMPLETING ) s = awaitDone( false, 0L ); return(report( s ) ); }
awaitDone 若是当前的结果尚未被执行完,把当前线程线程和插入到等待队列接口
run 方法执行完成以后唤醒阻塞线程,响应结果
AbstractExecutorService.submit
调用抽象类中的 submit 方法,这里其实相对于 execute 方法来讲,只多作了一步操做,就是封装了一个 RunnableFuture
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
ThreadpoolExecutor.execute
调用 execute 方法,经过 worker 线程来调用过 ftask 的run 方法。而这个 ftask 其实就是 FutureTask 里面最终实现的逻辑