一 前言bash
上一篇讲解的被观察者的分类,以及特殊的subject/Processor。起初脑海中构建这篇文章大概是讲解subject,可是被《一篇不太同样的RxJava介绍》深深的吸引,这篇文章阐述了rx真实的推导过程,原来Observable是一个异步集合——从主动问是否有数据,到被动等通知数据到来,其实被动的等通知就是统一同步和异步世界的钥匙,而且顺带统一的回调。以后一发不可收拾的又看了《Observable究竟如何封装数据》简单有效的源码分析。仍是不够过瘾,因而就有了这篇文章------谈不上源码分析,只是简单点进去看看里边的大体调用原理。app
二 仅仅看看你异步
先来看这样一段代码:ide
代码一:
Observable<Integer> Observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});复制代码
点进去静态方法create内部代码以下:源码分析
源码一:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码
source指的是外部传入的ObservableOnSubscribe对象。source又传入ObservableCreate对象中,而且静态方法返回这个ObservableCreate对象。
post
继续跟进ObservableCreate对象,这个类代码以下:ui
源码二:
//1 它是一个Observable
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//2这里的 source 就是外部传入的ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override //注意这个方法,下边进行阐述。
protected void subscribeActual(Observer<? super T> observer) {
//建立发射对象,传入被观察者
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//3.调用外部逻辑,对外暴漏发射对象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}复制代码
源码二有两点须要阐述:this
1.CreateEmitter经过回调对外暴漏对象,同时CreateEmitter持有observer对象,理所固然能够猜到CreateEmitter内部是onNext方法中去调用observer的onNext方法。点进去猜对了。spa
2. subscribeActual方法何时调用?当订阅的时候才会调用这个方法,不信你能够查看Observable$subscribe方法,里边对subscribeActual方法进行了调用。线程
很显然这个Observable1没有被订阅,因此目前来subscribeActual方法不会被调用。
咱们使用map去转换Observable1而后返回Observable2:
代码二:
Observable<Integer> Observable2 = Observable1.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer + 10;
}
});复制代码
点进去map,能够看到
源码三:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
这个this就是Observable1实例,毕竟是调用他的map方法
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
} 复制代码
继续跟踪源码三中的ObservableMap以下:
源码四
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
//注意这个source是上边传的this,也是Observable1实例
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override//这个方法是订阅的时候才调用
public void subscribeActual(Observer<? super U> t) {
//注意这个source是上边传的this,也是Observable1实例
source.subscribe(new MapObserver<T, U>(t, function));
} }复制代码
源码3、四有几点须要阐述:
源码四中的source就是上流的Observable1事例。
由于Observable2 没有订阅,因此subscribeActual方法不会被调用。
咱们再去使用doOnNext去作一次转换
代码三:
Observable<Integer> Observable3 = Observable2.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});复制代码
点进去doOnNext查看源码:
源码五:
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
//这里的this是指的上流的Observable2,毕竟调用的他的doOnEach方法
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
} 复制代码
继续跟踪ObservableDoOnEach
源码六:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
//这个source是上边传入的this,也就是Observable2实例
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
@Override
public void subscribeActual(Observer<? super T> t) {
//这个source是上边传入的this,也就是Observable2实例
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
} 复制代码
源码六中的source就是上流的Observabl2事例。
截止到目前Observable3 也没有被订阅,因此全部的subscribeActual方法都没有被调用。
接下来咱们去订阅Observable3(注意仅仅是订阅Observable3):
Observable3 .subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});复制代码
这个时候源码六中的subscribeActual方法被触发,subscribeActual
中的逻辑又会触发Observable2被订阅,这个时候源码四中的的subscribeActual
方法被触发,subscribeActual
中的逻辑又会使得Observable1被订阅,当Observable1被订阅的时候,源码二中的subscribeActual
方法就开始建立发射对象,进行发射。这就完成了一处订阅,到处订阅的效果。
到此,咱们只是跟踪了订阅的源码。
咱们接下来跟踪一下观察者是怎么传递和执行的!
看源码六,外部传入的Observer经过DoOnEachObserver包装而后传递给Observable2。继续跟踪又经过源码四中的MapObserver包装传递到Observable1
最后传递到源码二中的CreateEmitter对象中被调用,这就经过包装者模式完成数据转换的层层封装。
三总结
都是下流被观察者持有上流被观察者对象引用,能够保证subscribeActual方法
递归的向上调用。每次调用都把观察者使用包装者模式包装一下本层的逻辑,而后传递给上游,直至到源头流。
在订阅时,实际上这个顺序是逆向的,从下游往上游进行订阅。数据的传递和变换则是正常,方向从上游往下游进行依次处理,最终执行咱们subscribe中传递的Observer.onNext()。
以上分析所有是基于同步进行的分析,若是想要印证:
observeOn 后面的全部操做都会在它指定线程工做。subscribeOn 指定的线程是从这个Observable生成一直到
遇到其余 observeOn。若是程序须要屡次切换线程,使用屡次observeOn是彻底能够的。而subscribeOn只有最
上方的subscribeOn会起做用。复制代码
就须要查看ObservableObserveOn被观察者中的subscribeActual
方法。和ObservableSubscribeOn被观察者的subscribeActual方法。有兴趣的可自行研究。大体就是subscribeOn的原理就是实现一个runnable,而后再runnable中去开启上流订阅,这样全部观察者的逻辑都和runnable的线程一致。而后指定runnable的线程就能够改变全部观察者的线程。observeOn的原理就是获取下游传递过来的观察者,而后指定下流传递过来的被观察者的线程。伪代码以下:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//整个订阅都是在runnable中,因此能够指定上游的线程和全部观察者的线程
source.subscribe(parent);
}
}复制代码
上边代码注释很清楚,指定全部观察者线程,这就让全部的代码经过一行代码进行了线程切换。直到遇到切换下游线程的代码observeOn。另外关于指定下游的伪代码:
Scheduler.Worker w = scheduler.createWorker();
这里的订阅是转换完成再指定下游的线程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));复制代码