RxJava系列二(基本概念及使用介绍)

转载请注明出处:https://zhuanlan.zhihu.com/p/20687307java


前言

上一篇的示例代码中你们必定发现了Observable这个类。从纯Java的观点看,Observable类源自于经典的观察者模式。RxJava的异步实现正是基于观察者模式来实现的,并且是一种扩展的观察者模式。异步

观察者模式

观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫作主题或者被观察者。当它改变时那些由它保存的一系列对象将会获得通知,而这一系列对象被称做Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。ide

观察者模式很适合下面这些场景中的任何一个:

  1. 当你的架构有两个实体类,一个依赖另外一个,你想让它们互不影响或者是独立复用它们时。

  2. 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。

  3. 当一个变化的对象通知那些无需推断具体类型的对象时。

一般一个观察者模式的类图是这样的:

若是你对观察者模式不是很了解,那么强烈建议你先去学习下。关于观察者模式的详细介绍能够参考我以前的文章:设计模式之观察者模式

扩展的观察者模式

在RxJava中主要有4个角色:

  • Observable

  • Subject

  • Observer

  • Subscriber

Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而ObserverSubscriber对应于观察者模式中的观察者Subscriber实际上是一个实现了Observer的抽象类,后面咱们分析源码的时候也会介绍到。Subject比较复杂,之后再分析。

上一篇文章中咱们说到RxJava中有个关键概念:事件。观察者Observer和被观察者Observable经过subscribe()方法实现订阅关系。从而Observable 能够在须要的时候发出事件来通知Observer

RxJava如何使用

我本身在学习一种新技术的时候一般喜欢先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么咱们如今就来讲说RxJava到底该怎么用。

第一步:建立观察者Observer

Observer<Object> observer = new Observer<Object>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
 };

这么简单,一个观察者Observer建立了!

大兄弟你等等...,你以前那篇观察者模式中不是说观察者只提供一个update方法的吗?这特么怎么有三个?!!

少年勿急,且听我慢慢道来。在普通的观察者模式中观察者通常只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。而在RxJava中的观察者Observer提供了:onNext()onCompleted()onError()三个方法。还记得吗?开篇咱们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就至关于普通观察者模式中的update

RxJava中添加了普通观察者模式缺失的三个功能:

  1. RxJava中规定当再也不有新的事件发出时,能够调用onCompleted()方法做为标示;

  2. 当事件处理出现异常时框架自动触发onError()方法;

  3. 同时Observables支持链式调用,从而避免了回调嵌套的问题。

第二步:建立被观察者Observable

Observable.create()方法能够建立一个Observable,使用crate()建立Observable须要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅咱们的Observable时,它做为一个参数传入并执行call()函数。

Observable<Object> observable = Observable.create(new 
            Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

除了create(),just()和from()一样能够建立Observable。看看下面两个例子:

just(T...)将传入的参数依次发送

Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分红Java对象依次发送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第三步:被观察者Observable订阅观察者Observerps:你没看错,不一样于普通的观察者模式,这里是被观察者订阅观察者

有了观察者和被观察者,咱们就能够经过subscribe()来实现两者的订阅关系了。

observable.subscribe(observer);

observable.subscribe(observer)

连在一块儿写就是这样:

Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

至此一个完整的RxJava调用就完成了。

兄台,你叨逼叨叨逼叨的说了一大堆,但是我没搞定你特么到底在干啥啊?!!不急,我如今就来告诉大家到底发生了什么。

首先咱们使用Observable.create()建立了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,一旦咱们调用subscribe()订阅后就会自动触发call()方法。call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。咱们在call()方法中调用了5次onNext()和1次onCompleted()方法。一套流程周下来之后输出结果就是下面这样的:

Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted

看到这里可能你又要说了,大兄弟你别唬我啊!OnSubscribe的call()方法中的参数Subscriber怎么就变成了subscribe()方法中的观察者Observer?!!!这俩儿货明明看起来就是两个不一样的类啊。

咱们先看看Subscriber这个类:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    ...
}

从源码中咱们能够看到,Subscriber是Observer的一个抽象实现类,因此我首先能够确定的是Subscriber和Observer类型是一致的。接着往下咱们看看subscribe()这个方法:

public final Subscription subscribe(final Observer<? super T> observer) {

    //这里的if判断对于咱们要分享的问题没有关联,能够先无视
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}

咱们看到subscribe()方法内部首先将传进来的Observer作了一层代理,将它转换成了Subscriber。咱们再看看这个方法内部的subscribe()方法:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

进一步往下追踪看看return后面这段代码到底作了什么。精简掉其余无关代码后的subscribe(subscriber, this)方法是这样的:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber,                     Observable<T> observable) {

    subscriber.onStart();
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        return Subscriptions.unsubscribed();
    }
}

咱们重点看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它本身括号内的第二个参数observable.onSubscribe,而后调用了它的call方法。而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。看到这里是否是对RxJava的执行流程清晰了一点呢?这里也建议你们在学习新技术的时候多去翻一翻源码,知其然还要能知其因此然不是吗。

subscribe()的参数除了能够是Observer和Subscriber之外还能够是Action一、Action0;这是一种更简单的回调,只有一个call(T)方法;因为太简单这里就不作详细介绍了!

异步

上一篇文章中开篇就讲到RxJava就是来处理异步任务的。可是默认状况下咱们在哪一个线程调用subscribe()就在哪一个线程生产事件,在哪一个线程生产事件就在哪一个线程消费事件。那怎么作到异步呢?RxJava为咱们提供Scheduler用来作线程调度,咱们来看看RxJava提供了哪些Scheduler。

Schedulers 做用
Schedulers.immediate() 默认的Scheduler,直接在当前线程运行
Schedulers.newThread() 老是开启一个新线程
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操做,这个调度器的线程池会根据须要增加;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.computation() 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU
Schedulers.from(executor) 使用指定的Executor做为调度器
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() RxAndroid中新增的Scheduler,表示在Android的main线程中运行

同时RxJava还为咱们提供了subscribeOn()observeOn()两个方法来指定Observable和Observer运行的线程。

Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

上面这段代码你们应该有印象吧,没错正是咱们上一篇文章中的例子。subscribeOn(Schedulers.io())指定了获取小区列表、处理房源信息等一系列事件都是在IO线程中运行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展现房源的操做在UI线程执行。这就作到了在子线程获取房源,主线程展现房源。

好了,RxJava系列的入门内容咱们就聊到这。下一篇咱们再继续介绍更多的API以及它们内部的原理。

若是你们喜欢这一系列的文章,欢迎关注个人知乎专栏和GitHub。

相关文章
相关标签/搜索