以前建立线程的时候都是用的 newCachedThreadPoo
,newFixedThreadPool
,newScheduledThreadPool
,newSingleThreadExecutor
这四个方法。
固然 Executors
也是用不一样的参数去 new ThreadPoolExecutor
实现的,本文先分析前四种线程建立方式,后在分析 new ThreadPoolExecutor
建立方式java
因为使用了LinkedBlockingQueue
因此maximumPoolSize
没用,当corePoolSize
满了以后就加入到LinkedBlockingQueue
队列中。 每当某个线程执行完成以后就从LinkedBlockingQueue
队列中取一个。 因此这个是建立固定大小的线程池。git
源码分析github
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
建立线程数为1的线程池,因为使用了LinkedBlockingQueue
因此maximumPoolSize
没用,corePoolSize
为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。缓存
源码分析函数
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
复制代码
建立可缓冲的线程池。没有大小限制。因为corePoolSize
为0因此任务会放入SynchronousQueue
队列中,SynchronousQueue
只能存放大小为1,因此会马上新起线程,因为maxumumPoolSize
为Integer.MAX_VALUE
因此能够认为大小为2147483647
。受内存大小限制。oop
源码分析源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
复制代码
源码分析 ,ThreadPoolExecutor
的构造函数测试
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
一、corePoolSize
核心线程数大小,当线程数 < corePoolSize ,会建立线程执行 runnableui
二、maximumPoolSize
最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中this
三、keepAliveTime
保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。
四、unit
时间单位
五、workQueue
保存任务的阻塞队列
六、threadFactory
建立线程的工厂
七、handler
拒绝策略
一、当线程数小于 corePoolSize
时,建立线程执行任务。
二、当线程数大于等于 corePoolSize
而且 workQueue
没有满时,放入workQueue
中
三、线程数大于等于 corePoolSize
而且当 workQueue
满时,新任务新建线程运行,线程总数要小于 maximumPoolSize
四、当线程总数等于 maximumPoolSize
而且 workQueue
满了的时候执行 handler
的 rejectedExecution
。也就是拒绝策略。
ThreadPoolExecutor默认有四个拒绝策略:
一、ThreadPoolExecutor.AbortPolicy()
直接抛出异常RejectedExecutionException
二、ThreadPoolExecutor.CallerRunsPolicy()
直接调用run方法而且阻塞执行
三、ThreadPoolExecutor.DiscardPolicy()
直接丢弃后来的任务
四、ThreadPoolExecutor.DiscardOldestPolicy()
丢弃在队列中队首的任务
固然能够本身继承RejectedExecutionHandler来写拒绝策略.
package io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:39 **/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("建立任务并提交到线程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待全部线程执行完毕当前任务。
threadPool.shutdown();
boolean loop = true;
do {
//等待全部线程执行完毕当前任务结束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("全部线程执行完毕");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
复制代码
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:40 **/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
复制代码
遇到java.util.concurrent.RejectedExecutionException
第一
你的线程池 ThreadPoolExecutor
显示的 shutdown()
以后,再向线程池提交任务的时候。 若是你配置的拒绝策略是 AbortPolicy
的话,这个异常就会抛出来。
第二
当你设置的任务缓存队列太小的时候,或者说, 你的线程池里面全部的线程都在干活(线程数== maxPoolSize
),而且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。
响应
能够看到线程 pool-1-thread-1 到5 循环使用
建立任务并提交到线程池中:task=1
开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1
建立任务并提交到线程池中:task=2
开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2
建立任务并提交到线程池中:task=3
开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3
建立任务并提交到线程池中:task=4
开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4
建立任务并提交到线程池中:task=5
开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5
建立任务并提交到线程池中:task=6
开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1
建立任务并提交到线程池中:task=7
开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2
建立任务并提交到线程池中:task=8
开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3
建立任务并提交到线程池中:task=9
开始执行任务:task=9任务,使用的线程池,线程名称:pool-1-thread-4
建立任务并提交到线程池中:task=10
开始执行任务:task=10任务,使用的线程池,线程名称:pool-1-thread-5
全部线程执行完毕
耗时:1015
复制代码
github github.com/souyunku/ym…