RxJava && Agera 从源码简要分析基本调用流程(1)

版权声明:本文由晋中望原创文章,转载请注明出处: 
文章原文连接:https://www.qcloud.com/community/article/123编程

来源:腾云阁 https://www.qcloud.com/community网络

 

相信不少作Android或是Java研发的同窗对RxJava应该都早有耳闻了,尤为是在Android开发的圈子里,RxJava渐渐开始广为流行。一样有不少同窗已经开始在本身的项目中使用RxJava。它可以帮助咱们在处理异步事件时可以省去那些复杂而繁琐的代码,尤为是当某些场景逻辑中回调中嵌入回调时,使用RxJava依旧可以让咱们的代码保持极高的可读性与简洁性。不只如此,这种基于异步数据流概念的编程模式事实上一样也能普遍运用在移动端这种包括网络调用、用户触摸输入和系统弹框等在内的多种响应驱动的场景。那么如今,就让咱们一块儿分析一下RxJava的响应流程吧。
(本文基于RxJava-1.1.3)异步

一.用法

首先来看一个简单的例子:
函数

运行结果为:

从结果中咱们不难看出总体的调用流程:线程

首先经过调用Observable.create()方法生成一个被观察者,紧接着在这里咱们又调用了map()方法对原被观察者进行数据流的变换操做,生成一个新的被观察者(为什么是新的被观察者后文会讲),最后调用subscribe()方法,传入咱们的观察者,这里观察者订阅的则是调用map()以后生成的新被观察者。3d

在整个过程当中咱们会注意到三个主角:Observable、OnSubscribe、Subscriber,全部的操做都是围绕它们进行的。不难看出这里三个角色的分工:code

  • Observable:被观察者的来源,亦或说是被观察者自己
  • OnSubscribe:用来通知观察者的不一样行为
  • Subscriber:观察者,经过实现对应方法来产生具体的处理。

因此接下来咱们以这三个角色为中心来分析具体的流程。orm

二.分析

1.订阅过程

首先咱们进入Observable.create()看看:

这里调用构造函数生成了一个Observable对象并将传入的OnSubscribe赋给本身的成员变量onsubscribe,等等,这个hook是从哪里冒出来的?咱们向上找:

RxJavaObservableExecutionHook这个抽象Proxy类默认对OnSubscribe对象不作任何处理,不过经过继承该类并重写onCreate()等方法咱们能够对这些方法对应的时机作一些额外处理好比打Log或者一些数据收集方面的工做。对象

到目前最初始的被观察者已经生成了,咱们再来看看观察者这边。咱们知道经过调用observable.subscribe()方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:
blog

这里咱们略去部分无关代码看主要部分,subscribe.onStart()默认空实现咱们暂且不用管它,对于传进来的subscriber要包装成SafeSubscriber,这个SafeSubscriber对原来的subscriber的一系列方法作了更完善的处理,包括:onError()onCompleted()只会有一个被执行;保证一旦onError()或者onCompleted()被执行,将再也不能再执onNext()等状况。这里封装为SafeSubscriber以后,调用onSubscribe.call(),并将subscriber传入,这样就完成了一次订阅。

显而易见,Subscriber做为观察者,在订阅行为完成后,其具体行为在整个链式调用中起着相当重要的做用,咱们来看看它内部的构成的主要部分:


每一个Subscriber都持有一个SubscriptionList,这个list保存的是全部该观察者的订阅事件,同时Subscriber也对应实现了Subscription接口,当这个Subscriber取消订阅的时候会将持有事件列表中的全部Subscription取消订阅,而且今后再也不接受任何订阅事件。同时,经过Producer能够去限定该Subscriber所接收的数据流的总量,这个限制量实际上是加在Subscriber.onNext()方法上的,onComplete()onError()则不会受到其影响。由于是底层抽象类,onNext()onComplete()onError()统一不在这里处理。

2.变换过程

在收到Observable的消息以前咱们有可能会对数据流进行处理,例如map()、flatMap()、deBounce()、buffer()等方法,本例中咱们用了map()方法,它接收了原被观察者发射的数据并将经过该方法返回的结果做为新的数据发射出去,至关于作了一层中间转化:

咱们接着看这个转化过程:

这里是经过一个lift()方法实现的,再查看其余的转化方法发现内部也都使用lift()实现的,看来这个lift()就是关键所在了,不过不急,咱们先来看看这个OperationMap是什么:

OperationMap实现了Operator接口的call()方法,该方法接受外部传入的观察者,并将其做为参数构造出了一个新的观察者,咱们不难发现o.onNext(transformer.call(t));这一句起了相当重要的做用,这里的接口transformer将泛型T转化为泛型R:

这样以后,再将转换后的数据传回至原观察者的onNext()方法,就完成了观察数据流的转化,可是你应该也注意到了,咱们用来作转换的这个新的观察者并无实现订阅被观察者的操做,这个订阅操做又是在哪里实现的呢?答案就是接下来的lift()

在这里咱们新生成了一个Observable对象,在这个新对象的onSubscribe成员的call()方法中咱们经过operator.call()拿到以前生成的未产生订阅的观察者st,以后将它做为参数传入一开始的onSubscribe.call()中,即完成了这个中间订阅的过程。
如今咱们将整个流程梳理一下:

  • 一次map()变换

  • 根据Operator实例生成新的Subscriber

  • 经过lift()生成新的Observable

  • 原Subscriber订阅新的Observavble

  • 新的Observable中onSubscribe通知新Subscriber订阅原Observable

  • 新Subscriber将消息传给原Subscriber。

为了便于理解,这里借用一下扔物线的图:

以上就是一次map()变换的流程,事实上屡次map()也是一样道理:最外层的目标Subscriber发生订阅行为后,onSubscribe.onNext()会逐层嵌套调用,直至初始Observable被最底层的Subscriber订阅,经过Operator的一层层变化将消息传到目标Subscriber。再次祭出扔物线的图:

至于其余的多种变化的实现流程也都很相似,借助于Operator的不一样实现来达到变换数据流的目的。例如其中的flatMap(),它须要进行两次lift(),其中第二次是OperationMerge,将转换成的每个Observable数据流经过InnerSubscriber这个纽带订阅后,在InnerSubscriber的onNext()中拿到R,再经过传入的parent(也就是原MergeSubscriber)将它们所有发射(emit)出去,由最外层咱们传入的Subscriber统一接收,这样就完成了 T => Observable<R> => R 的转化:




除此以外,还有许多各式各样的操做符,若是它们还不能知足你的须要,你也能够经过实现Operator接口定制新的操做符。灵活运用它们每每能达到事半功倍的效果,好比经过使用sample()debounce()等操做符有效避免backpressure的须要等等,这里就不一一介绍了。

下篇将继续从"线程切换过程"开始分析

文章来源公众号:QQ空间终端开发团队(qzonemobiledev)

相关文章
相关标签/搜索