3章 RxJava操做符

本篇文章已受权微信公众号 YYGeeker 独家发布转载请标明出处java

CSDN学院课程地址react

3. RxJava操做符

RxJava操做符也是其精髓之一,能够经过一个简单的操做符,实现复杂的业务逻辑,甚至还能够将操做符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。好比咱们前面用到的.create().subscribeOn().observeOn().subscribe()都是RxJava的操做符之一,下面咱们将对RxJava的操做符进行分析c++

掌握RxJava操做符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面咱们经过例子说明c#

这张图片咱们先要分清楚概念上的东西,上下两行横向的直线区域表明着事件流,上面一行(上游)是咱们的被观察者Observable,下面一行(下游)是咱们的观察者Observer,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是咱们的操做符部分,它能够对咱们的数据进行变换操做。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不一样的形状表明着不一样的数据类型缓存

这张图片并非表示没有被观察者Observable,而是Create方法自己就是建立了被观察者,因此能够将被观察者的上游省略。在进行事件的onNext()分发后,执行onComplete()事件,这样就表示事件流已经结束,后续若是上游继续发事件,则下游表示不接收。当事件流的onCompleted()或者onError()正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法bash

在理解RxJava操做符以前,须要将这几个概念弄明白,整个操做符的章节都是围绕这几个概念进行的微信

  • 事件流:经过发射器发射的事件,从发射事件到结束事件的过程,这一过程称为事件流
  • 数据流:经过发射器发射的数据,从数据输入到数据输出的过程,这一过程称为数据流
  • 被观察者:事件流的上游,即Observable,事件流开始的地方和数据流发射的地方
  • 观察者:事件流的下游,即Observer,事件流结束的地方和数据流接收的地方

3.1 Creating Observables (建立操做符)

一、create网络

Observable最原始的建立方式,建立出一个最简单的事件流,可使用发射器发射特定的数据类型数据结构

public static void main(String[] args) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    for (int i = 1; i < 5; i++) {
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {

                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出并发

onNext=1
onNext=2
onNext=3
onNext=4
onComplete
复制代码

二、from

建立一个事件流并发出特定类型的数据流,其发射的数据流类型有以下几个操做符

public static void main(String[] args) {
    Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

三、just

just操做符和from操做符很像,只是方法的参数有所差异,它能够接受多个参数

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

四、defer

defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操做符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会建立它缓存的事件流

public static void main(String[] args) {

    i = 10;

    Observable<Integer> just = Observable.just(i, i);
    Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            //缓存新的事件流
            return Observable.just(i, i);
        }
    });

    i = 15;

    just.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });

    i = 20;

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });
}
复制代码

输出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20
复制代码

五、interval

interval操做符是按固定的时间间隔发射一个无限递增的整数数据流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行

public void interval() {
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
......
复制代码

六、range

range操做符发射一个范围内的有序整数数据流,你能够指定范围的起始和长度

public static void main(String[] args) {
    Observable.range(1, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

七、repeat

repeat操做符能够重复发送指定次数的某个事件流,repeat操做符默认在trampoline调度器上执行

public static void main(String[] args) {
    Observable.just(1).repeat(5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1
复制代码

八、timer

timer操做符能够建立一个延时的事件流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行,默认在computation调度器上执行

public void timer() {
    Observable.timer(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=0
复制代码

九、小结

  1. create():建立最简单的事件流
  2. from():建立事件流,可发送不一样类型的数据流
  3. just():建立事件流,可发送多个参数的数据流
  4. defer():建立事件流,可缓存可激活事件流
  5. interval():建立延时重复的事件流
  6. range():建立事件流,可发送范围内的数据流
  7. repeat():建立可重复次数的事件流
  8. timer():建立一次延时的事件流

补充:interval()、timer()、delay()的区别

  1. interval():用于建立事件流,周期性重复发送
  2. timer():用于建立事件流,延时发送一次
  3. delay():用于事件流中,能够延时某次事件流的发送

3.2 Transforming Observables (转换操做符)

一、map

map操做符能够将数据流进行类型转换

public static void main(String[] args) {
    Observable.just(1).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "发送过来的数据会被变成字符串" + integer;
        }
    })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("onNext=" + s);
                }
            });
}
复制代码

输出

onNext=发送过来的数据会被变成字符串1
复制代码

二、flatMap

flatMap操做符将数据流进行类型转换,而后将新的数据流传递给新的事件流进行分发,这里经过模拟请求登陆的延时操做进行说明,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void flatMap() {
    Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(UserParams userParams) throws Exception {
            return Observable.just(userParams.username + "登陆成功").delay(2, TimeUnit.SECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
}

public static class UserParams {

    public UserParams(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String username;
    public String password;
}
复制代码

输出

hensen登陆成功
复制代码

补充:

  • concatMap与flatMap功能同样,惟一的区别就是concatMap是有序的,flatMap是乱序的

三、groupBy

groupBy操做符能够将发射出来的数据项进行分组,并将分组后的数据项保存在具备key-value映射的事件流中。groupBy具体的分组规则由groupBy操做符传递进来的函数参数Function所决定的,它能够将key和value按照Function的返回值进行分组,返回一个具备分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,咱们能够经过sorted()对数据项进行排序,而后输出有序的数据流。

public static void main(String[] args) {
    Observable.just("java", "c++", "c", "c#", "javaScript", "Android")
            .groupBy(new Function<String, Character>() {
                @Override
                public Character apply(String s) throws Exception {
                    return s.charAt(0);//按首字母分组
                }
            })
            .subscribe(new Consumer<GroupedObservable<Character, String>>() {
                @Override
                public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
                    //排序后,直接订阅输出key和value
                    characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
                        }
                    });
                }
            });
}
复制代码

输出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript
复制代码

四、scan

scan操做符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断全部数据的最小值等

public static void main(String[] args) {
    Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            return integer < integer2 ? integer : integer2;
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer item) throws Exception {
                    System.out.println("onNext=" + item);
                }
            });
}
复制代码

输出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1
复制代码

五、buffer

buffer操做符能够将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据所有发射出去。若是发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。若是对buffer操做符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,而后再进行缓存和输出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .buffer(5).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        System.out.println("onNext=" + integers.toString());
    }
});
复制代码

输出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]
复制代码

六、window

window操做符和buffer操做符在功能上实现的效果是同样的,但window操做符最大区别在于一样是缓存必定数量的数据项,window操做符最终发射出来的是新的事件流integerObservable,而buffer操做符发射出来的是新的数据流,也就是说,window操做符发射出来新的事件流中的数据项,还能够通过Rxjava其余操做符进行处理。

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
        @Override
        public void accept(Observable<Integer> integerObservable) throws Exception {
            integerObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
        }
    });
}
复制代码

输出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9
复制代码

七、小结

  1. map():对数据流的类型进行转换
  2. flatMap():对数据流的类型进行包装成另外一个数据流
  3. groupby():对全部的数据流进行分组
  4. scan():对上一轮处理事后的数据流进行函数处理
  5. buffer():缓存发射的数据流到必定数量,随后发射出数据流集合
  6. window():缓存发射的数据流到必定数量,随后发射出新的事件流

3.3 Filtering Observables (过滤操做符)

一、debounce

debounce操做符会去过滤掉发射速率过快的数据项,下面的例子onNext事件能够想象成按钮的点击事件,若是在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则容许触发点击事件。这就有点像咱们频繁点击按钮,但始终只会触发一次点击事件,这样就不会致使重复去响应点击事件

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 100; i++) {
                if (i % 3 == 0) {
                    Thread.sleep(3000);
                } else {
                    Thread.sleep(1000);
                }
                emitter.onNext(i);
            }
        }
    }).debounce(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......
复制代码

二、distinct

distinct操做符会过滤重复发送的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).distinct()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
复制代码

三、elementAt

elementAt操做符只取指定的角标的事件

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
复制代码

四、filter

filter操做符能够过滤指定函数的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=3
onNext=4
onNext=3
复制代码

五、first

first操做符只发射第一项数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .first(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
复制代码

六、ignoreElements

ignoreElements操做符不发射任何数据,只发射事件流的终止通知

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .ignoreElements()
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onComplete
复制代码

七、last

last操做符只发射最后一项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .last(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=3
复制代码

八、sample

sample操做符会在指定的事件内从数据项中采集所须要的数据,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void sample() {
    Observable.interval(1, TimeUnit.SECONDS)
            .sample(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=2
onNext=4
onNext=6
onNext=8
复制代码

九、skip

skip操做符能够忽略事件流发射的前N项数据项,只保留以后的数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skip(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}
复制代码

输出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8
复制代码

十、skipLast

skipLast操做符能够抑制事件流发射的后N项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skipLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

十一、take

take操做符能够在事件流中只发射前面的N项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .take(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
复制代码

十二、takeLast

takeLast操做符事件流只发射数据流的后N项数据项,忽略前面的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .takeLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}
复制代码

输出

onNext=6
onNext=7
onNext=8
复制代码

还有一个操做符叫takeLastBuffer,它和takeLast相似,,惟一的不一样是它把全部的数据项收集到一个List再发射,而不是依次发射一个

1三、小结

  1. debounce():事件流只发射规定范围时间内的数据项
  2. distinct():事件流只发射不重复的数据项
  3. elementAt():事件流只发射第N个数据项
  4. filter():事件流只发射符合规定函数的数据项
  5. first():事件流只发射第一个数据项
  6. ignoreElements():忽略事件流的发射,只发射事件流的终止事件
  7. last():事件流只发射最后一项数据项
  8. sample():事件流对指定的时间间隔进行数据项的采样
  9. skip():事件流忽略前N个数据项
  10. skipLast():事件流忽略后N个数据项
  11. take():事件流只发射前N个数据项
  12. takeLast():事件流只发射后N个数据项

3.4 Combining Observables (组合操做符)

一、merge/concat

merge操做符能够合并两个事件流,若是在merge操做符上增长延时发送的操做,那么就会致使其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的倒是两次数据流。因为concat操做符和merge操做符的效果是同样的,这里只举一例

merge和concat的区别

  • merge():合并后发射的数据项是无序的
  • concat():合并后发射的数据项是有序的
public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
        @Override
        public void accept(Serializable serializable) throws Exception {
            System.out.println("onNext=" + serializable.toString());
        }
    });
}
复制代码

输出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

二、zip

zip操做符是将两个数据流进行指定的函数规则合并

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.zip(just1, just2, new BiFunction<String, String, String>() {
        @Override
        public String apply(String s, String s2) throws Exception {
            return s + s2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}
复制代码

输出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5
复制代码

三、startWith

startWith操做符是将另外一个数据流合并到原数据流的开头

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    just1.startWith(just2).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
复制代码

四、join

join操做符是有时间期限的合并操做符,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void join() {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    just1.join(just2, new Function<String, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(String s) throws Exception {
            return Observable.timer(3, TimeUnit.SECONDS);
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long l) throws Exception {
            return Observable.timer(8, TimeUnit.SECONDS);
        }
    }, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}
复制代码

join操做符有三个函数须要设置

  • 第一个函数:规定just2的过时期限
  • 第二个函数:规定just1的过时期限
  • 第三个函数:规定just1和just2的合并规则

因为just2的期限只有3秒的时间,而just2延时1秒发送一次,因此just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点相似咱们的排列组合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1
复制代码

五、combineLatest

conbineLatest操做符会寻找其余事件流最近发射的数据流进行合并,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public static String[] str = {"A", "B", "C", "D", "E"};

public void combineLatest() {
    Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            return str[(int) (aLong % 5)];
        }
    });
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}
复制代码

输出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
复制代码

六、小结

  1. merge()/concat():无序/有序的合并两个数据流
  2. zip():两个数据流的数据项合并成一个数据流一同发出
  3. startWith():将待合并的数据流放在自身前面一同发出
  4. join():将数据流进行排列组合发出,不过数据流都是有时间期限的
  5. combineLatest():合并最近发射出的数据项成数据流一同发出

3.5 Error Handling Operators(错误处理操做符)

一、onErrorReturn

onErrorReturn操做符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

二、onErrorResumeNext

onErrorResumeNext操做符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                @Override
                public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                    return Observable.just(-1);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

三、onExceptionResumeNext

onExceptionResumeNext操做符表示当错误发生时,若是onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,若是onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onException crash"));
                    //e.onError(new Error("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onExceptionResumeNext(new ObservableSource<Integer>() {
                @Override
                public void subscribe(Observer<? super Integer> observer) {
                    //备用事件流
                    observer.onNext(8);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=8
复制代码

四、retry

retry操做符表示当错误发生时,发射器会从新发射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retry(1)
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
  • retry():表示重试无限次
  • retry(long times):表示重试指定次数
  • retry(Func predicate):能够根据函数参数中的Throwable类型和重试次数决定本次需不须要重试

五、retryWhen

retryWhen操做符和retry操做符类似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

private static int retryCount = 0;
private static int maxRetries = 2;

public void retryWhen(){
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {
                            if (++retryCount <= maxRetries) {
                                // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
                                return Observable.timer(1, TimeUnit.SECONDS);
                            }
                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

六、小结

  • onErrorReturn():当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
  • onErrorResumeNext():当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
  • onExceptionResumeNext():当错误发生时,若是onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,若是onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
  • retry():当错误发生时,发射器会从新发射
  • retryWhen():当错误发生时,根据Tharowble类型决定发射器是否从新发射

3.6 Observable Utility Operators(辅助性操做符)

一、delay

delay操做符能够延时某次事件发送的数据流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void deley() {
    Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

delay和delaySubscription的效果是同样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时

二、do

do操做符能够监听整个事件流的生命周期,do操做符分为多个类型,并且每一个类型的做用都不一样

  1. doOnNext():接收每次发送的数据项
  2. doOnEach():接收每次发送的数据项
  3. doOnSubscribe():当事件流被订阅时被调用
  4. doOnDispose():当事件流被释放时被调用
  5. doOnComplete():当事件流被正常终止时被调用
  6. doOnError():当事件流被异常终止时被调用
  7. doOnTerminate():当事件流被终止以前被调用,不管正常终止仍是异常终止都会调用
  8. doFinally():当事件流被终止以后被调用,不管正常终止仍是异常终止都会调用
public static void main(String[] args) {
    Observable.just(1, 2, 3)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("doOnNext");
                }
            })
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("doOnEach");
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("doOnSubscribe");
                }
            })
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnDispose");
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnTerminate");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("doOnError");
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnComplete");
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doFinally");
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally
复制代码

三、materialize/dematerialize

materialize操做符将发射出的数据项转换成为一个Notification对象,而dematerialize操做符则是跟materialize操做符相反,这两个操做符有点相似咱们Java对象的装箱和拆箱功能

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).materialize()
            .subscribe(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("onNext=" + integerNotification.getValue());
                }
            });
    
    Observable.just(1, 2, 3, 4, 5).materialize().dematerialize()
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object object) throws Exception {
                    System.out.println("onNext=" + object.toString());
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

输出的时候,materialize会输出多个null,是由于null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个缘由也能够从图片中看出来

四、serialize

serialize操做符能够将异步执行的事件流进行同步操做,直到事件流结束

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).serialize()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

五、timeInterval

timeInterval操做符能够将发射的数据项转换为带有时间间隔的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void timeInterval(){
    Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS)
            .subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
                }
            });
}
复制代码

输出

onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2
复制代码

六、timeout

timeout操做符表示当发射的数据项超过了规定的限制时间,则发射onError事件,这里直接让程序超过规定的限制时间,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void timeOut(){
    Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            });
}
复制代码

输出

onError
复制代码

七、timestamp

timestamp操做符会给每一个发射的数据项带上时间戳,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void timeStamp() {
    Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS)
            .subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());

                }
            });
}
复制代码

输出

onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132
复制代码

八、using

using操做符可让你的事件流存在一次性的数据项,即用完就将资源释放掉

using操做符接受三个参数:

  • 一个用户建立一次性资源的工厂函数
  • 一个用于建立一次性事件的工厂函数
  • 一个用于释放资源的函数
public static class UserBean {
    String name;
    int age;

    public UserBean(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

public static void main(String[] args) {
    Observable.using(new Callable<UserBean>() {
        @Override
        public UserBean call() throws Exception {
            //从网络中获取某个对象
            return new UserBean("俊俊俊", 22);
        }
    }, new Function<UserBean, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(UserBean userBean) throws Exception {
            //拿出你想要的资源
            return Observable.just(userBean.name);
        }
    }, new Consumer<UserBean>() {
        @Override
        public void accept(UserBean userBean) throws Exception {
            //释放对象
            userBean = null;
        }
    }).subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + o.toString());
        }
    });
}
复制代码

输出

onNext=俊俊俊
复制代码

九、to

to操做符能够将数据流中的数据项进行集合的转换,to操做符分为多个类型,并且每一个类型的做用都不一样

  1. toList():转换成List类型的集合
  2. toMap():转换成Map类型的集合
  3. toMultimap():转换成一对多(即<A类型,List<B类型>>)的Map类型的集合
  4. toSortedList():转换成具备排序的List类型的集合
public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).toList()
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    System.out.println("onNext=" + integers.toString());
                }
            });
}
复制代码

输出

onNext=[1, 2, 3, 4, 5]
复制代码

十、小结

  1. delay():延迟事件发射的数据项
  2. do():监听事件流的生命周期
  3. materialize()/dematerialize():对事件流进行装箱/拆箱
  4. serialize():同步事件流的发射
  5. timeInterval():对事件流增长时间间隔
  6. timeout():对事件流增长限定时间
  7. timestamp():对事件流增长时间戳
  8. using():对事件流增长一次性的资源
  9. to():对数据流中的数据项进行集合的转换

3.7 Conditional and Boolean Operators(条件和布尔操做符)

一、all

all操做符表示对全部数据项进行校验,若是全部都经过则返回true,不然返回false

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .all(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 0;
                }
            })
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("onNext=" + aBoolean);
                }
            });
}
复制代码

输出

onNext=true
复制代码

二、contains

contains操做符表示事件流中发射的数据项当中是否包含有指定的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .contains(2)
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("onNext=" + aBoolean);
                }
            });
}
复制代码

输出

onNext=true
复制代码

三、amb

amb操做符在多个事件流中只发射最早发出数据的事件流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void amb(){
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));
    list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));
    list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));

    Observable.amb(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=7
onNext=8
onNext=9
复制代码

四、defaultIfEmpty

defaultIfEmpty操做符会在事件流没有发射任何数据时,发射一个指定的默认值

public static void main(String[] args) {
    Observable.empty()
            .defaultIfEmpty(-1)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    System.out.println("onNext=" + o.toString());
                }
            });
}
复制代码

输出

onNext=-1
复制代码

五、sequenceEqual

sequenceEqual操做符能够判断两个数据流是否彻底相等

public static void main(String[] args) {
    Observable<Integer> just1 = Observable.just(1, 2, 3);
    Observable<Integer> just2 = Observable.just(1, 2, 3);

    Observable.sequenceEqual(just1, just2)
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("onNext=" + aBoolean);
                }
            });
}
复制代码

输出

onNext=true
复制代码

六、skipUntil/skipWhile

skipUtils操做符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流才开始发射出数据项,它会忽略以前发射过的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void skipUntil(){
    Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
    Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);

    just1.skipUntil(just2)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码

skipWhile操做符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,若是判断值返回true,则不发射该数据项,继续从下一个数据项执行一样的判断,直到某个数据项的判断值返回false时,则终止判断,发射剩余的全部数据项。须要注意的是,这里只要一次判断为false则后面的全部数据项都不判断

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .skipWhile(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer < 3;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=3
onNext=4
onNext=5
复制代码

七、takeUntil/takeWhile

takeUntil操做符跟skipUntil相似,skip表示跳过的意思,而take表示取值的意思,takeUntil操做符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流中止发射数据项,它会忽略以后的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行

public void takeUntil(){
    Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
    Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);

    just1.takeUntil(just2)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=0
onNext=1
复制代码

takeWhile操做符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,若是判断值返回true,则发射该数据项,继续从下一个数据项执行一样的判断,直到某个数据项的判断值返回false时,则终止判断,且剩余的全部数据项不会发射。须要注意的是,这里只要一次判断为false则后面的全部数据项都不判断

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 0)
            .takeWhile(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer < 3;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}
复制代码

输出

onNext=1
onNext=2
复制代码

八、小结

  1. all():对全部数据项进行校验
  2. contains():全部数据项是否包含指定数据项
  3. amb():多个事件流中,只发射最早发出的事件流
  4. defaultIfEmpty():若是数据流为空则发射默认数据项
  5. sequenceEqual():判断两个数据流是否彻底相等
  6. skipUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时才进行发射
  7. skipWhile():当发射的数据流达到某种条件时,才开始发射剩余全部数据项
  8. takeUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时终止发射
  9. takeWhile():当发射的数据流达到某种条件时,才中止发射剩余全部数据项

3.8 Mathematical and Aggregate Operators(数学运算及聚合操做符)

数学运算操做符比较简单,对于数学运算操做符会放在小结中介绍,下面是对聚合操做符作介绍

一、reduce

reduce操做符跟scan操做符是同样的,会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。reduce与scan的惟一区别在于reduce只输出最后的结果,而scan会输出每一次的结果,这点从图片中也能看出来

public static void main(String[] args) {
    Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            return integer < integer2 ? integer : integer2;
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer item) throws Exception {
                    System.out.println("onNext=" + item);
                }
            });
}
复制代码

输出

onNext=1
复制代码

二、collect

collect操做符跟reduce操做符相似,只不过collect增长了一个可改变数据结构的函数供咱们处理

public static void main(String[] args) {
    Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "A";
        }
    }, new BiConsumer<String, Integer>() {
        @Override
        public void accept(String s, Integer integer) throws Exception {
            System.out.println("onNext=" + s + " " + integer);
        }
    }).subscribe(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) throws Exception {
            System.out.println("onNext2=" + s);
        }
    });
}
复制代码

输出

onNext=A  8
onNext=A  2
onNext=A  13
onNext=A  1
onNext=A  15
onNext2=A
复制代码

三、小结

数学运算操做符的使用须要在gradle中添加rxjava-math的依赖

implementation 'io.reactivex:rxjava-math:1.0.0'
复制代码
  1. average():求全部数据项的平均值
  2. max/min():求全部数据项的最大或最小值
  3. sum():求全部数据项的总和
  4. reduce():对上一轮处理事后的数据流进行函数处理,只返回最后的结果
  5. collect():对上一轮处理事后的数据流进行函数处理,可改变原始的数据结构

3.9 Connectable Observable(链接操做符)

一、publish

publish操做符是将普通的事件流转化成可链接的事件流ConnectableObservable,它与普通的事件流不同,ConnectableObservable在没有调用connect()进行链接的状况下,事件流是不会发射数据的

public static void main(String[] args) {
    ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();

    connectableObservable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });
}
复制代码

输出

复制代码

二、connect

connect操做符是将可链接的事件流进行链接并开始发射数据。这个方法须要注意的是,connect操做符必须在全部事件流被订阅后才开始发射数据。若是放在subscribe以前的话,则订阅者是没法收到数据的。若是后面还有订阅者将订阅这次事件流,则会丢失已经调用了connect后,发射出去的数据项

public static void main(String[] args) {
    ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();

    connectableObservable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });
    
    connectableObservable.connect();
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

三、refCount

refCount操做符能够将可链接的事件流转换成普通的事件流

public static void main(String[] args) {
    ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();

    connectableObservable.refCount().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

四、replay

replay操做符将弥补connect操做符的缺陷,因为connect会让后面进行订阅的订阅者丢失以前发射出去的数据项,因此使用replay操做符能够将发射出去的数据项进行缓存,这样使得后面的订阅者均可以得到完整的数据项。这里须要注意的是,replay操做符不能和publish操做符同时使用,不然将不会发射数据。例子中,读者能够将replay操做符换成publish操做符,这时候的输出就会丢失前2秒发射的数据项

public void replay(){
    ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();
    connectableObservable.connect();

    connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}
复制代码

输出

onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码

五、小结

  1. publish():将普通的事件流转换成可链接的事件流
  2. connect():将可链接的事件流进行链接并发射数据
  3. refCount():将可链接的事件流转换成普通的事件流
  4. replay():缓存可链接的事件流中的全部数据项
相关文章
相关标签/搜索