Rxjava入门指南(一):响应式编程与Rxjava入门

响应式编程介绍

先来介绍一下响应式编程:响应式编程是一种,基于事件驱动的方式,处理异步数据(事件)流的编程范式。java

实际上就是 观察者模式+数据流 + 事件控制数据库

 

1.什么算是数据流?编程

举个例子,在界面中点击登陆按钮发出登陆请求,这个事件就是一个数据流。网络

2.什么算是基于事件驱动?框架

打个比方,发出登陆请求至关于拧开水龙头(水龙头发生变化),而执行登陆请求就至关于接收水的水池(水池发生变化),若是水龙头有水流出时,水池能够接收水管中的水。异步

 

 

水池能够根据水龙头发出的请求而产生响应,即从对事件的发生和对事件的响应这个角度来编程。ide

3.怎样算是响应?异步编程

举个维基百科中的例子:若是咱们声明变量a = b + c,那么当b的值改变后,a的值是不会发生改变的,而响应式编程则是当b发生改变后,可让a响应从而发生变化。spa

和观察者模式同样,当一个对象改变状态,会通知其余订阅者。线程

4.那为何是异步的?由于拧开水龙头和水池接收水不是线性操做:

假若有水龙头A和B,水池正在接收水龙头A中的水,当水龙头B中的水流向水池时,不会阻塞水池接收水龙头A中的水,二者是不相关的。

一样,假如咱们有水池A和水池B,当水龙头的水流向水池A的同时,不会阻塞水龙头的水流向水池B

因此说,事件的发出(拧开水龙头)和事件的接收(水池接收水)这两个动做是异步的,互不相扰的,不会像同步操做那样产生阻塞。

 

Rxjava介绍(rxjava2)

rx全称为Reactive Extension,即响应式、可扩展,是一套基于事件的异步编程的API。

在Rxjava中,事件的发出对应:Observable(可观测的物),事件的接收对应:Observer(观察者),这个过程的链接对应着subscribe()方法(订阅)。

咱们来经过代码描述这个过程:

建立Observable对象(水龙头)

        //建立可观测的物Observable:水龙头
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件发射器,能够发出的事件类型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //发出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //与onError互斥,只能发出二者中的一个
                emitter.onNext("水流4"); // 当发出onComplete或onError后,虽然能够继续发出事件,但不会再被接收到
            }
        });

 

建立Observer对象(水池)

        //建立观察者Observer:水池
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
          //这里获取到了Disposable对象,下面会介绍该对象 Log.d(TAG,
"subscribe:创建链接"); } @Override public void onNext(String water) { Log.d(TAG,"接收水流:" + water); } @Override public void onError(Throwable error) { Log.d(TAG,"报错啦"); } @Override public void onComplete() { Log.d(TAG,"通知结束啦"); } };

 

创建订阅关系(链接)

        //创建链接:水管
        observable.subscribe(observer);  

 

输出结果以下:

subscribe:创建链接
接收水流:水流1
接收水流:水流2
接收水流:水流3
通知结束啦

能够看到在创建链接后,依次接收到了水流一、二、3,而水流4没有被接收到。 

咱们能够把new Observer到Observable的subscirbe方法里,把整个过程连起来:

//建立可观测的物Observable:水龙头
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件发射器,能够发出的事件类型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //发出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //与onError互斥,只能发出二者中的一个
                emitter.onNext("水流4"); // 当发出onComplete或onError后,会用Disposable对象的dispose()方法
// 虽然水龙头能够继续发出事件,但水池不会再被接收到
} }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { //这里能够获取到Disposable对象 Log.d(TAG,"subscribe:创建链接"); } @Override public void onNext(String water) { Log.d(TAG,"接收水流:" + water); } @Override public void onError(Throwable error) { Log.d(TAG,"报错啦"); } @Override public void onComplete() { Log.d(TAG,"通知结束啦"); } });

Observable的subscirbe方法还提供了其余几种重载形式:

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

咱们能够传入Consumer对象,让观察者处理本身关心的事件类型

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件发射器,能够发出的事件类型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //发出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //与onError互斥,只能发出二者中的一个
                emitter.onNext("水流4"); // 当发出onComplete或onError后,虽然能够继续发出事件,但不会再被接收到
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) {
                Log.d(TAG, "onNext: " + str);
            }
        });

如今这个方法的返回结果再也不是void而是返回了Disposable对象。

以前咱们在Observer的onSubscribe中就提到了该对象,在onSubscribe方法中能够获取到Disposable对象,因此不须要返回对象。

Disposable对象

  • dispose():解除订阅
  • isDisposed():查询是否解除了订阅

通常在activity或者fragment销毁时,咱们能够调用dispose方法来取消订阅,释放数据流中剩余的全部内部资源,防止内存泄漏。

Rxjava提供了CompositeDisposable容器管理多个Disposable对象,咱们能够在获取到Disposable对象后把它添加到容器中,而后在onDestroy方法中调用compositeDisposable.dispose()清除全部订阅:

//添加Disposable对象
    protected void addDisposable(Disposable disposable) {
        if (null == compositeDisposable || compositeDisposable.isDisposed()) {
            compositeDisposable = new CompositeDisposable();
        }
        compositeDisposable.add(disposable);
    }

    //清除全部订阅
    protected void clearDisposable() {
        compositeDisposable.clear();
    }

异步

咱们简单的实现了事件的发出和响应,可是这个过程还发生在同一个线程内,并非异步的。

rxjava提供了方便的线程调度方法用于切换线程:

//事件的发出在新线程newThread()
observable.subscribeOn(Schedulers.newThread())
       //事件的响应在主线程 .observeOn(AndroidSchedulers.mainThread())
       //创建链接 .subscribe(consumer对象);

以前的例子能够就成了这样:

//建立可观测的物Observable:水龙头
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件发射器,能够发出的事件类型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //发出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //与onError互斥,只能发出二者中的一个
                emitter.onNext("水流4"); // 当发出onComplete或onError后,虽然能够继续发出事件,但不会再被接收到
            }
        }).subscribeOn(Schedulers.newThread())
                //事件的响应在主线程                                              
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() { //创建链接
                    @Override
                    public void accept(String str) {
                        Log.d(TAG, "onNext: " + str);
                    }
                });

这样就实现了异步,事件发出时所在的线程为RxNewThreadScheduler-1,而接收事件所在的线程为main。

RxJava内置的线程包括(这些线程之间的具体区别暂且不提,主要是讨论如何使用):

  • AndroidSchedulers.mainThread() 主线程
  • Schedulers.io() IO线程,线程名RxCachedThreadScheduler-1
  • Schedulers.newThread() 新线程
  • Schedulers.computation() 须要大量计算的线程

之后咱们就能够在IO线程中执行读写文件、读写数据库、网络操做等操做,在computation线程中执行计算操做。

如今咱们已经经过rxjava实现了简单的异步观察者模式,模拟出了事件响应。

应用举例:

 Retrofit网络请求框架提供了对Rxjava的支持,在IO线程中进行网络请求,在主线程中去处理结果。

 

Disposable disposable = retrofit.create(ILoginApi.class).login(loginDTO)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ResultVO<LoginVO>>() {
                               @Override
                               public void accept(ResultVO<LoginVO> loginResultVO) throws Exception {
                                   //成功获取
                               }
                           },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                //打印异常信息
                            }
                        });

 

参考:通俗易懂的rxjava2教程:https://www.jianshu.com/p/464fa025229e

若有纰漏,请大佬及时批评指出

相关文章
相关标签/搜索