RxJava 入门

导入

我相信你们确定对ReactiveX 和 RxJava 都不陌生,由于如今只要是和技术相关的网站,博客都会随处见到介绍ReactiveX和RxJava的文章。html

ReactiveX

  • ReactiveX是Reactive Extensions 的缩写,即响应式编程的扩展。
  • “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序库)。
  • Rx是一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步 数据流.由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。他在各个经常使用编程语言上都有实现。如Java,C#,PHP,Swift,Scala等等.社区网站是reactivex.io
  • ReactiveX不只仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。

关于响应式编程

  • 事件总线(Event buses)或我们常见的单击事件就是一个异步事件流,你能够观察这个流,也能够基于这个流作一些自定义操做。响应式就是基于这种想法。你可以建立全部事物的数据流,而不只仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物均可以看成一个流:变量、用户输入、属性、缓存、数据结构等等。好比,假设你的微博评论就是一个跟单击事件同样的数据流,你可以监听这个流,并作出响应。
  • 有一堆的函数可以建立(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能做为另外一个流的输入(input),甚至多个流也能够做为其它流的输入。你能合并(merge)两个流。你还能经过过滤(filter)一个流获得那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。
  • 响应式编程的主要组成部分是observable, operator 和 observer
  • 通常响应式编程的信息流:Observable -> Operator1 -> Operator2->...->OperatorN->Observer
  • Observable 是事件的生产者,Observer是事件的最终消费者。中间能够经过任意多个的Operator对事件进行处理和转换
  • 由于Observer一般在主线程中执行,所以设计上要求代码尽量的简单,只对事件做出相应(不对事件或者数据进行修改,全部修改事件的工做所有由operator完成)

RxJava 和 RxAndroid

  • RxJava是ReactiveX在 Java 平台的实现,你能够将它看做一个普通的Java类库。
  • RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发。
  • RxJava就是一个作异步开发的框架,和Android系统提供的 Handler+Thread,AsyncTask,Context.runOnUiThread等是解决的是一样的问题。那么他跟系统提供的异步编程方案比,有什么好处呢。或者说他有什么样的优点值得咱们花时间和精力切换到RxJava呢?

总结起来能够用两个词来归纳:异步和简洁java

主要概念

Observable(被观察者)

Observables 负责发出一系列的事件,这里的事件能够是任何东西,例如网络请求的结果,复杂计算处理的结果,数据库操做的结构,文件操做的结果等,事件执行结束后交给Observer的回调处理。react

Observer(观察者)

进行订阅接受处理事件android

Operator(操做符)中文文档

负责对事件进行各类变化和处理git

Scheduler(调度器)

提供了各类调度器,是RxJava能够方便的实现异步开发github

事件

这里的事件值指的是 onNext (有新数据),onComplete (全部数据处理完成),onError (事件队列异常)sql

RxJava的好处(为何RxJava对于Android如此重要)

  • 轻松使用并发:让异步编程变得简单简洁.像写同步代码同样。
  • 方便的线程切换
  • 简单而完善的异常处理机制:传统的try/cache没办法处理异步中子线程产生的异常,RxJava 提供了合适的错误处理机制
  • 强大的操做符支持,函数式的风格,链式调用。

举个例子

假如如今咱们有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的做用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增长显示的图片。如今须要程序将一个给出的目录数组 File[] folders 中每一个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。须要注意的是,因为读取图片的这一过程较为耗时,须要放在后台执行,而图片的显示则必须在 UI 线程执行。shell

目录数组数据库

File[] folders=new File[]{......};复制代码

线程方式实现(call hell)编程

new Thread() {
    @Override
    public void run() {
        super.run();
        try{
          for (File folder : folders) {
              File[] files = folder.listFiles();
              for (File file : files) {
                  if (file.getName().endsWith(".png")) {
                      final Bitmap bitmap = getBitmapFromFile(file);
                      getActivity().runOnUiThread(new Runnable() {
                          @Override
                          public void run() {
                              imageCollectorView.addImage(bitmap);
                          }
                      });
                  }
              }
          }
        }catch(Exception e){
          //error handling
          //只能在这里进行异常处理
        }

    }
}.start();复制代码

RxJava 实现

//建立Observable
Observable observable = Observable.create(new ObservableOnSubscribe<File>() {
            @Override
            public void subscribe(ObservableEmitter<File> e) throws Exception {
                for (File file : files) {
                    e.onNext(file);
                }
                e.onComplete();
            }
        });
//建立Observer
Observer<Bitmap> observer = new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }

            @Override
            public void onError(Throwable throwable) {
              //error handling
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"All images are shown");
            }
        };
//对事件集进行处理并链接消费者
observable.flatMap(new Func1<File, Observable<File>>() {//分别获取每一个文件夹下面的文件,组合成一个Observable
              @Override
              public Observable<File> call(File file) {
                  return Observable.from(file.listFiles());
              }
          })
          .filter(new Func1<File, Boolean>() {//过滤出全部扩展名为png的文件
              @Override
              public Boolean call(File file) {
                  return file.getName().endsWith(".png");
              }
          })
          .map(new Func1<File, Bitmap>() {//根据File对象,获取Bitmap对象
              @Override
              public Bitmap call(File file) {
                  return getBitmapFromFile(file);
              }
          })
          .subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io线程中执行
          .observeOn(AndroidSchedulers.mainThread())//指定消费者在主线程中执行
          .subscribe(observer);//链接观察者复制代码

有的人可能说了,你这不是代码更多,更复杂了吗?
不要着急,这只是最基础的版本,稍后会对代码进行简化。
但即便是这种状况下,代码虽然多了,但咱们能够发现,他的逻辑更清晰了,也没有那么多的嵌套了。

简化代码

  • 对于一个数组,可用建立操做符“from”来建立Observable
  • 若是咱们只对结果感兴趣,不关心异常处理和事件发射完成事件,我也能够将Observer用Consumer来替换
//建立Observable
Observable observable = Observable.from(folers);
//建立Observer
Consumer<Bitmap> consumer=new Consumer<Bitmap>() {
            @Override
            public void accept(@NonNull Bitmap bitmap) throws Exception {
                imageCollectorView.addImage(bitmap);
            }
          };
//对事件集进行处理并链接消费者
observable.flatMap(new Func1<File, Observable<File>>() {//分别获取每一个文件夹下面的文件,组合成一个Observable
              @Override
              public Observable<File> call(File file) {
                  return Observable.from(file.listFiles());
              }
          })
          .filter(new Func1<File, Boolean>() {//过滤出全部扩展名为png的文件
              @Override
              public Boolean call(File file) {
                  return file.getName().endsWith(".png");
              }
          })
          .map(new Func1<File, Bitmap>() {//根据File对象,获取Bitmap对象
              @Override
              public Bitmap call(File file) {
                  return getBitmapFromFile(file);
              }
          })
          .subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io线程中执行
          .observeOn(AndroidSchedulers.mainThread())//指定消费者在主线程中执行
          .subscribe(consumer);//链接消费者复制代码

RxJava 链式调用实现

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Bitmap>() {
                @Override
                public void accept(@NonNull Bitmap bitmap) throws Exception {
                    imageCollectorView.addImage(bitmap);
                }
              });复制代码

RxJava + lambda 实现

Observable.from(folders)
    .flatMap(file -> Observable.from(file.listFiles())
    .filter(file -> file.getName().endsWith(".png"))
    .map( file -> getBitmapFromFile(file))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> imageCollectorView.addImage(bitmap));//无异常处理,有异常会抛到主线程,不影响咱们原来程序的crash处理复制代码

关于lambda(匿名函数,它能够包含表达式和语句)在Android中的使用:

  • 要在 Android 的较早版本中测试 Lambda 表达式、方法引用和类型注解,请前往您的 build.gradle 文件,将 compileSdkVersion 和 targetSdkVersion 设置为 23 或更低。您仍须要启用 Jack 工具链以使用这些 Java 8 功能。
  • 经测试,按照官方提供的方案配置后,虽然可使用lambda,但编译速度变的很慢。
  • 在运行的时候,而且我测试用的项目引用改了 Bouncy Castle(轻量级加密解密工具包) 这个包报出了内存溢出的异常,因此我感受如今还不太稳定。
  • 第三方开源的实现方案:retrolambda
  • 固然咱们也能够不用lambda,这样代码看着比较多,但因其只有一层嵌套的链式调用,因此逻辑结构并不复杂。事实上 Android Studio 会自动帮咱们把这部分代码折叠成lambda的形式。

更进一步,假设咱们如今须要忽略掉前5张,一共显示10张

Observable.from(folders)
    .flatMap(file -> Observable.from(file.listFiles())
    .filter(file -> file.getName().endsWith(".png"))

    .skip(5)
    .take(10)

    .map( file -> getBitmapFromFile(file))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> imageCollectorView.addImage(bitmap));//无异常处理,有异常会抛到主线程,不影响咱们原来程序的crash处理复制代码

操做符简介

建立操做

用于建立Observable的操做符

  • Create — 经过调用观察者的方法从头建立一个Observable

    create操做符是全部建立型操做符的“根”,也就是说其余建立型操做符最后都是经过create操做符来建立Observable的

  • From — 将其它的对象或数据结构转换为Observable

  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Defer — 在观察者订阅以前不建立这个Observable,为每个观察者建立一个新的Observable
  • Empty/Never/Throw — 建立行为受限的特殊Observable

    通常用于测试

  • Interval — 建立一个定时发射整数序列的Observable

  • Range — 建立发射指定范围的整数序列的Observable
  • Repeat — 建立重复发射特定的数据或数据序列的Observable
  • Start — 建立发射一个函数的返回值的Observable
  • Timer — 建立在一个指定的延迟以后发射单个数据的Observable
变换操做

这些操做符可用于对Observable发射的数据进行变换

  • Map — 映射,经过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • Buffer — 缓存,能够简单的理解为缓存,它按期从Observable收集数据到一个集合,而后把这些数据集合打包发射,而不是一次发射一个
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,而后将这些Observable发射的数据平坦化的放进一个单独的Observable,能够认为是一个将嵌套的数据结构展开的过程。
  • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每个Observable发射一组不一样的数据
  • Scan — 扫描,对Observable发射的每一项数据应用一个函数,而后按顺序依次发射这些值
  • Window — 窗口,按期未来自Observable的数据分拆成一些Observable窗口,而后发射这些窗口,而不是每次发射一项。相似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每个Observable发射原始Observable的数据的一个子集
过滤操做

这些操做符用于从Observable发射的数据中进行选择,符合必定条件的发送给观察者进行处理,不符合条件的直接丢弃

  • Filter — 过滤,过滤掉没有经过谓词测试的数据项,只发射经过测试的
  • Skip — 跳过前面的若干项数据
  • SkipLast — 跳事后面的若干项数据
  • Take — 只保留前面的若干项数据
  • TakeLast — 只保留后面的若干项数据
  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是若是一段时间没有操做,就执行一次操做
  • Distinct — 去重,过滤掉重复数据项
  • ElementAt — 取值,取特定位置的数据项
  • First — 首项,只发射知足条件的第一条数据
  • IgnoreElements — 忽略全部的数据,只保留/终止通知(onError或onCompleted)
  • Last — 末项,只发射最后一条数据
  • Sample — 取样,按期发射最新的数据,等因而数据抽样,有的实现里叫ThrottleFirst
组合操做

组合操做符用于将多个Observable组合成一个单一的Observable

  • And/Then/When — 经过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
  • CombineLatest — 当两个Observables中的任何一个发射了一个数据时,经过一个指定的函数组合每一个Observable发射的最新数据(一共两个数据),而后发射这个函数的结果
  • Join — 不管什么时候,若是一个Observable发射了一个数据项,只要在另外一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
  • Merge — 将两个Observable发射的数据组合并成一个
  • StartWith — 在发射原来的Observable的数据序列以前,先发射一个指定的数据序列或数据项
  • Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
  • Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一块儿,而后将这个函数的结果做为单项数据发射
错误处理

这些操做符用于从错误通知中恢复

  • Catch — 捕获,继续序列操做,将错误替换为正常的数据,从onError通知中恢复
  • Retry — 重试,若是Observable发射了一个错误通知,从新订阅它,期待它正常终止
辅助操做

一组用于处理Observable的操做符

  • Delay — 延迟一段时间发射结果数据
  • Do — 注册一个动做占用一些Observable的生命周期事件,至关于Mock某个操做
  • Materialize/Dematerialize — 将发射的数据和通知都当作数据发射,或者反过来
  • ObserveOn — 指定观察者观察Observable的调度程序(工做线程)
  • SubscribeOn — 指定Observable应该在哪一个调度程序上执行
  • Serialize — 强制Observable按次序发射数据而且功能是有效的
  • Subscribe — 收到Observable发射的数据和通知后执行的操做
  • TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
  • Timeout — 添加超时机制,若是过了指定的一段时间没有发射数据,就发射一个错误通知
  • Timestamp — 给Observable发射的每一个数据项添加一个时间戳
  • Using — 建立一个只在Observable的生命周期内存在的一次性资源
条件和布尔操做

这些操做符可用于单个或多个数据项,也可用于Observable

  • All — 判断Observable发射的全部的数据项是否都知足某个条件
  • Amb — 给定多个Observable,只让第一个发射数据的Observable发射所有数据
  • Contains — 判断Observable是否会发射一个指定的数据项
  • DefaultIfEmpty — 发射来自原始Observable的数据,若是原始Observable没有发射数据,就发射一个默认数据
  • SequenceEqual — 判断两个Observable是否按相同的数据序列
  • SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,而后发射原始Observable的剩余数据
  • SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,而后发射原始Observable剩余的数据
  • TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,而后跳过剩余的数据
算术和聚合操做

这些操做符可用于整个数据序列

  • Average — 计算Observable发射的数据序列的平均值,而后发射这个结果
  • Concat — 不交错的链接多个Observable的数据
  • Count — 计算Observable发射的数据个数,而后发射这个结果
  • Max — 计算并发射数据序列的最大值
  • Min — 计算并发射数据序列的最小值
  • Reduce — 按顺序对数据序列的每个应用某个函数,而后返回这个值
  • Sum — 计算并发射数据序列的和
链接操做

一些有精确可控的订阅行为的特殊Observable

  • Connect — 指示一个可链接的Observable开始发射数据给订阅者

    • 可链接的Observable (connectable Observable)与普通的Observable差很少,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操做符时才会开始。用这个方法,你能够等待全部的观察者都订阅了Observable以后再开始发射数据。
    • RxJava中connect是ConnectableObservable接口的一个方法,使用publish操做符能够将一个普通的Observable转换为一个ConnectableObservable。
    • 举例

      ConnectableObservable<String> connectableObservable = Observable.just("a", "c", "d").publish();
          connectableObservable.subscribe(new Consumer<String>() {
              @Override
              public void accept(@NonNull String s) throws Exception {
                  LogUtil.i(s);
              }
          });
      
          LogUtil.i("subscribe end.....");
      
          Observable.timer(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
              @Override
              public void accept(@NonNull Long aLong) throws Exception {
                  LogUtil.i("connect method called after 3 seconds.");
                  connectableObservable.connect();
              }
          });复制代码
      03-20 15:54:19.328 27493-27493/me.sunbird.react_native_demo I/x_log:RxJavaActivity.testConnectableObservable(L:586): subscribe end.....
      03-20 15:54:22.378 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$34.accept(L:591): connect method called after 3 seconds.
      03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): a
      03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): c
      03-20 15:54:22.420 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): d复制代码
  • Publish — 将一个普通的Observable转换为可链接的

  • RefCount — 使一个可链接的Observable表现得像一个普通的Observable
  • Replay — 确保全部的观察者收到一样的数据序列,即便他们在Observable开始发射数据以后才订阅
转换操做
  • To — 将Observable转换为其它的对象或数据结构
  • Blocking 阻塞Observable的操做符
操做符决策树

几种主要的需求

  • 直接建立一个Observable(建立操做)
  • 组合多个Observable(组合操做)
  • 对Observable发射的数据执行变换操做(变换操做)
  • 从Observable发射的数据中取特定的值(过滤操做)
  • 转发Observable的部分值(条件/布尔/过滤操做)
  • 对Observable发射的数据序列求值(算术/聚合操做)

Scheduler(调度器)中文文档

本质上RxJava就是一个作异步开发的框架,能使咱们极其灵活的进行线程切换。
咱们可使用ObserveOn和SubscribeOn操做符,可让Observable在一个特定的调度器上执行,ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,SubscribeOn更进一步,它指示Observable将所有的处理过程(包括发射数据和通知)放在特定的调度器上执行。
subscribeOn 和 observeOn 两个操做符是极其容易混淆的,能够看下这篇博客来完全分清楚这两个操做符SubscribeOn 和 ObserveOn

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操做(IO操做请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor做为调度器
Schedulers.immediate( ) 在当前线程当即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操做,这个调度器的线程池会根据须要增加;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每一个任务建立一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                LogUtil.w("subscribe method is running in thread:" + Thread.currentThread().getName());
                observableEmitter.onNext("a");
                observableEmitter.onComplete();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                LogUtil.w("first map is running in thread:" + Thread.currentThread().getName());
                return s;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LogUtil.w("second map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LogUtil.w("third map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtil.w("fourth map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        LogUtil.w("consumer accept method is running in thread:" + Thread.currentThread().getName());
                    }
                  });复制代码

运行后咱们能够获得以下结果:

03-20 11:44:23.716 8687-8723/? W/x_log:RxJavaActivity$27.subscribe(L:528): subscribe method is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.717 8687-8723/? W/x_log:RxJavaActivity$28.apply(L:535): first map is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.721 8687-8724/? W/x_log:RxJavaActivity$29.apply(L:543): second map is running in thread:RxNewThreadScheduler-1
03-20 11:44:23.726 8687-8725/? W/x_log:RxJavaActivity$30.apply(L:551): third map is running in thread:RxCachedThreadScheduler-2
03-20 11:44:23.729 8687-8726/? W/x_log:RxJavaActivity$31.apply(L:559): fourth map is running in thread:RxComputationThreadPool-1
03-20 11:44:23.836 8687-8687/? W/x_log:RxJavaActivity$32.accept(L:567): consumer accept method is running in thread:main复制代码

RxJava 的使用场景举例

复杂的数据变换
Observable.just("1", "2", "2", "3", "4", "5")//建立Observable
    .map(Integer::parseInt)//对每一项执行Integer.parseInt方法
    .filter(s -> s > 1)//过滤出全部值 >1 的对象
    .distinct() //去重,这里也能够传递一个方法,来定义两个对象是否equals的策略,很是灵活
    .take(3)//取到前3个
    .reduce((sum, item) -> sum + item) //累加
    .subscribe(System.out::println);//9 打印出最终累加的结果。复制代码
Retrofit结合RxJava作网络请求框架

这里不做详解,具体的介绍能够看扔物线的这篇文章,对RxJava的入门者有很大的启发。其中也讲到了RxJava和Retrofit如何结合来实现更简洁的代码

RxJava代替EventBus进行数据传递
RxBus

RxBus并非一个库,而是一种模式,是使用了RxJava的思想来达到EventBus的数据传递效果。这篇文章把RxBus讲的比较详细。

square/Otto 对 RxBus 的态度

This project is deprecated in favor of RxJava and RxAndroid. These projects permit the same event-driven programming model as Otto, but they’re more capable and offer better control of threading.

为了支持 RxJava 和 RxAndroid,咱们已经废弃了这个项目。这两个项目提供了和 Otto 同样的基于事件驱动的编程模型,并且他们更强大,并提供更好的线程控制。

If you’re looking for guidance on migrating from Otto to Rx, this post is a good start.

若是你正在寻找从 Otto 迁移到 Rx 的教程,阅读这篇文章将会是一个很好的开始。

一个网络请求依赖另一个网络请求返回的结果。例如:登陆以后,根据拿到的token去获取消息列表。
@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> call(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });复制代码
同一个页面同事发起两个以上的请求,此时对页面中的ProgressBar进行管理,即两个请求只能展现一个ProgressBar,而且全部的请求都结束后,ProgressBar 才能消失
@GET("/date1")
public Observable<String> getDate1();

@GET("/data2")
public Observable<String> getData2() ... progressDialog.show() Observable.merge(getData1(), getData2()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onNext(String s) {
                      // 这里进行结果处理
                    }

                    @Override
                    public void onError(Throwable throwable) {
                      //error handling
                      progressDialog.dismiss();
                    }

                    @Override
                    public void onComplete() {
                      progressDialog.dismiss();
                    }
                  });复制代码
使用throttleFirst(throttle:节流阀)防止按钮重复点击
RxView.clicks(button)
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe(new Observer<Object>() {
        @Override
        public void onCompleted() {
              log.d ("completed");
        }

        @Override
        public void onError(Throwable e) {
              log.e("error");
        }

        @Override
        public void onNext(Object o) {
             log.d("button clicked");
        }
    });复制代码
使用debounce(去抖动)作textSearch

用简单的话讲就是当N个结点发生的时间太靠近(即发生的时间差小于设定的值T),debounce就会自动过滤掉前N-1个结点。
好比在作百度地址联想的时候,可使用debounce减小频繁的网络请求。避免每输入(删除)一个字就作一次联想

RxTextView.textChangeEvents(inputEditText)
      .debounce(400, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted() {
        log.d("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        log.d("Error");
    }

    @Override
    public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
        log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});复制代码

RxJava 的生态

  • rx-preferences -使SharedPreferences支持RxJava

  • RxAndroid -RxJava的Android拓展

  • RxLifecycle -帮助使用了RxJava的安卓应用控制生命周期

  • RxBinding -安卓UI控件的RxJava绑定API

  • Android-ReactiveLocation -Google Play Service API wrapped in RxJava

  • storio -支持RxJava的数据库

  • retrofit -支持RxJava的网络请求库

  • sqlbrite -支持RxJava的sqlite数据库

  • RxPermissions -RxJava实现的Android运行时权限控制

  • reark -RxJava architecture library for Android

  • frodo -Android Library for Logging RxJava Observables and Subscribers.

RxJava 的现况

  • RxJava 最新版 2.0.7(注:不兼容1.x 的版本) 大小 2.1M
  • RxAndroid 大小 10k
  • 支持Java6以上,Android2.3以上
  • github star 22511

RxJava 的将来展望

通用的数据流,强大的操做符,灵活的线程调度,简单完善的异常处理机制,函数式编程等等特性,奠基了RxJava的强大地位。
Android 系统中处处都是异步,好比网络请求,文件读写,数据库查询,系统服务或者第三方SDK服务等,这些异步请求很是耗时,须要在非UI线程中执行,而对UI的修改又必需要在主线程中执行。若是再包含多个异步执行嵌套的话,就会让咱们的代码显得凌乱。经过RxJava提供的强大而通用的异步处理机制,可使咱们的代码逻辑更清晰,便于后期的维护。而且如今RxJava的生态愈来愈大,我的认为,之后全部的涉及异步操做的系统服务,第三方库,第三方服务SDK都会以Observable或类Observable的方式提供给咱们调用,而不是像如今这样,让咱们传递一个又一个的listener。

参考资料

分享工具

zeplin(软件演示)
  • 方便的效果图管理
  • 新文件,修改文件提示
  • 自动测量
  • 标注评论
  • 支持pohtoshop和sketch
  • 第一个项目免费
charles(软件演示)
  • 简单已用,功能强大
  • focus 某一个域名下的请求,方便查找
  • 对某个请求修改参数,从新请求,方便调试
  • 各类格式良好的response解析
  • copy出cURL 格式的请求,方便传递个任何人进行请求模拟
相关文章
相关标签/搜索