RxJava给咱们提供了不少变换的操做符,map、flatMap就是比较经常使用的操做符,通常咱们使用的时候,都是看官方文档来了解每一个操做符的含义,可是我本身感受下来,看官方文档使用没问题,可是总有一点隔靴搔痒的意思,因此我还要去RxJava的源码一探究竟,作到心中有数。html
咱们先从相对简单的 Map 开始react
官方定义:transform the items emitted by an Observable by applying a function to each itemapp
拙劣的翻译:应用一个函数 转换全部的被发射的item函数
官方的图解:oop
到这里咱们总结一下:线程
这里抛出一个问题,map 调用咱们提供的function进行转换,那么这个function在何时被调用?在哪一个线程被调用?(这个对咱们实际工程中使用map有意义,知道代码被执行的线程是必须的)翻译
废话很少说,进入源码3d
Observable类是RxJava的门面,基本上全部的转换符都在这里定义,直接看Map 的方法定义orm
能够看到,Function类,泛型有2个参数,第一个是原数据类型,第二个是转换后的数据类型,最终返回的是ObservableMap 类(RxJava的类命名很规范,若是是Observable类型的就是Observable开头 + 具体的操做符名称,若是是Observer类型的 就是 具体的操做符名称 + Observer结尾)咱们进入ObservableMap类,Observable类以前的文章有提到过,subscribeActual 是个重要的钩子方法,因此咱们直接看ObservableMap如何重写该方法的cdn
方法代码就一行,调用装饰的Observable的subscribe方法,传递一个MapObserver对象,Observer类咱们就比较熟悉了,咱们这里主要看onNext方法
代码也很简单,红框标识的就是 mapper 转换函数被调用的地方,获得转换后的对象v,传递给被装饰的Observer 的onNext方法,到这里,一次数据的map转换就结束了。源码的实现仍是很简单的,在咱们了解了源码的实现后,思路会更清晰,写代码时也会更有把握。
如今咱们来解答前面咱们抛出的问题,Function在何时被调用?在哪一个线程被调用? Function调用的地方已经清楚了,在ObserverMap 的 onNext方法中,那么调用的线程呢,由于是在Observer方法中被调用,因此若是在map 以前 调用了 ObserverOn 方法设置监听线程,那么就在该监听线程,若是没有设置 ObserverOn 可是设置了 SubscribeOn方法设置发射线程,那么就在该 发射线程,若是SubscribeOn也没有设置,那就在Observable的建立线程。
到此Map 就介绍完了,接下来是Map 的好兄弟 FlatMap,调用逻辑稍微复杂一点点,看官们耐心 -。-
###FlatMap
官方定义:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
拙劣的翻译:应用一个函数 转换全部的被发射的item从一个Observable转成为多个Observable,并将全部要发射的数据平铺为一个Observable
官方的图解:
到这里咱们总结一下:
这里抛出一个问题,flatMap会将原来的Observable,转换为多个Observable来发射数据,那么这些发射的数据是否会严格按顺序发射而后被Observer接收?
问题先留在这里,进入源码
FlatMap操做符涉及的代码会相对多一些,可是也是有规律可循。 一样到Observable 类中看 flatMap的定义,源码做者为了方便开发者调用,提供了多个方法重载,咱们最经常使用的方法定义以下
最终调用的方法是
跟map 的套路 差很少,咱们直接进入 ObservableFlatMap类, 咱们仍是看它的 subscribeActual 方法实现
能够看到,它给原Observer 装饰后的 Observer 是 MergeObserver,咱们再继续看 MergeObserver 的 onNext 方法
因为咱们默认调用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 因此最终会调用 subscribeInner(p),注意这里咱们的mapper方法以及被调用了,p就是跟咱们传入的Function生成的Observable,咱们再继续往下看
通常咱们传入的Function 生成的Observable 都不是 Callable类型的,因此最终传给Observable p 的 是InnerObserver, 找到了最终元凶,直接去看它的onNext方法实现吧。
funsionMode 默认是 None,走第一个if 逻辑,最终调用的是 上面的MergeObserable 的 tryEmit 方法,继续进去看
这里要插一句,MergeObserver 继承了 AtomicInteger,因此这里的tryEmit方法就利用了 AtomicInteger 的同步机制,因此同时只会有一个 value 被 actual Observer 发射,并且这里 恰好 能够解答咱们上面留下的 问题,因为 AtomicInteger CAS锁只能保证操做的原子性,并不保证锁的获取顺序,是抢占式的,因此最终数据的发射顺序并非固定的(同一个Observable发出的数据是有序的)
若是没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,而后发射,因为篇幅有限,更详细的调用过程你们能够看源码。
Map 和 FlatMap 二个操做符的 源码就解析到这里,水平有限,有不对的,还望大佬不吝赐教。