我的博客项目地址java
但愿各位帮忙点个star,给我加个小星星✨git
在java中,使用线程时经过new Thread实现很简单,可是若是并发数量不少时,频繁地建立线程就会大大下降系统的效率。github
因此能够经过线程池,使得线程能够复用,每执行完一个任务,并非被销毁,而是能够继续执行其余任务。编程
花了两天时间去看了高洪岩写的《JAVA并发编程》,是想要知其然,知其因此然,在使用的状况下,了解学习了一下原理记录下java.util.concurrent并发包下的ThreadPoolExecutor特性和实现缓存
粗暴点,咱们直接看如何使用吧多线程
简单举个🌰:
Executors.newCachedThreadPool(); //建立一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //建立容量为1的缓冲池
Executors.newFixedThreadPool(int); //建立固定容量大小的缓冲池
具体实现逻辑:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
经过该Executors的静态方法进行线程池的建立,并且从具体实现来看,仍是调用了new ThreadPoolExecutor(),只是内部参数已经帮咱们配置好了。并发
既然真正实现都是用ThreadPoolExecutor,那就本身设定好方法的参数吧。ide
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());
for(int i=0;i<10;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
复制代码
打印效果以下:函数
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
task 2执行完毕
task 0执行完毕
task 3执行完毕
task 1执行完毕
正在执行task 8
task 4执行完毕
正在执行task 7
正在执行task 6
正在执行task 5
正在执行task 9
task 8执行完毕
task 6执行完毕
task 7执行完毕
task 5执行完毕
task 9执行完毕
复制代码
任务Task提交以后,因为是多线程状态下,因此打印效果并非同步的,能够看出任务都已经顺利执行。学习
我这个实现参数是5个corePoolSize核心线程数和5个maximumPoolSize最大线程数,当线程池中的线程数超过5个的时候,将新来的任务放进缓存队列中,小伙伴能够试下把任务数(for循环的个数)提升一点,让缓存等待的任务数超过5个,看看默认的任务拒绝策略(AbortPolicy)会抛出什么错误hhh
下面来看看ThreadPoolExecutor的庐山真面目吧~
它有如下四个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
从构造方法能够看出,前三个方法最终都是调用第四个构造器进行初始化工做的。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
复制代码
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
复制代码
从源码构造函数能够看到,不传参数的时候,默认阻塞队列中的大小是Integer.MAX_VALUE;
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
复制代码
Array和Linked在传入大小小于0时将会报错,比较经常使用的是LinkedBlockingDeque和SynchronousQueue,线程池的排队策略与BlockingQueue有关。
主要用来建立线程,能够在newThread()方法中自定义线程名字和设置线程异常状况的处理逻辑。
举个🌰:
static class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread();
thread.setName("JingQ" + new Date());
thread.setUncaughtExceptionHandler((t, e) -> {
doSomething();
e.printStackTrace();
});
return thread;
}
}
复制代码
有如下四种:
能够看出,实际上ThreadPoolExecutor是继承了AbstractExecutorService类和引用了ExecutorService、Executor接口。
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
复制代码
AbstarctExecutorService是一个抽象类,它实现的是ExecutorService接口
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
接口ExecutorService引用了Executor接口,Executor接口比较简单,只有一个execute方法定义
public interface Executor {
void execute(Runnable command);
}
复制代码
小结:
Executor是一个顶级接口,定义了一个execute方法,返回值为空,参数为Runnable。
ExecutorService继承了Executor而且定义了其它一些方法,结果以下图:
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法。
最后ThreadPoolExecutor继承了AbstractExecutorService,咱们最经常使用到它两个方法,submit和execute,下面介绍一下这二者:
execute():
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * (如下是我的渣翻译,有误请轻喷~) * 有如下三步流程: * * 1. 若是少于核心池大小的线程正在运行, * 那么尝试以给定的命令做为它的第一个任务启动一个新线程。 * 调用添加worker原子性检查运行状态和workder的数量, * 这样能够防止错误警报在不该该返回的状况下添加线程,返回false。 * * 2. 若是一个任务能够成功地排队,那么咱们仍然须要再次检查是否应该添加一个线程 * (由于现有的线程在上次检查后死亡),或者是在该方法进入后关闭了池。 * 所以,咱们从新检查状态,若是必要的话,若是中止的话,须要回滚队列。 * 若是没有新的线程,就去启动它 * * 3. 若是咱们不能排队任务,那么咱们尝试添加一个新线程。 * 若是失败了,咱们知道任务队列已经被关闭或饱和,因此拒绝这个任务。 */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
复制代码
submit:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
execute()方法在ThreadPoolExecutor中进行了重写,submit()方法是在AbstractExecutorService实现的,ThreadPoolExecutor并无重写,而且execute方法是没有返回结果的,submit的返回类型是Future,可以得到任务的结果,可是实际执行的仍是execute方法。
固然,还有例如shutdown、getQueue、getActiveCount、getPoolSize等方法没有介绍到,推荐胖友们打开IDE进行查看吧~
ps:关于线程池的原理并未深刻记录,有关它的任务拒绝策略、线程初始化、ThreadPoolExecutor构造以后,当任务超过设定值,它的执行策略等原理都值得去深刻学习,下回记录~