RxJava2.0操做符(三)

RxJava操做符图谱
在这里插入图片描述先对RxJava的操做符有一个总体结构的认识,而后从简单的操做符入手开始研究。html

转换操做符 – mapjava

map是RxJava中最简单的一个变换操做符了, 它的做用就是对Observable发送的每个事件应用一个函数, 使得每个事件都按照指定的函数去变化.
一个简单的例子:react

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

在Observable发送的是数字类型, 而在Observer接收的是String类型, 中间起转换做用的就是map操做符, 运行结果为:web

D/TAG: This is result 1 
	D/TAG: This is result 2 
	D/TAG: This is result 3

经过Map, 能够将Observable发来的事件转换为任意的类型, 能够是一个Object, 也能够是一个集合.api

转换操做符 – flatMap服务器

FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,而后将它们发射的事件合并后放进一个单独的Observable里.
Observable每发送一个事件, flatMap都将建立一个新的Observable, 而后发送转换以后的新的事件, Observer接收到的就是这些新的Observable发送的数据. 这里须要注意的是, flatMap并不保证事件的顺序.
一个简单的例子:网络

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

在flatMap中将Observable发来的每一个事件转换为一个新的发送三个String事件的Observable, 为了看到flatMap结果是无序的,因此加了10毫秒的延时, 来看看运行结果吧:app

D/TAG: I am value 1
	D/TAG: I am value 1
	D/TAG: I am value 1
	D/TAG: I am value 3
	D/TAG: I am value 3
	D/TAG: I am value 3
	D/TAG: I am value 2
	D/TAG: I am value 2
	D/TAG: I am value 2

转换操做符 – concatMap
concatMap和flatMap的做用几乎如出一辙, 只是它的结果是严格按照Observable发送的顺序来发送的,一个简单的例子:ide

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

运行结果:svg

D/TAG: I am value 1   
	D/TAG: I am value 1   
	D/TAG: I am value 1   
	D/TAG: I am value 2   
	D/TAG: I am value 2   
	D/TAG: I am value 2   
	D/TAG: I am value 3   
	D/TAG: I am value 3   
	D/TAG: I am value 3

简单介绍了RxJava的转换操做符中的map、flatMap和concatMap,对于其余操做符的学习对照官方文档仔细阅读并实际操做一下。
官方地址:http://reactivex.io/documentation/operators.html

实际应用

若是是一个新用户, 必须先注册, 等注册成功以后再自动登陆该怎么作呢.
很明显, 这是一个嵌套的网络请求, 首先须要去请求注册, 待注册成功回调了再去请求登陆的接口.
优雅的解决嵌套请求, 只须要用flatMap转换一下就好了.

请求接口:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}

登陆和注册返回的都是一个Observable, 而flatMap操做符的做用就是把一个Observable转换为另外一个Observable:

api.register(new RegisterRequest())            //发起注册请求
                .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
                .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求注册结果
                .doOnNext(new Consumer<RegisterResponse>() {
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        //先根据注册的响应结果去作一些操做
                    }
                })
                .observeOn(Schedulers.io())                 //回到IO线程去发起登陆请求
                .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                    @Override
                    public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                        return api.login(new LoginRequest());
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求登陆的结果
                .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse loginResponse) throws Exception {
                        Toast.makeText(MainActivity.this, "登陆成功", Toast.LENGTH_SHORT).show();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Toast.makeText(MainActivity.this, "登陆失败", Toast.LENGTH_SHORT).show();
                    }
                });

转换操做符线程是由observeOn指定的,建立操做符的线程是由subscribeon指定
注册失败的块用Observable.error(异常类);这样会回调onError

转换操做符 – zip

Zip经过一个函数将多个Observable发送的事件结合到一块儿,而后发送这些组合到一块儿的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable同样多的数据。

  • 组合的过程是分别从 两个Observable里各取出一个事件 来进行组合, 而且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的.
  • 最终observe收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 由于是从每一个Observable里取一个事件来进行合并, 最少的 那个确定就最早取完 , 这个时候其余的Observable尽管还有事件 ,可是已经没有足够的事件来组合了, 所以observe就不会收到剩余的事件了.

一个简单的例子:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
    @Override                                                                                      
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
        Log.d(TAG, "emit 1");                                                                      
        emitter.onNext(1);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 2");                                                                      
        emitter.onNext(2);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 3");                                                                      
        emitter.onNext(3);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 4");                                                                      
        emitter.onNext(4);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit complete1");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
    @Override                                                                                      
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
        Log.d(TAG, "emit A");                                                                      
        emitter.onNext("A");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit B");                                                                      
        emitter.onNext("B");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit C");                                                                      
        emitter.onNext("C");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit complete2");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
    @Override                                                                                      
    public String apply(Integer integer, String s) throws Exception {                              
        return integer + s;                                                                        
    }                                                                                              
}).subscribe(new Observer<String>() {                    
    @Override                                                                                      
    public void onSubscribe(Disposable d) {                                                        
        Log.d(TAG, "onSubscribe");                                                                 
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onNext(String value) {                                                             
        Log.d(TAG, "onNext: " + value);                                                            
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onError(Throwable e) {                                                             
        Log.d(TAG, "onError");                                                                     
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onComplete() {                                                                     
        Log.d(TAG, "onComplete");                                                                  
    }                                                                                              
});

运行结果:

D/TAG: onSubscribe    
D/TAG: emit A         
D/TAG: emit 1         
D/TAG: onNext: 1A     
D/TAG: emit B         
D/TAG: emit 2         
D/TAG: onNext: 2B     
D/TAG: emit C         
D/TAG: emit 3         
D/TAG: onNext: 3C     
D/TAG: emit complete2 
D/TAG: onComplete

*注:若是两个Observable都是运行在同一个线程里, 同一个线程里执行代码有前后顺序,因此要切换线程

实际应用

应用场景,好比一个界面须要展现用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了以后才能进行展现, 这个时候就能够用Zip操做符实现。

首先分别定义这两个请求接口:

public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

接着用Zip来打包请求:

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      
                                                                                          
Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    
                                                                                          
Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something; 
            }                                                                             
        });