Android进程框架:线程与线程池

关于做者html

郭孝星,程序员,吉他手,主要从事Android平台基础架构方面的工做,欢迎交流技术方面的问题,能够去个人Github提issue或者发邮件至guoxiaoxingse@163.com与我交流。java

文章目录android

  • 一 线程原理
    • 1.1 线程建立
    • 1.2 线程调度
  • 二 线程同步
    • 2.1 volatile
    • 2.2 synchronized
  • 三 线程池
    • 3.1 线程池调度
    • 3.2 线程池配置
    • 3.1 线程池监控
  • 四 线程池应用
    • 4.1 AsyncTask
    • 4.2 Okhttp

本篇文章主要用来讨论Java中多线程并发原理与实践经验,并非一篇使用例子教程,这方面内容能够参考网上其余文章。git

一 线程原理

1.1 线程建立

线程是比进程更加轻量级的调度单位,线程的引入能够把进程的资源分配和执行调度分开,各个线程既能够共享进程资源,又能够独立调度。程序员

一般你们都会这么去解释进程与线程的区别,在文章01Android进程框架:进程的启动建立、启动与调度流程中 咱们剖析了进程的本质,咱们这里再简单回忆一下。github

关于进程本质的描述:数据库

咱们知道,代码是静态的,有代码和资源组成的系统要想运行起来就须要一种动态的存在,进程就是程序的动态执行过程。何为进程? 进程就是处理执行状态的代码以及相关资源的集合,包括代码段、文件、信号、CPU状态、内存地址空间等。数组

进程使用task_struct结构体来描述,以下所示:缓存

  • 代码段:编译后造成的一些指令
  • 数据段:程序运行时须要的数据
    • 只读数据段:常量
    • 已初始化数据段:全局变量,静态变量
    • 未初始化数据段(bss):未初始化的全局变量和静态变量
  • 堆栈段:程序运行时动态分配的一些内存
  • PCB:进程信息,状态标识等

咱们接着来看看Java线程的建立序列图,以下所示:安全

能够看到,最终调用pthread库的pthread_create()方法建立了新的线程,该线程也使用task_struct结构体来描述,可是它没有本身独立的地址空间,而是与其所在的进程共享地址空间和资源。

因此你能够发现,对于虚拟机而言,除了是否具备独立的地址空间外,进程与线程并无本质上的区别。

咱们接着来看看线程是如何调度的。

1.2 线程调度

线程状态流程图图

  • NEW:建立状态,线程建立以后,可是还未启动。
  • RUNNABLE:运行状态,处于运行状态的线程,但有可能处于等待状态,例如等待CPU、IO等。
  • WAITING:等待状态,通常是调用了wait()、join()、LockSupport.spark()等方法。
  • TIMED_WAITING:超时等待状态,也就是带时间的等待状态。通常是调用了wait(time)、join(time)、LockSupport.sparkNanos()、LockSupport.sparkUnit()等方法。
  • BLOCKED:阻塞状态,等待锁的释放,例如调用了synchronized增长了锁。
  • TERMINATED:终止状态,通常是线程完成任务后退出或者异常终止。

NEW、WAITING、TIMED_WAITING都比较好理解,咱们重点说一说RUNNABLE运行态和BLOCKED阻塞态。

线程进入RUNNABLE运行态通常分为五种状况:

  • 线程调用sleep(time)后查出了休眠时间
  • 线程调用的阻塞IO已经返回,阻塞方法执行完毕
  • 线程成功的获取了资源锁
  • 线程正在等待某个通知,成功的得到了其余线程发出的通知
  • 线程处于挂起状态,而后调用了resume()恢复方法,解除了挂起。

线程进入BLOCKED阻塞态通常也分为五种状况:

  • 线程调用sleep()方法主动放弃占有的资源
  • 线程调用了阻塞式IO的方法,在该方法返回前,该线程被阻塞。
  • 线程视图得到一个资源锁,可是该资源锁正被其余线程锁持有。
  • 线程正在等待某个通知
  • 线程调度器调用suspend()方法将该线程挂起

咱们再来看看和线程状态相关的一些方法。

  • sleep()方法让当前正在执行的线程在指定时间内暂停执行,正在执行的线程能够经过Thread.currentThread()方法获取。
  • yield()方法放弃线程持有的CPU资源,将其让给其余任务去占用CPU执行时间。但放弃的时间不肯定,有可能刚刚放弃,立刻又得到CPU时间片。
  • wait()方法是当前执行代码的线程进行等待,将当前线程放入预执行队列,并在wait()所在的代码处中止执行,知道接到通知或者被中断为止。该方法可使得调用该方法的线程释放共享资源的锁, 而后从运行状态退出,进入等待队列,直到再次被唤醒。该方法只能在同步代码块里调用,不然会抛出IllegalMonitorStateException异常。
  • wait(long millis)方法等待某一段时间内是否有线程对锁进行唤醒,若是超过了这个时间则自动唤醒。
  • notify()方法用来通知那些可能等待该对象的对象锁的其余线程,该方法能够随机唤醒等待队列中等同一共享资源的一个线程,并使该线程退出等待队列,进入可运行状态。
  • notifyAll()方法能够是全部正在等待队列中等待同一共享资源的所有线程从等待状态退出,进入可运行状态,通常会是优先级高的线程先执行,可是根据虚拟机的实现不一样,也有多是随机执行。
  • join()方法可让调用它的线程正常执行完成后,再去执行该线程后面的代码,它具备让线程排队的做用。

二 线程同步

线程安全,一般所说的线程安全指的是相对的线程安全,它指的是对这个对象单独的操做是线程安全的,咱们在调用的时候无需作额外的保障措施。

什么叫相对安全?🤔

🤞举个栗子

咱们知道Java里的Vector是个线程安全的类,在多线程环境下对其插入、删除和读取都是安全的,但这仅限于每次只有一个线程对其操做,若是多个线程同时操做 Vector,那它就再也不是线程安全的了。

final Vector<String> vector = new Vector<>();

    while (true) {
        for (int i = 0; i < 10; i++) {
            vector.add("项:" + i);
        }

        Thread removeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < vector.size(); i++) {
                    vector.remove(i);
                }
            }
        });

        Thread printThread = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < vector.size(); i++) {
                    Log.d(TAG, vector.get(i));
                }
            }
        });

        removeThread.start();
        printThread.start();

        if (Thread.activeCount() >= 20) {
            return;
        }
    }
复制代码

可是程序却crash了

正确的作法应该是vector对象加上同步锁,以下:

final Vector<String> vector = new Vector<>();

        while (true) {
            for (int i = 0; i < 10; i++) {
                vector.add("项:" + i);
            }

            Thread removeThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                }
            });

            Thread printThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) {
                            Log.d(TAG, vector.get(i));
                        }
                    }
                }
            });

            removeThread.start();
            printThread.start();

            if (Thread.activeCount() >= 20) {
                return;
            }
        }
复制代码

2.1 volatile

volatile也是互斥同步的一种实现,不过它很是的轻量级。

volatile有两条关键的语义:

  • 保证被volatile修饰的变量对全部线程都是可见的
  • 禁止进行指令重排序

要理解volatile关键字,咱们得先从Java的线程模型开始提及。如图所示:

Java内存模型规定了全部字段(这些字段包括实例字段、静态字段等,不包括局部变量、方法参数等,由于这些是线程私有的,并不存在竞争)都存在主内存中,每一个线程会 有本身的工做内存,工做内存里保存了线程所使用到的变量在主内存里的副本拷贝,线程对变量的操做只能在工做内存里进行,而不能直接读写主内存,固然不一样内存之间也 没法直接访问对方的工做内存,也就是说主内存时线程传值的媒介。

咱们来理解第一句话:

保证被volatile修饰的变量对全部线程都是可见的

如何保证可见性?🤔

被volatile修饰的变量在工做内存修改后会被强制写回主内存,其余线程在使用时也会强制从主内存刷新,这样就保证了一致性。

关于“保证被volatile修饰的变量对全部线程都是可见的”,有种常见的错误理解:

错误理解:因为volatile修饰的变量在各个线程里都是一致的,因此基于volatile变量的运算在多线程并发的状况下是安全的。

这句话的前半部分是对的,后半部分却错了,所以它忘记考虑变量的操做是否具备原子性这一问题。

:point_up:举个栗子

private volatile int start = 0;

    private void volatileKeyword() {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    start++;
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
        Log.d(TAG, "start = " + start);
    }

复制代码

这段代码启动了10个线程,每次10次自增,按道理最终结果应该是100,可是结果并不是如此。

为何会这样?:thinking:

仔细看一下start++,它其实并不是一个原子操做,简单来看,它有两步:

  1. 取出start的值,由于有volatile的修饰,这时候的值是正确的。
  2. 自增,可是自增的时候,别的线程可能已经把start加大了,这种状况下就有可能把较小的start写回主内存中。

因此volatile只能保证可见性,在不符合如下场景下咱们依然须要经过加锁来保证原子性:

  • 运算结果并不依赖变量当前的值,或者只有单一线程修改变量的值。(要么结果不依赖当前值,要么操做是原子性的,要么只要一个线程修改变量的值)
  • 变量不须要与其余状态变量共同参与不变约束

比方说咱们会在线程里加个boolean变量,来判断线程是否中止,这种状况就很是适合使用volatile。

咱们再来理解第二句话。

  • 禁止进行指令重排序

什么是指令重排序?🤔

指令重排序是值指令乱序执行,即在条件容许的状况下,直接运行当前有能力当即执行的后续指令,避开为获取下一条指令所需数据而形成的等待,经过乱序执行的技术,提供执行效率。

指令重排序绘制被volatile修饰的变量的赋值操做前,添加一个内存屏障,指令重排序时不能把后面的指令重排序的内存屏障以前的位置。

关于指令重排序不是本篇文章重点讨论的内容,更多细节能够参考指令重排序

2.2 synchronized

synchronized是互斥同步的一种实现。

synchronized:当某个线程访问被synchronized标记的方法或代码块时,这个线程便得到了该对象的锁,其余线程暂时没法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个 线程才会释放该对象的锁,其余线程才能执行这个方法或代码块。

前面咱们已经说了volatile关键字,这里咱们举个例子来综合分析volatile与synchronized关键字的使用。

:point_up:举个栗子

public class Singleton {

    //volatile保证了:1 instance在多线程并发的可见性 2 禁止instance在操做是的指令重排序
    private volatile static Singleton instance;

    public static Singleton getInstance() {
        //第一次判空,保证没必要要的同步
        if (instance == null) {
            //synchronized对Singleton加全局所,保证每次只要一个线程建立实例
            synchronized (Singleton.class) {
                //第二次判空时为了在null的状况下建立实例
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}
复制代码

这是一个经典的DSL单例。

它的字节码以下:

能够看到被synchronized同步的代码块,会在先后分别加上monitorenter和monitorexit,这两个字节码都须要指定加锁和解锁的对象。

关于加锁和解锁的对象:

  • synchronized代码块 :同步代码块,做用范围是整个代码块,做用对象是调用这个代码块的对象。
  • synchronized方法 :同步方法,做用范围是整个方法,做用对象是调用这个方法的对象。
  • synchronized静态方法 :同步静态方法,做用范围是整个静态方法,做用对象是调用这个类的全部对象。
  • synchronized(this):做用范围是该对象中全部被synchronized标记的变量、方法或代码块,做用对象是对象自己。
  • synchronized(ClassName.class) :做用范围是静态的方法或者静态变量,做用对象是Class对象。

synchronized(this)添加的是对象锁,synchronized(ClassName.class)添加的是类锁,它们的区别以下:

对象锁:Java的全部对象都含有1个互斥锁,这个锁由JVM自动获取和释放。线程进入synchronized方法的时候获取该对象的锁,固然若是已经有线程获取了这个对象的锁,那么当前线 程会等待;synchronized方法正常返回或者抛异常而终止,JVM会自动释放对象锁。这里也体现了用synchronized来加锁的好处,方法抛异常的时候,锁仍然能够由JVM来自动释放。

类锁:对象锁是用来控制实例方法之间的同步,类锁是用来控制静态方法(或静态变量互斥体)之间的同步。其实类锁只是一个概念上的东西,并非真实存在的,它只是用来帮助咱们理 解锁定实例方法和静态方法的区别的。咱们都知道,java类可能会有不少个对象,可是只有1个Class对象,也就是说类的不一样实例之间共享该类的Class对象。Class对象其实也仅仅是1个 java对象,只不过有点特殊而已。因为每一个java对象都有1个互斥锁,而类的静态方法是须要Class对象。因此所谓的类锁,不过是Class对象的锁而已。获取类的Class对象有好几种,最简 单的就是MyClass.class的方式。 类锁和对象锁不是同一个东西,一个是类的Class对象的锁,一个是类的实例的锁。也就是说:一个线程访问静态synchronized的时候,容许另外一个线程访 问对象的实例synchronized方法。反过来也是成立的,由于他们须要的锁是不一样的。

关不一样步锁还有ReentrantLock,eentrantLockR相对于synchronized具备等待可中断、公平锁等更多功能,这里限于篇幅,再也不展开。

三 线程池

咱们知道线程的建立、切换与销毁都会花费比较大代价,因此很天然的咱们使用线程池来复用和管理线程。Java里的线程池咱们一般经过ThreadPoolExecutor来实现。 接下来咱们就来分析ThreadPoolExecutor的相关原理,以及ThreadPoolExecutor在Android上的应用AsyncTask。

3.1 线程池调度

线程池有五种运行状态,以下所示:

线程池状态图

  • RUNNING:能够接受新任务,也能够处理等待队列里的任务。
  • SHUTDOWN:不接受新任务,但能够处理等待队列里的任务。
  • STOP:不接受新的任务,再也不处理等待队列里的任务。中断正在处理的任务。
  • TIDYING:全部任务都已经处理完了,当前线程池没有有效的线程,而且即将调用terminated()方法。
  • TERMINATED:调用了terminated()方法,线程池终止。

另外,ThreadPoolExecutor是用一个AtomicInteger来记录线程池状态和线程池里的线程数量的,以下所示:

  • 低29位:用来存放线程数
  • 高3位:用来存放线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;// 111
private static final int SHUTDOWN   =  0 << COUNT_BITS;// 000
private static final int STOP       =  1 << COUNT_BITS;// 001
private static final int TIDYING    =  2 << COUNT_BITS;// 010
private static final int TERMINATED =  3 << COUNT_BITS;// 110

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; }//线程池当前线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码

在正式介绍线程池调度原理以前,咱们先来回忆一下Java实现任务的两个接口:

  • Runnable:在run()方法里完成任务,无返回值,且不会抛出异常。
  • Callable:在call()方法里完成任务,有返回值,且可能抛出异常。

另外,还有个Future接口,它能够对Runnable、Callable执行的任务进行判断任务是否完成,中断任务以及获取任务结果的操做。咱们一般会使用它的实现类FutureTask,FutureTask是一个Future、Runnable 以及Callable的包装类。利用它能够很方便的完成Future接口定义的操做。FutureTask内部的线程阻塞是基于LockSupport来实现的。

咱们接下来看看线程池是和执行任务的。

ThreadPoolExecutor调度流程图

execute(Runnable command)

public class ThreadPoolExecutor extends AbstractExecutorService {
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //1. 若线程池状态是RUNNING,线程池大小小于配置的核心线程数,则能够在线程池中建立新线程执行新任务。
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //2. 若线程池状态是RUNNING,线程池大小大于配置的核心线程数,则尝试将任务插入阻塞队列进行等待
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //若插入成功,则将次检查线程池的状态是否为RUNNING,若是不是则移除当前任务并进入拒绝策略。
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //若是线程池中的线程数为0,即线程池中的线程都执行完毕处于SHUTDOWN状态,此时添加了一个null任务
                //(由于SHUTDOWN状态再也不接受新任务)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //3. 若没法插入阻塞队列,则尝试建立新线程,建立失败则进入拒绝策略。
            else if (!addWorker(command, false))
                reject(command);
        }
}
复制代码
  1. 若线程池大小小于配置的核心线程数,则能够在线程池中建立新线程执行新任务。
  2. 若线程池状态是RUNNING,线程池大小大于配置的核心线程数,则尝试将任务插入阻塞队列进行等待。若插入成功,为了健壮性考虑,则将次检查线程池的状态是否为RUNNING ,若是不是则移除当前任务并进入拒绝策略。若是线程池中的线程数为0,即线程池中的线程都执行完毕处于SHUTDOWN状态,此时添加了一个null任务(由于SHUTDOWN状态再也不接受 新任务)。
  3. 若没法插入阻塞队列,则尝试建立新线程,建立失败则进入拒绝策略。

这个其实很好理解,打个比方。咱们公司的一个小组来完成任务,

  • 若是任务数量小于小组人数(核心线程数),则指派小组里人的完成;
  • 若是任务数量大于小组人数,则去招聘新人来完成,则将任务加入排期等待(阻塞队列)。
  • 若是没有排期,则试着去招新人来完成任务(最大线程数),若是招新人也完成不了,说明这不是人干的活,则去找产品经理砍需求(拒绝策略)。

addWorker(Runnable firstTask, boolean core)

addWorker(Runnable firstTask, boolean core) 表示添加个Worker,Worker实现了Runnable接口,是对Thread的封装,该方法添加完Worker后,则调用runWorker()来启动线程。

public class ThreadPoolExecutor extends AbstractExecutorService {
    
     private boolean addWorker(Runnable firstTask, boolean core) {
            //重试标签
            retry:
            for (;;) {
                int c = ctl.get();
                //获取当前线程池状态
                int rs = runStateOf(c);
    
                //如下状况表示再也不接受新任务:1 线程池没有处于RUNNING状态 2 要执行的任务为空 3 阻塞队列已满
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取线程池当前的线程数
                    int wc = workerCountOf(c);
                    //若是超出容量,则再也不接受新任务,core表示是否使用corePoolSize做为比较标准
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //增长线程数
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    //若是线程池状态发生变化,从新开始循环
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            //线程数增长成功,开始添加新线程,Worker是Thread的封装类
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //将新启动的线程添加到线程池中
                            workers.add(w);
                            //更新线程池中线程的数量,注意这个数量不能超过largestPoolSize
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //调用runWorker()方法,开始执行线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
}
复制代码

runWorker(Worker w)

runWorker()方法是整个阻塞队列的核心循环,在这个循环中,线程池会不断的从阻塞队列workerQueue中取出的新的task并执行。

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    final void runWorker(Worker w) {
           Thread wt = Thread.currentThread();
           Runnable task = w.firstTask;
           w.firstTask = null;
           w.unlock(); // allow interrupts
           boolean completedAbruptly = true;
           try {
               //从阻塞队列中不断取出任务,若是取出的任务为空,则循环终止
               while (task != null || (task = getTask()) != null) {
                   w.lock();
                   // If pool is stopping, ensure thread is interrupted;
                   // if not, ensure thread is not interrupted. This
                   // requires a recheck in second case to deal with
                   // shutdownNow race while clearing interrupt
                   if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                         runStateAtLeast(ctl.get(), STOP))) &&
                       !wt.isInterrupted())
                       wt.interrupt();
                   try {
                       //该方法为空,能够从新次方法,在任务执行开始前作一些处理
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           task.run();
                       } catch (RuntimeException x) {
                           thrown = x; throw x;
                       } catch (Error x) {
                           thrown = x; throw x;
                       } catch (Throwable x) {
                           thrown = x; throw new Error(x);
                       } finally {
                           //该方法为空,能够从新次方法,在任务执行结束后作一些处理
                           afterExecute(task, thrown);
                       }
                   } finally {
                       task = null;
                       w.completedTasks++;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally {
               processWorkerExit(w, completedAbruptly);
           }
       }
       
        //从阻塞队列workerQueue中取出Task
        private Runnable getTask() {
               boolean timedOut = false; // Did the last poll() time out?
               //循环
               for (;;) {
                   int c = ctl.get();
                   //获取线程池状态
                   int rs = runStateOf(c);
       
                   //如下状况中止循环:1 线程池状态不是RUNNING(>= SHUTDOWN)2 线程池状态>= STOP 或者阻塞队列为空
                   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                       //递减workCount
                       decrementWorkerCount();
                       return null;
                   }
       
                   int wc = workerCountOf(c);
       
                   // 判断线程的IDLE超时机制是否生效,有两种状况:1 allowCoreThreadTimeOut = true,这是能够手动
                   //设置的 2 当前线程数大于核心线程数
                   boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       
                   if ((wc > maximumPoolSize || (timed && timedOut))
                       && (wc > 1 || workQueue.isEmpty())) {
                       if (compareAndDecrementWorkerCount(c))
                           return null;
                       continue;
                   }
       
                   try {
                       //根据timed来决定是以poll超时等待的方式仍是以take()阻塞等待的方式从阻塞队列中获取任务
                       Runnable r = timed ?
                           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                           workQueue.take();
                       if (r != null)
                           return r;
                       timedOut = true;
                   } catch (InterruptedException retry) {
                       timedOut = false;
                   }
               }
           }
}
复制代码

因此你能够理解了,runWorker()方法是在新建立线程的run()方法里的,而runWorker()又不断的调用getTask()方法去获取阻塞队列里的任务,这样就实现了线程的复用。

3.2 线程池配置

咱们先来看看ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
  • int corePoolSize:核心线程池大小
  • int maximumPoolSize:线程池最大容量大小
  • long keepAliveTime:线程不活动时存活的时间
  • TimeUnit unit:时间单位
  • BlockingQueue workQueue:任务队列
  • ThreadFactory threadFactory:线程工程
  • RejectedExecutionHandler handler:线程拒绝策略

那么这些参数咱们应该怎么配置呢?要合理配置线程池就须要先了解咱们的任务特性,通常说来:

  • 任务性质:CPU密集型、IO密集型、混合型
  • 任务优先级:低、中、高
  • 任务执行时间:短、中、长
  • 任务依赖性:是否依赖其余资源,数据库、网络

咱们根据这些属性来一一分析这些参数的配置。

首先就是核心线程数corePoolSize与最大线程数maximumPoolSize。这个的配置咱们一般要考虑CPU同时执行线程的阈值。一旦超过这个阈值,CPU就须要花费不少 时间来完成线程的切换与调度,这样会致使性能大幅下滑。

/** * CPU核心数,注意该方法并不可靠,它返回的有可能不是真实的CPU核心数,由于CPU在某些状况下会对某些核 * 心进行睡眠处理,这种状况返回的知识已激活的CPU核心数。 */
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();

/** * 核心线程数 */
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1, 4));

/** * 最大线程数 */
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
复制代码

至于keepAliveTime,该参数描述了线程不活动时存活的时间,若是是CPU密集型任务,则将时间设置的小一些,若是是IO密集型或者数据库链接任务,则将时间设置的长一些。

咱们再来看看BlockingQueue参数的配置。BlockingQueue用来描述阻塞队列。它的方法以四种形式存在,以此来知足不一样需求。

抛出异常 特殊值 阻塞 超时
add(e) offer(e) put(e) offer(e, time, unit)
remove() poll() take() poll(time, unit)
element() peek() 不可用 不可用

它有如下特色:

  • 不支持null元素
  • 线程安全

它的实现类有:

  • ArrayBlockingQueue :一个数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序,支持公平访问队列(可重入锁实现ReenttrantLock)。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,此队列默认和最大长度为Integer.MAX_VALUE,按照FIFO的原则对元素进行排序。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认状况下采用天然顺序排列,也能够指定Comparator。
  • DelayQueue:一个支持延时获取元素的无界阻塞队列,建立元素时能够指定多久之后才能从队列中获取当前元素,经常使用于缓存系统设计与定时任务调度等。
  • SynchronousQueue:一个不存储元素的阻塞队列。存入操做必须等待获取操做,反之亦然,它至关于一个传球手,很是适合传递性场景。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会当即将元素传递给消费者。
  • LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,能够从队列的两端插入和删除元素。由于出入口都有两个,能够减小一半的竞争。适用于工做窃取的场景。

工做窃取:例若有两个队列A、B,各自干本身的活,可是A效率比较高,很快把本身的活干完了,因而勤快的A就会去窃取B的任务来干,这是A、B会访问同一个队列,为了减小A、B的竞争,规定窃取者A 只从双端队列的尾部拿任务,被窃取者B只从双端队列的头部拿任务。

咱们最后来看看RejectedExecutionHandler参数的配置。

RejectedExecutionHandler用来描述线程数大于或等于线程池最大线程数时的拒绝策略,它的实现类有:

  • ThreadPoolExecutor.AbortPolicy:默认策略,当线程池中线程的数量大于或者等于最大线程数时,抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:当线程池中线程的数量大于或者等于最大线程数时,默默丢弃掉不能执行的新任务,不报任何异常。
  • ThreadPoolExecutor.CallerRunsPolicy:当线程池中线程的数量大于或者等于最大线程数时,若是线程池没有被关闭,则直接在调用者的线程里执行该任务。
  • ThreadPoolExecutor.DiscardOldestPolicy:当线程池中线程的数量大于或者等于最大线程数时,丢弃阻塞队列头部的任务(即等待最近的任务),而后从新添加当前任务。

另外,Executors提供了一系列工厂方法用来建立线程池。这些线程是适用于不一样的场景。

  • newCachedThreadPool():无界可自动回收线程池,查看线程池中有没有之前创建的线程,若是有则复用,若是没有则创建一个新的线程加入池中,池中的线程超过60s不活动则自动终止。适用于生命 周期比较短的异步任务。
  • newFixedThreadPool(int nThreads):固定大小线程池,与newCachedThreadPool()相似,可是池中持有固定数目的线程,不能随时建立线程,若是建立新线程时,超过了固定 线程数,则放在队列里等待,直到池中的某个线程被移除时,才加入池中。适用于很稳定、很正规的并发线程,多用于服务器。
  • newScheduledThreadPool(int corePoolSize):周期任务线程池,该线程池的线程能够按照delay依次执行线程,也能够周期执行。
  • newSingleThreadExecutor():单例线程池,任意时间内池中只有一个线程。

3.3 线程池监控

ThreadPoolExecutor里提供了一些空方法,咱们能够经过继承ThreadPoolExecutor,复写这些方法来实现对线程池的监控。

public class ThreadPoolExecutor extends AbstractExecutorService {
       
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
}
复制代码

常见的监控指标有:

  • taskCount:线程池须要执行的任务数量。
  • completedTaskCount:线程池在运行过程当中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经建立过的最大线程数量。经过这个数据能够知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。若是线程池不销毁的话,线程池里的线程不会自动销毁,因此这个大小只增不减。
  • getActiveCount:获取活动的线程数。

四 线程池应用

4.1 AsyncTask

AsyncTask基于ThreadPoolExecutor实现,内部封装了Thread+Handler,多用来执行耗时较短的任务。

一个简单的AsyncTask例子

public class AsyncTaskDemo extends AsyncTask<String, Integer, String> {

    /** * 在后台任务开始执行以前调用,用于执行一些界面初始化操做,例如显示一个对话框,UI线程。 */
    @Override
    protected void onPreExecute() {
        super.onPreExecute();
    }

    /** * 执行后台线程,执行完成能够经过return语句返回,worker线程 * * @param strings params * @return result */
    @Override
    protected String doInBackground(String... strings) {
        return null;
    }

    /** * 更新进度,UI线程 * * @param values progress */
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
    }


    /** * 后台任务执行完成并经过return语句返回后会调用该方法,UI线程。 * * @param result result */
    @Override
    protected void onPostExecute(String result) {
        super.onPostExecute(result);
    }

    /** * 后台任务呗取消后回调 * * @param reason reason */
    @Override
    protected void onCancelled(String reason) {
        super.onCancelled(reason);
    }

    /** * 后台任务呗取消后回调 */
    @Override
    protected void onCancelled() {
        super.onCancelled();
    }
}
复制代码

AsyncTask的使用很是的简单,接下来咱们去分析AsyncTask的源码实现。

AsyncTask流程图

AsyncTask源码的一开始就是个建立线程池的流程。

public abstract class AsyncTask<Params, Progress, Result> {
    
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        //核心线程数,最少2个,最多4个
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        //线程不活动时的存活时间是30s
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        //线程构建工厂,指定线程的名字
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
        //一个由链表结构组成的无界阻塞队列
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        public static final Executor THREAD_POOL_EXECUTOR;
    
        //构建线程池
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
}
复制代码

另外,咱们能够经过AsyncTask.executeOnExecutor(Executor exec, Params... params) 来自定义线程池。

咱们再来看看构造方法。

public abstract class AsyncTask<Params, Progress, Result> {
    
      //构造方法须要在UI线程里调用
      public AsyncTask() {
          //建立一个Callable对象,WorkerRunnable实现了Callable接口
          mWorker = new WorkerRunnable<Params, Result>() {
              public Result call() throws Exception {
                  mTaskInvoked.set(true);
  
                  Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                  //noinspection unchecked
                  Result result = doInBackground(mParams);
                  Binder.flushPendingCommands();
                  return postResult(result);
              }
          };
  
          //建立一个FutureTask对象,该对象用来接收mWorker的结果
          mFuture = new FutureTask<Result>(mWorker) {
              @Override
              protected void done() {
                  try {
                      //将执行的结果经过发送给Handler处理,注意FutureTask的get()方法会阻塞直至结果返回
                      postResultIfNotInvoked(get());
                  } catch (InterruptedException e) {
                      android.util.Log.w(LOG_TAG, e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("An error occurred while executing doInBackground()",
                              e.getCause());
                  } catch (CancellationException e) {
                      postResultIfNotInvoked(null);
                  }
              }
          };
      } 
      
      private void postResultIfNotInvoked(Result result) {
          final boolean wasTaskInvoked = mTaskInvoked.get();
          if (!wasTaskInvoked) {
              postResult(result);
          }
      }
  
      private Result postResult(Result result) {
          @SuppressWarnings("unchecked")
          Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                  new AsyncTaskResult<Result>(this, result));
          message.sendToTarget();
          return result;
      }
      
     //内部的Handler 
     private static class InternalHandler extends Handler {
        public InternalHandler() {
            //UI线程的Looper
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                //返回结果
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                //返回进度
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
     }
}
复制代码

能够看到当咱们调用AsyncTask的构造方法时,就建立了一个FutureTask对象,它内部包装了Callable对象(就是咱们要执行的任务),并在FutureTask对象的done()方法里 将结果发送给Handler。

接着看看执行方法execute()。

public abstract class AsyncTask<Params, Progress, Result> {
    
        //须要在UI线程里调用
        @MainThread
        public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
        }

        @MainThread
        public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
            if (mStatus != Status.PENDING) {
                switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
    
            mStatus = Status.RUNNING;
            //任务执行前的处理,咱们能够复写次方法
            onPreExecute();
    
            mWorker.mParams = params;
            //执行任务,exec为sDefaultExecutor
            exec.execute(mFuture);
    
            return this;
        }
}
复制代码

接着看看这个sDefaultExecutor。

能够看到sDefaultExecutor是个SerialExecutor对象,SerialExecutor实现了Executor接口。

public abstract class AsyncTask<Params, Progress, Result> {
    
        public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
        private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
        
        private static class SerialExecutor implements Executor {
            //任务队列
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            //当前执行的任务
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    //开始执行任务
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                //取出队列头的任务开始执行
                if ((mActive = mTasks.poll()) != null) {
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
}
复制代码

因此咱们没调用一次AsyncTask.execute()方法就将FutureTask对象添加到队列尾部,而后会从队列头部取出任务放入线程池中执行,因此你能够看着这是一个串行执行器。

4.2 Okhttp

在Okhttp的任务调度器Dispatcher里有关于线程池的配置

public final class Dispatcher {
    
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
}
复制代码

你能够看到它的配置:

  • 核心线程数为0,最大线程数为Integer.MAX_VALUE,不对核心线程数进行限制,随时建立新的线程,空闲存活时间为60s,用完即走。这也比较符合网络请求的特性。
  • 阻塞队列为SynchronousQueue,该队列不存储任务,只传递任务,因此把任务添加进去就会执行。

这实际上是Excutors.newCachedThreadPool()缓存池的实现。总结来讲就是新任务过来进入SynchronousQueue,它是一个单工模式的队列,只传递任务,不存储任务,而后就建立 新线程执行任务,线程不活动的存活时间为60s。

Okhttp请求流程图

在发起网络请求时,每一个请求执行完成后都会调用client.dispatcher().finished(this)。

final class RealCall implements Call {
    
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //异步请求
        client.dispatcher().finished(this);
      }
    }
  }
}
复制代码

咱们来看看client.dispatcher().finished(this)这个方法。

public final class Dispatcher {
    
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      //将已经结束的请求call移除正在运行的队列calls
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //异步请求promoteCalls为true
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

    private void promoteCalls() {
      //当前异步请求数大于最大请求数,不继续执行
      if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
      //异步等待队列为空,不继续执行
      if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
  
      //遍历异步等待队列
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall call = i.next();
  
        //若是没有超过相同host的最大请求数,则复用当前请求的线程
        if (runningCallsForHost(call) < maxRequestsPerHost) {
          i.remove();
          runningAsyncCalls.add(call);
          executorService().execute(call);
        }
  
        //运行队列达到上限,也再也不执行
        if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
      }
    }
}
复制代码

因此你能够看到Okhttp不是用线程池来控制线程个数,线程池里的线程执行的都是正在运行请请求,控制线程的是Dispatcher,Dispatcher.promoteCalls()方法经过 最大请求数maxRequests和相同host最大请求数maxRequestsPerHost来控制异步请求不超过两个最大值,在值范围内不断的将等待队列readyAsyncCalls中的请求添加 到运行队列runningAsyncCalls中去。

相关文章
相关标签/搜索