操做符之组合 / 合并操做符

     一、做用

  • 组合 多个被观察者(Observable) & 合并须要发送的事件

     二、类型

     三、详解

          3.1   concat() / concatArray()数据结构

       做用:组合多个被观察者一块儿发送数据,合并后 按发送顺序串行执行app

                     区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个ide

 

    public static void concat() {
        Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          输出:函数

08-06 16:28:21.348 22182 22182 D Operation: onSubscribe
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 1
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 2
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 3
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 4
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 5
08-06 16:28:21.349 22182 22182 D Operation: onComplete

 

          3.2   merge() / mergeArray()----- 见rxdocs.pdf第139页spa

       做用:组合多个被观察者一块儿发送数据,合并后 按时间线并行执行3d

       区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个code

                 区别上述concat()操做符:一样是组合多个被观察者一块儿发送数据,但concat()操做符合并后是按发送顺序串行执行server

 

 

   public static void merge() {
        Observable.merge(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          输出:blog

08-06 16:42:42.503 23013 23013 D Operation: onSubscribe
08-06 16:42:42.505 23013 23037 D Operation: onNext: value = 0
08-06 16:42:43.505 23013 23038 D Operation: onNext: value = 6
08-06 16:42:44.506 23013 23037 D Operation: onNext: value = 1
08-06 16:42:45.506 23013 23038 D Operation: onNext: value = 7
08-06 16:42:46.505 23013 23037 D Operation: onNext: value = 2
08-06 16:42:47.506 23013 23038 D Operation: onNext: value = 8
08-06 16:42:47.509 23013 23038 D Operation: onComplete

 

          3.3   concatDelayError() / mergeDelayError()----- 见rxdocs.pdf第140页事件

       做用:若其中一个被观察者发生onError事件,推迟到其余观察者发送完事件后才触发。

 

          3.4   zip()----- 见rxdocs.pdf第147页

       做用:将多个Observable发射的数据按顺序组合起来,每一个数据只能组合一次,并且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

 

    public static void zip() {
        Observable.zip(Observable.just(1, 2, 3, 4), Observable.just("A", "B", "C"), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          输出:

08-06 17:19:13.248 24303 24303 D Operation: onSubscribe
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 1A
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 2B
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 3C
08-06 17:19:13.249 24303 24303 D Operation: onComplete

 

          3.5   combineLatest()----- 见rxdocs.pdf第133页

       做用:当两个Observables中的任何一个发射了数据时,使用一个函数结合每一个Observable发射的最 近数据项,而且基于这个函数的结果发射数据

                     与zip()的区别:zip() = 按个数合并,即1对1合并;combineLatest() = 按时间合并,即在同一个时间点上合并

 

    public static void combineLatest() {
        Observable.combineLatest(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS), new BiFunction<Long, Long, String>() {
            @Override
            public String apply(Long aLong, Long aLong2) throws Exception {
                return "" + aLong + "+" + aLong2;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          输出:

08-06 17:50:20.240  5792  5792 D Operation: onSubscribe
08-06 17:50:21.248  5792  5811 D Operation: onNext: value = 0+6
08-06 17:50:22.242  5792  5810 D Operation: onNext: value = 1+6
08-06 17:50:23.242  5792  5811 D Operation: onNext: value = 1+7
08-06 17:50:24.241  5792  5810 D Operation: onNext: value = 2+7
08-06 17:50:25.242  5792  5811 D Operation: onNext: value = 2+8
08-06 17:50:25.242  5792  5811 D Operation: onComplete

 

          3.6   combineLatestDelayError()

       做用:做用相似于concatDelayError() / mergeDelayError() ,即错误处理,此处不做过多描述

 

          3.7   reduce()----- 见rxdocs.pdf第210页

       做用:把被观察者须要发送的事件聚合成1个事件 & 发送

 

    public static void reduce() {
        Observable.just(1, 2, 3, 4, 5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onSuccess(Integer value) {
                Log.d(TAG, "onSuccess: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          输出:

08-06 18:02:50.062  7606  7606 D Operation: onSubscribe
08-06 18:02:50.062  7606  7606 D Operation: onSuccess: value = 15

 

          3.8   collect()

       做用:将被观察者Observable发送的数据事件收集到一个数据结构里

 

 

   public static void collect() {
        Observable.just(1, 2, 3)
                .collect(new Callable<List<Integer>>() {
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<List<Integer>, Integer>() {
                    @Override
                    public void accept(List<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                }).subscribe(new SingleObserver<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onSuccess(List<Integer> list) {
                for (Integer i : list) {
                    Log.d(TAG, "onSuccess: value = " + i);
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }
        });
    }

          输出:

08-06 19:09:56.003 12640 12640 D Operation: onSubscribe
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 1
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 2
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 3

 

          3.9   startWith() / startWithArray()----- 见rxdocs.pdf第144页

       做用:在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者

 

    public static void startWith() {
        Observable.just(1, 2, 3)
                .startWith(Observable.just(7, 8, 9))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          输出:

08-06 19:18:16.663 13408 13408 D Operation: onSubscribe
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 7
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 8
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 9
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 1
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 2
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 3
08-06 19:18:16.664 13408 13408 D Operation: onComplete

 

          3.10   count()----- 见rxdocs.pdf第207页

       做用:统计被观察者发送事件的数量

    public static void count() {
        Observable.just(1, 2, 3)
                .count()
                .subscribe(new SingleObserver<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onSuccess(Long aLong) {
                        Log.d(TAG, "onSuccess: value = " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }
                });
    }

          输出:

08-06 19:25:07.151 14405 14405 D Operation: onSubscribe
08-06 19:25:07.151 14405 14405 D Operation: onSuccess: value = 3

 

     四、总结