史上最全的Rxjava2讲解(使用篇)

1.前言

在好久以前就一直想整理一下rxjava,可是一直没有时间,最近是由于离职了,总算有时间整理一下了。由于打算每篇博客都记录一个框架。因此为了描述清楚,本篇博客可能略长(包含rxjava的简介,使用,背压,原理等),但愿大家能认真的读完,收获确定仍是有的,也会采用大量的图来介绍,这样能够加深理解。也能够当一个工具博客,须要的使用的话随时查阅。java

后续还会继续出背压和原理篇,敬请期待
react

2.简介

什么是rxjava? 是一种事件驱动的基于异步数据流的编程模式,整个数据流就像一条河流,它能够被观测(监听),过滤,操控或者与其余数据流合并为一条新的数据流。android

三要素git

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

好了,由于秉持着要有图的思想,在介绍rxjava各个操做符的时候,会采用大量的图示来表示,图示来源于官方,这里先给你们介绍一下怎么看。
ok,进入到撸码环节
github

3.简单使用

1.首先要在 build.gradle 文件中添加依赖编程

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
复制代码

2.依赖搭建完毕了,咱们先写个最简单的案例,一共3步走并发

  • 2.1 建立被观察者
// 建立被观察者
   Observable.create(new ObservableOnSubscribe<String>() {

       @Override
       public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            emitter.onNext("你好呀");
            emitter.onNext("我爱中国");
            emitter.onNext("祝愿祖国繁荣富强");
            emitter.onComplete();
        }
   });
复制代码
  • 2.2 建立观察者
// 建立观察者
   Observer observer = new Observer<String>(){

       @Override
       public void onSubscribe(Disposable d) {

           Log.i("lybj", "准备监听");
       }

       @Override
       public void onNext(String s) {

           Log.i("lybj", s);
       }

       @Override
       public void onError(Throwable e) {

           Log.i("lybj", "error");
       }

       @Override
       public void onComplete() {

           Log.i("lybj", "监听完毕");
       }
   };
复制代码
  • 2.3 订阅(也就是将被观察者和观察者关联)
// 订阅
   observable.subscribe(observer);
复制代码

这就完事了,看下结果app

是否是很简单,几个概念再介绍一下框架

  • onNext():当被观察者(observable)经过调用onNext()发射数据的时候,观察者(observer)调用onNext()接收数据
  • onError():当被观察者(observable)调用该函数时,观察者(observer)调用onError(),其余事件将不会继续发送
  • onComplete():当被观察者(observable)调用该函数时,观察者(observer)调用onComplete(),其余事件将不会继续发送

其实rxjava,打个比方,就相似花洒的头,数据流就相似水流,它的被观察者(observable)的各类操做符就是花洒的那个头,能够有各类模式,好比中间喷水的,周围喷水的,喷水雾的等等。根据操做符的不一样,能够改变数据的各类样式,根据花洒头的不一样,能够把水流改为各类样式。 接下来,就来学习下observable的丰富的操做符。异步

先看看大纲


4.建立操做符

1.create()

1.1 作啥的?

建立被观察者对象

1.2 如何用?

// 建立被观察者
Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("你好呀");
                emitter.onNext("我爱中国");
                emitter.onNext("祝愿祖国繁荣富强");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>(){ // 关联观察者

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "监听完毕");
            }
        });
复制代码

1.3 结果

能够直接链式调用关联观察者


2 just()

2.1 作啥的?

经过上面的图,应该很形象的说明了,主要做用就是建立一个被观察者,并发送事件,可是发送的事件不能够超过10个以上。

2.2 如何用?

Observable.just("小明", "小红", "小兰").subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s+"来了");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "Error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "完毕");
            }
        });
复制代码

2.3 结果


3 timer()

3.1 作啥的?

当到指定时间后就会发送一个 0 的值给观察者。 在项目中,能够作一些延时的处理,相似于Handler中的延时

3.2 如何用?

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {

        @Override
        public void accept(Long aLong) throws Exception {

             Log.i("lybj", aLong+"");
        }
});
复制代码

3.3 结果

延迟2秒后,将结果发送给观察者,Consumer和Observer是建立观察者的两种写法,至关于观察者中的onNext方法。


4 interval()

4.1 作啥的?

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。 相似于项目中的timer,作计时器

4.2 如何用?

Observable.interval(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            
     @Override
     public void accept(Long aLong) throws Exception {

          Log.i("lybj", aLong+"");
     }
});
复制代码

4.3 结果


5 intervalRange()

5.1 作啥的?

能够指定发送事件的开始值和数量,其余与 interval() 的功能同样。

5.2 如何用?

Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
           
      @Override
      public void accept(Long aLong) throws Exception {

           Log.i("lybj", aLong+"");
      }
});
复制代码

5.3 结果

参数依次是:开始值,循环执行的次数,初始延迟时间,执行间隔时间,时间单位


6 range()

6.1 作啥的?

同时发送必定范围的事件序列。

6.2 如何用?

Observable.range(0,10).subscribe(new Consumer<Integer>() {
            
    @Override
    public void accept(Integer integer) throws Exception {

         Log.i("lybj", integer+"");
    }
});
复制代码

6.3 结果


7 rangeLong()

7.1 作啥的?

做用与 range() 同样,只是数据类型为 Long

7.2 如何用?

Observable.rangeLong(0,10).subscribe(new Consumer<Long>() {
            
    @Override
    public void accept(Long aLong) throws Exception {

         Log.i("lybj", aLong+"");
    }
});
复制代码

7.3 结果


8 empty() & never() & error()

8.1 作啥的?

  • never():不发送任何事件
  • error():发送 onError() 事件
  • empty() : 直接发送 onComplete() 事件

8.2 如何用?

private void empty_never_error(){

        Observable.empty().subscribe(new Observer(){
            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(Object o) {

                Log.i("lybj", o+"");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "onError");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "onComplete");
            }
        });
复制代码

8.3 结果

若是是empty() 则:

若是是error() 则:

若是是never()则:



5.转换操做符

1 map()

1.1 作啥的?

map 能够将被观察者发送的数据类型转变成其余的类型

1.2 怎么用?

Observable.just("中国", "祖国", "中国军人")
                .map(new Function<String, String>() {

                    @Override
                    public String apply(String s) throws Exception {

                        return "我爱" + s;
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                });
复制代码

1.3 结果

简单来说,就是能够对发射过来的数据进行再加工,再传给观察者


2 flatMap()

2.1 作啥的?

这个方法能够将事件序列中的元素进行整合加工,返回一个新的被观察者。 flatMap() 其实与 map() 相似,可是 flatMap() 返回的是一个 Observerable,map()只是返回数据,若是在元素再加工的时候,想再使用上面的建立操做符的话,建议使用flatMap(),而非map()。

2.2 怎么用?

Observable.just("中国", "祖国", "中国军人", "贪官")
                .flatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("贪官")){

                            return Observable.error(new Exception("贪官不能被喜欢"));
                        }
                        return Observable.just("我爱"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
复制代码

2.3 结果

new Consumer方法监听的是Observable.error()


3 concatMap()

3.1 作啥的?

concatMap() 和 flatMap() 基本上是同样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

3.2 怎么用?

Observable.just("中国", "祖国", "中国军人", "贪官")
                .concatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("贪官")){

                            return Observable.error(new Exception("贪官不能被喜欢"));
                        }
                        return Observable.just("我爱"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
复制代码

3.3 结果


4 buffer()

4.1 作啥的?

从须要发送的事件当中获取必定数量的事件,并将这些事件放到缓冲区当中一并发出。

4.2 怎么用?

buffer 有两个参数,一个是 count,另外一个 skip。count 缓冲区元素的数量,skip 就表明缓冲区满了以后,发送下一次事件序列的时候要跳过多少元素。

Observable.just("1", "2", "3", "4", "5")
                .buffer(2,1)
                .subscribe(new Consumer<List<String>>() {

                    @Override
                    public void accept(List<String> strings) throws Exception {

                        Log.d("lybj", "缓冲区大小: " + strings.size());
                        for (String s : strings){
                            Log.d("lybj",  s);
                        }
                    }
                });
复制代码

4.3 结果


5 scan()

5.1 作啥的?

将发射的数据经过一个函数进行变换,而后将变换后的结果做为参数跟下一个发射的数据一块儿继续经过那个函数变换,这样依次连续发射获得最终结果。

5.2 怎么用

Observable.just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {

                        Log.i("lybj", "integer01: " + integer + " integer02: "+ integer2);
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", "accept: " + integer);
                    }
                });
复制代码

5.3 结果

简单来讲,先将第一个元素返回给观察者,而后将1,2的和返给观察者,而后将上一次计算的和,当第一个元素,也就是3,第2个元素,是一直按顺序取值,取第3个元素也就是3,那么将,3+3 =6,返回给观察者,以此类推,将6做为第一个元素,第二个元素取值4,将6+4=10返回给观察者。

sacn操做符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者


6 window()

6.1 作啥的?

发送事件时,将这些事件分为按数量从新分组。window 中的 count 的参数就是表明指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分红一组。

window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List

6.2 如何用?

Observable.just("鲁班", "孙尚香", "亚索","火女","盖伦")
                .window(2)
                .subscribe(new Consumer<Observable<String>>() {

                    @Override
                    public void accept(Observable<String> stringObservable) throws Exception {

                        Log.i("lybj", "分组开始");
                        stringObservable.subscribe(new Consumer<String>() {

                            @Override
                            public void accept(String s) throws Exception {

                                Log.i("lybj", s);
                            }
                        });
                    }
                });
复制代码

6.3 结果


6.组合操做符

1.concat()

1.1 作啥的?

能够将多个观察者组合在一块儿,而后按照以前发送顺序发送事件。须要注意的是,concat() 最多只能够发送4个事件。

1.2 怎么用?

private void concat(){

        Observable.concat(
                Observable.just(1, 2, 3),
                Observable.just(4, 5),
                Observable.just(6, 7),
                Observable.just(8, 9))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
    }
复制代码

1.3 结果


2.concatArray()

2.1 作啥的?

与 concat() 做用同样,不过 concatArray() 能够发送多于 4 个被观察者。

2.2 怎么用?

Observable.concatArray(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13),
                Observable.just(14, 15),
                Observable.just(16))
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj", integer+"");
            }
        });
复制代码

2.3 结果


3.merge()

3.1 作啥的?

这个方法与 concat() 做用基本同样,可是 concat() 是串行发送事件,而 merge() 并行发送事件,也是只能发送4个。

3.2 怎么用?

Observable.merge(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
复制代码

3.3 结果


4.zip()

4.1 作啥的?

zip操做符用于将多个数据源合并,并生成一个新的数据源。新生成的数据源严格按照合并前的数据源的数据发射顺序,而且新数据源的数据个数等于合并前发射数据个数最少的那个数据源的数据个数。

4.2 怎么用?

Observable.zip(Observable.just(1, 2, 3),
                Observable.just("A", "B", "C", "D", "E"),
                new BiFunction<Integer, String, String>(){

                    @Override
                    public String apply(Integer o1, String o2) throws Exception {

                        return o1 +"_"+ o2;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) throws Exception {

                        Log.i("lybj", o);
                    }
                });
复制代码

4.3 结果


5.startWith() & startWithArray()

5.1 作啥的?

在发送事件以前追加事件,startWith() 追加一个事件,startWithArray() 能够追加多个事件。追加的事件会先发出。

5.2 怎么用?

Observable.just(1, 2, 3)
               .startWithArray(4, 5)
               .subscribe(new Consumer<Integer>() {

                   @Override
                   public void accept(Integer integer) throws Exception {

                       Log.i("lybj", integer+"");
                   }
               });
复制代码

5.3 结果


6.count()

6.1 作啥的?

返回被观察者发送事件的数量。

6.2 怎么用?

Observable.just(2, 3, 4, 5, 6)
                .count()
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long aLong) throws Exception {

                        Log.i("lybj", "事件数量:" + aLong);
                    }
                });
复制代码

6.3 结果


7.功能操做符

1.delay()

1.1 作啥的?

延迟一段事件发送事件。

1.2 怎么用?

Observable.just(1,2,3,4)
                .delay(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

1.3 结果


2.周期函数

2.1 作啥的?

doOnEach(): 每次发送事件以前都会回调这个方法

doOnNext(): Observable 每发送 onNext() 以前都会先回调这个方法。

doAfterNext(): Observable 每发送 onNext() 以后都会回调这个方法。

doOnComplete(): Observable 每发送 onComplete() 以前都会回调这个方法。

doOnError(): Observable 每发送 onError() 以前都会回调这个方法。

doOnSubscribe(): Observable 每发送 onSubscribe()以前都会回调这个方法。

doOnDispose(): 当调用 Disposable 的 dispose() 以后回调该方法。

doOnTerminate(): 在 onError 或者 onComplete 发送以前回调。

doAfterTerminate(): onError 或者 onComplete 发送以后回调。

doFinally(): 在全部事件发送完毕以后回调该方法。若是取消订阅以后doAfterTerminate()就不会被回调,而doFinally()不管怎么样都会被回调,且都会在事件序列的最后。

2.2 怎么用?

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).doOnEach(new Consumer<Notification<Integer>>() {

            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {

                Log.i("lybj",  "doOnEach 方法执行了, 结果:"+ integerNotification.getValue());
            }
        }).doOnNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doOnNext 方法执行了, 结果:"+ integer);
            }
        }).doAfterNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doAfterNext 方法执行了, 结果:"+ integer);
            }
        }).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnComplete 方法执行了");
            }
        }).doOnError(new Consumer<Throwable>() {

            @Override
            public void accept(Throwable throwable) throws Exception {

                Log.i("lybj",  "doOnError 方法执行了");
            }
        }).doOnSubscribe(new Consumer<Disposable>() {

            @Override
            public void accept(Disposable disposable) throws Exception {

                Log.i("lybj",  "doOnSubscribe 方法执行了");
            }
        }).doOnDispose(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnDispose 方法执行了");
            }
        }).doOnTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnTerminate 方法执行了");
            }
        }).doAfterTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doAfterTerminate 方法执行了");
            }
        }).doFinally(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doFinally 方法执行了");
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {

                disposable = d;
                Log.i("lybj", "------观察者onSubscribe()执行");
            }

            @Override
            public void onNext(Integer integer) {

                Log.i("lybj", "------观察者onNext()执行:"+integer);
                if(integer == 2){
// disposable.dispose(); // 取消订阅
                }
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "------观察者onError()执行");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "------观察者onComplete()执行");
            }
        });
复制代码

2.3 结果


3.onErrorReturn()

3.1 作啥的?

当接受到一个 onError() 事件以后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。

3.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明:到");
                emitter.onError(new IllegalStateException("error"));
                emitter.onNext("小方:到");
            }
        }).onErrorReturn(new Function<Throwable, String>() {

            @Override
            public String apply(Throwable throwable) throws Exception {

                Log.i("lybj",  "小红请假了");
                return "小李:到";
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
            }
        });
复制代码

3.3 结果


4.onErrorResumeNext()

4.1 作啥的?

当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。

4.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小红");
                emitter.onError(new NullPointerException("error"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {

            @Override
            public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {

                return Observable.just("1", "2", "3");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
复制代码

4.3 结果


5.onExceptionResumeNext()

5.1 作啥的?

与 onErrorResumeNext() 做用基本一致,可是这个方法只能捕捉 Exception。

5.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小红");
                emitter.onError(new Error("error"));
            }
        }).onExceptionResumeNext(new Observable<String>() {

            @Override
            protected void subscribeActual(Observer observer) {

                observer.onNext("小张");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
复制代码

5.3 结果


6.retry()

6.1 作啥的?

若是出现错误事件,则会从新发送全部事件序列。times 是表明从新发的次数。

6.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onError(new IllegalStateException());
            }
        }).retry(2)
          .subscribe(new Observer<String>() {

              @Override
              public void onSubscribe(Disposable d) {

                  Log.i("lybj",  "准备监听");
              }

              @Override
              public void onNext(String s) {

                  Log.i("lybj",  s);
              }

              @Override
              public void onError(Throwable e) {

                  Log.i("lybj",  e.getMessage());
              }

              @Override
              public void onComplete() {
                  Log.i("lybj",  "onComplete");
              }
          });
复制代码

6.3 结果


7.retryUntil()

7.1 作啥的?

出现错误事件以后,能够经过此方法判断是否继续发送事件。

7.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onError(new NullPointerException("error"));
                emitter.onNext("4");
                emitter.onNext("5");
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                Log.i("lybj",  "getAsBoolean");
                return true;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
复制代码

7.3 结果


8.repeat()

8.1 作啥的?

重复发送被观察者的事件,times 为发送次数。

8.2 怎么用?

Observable.just(1,2,3)
                .repeat(2)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

8.3 结果


9.subscribeOn() & observeOn()

9.1 作啥的?

subscribeOn(): 指定被观察者的线程,若是屡次调用此方法,只有第一次有效。 observeOn(): 指定观察者的线程

9.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                Log.i("lybj",  Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .subscribe(new Consumer<String>() {

               @Override
               public void accept(String s) throws Exception {

                   Log.i("lybj",  s);
                   Log.i("lybj",  Thread.currentThread().getName());
               }
          });
复制代码

9.3 结果


8.过滤操做符

1.filter()

1.1 作啥的?

若是返回 true 则会发送事件,不然不会发送

1.2 怎么用?

Observable.just(1,2,3,4,5)
          .filter(new Predicate<Integer>() {
                    
                    @Override
                    public boolean test(Integer integer) throws Exception {

                        if(integer > 4){
                            return true;
                        }
                        return false;
                    }
           }).subscribe(new Consumer<Integer>() {
            
               @Override
               public void accept(Integer integer) throws Exception {

                   Log.i("lybj",  integer+"");
               }
          });
复制代码

1.3 结果


2.ofType()

2.1 作啥的?

能够过滤不符合该类型事件

2.2 怎么用?

Observable.just(1, 2, 3, "小明", "小方")
                .ofType(String.class)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj",  s+"");
                    }
                });
复制代码

2.3 结果


3.skip()

3.1 作啥的?

跳过正序某些事件,count 表明跳过事件的数量

3.2 怎么用?

Observable.just(1,2,3,4,5,6,7)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

3.3 结果


4.distinct()

4.1 作啥的?

过滤事件序列中的重复事件。

4.2 作啥的?

Observable.just(1,2,3,1,4,1,2)
                .distinct()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

4.3 结果


5.distinctUntilChanged()

5.1 作啥的?

过滤掉连续重复的事件

5.2 作啥的?

Observable.just(1,2,3,3,1,5,6)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  integer+"");
            }
        });
复制代码

5.3 结果


6.take()

6.1 作啥的?

控制观察者接收的事件的数量。

6.2 怎么用?

Observable.just(1,2,3,4,5,6)
                .take(3)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

6.3 结果


7.debounce()

7.1 作啥的?

若是两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。 简单来讲就是防抖动,好比按钮控制快速点击等。

7.2 怎么用?

Observable.just(1,2,3,4,5)
                .map(new Function<Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer) throws Exception {

                        Thread.sleep(900);
                        return integer;
                    }
                })
                .debounce(1,TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

7.3 结果


8.firstElement() && lastElement() && elementAt()

8.1 作啥的?

firstElement(): 取事件序列的第一个元素。

lastElement(): 取事件序列的最后一个元素。

elementAt(): 以指定取出事件序列中事件,可是输入的 index 超出事件序列的总数的话就不会出现任何结果。

8.2 怎么用?

Observable.just(1,2,3,4)
                .firstElement()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

8.3 结果


9.条件操做符

1.all()

1.1 作啥的?

判断事件序列是否所有知足某个事件,若是都知足则返回 true,反之则返回 false。

1.2 怎么用?

Observable.just(1, 2, 3, 4, 5)
                .all(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer <= 4;
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {

                Log.i("lybj",  aBoolean+"");
            }
        });
复制代码

1.3 结果


2.takeWhile() & takeUntil()

2.1 作啥的?

takeWhile(): 从左边开始,将知足条件的元素取出来,直到遇到第一个不知足条件的元素,则终止 takeUntil(): 从左边开始,将知足条件的元素取出来,直到遇到第一个知足条件的元素,则终止 filter(): 是将全部知足条件的数据都取出。

2.2 怎么用?

Observable.just(1, 2, 3, 4, 5)
                .takeWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {
            
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

2.3 结果


3.skipWhile

3.1 作啥的?

从左边开始,根据条件跳过元素

3.2 怎么用?

Observable.just(1,2,3,4,5,3,2,1,7)
                .skipWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

3.3 结束


4.isEmpty() & defaultIfEmpty()

4.1 作啥的?

isEmpty(): 判断事件序列是否为空。

defaultIfEmpty(): 若是观察者只发送一个 onComplete() 事件,则能够利用这个方法发送一个值。

4.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onComplete();
            }
        }).isEmpty()
          .subscribe(new Consumer<Boolean>() {

              @Override
              public void accept(Boolean aBoolean) throws Exception {

                  Log.i("lybj",  aBoolean+"");
              }
          });
复制代码

4.3 结果


5.contains()

5.1 作啥的?

判断事件序列中是否含有某个元素,若是有则返回 true,若是没有则返回 false。

5.2 怎么用?

在Observable.just(1,2,3,4,5,6)
                .contains(2)
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
复制代码

5.3 结果


6.sequenceEqual()

6.1 作啥的?

判断两个 Observable 发送的事件是否相同。

6.2 怎么用?

Observable.sequenceEqual(Observable.just("小明", "小方", "小李"),
                Observable.just("小明", "小方", "小李", "小张"))
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
复制代码

6.3 结果


10.源码

demo下载

相关文章
相关标签/搜索