RxJava 2.0已经于2016年10月29日正式发布,本人也专门抽时间研究了一下其相关特性。趁热打铁,在这篇文章里对RxJava2.0的使用进行一个简单的总结。java
阅读本文前须要掌握RxJava 1.0的基本概念,若是从未接触过RxJava, 请点击这里react
1. RxJava 2.0 再也不支持 null 值,若是传入一个null会抛出 NullPointerException;安全
Observable.just(null); Single.just(null); Flowable.just(null); Maybe.just(null); Observable.fromCallable(() -> null) .subscribe(System.out::println, Throwable::printStackTrace); Observable.just(1).map(v -> null) .subscribe(System.out::println, Throwable::printStackTrace);
2. RxJava 2.0 全部的函数接口(Function/Action/Consumer)均设计为可抛出Exception,解决编译异常须要转换问题;架构
3. RxJava 1.0 中Observable不能很好支持背压,在RxJava2.0 中将Oberservable完全实现成不支持背压,而新增Flowable 来支持背压。(关于背压的概念请参考本人对ReativeX的英文原文的中文翻译)app
RxJava 1.0有四个基本概念:Observable(可观察者,即被观察者)、Observer(观察者)、subscribe(订阅)、事件。Observable和 Observer经过 subscribe()方法实现订阅关系,从而 Observable能够在须要的时候发出事件来通知 Observer。ide
基于以上的概念, RxJava 1.0的基本实现主要有三点:函数
step1: 建立 Observerpost
Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:测试
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
除了Observer接口以外,RxJava 还内置了一个实现了Observer的抽象类: Subscriber。Subscriber对 Observer接口进行了一些扩展,但他们的基本使用方式是彻底同样的:ui
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
step2:建立 Observable
Observable 即被观察者,它决定何时触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来建立一个 Observable ,并为它定义事件触发规则:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } });
step3:Subscribe (订阅)
建立了 Observable和 Observer以后,再用 subscrbe() 方法将它们联结起来,整条链子就能够工做了。代码形式很简单:
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);
然而,在2.0中咱们熟悉的 Subscrber 竟然没影了,取而代之的是 ObservableEmitter, 俗称发射器。此外,因为没有了 Subscrber 的踪迹,咱们建立观察者时需使用 Observer。而 Observer 也不是咱们熟悉的那个 Observer,其回调的 Disposable 参数更是让人摸不到头脑。
step1:初始化一个Observable
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } });
step2:初始化一个Observer
Observer<Integer> observer= new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }
step3:创建订阅关系
observable.subscribe(observer); //创建订阅关系
不难看出,与 RxJava1.0 仍是存在着一些区别的。首先,建立Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext())和通知(onError()/onComplete())。其次,建立的Observer中多了一个回调方法 onSubscribe(),传递参数为Disposable。
ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它能够发出三种类型的事件,经过调用emitter的 onNext(T value) 、onComplete()和onError(Throwable e)就能够分别发出next事件、complete事件和error事件
Disposable:这个单词的字面意思是一次性用品,用完便可丢弃的。 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 咱们能够把它理解成两根管道之间的一个机关, 当调用它的 dispose() 方法时, 它就会将两根管道切断, 从而致使下游收不到事件,即至关于 Subsciption。
注意: 调用dispose()并不会致使上游再也不继续发送事件, 上游会继续发送剩余的事件.
来看个例子, 咱们让上游依次发送 1,2,complete,4,在下游收到第二个事件以后, 切断水管, 看看运行结果:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); Log.d(TAG, "emit 4"); emitter.onNext(4); } }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int i; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); mDisposable = d; } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: " + value); i++; if (i == 2) { Log.d(TAG, "dispose"); mDisposable.dispose(); Log.d(TAG, "isDisposed : " + mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } });
运行结果为:
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4
从运行结果咱们看到, 在收到onNext 2这个事件后, 切断了水管, 可是上游仍然发送了3, complete, 4这几个事件, 并且上游并无由于发送了onComplete而中止。 同时能够看到下游的 onSubscibe()方法是最早调用的.
Disposable的用处不止这些, 后面讲解到了线程的调度以后, 咱们会发现它的重要性. 随着后续深刻的讲解, 咱们会在更多的地方发现它的身影.
此外,RxJava2.x中仍然保留了其余简化订阅方法,咱们能够根据需求,选择相应的简化订阅。只不过传入的对象改成了 Consumer。`
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //这里接收数据项 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //这里接收onError } }, new Action() { @Override public void run() throws Exception { //这里接收onComplete。 } });
不一样于RxJava 1.0,RxJava 2.0中没有了一系列的Action/Func接口,取而代之的是与Java8命名相似的函数式接口,以下图:
其中Action相似于RxJava 1.0中的Action0,区别在于Action容许抛出异常。
public interface Action { /** * Runs the action and optionally throws a checked exception * @throws Exception if the implementation wishes to throw a checked exception */ void run() throws Exception; }
而Consumer即消费者,用于接收单个值, BigConsumer则是接收两个值, Function用于变换对象, Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就再也不赘述了。
关于线程切换这点,RxJava1.x和RxJava2.x的实现思路是同样的。这里就简单看下相关源码。
同RxJava1.x同样, subscribeOn 用于指定 subscribe() 时所发生的线程,从源码角度能够看出,内部线程调度是经过 ObservableSubscribeOn 来实现的。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn 的核心源码在 subscribeActual方法中,经过代理的方式使用SubscribeOnObserver 包装Observer后,设置 Disposable 来将 subscribe 切换到 Scheduler 线程中
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //回调Disposable parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //设置`Disposable` @Override public void run() { source.subscribe(parent); //使Observable的subscribe发生在Scheduler线程中 } })); }
observeOn 方法用于指定下游Observer回调发生的线程。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //.. //验证安全 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
主要实如今 ObservableObserveOn 中的subscribeActual, 能够看出,不一样于subscribeOn, 没有将subscribe 操做所有切换到Scheduler中,而是经过ObserveOnSubscriber 与 Scheduler配合,经过schedule()达到切换下游Observer回调发生的线程,这一点与RxJava 1.0实现几乎相同。关于ObserveOnSubscriber 的源码这里再也不重复描述了。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize)); } }
Flowable是RxJava 2.0中新增的类,专门用于应对背压(Backpressure)问题,但这并非RxJava 2.0中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,好比在Android中常见的点击事件,点击过快则会形成点击两次的效果。
咱们知道,在RxJava 1.0中背压控制是由Observable完成的,使用以下:
Observable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
而在RxJava 2.0中将其独立了出来,取名为Flowable。所以,原先的Observable已经不具有背压处理能力。
经过 Flowable, 咱们能够自定义背压处理策略。
/** * Represents the options for applying backpressure to a source sequence. */ public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers <em>all</em> onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST }
测试Flowable例子以下:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背压处理策略,抛出异常 .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d("JG", integer.toString()); Thread.sleep(1000); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d("JG",throwable.toString()); } });
或者可使用相似RxJava 1.0的方式来控制。
Flowable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
其中还须要注意的一点在于,Flowable并非订阅就开始发送数据,而是需等到执行Subscription.request()才能开始发送数据。固然,使用简化subscribe订阅方法会默认指定Long.MAX_VALUE。手动指定的例子以下:
Flowable.range(1,10).subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE);//设置请求数 } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
不一样于RxJava 1.0中的 SingleSubscriber,RxJava 2.0中的 SingleObserver多了一个回调方法 onSubscribe。
interface SingleObserver<T> { void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); }
同Single,Completable也被从新设计为Reactive-Streams架构,RxJava 1.0 的 CompletableSubscriber改成 CompletableObserver,源码以下:
interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); }
Processor 和 Subject 的做用是相同的。关于Subject部分,RxJava 1.0与RxJava 2.0在用法上没有显著区别,这里就不介绍了。其中Processor是RxJava 2.0新增的,继承自 Flowable, 因此支持背压控制。而Subject则不支持背压控制。使用以下:
//Subject AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(o -> Log.d("JG",o));//three subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); //Processor AsyncProcessor<String> processor = AsyncProcessor.create(); processor.subscribe(o -> Log.d("JG",o)); //three processor.onNext("one"); processor.onNext("two"); processor.onNext("three"); processor.onComplete();
关于操做符,RxJava 1.0与RxJava 2.0在命名和行为上大多数保持了一致,须要强调的是subscribeWith操做符和compose操做符。
RxJava 2.0中,subscribe 操做再也不返回 Subscription 也就是现在的 Disposable,为了保持向后的兼容, Flowable 提供了subscribeWith方法返回当前的观察者Subscriber对象, 而且同时提供了DefaultSubsriber, ResourceSubscriber, DisposableSubscriber接口,让他们提供 Disposable对象, 从而能够管理其生命周期。
RxJava 1.0用法:
private static <T> Observable.Transformer<T, T> createIOSchedulers() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return tObservable.subscribeOn(Schedulers.io()) .unsubscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() { return createIOSchedulers(); }
Action1<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Subscription subscription = Observable.from(items) .compose(RxUtil.<String>applySchedulers()) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.valueOf(s); } }) .subscribe(onNext);
RxJava 2.0用法:
public static <T> ObservableTransformer<T, T> io2MainObservable() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> ObservableTransformer<T, T> applySchedulers() { return io2MainObservable(); }
Consumer<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Disposable disposable = Observable.fromArray(items) .compose(RxUtil.<String>applySchedulers()) .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }) .subscribe(onNext);
能够注意到,RxJava 1.0中实现的是rx.Observable.Transformer接口, 该接口继承自Func1<Observable<T>, Observable<R>>, 而2.0继承自io.reactivex.ObservableTansformer<Upstream, Downstream>, 是一个独立的接口。
除此以外,RxJava 2.0还提供了 FlowableTransformer接口,用于Flowable下的compose操做符,使用以下:
public static <T> FlowableTransformer<T, T> io2MainFlowable() { return new FlowableTransformer<T, T>() { @Override public Publisher<T> apply(Flowable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> FlowableTransformer<T, T> applySchedulers() { return io2MainFlowable(); }
Consumer<Integer> onNext = null; Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背压处理策略,抛出异常 .compose(RxUtil.<String>applySchedulers()) .subscribe(onNext);