这个页面列出了不少用于Observable的辅助操做符html
materialize( )
— 将Observable转换成一个通知列表convert an Observable into a list of Notificationsdematerialize( )
— 将上面的结果逆转回一个Observabletimestamp( )
— 给Observable发射的每一个数据项添加一个时间戳serialize( )
— 强制Observable按次序发射数据而且要求功能是无缺的cache( )
— 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者observeOn( )
— 指定观察者观察Observable的调度器subscribeOn( )
— 指定Observable执行任务的调度器doOnEach( )
— 注册一个动做,对Observable发射的每一个数据项使用doOnCompleted( )
— 注册一个动做,对正常完成的Observable使用doOnError( )
— 注册一个动做,对发生错误的Observable使用doOnTerminate( )
— 注册一个动做,对完成的Observable使用,不管是否发生错误doOnSubscribe( )
— 注册一个动做,在观察者订阅时使用doOnUnsubscribe( )
— 注册一个动做,在观察者取消订阅时使用finallyDo( )
— 注册一个动做,在Observable完成时使用delay( )
— 延时发射Observable的结果delaySubscription( )
— 延时处理订阅请求timeInterval( )
— 按期发射数据using( )
— 建立一个只在Observable生命周期存在的资源single( )
— 强制返回单个数据,不然抛出异常singleOrDefault( )
— 若是Observable完成时返回了单个数据,就返回它,不然返回默认数据toFuture( )
, toIterable( )
, toList( )
— 将Observable转换为其它对象或数据结构=========================================================java
Materialize
将数据项和事件通知都当作数据项发射,Dematerialize
恰好相反。react
一个合法的有限的Obversable将调用它的观察者的onNext
方法零次或屡次,而后调用观察者的onCompleted
或onError
正好一次。Materialize
操做符将这一系列调用,包括原来的onNext
通知和终止通知onCompleted
或onError
都转换为一个Observable发射的数据序列。缓存
RxJava的materialize
未来自原始Observable的通知转换为Notification
对象,而后它返回的Observable会发射这些数据。数据结构
materialize
默认不在任何特定的调度器 (Scheduler
) 上执行。多线程
Dematerialize
操做符是Materialize
的逆向过程,它将Materialize
转换的结果还原成它本来的形式。并发
dematerialize
反转这个过程,将原始Observable发射的Notification
对象还原成Observable的通知。异步
dematerialize
默认不在任何特定的调度器 (Scheduler
) 上执行。ide
给Observable发射的数据项附加一个时间戳函数
RxJava中的实现为timestamp
,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped<T>
的数据的Observable,每一项都包含数据的原始发射时间。
timestamp
默认在immediate
调度器上执行,可是能够经过参数指定其它的调度器。
强制一个Observable连续调用并保证行为正确
一个Observable能够异步调用它的观察者的方法,多是从不一样的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext
调用以前尝试调用onCompleted
或onError
方法,或者从两个不一样的线程同时调用onNext
方法。使用Serialize
操做符,你能够纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava中的实现是serialize
,它默认不在任何特定的调度器上执行。
保证全部的观察者收到相同的数据序列,即便它们在Observable开始发射数据以后才订阅
可链接的Observable (connectable Observable)与普通的Observable差很少,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操做符时才会开始。用这种方法,你能够在任什么时候候让一个Observable开始发射数据。
若是在将一个Observable转换为可链接的Observable以前对它使用Replay
操做符,产生的这个可链接Observable将老是发射完整的数据序列给任何将来的观察者,即便那些观察者在这个Observable开始给其它观察者发射数据以后才订阅。
RxJava的实现为replay
,它有多个接受不一样参数的变体,有的能够指定replay
的最大缓存数量,有的还能够指定调度器。
有一种 replay
返回一个普通的Observable。它能够接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。所以,这个操做符实际上是replay
变换以后的数据项。
指定一个观察者在哪一个调度器上观察这个Observable
不少ReactiveX实现都使用调度器 “Scheduler
”来管理多线程环境中Observable的转场。你可使用ObserveOn
操做符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext
,onCompleted
, onError
方法)。
注意:当遇到一个异常时ObserveOn
会当即向前传递这个onError
终止通知,它不会等待慢速消费的Observable接受任何以前它已经收到但尚未发射的数据项。这可能意味着onError
通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展现的。
SubscribeOn
操做符的做用相似,但它是用于指定Observable自己在特定的调度器上执行,它一样会在那个调度器上给观察者发通知。
RxJava中,要指定Observable应该在哪一个调度器上调用观察者的onNext
, onCompleted
, onError
方法,你须要使用observeOn
操做符,传递给它一个合适的Scheduler
。
指定Observable自身在哪一个调度器上执行
不少ReactiveX实现都使用调度器 “Scheduler
”来管理多线程环境中Observable的转场。你可使用SubscribeOn
操做符指定Observable在一个特定的调度器上运转。
ObserveOn
操做符的做用相似,可是功能颇有限,它指示Observable在一个指定的调度器上给观察者发通知。
在某些实现中还有一个UnsubscribeOn
操做符。
注册一个动做做为原始Observable生命周期事件的一种占位符
你能够注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操做符用于达到这个目的。
RxJava实现了不少Do
操做符的变体。
doOnEach
操做符让你能够注册一个回调,它产生的Observable每发射一项数据就会调用它一次。你能够以Action
的形式传递参数给它,这个Action接受一个onNext
的变体Notification
做为它的惟一参数,你也能够传递一个Observable给doOnEach
,这个Observable的onNext
会被调用,就好像它订阅了原始的Observable同样。
doOnNext
操做符相似于doOnEach(Action1)
,可是它的Action不是接受一个Notification
参数,而是接受发射的数据项。
示例代码
Observable.just(1, 2, 3) .doOnNext(new Action1<Integer>() { @Override public void call(Integer item) { if( item > 1 ) { throw new RuntimeException( "Item exceeds maximum value" ); } } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出
Next: 1 Error: Item exceeds maximum value
doOnSubscribe
操做符注册一个动做,当观察者订阅它生成的Observable它就会被调用。
doOnUnsubscribe
操做符注册一个动做,当观察者取消订阅它生成的Observable它就会被调用。
doOnCompleted
操做符注册一个动做,当它产生的Observable正常终止调用onCompleted
时会被调用。
doOnError
操做符注册一个动做,当它产生的Observable异常终止调用onError
时会被调用。
doOnTerminate
操做符注册一个动做,当它产生的Observable终止以前会被调用,不管是正常仍是异常终止。
finallyDo
操做符注册一个动做,当它产生的Observable终止以后会被调用,不管是正常仍是异常终止。
延迟一段指定的时间再发射来自Observable的发射物
Delay
操做符让原始Observable在发射每项数据以前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前总体平移了一个增量。
RxJava的实现是 delay
和delaySubscription
。
第一种delay
接受一个定义时长的参数(包括数量和单位)。每当原始Observable发射一项数据,delay
就启动一个定时器,当定时器过了给定的时间段时,delay
返回的Observable发射相同的数据项。
注意:delay
不会平移onError
通知,它会当即将这个通知传递给订阅者,同时丢弃任何待发射的onNext
通知。然而它会平移一个onCompleted
通知。
delay
默认在computation
调度器上执行,你能够经过参数指定使用其它的调度器。
另外一种delay
不实用常数延时参数,它使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的Observable终止时,delay
返回的Observable就发射关联的那项数据。
这种delay
默认不在任何特定的调度器上执行。
这个版本的delay
对每一项数据使用一个Observable做为原始Observable的延时定时器。
这种delay
默认不在任何特定的调度器上执行。
还有一个操做符delaySubscription
让你你能够延迟订阅原始Observable。它结合搜一个定义延时的参数。
delaySubscription
默认在computation
调度器上执行,你能够经过参数指定使用其它的调度器。
还有一个版本的delaySubscription
使用一个Obseable而不是一个固定的时长来设置订阅延时。
这种delaySubscription
默认不在任何特定的调度器上执行。
将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable
TimeInterval
操做符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。
RxJava中的实现为timeInterval
,这个操做符将原始Observable转换为另外一个Obserervable,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。新的Observable的第一个发射物表示的是在观察者订阅原始Observable到原始Observable发射它的第一项数据之间流逝的时间长度。不存在与原始Observable发射最后一项数据和发射onCompleted
通知之间时长对应的发射物。
timeInterval
默认在immediate
调度器上执行,你能够经过传参数修改。
建立一个只在Observable生命周期内存在的一次性资源
Using
操做符让你能够指示Observable建立一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。
using
操做符接受三个参数:
当一个观察者订阅using
返回的Observable时,using
将会使用Observable工厂函数建立观察者要观察的Observable,同时使用资源工厂函数建立一个你想要建立的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(不管是正常终止仍是因错误而终止),using
使用第三个函数释放它建立的资源。
using
默认不在任何特定的调度器上执行。
只发射第一项(或者知足某个条件的第一项)数据
若是你只对Observable发射的第一项数据,或者知足某个条件的第一项数据感兴趣,你可使用First
操做符。
在某些实现中,First
没有实现为一个返回Observable的过滤操做符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,若是你想要的是一个过滤操做符,最好使用Take(1)
或者ElementAt(0)
。
在一些实现中还有一个Single
操做符。它的行为与First
相似,但为了确保只发射单个值,它会等待原始Observable终止(不然,不是发射那个值,而是以一个错误通知终止)。你可使用它从原始Observable获取第一项数据,并且也确保只发射一项数据。
在RxJava中,这个操做符被实现为first
,firstOrDefault
和takeFirst
。
可能容易混淆,BlockingObservable
也有名叫first
和firstOrDefault
的操做符,它们会阻塞并返回值,不是当即返回一个Observable。
还有几个其它的操做符执行相似的功能。
只发射第一个数据,使用没有参数的first
操做符。
示例代码
Observable.just(1, 2, 3) .first() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
输出
Next: 1 Sequence complete.
传递一个谓词函数给first
,而后发射这个函数断定为true
的第一项数据。
firstOrDefault
与first
相似,可是在Observagle没有发射任何数据时发射一个你在参数中指定的默认值。
传递一个谓词函数给firstOrDefault
,而后发射这个函数断定为true
的第一项数据,若是没有数据经过了谓词测试就发射一个默认值。
takeFirst
与first
相似,除了这一点:若是原始Observable没有发射任何知足条件的数据,first
会抛出一个NoSuchElementException
,takeFist
会返回一个空的Observable(不调用onNext()
可是会调用onCompleted
)。
single
操做符也与first
相似,可是若是原始Observable在完成以前不是正好发射一次数据,它会抛出一个NoSuchElementException
。
single
的变体接受一个谓词函数,发射知足条件的单个值,若是不是正好只有一个数据项知足条件,会以错误通知终止。
和firstOrDefault
相似,可是若是原始Observable发射超过一个的数据,会以错误通知终止。
和firstOrDefault(T, Func1)
相似,若是没有数据知足条件,返回默认值;若是有多个数据知足条件,以错误通知终止。
first系列的这几个操做符默认不在任何特定的调度器上执行。
将Observable转换为另外一个对象或数据结构
ReactiveX的不少语言特定实现都有一种操做符让你能够将Observable或者Observable发射的数据序列转换为另外一个对象或数据结构。它们中的一些会阻塞直到Observable终止,而后生成一个等价的对象或数据结构;另外一些返回一个发射那个对象或数据结构的Observable。
在某些ReactiveX实现中,还有一个操做符用于将Observable转换成阻塞式的。一个阻塞式的Ogbservable在普通的Observable的基础上增长了几个方法,用于操做Observable发射的数据项。
getIterator
操做符只能用于BlockingObservable
的子类,要使用它,你首先必须把原始的Observable转换为一个BlockingObservable
。可使用这两个操做符:BlockingObservable.from
或the Observable.toBlocking
。
这个操做符将Observable转换为一个Iterator
,你能够经过它迭代原始Observable发射的数据集。
toFuture
操做符也是只能用于BlockingObservable
。这个操做符将Observable转换为一个返回单个数据项的Future
,若是原始Observable发射多个数据项,Future
会收到一个IllegalArgumentException
;若是原始Observable没有发射任何数据,Future
会收到一个NoSuchElementException
。
若是你想将发射多个数据项的Observable转换为Future
,能够这样用:myObservable.toList().toBlocking().toFuture()
。
toFuture
操做符也是只能用于BlockingObservable
。这个操做符将Observable转换为一个Iterable
,你能够经过它迭代原始Observable发射的数据集。
一般,发射多项数据的Observable会为每一项数据调用onNext
方法。你能够用toList
操做符改变这个行为,让Observable将多项数据组合成一个List
,而后调用一次onNext
方法传递整个列表。
若是原始Observable没有发射任何数据就调用了onCompleted
,toList
返回的Observable会在调用onCompleted
以前发射一个空列表。若是原始Observable调用了onError
,toList
返回的Observable会当即调用它的观察者的onError
方法。
toList
默认不在任何特定的调度器上执行。
toMap
收集原始Observable发射的全部数据项到一个Map(默认是HashMap)而后发射这个Map。你能够提供一个用于生成Map的Key的函数,还能够提供一个函数转换数据项到Map存储的值(默认数据项自己就是值)。
toMap
默认不在任何特定的调度器上执行。
toMultiMap
相似于toMap
,不一样的是,它生成的这个Map同时仍是一个ArrayList
(默认是这样,你能够传递一个可选的工厂方法修改这个行为)。
toMultiMap
默认不在任何特定的调度器上执行。
toSortedList
相似于toList
,不一样的是,它会对产生的列表排序,默认是天然升序,若是发射的数据项没有实现Comparable
接口,会抛出一个异常。然而,你也能够传递一个函数做为用于比较两个数据项,这是toSortedList
不会使用Comparable
接口。
toSortedList
默认不在任何特定的调度器上执行。
nest
操做符有一个特殊的用途:将一个Observable转换为一个发射这个Observable的Observable。