今天,咱们来整理如下几个你们容易弄混的概念,并用实际例子来演示,能够从 RxSample 的第十二章中获取:java
publish
reply
ConnectableObservable
connect
share
refCount
autoConnect
对于以上这些概念,能够用一幅图来归纳: git
Observable
能够分为四类,下面咱们将逐一介绍这几种
Observable
的特色:
Cold Observable
,就是咱们经过Observable.create
、Observable.interval
等建立型操做符生成的Observable
。Cold Observable
通过publish()
或者replay(int N)
操做符转换成的ConnectableObservable
。ConnectableObservable
通过refCount()
,或者由Cold Observable
通过share()
转换成的Observable
。ConnectableObservable
通过autoConnect(int N)
转换成的Observable
。Cold Observable
就是咱们经过Observable.create
、Observable.interval
等建立型操做符生成的Observable
,它具备如下几个特色:github
Cold Observable
时,Cold Observable
会从新开始发射数据给该订阅者。Cold Observable
,它们收到的数据是相互独立的。Cold Observable
后,Cold Observable
会中止发射数据给该订阅者,但不会中止发射数据给其它订阅者。下面,咱们演示一个例子,首先咱们建立一个Cold Observable
:缓存
//直接订阅Cold Observable。
private void createColdSource() {
mConvertObservable = getSource();
}
private Observable<Integer> getSource() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
try {
int i = 0;
while (true) {
Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
mSourceOut.add(i);
observableEmitter.onNext(i++);
updateMessage();
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io());
}
复制代码
在建立两个订阅者,它们能够随时订阅到Cold Observable
或者取消对它的订阅:ide
private void startSubscribe1() {
if (mConvertObservable != null && mDisposable1 == null) {
mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
mSubscribe1In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe1() {
if (mDisposable1 != null) {
mDisposable1.dispose();
mDisposable1 = null;
mSubscribe1In.clear();
updateMessage();
}
}
private void startSubscribe2() {
if (mConvertObservable != null && mDisposable2 == null) {
mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
mSubscribe2In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe2() {
if (mDisposable2 != null) {
mDisposable2.dispose();
mDisposable2 = null;
mSubscribe2In.clear();
updateMessage();
}
}
复制代码
为了验证以前说到的几个特色,进入程序以后,咱们会先建立该Cold Observable
,以后进行一系列的操做,效果以下: spa
Cold Observable
,这时候Cold Observable
没有发送任何数据。Observer1
订阅Observable
,此时Cold Observable
开始发送数据,Observer1
也能够收到数据,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会开始发射数据给该订阅者Observer2
订阅Observable
,此时Observable2
也能够收到数据,可是它和Observable1
收到的数据是相互独立的,即 当多个订阅者订阅到同一个 Cold Observable ,它们收到的数据是相互独立的。Observer1
取消对Observable
的订阅,这时候Observer1
收不到数据,而且Observable
也不会发射数据给它,可是仍然会发射数据给Observer2
,即 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会中止发射数据给该订阅者,但不会中止发射数据给其它订阅者。Observer1
从新订阅Observable
,这时候Observable
从0
开始发射数据给Observer1
,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会从新开始发射数据给该订阅者。在了解完Cold Observable
以后,咱们再来看第二类的Observable
,它的类型为ConnectableObservable
,它是经过Cold Observable
通过下面两种方式生成的:线程
.publish()
.reply(int N)
若是使用.publish()
建立,那么订阅者只能收到在订阅以后Cold Observable
发出的数据,而若是使用reply(int N)
建立,那么订阅者在订阅后能够收到Cold Observable
在订阅以前发送的N
个数据。3d
咱们先以publish()
为例,介绍ConnectableObservable
的几个特色:code
ConnectableObservable
有没有订阅者,只要调用了ConnectableObservable
的connect
方法,Cold Observable
就开始发送数据。connect
会返回一个Disposable
对象,调用了该对象的dispose
方法,Cold Observable
将会中止发送数据,全部ConnectableObservable
的订阅者也没法收到数据。connect
返回的Disposable
对象后,若是从新调用了connect
方法,那么Cold Observable
会从新发送数据。ConnectableObservable
后,该订阅者会收到在订阅以后,Cold Observable
发送给ConnectableObservable
的数据。ConnectableObservable
时,它们收到的数据是相同的。ConnectableObservable
,不会影响其余订阅者收到消息。下面,咱们建立一个ConnectableObservable
,两个订阅者以后会订阅到它,而不是Cold Observable
:cdn
//.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,不管此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将能够收到数据,而且订阅者解除订阅不会影响源Observable数据的发射。
public void createPublishSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish();
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
复制代码
和上面同样,仍是用一个例子来演示,该例子的效果为:
Cold Observable
的publish
方法建立ConnectableObservable
,并调用ConnectableObservable
的connect
方法,能够看到,此时虽然ConnectableObservable
没有任何订阅者,可是Cold Observable
也已经开始发送数据。Observer1
订阅到ConnectableObservable
,此时它只能收到订阅以后Cold Observable
发射的数据。Observer2
订阅到ConnectableObservable
,Cold Observable
只会发射一份数据,而且Observer1
和Observer2
收到的数据是相同的。Observer1
取消对ConnectableObservable
的订阅,Cold Observable
仍然会发射数据,Observer2
仍然能够收到Cold Observable
发射的数据。Observer1
从新订阅ConnectableObservable
,和第三步相同,它仍然只会收到订阅以后Cold Observable
发射的数据。connect
返回的Disposable
对象,调用dispose
方法,此时Cold Observable
中止发射数据,而且Observer1
和Observer2
都收不到数据。上面这些现象发生的根本缘由在于:如今Observer
和Observer2
都是订阅到ConnectableObservable
,真正产生数据的Cold Observable
并不知道他们的存在,和它交互的是ConnectableObservable
,ConnectableObservable
至关于一个中介,它完成下面两项任务:
connect
和dispose
方法决定是否要订阅到Cold Observer
,也就是决定了Cold Observable
是否发送数据。Cold Observable
发送的数据转交给它的订阅者。由ConnectableObservable
转换成Observable
有两种方法,咱们分为两节介绍下当订阅到转换后的Observable
时的现象:
.refCount()
.autoConnect(int N)
通过refCount
方法,ConnectableObservable
能够转换成正常的Observable
,咱们称为refObservable
,这里咱们假设ConnectableObservable
是由Cold Observable
经过publish()
方法转换的,对于它的订阅者,有如下几个特色:
refObservable
后,Cold Observable
开始发送数据。refObservable
后,只能收到在订阅以后Cold Observable
发送的数据。refObservable
后,假如它是当前refObservable
的惟一一个订阅者,那么Cold Observable
会中止发送数据;不然,Cold Observable
仍然会继续发送数据,其它的订阅者仍然能够收到Cold Observable
发送的数据。接着上例子,咱们建立一个refObservable
:
//.share()至关于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,若是全部的订阅者都取消订阅,源Observable就会中止发送数据。
private void createShareSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().refCount();
}
复制代码
示例以下:
.publish().refCount()
建立由ConnectableObservable
转换后的refObservable
,此时Cold Observable
没有发送任何消息。Observer1
订阅到refObservable
,Cold Observable
开始发送数据,Observer1
接收数据。Observer2
订阅到refObservable
,它只能收到在订阅以后Cold Observable
发送的数据。Observer1
取消订阅,Cold Observable
继续发送数据,Observer2
仍然能收到数据。Observer2
取消订阅,Cold Observable
中止发送数据。Observer1
从新订阅,Cold Observable
从新开始发送数据。最后说明一点:订阅到Cold Observable
的.publish().refCount()
和Cold Observable
的share()
所返回的Observable
是等价的。
autoConnect(int N)
和refCount
很相似,都是将ConnectableObservable
转换成普通的Observable
,咱们称为autoObservable
,一样咱们先假设ConnectableObservable
是由Cold Observable
经过publish()
方法生成的,它有如下几个特色:
N
个订阅者订阅到refObservable
后,Cold Observable
开始发送数据。refObservable
后,只能收到在订阅以后Cold Observable
发送的数据。Cold Observable
开始发送数据,即便全部的autoObservable
的订阅和都取消了订阅,Cold Observable
也不会中止发送数据,若是想要Cold Observable
中止发送数据,那么可使用autoConnect(int numberOfSubscribers, Consumer connection)
中Consumer
返回的Disposable
,它的做用和ConnectableObservable
的connect
方法返回的Disposable
相同。其建立方法以下所示:
//.autoConnect在有指定个订阅者时开始让源Observable发送消息,可是订阅者是否取消订阅不会影响到源Observable的发射。
private void createAutoConnectSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mConvertDisposable = disposable;
}
});
}
复制代码
示例效果以下:
autoConnect
转换后的autoObservable
。Observer1
订阅到autoObservable
,此时知足条件,Cold Observable
开始发送数据。Observer2
订阅到autoObservable
,它只能收到订阅后发生的数据。Observer1
取消订阅,Cold Observable
继续发送数据,Observer2
仍然能够收到数据。Observer2
取消订阅,Cold Observable
仍然继续发送数据。Observer2
订阅到autoObservable
,它只能收到订阅后发送的消息了。mConvertDisposable
的dispose
,Cold Observable
中止发送数据。在上面的例子当中,全部总结的特色都是创建在ConnectableObservable
是由publish()
生成,只因此这么作,是为了方便你们理解,不管是订阅到ConnectableObservable
,仍是由ConnectableObservable
转换的refObservable
和autoObservable
,使用这两种方式建立的惟一区别就是,订阅者在订阅后,若是是经过publish()
建立的,那么订阅者以后收到订阅后Cold Observable
发送的数据;而若是是reply(int N)
建立的,那么订阅者还能额外收到N
个以前Cold Observable
发送的数据,咱们用下面一个小例子来演示,订阅者订阅到的Observable
以下:
//.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
private void createReplySource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.replay(3);
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
复制代码
示例演示效果:
Cold Observable
的publish
方法建立ConnectableObservable
,并调用ConnectableObservable
的replay(3)
方法,能够看到,此时虽然ConnectableObservable
没有任何订阅者,可是Cold Observable
也已经开始发送数据。Observer1
订阅到ConnectableObservable
,此时它会先收到以前发射的3
个数据,以后收到订阅以后Cold Observable
发射的数据。最后再提一下,更详细的代码你们能够从 RxSample 的第十二章中获取。