RxJava2.0变换操做符(二)

一、转换操做符:java

①、Map(做用:数据类型色转换)数组

//实现的功能:将Integer类型转换成String类型
        Observable.create(

                //建立发射器
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        //发射事件
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                    }
                })
                //转换
                .map(
                        //Integer类型转换成String类型
                        new Function<Integer, String>() {
                            @Override
                            public String apply(@NonNull Integer integer) throws Exception {
                                return "Integer类型转换成String类型:"+integer;
                            }
                        })
                //订阅
                .subscribe(
                        //建立简单的观察者
                        new Consumer<String>() {
                            @Override
                            public void accept(@NonNull String s) throws Exception {
                                System.out.println("接收到的事件:"+s);
                            }
                        }
                );
运行结果:


②FlatMap(做用:将被观察者发送的事件序列进行 拆分 & 单独转换,在合并成一个新的事件序列,最后进行发送)缓存

原理:网络

一、为事件序列中每一个事件都建立一个 Observable(被观察者)app

二、将对每个 原始事件 转换后的 新的事件 都放入到对应的 Observableide

三、将新建的每个 Observable 都合并到一个 新建的、总的 Observablespa

四、新建的、总的 Observable 将合并的事件序列 发送给观察者 Observer线程

示意图:code


Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })

        .flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {

                List<String> lists=new ArrayList<String>();

                //把一个事件拆分红n个子事件
                for (int i = 0; i < 5; i++) {
                    lists.add("我是事件:"+integer+",被拆分后的子事件:"+i);
                }

                //发射数组遍历后的数据
                return Observable.fromIterable(lists);
            }
        })
        //订阅
        .subscribe(
                //建立简单的观察者
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println("接收到的事件:"+s);
                    }
                }
        );
运行结果:


注意:新合并生成的事件序列顺序是无序的,与旧序列发送的顺序无关。(上面的运行结果为有序纯属巧合server


③ConcatMap(做用:相似FlatMap,区别,它是有序的)


Observable.create(
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                    }
                }
        )
        //有序的转换符
        .concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> lists=new ArrayList<String>();

                //把事件拆分红N份
                for (int i = 0; i < 5; i++) {
                    lists.add("我是事件:"+integer+",被拆分红:"+i);
                }

                return Observable.fromIterable(lists);
            }
        })
        //订阅
        .subscribe(
                //建立观测者
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println("接收到的事件:"+s);
                    }
                }
        );
运行结果:


总结:ConcatMap,新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。而 FlatMap,新合并的事件序列是无序的。


④Buffer

做用:按期从 被观测者须要发送的事件中 取出必定数量的事件 而且 放到缓存区中,最终发送


Observable.create(
                new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(0);
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);
                        e.onNext(4);
                        e.onNext(5);
                        

                    }
                }
        )
                //参数:设置缓存区的大小和步长
                //缓存区的大小 = 每次从被观察者中获取的事件数量
                //步长 = 每次获取新事件的数量
                .buffer(3,1)
                .subscribe(
                        new Consumer<List<Integer>>() {
                            @Override
                            public void accept(@NonNull List<Integer> integers) throws Exception {
                                System.out.println("缓存区里存放放的事件总数:"+integers.size());
                                for (Integer integer : integers) {
                                    System.out.println("从缓存区获取到的事件:"+integer);
                                }
                            }
                        }
                );

运行结果:




二、开发实际应用:使用FlatMap嵌套网络请求(好比咱们实际开发中的,先注册,注册完成之后去作登陆操做。这样就是两步的操做,能够直接用FlatMap来实现)

//举例,为了模拟注册登陆,我只是模拟两次不一样的网络请求,而实际的注册登陆也是同一个道理:

final Observable<RegisterEntity> observableForRegister =   RetrofitFactory.getRetrofit().create(NetApi.class).doRegister();
        final Observable<LoginEntity> observableForLogin = RetrofitFactory.getRetrofit().create(NetApi.class).doLogin();

        observableForRegister
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                //先发起注册请求
                .doOnNext(new Consumer<RegisterEntity>() {
                    @Override
                    public void accept(@NonNull RegisterEntity registerEntity) throws Exception {
                        System.out.println("第一次网络请求:" + "注册成功");
                        System.out.println("注册返回的结果:" + registerEntity.toString());
                    }
                })

                //切换到io线程进行登陆请求
                .subscribeOn(Schedulers.io())
                //切换到主线程处理登陆请求返回的结果
                .observeOn(AndroidSchedulers.mainThread())

                .flatMap(new Function<RegisterEntity, ObservableSource<LoginEntity>>() {
                    @Override
                    public ObservableSource<LoginEntity> apply(@NonNull RegisterEntity registerEntity) throws Exception {

                        //将注册请求转换成登陆请求,即发起第二次网络请求(即登陆请求)
                        return observableForLogin;
                    }
                })


                .subscribe(
                        //建立观察者
                        new Consumer<LoginEntity>() {
                            @Override
                            public void accept(@NonNull LoginEntity loginEntity) throws Exception {
                                System.out.println("第二次网络请求:" + "登陆成功");
                                System.out.println("登陆成功还回的结果:" + loginEntity.toString());
                            }
                        }

                        , new Consumer<Throwable>() {
                            @Override
                            public void accept(@NonNull Throwable throwable) throws Exception {
                                System.out.println("登陆失败" + throwable);
                            }
                        }

                );