咱们在使用RxJava的时候最经常使用的功能就是写一个被观察者、一个观察者。在被观察者中发射数据,在观察者中接收数据,最后用subscribe将二者给订阅起来实现最基础的功能。例以下面这种:git
//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("aa");
e.onNext("bb");
e.onNext("cc");
e.onNext("dd");
e.onComplete();
}
});
//观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//TODO 初始化数据
d.dispose();
}
@Override
public void onNext(String value) {
//TODO 接收被观察者发送的数据
Log.i("result:",value);
}
@Override
public void onError(Throwable e) {
//TODO 错误
}
@Override
public void onComplete() {
//TODO 完成以后回调
}
};
//订阅
observable.subscribe(observer);
复制代码
那么在这种状况下,被观察者是如何发送数据给观察者?观察者又是如何接收数据?二者又是如何被subscribe订阅起来的呢?下面咱们经过源码的分析来查看这一切的操做流程。github
首先,在建立被观察者的时候,通常来讲都是经过Observable.creat()来建立。那么咱们进入creat()方法去看看里面作了什么操做。bash
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//非空判断
ObjectHelper.requireNonNull(source, "source is null");
//返回一个Observable
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
咱们跳过非空判断逻辑,直接查看return。这里会经过RxJavaPlugins.onAssembly()返回一个Observable对象。那么咱们跳进onAssembly去查看里面是如何进行Observable的建立的。ide
public static <T> Observable<T> onAssembly(Observable<T> source) {
...
//返回传入的参数对象
return source;
}
复制代码
这里其实没作什么很特别的操做,仅仅只是返回了参数对象。也就是说咱们如今应该返回去重点研究的是这个参数对象source。而这个source根据前面的源码查看,能够看到实际上是new ObservableCreate()。咱们跳进这里查看源码分析
//ObservableCreat继承了Observable,为被观察者Observable的子类
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//构造方法,传入参数ObservableOnSubscribe,也就是咱们在里面进行onNext、onComplete与onError的方法。
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//此方法是在被订阅subscribe时候才调用,具体后面再说。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
复制代码
代码进行了一些删减。学习
经过上面代码逻辑注释能够看到:ui
1.ObservableCreat为Observable的子类,由于他拥有Observable的所有特性。this
2.在ObservableCreat构造方法中传入了ObservableOnSubscribe,这个具体的做用咱们下面讲。spa
3.有一个subscribeActual方法,这个方法实际上是后面用做订阅的方法subscribe来实现的具体方式。线程
如今咱们再来看看传入ObservableCreat中参数ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
复制代码
发现ObservableOnSubscribe实际上是一个接口,而这个接口里有一个方法,专门用来实现咱们的向观察者发送消息的方法。到此,被观察者在进行creat()的源码分析完毕,咱们来总结一下。
总结:
1.在进行creat的时候,内部返回了一个Observable,而这个Observable其实是一个继承了ObserVable的ObservableCreat类
2.ObserVable的ObservableCreat构造方法中传入了一个接口ObservableOnSubscribe,咱们通常进行数据的发送都是经过这个接口中的subscribe方法里的ObservableEmitter来进行onNext、onComplete与onError。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//获取observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//非空判断
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//订阅方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
抛开上面的逻辑判断不谈,咱们直接看订阅方法subscribeActual()。不知道你们还记不记得,在咱们讲Observable.creat的时候,在ObservableCreat这个类里面有两个用到的方法,一个是构造方法,传入接口ObservableOnSubscribe,另一个是subscribeActual。而如今Observable.subscribe实际上就是在执行这个方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//实例化CreatEmitter对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//执行观察者中的onSubscribe方法
//onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程
//和subscribeOn()与observeOn()无关!
observer.onSubscribe(parent);
try {
//真正的订阅方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
咱们在来看下这方法中实例化CreatEmitter对象的这个类。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//当没有被取消订阅的时候就执行onNext()方法用于发送数据
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//出现错误时调用这个方法,用于抛出异常,而且在抛出以后的finally中调用dispose用于取消订阅
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//判断若是没有执行disposed方法就调用onComplete而且dispose。这也就是为何onComplete与onError为何只会执行其中一个
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
复制代码
里面的逻辑能够说是一点都不复杂,就是咱们平时常常使用的onNext、onComplete与onError方法。
onNext():先判断发送的消息是否为null,若是为空则调用onError方法来抛出异常。若不为空而且并未取消订阅,则发送数据。
onError():出现错误的时候执行这个方法。当抛去异常以后经过finally强行执行dispose()方法,来强制结束掉订阅。
onComplete():判断若是没有执行disposed方法就调用onComplete而且dispose。这也就是为何onComplete与onError为何只会执行其中一个。
分析一下各个类的职责:
Observable :我的理解是装饰器模式下的基类,实际上全部操做都是Observable的子类进行的实现
ObservableOnSubscribe: 接口,定义了数据源的发射行为
ObservableCreate: 装饰器模式的具体体现,内部存储了数据源的发射事件,和subscribe订阅事件
ObservableEmitter: 数据源发射器,内部存储了observer
Observer: 接收到数据源后的回调(好比打印数据等)
1.Observable.create(),实例化ObservableCreate和ObservableOnSubscribe,并存储数据源发射行为,准备发射(我已经准备好数据源,等待被订阅)
2.Observable.subscribe(),实例化ObservableEmitter(发射器ObservableEmitter准备好!数据发射后,数据处理方式Observer已准备好!)
3.执行Observer.onSubscribe()回调,ObservableEmitter做为Disposable参数传入
4.执行ObservableOnSubscribe.subscribe()方法(ObservableEmitter发射数据,ObservableEmitter内部的Observer处理数据)
具体其余的一些操做符的用法,请参考个人github:RxJavaDemo
有兴趣能够关注个人小专栏,学习更多知识:小专栏