详解 RxJava 的消息订阅和线程切换原理

本文由玉刚说写做平台提供写做赞助java

原做者:四月葡萄git

版权声明:本文版权归微信公众号 玉刚说 全部,未经许可,不得以任何形式转载github

1.前言

本文主要是对RxJava的消息订阅和线程切换进行源码分析,相关的使用方式等不做详细介绍。数据库

本文源码基于rxjava:2.1.14json

2. RxJava简介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.缓存

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.安全

上面这段话来自于RxJava在github上面的官方介绍。翻译成中文的大概意思就是:微信

RxJava是一个在Java虚拟机上的响应式扩展,经过使用可观察的序列将异步和基于事件的程序组合起来的一个库。markdown

它扩展了观察者模式来支持数据/事件序列,而且添加了操做符,这些操做符容许你声明性地组合序列,同时抽象出要关注的问题:好比低级线程、同步、线程安全和并发数据结构等。网络

简单点来讲, RxJava就是一个使用了观察者模式,可以异步的库。

3. 观察者模式

上面说到,RxJava扩展了观察者模式,那么什么是观察模式呢?咱们先来了解一下。

举个例子,以微信公众号为例,一个微信公众号会不断产生新的内容,若是咱们读者对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给咱们。咱们收到新内容时,若是是咱们感兴趣的,就会点进去看下;若是是广告的话,就可能直接忽略掉。这就是咱们生活中遇到的典型的观察者模式。

在上面的例子中,微信公众号就是一个被观察者(Observable),不断的产生内容(事件),而咱们读者就是一个观察者(Observer) ,经过订阅(subscribe)就可以接受到微信公众号(被观察者)推送的内容(事件),根据不一样的内容(事件)作出不一样的操做。

3.1 Rxjava角色说明

RxJava的扩展观察者模式中就是存在这么4种角色:

角色 角色功能
被观察者(Observable 产生事件
观察者(Observer 响应事件并作出处理
事件(Event 被观察者和观察者的消息载体
订阅(Subscribe 链接被观察者和观察者

3.2 RxJava事件类型

RxJava中的事件分为三种类型:Next事件、Complete事件和Error事件。具体以下:

事件类型 含义 说明
Next 常规事件 被观察者能够发送无数个Next事件,观察者也能够接受无数个Next事件
Complete 结束事件 被观察者发送Complete事件后能够继续发送事件,观察者收到Complete事件后将不会接受其余任何事件
Error 异常事件 被观察者发送Error事件后,其余事件将被终止发送,观察者收到Error事件后将不会接受其余任何事件

4.RxJava的消息订阅

在分析RxJava消息订阅原理前,咱们仍是先来看下它的简单使用步骤。这里为了方便讲解,就不用链式代码来举例了,而是采用分步骤的方式来逐一说明(平时写代码的话仍是建议使用链式代码来调用,由于更加简洁)。其使用步骤以下:

  1. 建立被观察者(Observable),定义要发送的事件。
  2. 建立观察者(Observer),接受事件并作出响应操做。
  3. 观察者经过订阅(subscribe)被观察者把它们链接到一块儿。

4.1 RxJava的消息订阅例子

这里咱们就根据上面的步骤来实现这个例子,以下:

//步骤1. 建立被观察者(Observable),定义要发送的事件。
        Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
       
        //步骤2. 建立观察者(Observer),接受事件并作出响应操做。
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
       
        //步骤3. 观察者经过订阅(subscribe)被观察者把它们链接到一块儿。
        observable.subscribe(observer);
复制代码

其输出结果为:

onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
复制代码

4.2 源码分析

下面咱们对消息订阅过程当中的源码进行分析,分为两部分:建立被观察者过程和订阅过程。

4.2.1 建立被观察者过程

首先来看下建立被观察者(Observable)的过程,上面的例子中咱们是直接使用Observable.create()来建立Observable,咱们点进去这个方法看下。

4.2.1.1 Observable类的create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

能够看到,create()方法中也没作什么,就是建立一个ObservableCreate对象出来,而后把咱们自定义的ObservableOnSubscribe做为参数传到ObservableCreate中去,最后就是调用 RxJavaPlugins.onAssembly()方法。

咱们先来看看ObservableCreate类:

4.2.1.2 ObservableCreate类
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//把咱们建立的ObservableOnSubscribe对象赋值给source。
    }
}
复制代码

能够看到,ObservableCreate是继承自Observable的,而且会把ObservableOnSubscribe对象给存起来。

再看下RxJavaPlugins.onAssembly()方法

4.2.1.3 RxJavaPlugins类的onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //省略无关代码
        return source;
    }
复制代码

很简单,就是把上面建立的ObservableCreate给返回。

4.2.1.4 简单总结

因此Observable.create()中就是把咱们自定义的ObservableOnSubscribe对象从新包装成一个ObservableCreate对象,而后返回这个ObservableCreate对象。 注意,这种从新包装新对象的用法在RxJava中会频繁用到,后面的分析中咱们还会屡次遇到。 放个图好理解,包起来哈~

被观察者.png

4.2.1.5 时序图

Observable.create()的时序图以下所示:

Observable.create()时序图.png

4.2.2 订阅过程

接下来咱们就看下订阅过程的代码,一样,点进去Observable.subscribe()

4.2.2.1 Observable类的subscribe()
public final void subscribe(Observer<? super T> observer) {
            //省略无关代码
           
            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);
           
            //省略无关代码
    }
复制代码

能够看到,实际上其核心的代码也就两句,咱们分开来看下:

4.2.2.2 RxJavaPlugins类的onSubscribe()
public static <T> Observer<? super T> onSubscribe(
    @NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        //省略无关代码
       
        return observer;
    }
复制代码

跟以前代码同样,这里一样也是把原来的observer返回而已。 再来看下subscribeActual()方法。

4.2.2.3 Observable类的subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

Observable类的subscribeActual()中的方法是一个抽象方法,那么其具体实如今哪呢?还记得咱们前面建立被观察者的过程吗,最终会返回一个ObservableCreate对象,这个ObservableCreate就是Observable的子类,咱们点进去看下:

4.2.2.4 ObservableCreate类的subscribeActual()
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //触发咱们自定义的Observer的onSubscribe(Disposable)方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

能够看到,subscribeActual()方法中首先会建立一个CreateEmitter对象,而后把咱们自定义的观察者observer做为参数给传进去。这里一样也是包装起来,放个图:

观察者.png
这个 CreateEmitter实现了 ObservableEmitter接口和 Disposable接口,以下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        //代码省略
    }
复制代码

而后就是调用了observer.onSubscribe(parent),实际上就是调用观察者的onSubscribe()方法,即告诉观察者已经成功订阅到了被观察者。

继续往下看,subscribeActual()方法中会继续调用source.subscribe(parent),这里的source就是ObservableOnSubscribe对象,即这里会调用ObservableOnSubscribesubscribe()方法。 咱们具体定义的subscribe()方法以下:

Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
复制代码

ObservableEmitter,顾名思义,就是被观察者发射器。 因此,subscribe()里面的三个onNext()方法和一个onComplete()会逐一被调用。 这里的ObservableEmitter接口其具体实现为CreateEmitter,咱们看看CreateEmitte类的onNext()方法和onComplete()的实现:

4.2.2.5 CreateEmitter类的onNext()和onComplete()等
//省略其余代码
       
        @Override
        public void onNext(T t) {
            //省略无关代码
            if (!isDisposed()) {
                //调用观察者的onNext()
                observer.onNext(t);
            }
        }
       
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //调用观察者的onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
复制代码

能够看到,最终就是会调用到观察者的onNext()onComplete()方法。至此,一个完整的消息订阅流程就完成了。 另外,能够看到,上面有个isDisposed()方法能控制消息的走向,即可以切断消息的传递,这个后面再来讲。

4.2.2.6 简单总结

Observable(被观察者)和Observer(观察者)创建链接(订阅)以后,会建立出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件作出响应处理。能够看到,是订阅以后,Observable(被观察者)才会开始发送事件。

放张事件流的传递图:

订阅过程.png

4.2.2.7 时序流程图

再来看下订阅过程的时序流程图:

订阅过程时序图.png

4.3 切断消息

以前有提到过切断消息的传递,咱们先来看下如何使用:

4.3.1 切断消息

Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
       
        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "切断观察者与被观察者的链接");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
       
        observable.subscribe(observer);
复制代码

输出结果为:

onSubscribe : null
onNext : 文章1
切断观察者与被观察者的链接
复制代码

能够看到,要切断消息的传递很简单,调用下Disposabledispose()方法便可。调用dispose()以后,被观察者虽然能继续发送消息,可是观察者却收不到消息了。 另外有一点须要注意,上面onSubscribe输出的Disposable值是"null",并非空引用null

4.3.2 切断消息源码分析

咱们这里来看看下dispose()的实现。Disposable是一个接口,能够理解Disposable为一个链接器,调用dispose()后,这个链接器将会中断。其具体实如今CreateEmitter类,以前也有提到过。咱们来看下CreateEmitterdispose()方法:

4.3.2.1 CreateEmitter的dispose()
@Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
复制代码

就是调用DisposableHelper.dispose(this)而已。

4.3.2.2 DisposableHelper类
public enum DisposableHelper implements Disposable {

    DISPOSED
    ;
   
    //其余代码省略

    public static boolean isDisposed(Disposable d) {
        //判断Disposable类型的变量的引用是否等于DISPOSED
        //即判断该链接器是否被中断
        return d == DISPOSED;
    }
   
    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            //这里会把field给设为DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}
复制代码

能够看到DisposableHelper是一个枚举类,而且只有一个值:DISPOSEDdispose()方法中会把一个原子引用field设为DISPOSED,即标记为中断状态。所以后面经过isDisposed()方法便可以判断链接器是否被中断。

4.3.2.3 CreateEmitter类中的方法

再回头看看CreateEmitter类中的方法:

@Override
        public void onNext(T t) {
            //省略无关代码
           
            if (!isDisposed()) {
                //若是没有dispose(),才会调用onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                //若是dispose()了,会调用到这里,即最终会崩溃
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //省略无关代码
            if (!isDisposed()) {
                try {
                    //若是没有dispose(),才会调用onError()
                    observer.onError(t);
                } finally {
                    //onError()以后会dispose()
                    dispose();
                }
                //若是没有dispose(),返回true
                return true;
            }
            //若是dispose()了,返回false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //若是没有dispose(),才会调用onComplete()
                    observer.onComplete();
                } finally {
                    //onComplete()以后会dispose()
                    dispose();
                }
            }
        }
复制代码

从上面的代码能够看到:

  1. 若是没有disposeobserver.onNext()才会被调用到。
  2. onError()onComplete()互斥,只能其中一个被调用到,由于调用了他们的任意一个以后都会调用dispose()
  3. onError()onComplete()onComplete()不会被调用到。反过来,则会崩溃,由于onError()中抛出了异常:RxJavaPlugins.onError(t)。其实是dispose后继续调用onError()都会炸。

5.RxJava的线程切换

上面的例子和分析都是在同一个线程中进行,这中间也没涉及到线程切换的相关问题。可是在实际开发中,咱们一般须要在一个子线程中去进行一些数据获取操做,而后要在主线程中去更新UI,这就涉及到线程切换的问题了,经过RxJava咱们也能够把线程切换写得还简洁。

5.1 线程切换例子

关于RxJava如何使用线程切换,这里就不详细讲了。 咱们直接来看一个例子,并分别打印RxJava在运行过程当中各个角色所在的线程。

new Thread() {
            @Override
            public void run() {
                Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName());
                Observable
                        .create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName());
                                emitter.onNext("文章1");
                                emitter.onNext("文章2");
                                emitter.onComplete();
                            }
                        })
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName());
                            }
                        });
            }
        }.start();
复制代码

输出结果为:

Thread run() 所在线程为 :Thread-2 Observer onSubscribe() 所在线程为 :Thread-2 Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 Observer onNext() 所在线程为 :main Observer onNext() 所在线程为 :main Observer onComplete() 所在线程为 :main 复制代码

从上面的例子能够看到:

  1. Observer(观察者)的onSubscribe()方法运行在当前线程中。
  2. Observable(被观察者)中的subscribe()运行在subscribeOn()指定的线程中。
  3. Observer(观察者)的onNext()onComplete()等方法运行在observeOn()指定的线程中。

5.2 源码分析

下面咱们对线程切换的源码进行一下分析,分为两部分:subscribeOn()observeOn()

5.2.1 subscribeOn()源码分析

首先来看下subscribeOn(),咱们的例子中是这么个使用的:

.subscribeOn(Schedulers.io())
复制代码

subscribeOn()方法要传入一个Scheduler类对象做为参数,Scheduler是一个调度类,可以延时或周期性地去执行一个任务。

5.2.1.1 Scheduler类型

经过Schedulers类咱们能够获取到各类Scheduler的子类。RxJava提供了如下这些线程调度类供咱们使用:

Scheduler类型 使用方式 含义 使用场景
IoScheduler Schedulers.io() io操做线程 读写SD卡文件,查询数据库,访问网络等IO密集型操做
NewThreadScheduler Schedulers.newThread() 建立新线程 耗时操做等
SingleScheduler Schedulers.single() 单例线程 只需一个单例线程时
ComputationScheduler Schedulers.computation() CPU计算操做线程 图片压缩取样、xml,json解析等CPU密集型计算
TrampolineScheduler Schedulers.trampoline() 当前线程 须要在当前线程当即执行任务时
HandlerScheduler AndroidSchedulers.mainThread() Android主线程 更新UI等
5.2.1.2 Schedulers类的io()

下面咱们来看下Schedulers.io()的代码,其余的Scheduler子类都差很少,就不逐以分析了,有兴趣的请自行查看哈~

@NonNull
    static final Scheduler IO;
   
    @NonNull
    public static Scheduler io() {
        //1.直接返回一个名为IO的Scheduler对象
        return RxJavaPlugins.onIoScheduler(IO);
    }
   
    static {
        //省略无关代码
       
        //2.IO对象是在静态代码块中实例化的,这里会建立按一个IOTask()
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }
   
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //3.IOTask中会返回一个IoHolder对象
            return IoHolder.DEFAULT;
        }
    }
   
    static final class IoHolder {
        //4.IoHolder中会就是new一个IoScheduler对象出来
        static final Scheduler DEFAULT = new IoScheduler();
    }
复制代码

能够看到,Schedulers.io()中使用了静态内部类的方式来建立出了一个单例IoScheduler对象出来,这个IoScheduler是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler的。

5.2.1.3 Observable类的subscribeOn()

而后,咱们就来看下subscribeOn()的代码:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        //省略无关代码
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

能够看到,首先会将当前的Observable(其具体实现为ObservableCreate)包装成一个新的ObservableSubscribeOn对象。 放个图:

ObservableSubscribeOn.png

跟前面同样,RxJavaPlugins.onAssembly()也是将ObservableSubscribeOn对象原样返回而已,这里就不看了。 能够看下ObservableSubscribeOn的构造方法:

5.2.1.4 ObservableSubscribeOn类的构造方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
复制代码

也就是把sourcescheduler这两个保存一下,后面会用到。

而后subscribeOn()方法就完了。好像也没作什么,就是从新包装一下对象而已,而后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者。

5.2.1.5 ObservableSubscribeOn类的subscribeActual()

接下来咱们回到订阅过程,为何要回到订阅过程呢?由于事件的发送是从订阅过程开始的啊。 虽然咱们这里用到了线程切换,可是呢,其订阅过程前面的内容跟上一节分析的是同样的,咱们这里就不重复了,直接从不同的地方开始。还记得订阅过程当中Observable类的subscribeActual()是个抽象方法吗?所以要看其子类的具体实现。在上一节订阅过程当中,其具体实现是在ObservableCreate类。可是因为咱们调用subscribeOn()以后,ObservableCreate对象被包装成了一个新的ObservableSubscribeOn对象了。所以咱们就来看看ObservableSubscribeOn类中的subscribeActual()方法:

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

subscribeActual()中一样也将咱们自定义的Observer给包装成了一个新的SubscribeOnObserver对象。一样,放张图:

SubscribeOnObserver.png
而后就是调用 ObserveronSubscribe()方法,能够看到,到目前为止,还没出现过任何线程相关的东西,因此 ObserveronSubscribe()方法就是运行在当前线程中。 而后咱们重点看下最后一行代码,首先建立一个 SubscribeTask对象,而后就是调用 scheduler.scheduleDirect().。 咱们先来看下 SubscribeTask类:

5.2.1.6 SubscribeTask类
//SubscribeTask是ObservableSubscribeOn的内部类
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //这里的source就是咱们自定义的Observable对象,即ObservableCreate
            source.subscribe(parent);
        }
    }
复制代码

很简单的一个类,就是实现了Runnable接口,而后run()中调用Observer.subscribe()

5.2.1.7 Scheduler类的scheduleDirect()

再来看下scheduler.scheduleDirect()方法

public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
复制代码

往下看:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

        //createWorker()在Scheduler类中是个抽象方法,因此其具体实如今其子类中
        //所以这里的createWorker()应当是在IoScheduler中实现的。
        //Worker中能够执行Runnable
        final Worker w = createWorker();
       
        //实际上decoratedRun仍是这个run对象,即SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
       
        //将Runnable和Worker包装成一个DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);
       
        //Worker执行这个task
        w.schedule(task, delay, unit);

        return task;
    }
复制代码

咱们来看下建立WorkerWorker执行任务的过程。

5.2.1.8 IoScheduler的createWorker()和schedule()
final AtomicReference<CachedWorkerPool> pool;
   
    public Worker createWorker() {
        //就是new一个EventLoopWorker,而且传一个Worker缓存池进去
        return new EventLoopWorker(pool.get());
    }
   
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();
       
        //构造方法
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            //从缓存Worker池中取一个Worker出来
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //省略无关代码
           
            //Runnable交给threadWorker去执行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
复制代码

注意,不一样的Scheduler类会有不一样的Worker实现,由于Scheduler类最终是交到Worker中去执行调度的。

咱们来看下Worker缓存池的操做:

5.2.1.9 CachedWorkerPool的get()
static final class CachedWorkerPool implements Runnable {
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                //若是缓冲池不为空,就从缓存池中取threadWorker
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            //若是缓冲池中为空,就建立一个并返回。
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
    }
复制代码
5.2.1.10 NewThreadWorker的scheduleActual()

咱们再来看下threadWorker.scheduleActual()ThreadWorker类没有实现scheduleActual()方法,其父类NewThreadWorker实现了该方法,咱们点进去看下:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        //构造方法中建立一个ScheduledExecutorService对象,能够经过ScheduledExecutorService来使用线程池
        executor = SchedulerPoolFactory.create(threadFactory);
    }
   
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //这里的decoratedRun实际仍是run对象
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //将decoratedRun包装成一个新对象ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        //省略无关代码
       
        if (delayTime <= 0) {
            //线程池中当即执行ScheduledRunnable
            f = executor.submit((Callable<Object>)sr);
        } else {
            //线程池中延迟执行ScheduledRunnable
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
           
        //省略无关代码

        return sr;
    }
}
复制代码

这里的executor就是使用线程池去执行任务,最终SubscribeTaskrun()方法会在线程池中被执行,即Observablesubscribe()方法会在IO线程中被调用。这与上面例子中的输出结果符合:

Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码
5.2.1.11 简单总结
  1. Observer(观察者)的onSubscribe()方法运行在当前线程中,由于在这以前都没涉及到线程切换。
  2. 若是设置了subscribeOn(指定线程),那么Observable(被观察者)中subscribe()方法将会运行在这个指定线程中去。
5.2.1.12 时序图

来张总的subscribeOn()切换线程时序图

subscribeOn()切换线程时序图.png

5.2.1.13 屡次设置subscribeOn()的问题

若是咱们屡次设置subscribeOn(),那么其执行线程是在哪个呢?先来看下例子

//省略先后代码,看重点部分
        .subscribeOn(Schedulers.io())//第一次
        .subscribeOn(Schedulers.newThread())//第二次
        .subscribeOn(AndroidSchedulers.mainThread())//第三次
复制代码

其输出结果为:

Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码

即只有第一次的subscribeOn()起做用了。这是为何呢? 咱们知道,每调用一次subscribeOn()就会把旧的被观察者包装成一个新的被观察者,通过了三次调用以后,就变成了下面这个样子:

屡次设置subscribeOn().png
同时,咱们知道,被观察者被订阅时是从最外面的一层通知到里面的一层,那么当传到上图第三层时,也就是 ObservableSubscribeOn(第一次)那一层时,管你以前是在哪一个线程, subscribeOn(Schedulers.io())都会把线程切到IO线程中去执行,因此屡次设置 subscribeOn()时,只有第一次生效。

5.2.2 observeOn()

咱们再来看下observeOn(),仍是先来回顾一下咱们例子中的设置:

//指定在Android主线程中执行
    .observeOn(AndroidSchedulers.mainThread())
复制代码
5.2.2.1 Observable类的observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //省略无关代码
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
复制代码

一样,这里也是新包装一个ObservableObserveOn对象,注意,这里包装的旧被观察者是ObservableSubscribeOn对象了,由于以前调用过subscribeOn()包装了一层了,因此如今是以下图所示:

ObservableObserveOn.png

RxJavaPlugins.onAssembly()也是原样返回。

咱们看看ObservableObserveOn的构造方法。

5.2.2.2 ObservableObserveOn类的构造方法
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
复制代码

里面就是一些变量赋值而已。

5.2.2.3 ObservableObserveOn的subscribeActual()

subscribeOn()差很少,咱们就直接来看ObservableObserveOnsubscribeActual()方法了。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //判断是否当前线程
        if (scheduler instanceof TrampolineScheduler) {
            //是当前线程的话,直接调用里面一层的subscribe()方法
            //即调用ObservableSubscribeOn的subscribe()方法
            source.subscribe(observer);
        } else {
            //建立Worker
            //本例子中的scheduler为AndroidSchedulers.mainThread()
            Scheduler.Worker w = scheduler.createWorker();
            //这里会将Worker包装到ObserveOnObserver对象中去
            //注意:source.subscribe没有涉及到Worker,因此仍是在以前设置的线程中去执行
            //本例子中source.subscribe就是在IO线程中执行。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

一样,这里也将observer给包装了一层,以下图所示:

ObserveOnObserver.png

source.subscribe()中将会把事件逐一发送出去,咱们这里只看下ObserveOnObserver中的onNext()方法的处理,onComplete()等就不看了,实际上都差很少。

5.2.2.4 ObserveOnObserver的onNext()
@Override
        public void onNext(T t) {
            //省略无关代码
            if (sourceMode != QueueDisposable.ASYNC) {
                //将信息存入队列中
                queue.offer(t);
            }
            schedule();
        }
复制代码

就是调用schedule()而已。

5.2.2.5 ObserveOnObserver的schedule()
void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver一样实现了Runnable接口,因此就把它本身交给worker去调度了
                worker.schedule(this);
            }
        }
复制代码

Android主线程调度器里面的代码就不分析了,里面其实是用handler来发送Message去实现的,感兴趣的能够看下。 既然ObserveOnObserver实现了Runnable接口,那么就是其run()方法会在主线程中被调用。 咱们来看下ObserveOnObserverrun()方法:

5.2.2.6 ObserveOnObserver的run()
@Override
        public void run() {
            //outputFused默认是false
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
复制代码

这里会走到drainNormal()方法。

5.2.2.7 ObserveOnObserver的drainNormal()
void drainNormal() {
            int missed = 1;
            //存储消息的队列
            final SimpleQueue<T> q = queue;
            //这里的actual其实是SubscribeOnObserver
            final Observer<? super T> a = actual;

            //省略无关代码
           
            //从队列中取出消息
            v = q.poll();
           
            //...
           
            //这里调用的是里面一层的onNext()方法
            //在本例子中,就是调用SubscribeOnObserver.onNext()
            a.onNext(v);
           
            //...
        }
复制代码

至于SubscribeOnObserver.onNext(),里面也没切换线程的逻辑,就是调用里面一层的onNext(),因此最终会调用到咱们自定义的Observer中的onNext()方法。所以,ObserveronNext()方法就在observeOn()中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。

5.2.2.8 简单总结
  1. 若是设置了observeOn(指定线程),那么Observer(观察者)中的onNext()onComplete()等方法将会运行在这个指定线程中去。
  2. subscribeOn()设置的线程不会影响到observeOn()
5.2.2.9 时序图

最后,来张observeOn()时序图:

observeOn()时序图.png

6.其余

因本人水平有限,若有错误,欢迎指出并交流~四月葡萄的博客

欢迎关注微信公众号,接收第一手技术干货
相关文章
相关标签/搜索