RxJava操做符(二)----转换操做符

RxJava各种型操做符详解以下:

RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符java



转换操做符: 变换被观察者(Observable)发送的事件。将Observable发送的数据按照必定的规则作一些变换,而后再将变换的数据发射出去。 变换的操做符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。

一、map()操做符git

/** * ======================map============================ * * map操做符,能够说是的被观察者转换器。 经过指定一个Funcation对象,将被观察者(Observable)转换成新的被观察者(Observable)对象并发射,观察者会收到新的被观察者并处理 * * * 原本发射的数据是 数字1,而后观察者接收到的是 “ 这是新的观察数据===: 1” * * 流程: 被观察者.create(事件发射器).map(转换器).subscribe(观察者) */

    public static void map() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "这是新的观察数据===:" + integer;

                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG + "map", s);
                    }
                });
    }

输出以下:github

1

二、flatMap()操做符web

/** * ======================flatMap============================ * * flatMap操做符, 将Observable每一次发射的事件都转换成一个Observable,也就是说把Observable的发射事件集合转换成Observable集合。 * 而后观察者Observer最终观察的是Observable集合。可是观察者不能保证接收到这Observable集合发送事件的顺序。 * * 是否是很抽象? 先来看看这一个流程: 观察者.create(事件发射器).flatMap(转换器).subscribe(观察者) * * 再来看看例子 : 下面的代码,一开始Observable经过发射器的onNext发送了0-9这10个事件发送出去,正常来讲Observer接收到就是 0 - 9 这10个数据 * 然而中间通过了flatMap的转换。这 10个事件都分别在Funcation转换器上新的Observable。而最终观察者接收的就是这10个新的Observable的发送事件。 */

    public static void flatMap() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        List<String> list = new ArrayList<>();
                        list.add(String.valueOf(0));
                        list.add(String.valueOf(1));
                        return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(
                        new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG + "flatMap", s);
                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG + "flatMap", "complete");
                            }
                        });
    }

2

三、concatMap() 操做符并发

/** * ======================concatMap============================ * * 与上面的flatMap做用基本同样,与flatMap惟一不一样的是concat能保证Observer接收到Observable集合发送事件的顺序 */
    public static void concatMap() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                })
                .concatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        List<String> list = new ArrayList<>();
                        list.add(String.valueOf(0));
                        list.add(String.valueOf(1));
                        return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(
                        new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG + "flatMap", s);
                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG + "flatMap", "complete");
                            }
                        });
    }

3

四、buffer() 操做符app

/** * ========================buffer 操做符 ====================================== * * 把发射数据按照必定间隔分红若干段。按每段的数据转换成新的Observable,这个Observable把一段数据一次性发射出去。 * 能够简单地理解为把一组数据分红若干小组发射出去,而不是单个单个地发射出去 */
    public static void buffer() {

        Observable
                .just(1, 2, 3, 4, 5, 6)
                .buffer(2)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Integer> integers) {
                        for (Integer integer : integers) {
                            Log.d(TAG + "buffer", String.valueOf(integer));
                        }
                        Log.d(TAG + "buffer", "============================");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "buffer", "onComplete");
                    }
                });
    }

4


上面代码地址ide


RxJava各种型操做符详解以下:

RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符svg