哇哦, 咱们又多了一天时间,因此让咱们来学点新东西好让这一天过得很棒吧 🙂。前端
各位好, 但愿你如今已经作的很好了。 这是咱们关于 RxJava2 Android 系列文章的第八话 [ 第一话,第二话,第三话,第四话,第五话,第六话,第七话,第八话 ] 。在这一篇文章中将讨论 Rx 中的 Subjects(主题)。java
研究动机 : 本文研究动机和系列文章 第一话 中分享给你们的相同。react
引言 : 当我开始与 Rx 的这段旅程时, Subjects 就是我最困惑的一个部分。在大多数我开始去读任一博客的时候,我老是获得这样一个定义: “ Subjects 就像一个 Observable 和 Observer 同时存在同样。” 由于我不是一个聪明的人,因此这一点一直让我很困惑,所以在用 Rx 作了不少练习以后,有一天我获得了关于 Subjects 的概念,我惊讶于这个概念的强大,因此在这篇文章中我将和你一块儿讨论这个概念以及这个概念有多强大,或许在一些地方我不正确的使用了这个概念,可是此次让你学到这个概念,在本文最后,你将会和 Subjects 成为很好的朋友。🙂android
若是你和我同样,认为 Subjects 就像是 Observer 和 Observable 的组合,那么请尽可能忘掉这个概念。如今我将要修改一下 Observable 和 Observer 的概念. 对于 Observable 我会建议你阅读 Rx Observable 和 开发者 ( 我 ) 之间的对话 [ Android RxJava2 ] (这什么鬼系列 )第五话 而且 Observer 我会建议你阅读 继续 Rx Observable 和 开发者 ( 我 ) 之间的对话 (Observable 求婚 Observer) [ Android RxJava2 ](这什么鬼系列)第七话 。而后你就能够很轻易的理解本篇文章,如今我会在下面和你分享一下 Obsevable 和 Observer 的一些 API。ios
这是 Observable 的代码,如图所示代码总行数为 3000 多行。 正如咱们所知,Observable 一般使用其不一样的方法将数据转换为流,下面我给出一个简单的例子。git
public static void main(String[] args) {
List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
Observable<String> stringObservable = Observable.fromIterable(list);
}
复制代码
接下来咱们须要 Observer 从 Observable 中获得数据。如今我将第一次向你展现 Obsever 的一些 API。github
就像咱们看到的 Observer 很是简单,只有 4 个方法,那如今是时候在示例中使用一下这个 Observer 了。后端
/**
* Created by waleed on 09/07/2017.
*/
public class Subjects {
public static void main(String[] args) {
List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
Observable<String> stringObservable = Observable.fromIterable(list);
Observer<String> stringObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
stringObservable.subscribe(stringObserver);
}
}
复制代码
它的输出很简单. 如今咱们成功修订了 Observable 和 Observer API’s , 当作订阅时,Observable 基本是调用咱们的 Observer API’s。 任什么时候候 Observable 想要提供数据,老是要调用 Observaer 的 onNext ( data ) 方法。 任什么时候候发生错误 Observable 会调用 Observer 的 onError(e) 方法。
任什么时候候流操做完成 Observable 会调用 Observer 的 onComplete() 方法. 这是这两个 API 之间的一个简单关系.bash
如今我将要开始咱们今天的主题,若是再次对 Observable 和 Observer 有任何疑惑,请尝试阅读我上文中提到的文章,或者在评论中提问。 我认为 Rx 中关于 Subjects 的定义放到最后讨论,如今我将向你解释一个更简单的例子,它将使咱们能够更直接的掌握 Rx 中 Subjects 的概念。ide
Observable<String> stringObservable = Observable.create(observableEmitter -> {
observableEmitter.onNext("Event");
});
复制代码
这是能够发射一个字符串的 Observable。
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
复制代码
这是一个将会订阅 Observable 的消费者。
while (true) {
Thread.sleep(1000);
stringObservable.subscribe(consumer);
}
复制代码
这段代码会在每一秒后产生一个事件。 为了方便阅读我把完整的代码代码贴出。
public class Subjects {
public static void main(String[] args) throws InterruptedException {
Observable<String> stringObservable = Observable.create(observableEmitter -> {
observableEmitter.onNext("Event");
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
while (true) {
Thread.sleep(1000);
stringObservable.subscribe(consumer);
}
}
}
复制代码
Output: Event Event Event Event
这是一个简单的例子,我认为没有必要过多的解释,如今有趣的部分是,我会用不一样的技术来写出会有同样输出的新的例子。 在深刻以前,尝试阅读下面的代码。
class ObservableObserver extends Observable<String> implements Observer<String>.
复制代码
这很简单,我建立了一个名为 ObservableObserver 的新类, 它继承自 Observable 而且实现了 Observer 接口。 因此这意味这它能够做为 Observable 增强版 和 Observer. 我不认为这会有任何疑问,因此咱们已经知道 Observable 老是会生成流,因此这个类也有这个能力,由于它继承自 Observable。而后咱们可知 Observer 能够经过 订阅 Observable 来观察 Observable 中的任何流,那么咱们的新类也能够完成这些工做,由于它实现了 Observer 接口,BOOM。 很简单。 如今我要给你看所有代码,代码只是为了解释这个概念并不意味着它是一个 成熟 的代码。
class ObservableObserver extends Observable<String> implements Observer<String> {
private Observer<? super String> observer;
@Override
protected void subscribeActual(Observer<? super String> observer) { // Observable abstract method
this.observer = observer;
}
@Override
public void onSubscribe(Disposable disposable) { //Observer API
if (observer != null) {
observer.onSubscribe(disposable);
}
}
@Override
public void onNext(String s) {//Observer API
if (observer != null) {
observer.onNext(s);
}
}
@Override
public void onError(Throwable throwable) {//Observer API
if (observer != null) {
observer.onError(throwable);
}
}
@Override
public void onComplete() {//Observer API
if (observer != null) {
observer.onComplete();
}
}
public Observable<String> getObservable() {
return this;
}
}
复制代码
又一个很简单的类,咱们已经使用过上面的全部方法了,只是在这里有一个区别,就是咱们在同一个类中使用了 Observable 和 Observer 的相关方法。
public static void main(String[] args) throws InterruptedException {
ObservableObserver observableObserver = new ObservableObserver();
observableObserver.getObservable().subscribe(System.out::println);
while (true) {
Thread.sleep(1000);
observableObserver.onNext("Event");
}
}
复制代码
Output: Event Event Event
在上面的代码中有两行很重要,我将要给你们解释一下: **observableObserver.getObservable(): **这里,我从 ObservableObserver 类获取 Observable 并订阅 Observer . **observableObserver.onNext(“Event”): **这里,当事件发生时调用 Observer API 方法. 由于做为一个自我闭环的类,因此我可以从这个既是 Observabel 又是 Observer 的类中得到好处。如今有一个惊喜,你已经掌握了 Subjects 的概念,若是你不信的话来看下面图中的代码:
这是 RxJava2 Subject 类的代码,如今你能够明白为何人们会说 Subjiects 既是 Observable 又是 Observer,由于它使用了两个的 API 方法。 如今的 RxJava 中可使用不一样类型的 Subjects, 这是咱们下面要讨论的内容。
在 RxJava 中你能够获取到 4 种类型的 Subjiects。 1. Publish Subject 2. Behaviour Subject 3. Replay Subject 4. Async Subject
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = PublishSubject.create();
// Subject<String> subject = BehaviorSubject.create();
// Subject<String> subject = ReplaySubject.create();
// Subject<String> subject = AsyncSubject.create(); I will explain in the end
subject.subscribe(System.out::println);
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event "+ (++eventCounter));
}
}
复制代码
Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10
通常来讲若是你运行上面的代码,你将会看到输出中除了 AsyncSubject 的其余 Subjects 输出都是相同的,如今是时候来区别一下这些 Subjects 的类型了。 **1. Publish Subject: **在该类型 Subject 中,咱们能够获取实时的数据,例如个人一个 Publish Subject 是获取传感器数据,那么如今我订阅了该 Subject, 我将之获取最新的值,示例以下:
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = PublishSubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
复制代码
Output: Event 11 Event 12 Event 13 Event 14 Event 15 Event 16
因此,在这里 publish subject 发布数据是从 0 开始,而在订阅的时候已经发布到了 10,正如你所见,输出的数据为 Event 11。
**2. Behaviour Subject: **在这种类型的 Subjects 中,咱们将获取这个 Subject 最后发布出的值和新的将要发出的值,为了简单起见,请阅读下面的代码。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = BehaviorSubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
复制代码
Output: Event 10 Event 11 Event 12 Event 13 Event 14 Event 15
正如输出中你所看到的那样,我也得到了 “ Event 10” 这个值,而且这个值在我订阅以前就已经发布了。这意味着若是我想要订阅以前的最后一个值的话,我可使用这个类型的 Subject。
**3. Replay Subject: **在这个类型的 Subject 中,当我订阅时能够没有顾及的得到全部发布的数据值,简单起见仍是直接上代码吧。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = ReplaySubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
复制代码
Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10 Event 11 Event 12
如今我再次在 event 10 的时候订阅,可是我能够得到全部的历史数据,因此这很简单嘛。
**4. Async Subject: **在这个类型的 Subject 中,咱们将得到最后发布的数据值,这个数据值是 Subject 在完成和终止前发射的,为了简单起见,依旧是直接上代码吧。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = AsyncSubject.create();
subject.subscribe(System.out::println);
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10) {
subject.onComplete();
break;
}
}
}
复制代码
Output: Event 10 Process finished with exit code 0
在这里,你能够看到在值为 10 的时候以完成标识结束了 Subject 而且在程序完成后和程序退出以前,我获得了输出的 Event 10 ,因此这意味着它的意思是任什么时候候我想要经过 Subject 得到最后一次发布的的数据值可使用 Async Subject。
再次重复一下: Publish Subject: 我不关心以前的发布历史,我只关心新的或者最新的值。 Behaviour Subject: 我关心该 Subject 发布的最后一个值和新值。 Replay Subject: 我关心全部发布了新值的历史数据。 Async Subject: 我只关心在完成或终止以前由主题发出的最后一个值。
总结: 你好呀朋友,但愿你对这个知识点已经很清晰了,另外尽你最大的努力去动手实践这些概念,如今,我想要和各位说再见了,还有祝你们有个愉快的周末。 🙂
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 Android、iOS、前端、后端、区块链、产品、设计、人工智能等领域,想要查看更多优质译文请持续关注 掘金翻译计划、官方微博、知乎专栏。