Rxjava2 可谓是平常开发中的利器,特别是在异步任务中更能发挥做用。响应式编程以及流式api的良好支持,给予了更好的编码体验。愈来愈多开发者渐渐用起来了。学习rxjava2最好的地方无外乎官方文档,详细且完整。如下结合官方文档和我本身的理解以及例子,解释各个操做符的用法,给各位以及我本身做一篇参考。java
要使用RxJava,须要先建立Observables(发出数据项),以各类方式转换这些Observable以获取所须要的精确数据项(经过使用Observable运算符),而后观察并响应这些须要的项目序列(经过实现观察者) 或者订阅者,而后将它们订阅到最终的变换后的Observables)。react
经过获取预先存在的对象并在订阅时将该特定对象发布给下游使用者来构造反应类型。为方便起见,存在2到9个参数的重载,这些对象(具备相同的常见类型)将按指定的顺序发出。就像From相似,但请注意From将传入一个数组或一个iterable或相似的东西来取出要发出的项目,而Just只是简单地发出数组或者迭代器。git
请注意,若是将null传递给Just,它将返回一个Observable,它将null做为项发出。不要错误地假设这将返回一个空的Observable(一个根本不发出任何项目)。为此,须要使用Empty运算符。 github
![]()
fun testOpJust() {
val arr = arrayOf("mary", "tom", "ben", "lisa", "ken")
Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)
val list = arrayListOf("mary", "tom", "ben", "lisa", "ken")
Observable.just(list).forEach { it -> System.out.println(it + "s") }
list.stream().filter { it -> it.length > 3 }.map { "$it s" }.forEach(System.out::println)
}
复制代码
根据预先存在的源或生成器类型构造序列。当使用Observable时,若是使用的全部数据均可以表示为Observables,而不是Observables和其余类型的混合,则能够更方便。这容许使用一组运算符来控制数据流的整个生命周期。例如,Iterables能够被认为是一种的Observable;做为一种始终只发出单一项目的Observable。经过将这些对象显式转换为Observable,能够将它们做为对等体与其余Observable进行交互。所以,大多数ReactiveX实现都具备容许将特定于语言的对象和数据结构转换为Observable的方法。编程
注意:这些静态方法使用后缀命名约定(即,在方法名称中重复参数类型)以免重载解析模糊。api
从java.lang.Iterable源(例如Lists,Sets或Collections或custom Iterables)发出信号,而后完成序列。数组
可用于 Flowable ,Observablebash
发信号通知给定数组的元素,而后完成序列。 可用于Flowable,Observable数据结构
注意:RxJava不支持原始数组,只支持(通用)引用数组。并发
fun testOpFrom(){
val list = arrayListOf<Int>(1,2,3,4,5,6)
Observable.fromIterable(list).subscribe(System.out::println)
Observable.fromArray(1,2,3,4,5,6).subscribe(System.out::println)
}
复制代码
当消费者订阅时,调用给定的java.util.concurrent.Callable并将其返回值(或抛出的异常)转发给该使用者。
可用于:Observable,Flowable,Maybe,Single,Completable
备注:在Completable中,忽略实际返回值,而且Completable完成。
Observable.fromCallable<String> {
"hello"
}.subscribe(System.out::println)
Completable.fromCallable{
"complatable from callable"
}.subscribe {
System.out.println("complete")
}
复制代码
当消费者订阅时,调用给定的io.reactivex.function.Action而且消费者完成或接收Action抛出的异常。
可用于: Maybe,Completable
Maybe.fromAction<String>{
System.out.println("maybe from action")
}.subscribe(System.out::println)
复制代码
如下标星先很少作解释,用得很少
将另外一种反应类型包裹或转换为目标反应类型。具备如下签名模式的各类反应类型中提供如下组合:targetType.from {sourceType}()
*注意:并不是全部可能的转换都是经过from {reactive type}方法系列实现的。查看to {reactive type}方法系列以得到进一步的转换可能性。
注意:fromAction和fromRunnable之间的区别在于Action接口容许抛出已受检的异常,而java.lang.Runnable则否则。
可用于Observable,Flowable,Maybe,Single,Completable
经过java.util.concurrent.Callable向消费者发出预先存在或生成的错误信号。
fun testOpError(){
Observable.error<Throwable>(IOException(""))
.subscribe({
System.out.print("不会打印吧")
},{
it.printStackTrace()
},{
System.out.println("也不会打印")
})
}
复制代码
一个典型的用例是使用onErrorResumeNext有条件地映射或抑制链中的异常:
/**
* 抑制链上发生的异常
*/
@Test
fun testOpOnErrorResumeNext() {
val observable = Observable.fromCallable {
if (Math.random() < 0.5f) {
throw IllegalArgumentException()
}
throw IOException()
}
observable.onErrorResumeNext(Function {
if (it is IllegalArgumentException) {
Observable.empty()
} else {
Observable.error(it)
}
}).subscribe({
System.out.println("nothing")
},{
it.printStackTrace()
},{
System.out.println("empty")
})
}
复制代码
这个onErrorResumeNext 厉害了,能够说以前一直不太明白怎么很好的处理。经过此操做符能够抑制错误的传递,原本若是subscribe发生了错误会触发onError回调。事实上可能发生了错误,须要不处理或者抑制产生。在onErrorResumeNext的function参数中,能够根据错误类型返回处理流程。
示例可见onErrorResumeNext的例子
empty发送直接表示完成,就是订阅者直接调用onComplete回调。onNext 不会执行
可用于Observable,Flowable,Maybe,Single,Completable
不会对订阅者的任何回调进行调用。禁用也可理解,好比发送了错误,都不往下执行
可用于Observable,Flowable
fun testOpInterval(){
Observable.interval(1,TimeUnit.SECONDS)
.onErrorResumeNext(Function {
Observable.error(it)
})
.subscribe({
if (it.rem(5) == 0L) {
System.out.println("tick")
} else {
System.out.println("tock")
}
},{
it.printStackTrace()
},{
System.out.println("interval complete")
})
}
复制代码
也就是说在给定的时间以后发送事件
可用于 Observable,Flowable
fun testOpRange(){
val s = "test range operation now"
Observable.range(0,s.length- 3)
.map { "${s[it]} in range"}
.subscribe {
System.out.println(it)
}
}
复制代码
发出一系列值,参数为起点,和长度。
可用于Observable,Flowable
@Test
fun testOpGenerate(){
val start = 1
val increaseValue = 2
Observable.generate<Int,Int>(Callable<Int> {
start
}, BiFunction<Int, Emitter<Int>,Int> {
t1, t2 ->
t2.onNext(t1 + increaseValue)
t1 + increaseValue
}).subscribe {
System.out.println("generate value : $it")
}
}
复制代码
不太明白干啥的,具体应用场景。只是一直不间断的产生值
过滤操做是很是经常使用且重要的,并且相关的操做符也不少
可用于Observable,Flowable
删除响应源发出的项目,在给定的超时值到期以前,这些项目后面跟着更新的项目。计时器重置每次发射。此运算符会跟踪最近发出的项目,而且仅在有足够的时间过去而没有源发出任何其余项目时才会发出此项目。
按照我得理解就是debounde传入了超时值,在该时间以内若是屡次发射,取离超时值最近得值。既然又超时那么也应该又开始时间,开始时间就是一组发射最开始值得时间,这一组发射得值的时的差是在debounce超时时间以内。
// Diagram:
// -A--------------B----C-D-------------------E-|---->
// a---------1s
// b---------1s
// c---------1s
// d---------1s
// e-|---->
// -----------A---------------------D-----------E-|-->
fun testOpDebounce(){
Observable.create<String>{
it.onNext("A")
Thread.sleep(1_500)
it.onNext("B")
Thread.sleep(500)
it.onNext("C")
Thread.sleep(250)
it.onNext("D")
Thread.sleep(2_000)
it.onNext("E")
}.debounce(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
}
复制代码
可用于Observable Flowable 经过仅发出与先前项目相比不一样的项目来过滤反应源。能够指定io.reactivex.functions.Function,将源发出的每一个项目映射到一个新值中,该值将用于与先前的映射值进行比较。Distinct运算符经过仅容许还没有发出的项目来过滤Observable。在一些实现中,存在容许调整两个项被视为“不一样”的标准的变体。在一些实施例中,存在操做符的变体,其仅将项目与其前一个项目进行比较以得到更精确的比较,从而仅过滤连续的重复项目,序列中的项目。
fun testOpDistinct(){
Observable.fromArray(1,2,3,3,4,5)
.distinct()
.subscribe(System.out::println)
// 用来过滤序列中一组值先后是否相同得值
Observable.fromArray(1,1,2,3,2)
.distinct { "呵呵" }
.subscribe(System.out::println)
}
复制代码
重载的方法,传入keySelectro ,做用是对每一个元素应用方法获得得新得值,再决定怎么去重
可用于Observable Flowable
经过仅发出与其前一个元素相比较不一样的项目来过滤反应源。能够指定io.reactivex.functions.Function,将源发出的每一个项目映射到一个新值中,该值将用于与先前的映射值进行比较。或者,能够指定io.reactivex.functions.BiPredicate做为比较器函数来比较前一个。
Observable.fromArray(1,2,3,3,4,5)
// .distinctUntilChanged()
.distinctUntilChanged { t1, t2 ->
t1 == t2
}
.subscribe(System.out::println)
复制代码
能够说是distinct的增强版,多了一个能够传入比较器的重载方法
课用于Flowable,Observable 在来自反应源的一系列发射的数据项中,以指定的从零开始的索引起出单个项目。若是指定的索引不在序列中,则能够指定将发出的默认项。
简单说就是按照发出项的次序获取指定的位置的元素
Observable.fromArray(1,2,3,3,4,5)
.elementAt(2)
.subscribe(System.out::println)
复制代码
可用于Observable,Flowable,Maybe,Single 经过仅发出知足指定函数的项来过滤由反应源发出的项。
过滤偶数
Observable.fromArray(1,2,3,3,4,5)
.filter {
it.rem(2) == 0
}
.subscribe(System.out::println)}
复制代码
可用于Flowable,Observable 仅发出反应源发出的第一个项目,或者若是源完成而不发出项目则发出给定的默认项目。这与firstElement的不一样之处在于此运算符返回Single,而firstElement返回Maybe。
Observable.fromArray(1,2,3,3,4,5)
.first(-1)
.subscribe(Consumer<Int> {
System.out.println("onNext :$it")
})
Observable.fromArray(1,2,3,3,4,5)
.firstElement()
.subscribe {
System.out.println("onNext :$it")
}
复制代码
仅发出响应源发出的第一个项目,或者若是源完成而不发出项目则发出java.util.NoSuchElementException信号。
可用于Maybe Single 忽略Single或Maybe源发出的单个项目,并返回一个Completable,它仅从源中发出错误或完成事件的信号。
Maybe.timer(1L,TimeUnit.SECONDS)
.ignoreElement()
.doOnComplete {
System.out.println("done")
}
.blockingAwait()
复制代码
忽略Single或Maybe源发出的单个项目,并返回一个Completable,它仅从源中发出错误或完成事件的信号。
Observable.timer(1L,TimeUnit.SECONDS)
.ignoreElements()
.doOnComplete {
System.out.println("completed")
}
.blockingAwait()
复制代码
可用于Observable,Flowable
仅发出反应源发出的最后一个项目,或者若是源完成而不发出项目则发出给定的默认项目。这与lastElement的不一样之处在于此运算符返回Single,而lastElement返回Maybe。
Observable.fromArray(1,2,3,3,4,5)
.last(-1)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
复制代码
Observable.fromArray(1,2,3,3,4,5)
.lastElement()
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
复制代码
仅发出响应源发出的最后一项,或者若是源完成而不发出项,则发出java.util.NoSuchElementException信号。
可用于Flowable,Observable,Maybe 经过仅发出指定类型的项目来过滤反应源发出的项目。
Observable.fromArray(1,2.1f,3,3,4,5)
.ofType(Int::class.java)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
复制代码
可用于Observable Flowable 经过仅在周期性时间间隔内发出最近发出的项目来过滤反应源发出的项目。
Observable.create<String> {
it.onNext("A")
Thread.sleep(1_000)
it.onNext("B")
Thread.sleep(300)
it.onNext("C")
Thread.sleep(700)
it.onNext("D")
it.onComplete()
}.sample(1,TimeUnit.SECONDS)
.blockingSubscribe(System.out::println)
复制代码
删除响应源发出的前n个项目,并发出剩余项目。您能够经过使用Skip运算符修改Observable来忽略Observable发出的前n个项目,并仅参加以后的项目。
Observable.fromArray("hehe",2.1f,3,3,4,5)
// .ofType(String::class.java)
.skip(3)
.subscribe {
System.out.println(it)
}
复制代码
丢弃反应源发出的最后n个项目,并发出剩余的项目。
可用于Flowable Observable 仅发出反应源发出的前n项。
Observable.fromArray("hehe",2.1f,3,3,4,5)
.take(2)
.subscribe(System.out::println)
复制代码
可用于Flowable Observable 仅发出反应源发出的最后n个项目。
可用于Flowable Observable
跟debounce有些类似,是取时间范围内第一个,在点击事件过滤很经常使用
在指定持续时间的连续时间窗口期间仅发出由反应源发出的第一个项目。
Observable.create<String> {
it.onNext("A")
Thread.sleep(300)
it.onNext("B")
Thread.sleep(400)
}.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
复制代码
可用于Observable,Flowable 在指定持续时间的连续时间期间仅发出由反应源发出的最后一个项目。跟throttleFirst相反,取最后一个值
跟debounce的别名
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return debounce(timeout, unit);
}
复制代码
从Observable或Flowable源发出项目,但若是在从上一项开始的指定超时持续时间内未发出下一项,则以java.util.concurrent.TimeoutException终止。对于Maybe,Single和Completable,指定的超时持续时间指定等待成功或完成事件到达的最长时间。若是Maybe,Single或Completable在给定时间内没有完成,将发出java.util.concurrent.TimeoutException。
Observable.create<String>{
it.onNext("A")
Thread.sleep(600)
it.onNext("B")
Thread.sleep(1_500)
it.onNext("C")
Thread.sleep(500)
}.subscribeOn(Schedulers.io())
.subscribe({
System.out.println(it)
},{
it.printStackTrace()
})
复制代码
一下为Kotlin编写的代码,能够看到在发生错误的状况下,经过onError() 抛出了错误,而且须要在订阅者,第二个参数传入,处理错误的回调。
fun testErrorHandle() {
Observable.create<String> {
it.onNext("start")
Thread {
try {
System.out.println("start open ...")
it.onNext("start open ...")
val stream = URL("https://www.baidu.com").openStream()
System.out.println("after url ...")
it.onNext("after url")
val br = stream.bufferedReader()
if (!it.isDisposed) {
var text = br.readText()
it.onNext(text)
}
stream.close()
br.close()
it.onNext("after open ...")
if (!it.isDisposed) {
it.onComplete()
}
}catch (e : java.lang.Exception) {
System.out.println(e)
e.printStackTrace()
it.onError(e)
}
}.start()
}.subscribe(System.out::println) {
it.printStackTrace()
System.out.println("what the fuck")
}
}
复制代码
Observable一般不会抛出异常。相反,它会经过使用onError通知终止Observable序列来通知任何观察者发生了不可恢复的错误。
这有一些例外。例如,若是onError()调用自己失败,Observable将不会尝试经过再次调用onError来通知观察者,但会抛出RuntimeException,OnErrorFailedException或OnErrorNotImplementedException。
所以,不是捕获异常,而是观察者或操做者应该更一般地响应异常的onError通知。还有各类Observable运算符可用于对来自Observable的onError通知做出反应或从中恢复。例如,可使用运算符:
可使用错误处理运算符中描述的运算符来实现这些策略。
吞下的意思,应该是不处理异常
CompositeException 这代表发生了多个异常。可使用异常的getExceptions()方法来检索构成组合的各个异常。
MissingBackpressureException 这表示试图将过多发出数据项应用于它的Observable。有关背压(github.com/ReactiveX/R…)的Observable的解决方法,请参阅Backpressure。
OnErrorFailedException 这代表Observable试图调用其观察者的onError()方法,但该方法自己引起了异常。
OnErrorNotImplementedException 这代表Observable试图调用其观察者的onError()方法,可是没有这样的方法存在。能够经过修复Observable以使其再也不达到错误条件,经过在观察者中实现onError处理程序,或经过使用本页其余地方描述的其中一个运算符到达观察者以前截获onError通知来消除此问题。。
OnErrorThrowable 观察者将这种类型的throwable传递给他们的观察者的onError()处理程序。此变量的Throwable包含有关错误的更多信息以及错误发生时系统的Observable特定状态,而不是标准Throwable。