- 原文地址:5 Not So Obvious Things About RxJava
- 原文做者:Jag Saund
- 译文出自:掘金翻译计划
- 译者: skyar2009
- 校对者:Danny1451, yunshuipiao
不管你是刚刚接触 RxJava,仍是已经使用过一段时间,关于 RxJava 你总会有些新的知识要学。在使用 RxJava 框架过程当中,我发现了 5 点不那么明显的知识,使我能够充分挖掘它的潜能。javascript
注释 本文引用的 APIs 是基于 RxJava 1.2.6html
map 和 flatMap 是经常使用的两个 ReactiveX 操做。它们每每是你最早接触的两个操做,而且很难肯定使用哪一个是正确的。前端
map 和 flatMap 都是对 Observable 发出的每个元素执行转换方法。可是,map 只输出一个元素,flatMap 输出 0 或多个元素。java
在上面的例子中,map
操做对每个字符串执行了 split
方法并输出了一个包含字符串数组的元素。当你想将一个元素转换成另外一个时使用 map
。react
有些时候,咱们执行的方法返回多个元素,而且咱们但愿将他们添加到同一个流中。这种状况下,flatMap
是一个好的选择。在上面的例子中 flatMap
操做将字符串数组处理后输出到了同一个序列。android
有些时候你须要将同步或异步的 API 转成响应式的 API。使用 Observable.create 看起来是个极具诱惑性的选择,但它有以下要求:ios
很难正确的实现以上要求,幸运的是,你能够不这么作。有一些静态工具方法能够帮你解决:git
syncOnSubscribegithub
一个能够建立安全 OnSubscribe<T>
的工具,它建立的 OnSubscribe<T>
可以正确地处理来自订阅者的背压请求。当你须要将一个同步获取式的阻塞 API 转成响应式 API 时可使用。数据库
public Observable<byte[]> readFile(@NonNull FileInputStream stream) {
final SyncOnSubscribe<FileInputStream, byte[]> fileReader = SyncOnSubscribe.createStateful(
() -> stream,
(stream, output) -> {
try {
final byte[] buffer = new byte[BUFFER_SIZE];
int count = stream.read(buffer);
if (count < 0) {
output.onCompleted();
} else {
output.onNext(buffer);
}
} catch (IOException error) {
output.onError(error);
}
return stream;
},
s -> IOUtil.closeSilently(s));
return Observable.create(fileReader);
}复制代码
fromCallable
一个静态工具,能够对简单的同步 API 进行封装并将之转化成响应式 API。更赞的是,fromCallable
也能够处理检查到的异常。
public Observable<Boolean> enablePushNotifications(boolean enable) {
return Observable.fromCallable(() -> sharedPrefs
.edit()
.putBoolean(KEY_PUSH_NOTIFICATIONS_PREFS, enable)
.commit());
}复制代码
fromEmitter
一个静态工具,对异步 API 进行封装并能够管理 Observable 被取消订阅时释放的资源。不像 fromCallable
,你能够输出多个元素。
import android.bluetooth.le.BluetoothLeScanner;
import android.bluetooth.le.ScanCallback;
import android.bluetooth.le.ScanResult;
import android.support.annotation.NonNull;
import rx.Emitter;
import rx.Observable;
import java.util.List;
public class RxBluetoothScanner {
public static class ScanResultException extends RuntimeException {
public ScanResultException(int errorCode) {
super("Bluetooth scan failed. Error code: " + errorCode);
}
}
private RxBluetoothScanner() {
}
@NonNull
public static Observable<ScanResult> scan(@NonNull final BluetoothLeScanner scanner) {
return Observable.fromEmitter(scanResultEmitter -> {
final ScanCallback scanCallback = new ScanCallback() {
@Override
public void onScanResult(int callbackType, @NonNull ScanResult result) {
scanResultEmitter.onNext(result);
}
@Override
public void onBatchScanResults(@NonNull List<ScanResult> results) {
for (ScanResult r : results) {
scanResultEmitter.onNext(r);
}
}
@Override
public void onScanFailed(int errorCode) {
scanResultEmitter.onError(new ScanResultException(errorCode));
}
};
scanResultEmitter.setCancellation(() -> scanner.stopScan(scanCallback));
scanner.startScan(scanCallback);
}, Emitter.BackpressureMode.BUFFER);
}
}复制代码
有时,Observable 产生事件过快以致于下游观察者跟不上它的速度。当这种状况发生时,你每每会遇到 MissingBackpressureException
异常。
RxJava 提供了一些方法管理背压,可是具体使用哪种须要视状况而定。
冷、热 Observable
只有当有订阅时,冷 Observable 才会发送元素。观察者订阅冷 Observable 能够控制发送事件的速度而不须要牺牲流的完整性。冷 Observable 例子有:读文件、数据库查询、网络请求以及静态迭代器转成的 Observable。
热 Observable 是连续的事件流,它的发出不依赖订阅者的数量。当一个观察者订阅了 Observable,那么它将面临下面的一种状况:
热 Observables 例子有:触摸事件、通知以及进度更新。
因为热 Observable 发出事件的本性,咱们不能控制它的速度。例如,你不能下降触摸事件发出的速度。所以,最好是使用 BackpressureMode
提供的流控制策略。
使用一个响应式获取方法,冷 Observable 能够根据观察者的反馈下降发送速度。更多知识,请看 ReactiveX 文档的背压与响应式获取方法.
BackpressureMode.NONE 和 BackpressureMode.ERROR
在这两种模式中,发送的事件不是背压。当被观察者的 16 元素缓冲区溢出时会抛出 MissingBackpressureException
。
BackpressureMode.BUFFER
在这种模式下,有一个无限的缓冲区(初始化时是 128)。过快发出的元素都会放到缓冲区中。若是缓冲区中的元素没法消耗,会持续的积累直到内存耗尽。结果是 OutOfMemoryException
异常。
BackpressureMode.DROP
这种模式是使用固定大小为 1 的缓冲区。若是下游观察者没法处理,第一个元素会缓存下来后续的会被丢弃。当消费者能够处理下一个元素时,它收到的将是 Observable 发出的第一个元素。
BackpressureMode.LATEST
这种模式与 BackpressureMode.DROP
相似,由于它也使用固定大小为 1 的缓冲区。然而,不是缓存第一个元素丢弃后续元素,BackpressureMode.LATEST
而是使用最新的元素替换缓冲区缓存的元素。当消费者能够处理下一个元素时,它收到的是 Observable 最近一次发送的元素。
RxJava 经过给 Observable 序列发送 onError
通知不可恢复的错误,而且会结束序列。
有时,你不但愿结束序列。对于这种状况,RxJava 提供了几种不会结束序列的错误处理方法。
RxJava 提供了许多错误处理方法,可是有时你不但愿结束序列。尤为是涉及到主题时。
onErrorResumeNext
使用 onErrorResumeNext 能够拦截 onError
并返回一个 Observable。或者对错误信息添加附加信息并返回一个新的错误,或者发送给 onNext
一个新的事件。
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query)) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This will terminate upstream (ie. we will stop receiving text view changes after an error!)
}复制代码
使用 onErrorResumeNext 捕获
使用该操做会修复下游序列,可是会结束上游序列由于已经发送了 onError
通知。因此,若是你链接的是一个发布通知的主题,onError
通知会结束主题。
若是你但愿上游继续运行,能够在 onErrorResumeNext
操做中嵌套 flatMap
或 switchMap
操做。
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This fixes the problem since the error is not seen by the upstream observable
}复制代码
有时你须要将 Observable 的输出共享给多个观察者。RxJava 提供了 share
和 publish
两种方式实现 Observable 发送事件的多播。
Share
share
容许多个观察者链接到源 Observable。下面的例子中,共享的是 Observable 发送的 MotionEvent
事件。而后,咱们建立了另外两个 Observable 分别过滤 DOWN
和 UP
触摸事件。DOWN
事件咱们画红圈,UP
事件咱们画篮圈。
public void touchEventHandler(@NotNull View view) {
final Observable<MotionEvent> motionEventObservable = RxView.touches(view).share();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
}复制代码
然而,一旦有观察者订阅 Observable,Observable 就会开始发送事件。这样就会形成后续的订阅者会错过一个或多个触摸事件。
在这个例子中,“蓝” 观察者错过了第一个事件。有些时候这没问题,可是若是你不能接受错过任何事件,那么你须要使用 publish
操做。
Publish
对 Observable 执行 publish
操做会将值转化为 ConnectedObservable。就像打开阀门同样。下面的例子和上面同样,须要注意的是咱们如今使用的是 publish
操做。
public void touchEventHandler(@NotNull View view) {
final ConnectedObservable<MotionEvent> motionEventObservable = RxView.touches(view).publish();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
// Connect the source observable to begin emitting events
subscriptions.add(motionEventObservable.connect());
}复制代码
一旦必要的 Observables 订阅了源,你须要执行对源 ConnectedObservable 执行 connect
来开始发送事件。
注意,一旦对源调用了 connect
方法,相同事件序列会分别发送给 “绿” 和 “蓝” 观察者。
掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 Android、iOS、React、前端、后端、产品、设计 等领域,想要查看更多优质译文请持续关注 掘金翻译计划。