在探索了RxJava的源码以后,在这里作一次总结。源码大体的执行流程。bash
1 建立Observable,而后使用操做符转换或者处理Observable,每个操做符,都会产生一种对应类型的Observable。咱们知道Observable是链式调用的,好比:框架
Flowable<String> observable = Flowable
// 建立FlowableJust对象
.just("hello", "world")
// 建立建立FlowableMap对象,而且在建立的时候将上一步建立的对象传入FlowableMap中
.map(String::toUpperCase)
;
复制代码
首先我建立了A对象,在A对象的基础之上进行下一步操做产生B对象,在建立B对象的时候,能够将本身做为参数传递给B对象,那么B对象的source就是A对象。ide
2 建立消费者,咱们写的消费者本质上是RxJava框架封装到最后很是方便使用的,就是一个单纯的Observer类。this
3 调用subscribe方法,将上面的步骤进行串联起来。好比说咱们建立了A,链式调用又建立了B,而后B又建立了C,这个时候C有B的引用,B有A的引用。spa
可是在最后真正调用subscribe方法的时候,爆出的对象是C对象,即用C去调用subscribe方法。code
这时:cdn
源头Observer
,C建立本身的Observer,好比CObserver
,参数为源头Observer,而且将其设置为downstrem。CObserver
,而后B对象接受到CObserver
建立本身的观察者,包装CObserver
为本身的downstream先整理下,这个时候分别建立了:AObserver,BObserver,CObserver,源头Observer(咱们本身写的),而且server
AObserver:对象
BObserver:blog
CObserver:
而对于Observable来讲,则与上面相反。
CObservable:
BObservable:
AObservable:
4 当Observer传递到AObserver,即最开始产生数据哪里时,其调用自身Observer的onSubscribe方法,由于每一个自身的Observer都有downstream的Observer,再逐次调用downstream的onSubscribe方法。
在调用onSubscribe方法时,将自身传递进去,这时BObserver的upstream为AObserver。依次类推
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}
复制代码
5 咱们知道最终调用的是首先建立的AObserveable,AObservable的subscribeActual方法中,在调用onSubscribe以后,将生产的数据与Observer联合起来,再运行。
AObserver运行完本身的观察者逻辑后,交由本身的downstream继续进行onNext方法,直到咱们本身的消费者调用onNext方法。
提及来有点绕,可是大致上就是在建立观察者和消费者的时候,使用某种方式将其关联起来。