Android的三方库 - RxJava:RxJava的使用和基本订阅流程

一:什么是RxJava?

GitHub关于RxJava的介绍:编程

a library for composing asynchronous and event-based programs by using observable sequences安全

他的意思就是 一个经过可观测的序列来组成异步和基于事件的库。bash

RxJava的出现消除同步问题、线程安全等问题app

总的来讲就是方便咱们异步编程。异步

二:RxJava的优势和缺点

优势

异步async

链式调用结构ide

使用复杂的异步调用方式的时候依旧能够保持简洁异步编程

缺点

学习成本比较高,入门的门槛比较高学习

难以理解的API,须要查看源码才能理解API的具体效果ui

三:RxJava的基础使用

首先明白他的基础使用步骤:

  1. 建立被观察者(Observable)
  2. 建立观察者(Observer)
  3. 订阅(subscribe)

1.建立被观察者

正常建立被观察者:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        });
复制代码

在这里面一共产生了四个事件:One、Two、Three、结束。

PS:

非正常建立第一弹:

Observable observable = Observable.just("ONE","TWO","THREE");

非正常建立第二弹:

String[] values = {"ONE", "TWO", "THREE"};
Observable observable = Observable.fromArray(values);
复制代码

其实这样的非正常建立是内部将这些信息包装成onNext()这样的事件发送给观察者。

2.建立观察者

正常建立:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("z", "onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.i("z", "onNext: s = " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("z", "onError: ");
            }

            @Override
            public void onComplete() {
                Log.i("z", "onComplete: ");
            }
        };
复制代码

非正常建立:

Consumer<String> observer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("z", "accept: s = " + s);
            }
        };
        
复制代码

3.订阅

observable.subscribe(observer);

你已经注意到不同的地方,为何被观察者订阅了观察者?

之因此会这样,是由于RxJava为了保持链式调用的流畅性。

4. 异步调用

RxJava既然是异步库,固然对于异步的处理会更好

在咱们看RxJava的异步调用以前,咱们先来学习下其中比较重要的两个点

  • subscribeOn()
  • observeOn()
subscribeOn

这个表示Observable在一个指定的环境下建立,只能使用一次,屡次建立的话会以第一次为准。

observeOn

表示 事件传递和 最终处理发生在哪一个环境下,能够屡次调用,每次指定以后,下一步就生效。

好比:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        }) // 被观察者在一个新的线程中建立
        .subscribeOn(Schedulers.newThread()) 
                // 下面这个操做是在io线程中
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toLowerCase();
                    }
                })
                  // 切换,观察者是在主线程中
                .observeOn(AndroidSchedulers.mainThread()) 
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        
                    }
                });

复制代码

四:RxJava的基础订阅流程

先看一下基础的调用方式:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: ");
                emitter.onNext("ONE");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }


            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: s = " + s);
            }


            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: e = " + e.getMessage());
            }


            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });
    }
});
复制代码

结果:

onSubscribe:
subscribe:
onNext: s = ONE
复制代码

咱们先从订阅开始看,也就是subscribe方法

public final void subscribe(Observer<? super T> observer) {
        ... // 忽略部分源码
        subscribeActual(observer);
        ... // 忽略部分源码
}
复制代码

直接找到主要的方法subscribeActual(observer),这个是抽象的方法,会被实如今子类中。

因此咱们接着看看Observable的子类实现:

咱们进入到create方法中:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码

其实 返回的就是 ObservableCreate 的对象,

须要注意的是: ObservableCreateObservable的一个子类 ObservableCreate 被建立都会传入一个source的字段,这个source就是 ObservableOnSubscribe

ObservableCreate 具体实现了 subscribeActual 方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 在这里触发 observer#onSubscribe()
    observer.onSubscribe(parent);

    try {
        // 在这里回调
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

在这里方法能够看到 观察者observeronSubscribe 会先于回调发生。

而后调用 ObservableOnSubscribe 的方法 subscribe

具体的事件后由开发者去作, 能够看到在案例中调用了 CreateEmitter,能够进入到 CreateEmitter 看看onNext() 的实现

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
复制代码

能够看到 在CreateEmitteronNext() 中调用了 观察者observeronNext() 方法.

而后能够看到案例中的调用:

@Override
public void onNext(String s) {
   Log.i(TAG, "onNext: s = " + s);
 }
复制代码
相关文章
相关标签/搜索