RxJava中提供了大量的操做符,这大大提升了了咱们的开发效率。其中最基本的两个变换操做符就是map
和flatMap
。而其余变换操做符的原理基本与map
相似。java
Observable
map
对Observable发射的每一项数据应用一个函数,执行变换操做。对原始的Observable发射的每一项数据应用一个你选择的函数,而后返回一个发射这些结果的Observable。git
flatMap
将一个发射数据的Observable变换为多个Observables,而后将它们发射的数据合并后放进一个单独的Observable。操做符使用一个指定的函数对原始Observable发射的每一项数据执行变换操做,这个函数返回一个自己也发射数据的Observable,而后FlatMap合并这些Observables发射的数据,最后将合并后的结果当作它本身的数据序列发射
经过代码来看一下二者的使用用方法:github
Observable.just(new User("白瑞德")) .map(new Function<User, String>() { @Override public String apply(User user) throws Throwable { return user.getName(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println(s); } }); <<<白瑞德 复制代码
这段代码接受一个User对象,最后打印出User中的name。bash
假设存在一个需求,图书馆要打印每一个User借走每一本书的名字: User
结构以下:markdown
class User {
private String name;
private List<String> book;
}
复制代码
咱们来看一下map
的实现方法:app
Observable.fromIterable(userList) .map(new Function<User, List<String>>() { @Override public List<String> apply(User user) throws Throwable { return user.getBook(); } }) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { for (String s : strings) { System.out.println(s); } } }); 复制代码
能够看到,map
的转换老是一对一,只能单一转换。咱们不得不借助循环进行打印。 下面咱们来看一下flatMap
的实现方式:ide
Observable.fromIterable(userList) .flatMap(new Function<User, ObservableSource<String>>() { @Override public ObservableSource<String> apply(User user) throws Throwable { return Observable.fromIterable(user.getBook()); } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Throwable { System.out.println(o); } }); 复制代码
flatmap
既能够单一转换也能够一对多/多对多转换。flatMap
使用一个指定的函数对原始Observable
发射的每一项数据之行相应的变换操做,这个函数返回一个自己也发射数据的Observable
,所以能够再内部再次进行事件的分发。而后flatMap
合并这些Observables
发射的数据,最后将合并后的结果当作它本身的数据序列发射。函数
下面咱们就结合源码来分析一下这两个操做符。为了下降代码阅读难道,这里只保留核心代码:oop
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { //接受一个Function实例,并返回一个ObservableMap return new ObservableMap<T, R>(this, mapper); } public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { //调用用父类构造方法,初始化父类中的downstream super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { v = mapper.apply(t); downstream.onNext(v); } } } 复制代码
这段代码是去掉map源码中一些校验和其它相关回调后的精简代码。接下来分析一下代码流程:源码分析
map
时,map接受一个匿名内部类Function
的实例,并返回一个ObservableMap
对象。ObservableMap
本质上是一个Observable
,也是一个被观察者,其构造方法接受最外层的那个被Observable
实例,和Function
实例。ObservableMap
重写了subscribeActual
方法,在subscribeActual
中使用新建了一个MapObserver
实现了对原始Observable
的观察。Observable
中的数据变会被发送到MapObserver
的实例中。MapObserver
构造方法接收原始Observable
的观察者actual
,和Function
的实例mapper
MapObserver
在其onNext
方法中调用mapper
的apply
方法,得到该方法的返回值v apply方法就是map实例中: public String apply(User user) throws Throwable { return user.getName(); }
downstream
的onNext方法,并传入实参v。其中downstream
是MapObserver
父类中定义的变量,在MapObserver
构造方法中super(actual);
时初始化,其自己就是传入的actual
,本质上就是最原始的Observable
整个流程能够归纳以下: 存在一个原始的ObservableA
和一个观察者ObserverA
,当原始的被观察者ObservableA
调用map
,并传入一个匿名内部类实例化的’function‘,map
新建并返回了一个被观察者ObservableB
,经过subscribe
让观察者ObserverA
对其进行订阅。并重写subscribeActual
方法,在其被订阅时建立一个新的观察者ObserverB
其接受的,并用ObserverB
对原始的ObservableA
进行订阅观察。当原始的ObservableA
发出事件,调用ObserverB
的onNext
方法,subscribeActual
接受的观察者即是最原始的观察者ObserverA
。ObserverB
变执行经过匿名内部类实例化的’function‘的apply
方法获得数据v
,紧接着调用原始的ObservableA
的onNext
方法,并传入实参v
,ObserverA
观察到事件。 一句话归纳:一个原始的被观察者和观察者,可是让原始的观察者去订阅一个新的观察者,当新的被观察者被订阅的时候,建立一个新的观察者去订阅原始的被观察者,并在监听的事件以后执行指定的操做后再通知原始观察者。
faltMap
和map
的基本原理相似,其代码以下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize); } public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; final boolean delayErrors; final int maxConcurrency; final int bufferSize; public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY); } @Override public void onSubscribe(Disposable d) { downstream.onSubscribe(this); } @Override public void onNext(T t) { ObservableSource<? extends U> p; p = mapper.apply(t); subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); p.subscribe(inner); } void drain() { drainLoop(); } void drainLoop() { final Observer<? super U> child = this.downstream; child.onNext(o); } } static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { private static final long serialVersionUID = -4606175640614850599L; final long id; final MergeObserver<T, U> parent; volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { parent.drain(); } } } 复制代码
上述代码便是faltMap
精简后的源码,其中大部分代码的运做流程和前文中的map
源码一致,咱们继续延续上文中讲解中的观察者和被观察者。重点关注其不一样的地方: faltMap
返回一个新的被观察者ObservableB
,重写ObservableB
的subscribeActual
方法在原始的观察者ObserverA
对其进行订阅时新建一个观察者ObserverB
对原始的ObservableA
进行订阅。新的观察者ObserverB
持有原始的ObserverA
和faltMap
接收的匿名对象实例function
。当ObserverB
监听到原始的被观察者ObservableA
的事件时,ObserverB
调用function
的apply
方法得到新新的被观察者ObservableC
,再建立一个新的观察者ObserverC
对ObservableC
进行订阅,ObserverC
持有原始的观察者ObserverA
,在ObserverC
观察到被观察者ObservableC
的时间时,调用原始的观察者ObserverA
的方法。
至此,map和flatMap已基本分析完毕,其中map的代码比较简单易懂,flatMap中还涉及到大量辅助操做,文中并未涉及到其中的合并等操做,阅读起来有些困难。若是仅仅是为了了解两者的原理,能够阅读Single<T>
中的代码。其中的代码量远远少于Observable
中的代码量。若是对RxJava基本的模式还不了解,能够阅读大神博客手写极简版的Rxjava