在以前的初Android线程池文章中,因为时间缘由,只是对Android中的线程池作了一个初步的了解。那篇文章中初步了解了线程池的做用、为何使用线程池、ThreadPoolExecutor
类及其构造方法中的参数含义,最后还初步总结了一下线程池的运行过程。这篇文章将同过示例和源码分析的角度对线程池的运行原理进行深刻分析。Let's go!!!html
在进行今天的探究以前,咱们仍是先了解几个小的知识点。java
/** * The <code>Runnable</code> interface should be implemented by any * class whose instances are intended to be executed by a thread. The * class must define a method of no arguments called <code>run</code>. * <p> * This interface is designed to provide a common protocol for objects that * wish to execute code while they are active. For example, * <code>Runnable</code> is implemented by class <code>Thread</code>. * Being active simply means that a thread has been started and has not * yet been stopped. * <p> * In addition, <code>Runnable</code> provides the means for a class to be * active while not subclassing <code>Thread</code>. A class that implements * <code>Runnable</code> can run without subclassing <code>Thread</code> * by instantiating a <code>Thread</code> instance and passing itself in * as the target. In most cases, the <code>Runnable</code> interface should * be used if you are only planning to override the <code>run()</code> * method and no other <code>Thread</code> methods. * This is important because classes should not be subclassed * unless the programmer intends on modifying or enhancing the fundamental * behavior of the class. * * @author Arthur van Hoff * @see java.lang.Thread * @see java.util.concurrent.Callable * @since JDK1.0 */
@FunctionalInterface
public interface Runnable {
/** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */
public abstract void run();
}
复制代码
从注释中咱们能够看到,若是一个实体对象想被线程执行,就必须实现这个接口,同时还要实现一个无参的run
方法。编程
/** * A task that returns a result and may throw an exception. * Implementors define a single method with no arguments called * {@code call}. * * <p>The {@code Callable} interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * {@code Runnable}, however, does not return a result and cannot * throw a checked exception. * * <p>The {@link Executors} class contains utility methods to * convert from other common forms to {@code Callable} classes. * * @see Executor * @since 1.5 * @author Doug Lea * @param <V> the result type of method {@code call} */
@FunctionalInterface
public interface Callable<V> {
/** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */
V call() throws Exception;
}
复制代码
Callable
中有返回值而且能够抛出异常,它于Runnable
类似,都是为多线程准备的。可是,后者不能有返回值,也不能在其内部自动抛出异常。
好了,看到这里咱们来总结一下它们的异同。缓存
一、它们两个都是接口(虽然这是一句废话)。
二、它们两个都能编写多线程(感受又是一句废话)。
三、Callable
中的方法有返回值,Runnable
中的则没有。
四、Callable
中的方法中可以自动抛出异常,Runnable
中须要手动抛出异常。
多线程
说了这么多感受仍是有点抽象,咱们来看一下用法吧。并发
public abstract class RunnableImp implements Runnable{}
复制代码
public class CallableImp implements Callable<String> {
@Override
public String call() throws Exception {
return "我是Callable";
}
}
复制代码
Handler mHandler = new Handler() {
@Override
public void handleMessage(@NonNull Message msg) {
switch (msg.what) {
case 1:
tv.setText("");
tv.setText(msg.obj.toString());
break;
}
}
};
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_second);
Button btn_runable = findViewById(R.id.btn_runable);
Button btn_callable = findViewById(R.id.btn_callable);
tv = findViewById(R.id.tv);
btn_runable.setOnClickListener(this);
btn_callable.setOnClickListener(this);
}
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.btn_runable:
executeRunnable();
break;
case R.id.btn_callable:
executeCallable();
break;
}
}
public void executeRunnable() {
RunnableImp runnableImp = new RunnableImp() {
@Override
public void run() {
Message message = Message.obtain();
message.what = 1;
message.obj = "我是Runnable";
mHandler.handleMessage(message);
}
};
Thread thread = new Thread(runnableImp);
thread.start();
}
public void executeCallable() {
FutureTask<String> futureTask = new FutureTask<>(new CallableImp());
new Thread(futureTask).start();
try {
Message message = Message.obtain();
message.what = 1;
message.obj = futureTask.get();
mHandler.handleMessage(message);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
复制代码
第一部分代码:建立一个抽象类实现
Runnable
接口。
第一部分代码:建立一个抽象类实现Callable
接口,同时重写call()
方法,返回一个String
类型的值。
第三部分代码:建立一个Activity
,界面中有两个按钮,分别使Runnable
和Callcal
的方式执行多线程,最后经过Handler
将数据返回到主线程中改变界面。
app
上面的代码没有什么难处,相信都能看得懂,咱们来看一下运行结果。 less
咱们在看上面多线程的例子时,发现线程能够和FutureTask
结合起来,那么这个类究竟时干什么的呢?
dom
/** * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be * retrieved when the computation has completed; the {@code get} * methods will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted * or cancelled (unless the computation is invoked using * {@link #runAndReset}). * * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or * {@link Runnable} object. Because {@code FutureTask} implements * {@code Runnable}, a {@code FutureTask} can be submitted to an * {@link Executor} for execution. * * <p>In addition to serving as a standalone class, this class provides * {@code protected} functionality that may be useful when creating * customized task classes. * * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this FutureTask's {@code get} methods */
public class FutureTask<V> implements RunnableFuture<V> {
...
}
复制代码
我们呢先不着急看里面的代码,先瞅两眼注释,从注释中咱们能获取如下几个信息。异步
一、它能够取消的异步运算。
二、它Future
的实现类(我擦,Future
是什么东东?无论了,等下再看)。
三、调用方法来让其运行,也能取消运行。
四、只有当运行结束时才能看到运行的结果,当运行没有结束时get()
方法会被阻塞掉。
五、一旦运行结束,将不能从新启动或者取消,除非调用runAndReset()
方法
六、因为其实现了Runnable
接口,Futuretask
能够被看做Callable
或者Runnable
的包装类,能够在Executor
类中执行提交。
咱们看到FutureTask
实现了RunnableFuture
接口,咱们看一下它是干什么的。
/** * A {@link Future} that is {@link Runnable}. Successful execution of * the {@code run} method causes completion of the {@code Future} * and allows access to its results. * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */
public interface RunnableFuture<V> extends Runnable, Future<V> {
/** * Sets this Future to the result of its computation * unless it has been cancelled. */
void run();
}
复制代码
从上面咱们能够看到RunnableFuture
也一个接口,同时它继承自Runnable
和Future
。执行run
方法时可让Future
完成运行,而且还能获取到运行的结果。
看到这里,估计仍是不知道这些讲的都是啥。别着急,让咱们继续往下看。
Runnable
咱们已经看过了,让咱们看一个Future
。
/** * A {@code Future} represents the result of an asynchronous * computation. Methods are provided to check if the computation is * complete, to wait for its completion, and to retrieve the result of * the computation. The result can only be retrieved using method * {@code get} when the computation has completed, blocking if * necessary until it is ready. Cancellation is performed by the * {@code cancel} method. Additional methods are provided to * determine if the task completed normally or was cancelled. Once a * computation has completed, the computation cannot be cancelled. * If you would like to use a {@code Future} for the sake * of cancellability but not provide a usable result, you can * declare types of the form {@code Future<?>} and * return {@code null} as a result of the underlying task. * * <p> * <b>Sample Usage</b> (Note that the following classes are all * made-up.) * * <pre> {@code * interface ArchiveSearcher { String search(String target); } * class App { * ExecutorService executor = ... * ArchiveSearcher searcher = ... * void showSearch(final String target) * throws InterruptedException { * Future<String> future * = executor.submit(new Callable<String>() { * public String call() { * return searcher.search(target); * }}); * displayOtherThings(); // do other things while searching * try { * displayText(future.get()); // use future * } catch (ExecutionException ex) { cleanup(); return; } * } * }}</pre> * * The {@link FutureTask} class is an implementation of {@code Future} that * implements {@code Runnable}, and so may be executed by an {@code Executor}. * For example, the above construction with {@code submit} could be replaced by: * <pre> {@code * FutureTask<String> future = * new FutureTask<>(new Callable<String>() { * public String call() { * return searcher.search(target); * }}); * executor.execute(future);}</pre> * * <p>Memory consistency effects: Actions taken by the asynchronous computation * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a> * actions following the corresponding {@code Future.get()} in another thread. * * @see FutureTask * @see Executor * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */
public interface Future<V> {
//在任务执行完以前可否取消任务,返回false说明任务已经执行完毕,反之返回true。
boolean cancel(boolean mayInterruptIfRunning);
//判断任务完成以前是否被取消,返回true表示任务完成以前被取消。
boolean isCancelled();
//任务完成以后返回true。
boolean isDone();
//当任务执行结束以后能够获取执行结果。
V get() throws InterruptedException, ExecutionException;
//在给定的时间内等待任务的完成,若是等待时任务完成返回执行结果,不然将会抛出TimeoutException异常。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
从类的注释咱们能够看到,Future
是一个接口,他提供给了咱们方法来检测当前的任务是否已经结束,还能够等待任务结束而且拿到一个结果,经过调用Future
的get()
方法能够当任务结束后返回一个结果值,若是工做没有结束,则会阻塞当前线程,直到任务执行完毕,咱们能够经过调用cancel()
方法来中止一个任务,若是任务已经中止,则cancel()
方法会返回true
;若是任务已经完成或者已经中止了或者这个任务没法中止,则cancel()
会返回一个false
。当一个任务被成功中止后,他没法再次执行。isDone()
和isCancel()
方法能够判断当前工做是否完成和是否取消。
对于Future
和FutureTask
之间的关系,咱们能够总结成一张图。
Callable
和
Runnable
例子时,咱们也用到了
FutureTask
进行多线程操做,而且它于
Runnable
方式相比较来看,
FutureTask
能够经过
get()
方法获取线程的执行结果,而普通的方式则不能。同时,
FuturTask
在多线程任务执行的过程当中,能够对任务作一系列操做,例如:取消、查看任务是否执行完毕等等。
若是想了解更多有关Future和FutureTask等相关知识能够参考如下几篇文章:
Java并发编程:Callable、Future和FutureTask原理解析
Java FutureTask 源码分析 Android上的实现
FutureTask的用法及两种经常使用的使用场景
说了这么多,咱们来动手写几个例子吧,毕竟光说不练假把式。
public class ExecuteTask implements Runnable {
private int taskNum;
public ExecuteTask(int taskNum) {
this.taskNum = taskNum;
}
@Override
public void run() {
Log.i("ThreadPoolDemoExecute", "当前执行task" + taskNum);
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i("ThreadPoolDemoExecute", "task:" + taskNum + "执行完毕");
}
}
public void execute() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2
, 4
, 200
, TimeUnit.SECONDS
, new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 8; i++) {
ExecuteTask task = new ExecuteTask(i);
executor.execute(task);
Log.i("ThreadPoolDemoExecute", "线程池中线程数目:" + executor.getPoolSize());
Log.i("ThreadPoolDemoExecute", "线程池中等待执行的任务数:" + executor.getQueue().size());
Log.i("ThreadPoolDemoExecute", "已经执行完的任务数:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
复制代码
在这个例子中首先建立了一个ExecuteTask
类,使其实现Runnable
接口。在run
方法中让当前线程sleep
2秒,至关因而执行时间。
而后使用ThreadPoolExecutor
建立线程池,因为以前文章已经对这几个参数作了解释,这里就再也不讲解。
最后经过循环的方式让建立ExecuteTask
对象,并调用ThreadPoolExecutor
的execute
方法。让咱们看一下运行结果。
public class Data {
private String name;
public Data(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
public class SubmitTask implements Runnable {
private Data data;
public SubmitTask(Data data) {
this.data = data;
}
@Override
public void run() {
Log.i("ThreadPoolDemoSubmit", "当前执行" + data.getName());
}
}
public void submit() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2
, 4
, 200
, TimeUnit.SECONDS
, new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 8; i++) {
Data data = new Data("任务" + i);
SubmitTask task = new SubmitTask(data);
Future<Data> future = executor.submit(task, data);
try {
Log.i("ThreadPoolDemoSubmit", future.get().getName() + "执行完毕");
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
复制代码
在使用这种方式时,仍是先建立了SubmitTask
类,同时也让它实现了Runnable
接口,重写run
方法。
其次仍是建立出线程池对象,而后经过循环的方式建立Data
对象,同时也建立相应的SubmitTask
对象,并将Data
对象放入到SubmitTask
对象中去。
最后,调用submit
方法传入SubmitTask
和Data
对象,这个方法将会返回一个Future
对象,在以前讲解Future
时知道能够经过get()
方法获取任务的执行结果,运行结果以下图。
在第一个例子中咱们使用的是execute方法向线程池中增长任务,那么咱们就看一下这方法的源码。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE – 3; //32-3=29,线程数量所占位数
private static final int CAPACITY = (1 << COUNT_BITS) – 1;//低29位表示最大线程数,229-1
//五种线程池状态
private static final int RUNNING = -1 << COUNT_BITS; //int型变量高3位(含符号位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000
private static final int STOP = 1 << COUNT_BITS; //高3位001
private static final int TIDYING = 2 << COUNT_BITS; //高3位010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位011
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get(); //1
if (workerCountOf(c) < corePoolSize) { //2
if (addWorker(command, true)) //3
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //4
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //5
reject(command);
else if (workerCountOf(recheck) == 0) //6
addWorker(null, false);
}
else if (!addWorker(command, false)) //7
reject(command);
}
复制代码
注释上说添加的过程主要分为三个步骤:
一、若是当前线程数小于核心线程数,则尝试新开一个线程给这个任务。调用
addWorker
方法时经过原子性检查当前线程池的运行状态和任务个数,经过返回false
来避免在不该该添加线程的地方出现错误。
二、若是一个任务能够被成功的添加到队列中去,但仍然须要第二次检查是否真的须要添加一个线程(存在上次检查以后有一个线程死掉的情况,或者进入这个方法时线程池已经关闭)。判断是否有必要须要回滚或者新建一个线程。
三、若是不能将任务放入任务栈中,仍然会仙剑一个线程。当失败时咱们就能知道线程池已经关闭或者线程池已经达到上限了,此时将会执行拒接策略。
说了这么多,其实就是在执行execute()
方法时作了多重的判断操做,而且屡次尝试为任务新建线程将其放入到任务栈,若是实在放不进去说明线程池已经满了或者已经再也不工做了。
上述只是关于添加任务执行过程的大体说明,下面咱们将看一下代码细节,可是在看代码细节以前仍是看一下有关线程池的几种状态。
RUNNING:能接受新提交的任务,而且也能处理阻塞队列中的任务
SHUTDOWN:关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务。在线程池处于
RUNNING
状态时,调用shutdown()
方法会使线程池进入到该状态。(finalize()
方法在执行过程当中也会调用shutdown()
方法进入该状态)
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于
RUNNING
或SHUTDOWN
状态时,调用shutdownNow()
方法会使线程池进入到该状态
TIDYING:若是全部的任务都已终止了,
workerCount
(有效线程数) 为0,线程池进入该状态后会调用terminated()
方法进入TERMINATED
状态
TERMINATED:
在terminated()
方法执行完后进入该状态,默认terminated()
方法中什么也没有作
代码细节对应注释
注释1:经过
ctl.get()
方法获取一个值,这个值中包含了当前线程池的状态和有效线程数。
注释2:看看当前线程数是否小于核心线程数。
注释3:若是当前线程数小于核心线程数,会调用addWork
方法将任务传入,同时传入一个true
值表示建立核心线程。
注释4:若是当前线程数不小于核心线程数时,这时要判断当线程池是否在运行,同时还要判断任务栈是否能成功插入任务。
注释5:在知足注释4的条件后,还会从新检查当前线程池的运行状态,若是不是就将任务从任务栈中移除,同时执行拒绝策略。
注释6:若是当先线程池处于运行状态,而且当前线程池的线程数为0,这时将会调用addWork
方法,但此时传入的firstTask
为空,而且也不是核心线程。
注释7:再次执行了addWorker
方法而且将任务传入,同时建立的不是核心线程,若是此次添加任务仍是失败,将会执行拒绝策略。
execute
方法的执行流程咱们已经看完了,其中向线程池中插入线程的核心方法是addWorker(Runnable firstTask, boolean core)
方法,咱们再来看一下这个方法。
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1 查看线程池是否能够接收新的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 2 看一下当前线程的数量是否超过了容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 3 调用compareAndIncrementWorkerCount方法使用CAS方法增长线程
// 若是增长失败,须要跳出从新执行
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 4 相间一个Worker对象,并将任务封装到该对象中去
// 经过线程工厂为Worker建立一个独有的线程
w = new Worker(firstTask);
// 5 获取这个线程
final Thread t = w.thread;
if (t != null) {
// 7 获取重入锁,并锁住
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 8 判断线程池处于运行状态,或者线程池关闭且任务线程为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 9 若是线程已经启动,须要抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 10 private final HashSet<Worker> wokers = new HashSet<Worker>();
// 包含线程池中全部的工做线程,只有在获取了全局的时候才能访问它。
// 将新构造的工做线程加入到工做线程集合中。
workers.add(w);
// 11 设置largestPoolSize和workerAdded
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 12 解锁
mainLock.unlock();
}
// 13 添加成功执行线程
if (workerAdded) {
//在被构造为Worker工做线程,且被加入到工做线程集合中后,执行线程任务,
//注意这里的start实际上执行Worker中run方法,因此接下来分析Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
...
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 经过线程工厂建立线程
this.thread = getThreadFactory().newThread(this);
}
//执行任务
public void run() {
runWorker(this);
}
...
}
private void addWorkerFailed(Worker w) {
// 获取锁对象
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将任务移除
if (w != null)
workers.remove(w);
// 减小任务个数
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
复制代码
对这个方法的分析都在代码中的注释,同时在这里跟你们提一下关于CAS的相关知识,你们能够参考Java CAS 原理剖析这篇文章。
execute方法已经分析完毕了,仍是用那副经典图来做为总结。
咱们以前也使用了submit
方法举例,因此接下来咱们来看一下ThreadPoolExecutor
的submit
方法。
public abstract class AbstractExecutorService implements ExecutorService {
...
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
...
}
复制代码
可是当咱们点进去看时,这里又到了AbstractExecutorService
类中。到这里咱们从前面的线程池类的继承结构中知道Executor
接口中是没有submit方法的,它只是定义了一个execute
方法,而是在ExecutorService
接口中定义了submit
方法,AbstractExecutorService
又实现了ExecutorService
接口,同时实现了submit
方法。最后ThreadPoolExecutor
又继承了AbstractExecutorService
类。
最终在执行AbstractExecutorService.submit
方法时又调用了子类ThreadPoolExecutor````中的
execute方法,这种写法实际上是[模板方法模式](https://www.cnblogs.com/yulinfeng/p/5902164.html):
AbstractExecutorService.submit方法只是一个架子,具体的实现交由子类
ThreadPoolExecutor.execute```方法来实现。
这个方法有三个重载,其中咱们在举例时是用的第二种。
第一种方法:单独传入一个
Runnable
可是没有传入返回值类型,在返回的RunnableFuture
对象的泛型时传入的是一个Void
,也就是说这种方法没法获取线程执行的返回值。
第二种方法:传入一个
Runnable
对象,同时传入了一个返回值的载体,经过载体间接获取线程执行的结果。
第三种方法:传入一个
Callable
方法,同时定义了返回值的类型。以前也讲过Callable
的实现对象中须要重写call
方法,在call
方法中有返回值,这个返回值最后被定义给了RunnableFuture
对象,经过get
方法能够获取到。
上面讲的都是咱们手动建立线程池,下面咱们来介绍一个类,它能够帮咱们自动建立一个线程池,它就是Executors
。
/** * Factory and utility methods for {@link Executor}, {@link * ExecutorService}, {@link ScheduledExecutorService}, {@link * ThreadFactory}, and {@link Callable} classes defined in this * package. This class supports the following kinds of methods: * * <ul> * <li>Methods that create and return an {@link ExecutorService} * set up with commonly useful configuration settings. * <li>Methods that create and return a {@link ScheduledExecutorService} * set up with commonly useful configuration settings. * <li>Methods that create and return a "wrapped" ExecutorService, that * disables reconfiguration by making implementation-specific methods * inaccessible. * <li>Methods that create and return a {@link ThreadFactory} * that sets newly created threads to a known state. * <li>Methods that create and return a {@link Callable} * out of other closure-like forms, so they can be used * in execution methods requiring {@code Callable}. * </ul> * * @since 1.5 * @author Doug Lea */
public class Executors {
...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
...
}
复制代码
这个类至关于一个工厂,只须要咱们调用其静态方法,就能为咱们建立相应的线程池。这个类中有不少用于建立线程池的静态方法,这里只是找了5个比较具备表明性的方法,咱们来分析一下。
newFixedThreadPool
:建立一个特定长度的线程池,能够控制并发的数量,超过长度的线程在队列中等待。
newSingleThreadExecutor
:建立一个单线程的线程池,而且只用这一个线程执行任务。
newCachedThreadPool
:建立一个可缓存的线程池,这个线程池能够是无限大。
newScheduledThreadPool
:建立一个定长的线程池,支持定时或周期性的执行任务。
newWorkStealingPool
:建立一个多任务的线程池,减小链接数,适用于耗时的操做。
这个类帮助咱们自动建立的确很方便,可是也存在一些问题:
newCachedThreadPool
方法中传入的最大线程数为Integer.MAX_VALUE
,可是核心线程数为0,因此短期内大量任务进来后,会建立有不少线程池对象,在资源有限的状况下容易致使OOM
。
newFixedThreadPool
和newSingleThreadExecutor
使用的任务队列为LinkedBlockingQueue
,去看它的源码能够知道,它的队列长度是Integer.MAX_VALUE
,因此这两个方法也存在同newCachedThreadPool
同样的问题。
有关Android
线程池相关的分析到这里就告一段落了,若是有哪里写的不对的地方,还请各位大佬批评指正,本人将不胜感激。
Java并发编程:Callable、Future和FutureTask原理解析
Java FutureTask 源码分析 Android上的实现
FutureTask的用法及两种经常使用的使用场景
Java CAS 原理剖析
模板方法模式