Rxjava2(三)合并操作符

1.concat

private void concat() {
    final Integer[] items={1,2,3,4};
    Observable.concat(Observable.just(1,2,3),Observable.just(4,5))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  LogUtils.syso("======accept==="+integer);
              }
          });

}

作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行  注意:concat()组合被观察者数量≤4个

2.concatArray

private void concatArray() {
    final Integer[] items={1,2,3,4};
    Observable.concatArray(Observable.just(1,2,3),
            Observable.just(4,5),
            Observable.just(6,7),
            Observable.just(8,9),
            Observable.just(10,11),
            Observable.just(12,13))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  LogUtils.syso("======accept==="+integer);
              }
          });

}

  • 作用
    组合多个被观察者一起发送数据,合并后 按发送顺序串行执行  注意:concatArray()则可>4个

3. merge

private void merge() {
    Observable.merge(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS),
            Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS))
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.syso("====aLong===="+aLong);
                }
            });
}

  • 作用
    组合多个被观察者一起发送数据,合并后 按时间线并行执行

组合被观察者的数量,即merge()组合被观察者数量≤4个

4.mergeArray

private void mergeArray() {
    Observable.mergeArray(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS),
            Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS),
            Observable.intervalRange(7,3,2,1, TimeUnit.SECONDS),
            Observable.intervalRange(10,3,2,1, TimeUnit.SECONDS),
            Observable.intervalRange(13,3,2,1, TimeUnit.SECONDS))
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.syso("====aLong===="+aLong);
                }
            });
}

  1. 二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
  2. 区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行

5.concatDelayError



测试结果:第1个被观察者发送Error事件后,第2个被观察者则不会继续发送事件

那么如果希望onError事件推迟到其他观察者发送事件结束

private void concatArrayDelayError() {

  Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
            e.onComplete();
      }
  }),Observable.just(4,5,6))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  LogUtils.syso("=====accept=====" + integer);
              }
          }, new Consumer<Throwable>() {
              @Override
              public void accept(Throwable throwable) throws Exception {
                  LogUtils.syso("=====accept=throwable====" + throwable.getMessage());
              }
          });
}

达到预期的效果

小结:


6.zip (合并多个事件)

该类型的操作符主要是对多个被观察者中的事件进行合并处理

作用:
合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送


假设这样一种场景,我们利用github api开发一个app,在user界面,我既要请求user基本信息,又要列举user下的event数据,为此,我准备使用Retrofit来做网络请求。

虽然在后台有两次请求,但是在前台,我们希望用户打开这个页面,然后等待加载,然后显示。用户只有一次等待加载的过程。所以说,我们需要等待这两个请求都返回结果了,再开始显示数据。

怎么办?自己写判断两个都加载已完成的代码吗?逻辑好像也不是很复杂,但是代码看起来就没有那么高大上了啊。

其实既然你都用过了还有,那么直觉上你应该意识到也许RxJava可以解决这个问题。没错,就是RxJava,使用zip操作符。

zip( ):使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果


private void zip() {
    Observable.zip(Observable.just(1, 2, 3),
            Observable.just("one", "two", "three"),
            new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer integer, String s) throws Exception {
                    return integer+s;
                }
            }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            LogUtils.syso("======返回的结果======="+s);
        }
    });
}


7.conbineLatest

  • 作用
    当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
  • Zip()的区别:Zip() = 按个数合并,即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并
   private void combineLatest() {
        Observable.combineLatest(Observable.just(1, 2, 3),
                Observable.just("one", "two", "three"),
                new BiFunction<Integer, String, String>() {
                    @Override
                    public String apply(Integer integer, String s) throws Exception {
                        return integer+s;
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.syso("======返回的结果======="+s);
            }
        });
    }
}

8.reduce

  • 作用
    把被观察者需要发送的事件聚合成1个事件 & 发送
聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推

private void reduce() {
    Observable.just(1, 2, 3)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.e("REDUCE", "本次计算的数据是: "+integer +" "+ integer2);
                    return integer+integer2;
                }
            }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            LogUtils.syso("=====result===="+integer);
        }
    });
}

9.collect

作用
将被观察者Observable发送的数据事件收集到一个数据结构里

private void collect() {
   Observable.just(1,2,3,4)

           .collect(
                   // 1. 创建数据结构(容器),用于收集被观察者发送的数据
               new Callable<List<Integer>>() {
               @Override
               public List<Integer> call() throws Exception {
                   return new ArrayList<>();
               }
           },// 2. 对发送的数据进行收集
           new BiConsumer<List<Integer>, Integer>() {
                   /**  * mList 容器,  * integer 后者数据  *  * */  @Override
               public void accept(List<Integer> mList, Integer integer) throws Exception {
                   mList.add(integer);
               }
           }).subscribe(new Consumer<List<Integer>>() {
       @Override
       public void accept(List<Integer> integers) throws Exception {
            LogUtils.syso("====result======"+integers);
       }
   });
}

10.startWith/startWithArray

  • 作用
    在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者


Observable.just(1,2,3,4)
        .startWith(5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                 LogUtils.syso("=======result======="+integer);
            }
        });

Observable.just(1,2,3,4)
        .startWithArray(7,8,9)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                 LogUtils.syso("======resutl======="+integer);
            }
        });

11.count

  • 作用
    统计被观察者发送事件的数量

private void count() {
    Observable.just(1,2,3,4)
            .count()
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.syso("========发送事件的个数======"+aLong);
                }
            });
}