楼主最近在找实习工做,因为简历上说了解RxJava,因此在面试的时候应该会问到RxJava的知识,因而楼主结合RxJava的源码,对RxJava的工做原理进行初步的了解。也只敢说是初步了解,由于本身也是第一次看RxJava的源码,理解的程度确定不是很深。仍是那样,若是有错误之处,但愿各位指正!
本文参考:html
1.除非特殊说明,源码来自:2.2.0版本面试
楼主打算将RxJava的源码分析写成一个系列文章,因此这个是这个系列的第一篇文章,在概述里面仍是对RxJava是什么简单的介绍一下,本系列文章不会对RxJava的基本用法进行展开,若是有老哥对RxJava的基本使用掌握的不是很好的话,推荐这个系列的文章:给初学者的RxJava2.0教程(一)。
简单的说一下RxJava,RxJava是基于观察者模式的一个框架,在RxJava中有两个角色,一个Observable,一般被称为被观察者,一个是Observer,一般被称为观察者。整体的架构是,由Observable来处理任务或者发送事件,而后在Observer里面来接受到Observable发送过来的信息。
RxJava有不少的优点,好比线程调度,在Android里面,耗时操做必须放在子线程中,可是同时还须要主线程来更细UI,因此线程调度就显得尤其重要。固然RxJava还有不少重要的操做符,使得咱们的开发变得很是的方便。本系列文章不会对每一个操做符的基本使用展开,而是对一些比较经常使用的操做源码分析,所说的经常使用,也是指楼主用到的!!毕竟是菜鸡,确定有不少的东西都不太懂。安全
想要对RxJava的基本原理有一个更好的了解,必须对它的基本有一个大概的了解。咱们先经过一个简单的案例,来对RxJava的基本元素进行提取。服务器
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
在这个简单的案例当中,咱们能够提取的元素有:Observable
, ObservableOnSubscribe
, ObservableEmitter
,Observer
。
元素仍是挺少的,咱们如今对每一个元素的类结构来进行简单的分析一下。架构
public abstract class Observable<T> implements ObservableSource<T> { }
咱们发现Observable自己是一个抽象类,而且实现了ObservableSource接口,在来看看ObservableSource接口里面有什么。app
public interface ObservableSource<T> { void subscribe(@NonNull Observer<? super T> observer); }
ObservableSource接口里面只有一个subscribe
方法,也就是说,RxJava将注册观察者这部分的功能提取成一个接口,从而能够看出来,面向接口编程是多么的重要😂😂。。。
再分别来看看咱们上面案例中使用的两个方法--create
和subscribe
。框架
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // 先省略代码部分,待会详细的分析。 }
啊,吓我一跳,我觉得create方法的参数又是一个接口类型,还好是ObservableOnSubscribe
类型,也是上面提取出来的元素其中之一,关于这个类,待会会详细的分析。ide
public final void subscribe(Observer<? super T> observer) { //... }
这个方法就更加的简单了,就是传递了一个Observer接口的对象。不过须要注意的是这个方法有不少的重载,其中以Consumer类型的操做最为多,不过这个也没什么,最后仍是Consumer转换成为了Observer,这个就涉及到Observer接口的一个实现类--LambdaObserver
。不要惧怕,待会都会一一的讲解的。源码分析
说了被观察者,咱们先来看看观察者--Observer
。
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
哎呀呀,更加的简单了, Observer只是简单的接口,不过咱们须要注意的是这个接口定义的4个方法,这里不讲解四个方法的做用,毕竟咱们这里将Observable的基本原理🙄🙄。
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; }
一如既往的接口,subscribe
方法里面就是具体作事情的地方,这个相信大佬们应该都知道,我这里就班门弄斧的提醒一下😂😂。
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); ObservableEmitter<T> serialize(); boolean tryOnError(@NonNull Throwable t); }
ObservableEmitter
也是一个接口,同时继承了Emitter
接口,咱们来看看Emitter
接口的定义
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); }
做为一个发射信息器,Emitter
里面定义了不少关于发送消息给Observer
的方法,Emitter
的onNext
对应着Observer
的onNext
方法,其余的方法也是相似的。
咱们对相关部分的基本元素有了一个基本的了解,如今咱们来对整个流程的工做原理进行分析。首先咱们create
方法入手
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
create方法没有咱们想象中的那么难,就只有两行代码,还有一行用来check的😂😂。对于ObservableCreate
类这里先不进行分析,咱们来看看 RxJavaPlugins
的onAssembly
方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
这里提醒一下,onAssembly
方法的参数类型是Observable
类型,也就是说ObservableCreate
自己就是一个Observable
。好了,扯了题外话,来看看onAssembly
方法具体是干吗的。
整个方法的执行过程比较简单,若是onObservableAssembly
为null,直接就返回了source
,也就是说返回了ObservableCreate
自己。而咱们在整个Observable的源码中发现,onObservableAssembly
初始值自己为null。
public static void reset() { //······ setOnObservableAssembly(null); //······ }
为何须要这样子绕圈子的作呢?这里就是作了钩子,以便于之后的扩展。
因此Observable
的create
方法就是返回了一个ObservableCreate
对象,不过须要注意的是ObservableCreate
包裹了一个ObservableOnSubscribe
对象,也就是咱们在create方法里面new的那个ObservableOnSubscribe
对象。
咱们先来不急着去理解ObservableCreate
是什么,仍是来看看subscribe
方法为咱们作了什么。
当咱们经过Observable的create方法来获取一个Observable对象时,一般还会调用Observable的subscribe方法来注册一个观察者。如今咱们来看看subscribe方法的实现。
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
整个过程也不是想象中的那么神秘,除去check相关的方法不看,归根结底就是两行代码,先是经过RxJavaPlugins
的onSubscribe
方法来获取Observer
对象,具体操做这里就不说了,确定跟RxJavaPlugins
的onAssembly
方法差很少,最后返回的是observer自己,最后调用了subscribeActual
方法。这个subscribeActual
方法是干吗的?
protected abstract void subscribeActual(Observer<? super T> observer);
卧了个槽?抽象方法!那我怎么知道调用的是哪一个类的subscribeActual
方法?不急哈,记得咱们以前在create
方法返回的Observable
对象是哪一个类的对象吗?想起来了吧,是ObservableCreate
先来看看ObservableCreate
类结构。
public final class ObservableCreate<T> extends Observable<T> { }
咱们发现,ObservableCreate
继承了Observable
,其实在分析create方法时,我也说过哟。
在ObservableCreate
类中,只有一个ObservableOnSubscribe
类型的成员变量,这个成员变量就是咱们在create
方法里面new的ObservableOnSubscribe
对象
咱们再来看看ObservableCreate
对subscribeActual
方法的实现
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
在subscribeActual
方法里面,先是对Observer
对象进行一次包装,将它包装在CreateEmitter
类中。而后咱们会发现两个比较眼熟的方法onSubscribe
方法和subscribe
方法。其中onSubscribe
方法在Observer
里面看到过,而这里刚好是经过Observer
对象来调用的,没错,这个的observer
就是在subscribe
方法里面new的对象。但是咱们记得onSubscribe
方法的参数类型是Disposable
,而这里是一个CreateEmitter
。咱们来看看CreateEmitter的类结构:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //······ }
没错,CreateEmitter
实现了Disposable
接口,因此CreateEmitter
自己能够充当Disposable
的角色。
调用了Observer
的onSubscribe
方法以后,而后就会调用ObservableOnSubscribe
的subscribe
方法。
到这里,咱们应该完全的明白了整个Observable
的工做流程。咱们经过create方法建立一个ObservableCreate
方法,而后调用了subscribe
方法来注册了一个观察者,在subscribe
方法里面又调用了subscribeActual
方法,在subscribeActual
方法里面先是调用了Observer
的onSubscribe
方法,而后调用了
ObservableOnSubscribe
的subscribe
方法,在ObservableOnSubscribe
的subscribe
方法当中,具体的作的事有两件:1.作咱们本身的事情,好比从服务器上获取数据之类;2.将发送信息到Observer
去。
理解了整个流程的工做原理,咱们如今来看看CreateEmitter
是怎么信息发给Observer
的。
咱们知道,咱们在ObservableOnSubscribe
的subscribe
方法里面使用ObservableEmitter
来发射信息到Observer
。如今咱们来看看整个CreateEmitter
的工做原理,不过,咱们仍是先来看看这个类的结构,虽然上面已经看了,可是担忧大佬们忘了:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //······ }
在上面已经说了CreateEmitter
实现了Disposable
接口,能够做为Disposable
对象来操做,在接下来,咱们将重点介绍Disposable
是怎么控制Observer
对信息的接收,同时还会介绍CreateEmitter
做为ObservableEmitter
接口的那部分功能。
以前在分析基本元素时,已经说了ObservableEmitter
这个接口,它实现了Emitter
接口。在Emitter
接口里面有三个方法用来发送信息给Observer
,分别是:onNext
,onError
,onComplete
。而CreateEmitter
类则是具体的实现了这三个方法,咱们来看看。
public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }
代码是很是的简单,直接调用了Observer
的onNext
方法,也没用什么高逼格的东西😂😂。其他两个方法也是如此。只不过是,在调用onNext
方法时作了一个isDisposed
的判断。
因此感受Disposable
才是这个类的核心。咱们来看看isDisposed
方法:
@Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); }
在isDisposed
方法里面调用了DisposableHelper
的isDisposed
方法。不过这里须要注意的是这里传递过去的是get方法的返回值,这个返回值什么意思?
回到CreateEmitter
的类结构,发现它继承了AtomicReference
类,因此get方法返回的是一个Disposable
对象。
同时,咱们发现CreateEmitter
的dispose
方法也是经过DisposableHelper
类进行进行操做的,看看要理解Disposable
的功能,必须了解DisposableHelper
是怎么操做的。
从感官上来讲,一个发射信息器是否dispose
,直接设置一个boolean
类型的flag就OK了,为何搞得这么复杂,又是AtomicReference
,又是DisposableHelper
。这一切,咱们从DisposableHelper
来寻找答案。
首先咱们仍是来看看DisposableHelper
的结构:
public enum DisposableHelper implements Disposable { DISPOSED ; }
DisposableHelper
自己是一个enum类型,同时实现了Disposable
接口。这里使用enum主要是为了作一个DISPOSED
的单例。而后在经过isDisposed
方法来判断是否dispose
,能够直接与DISPOSED
比较。
public static boolean isDisposed(Disposable d) { return d == DISPOSED; }
既然判断是否dispose
是直接与DISPOSED
比较,那么若是dispose
的话,应该是将AtomicReference
里面的值设置为DISPOSED
吧?咱们来看一下dispose
方法:
public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; }
果真,跟咱们猜想同样的,AtomicReference
里面的值设置为DISPOSED
。只是,这里为了线程安全,作了不少的判断操做。
从这里咱们能够获得,为何须要设置DisposableHelper
来控制dispose
的状态,那是由于线程安全,若是直接设置一个flag,在有些状况下,可能存在线程不安全的风险。同时为了代码的优雅,若是这部分的逻辑写在CreateEmitter
里面,会不会显得冗杂呢?
写到这里,我感受也差很少了。这里对着部分的知识作一个总结。
1.在整个流程中,基本有Observable
,ObservableOnSubscribe
,ObservableEmitter
,Observer
,若是想要对整个过程有一个大概的理解,必须对这几个元素有基本的认识。
2.Observer
的onNext
之类方法的触发时机,其实是Observable
的subscribe
方法,由于subscribe
方法调用了Observable
的subscribeActual
方法,而在subscribeActual
方法里面作了两部分的操做:1.直接调用了Observer
的onSubscribe
方法;2.使用ObservableEmitter
将Observer
包裹起来,因此咱们在ObservableOnSubscribe
的subscribe
方法用ObservableEmitter
来发射信息,至关于调用了Observer
的相关方法。
3.在ObservableEmitter
的onNext
之类方法里面,存在一种相似AOP的代码,由于在调用Observer
的相关方法,作了一些其余的操做。
做者:琼珶和予 连接:https://www.jianshu.com/p/f17821d2cf78 來源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。