在面向对象的架构中,开发者致力于建立一组解耦的实体。这样的话,实体就能够在不用妨碍整个系统的状况下能够被测试、复用和维护。设计这种系统就带来一个棘手的负面影响:维护相关对象之间的统一。java
在Smalltalk MVC架构中,建立模式的第一个例子就是用来解决这个问题的。用户界面框架提供一种途径使UI元素与包含数据的实体对象相分离,而且同时,它提供一种灵活的方法来保持它们之间的同步。编程
在这本畅销的四人组编写的《设计模式——可复用面向对象软件的基础》一书中,观察者模式是最有名的设计模式之一。它是一种行为模式并提供一种以一对多的依赖来绑定对象的方法:即当一个对象发生变化时,依赖它的全部对象都会被通知而且会自动更新。设计模式
在本章中,咱们将会对观察者模式有一个概述,它是如何实现的以及如何用RxJava来扩展,Observable是什么,以及Observables如何与Iterables相关联。数组
观察者模式很适合下面这些场景中的任何一个:缓存
在RxJava的世界里,咱们有四种角色:架构
Observables和Subjects是两个“生产”实体,Observers和Subscribers是两个“消费”实体。并发
当咱们异步执行一些复杂的事情,Java提供了传统的类,例如Thread、Future、FutureTask、CompletableFuture来处理这些问题。当复杂度提高,这些方案就会变得麻烦和难以维护。最糟糕的是,它们都不支持链式调用。app
RxJava Observables被设计用来解决这些问题。它们灵活,且易于使用,也能够链式调用,而且能够做用于单个结果程序上,更有甚者,也能够做用于序列上。不管什么时候你想发射单个标量值,或者一连串值,甚至是无穷个数值流,你均可以使用Observable。框架
Observable的生命周期包含了三种可能的易于与Iterable生命周期事件相比较的事件,下表展现了如何将Observable async/push 与 Iterable sync/pull相关联起来。异步
Event | Iterable(pull) | Observable(push) |
---|---|---|
检索数据 | T next() |
onNext(T) |
发现错误 | throws Exception |
onError(Throwable) |
完成 | !hasNext() |
onCompleted() |
使用Iterable时,消费者从生产者那里以同步的方式获得值,在这些值获得以前线程处于阻塞状态。相反,使用Observable时,生产者以异步的方式把值推给观察者,不管什么时候,这些值都是可用的。这种方法之因此更灵活是由于即使值是同步或异步方式到达,消费者在这两种场景均可以根据本身的须要来处理。
为了更好地复用Iterable接口,RxJava Observable类扩展了GOF观察者模式的语义。引入了两个新的接口:
从发射物的角度来看,有两种不一样的Observables:热的和冷的。一个"热"的Observable典型的只要一建立完就开始发射数据,所以全部后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,所以这个观察者能够确保会收到整个数据序列。
在接下来的小节中将讨论Observables提供的两种建立Observable的方法。
create()方法使开发者有能力从头开始建立一个Observable。它须要一个OnSubscribe对象,这个对象继承Action1,当观察者订阅咱们的Observable时,它做为一个参数传入并执行call()函数。
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
复制代码
Observable经过使用subscriber变量并根据条件调用它的方法来和观察者通讯。让咱们看一个“现实世界”的例子:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
复制代码
例子故意写的简单,是由于即使是你第一次见到RxJava的操做,我想让你明白接下来要发生什么。
咱们建立一个新的Observable<Integer>
,它执行了5个元素的for循环,一个接一个的发射他们,最后完成。
另外一方面,咱们订阅了Observable,返回一个Subscription 。一旦咱们订阅了,咱们就开始接受整数,并一个接一个的打印出它们。咱们并不知道要接受多少整数。事实上,咱们也无需知道是由于咱们为每种场景都提供对应的处理操做:
在上一个例子中,咱们建立了一个整数序列并一个一个的发射它们。假如咱们已经有一个列表呢?咱们是否是能够不用for循环而也能够一个接一个的发射它们呢?
在下面的例子代码中,咱们从一个已有的列表中建立一个Observable序列:
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
复制代码
输出的结果和上面的例子绝对是同样的。
from()
建立符能够从一个列表/数组来建立Observable,并一个接一个的从列表/数组中发射出来每个对象,或者也能够从Java Future
类来建立Observable,并发射Future对象的.get()
方法返回的结果值。传入Future
做为参数时,咱们能够指定一个超时的值。Observable将等待来自Future
的结果;若是在超时以前仍然没有结果返回,Observable将会触发onError()
方法通知观察者有错误发生了。
若是咱们已经有了一个传统的Java函数,咱们想把它转变为一个Observable又改怎么办呢?咱们能够用create()
方法,正如咱们先前看到的,或者咱们也能够像下面那样使用以此来省去许多模板代码:
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
复制代码
helloWorld()
方法比较简单,像这样:
private String helloWorld(){
return "Hello World";
}
复制代码
无论怎样,它能够是咱们想要的任何函数。在刚才的例子中,咱们一旦建立了Observable,just()
执行函数,当咱们订阅Observable时,它就会发射出返回的值。
just()
方法能够传入一到九个参数,它们会按照传入的参数的顺序来发射它们。just()
方法也能够接受列表或数组,就像from()
方法,可是它不会迭代列表发射每一个值,它将会发射整个列表。一般,当咱们想发射一组已经定义好的值时会用到它。可是若是咱们的函数不是时变性的,咱们能够用just来建立一个更有组织性和可测性的代码库。
最后注意just()
建立符,它发射出值后,Observable正常结束,在上面那个例子中,咱们会在控制台打印出两条信息:“Hello World”和“Observable completed”。
当咱们须要一个Observable毫无理由的再也不发射数据正常结束时,咱们可使用empty()
。咱们可使用never()
建立一个不发射数据而且也永远不会结束的Observable。咱们也可使用throw()
建立一个不发射数据而且以错误结束的Observable。
subject
是一个神奇的对象,它能够是一个Observable同时也能够是一个Observer:它做为链接这两个世界的一座桥梁。一个Subject能够订阅一个Observable,就像一个观察者,而且它能够发射新的数据,或者传递它接受到的数据,就像一个Observable。很明显,做为一个Observable,观察者们或者其它Subject均可以订阅它。
一旦Subject订阅了Observable,它将会触发Observable开始发射。若是原始的Observable是“冷”的,这将会对订阅一个“热”的Observable变量产生影响。
RxJava提供四种不一样的Subject:
Publish是Subject的一个基础子类。让咱们看看用PublishSubject实现传统的Observable Hello World
:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
复制代码
在刚才的例子中,咱们建立了一个PublishSubject
,用create()
方法发射一个String
值,而后咱们订阅了PublishSubject。此时,没有数据要发送,所以咱们的观察者只能等待,没有阻塞线程,也没有消耗资源。就在这随时准备从subject接收值,若是subject没有发射值那么咱们的观察者就会一直在等待。再次声明的是,无需担忧:观察者知道在每一个场景中该作什么,咱们不用担忧何时是由于它是响应式的:系统会响应。咱们并不关心它何时响应。咱们只关心它响应时该作什么。
最后一行代码展现了手动发射字符串“Hello World”,它触发了观察者的onNext()
方法,让咱们在控制台打印出“Hello World”信息。
让咱们看一个更复杂的例子。话说咱们有一个private
声明的Observable,外部不能访问。Observable在它生命周期内发射值,咱们不用关心这些值,咱们只关心他们的结束。
首先,咱们建立一个新的PublishSubject来响应它的onNext()
方法,而且外部也能够访问它。
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
复制代码
而后,咱们建立“私有”的Observable,只有subject才能够访问的到。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
复制代码
Observable.create()
方法包含了咱们熟悉的for循环,发射数字。doOnCompleted()
方法指定当Observable结束时要作什么事情:在subject上发射true。最后,咱们订阅了Observable。很明显,空的subscribe()
调用仅仅是为了开启Observable,而不用管已发出的任何值,也不用管完成事件或者错误事件。为了这个例子咱们须要它像这样。
在这个例子中,咱们建立了一个能够链接Observables而且同时可被观测的实体。当咱们想为公共资源建立独立、抽象或更易观测的点时,这是极其有用的。
简单的说,BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),而后正常发送订阅后的数据流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
复制代码
在这个短例子中,咱们建立了一个能发射整形(Integer)的BehaviorSubject。因为每当Observes订阅它时就会发射最新的数据,因此它须要一个初始值。
ReplaySubject会缓存它所订阅的全部数据,向任意一个订阅它的观察者重发:
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
复制代码
当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每个观察者。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
复制代码
本章中,咱们了解到了什么是观察者模式,为何Observables在今天的编程场景中如此重要,以及如何建立Observables和subjects。
下一章中,咱们将建立第一个基于RxJava的Android应用程序,学习如何检索数据来填充listview,以及探索如何建立一个基于RxJava的响应式UI。