RxJava2源码分析

RxJava2源码分析

RxJava的鼎鼎大名相信Android开发的同窗都很是熟悉了,其实不只仅有RxJava,还有RxJs,RxKotlin等等一系列。能够说Rx并非一种局限于Android的框架,Rx是一种思想,咱们深刻了解了RxJava,一样会加深咱们对其余Rx系列的认知。java

使用方法

咱们来看一个常见的例子:并发

Observable.create(ObservableOnSubscribe<Int> { e ->
            e.onNext(1)
            e.onComplete()
        }).map {
            Log.d("map-thread : ", Thread.currentThread().name)
            Log.d("--", "------------------------------")
            "result : $it"
        }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                Log.d("onSubscribe-thread : ", Thread.currentThread().name)
                Log.d("--", "------------------------------")
            }

            override fun onNext(s: String) {
                Log.d("onNext-thread : ", Thread.currentThread().name)
                Log.d("onNext-result", s)
                Log.d("--", "------------------------------")
            }

            override fun onError(e: Throwable) {
                Log.d("onError-thread : ", Thread.currentThread().name)
                Log.d("onError-message : ", e.message)
                Log.d("--", "------------------------------")
                Log.d("--", "------------------------------")
                Log.d("--", "------------------------------")
            }

            override fun onComplete() {
                Log.d("onComplete-thread : ", Thread.currentThread().name)
                Log.d("--", "------------------------------")
                Log.d("--", "------------------------------")
                Log.d("--", "------------------------------")
            }
        })

这是一个使用Kotlin写的例子,对Kotlin不熟悉的同窗无需关注代码细节,大体能看懂什么意思就行。首先发送一个数字1,而后经过 map 操做符把数字1变成 result : 1 ,将以前的操做切换到IO线程,将以后的操做切换到主线程。app

整个RxJava的使用能够分为三个部分:框架

  • 建立发送数据的原始Observable
  • 使用Rxjava的操做符对发送事件作相应变换
  • 使用subscribeOn和observeOn作线程切换

接下来我会对这三个部分作详细的说明。异步

建立发送数据的原始Observable

建立发送数据的原始Observable采用的是 Observable.create() ,固然使用 Observable.just(1)Observable.from(list) 等等这些方法也能够建立发送数据的Observable,其实这些操做符的本质都是建立了一个新的Observable,在订阅的时候发送了一些数据。咱们着重看一下 Observable.create()ide

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //确保source非空
        ObjectHelper.requireNonNull(source, "source is null");
        //返回一个ObservableCreate对象
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

第一句很好理解,确保传入的 ObservableOnSubscribe 对象非空,第二句本质是直接返回了 new ObservableCreate<T>(source) 执行的结果,咱们之后在看到 RxJavaPlugins.onAssembly(obj) 相似的代码时,直接能够理解为返回了一个 obj 。因此这个方法实际上作了一件事:建立了一个 ObservableCreate 类型的对象并返回。函数

这里咱们先了解一下在订阅时,也就是调用 subscribeWith(observer) 时具体是作了什么:源码分析

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
        subscribe(observer);
        return observer;
    }
    
    public final void subscribe(Observer<? super T> observer) {
        //确保observer非空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //这里通常都是直接返回传入的observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            
            //不一样类型的Observable的具体订阅的方法,全部的数据发送操做都在这个方法中
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

先看第一个方法 subscribeWith(E observer) ,在这个方法内部调用了 subscribe(observer) 这个方法。 subscribe(observer) 这个方法的核心是 subscribeActual(observer) 这一句。subscribeActual(observer) 这个方法是Observable内的一个抽象方法,Observable通过create操做符后会变成一个 ObservableCreate,通过map操做符会变成一个 ObservableMap 其余的操做符都是一个套路。通过操做符转化而来的Observable都是Observable的子类,在这些子类的内部都会实现 subscribeActual(observer) 这个抽象方法,并在这个方法内作发送事件的相关操做。post

通过上面的分析,咱们知道了 subscribeActual(observer) 这个方法是不一样Observable的关键,有了这个前置知识,咱们接着上面的进度看一下 ObservableCreate 这个Observable子类中的 subscribeActual(observer) 方法:ui

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //对observer的包装
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //这是关键
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

关键是 source.subscribe(parent) 这一句。这个source就是 ObservableOnSubscribe 的实例,是在 Observable.create(source) 中传入的参数。 ObservableOnSubscribe 是一个接口,在这个接口的 subscribe 方法中咱们定义了数据的发送方式。这个 ObservableOnSubscribe 能够理解为一个 数据发送的剧本 ,这个剧本具体的实现细节写在了 subscribe方法内。在最上面的例子中,咱们经过 e.onNext(1);e.onComplete() 发送了一个数字1,紧接着通知事件已经发送完毕。

使用RxJava的操做符对发送事件作相应变换

因为RxJava的事件操做符太多,咱们这里只讲map操做符的源码分析:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //确保mapper非空
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //建立并返回一个ObservableMap对象
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

是否是很熟悉,map操做符和create操做符是一个套路,事实上全部的操做符都是一个套路: 建立并返回了一个对原有Observable的包装类 ,那咱们来看一下这个 ObservableMap

@Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    //对原始observer的包装类
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            U v;
            try {
                //在这里将T类型的原始数据t变换为类型U的新数据v
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //调用原始observer的onNext
            actual.onNext(v);
        }
        
        //忽略无关代码......
    }

ObservableMap 内部有有两个值得咱们注意的点:一个是咱们上面提过的 subscribeActual 方法;另外一个是它的内部类 MapObserver

subscribeActual 中依然是 source.subscribe(new MapObserver<T, U>(t, function)) 这句熟悉的代码,值得注意的是这句代码并无传入原始的observer,而是传入了 MapObserver 对象。简而言之, MapObserver 是一个包装类,它的内部包含咱们原始的observer: actual 和咱们变换的具体操做: mapper 。新的 MapObserver 包装类在它的 onNext 方法中作了两步操做:

  • 经过 mapper.apply(t) 实现数据的类型变换,获取新的变换后的数据
  • 将新类型的数据传入,经过 actual.onNext(v) 实现原始observer接收变换后新数据的功能

其实在不了解RxJava的源码以前,咱们经常会惊叹于RxJava的神奇:明明我发送的是数字1,我要接收的倒是字符串类型的数据,这中间仅仅是经过了一个map操做符,简直太神奇了!在咱们了解了map操做符的源码以后,就会知道:咱们在订阅的时候map操做符将原始的observer包装成了一个 MapObserver 对象,在这个 MapObserver 内部的 onNext 方法中首先会将原始数据 1 经过咱们传入的变换规则变换为 result : 1,在变换以后才会调用咱们编写的原始observer来处理新的 String 类型的数据。

使用subscribeOn和observeOn作线程切换

在分析线程切换以前,咱们先明确一些前置知识:从主线程切换到子线程的操做很简单,新开一条线程,在新的线程执行便可。从子线程切换到主线程,在Android开发中,全部的第三方框架使用的都是Handler机制。因此不要把这些第三方框架想的特别难,他们的本质都是Android中的基础知识。

咱们以 subscribeOn(Schedulers.io()) 这个例子来分析,首先传入的是 Schedulers.io() ,直接说结论吧,它就是一个 IoScheduler 类型的单例对象。继续看代码:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

咱们都很熟悉了,本质是建立并返回了一个 ObservableSubscribeOn 类型的对象。咱们继续看 ObservableSubscribeOn 的内部:

public void subscribeActual(final Observer<? super T> s) {
        //建立一个对原始observer的包装对象
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        //这里是关键
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
        //忽略无关代码......
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //这一句咱们很是熟悉了,在这里实现了事件的订阅
            source.subscribe(parent);
        }
    }

这里跟 ObservableMap 中的 subscribeActual 也是一个套路:

  • 建立一个下一级observer的包装类 SubscribeOnObserver
  • 经过 source.subscribe(parent) 实现事件的订阅

经过看 SubscribeOnObserver 这个包装类中的 onNext 方法也能够明白:它的内部直接调用的就是 actual.onNext(t) ,没有像map同样作数据的变换,这很好理解,由于 subscribeOn(schedule) 自己就只是为了切换线程,并不作其余多余的操做,因此这个包装类中的 onNext 才会直接调用下一级observer的onNext。

可能有的小伙伴要有问题了:你说经过 source.subscribe(parent) 实现事件的订阅,但 subscribeActual 中并无你说的代码啊?其实在这里 scheduler.scheduleDirect(new SubscribeTask(parent)) 这里传入了一个 SubscribeTask 对象,这个对象实际上是个 Runnable ,在它的 run() 方法中调用了 source.subscribe(parent) 。到目前为止,咱们惟一陌生的就是 scheduleDirect 这个方法了,这个方法定义在 Scheduler 这个线程调度类中,它实际调用的是:

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //createWorker是一个抽象方法,不一样的线程调度器有不一样的实现
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        //事件调度处理
        w.schedule(task, delay, unit);

        return task;
    }

scheduleDirect 这个方法作了两件事:建立一个 Worker 工做类,调用工做类的 schedule 来进行事件调度。不一样的线程调度器会有不一样的处理,对于 IoSchedule 这个调度器来讲,它最终是执行这个方法:

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //忽略无关代码......
        Future<?> f;
        try {
            if (delayTime <= 0) {
                //经过线程池来开启一个子线程执行
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

经过上面的分析咱们能够知道, subscribeOn(Schedulers.io()) 本质上是经过线程池开启了一个线程,在这个新的线程中仍是经过调用 source.subscribe(observer) 来订阅事件。

subscribeOn(Schedulers.io()) 分析过了,咱们再来看一下 subscribeOn(AndroidSchedulers.mainThread()) 这个线程切换。
AndroidSchedulers.mainThread() 返回的是一个 HandlerScheduler 类型的线程调度器,它的 scheduleDirect 方法定义以下:

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        
        //本质是经过handler机制实现线程切换
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

从它的方法内部能够看到,它是经过 handler.postDelayed() 来实现切换到主线程的功能的,这个handler是定义在主线程的handler。其余类型的线程调度器就再也不分析了,本质都是大同小异。

接下来咱们来看一下 observeOn(AndroidSchedulers.mainThread()) ,额,其实它的套路也是同样的,建立并返回了一个 ObservableObserveOn ,咱们主要关注它的 subscribeActual 方法:

protected void subscribeActual(Observer<? super T> observer) {
            //忽略无关代码......
            
            Scheduler.Worker w = scheduler.createWorker();
            //订阅包装类ObserveOnObserver
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        //下一级观察者
        final Observer<? super T> actual;
        //对应Scheduler里的Worker
        final Scheduler.Worker worker;
        //上一级Observable发送的数据队列
        SimpleQueue<T> queue;
        Disposable s;
        //若是onError了,保存对应的异常
        Throwable error;
        //是否完成
        volatile boolean done;
        //是否取消
        volatile boolean cancelled;
        // 表明同步发送 异步发送 
        int sourceMode;
        ....
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //忽略无关代码......
        
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            //执行过error / complete ,直接返回
            if (done) {
                return;
            }
            //若是数据源类型不是异步的, 默认不是
            if (sourceMode != QueueDisposable.ASYNC) {
                //将上一级Observable发送的数据加入队列中
                queue.offer(t);
            }
            //这句代码是线程调度的核心,进入相应Worker线程,在相应线程发送数据队列中的数据
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            //这句代码是线程调度的核心,进入相应Worker线程,在相应线程发送数据队列中的数据
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            //开始调度
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        
        @Override
        public void run() {
            //默认是false
            if (outputFused) {
                drainFused();
            } else {
                //取出队列中的数据并发送
                drainNormal();
            }
        }


        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                //若是已经结束或队列为空,跳出循环
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        //从队列里取出一个值
                        v = q.poll();
                    } catch (Throwable ex) {
                        //异常处理并跳出函数
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                
                    if (empty) {
                        break;
                    }
                    //调用下一级的observer的onNext方法发送事件
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            //若是已经disposed 
            if (cancelled) {
                queue.clear();
                return true;
            }
            // 若是已经结束
            if (d) {
                Throwable e = error;
                //若是是延迟发送错误
                if (delayError) {
                    //若是空
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        //中止worker(线程)
                        worker.dispose();
                        return true;
                    }
                } else {
                    //发送错误
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    //发送complete
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }

能够看到 subscribeOnobserveOn 不一样的是两者线程切换的地方。 subscribeOn 是将 source.subscribe(observer) 整个部分切换到了相应的线程,而 observeOn 则是在 ObserveOnObserver 这个包装类中的 onNextonError 方法中作了线程的切换,具体是在 onNext 等方法中建立了相关的 Worker 工做类,并调用了这个类的 schedule 方法,这个跟咱们讲 subscribeOn 是同样的。

到此为止,RxJava2的源码就基本分析完了。最后根据最初的例子,用一幅图来展现RxJava的流程:
RxJava流程

相关文章
相关标签/搜索