正在看rxjava,看到lift,在阅读了源码和网上的一些文章,整理了下思路。下文着重不是直接分析源代码,而是从lift解决什么问题和如何解决角度分析lift应该是作什么/怎么作的问题。具体源码实现请参考rxjava,网上不少文章分析的很详细。java
Observable的本质上就是异步获取/加工数据(OnSubscribe的call方法),而后通知observer(Observer的几个方法)的一个框架。每一个Observable都有一个OnSubscribe(继承Action1接口)对象。在调用Observable的subscribe方法建立,一旦subscribe后,Observable就开始工做。程序员
举例来讲,对于一个Observalbe<JSONObject>的对象,能够看做它最老是发射JSONObject数据,要求下游提供一个Subscriber<JSONObject>(Subscriber实现了Observer)来接收数据,而Subscriber<JSONObject>则放在OnSubscribe的参数。网络
rxjava中的lift是各类操做符的核心所在,具体操做符提供不一样的如map,filter等效果。lift的代码设计比较精细,其实只要理解了上面Observable的本质,lift的实现也就迎刃而解了。app
刚才讲Observable要获取/加工数据,那么它是怎么获取/加工数据呢,方式不少,如最基本的例子框架
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(new String()); } });
这个是最简单的,可是new String()可说是一个“获取数据的例子”,固然这样写毫无心义。而可能的一种实现是网络获取如异步
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { RemoteApi.getInstance().getCurrentUserName(new Callback<String>(){ public void onSuccess(String username){ subscriber.onNext(username); subscriber.onCompleted(); } public void onFail(int code, String detail){ subscriber.onError(new Exception(detail)); } }); } });
这都很好理解。ide
而lift本质是一个Observable数据是从另外一个Observable获取应该怎么处理呢?一言不合直接先上代码post
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); parent.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }
咱们来分析下。this
Observable1(O1)调用lift返回Observable2(这是个新的对象O2),此时O2要从O1获取数据,O2是消费者,O1是生产者spa
O2调用另一个O1获取数据实际上要作3件事
1. 让O1开始获取数据
2. 获取数据后,发射给O2。
3. O2获得数据后,要发射给O2的消费者
先看第1,如何让O1开始获取数据?记得开始咱们所讲的Observable,它有个OnSubscribe对象,它的call方法是获取数据的地方。所以,很简单,调用该方法。
不过,“调用该方法”这么简单一句话通常是产品经理的说得,做为一个程序员,当你脑子想到“调用该方法”时候,就须要落实到实现:要用到的对象从哪来,方法参数是什么,方法参数从哪来,是否有返回值,返回值怎么处理等。
在这里,O2就是经过调用O1的OnSubscribe对象的call方法让O1开始工做的。O1的OnSubscribe对象是在建立O2是传入的,代码清晰可见。
OnSubscribe的call对象要接收一个O2的Subscriber对象,这个就是咱们关注的第二件事:“获取数据后,通知Observable2”。
而这里O2传给O1 OnSubscribe对象的Subscriber对象从哪来的?这就是lift的参数Operator的做用了。Operator就是负责提供给生产者(O1)监听回调Subscriber的做用,它实际是泛型为Subscriber的Func1的接口。不一样的操做符实质是不一样的Operator。好比map方法是OperatorMap,filter的是OperatorFilter,observeOn的是OperatorObserveOn(一样形式实现了线程切换,NB吧)
所以一个调用流程是
1. 第一步:O2的OnSubscribe的call -> 第二步:O2用operator构造Subscriber -> 第三步:O2用该Subscriber调用O1的OnSubscribe的call
按照这三步依次上溯,直到最后一个没有parent的Observable
2. 顶级Observable获取数据,调用下游Observable传来的Subscriber发射数据
3. 若是该Subscriber是你写的(经过subscribe方法),这个就结束了;若是是级联Observer,则
4. 上一步的Subscriber是O2的Operator构造出来,这个Subscriber一个任务,就是对收到的数据进行处理,而后在通知O2的下游消费者(由于下游消费者的Subscriber对象会保存在operator返回的Subscriber中)
5. 如此,2,4,反复调用,直至到第三步,一切game over
借用扔物线文章的图片以下