Android异步框架RxJava 1.x系列(一) - 观察者模式及实现

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现

前言

RxJava 是一款基于 Java VM 实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架。RxJava 官方目前同时维护了两个版本,分别是 1.x 和 2.x,区别是它们使用不一样的 group id 和 namespacesjava

 

 

版本 group id namespaces
v1.x io.reactivex io.reactivex
v2.x io.reactivex.rxjava2 rx

本系列的文章将针对 RxJava 1.x 进行介绍,先给出 Github 的地址:react

经过 Gradle 引入相关依赖:android

compile 'io.reactivex:rxjava:1.0.14' compile 'io.reactivex:rxandroid:1.0.1' 复制代码

正文

1. RxJava的定义

一个精准的解释以下:RxJava 是一个运行于 Java VM ,由可观测序列组成的,异步、基于事件的函数库。git

2. RxJava的优势

换句话说,『一样是作异步,为何人们用它,而不用现成的 AsyncTask / Handler / XXX / ... ?』github

一个词:简洁。web

异步操做很关键的一点是程序的简洁性,由于在调度过程比较复杂的状况下,异步代码常常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler,其实都是为了让异步代码更加简洁。RxJava 的优点也是简洁,但它的简洁的不同凡响之处在于,随着程序逻辑变得愈来愈复杂,它依然可以保持简洁。编程

 

 

在 Android 开发中,假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView,它的做用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增长显示的图片。如今须要程序将一个给出的目录数组 File[] folders 中每一个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。后端

注意: 因为读取图片的过程较为耗时,须要放在后台执行,而图片的显示则必须在 UI 线程执行。数组

经常使用的实现方式有多种,这里给出其中一种:缓存

new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start(); 复制代码

而若是使用 RxJava,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } }); 复制代码

能够发现,使用 RxJava 方式代码量明显大大增长,所谓简洁从何而来?

这里说的简洁是指的逻辑上的。观察一下你会发现,RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具备优点的。当需求变得复杂时,这种优点将更加明显(试想若是还要求只选取前 10 张图片,常规方式要怎么办?若是有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月以后须要改功能,当你翻回这里看到本身当初写下的那一片迷之缩进,你能保证本身将迅速看懂,而不是对着代码从新捋一遍思路?)。

另外,若是你的 IDE 是 Android Studio,其实每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) }); 复制代码

因此,RxJava 有啥优势?就好在简洁,优势就是把复杂逻辑,经过函数式编程模型穿成一条线。

3. 观察者模式的扩展

RxJava 的异步实现,是经过一种扩展的观察者模式来实现的。

3.1. 通用的观察者模式

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,须要在 B 变化的一瞬间作出反应。

举个例子,新闻里喜闻乐见的警察抓小偷,警察须要在小偷伸手做案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察须要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。

程序的观察者模式略有不一样,观察者不须要时刻盯着被观察者(例如 A 不须要每过 2ms 就检查一次 B 的状态),而是采用注册( Register )或者称为订阅(Subscribe)的方式,告诉被观察者:我须要你的某种状态,你要在它变化的时候通知我。

采起这样被动的观察方式,既省去了反复检索状态的资源消耗,也可以获得最高的反馈速度。

Android 开发中一个典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener来讲,View 是被观察者,OnClickListener 是观察者,两者经过 setOnClickListener() 方法达成订阅关系。订阅以后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已注册的 OnClickListener 。

OnClickListener 的观察者模式大体以下图:

 

 

如图所示,经过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出)。当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法。

按照观察者模式抽象出来的各个概念:

  • Button: 被观察者
  • OnClickListener: 观察者
  • setOnClickListener(): 订阅
  • onClick(): 事件处理

就由专用的观察者模式转变成了通用的观察者模式,以下图:

 

 

3.2. RxJava的观察者模式

RxJava 有四个基本概念:

  • Observable: 可观察者,即被观察者
  • Observer: 观察者
  • Subscribe: 订阅
  • Event: 事件处理

Observable 和 Observer 经过 subscribe() 方法实现订阅关系,使得Observable 能够在须要的时候发出事件来通知 Observer

与传统观察者模式不一样,RxJava 的事件回调方法除了普通事件 onNext() (至关于 onClick()) 以外,还定义了两个特殊的事件:onCompleted() 和 onError()

  • onCompleted(): 事件队列完结

RxJava 不只把每一个事件单独处理,还会把它们看作一个队列。RxJava规定,当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法做为事件完成标志。

  • onError(): 事件队列异常

在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个被调用,而且是事件序列中的最后一个执行。

RxJava 的观察者模式大体以下图:

 

 

4. RxJava的基本使用

基于以上的概念,RxJava 的基本使用有 3 个步骤:

4.1. 建立Obsever

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的声明方式:

Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } }; 复制代码

除了 Observer 接口以外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是彻底同样的:

Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } }; 复制代码

实质上,在 RxJava 的 subscribe 过程当中,Observer 也老是会先被转换成一个 Subscriber再使用。因此若是你只想使用基本功能,选择 Observer 和 Subscriber 是彻底同样的。它们的区别对于使用者来讲主要有两点:

  • onStart()

这是 Subscriber 增长的方法。它会在 subscribe 刚开始,而事件还未发送以前被调用。能够用于作一些准备工做,例如数据的清零或重置。这是一个可选方法,默认状况下它的实现为空。

须要注意的是,若是对准备工做的线程有要求(例如: 弹出一个显示进度的对话框,这必须在主线程执行),onStart() 就不适用了。由于它老是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来作准备工做,可使用 doOnSubscribe() 方法,具体能够在后面的章节中看到。

  • unsubscribe()

这是 Subscriber 所实现的另外一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将再也不接收事件。通常在这个方法调用前,可使用 isUnsubscribed() 先判断一下状态。

unsubscribe() 这个方法很重要,由于在 subscribe() 以后, Observable 会持有 Subscriber 的引用。这个引用若是不能及时被释放,将有内存泄露的风险。

注意:在再也不使用的时候尽快在合适的地方(例如: onPause() 和 onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以免内存泄露的发生。

4.2. 建立Obsevable

4.2.1. Obsevable.create()

Observable 即被观察者,它决定何时触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来建立一个 Observable ,并为它定义事件触发规则。示例以下:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } }); 复制代码

能够看到,这里传入了一个 OnSubscribe 对象做为参数。OnSubscribe 会被存储在返回的 Observable 对象中。

它的做用至关于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。

这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

4.2.2. Obsevable.just(T...)

create() 方法是 RxJava 最基本的建立事件序列的方法。基于这个方法,RxJava 还提供了一些方法用于快捷建立事件队列,例如 just() 方法:

Observable observable = Observable.just("Hello", "Hi", "Aloha"); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted() 复制代码

4.2.3. Obsevable.from(T[])和from(Iterable<? extends T>)

将传入的数组或 Iterable 拆分红具体对象后,依次发送给观察者,示例以下:

String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted() 复制代码

4.3. Subscribe关联

建立了 Observable 和 Observer 以后,再用 subscribe() 方法将它们关联起来,整条链子就能够工做了。代码很简单:

observable.subscribe(observer);
// 或者 observable.subscribe(subscriber); 复制代码

可能会注意到,subscribe() 这个方法有点怪:它看起来是『observable 订阅了 observer / subscriber』,而不是『observer / subscriber 订阅了 observable』。这看起来就像『杂志订阅了读者』同样颠倒了对象关系。

这让人读起来有点别扭,不过若是把 API 设计成 『observer.subscribe(observable) / subscriber.subscribe(observable)』,虽然更加符合思惟逻辑,但对流式 API 的设计就形成影响了,比较起来明显是得不偿失的。

Observable.subscribe(Subscriber) 的内部实现是这样的(核心代码):

public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } 复制代码

能够看到subscriber() 作了3件事:

(a). 调用Subscriber.onStart()

这个方法在前面已经介绍过,是一个可选的准备方法。

(b). 调用Observable中的OnSubscribe.call(Subscriber)

事件发送的逻辑开始运行。从这也能够看出,在RxJava中,Observable并非在建立的时候就当即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。

(c). 返回Subscription

将传入的Subscriber做为Subscription返回。这是为了方便后面的unsubscribe()。

整个过程当中对象间的关系以下图:

 

 

或者能够看动图:

 

 

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义建立出 Subscriber。形式以下:

Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } }; // 自动建立 Subscriber ,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction); // 自动建立 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自动建立 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction); 复制代码

简单解释一下这段代码中出现的 Action1 和 Action0

  • Action0

Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的。因为 onCompleted() 方法也是无参无返回值的,所以 Action0 能够被当成一个包装对象,将 onCompleted() 的内容打包起来将本身做为一个参数传入 subscribe() 以实现不完整定义的回调。

  • Action1

Action1 也是一个接口,它一样只有一个方法 call(T param),这个方法也无返回值,但有一个参数。与 Action0 同理,因为 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,所以 Action1 能够将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。

事实上,虽然 Action0 和 Action1 在 API 中使用最普遍,但 RxJava 提供了多个 ActionX 形式的接口 (例如: Action2Action3),它们能够被用以包装不一样的无返回值的方法。

4.4. 场景示例

4.4.1. 打印字符串数组

将字符串数组 names 中的全部字符串依次打印出来:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() { @Override public void call(String name) { Log.d(tag, name); } }); 复制代码

4.4.2. 由ID取得图片显示

int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } }); 复制代码

正如上面两个例子这样,建立出 Observable 和 Subscriber,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了,很是简单!

然而。

 

 

小结

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,若是只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式自己的目的就是『后台处理,前台回调』的异步机制,所以异步对于 RxJava 是相当重要的。而要实现异步,则须要用到 RxJava 的另外一个核心的概念 Scheduler,后续将给出详细介绍。


欢迎关注技术公众号: 零壹技术栈

 

零壹技术栈

 

本账号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。

相关文章
相关标签/搜索