RxJava操做符图谱
先对RxJava的操做符有一个总体结构的认识,而后从简单的操做符入手开始研究。html
转换操做符 – mapjava
map是RxJava中最简单的一个变换操做符了, 它的做用就是对Observable发送的每个事件应用一个函数, 使得每个事件都按照指定的函数去变化.
一个简单的例子:react
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "This is result " + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });
在Observable发送的是数字类型, 而在Observer接收的是String类型, 中间起转换做用的就是map操做符, 运行结果为:web
D/TAG: This is result 1 D/TAG: This is result 2 D/TAG: This is result 3
经过Map, 能够将Observable发来的事件转换为任意的类型, 能够是一个Object, 也能够是一个集合.api
转换操做符 – flatMap服务器
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,而后将它们发射的事件合并后放进一个单独的Observable里.
Observable每发送一个事件, flatMap都将建立一个新的Observable, 而后发送转换以后的新的事件, Observer接收到的就是这些新的Observable发送的数据. 这里须要注意的是, flatMap并不保证事件的顺序.
一个简单的例子:网络
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });
在flatMap中将Observable发来的每一个事件转换为一个新的发送三个String事件的Observable, 为了看到flatMap结果是无序的,因此加了10毫秒的延时, 来看看运行结果吧:app
D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 3 D/TAG: I am value 3 D/TAG: I am value 3 D/TAG: I am value 2 D/TAG: I am value 2 D/TAG: I am value 2
转换操做符 – concatMap
concatMap和flatMap的做用几乎如出一辙, 只是它的结果是严格按照Observable发送的顺序来发送的,一个简单的例子:ide
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });
运行结果:svg
D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 2 D/TAG: I am value 2 D/TAG: I am value 2 D/TAG: I am value 3 D/TAG: I am value 3 D/TAG: I am value 3
简单介绍了RxJava的转换操做符中的map、flatMap和concatMap,对于其余操做符的学习对照官方文档仔细阅读并实际操做一下。
官方地址:http://reactivex.io/documentation/operators.html
实际应用
若是是一个新用户, 必须先注册, 等注册成功以后再自动登陆该怎么作呢.
很明显, 这是一个嵌套的网络请求, 首先须要去请求注册, 待注册成功回调了再去请求登陆的接口.
优雅的解决嵌套请求, 只须要用flatMap转换一下就好了.
请求接口:
public interface Api { @GET Observable<LoginResponse> login(@Body LoginRequest request); @GET Observable<RegisterResponse> register(@Body RegisterRequest request); }
登陆和注册返回的都是一个Observable, 而flatMap操做符的做用就是把一个Observable转换为另外一个Observable:
api.register(new RegisterRequest()) //发起注册请求 .subscribeOn(Schedulers.io()) //在IO线程进行网络请求 .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果 .doOnNext(new Consumer<RegisterResponse>() { @Override public void accept(RegisterResponse registerResponse) throws Exception { //先根据注册的响应结果去作一些操做 } }) .observeOn(Schedulers.io()) //回到IO线程去发起登陆请求 .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { @Override public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception { return api.login(new LoginRequest()); } }) .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登陆的结果 .subscribe(new Consumer<LoginResponse>() { @Override public void accept(LoginResponse loginResponse) throws Exception { Toast.makeText(MainActivity.this, "登陆成功", Toast.LENGTH_SHORT).show(); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Toast.makeText(MainActivity.this, "登陆失败", Toast.LENGTH_SHORT).show(); } });
转换操做符线程是由observeOn指定的,建立操做符的线程是由subscribeon指定
注册失败的块用Observable.error(异常类);这样会回调onError
转换操做符 – zip
Zip经过一个函数将多个Observable发送的事件结合到一块儿,而后发送这些组合到一块儿的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable同样多的数据。
一个简单的例子:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Thread.sleep(1000); Log.d(TAG, "emit 2"); emitter.onNext(2); Thread.sleep(1000); Log.d(TAG, "emit 3"); emitter.onNext(3); Thread.sleep(1000); Log.d(TAG, "emit 4"); emitter.onNext(4); Thread.sleep(1000); Log.d(TAG, "emit complete1"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "emit A"); emitter.onNext("A"); Thread.sleep(1000); Log.d(TAG, "emit B"); emitter.onNext("B"); Thread.sleep(1000); Log.d(TAG, "emit C"); emitter.onNext("C"); Thread.sleep(1000); Log.d(TAG, "emit complete2"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
运行结果:
D/TAG: onSubscribe D/TAG: emit A D/TAG: emit 1 D/TAG: onNext: 1A D/TAG: emit B D/TAG: emit 2 D/TAG: onNext: 2B D/TAG: emit C D/TAG: emit 3 D/TAG: onNext: 3C D/TAG: emit complete2 D/TAG: onComplete
*注:若是两个Observable都是运行在同一个线程里, 同一个线程里执行代码有前后顺序,因此要切换线程
实际应用
应用场景,好比一个界面须要展现用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了以后才能进行展现, 这个时候就能够用Zip操做符实现。
首先分别定义这两个请求接口:
public interface Api { @GET Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request); @GET Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request); }
接着用Zip来打包请求:
Observable<UserBaseInfoResponse> observable1 = api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io()); Observable<UserExtraInfoResponse> observable2 = api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() { @Override public UserInfo apply(UserBaseInfoResponse baseInfo, UserExtraInfoResponse extraInfo) throws Exception { return new UserInfo(baseInfo, extraInfo); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<UserInfo>() { @Override public void accept(UserInfo userInfo) throws Exception { //do something; } });