哇哦,又是新的一天,是时候来学习一些新的「姿式」了 🙂。前端
嗨,朋友们,但愿你们一切都好。这是咱们 RxJava2 Android 系列的第六篇文章【第一话,第二话,第三话,第四话,第五话,第六话第七话 和第八话 】。在这一篇文章中,咱们将继续围绕 Rx 展开对话。还有一件重要的事情是,基本上 Summer vs Winter 意味着 Hot 和 Cold Observale 🙂 。java
我为啥要写这个呢:react
缘由和我在 part1 与你分享过的同样。android
引言:ios
**这篇文章并无引言,由于这实际上是咱们上一篇文章的延续,但在开始以前我想咱们应该进行一下前景回顾。上一篇文章中咱们遇到了一位 Rx Observable 先生。他给了咱们很多关于学习 Rx 的建议,而后他还分享给了咱们一些能够用来创造 Observable 的方法,最后他打算告诉咱们一些关于 Could 和 Hot Observable 的东西,结果咱们就此打住。git
紧接上一话:github
Observable:其实还有不少。我在这里介绍两类 Observable 对象。一种叫作 Cold Observable,第二个是 Hot Observable。有些时候开发者习惯把 Hot 和 Cold Observabels 拿来作比较 :)。 这些真的是很简单的概念。这里,我会经过一些简单的例子来阐述一下概念,而后我会告诉你如何在编码中使用它们。再以后我想我会给你一些真实案例,你以为如何?后端
Me:固然,我就在你眼前,这样你能够随时检查我是否有作错的地方。bash
Observable: 哈哈哈哈,固然了。那么有多少人了解商场的促销人员,就是那些站在商店门口但愿藉由大声吆喝来招揽顾客的人呢?dom
Me: 估计没几个,不少人都不太了解这种盛行于亚洲国家好比巴基斯坦和印度的销售文化……你能试着采用一些更加通俗的例子吗,这样的话每一个人都能更加轻易的理解这个概念。
Observable: 固然,没问题。有多少人了解咖啡和咖啡店呢?
Me: 差很少每一个人吧。
Observable: 很好。如今这里有两家咖啡店,一家叫作霜语咖啡店,一家叫作火舞咖啡店。任何一个去霜语咖啡馆的人均可以买一杯咖啡,而后坐在咖啡馆的任何地方。咖啡厅里的每一个座位上都提供了一副智能耳机。他们提供了一个有三首诗的播放列表。这些耳机最智能的地方在于,每当有人带上它们,这些耳机老是从第一首诗开始播放,若是有人中途取下了耳机后再次从新戴上,那么这些耳机仍然会从新从第一首诗开始播放。对了,若是你只是取下了耳机,那么它也就会中止播放。
反过来,火舞咖啡馆有一套完善的音乐播放系统。当你进入咖啡馆的时候,你就会开始听到他们播放的诗,由于他们有着很是好的音乐播放系统和一个大号的扬声器。他们的诗歌列表里有无数首诗,当他们天天开始营业的时候他们就会打开这个系统。因此说这个系统的运行与顾客无关,任何将会进入这家咖啡馆的人都能听到那个时刻正在播放的诗,而且他永远也不知道他进入以前已经播放完了多少诗了。这跟咱们要讲的 Observable 是一个概念。
就像霜语咖啡馆的那些耳机,Cold Obervable 老是被动的。就像你用 Observable.fromArray() 或者其余任何方法来创造 Observable 同样,他们和那些耳机差很少。如同戴上耳机播放列表才会播放同样,当你开始订阅那些 Observable 后你才会开始接收到数据。而当订阅者取消了对 Observable 的订阅后,如同取下耳机后诗会中止播放同样,你也将再也不能接收到数据。
最后的重点是霜语咖啡馆提供了不少副耳机,可是每副耳机只会在有人戴上它们以后才会开始播放。即便某我的已经播放到了第二首诗,但另外的某我的才戴上耳机,那么第二我的会从第一首诗开始播放。这意味着每一个人都有独立的播放列表。就如同咱们有三个订阅了 Cold Observable 的订阅者同样,它们会获得各自独立的数据流,也就是说 Observable 会对每一个订阅者单独地去调用三次 onNext 方法。换句话说就是,Cold Observable 如同那些耳机同样依赖于订阅者的订阅(顾客戴上耳机)。
Hot observable 就像火舞咖啡馆的音乐系统同样。一旦咖啡馆开始营业,其音乐系统就会开始播放诗歌,无论有没有人在听。每位进来的顾客都会从那个时刻正好在播放的诗开始聆听。这跟 Hot Observable 所作的事情同样,一旦它们被建立出来就会开始发射数据,任何的订阅者都会从它们开始订阅的那个时间点开始接收到数据,而且毫不会接收到以前就发射出去的数据。任何订阅者都会在订阅以后才接收到数据。我想我会使用一样的例子来进行编码,而且以后我会给一些真实案例。
Cold Observable:
public class HotVsCold {
public static void main(String[] args) throws InterruptedException {
List<String > poemsPlayList = Arrays.asList("Poem 1", "Poem 2", "Poem 3");
Observable coldMusicCoffeCafe = Observable.fromArray(poemsPlayList);
Consumer client1 = poem-> System.out.println(poem);
Consumer client2 = poem-> System.out.println(poem);
Consumer client3 = poem-> System.out.println(poem);
Consumer client4 = poem-> System.out.println(poem);
coldMusicCoffeCafe.subscribe(client1);
coldMusicCoffeCafe.subscribe(client2);
System.out.println(System.currentTimeMillis());
Thread.sleep(2000);
System.out.println(System.currentTimeMillis());
coldMusicCoffeCafe.subscribe(client3);
coldMusicCoffeCafe.subscribe(client4);
}
}
复制代码
好吧,这是一些很简单的示例代码。我有 4 个顾客和 1 个我在霜语咖啡馆例子里提到的播放列表。当前两个顾客戴上了耳机后,我暂停了 2 秒的程序,而后 3 号和 4 号顾客也戴上了耳机。在最后咱们查看输出数据时,咱们能轻易地看出每一个顾客都把 3 首诗从头听了一遍。
Output:
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
1494142518697
1494142520701
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
复制代码
Hot Observable:
public static void main(String[] args) throws InterruptedException {
Observable<Long> hotMusicCoffeeCafe = Observable.interval(1000, TimeUnit.MILLISECONDS);
ConnectableObservable<Long> connectableObservable = hotMusicCoffeeCafe.publish();
connectableObservable.connect(); // 咖啡馆开始营业,音乐播放系统开启
Consumer client1 = poem-> System.out.println("Client 1 poem"+poem);
Consumer client2 = poem-> System.out.println("Client 2 poem"+poem);
Consumer client3 = poem-> System.out.println("Client 3 poem"+poem);
Consumer client4 = poem-> System.out.println("Client 4 poem"+poem);
Thread.sleep(2000); // 在2首诗已经播放完毕后第一位顾客才进来,因此他会才第二首诗开始听
connectableObservable.subscribe(client1);
Thread.sleep(1000); // 第二位顾客会从第三首诗开始听
connectableObservable.subscribe(client2);
Thread.sleep(4000); // 第三和第四为顾客为从第七首诗开始听(译者注:原本是写的 poem 9)
connectableObservable.subscribe(client3);
connectableObservable.subscribe(client4);
while (true);
}
复制代码
火舞咖啡馆开始营业的时候就会开启其音乐播放系统。诗歌会在以上代码里咱们调用 connect 方法的时候开始播放。暂时先不须要关注 connect 方法,而只是试着理解这个概念。当通过 2 秒暂停,第一个顾客走进了咖啡馆后,他会从第二首诗开始听。下一位顾客会在 1 秒以后进来,而且从第三首诗开始听。以后,第三和第四位顾客会在 4 秒后进入,而且从第七首诗开始听。你能够看到这个音乐播放系统是独立于顾客的。一旦这个音乐系统开始运行,它并不在意有没人顾客在听。也就是说,全部的顾客会在他们进入时听到当前正在播放的诗,并且他们毫不会听到以前已经播放过的诗。如今我以为你已经抓住了 Hot vs Cold Observable 的概念。是时候来瞧一瞧如何建立这些不一样 Observables 的要点了。
Cold Observable:
Hot Observable:
Me: 听上去不错。你能告诉我如何将咱们的 Cold Observable 转换成 Hot Observable 吗?
Observable: 固然,Cold 和 Hot Observable 之间的转换很简单。
List<Integer> integers = new ArrayList<>();
Observable.range(0, 10000)
.subscribe(count -> integers.add(count));
Observable<List<Integer>> listObservable = Observable.fromArray(integers);
复制代码
在上面的代码里面,listObservable 是一个 Cold Observable。如今来看看咱们怎么把这个 Cold Observable 转换成 Hot Observable 的。
Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
复制代码
咱们用 publish() 方法将咱们的 Cold Observable 转换成了 Hot Observable。因而咱们能够说任何的 Cold Observable 均可以经过调用 publish() 方法来转换成 Hot Observable,这个方法会返回给你一个 ConnectableObservable,只是此时尚未开始发射数据。有点神奇啊。当我对任意 Observable 调用 publish() 方法时,这意味着从如今开始任何开始订阅的订阅者都会分享一样的数据流。有趣的一点是,若是如今有任意的订阅者订阅了 connectableObservable,它们什么也得不到。也许大家感到有些疑惑了。这里有两件事须要说明。当我调用 publish() 方法时,只是说明如今这个 Observable 作好了能成为单一数据源来发射数据的准备,为了真正地发射数据,我须要调用 connect() 方法,以下方代码所示。
Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
connectableObservable.connect();
复制代码
很简单对吧。记住调用 publish() 只是会把 Cold Observable 转换成 Hot Observable,而不会开始发射数据。为了可以发射数据咱们须要调用 cocnnect()。当我对一个 ConnectableObserbale 调用 connect() 时,数据才会开始被发射,无论有没有订阅者。这里还有一些在正式项目里会很是有用的方法,好比 refCount()、share()、replay()。在开始谈及它们以前,我会就此打住并再给你展现一个例子,以确保大家真正抓住了要领。
Me: 好嘞,但愿不要太复杂。
Observable: 哈哈哈,不会的。我只是须要再来详细解释一下,确保每一个人都把握了这个概念,由于这个概念其实并不算是特别简单的和容易理解的。
Me: 我也以为。
Observable:如今我会给你一个例子来让你更好地来准确把握这个概念。好比咱们有以下的一个 Observable。
Observable<String> just = Observable.just("Hello guys");
复制代码
还有两个不一样的订阅者订阅了它。
public class HotVsCold {
public static void main(String[] args) {
Observable<String> just = Observable.just("Hello guys");
just.subscribe(s-> System.out.println(s));
just.subscribe(s-> System.out.println(s));
}
}
复制代码
Output:
Hello guys
Hello guys
复制代码
个人问题是,这个 Observable 是 Cold 仍是 Hot 的呢。我知道你确定已经知道这个是 cold,由于这里没有 publish() 的调用。先暂时把这个想象成我从某个第三方库得到而来的,因而我也不知道这是哪一种类型的 Observable。如今我打算写一个例子,这样不少事情就不言而喻了。
public static void main(String[] args) {
Random random = new Random();
Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
just.subscribe(s-> System.out.println(s));
just.subscribe(s-> System.out.println(s));
}
复制代码
我有一段生产随机数的程序,让咱们来看下输出再来讨论这是 Cold 仍是 Hot。
Output: 1531768121 607951518
两个不一样的值。这就是说这是一个 Cold observable,由于根据 Cold Observable 的定义每次都会获得一个全新的值。每次它都会建立一个全新的值,或者简单来讲 onNext() 方法会被不一样的订阅者分别调用一次。
如今让咱们来把这个 Cold Observable 转换成 Hot Observable。
public static void main(String[] args) {
Random random = new Random();
Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
ConnectableObservable<Integer> publish = just.publish();
publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s));
publish.connect();
}
复制代码
在解释上面的代码以前,先让咱们来看一下输出。
Output:
1926621976
1926621976
复制代码
咱们的两个不一样订阅者获得了同一份数据。根据 Hot Observable 老是每份数据只发射一次的定义说明了这是一个 Hot Obsevable,或者简单来讲 onNext() 只被调用了一次。我接下来会解释 publish() 和 connect() 的调用。
当我调用 publish() 方法时,这意味着个人这个 Observable 已经独立于订阅者,而且全部订阅者只会接收到同一个数据源发射的同一份数据。简单来讲,Hot Observable 将会对全部订阅者发射调用一次 onNext() 所产生的数据。这里或许有些让你感到困惑,我在两个订阅者订阅以后才调用了 connect() 方法。由于我想告诉大家 Hot Observable 是独立的而且数据的发射应该经过一次对 onNext() 的调用,而且咱们知道 Hot Observable 只会在咱们调用 connect() 以后才会开始发射数据。因此首先咱们让两个订阅者去订阅,而后在咱们才调用 connect() 方法,因而咱们就能够获得一样一份数据。如今让咱们来对这个例子作些小小的改动。
Random random = new Random();
Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
ConnectableObservable<Integer> publish = just.publish();
publish.connect();
publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s));
复制代码
咱们看到这里只有一处小小的变化。我在调用 connect() 以后才让订阅者订阅。你们来猜猜会输出什么?
Output:
Process finished with exit code 0
复制代码
没错,没有输出。是否是以为有点不对劲?听我慢慢解释。如你所见,我建立了一个发射随机数的 Observable,而且它只会调用一次了。经过调用 publish() 我将这个 Cold Observable 转换成了 Hot Observable,接着我当即调用了 connect() 方法。咱们知道如今它是一个独立于订阅者的 Hot Observable,而且它生成了一个随机数将其发射了出去。在调用 connect() 以后咱们才让两个订阅者订阅了这个 Observable,两个订阅者没有接收到任何数据的缘由是在它们订阅以前 Hot Observable 就已经将数据发射了出去。我想你们都能明白的吧。如今让咱们在 Observable 内部加上日志打印输出,这样咱们就能够确认这个流程是如同我所解释的同样了。
public static void main(String[] args) {
Random random = new Random();
Observable<Integer> just = Observable.create(source -> {
int value = random.nextInt();
System.out.println("Emitted data: " + value);
source.onNext(value);
}
);
ConnectableObservable<Integer> publish = just.publish();
publish.connect();
publish.subscribe(s -> System.out.println(s));
publish.subscribe(s -> System.out.println(s));
}
复制代码
Output:
Emitted data: -690044789
Process finished with exit code 0
复制代码
如上所示,个人 Hot Observable 在调用 connect() 以后开始发射数据,而后才是订阅者发起了订阅。这就是为何个人订阅者没有获得数据。让咱们在继续深刻以前来复习一下。
Observable: 小小的暂停一下,在咱们继续研究 Observable 以前,你若是能将以上的代码改形成能无限制间隔发射数据的话就太棒了。
Me: 小菜一碟。
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
Observable<Integer> just = Observable.create(
source -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
int value = random.nextInt();
System.out.println("Emitted data: " + value);
source.onNext(value);
});
}
); // 简单的把数据源变成了每间隔一秒就发射一次数据。
ConnectableObservable<Integer> publish = just.publish();
publish.connect();
Thread.sleep(2000); // 咱们的订阅者在 2 秒后才开始订阅。
publish.subscribe(s -> System.out.println(s));
publish.subscribe(s -> System.out.println(s));
while (true);
}
复制代码
Output:
Emitted data: -918083931
Emitted data: 697720136
Emitted data: 416474929
416474929
416474929
Emitted data: -930074666
-930074666
-930074666
Emitted data: 1694552310
1694552310
1694552310
Emitted data: -61106201
-61106201
-61106201
复制代码
输出结果如上所示。咱们的 Hot Observable 彻底在按照咱们以前得出的定义在工做。当它开始发射数据的 2 秒时间后,咱们获得了 2 个不一样的输出值,接着咱们让两个订阅者去订阅它,因而它们获得了同一份第三个被发射出来的值。 是时候来更加深刻的来理解这个概念了。在咱们已经对 Cold 和 Hot 有必定概念的基础上,我将针对一些场景对 Hot Observable 作更详细的介绍。
场景 1: 我但愿任意订阅者在订阅以后也能首先接收到其订阅这个时间点以前的数据,而后才是同步接收到新发射出来的数据。要解决这个问题,咱们只须要简单的调用 replay() 方法就行。
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
Observable<Integer> just = Observable.create(
source -> {
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
int value = random.nextInt();
System.out.println("Emitted data: " + value);
source.onNext(value);
});
}
);
ConnectableObservable<Integer> publish = just.replay();
publish.connect();
Thread.sleep(2000);
publish.subscribe(s -> System.out.println("Subscriber 1: "+s));
publish.subscribe(s -> System.out.println("Subscriber 2: "+s));
while (true);
}
复制代码
Output:
**Emitted data: -1320694608**
**Emitted data: -1198449126**
**Emitted data: -1728414877**
**Emitted data: -498499026**
Subscriber 1: -1320694608
Subscriber 1: -1198449126
Subscriber 1: -1728414877
Subscriber 1: -498499026
Subscriber 2: -1320694608
Subscriber 2: -1198449126
Subscriber 2: -1728414877
Subscriber 2: -498499026
**Emitted data: -1096683631**
**Subscriber 1: -1096683631**
**Subscriber 2: -1096683631**
**Emitted data: -268791291**
**Subscriber 1: -268791291**
**Subscriber 2: -268791291**
复制代码
以上所示,你能轻松的理解 Hot Observabel 里的 replay() 这个方法。我首先建立了一个每隔 0.5 秒发射数据的 Hot Observable,在 2 秒事后咱们才让两个订阅者去订阅它。此时因为咱们的 Observable 已经发射出来了 4 个数据,因而你能看到输出结果里,咱们的订阅者首先获得了在其订阅这个时间点以前已经被发射出去的 4 个数据,而后才开始同步接收到新发射出来的数据。
场景 2: 我但愿有一种 Hot Observable 可以在最少有一个订阅者的状况下才发射数据,而且若是全部它的订阅者都取消了订阅,它就会中止发射数据。 这一样可以很轻松的办到。
public static void main(String[] args) throws InterruptedException {
Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS).publish().refCount();
Consumer<Long > firstSubscriber = s -> System.out.println("Subscriber 1: "+s);
Consumer<Long > secondSubscriber = s -> System.out.println("Subscriber 2: "+s);
Disposable subscribe1 = observable.subscribe(firstSubscriber);
Disposable subscribe2 = observable.subscribe(secondSubscriber);
Thread.sleep(2000);
subscribe1.dispose();
Thread.sleep(2000);
subscribe2.dispose();
Consumer<Long > thirdSubscriber = s -> System.out.println("Subscriber 3: "+s);
Disposable subscribe3 = observable.subscribe(thirdSubscriber);
Thread.sleep(2000);
subscribe3.dispose();
while (true);
}
复制代码
Output: Subscriber 1: 0 Subscriber 2: 0 Subscriber 1: 1 Subscriber 2: 1 Subscriber 1: 2 Subscriber 2: 2 Subscriber 1: 3 Subscriber 2: 3 Subscriber 2: 4 Subscriber 2: 5 Subscriber 2: 6 Subscriber 2: 7 Subscriber 3: 0 Subscriber 3: 1 Subscriber 3: 2 Subscriber 3: 3 (译者注:原文少写了一行输出)
相当重要的一点是,这是一个 Hot Observable,而且它在第一个订阅者订阅以后才开始发射数据,而后当它没有订阅者时它会中止发射数据。 如上面的输出所示,当头两个订阅者开始订阅它以后,它才开始发射数据,而后其中一个订阅者取消了订阅,可是它并无中止发射数据,由于此时它还拥有另一个订阅者。又过了一会,另一个订阅者也取消了订阅后,它便中止了发射数据。当 2 秒事后第三个订阅者开始订阅它以后,它开始从头开始发射数据,而不是从第二个订阅者取消订阅时停留在的位置。
Observable: 哇哦,你真棒!你把这个概念解释地超好。
Me: 多谢夸奖。
Observable: 那么你还有其余的问题吗?
Me: 是的,我有。你能告诉我什么是 Subject 以及不一样类别的 Subject 的区别吗,好比 Publish,Behaviour 之类的。
Observable: Emmmmmm。我觉我应该在教你那些个概念以前告诉你关于 Observer API 的相关知识,还有就是关于如何使用 Lambda 表达式或者叫函数式接口来代替使用完整的 Observer 接口的方法。你以为呢?
Me: 好啊,都听你的。
Observable: 就目前咱们了解到的 Observable,这里还有一个关于咱们一直在使用的 Observable 的概念...
小结: 大家好啊,朋友们。此次的对话真是有点长啊,我必须在此打住了。不然的话这篇文章就会变成一本四库全书,什么乱七八糟的东西都会出现。我但愿咱们可以系统地有条理地来学习这一切。因此余下的内容,咱们下回再揭晓。再者,试试看尽你可能把咱们此次学到的东西用在你真正的项目中。最后感谢 Rx Observable 的到场。 周末快乐,再见。🙂
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 Android、iOS、前端、后端、区块链、产品、设计、人工智能等领域,想要查看更多优质译文请持续关注 掘金翻译计划、官方微博、知乎专栏。