Android 入门(十三)Rxjava

要求:会使用 rxjava 进行平常开发,复杂功能能够经过搜索和查阅官方文档解决便可java

基础用法

想要用 RxJava 必需要在 build.gradle 内加入依赖react

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
复制代码

使用步骤就是:1. 建立被观察者,2. 建立观察者,3. 订阅android

// 建立被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: ");
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onComplete();
    }
})
复制代码

ObservableEmitter emitter 对象是发射器的意思,有三种发射的方法 void onNext(T value)、void onError(Throwable error)、void onComplete(),onNext 方法能够无限调用,Observer(观察者)全部的都能接收到,onError和onComplete是互斥的,Observer(观察者)只能接收到一个,OnComplete 能够重复调用,可是Observer(观察者)只会接收一次,而 onError 不能够重复调用,第二次调用就会报异常。数据库

// 建立观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe: ");
    }
    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext: " + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
};
复制代码

onNext、onError、onComplete 都是跟被观察者发射的方法一一对应的,这里就至关于接收了。须要特别说明的是 onSubscribe 方法中的 Disposable 参数,它只有两个方法 dispose() 和 isDisposed(),前者是取消对 Observable(被观察者)的订阅,后面很明显就是查看订阅的状态。编程

// 订阅
observable.subscribe(observer);
复制代码

为了保证链式编程,这里的逻辑好像是 Observable 订阅了 Observer。其实我认为还有其余的缘由,熟悉「观察者模式」的应该知道「具体被观察者」会持有一个容器用来存储观察者,这样才能实现数据更新以后通知全部的观察者。bash

异步

若是认为 RxJava 只是实现了一个「观察者模式」那就大错特错,其实 RxJava 主要是给咱们提供了一个异步编程的工具。网络

在介绍异步以前,咱们先看一下 Observable.just() 方法,这个方法最多能够接受 10 个参数,而且对这些参数依次调用 onNext 方法,执行完 onNext 以后还会调用 onComplete。app

public static <T> Observable<T> just(T item1, T item2)
复制代码

其实 RxJava 的异步使用也很简单,在订阅以前给 Observable 加上 subscribeOn(Scheduler scheduler) 和 observeOn(Scheduler scheduler) 描述,subscribeOn() 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫作事件产生的线程。observeOn() 指定 Observer 所运行的线程,或者叫事件消费的线程。异步

  • Schedulers.immediate(): 直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 老是启用新线程,并在新线程执行操做。
  • Schedulers.io(): I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操做将在 Android 主线程运行。

因此简单的使用以下:ide

Observable.just(11, 22, 33)
        .subscribeOn(Schedulers.io())   // 指定 subscribe() 发生在 IO 线程
        .observeOn(AndroidSchedulers.mainThread())  // 指定 Observer 的回调发生在主线程
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
复制代码

因为 subscribeOn(Schedulers.io()) 的指定,被建立的事件内容 11,22,33 将会在 IO 线程发出。而因为 observeOn(AndroidScheculers.mainThread()) 的指定,所以 Observer 数字的打印将发生在主线程 。

这种策略很是适合大多数的「后台线程处理数据,主线程显示结果」的状况。

变换

所谓变换,就是将事件序列中的对象或者整个序列进行加工处理,转换成不一样的事件或者事件序列。

map() 变换 将对象集合转化成对象

Observable.just(1, 2, 3)
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "I'm " + integer;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
复制代码

在这里面最重要的是 Function 接口,它就一个方法,就是将构造函数中的第一个参数转为第二个参数进行返回。

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
复制代码

flatMap() 变换

private void flatMap() {
        Log.d(TAG, "flatMap: =============");
        List<Person> personList = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            List<Plan> plans = new ArrayList<>();
            // 计划一
            Plan plan1 = new Plan("8:30", "上班");
            // 填充准备作的事情
            List<String> actions1 = new ArrayList<>();
            actions1.add("1打开电脑");
            actions1.add("1打开AS");
            actions1.add("1打开WX");
            plan1.setActionList(actions1);

            // 计划二
            Plan plan2 = new Plan("12:00", "吃饭");
            // 填充准备作的事情
            List<String> actions2 = new ArrayList<>();
            actions2.add("2下楼");
            actions2.add("2买饭");
            actions2.add("2开吃");
            plan2.setActionList(actions2);

            // 计划三
            Plan plan3 = new Plan("18:00", "下班");
            // 填充准备作的事情
            List<String> actions3 = new ArrayList<>();
            actions3.add("3关闭WX");
            actions3.add("3关闭AS");
            actions3.add("3关闭电脑");
            plan3.setActionList(actions3);

            plans.add(plan1);
            plans.add(plan2);
            plans.add(plan3);

            Person person = new Person("tom" + i, plans);
            personList.add(person);
        }

        Observable.fromIterable(personList)
                .flatMap(new Function<Person, ObservableSource<Plan>>() {
                    @Override
                    public ObservableSource<Plan> apply(Person person) throws Exception {
                        if ("tom1".equals(person.getName())) {
                            return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.SECONDS);
                        }
                        return Observable.fromIterable(person.getPlanList());
                    }
                })
                .flatMap(new Function<Plan, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Plan plan) throws Exception {
                        return Observable.fromIterable(plan.getActionList());
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: " + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }
复制代码

参考

给 Android 开发者的 RxJava 详解

RxJava2 只看这一篇文章就够了

相关文章
相关标签/搜索