每一个Android开发者,都是爱RxJava的,简洁线程切换和多网络请求合并,再配合Retrofit,简直是APP开发的福音。不知不觉,RxJava一路走来,已经更新到第三大版本了。不像RxJava 2对RxJava 1那么残忍,RxJava 3对RxJava 2的兼容性仍是挺好的,目前并无作出很大的更改。RxJava2到2020年12月31号再也不提供支持,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加。java
同时,但愿经过本文,能知道垃圾箱颜色分类。react
做为尝鲜,赶忙品尝吧。git
implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC0"
复制代码
很差意思哦,还没看到RxAndroid出3.0,这就很尴尬了...github
在RxJava,数据以流的方式组织。也就是说,Rxjava包括一个源的数据流,数据流后跟着消费者的零个到多个消费数据流步骤。windows
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
复制代码
在上文代码中,对于operator2来讲,在它前面叫作上流,在它后面的叫作下流。憋住,别笑,真的是下流来的。缓存
在RxJava的文档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。bash
当数据流经过异步的步骤执行时,这些步骤的执行速度可能不一致。也就是说上流数据发送太快,下流没有足够的能力去处理。为了不这种状况,通常要么缓存上流的数据,要么抛弃数据。但这种处理方式,有时会带来很大的问题。为此,RxJava带来了backpressure的概念。背压是一种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使用,表示它们还能处理多少数据。网络
支持背压的有Flowable类,不支持背压的有Observable,Single, Maybe and Completable类。并发
对于咱们Android开发来讲,最喜欢的就是它简洁切换线程的操做。RxJava经过调度器来方便线程的切换。app
在不一样平台还有不一样的调度器,例如Android的主线程:AndroidSchedulers.mainThread()
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
复制代码
在 RxJava 3 能够发现有如下几个基类(跟RxJava 2是一致的吧):
下文关于操做符内容太多了
等须要了,再来查阅
下班时间仍是好好护发吧
指定观察者的线程,例如在Android访问网络后,数据须要主线程消费,那么将观察者的线程切换到主线就须要ObserveOn操做符。每次指定一次都会生效。
指定被观察者的线程,即数据源发生的线程。例如在Android访问网络时,须要将线程切换到子线程。屡次指定只有第一次有效。
数据源(Observable)每发送一次数据,就调用一次。
数据源每次调用onNext() 以前都会先回调该方法。
数据源每次调用onError() 以前会回调该方法。
数据源每次调用onComplete() 以前会回调该方法
数据源每次调用onSubscribe() 以后会回调该方法
数据源每次调用dispose() 以后会回调该方法
其余的见官网吧,不难
主要讲对数据源进行选择和过滤的经常使用操做符
能够做用于Flowable,Observable,表示源发射数据前,跳过多少个。例以下面跳过前四个:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(System.out::print);
打印结果:5678910
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(System.out::print);
打印结果:1 2 3 4 5 6
复制代码
skipLast(n)操做表示从流的尾部跳过n个元素。
可做用于Flowable,Observable。在Android开发,一般为了防止用户重复点击而设置标记位,而经过RxJava的debounce操做符能够有效达到该效果。在规定时间内,用户重复点击只有最后一次有效,
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2_000);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
打印:A D E onComplete
复制代码
上文代码中,数据源以必定的时间间隔发送A,B,C,D,E。操做符debounce的时间设为1秒,发送A后1.5秒并无发射其余数据,因此A能成功发射。发射B后,在1秒以内,又发射了C和D,在D以后的2秒才发射E,全部B、C都失效,只有D有效;而E以后已经没有其余数据流了,全部E有效。
可做用于Flowable,Observable,去掉数据源重复的数据。
Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(System.out::print);
// 打印:2 3 4 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(System.out::print);
//打印:1 2 1 2 3 4
复制代码
distinctUntilChanged()去掉相邻重复数据。
可做用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。
Observable.just(2,4,3,1,5,8)
.elementAt(0)
.subscribe(integer ->
Log.d("TAG","elmentAt->"+integer));
打印:2
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(
name -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onSuccess will not be printed!
复制代码
elementAtOrError:指定元素的位置超过数据长度,则发射异常。
可做用于 Flowable,Observable,Maybe,Single。在filter中返回表示发射该元素,返回false表示过滤该数据。
Observable.just(1, 2, 3, 4, 5, 6)
.filter(x -> x % 2 == 0)
.subscribe(System.out::print);
打印:2 4 6
复制代码
做用于 Flowable,Observable。发射数据源第一个数据,若是没有则发送默认值。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onError: java.util.NoSuchElementException
复制代码
和firstElement的区别是first返回的是Single,而firstElement返回Maybe。firstOrError在没有数据会返回异常。
last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
// 打印:onError: java.util.NoSuchElementException
复制代码
ignoreElements 做用于Flowable、Observable。ignoreElement做用于Maybe、Single。二者都是忽略掉数据,返回完成或者错误时间。
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 1秒后打印:Donde!
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 五秒后打印:Done!
复制代码
做用于Flowable、Observable、Maybe、过滤掉类型。
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7
复制代码
做用于Flowable、Observable,在一个周期内发射最新的数据。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print("onComplete"));
// 打印: C D onComplete
复制代码
与debounce的区别是,sample是以时间为周期的发射,一秒又一秒内的最新数据。而debounce是最后一个有效数据开始。
做用于Flowable、Observable。throttleLast与smaple一致,而throttleFirst是指定周期内第一个数据。throttleWithTimeout与debounce一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
//打印:A D onComplete
source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
// 打印:C D onComplete
复制代码
之因此拿出来单独说,我看不懂官网的解释。而后看别人的文章:throttleFirst+throttleLast的组合?开玩笑的吧。我的理解是:若是源的第一个数据总会被发射,而后开始周期计时,此时的效果就会跟throttleLast一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(200);
emitter.onNext("D");
Thread.sleep(400);
emitter.onNext("E");
Thread.sleep(400);
emitter.onNext("F");
Thread.sleep(400);
emitter.onNext("G");
Thread.sleep(2000);
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.e("RxJava",item),
Throwable::printStackTrace,
() -> Log.e("RxJava","finished"));
复制代码
打印结果:
做用于Flowable、Observable,take发射前n个元素;takeLast发射后n个元素。
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
.subscribe(System.out::print);
//打印:1 2 3 4
source.takeLast(4)
.subscribe(System.out::println);
//打印:7 8 9 10
复制代码
做用于Flowable、Observable、Maybe、Single、Completabl。后一个数据发射未在前一个元素发射后规定时间内发射则返回超时异常。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});
source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// onError: java.util.concurrent.TimeoutException:
The source did not signal an event for 1 seconds
and has been terminated.
复制代码
经过链接操做符,将多个被观察数据(数据源)链接在一块儿。
可做用于Flowable、Observable。将指定数据源合并在另外数据源的开头。
Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));
//打印:
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo
复制代码
可做用全部数据源类型,用于合并多个数据源到一个数据源。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));
//也能够是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
复制代码
merge在合并数据源时,若是一个合并发生异常后会当即调用观察者的onError方法,并中止合并。可经过mergeDelayError操做符,将发生的异常留到最后处理。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(
new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!
复制代码
可做用于Flowable、Observable、Maybe、Single。将多个数据源的数据一个一个的合并在一块儿哇。当其中一个数据源发射完事件以后,若其余数据源还有数据未发射完毕,也会中止。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code", "8");
names.zipWith(otherNames, (first, last) -> first + "-" + last)
.subscribe(item -> Log.d(TAG, item));
//打印:
RxJava: Hello-Git
RxJava: world-Code
复制代码
可做用于Flowable, Observable。在结合不一样数据源时,发射速度快的数据源最新item与较慢的相结合。 以下时间线,Observable-1发射速率快,发射了65,Observable-2才发射了C, 那么二者结合就是C5。
一个发射多个小数据源的数据源,这些小数据源发射数据的时间发生重复时,取最新的数据源。
变化数据源的数据,并转化为新的数据源。
做用于Flowable、Observable。指将数据源拆解含有长度为n的list的多个数据源,不够n的成为一个数据源。
Observable.range(0, 10)
.buffer(4)
.subscribe((List<Integer> buffer) -> System.out.println(buffer));
// 打印:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]
复制代码
做用于Flowable、Observable、Maybe、Single。将数据元素转型成其余类型,转型失败会抛出异常。
Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
numbers.filter((Number x) -> Integer.class.isInstance(x))
.cast(Integer.class)
.subscribe((Integer x) -> System.out.println(x));
// prints:
// 1
// 7
// 12
// 5
复制代码
做用于Flowable、Observable、Maybe。将数据源的元素做用于指定函数后,将函数的返回值有序的存在新的数据源。
Observable.range(0, 5)
.concatMap(i -> {
long delay = Math.round(Math.random() * 2);
return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
})
.blockingSubscribe(System.out::print);
// prints 01234
复制代码
与concatMap做用相同,只是将过程发送的全部错误延迟到最后处理。
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.concatMapDelayError(x -> {
if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
else return Observable.just(x, x * x);
})
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));
// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// onError: Something went wrong!
复制代码
做用于Flowable、Observable。与contactMap相似,不过应用于函数后,返回的是CompletableSource。订阅一次并在全部CompletableSource对象完成时返回一个Completable对象。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
return Completable.timer(x, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});
completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
.blockingAwait();
// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed
复制代码
与concatMapCompletable做用相同,只是将过程发送的全部错误延迟到最后处理。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
if (x.equals(2)) {
return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
} else {
return Completable.timer(1, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
}
});
completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
.onErrorComplete()
.blockingAwait();
// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!
复制代码
做用于Flowable、Observable、Maybe、Single。与contactMap相似,只是contactMap的数据发射是有序的,而flatMap是无序的。
Observable.just("A", "B", "C")
.flatMap(a -> {
return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.map(b -> '(' + a + ", " + b + ')');
})
.blockingSubscribe(System.out::println);
// prints (not necessarily in this order):
// (A, 1)
// (C, 1)
// (B, 1)
// (A, 2)
// (C, 2)
// (B, 2)
// (A, 3)
// (C, 3)
// (B, 3)
复制代码
太多了,减小篇幅,你们感兴趣本身查阅官网吧。功能与flatMap和contactMap相似。
做用于Maybe、Single,将其转化为Flowable,或Observable。
Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});
flowable.subscribe(x -> System.out.println("onNext: " + x));
// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0
复制代码
做用于Flowable、Observable。根据必定的规则对数据源进行分组。
Observable<String> animals = Observable.just(
"Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
.concatMapSingle(Observable::toList)
.subscribe(System.out::println);
// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]
复制代码
做用于Flowable、Observable。对数据进行相关联操做,例如聚合等。
Observable.just(5, 3, 8, 1, 7)
.scan(0, (partialSum, x) -> partialSum + x)
.subscribe(System.out::println);
// prints:
// 0
// 5
// 8
// 16
// 17
// 24
复制代码
对数据源发射出来的数据进行收集,按照指定的数量进行分组,以组的形式从新发射。
Observable.range(1, 4)
// Create windows containing at most 2 items, and skip 3 items before starting a new window.
.window(2)
.flatMapSingle(window -> {
return window.map(String::valueOf)
.reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
})
.subscribe(System.out::println);
// prints:
// [1, 2]
// [3, 4]
复制代码
做用于Flowable、Observable、Maybe、Single。但调用数据源的onError函数后会回到该函数,可对错误进行处理,而后返回值,会调用观察者onNext()继续执行,执行完调用onComplete()函数结束全部事件的发射。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturn(error -> {
if (error instanceof NumberFormatException) return 0;
else throw new IllegalArgumentException();
})
.subscribe(
System.out::println,
error -> System.err.println("onError should not be printed!"));
// prints 0
复制代码
与onErrorReturn相似,onErrorReturnItem不对错误进行处理,直接返回一个值。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturnItem(0)
.subscribe(
System.out::println,
error -> System.err.println("onError should not be printed!"));
// prints 0
复制代码
可做用于Flowable、Observable、Maybe。onErrorReturn发生异常时,回调onComplete()函数后再也不往下执行,而onExceptionResumeNext则是要在处理异常的时候返回一个数据源,而后继续执行,若是返回null,则调用观察者的onError()函数。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
})
.onErrorResumeNext(throwable -> {
Log.d(TAG, "onErrorResumeNext ");
return Observable.just(4);
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
复制代码
结果:
可做用于全部的数据源,当发生错误时,数据源重复发射item,直到没有异常或者达到所指定的次数。
boolean first=true;
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
if (first){
first=false;
e.onError(new NullPointerException());
}
})
.retry(9)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
复制代码
结果:
做用于Flowable、Observable、Maybe。与retry相似,但发生异常时,返回值是false表示继续执行(重复发射数据),true再也不执行,但会调用onError方法。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
e.onComplete();
})
.retryUntil(() -> true)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
复制代码
结果:
太多操做符太累了,看得心好累。仍是根据实际开发须要查阅文档才是正确的姿式。本文只是RxJava冰山一角,更多请参阅官网。同时不建议立马在项目上实践,给它点时间报bug。
若是你看到了这,点个赞,收下我双膝。若是文章有误,帮忙指正,谢谢大佬们。