关于做者html
郭孝星,程序员,吉他手,主要从事Android平台基础架构方面的工做,欢迎交流技术方面的问题,能够去个人Github提issue或者发邮件至guoxiaoxingse@163.com与我交流。java
文章目录android
本篇文章主要用来讨论Java中多线程并发原理与实践经验,并非一篇使用例子教程,这方面内容能够参考网上其余文章。git
线程是比进程更加轻量级的调度单位,线程的引入能够把进程的资源分配和执行调度分开,各个线程既能够共享进程资源,又能够独立调度。程序员
一般你们都会这么去解释进程与线程的区别,在文章01Android进程框架:进程的启动建立、启动与调度流程中 咱们剖析了进程的本质,咱们这里再简单回忆一下。github
关于进程本质的描述:数据库
咱们知道,代码是静态的,有代码和资源组成的系统要想运行起来就须要一种动态的存在,进程就是程序的动态执行过程。何为进程? 进程就是处理执行状态的代码以及相关资源的集合,包括代码段、文件、信号、CPU状态、内存地址空间等。数组
进程使用task_struct结构体来描述,以下所示:缓存
咱们接着来看看Java线程的建立序列图,以下所示:安全
能够看到,最终调用pthread库的pthread_create()方法建立了新的线程,该线程也使用task_struct结构体来描述,可是它没有本身独立的地址空间,而是与其所在的进程共享地址空间和资源。
因此你能够发现,对于虚拟机而言,除了是否具备独立的地址空间外,进程与线程并无本质上的区别。
咱们接着来看看线程是如何调度的。
线程状态流程图图
NEW、WAITING、TIMED_WAITING都比较好理解,咱们重点说一说RUNNABLE运行态和BLOCKED阻塞态。
线程进入RUNNABLE运行态通常分为五种状况:
线程进入BLOCKED阻塞态通常也分为五种状况:
咱们再来看看和线程状态相关的一些方法。
线程安全,一般所说的线程安全指的是相对的线程安全,它指的是对这个对象单独的操做是线程安全的,咱们在调用的时候无需作额外的保障措施。
什么叫相对安全?🤔
🤞举个栗子
咱们知道Java里的Vector是个线程安全的类,在多线程环境下对其插入、删除和读取都是安全的,但这仅限于每次只有一个线程对其操做,若是多个线程同时操做 Vector,那它就再也不是线程安全的了。
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("项:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
复制代码
可是程序却crash了
正确的作法应该是vector对象加上同步锁,以下:
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("项:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
复制代码
volatile也是互斥同步的一种实现,不过它很是的轻量级。
volatile有两条关键的语义:
要理解volatile关键字,咱们得先从Java的线程模型开始提及。如图所示:
Java内存模型规定了全部字段(这些字段包括实例字段、静态字段等,不包括局部变量、方法参数等,由于这些是线程私有的,并不存在竞争)都存在主内存中,每一个线程会 有本身的工做内存,工做内存里保存了线程所使用到的变量在主内存里的副本拷贝,线程对变量的操做只能在工做内存里进行,而不能直接读写主内存,固然不一样内存之间也 没法直接访问对方的工做内存,也就是说主内存时线程传值的媒介。
咱们来理解第一句话:
保证被volatile修饰的变量对全部线程都是可见的
如何保证可见性?🤔
被volatile修饰的变量在工做内存修改后会被强制写回主内存,其余线程在使用时也会强制从主内存刷新,这样就保证了一致性。
关于“保证被volatile修饰的变量对全部线程都是可见的”,有种常见的错误理解:
错误理解:因为volatile修饰的变量在各个线程里都是一致的,因此基于volatile变量的运算在多线程并发的状况下是安全的。
这句话的前半部分是对的,后半部分却错了,所以它忘记考虑变量的操做是否具备原子性这一问题。
:point_up:举个栗子
private volatile int start = 0;
private void volatileKeyword() {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
start++;
}
}
};
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
Log.d(TAG, "start = " + start);
}
复制代码
这段代码启动了10个线程,每次10次自增,按道理最终结果应该是100,可是结果并不是如此。
为何会这样?:thinking:
仔细看一下start++,它其实并不是一个原子操做,简单来看,它有两步:
因此volatile只能保证可见性,在不符合如下场景下咱们依然须要经过加锁来保证原子性:
比方说咱们会在线程里加个boolean变量,来判断线程是否中止,这种状况就很是适合使用volatile。
咱们再来理解第二句话。
什么是指令重排序?🤔
指令重排序是值指令乱序执行,即在条件容许的状况下,直接运行当前有能力当即执行的后续指令,避开为获取下一条指令所需数据而形成的等待,经过乱序执行的技术,提供执行效率。
指令重排序绘制被volatile修饰的变量的赋值操做前,添加一个内存屏障,指令重排序时不能把后面的指令重排序的内存屏障以前的位置。
关于指令重排序不是本篇文章重点讨论的内容,更多细节能够参考指令重排序。
synchronized是互斥同步的一种实现。
synchronized:当某个线程访问被synchronized标记的方法或代码块时,这个线程便得到了该对象的锁,其余线程暂时没法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个 线程才会释放该对象的锁,其余线程才能执行这个方法或代码块。
前面咱们已经说了volatile关键字,这里咱们举个例子来综合分析volatile与synchronized关键字的使用。
:point_up:举个栗子
public class Singleton {
//volatile保证了:1 instance在多线程并发的可见性 2 禁止instance在操做是的指令重排序
private volatile static Singleton instance;
public static Singleton getInstance() {
//第一次判空,保证没必要要的同步
if (instance == null) {
//synchronized对Singleton加全局所,保证每次只要一个线程建立实例
synchronized (Singleton.class) {
//第二次判空时为了在null的状况下建立实例
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
复制代码
这是一个经典的DSL单例。
它的字节码以下:
能够看到被synchronized同步的代码块,会在先后分别加上monitorenter和monitorexit,这两个字节码都须要指定加锁和解锁的对象。
关于加锁和解锁的对象:
synchronized(this)添加的是对象锁,synchronized(ClassName.class)添加的是类锁,它们的区别以下:
对象锁:Java的全部对象都含有1个互斥锁,这个锁由JVM自动获取和释放。线程进入synchronized方法的时候获取该对象的锁,固然若是已经有线程获取了这个对象的锁,那么当前线 程会等待;synchronized方法正常返回或者抛异常而终止,JVM会自动释放对象锁。这里也体现了用synchronized来加锁的好处,方法抛异常的时候,锁仍然能够由JVM来自动释放。
类锁:对象锁是用来控制实例方法之间的同步,类锁是用来控制静态方法(或静态变量互斥体)之间的同步。其实类锁只是一个概念上的东西,并非真实存在的,它只是用来帮助咱们理 解锁定实例方法和静态方法的区别的。咱们都知道,java类可能会有不少个对象,可是只有1个Class对象,也就是说类的不一样实例之间共享该类的Class对象。Class对象其实也仅仅是1个 java对象,只不过有点特殊而已。因为每一个java对象都有1个互斥锁,而类的静态方法是须要Class对象。因此所谓的类锁,不过是Class对象的锁而已。获取类的Class对象有好几种,最简 单的就是MyClass.class的方式。 类锁和对象锁不是同一个东西,一个是类的Class对象的锁,一个是类的实例的锁。也就是说:一个线程访问静态synchronized的时候,容许另外一个线程访 问对象的实例synchronized方法。反过来也是成立的,由于他们须要的锁是不一样的。
关不一样步锁还有ReentrantLock,eentrantLockR相对于synchronized具备等待可中断、公平锁等更多功能,这里限于篇幅,再也不展开。
咱们知道线程的建立、切换与销毁都会花费比较大代价,因此很天然的咱们使用线程池来复用和管理线程。Java里的线程池咱们一般经过ThreadPoolExecutor来实现。 接下来咱们就来分析ThreadPoolExecutor的相关原理,以及ThreadPoolExecutor在Android上的应用AsyncTask。
线程池有五种运行状态,以下所示:
线程池状态图
另外,ThreadPoolExecutor是用一个AtomicInteger来记录线程池状态和线程池里的线程数量的,以下所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;// 111
private static final int SHUTDOWN = 0 << COUNT_BITS;// 000
private static final int STOP = 1 << COUNT_BITS;// 001
private static final int TIDYING = 2 << COUNT_BITS;// 010
private static final int TERMINATED = 3 << COUNT_BITS;// 110
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; }//线程池当前线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
在正式介绍线程池调度原理以前,咱们先来回忆一下Java实现任务的两个接口:
另外,还有个Future接口,它能够对Runnable、Callable执行的任务进行判断任务是否完成,中断任务以及获取任务结果的操做。咱们一般会使用它的实现类FutureTask,FutureTask是一个Future、Runnable 以及Callable的包装类。利用它能够很方便的完成Future接口定义的操做。FutureTask内部的线程阻塞是基于LockSupport来实现的。
咱们接下来看看线程池是和执行任务的。
ThreadPoolExecutor调度流程图
execute(Runnable command)
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1. 若线程池状态是RUNNING,线程池大小小于配置的核心线程数,则能够在线程池中建立新线程执行新任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 若线程池状态是RUNNING,线程池大小大于配置的核心线程数,则尝试将任务插入阻塞队列进行等待
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//若插入成功,则将次检查线程池的状态是否为RUNNING,若是不是则移除当前任务并进入拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//若是线程池中的线程数为0,即线程池中的线程都执行完毕处于SHUTDOWN状态,此时添加了一个null任务
//(由于SHUTDOWN状态再也不接受新任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 若没法插入阻塞队列,则尝试建立新线程,建立失败则进入拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
}
复制代码
这个其实很好理解,打个比方。咱们公司的一个小组来完成任务,
addWorker(Runnable firstTask, boolean core)
addWorker(Runnable firstTask, boolean core) 表示添加个Worker,Worker实现了Runnable接口,是对Thread的封装,该方法添加完Worker后,则调用runWorker()来启动线程。
public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) {
//重试标签
retry:
for (;;) {
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
//如下状况表示再也不接受新任务:1 线程池没有处于RUNNING状态 2 要执行的任务为空 3 阻塞队列已满
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池当前的线程数
int wc = workerCountOf(c);
//若是超出容量,则再也不接受新任务,core表示是否使用corePoolSize做为比较标准
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增长线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//若是线程池状态发生变化,从新开始循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//线程数增长成功,开始添加新线程,Worker是Thread的封装类
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将新启动的线程添加到线程池中
workers.add(w);
//更新线程池中线程的数量,注意这个数量不能超过largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//调用runWorker()方法,开始执行线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
复制代码
runWorker(Worker w)
runWorker()方法是整个阻塞队列的核心循环,在这个循环中,线程池会不断的从阻塞队列workerQueue中取出的新的task并执行。
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//从阻塞队列中不断取出任务,若是取出的任务为空,则循环终止
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//该方法为空,能够从新次方法,在任务执行开始前作一些处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//该方法为空,能够从新次方法,在任务执行结束后作一些处理
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//从阻塞队列workerQueue中取出Task
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循环
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
//如下状况中止循环:1 线程池状态不是RUNNING(>= SHUTDOWN)2 线程池状态>= STOP 或者阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//递减workCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程的IDLE超时机制是否生效,有两种状况:1 allowCoreThreadTimeOut = true,这是能够手动
//设置的 2 当前线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根据timed来决定是以poll超时等待的方式仍是以take()阻塞等待的方式从阻塞队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
复制代码
因此你能够理解了,runWorker()方法是在新建立线程的run()方法里的,而runWorker()又不断的调用getTask()方法去获取阻塞队列里的任务,这样就实现了线程的复用。
咱们先来看看ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
那么这些参数咱们应该怎么配置呢?要合理配置线程池就须要先了解咱们的任务特性,通常说来:
咱们根据这些属性来一一分析这些参数的配置。
首先就是核心线程数corePoolSize与最大线程数maximumPoolSize。这个的配置咱们一般要考虑CPU同时执行线程的阈值。一旦超过这个阈值,CPU就须要花费不少 时间来完成线程的切换与调度,这样会致使性能大幅下滑。
/** * CPU核心数,注意该方法并不可靠,它返回的有可能不是真实的CPU核心数,由于CPU在某些状况下会对某些核 * 心进行睡眠处理,这种状况返回的知识已激活的CPU核心数。 */
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();
/** * 核心线程数 */
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1, 4));
/** * 最大线程数 */
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
复制代码
至于keepAliveTime,该参数描述了线程不活动时存活的时间,若是是CPU密集型任务,则将时间设置的小一些,若是是IO密集型或者数据库链接任务,则将时间设置的长一些。
咱们再来看看BlockingQueue参数的配置。BlockingQueue用来描述阻塞队列。它的方法以四种形式存在,以此来知足不一样需求。
抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|
add(e) | offer(e) | put(e) | offer(e, time, unit) |
remove() | poll() | take() | poll(time, unit) |
element() | peek() | 不可用 | 不可用 |
它有如下特色:
它的实现类有:
工做窃取:例若有两个队列A、B,各自干本身的活,可是A效率比较高,很快把本身的活干完了,因而勤快的A就会去窃取B的任务来干,这是A、B会访问同一个队列,为了减小A、B的竞争,规定窃取者A 只从双端队列的尾部拿任务,被窃取者B只从双端队列的头部拿任务。
咱们最后来看看RejectedExecutionHandler参数的配置。
RejectedExecutionHandler用来描述线程数大于或等于线程池最大线程数时的拒绝策略,它的实现类有:
另外,Executors提供了一系列工厂方法用来建立线程池。这些线程是适用于不一样的场景。
ThreadPoolExecutor里提供了一些空方法,咱们能够经过继承ThreadPoolExecutor,复写这些方法来实现对线程池的监控。
public class ThreadPoolExecutor extends AbstractExecutorService {
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
}
复制代码
常见的监控指标有:
AsyncTask基于ThreadPoolExecutor实现,内部封装了Thread+Handler,多用来执行耗时较短的任务。
一个简单的AsyncTask例子
public class AsyncTaskDemo extends AsyncTask<String, Integer, String> {
/** * 在后台任务开始执行以前调用,用于执行一些界面初始化操做,例如显示一个对话框,UI线程。 */
@Override
protected void onPreExecute() {
super.onPreExecute();
}
/** * 执行后台线程,执行完成能够经过return语句返回,worker线程 * * @param strings params * @return result */
@Override
protected String doInBackground(String... strings) {
return null;
}
/** * 更新进度,UI线程 * * @param values progress */
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
}
/** * 后台任务执行完成并经过return语句返回后会调用该方法,UI线程。 * * @param result result */
@Override
protected void onPostExecute(String result) {
super.onPostExecute(result);
}
/** * 后台任务呗取消后回调 * * @param reason reason */
@Override
protected void onCancelled(String reason) {
super.onCancelled(reason);
}
/** * 后台任务呗取消后回调 */
@Override
protected void onCancelled() {
super.onCancelled();
}
}
复制代码
AsyncTask的使用很是的简单,接下来咱们去分析AsyncTask的源码实现。
AsyncTask流程图
AsyncTask源码的一开始就是个建立线程池的流程。
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数,最少2个,最多4个
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//线程不活动时的存活时间是30s
private static final int KEEP_ALIVE_SECONDS = 30;
//线程构建工厂,指定线程的名字
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//一个由链表结构组成的无界阻塞队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
//构建线程池
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
复制代码
另外,咱们能够经过AsyncTask.executeOnExecutor(Executor exec, Params... params) 来自定义线程池。
咱们再来看看构造方法。
public abstract class AsyncTask<Params, Progress, Result> {
//构造方法须要在UI线程里调用
public AsyncTask() {
//建立一个Callable对象,WorkerRunnable实现了Callable接口
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
//建立一个FutureTask对象,该对象用来接收mWorker的结果
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//将执行的结果经过发送给Handler处理,注意FutureTask的get()方法会阻塞直至结果返回
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//内部的Handler
private static class InternalHandler extends Handler {
public InternalHandler() {
//UI线程的Looper
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
//返回结果
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
//返回进度
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
}
复制代码
能够看到当咱们调用AsyncTask的构造方法时,就建立了一个FutureTask对象,它内部包装了Callable对象(就是咱们要执行的任务),并在FutureTask对象的done()方法里 将结果发送给Handler。
接着看看执行方法execute()。
public abstract class AsyncTask<Params, Progress, Result> {
//须要在UI线程里调用
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
//任务执行前的处理,咱们能够复写次方法
onPreExecute();
mWorker.mParams = params;
//执行任务,exec为sDefaultExecutor
exec.execute(mFuture);
return this;
}
}
复制代码
接着看看这个sDefaultExecutor。
能够看到sDefaultExecutor是个SerialExecutor对象,SerialExecutor实现了Executor接口。
public abstract class AsyncTask<Params, Progress, Result> {
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static class SerialExecutor implements Executor {
//任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//当前执行的任务
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
//开始执行任务
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//取出队列头的任务开始执行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
}
复制代码
因此咱们没调用一次AsyncTask.execute()方法就将FutureTask对象添加到队列尾部,而后会从队列头部取出任务放入线程池中执行,因此你能够看着这是一个串行执行器。
在Okhttp的任务调度器Dispatcher里有关于线程池的配置
public final class Dispatcher {
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
复制代码
你能够看到它的配置:
这实际上是Excutors.newCachedThreadPool()缓存池的实现。总结来讲就是新任务过来进入SynchronousQueue,它是一个单工模式的队列,只传递任务,不存储任务,而后就建立 新线程执行任务,线程不活动的存活时间为60s。
Okhttp请求流程图
在发起网络请求时,每一个请求执行完成后都会调用client.dispatcher().finished(this)。
final class RealCall implements Call {
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//异步请求
client.dispatcher().finished(this);
}
}
}
}
复制代码
咱们来看看client.dispatcher().finished(this)这个方法。
public final class Dispatcher {
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//将已经结束的请求call移除正在运行的队列calls
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//异步请求promoteCalls为true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
//当前异步请求数大于最大请求数,不继续执行
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//异步等待队列为空,不继续执行
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//遍历异步等待队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//若是没有超过相同host的最大请求数,则复用当前请求的线程
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//运行队列达到上限,也再也不执行
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
}
复制代码
因此你能够看到Okhttp不是用线程池来控制线程个数,线程池里的线程执行的都是正在运行请请求,控制线程的是Dispatcher,Dispatcher.promoteCalls()方法经过 最大请求数maxRequests和相同host最大请求数maxRequestsPerHost来控制异步请求不超过两个最大值,在值范围内不断的将等待队列readyAsyncCalls中的请求添加 到运行队列runningAsyncCalls中去。