反应式编程 RxJava 设计原理解析

本文首发于 vivo互联网技术 微信公众号 
连接: https://mp.weixin.qq.com/s/duO1pAfaKUI2_x_GVvZHMg
做者:Yunjie Ma

1、ReactiveX 与 RxJava

ReactiveX 的全称为Reactive Extension,通常缩写为 Rx,即咱们日常所说的反应式编程。其设计原理主要使用了观察者模式,区分数据的生产者和消费者,经过事件流的方式进行数据的异步处理。java

RxJava 是 ReactiveX Java语言的实现,其编程体验与Java 8中的函数式编程和流(Stream)有很大的类似之处,在掌握了Java8的相关知识后,你能够很轻松的就上手使用 RxJava。编程

本篇文章主要聚焦对RxJava中几种主要的设计模式的理解,经过梳理Observable的相关类图以及讲解这些类之间的关系,让你们可以更清晰的理解RxJava中事件驱动的工做原理。设计模式

2、RxJava中的概念

首先咱们写一个简单的RxJava的程序,把数组中的元素做为事件发送,最终由消费者打印在控制台:数组

咱们以这段简单的代码为基础,讲解下贯穿整个ReactiveX设计的四个概念:观察者,被观察者,事件,订阅微信

  • 观察者:对事件进行响应的对象,也能够称做消费者,在上述的代码中,subscirbe方法的参数是一个Consumer对象,该对象后续会被包装成一个LambdaObserver对象,即为这段代码中的观察者(消费者)。
  • 被观察者:产生事件的对象,也能够称做生产者,在上述代码中,Observable.fromArray(...)返回的是一个Observable对象,即为这段程序的被观察者(生产者)。
  • 事件:RxJava中存在四种事件流:onSubscribe(订阅事件),onNext(正常事件),onError(异常事件),onComplete(完成事件)。在上述代码中,是将数组中的元素做为onNext事件中的数据进行发送。
  • 订阅:建立观察者与被观察者之间观察关系,对应着上述代码中的subscribe()方法。RxJava的事件驱动模型是一种“拉模型”,在观察者没有进行事件订阅以前是不会有事件产生的,只有观察者进行订阅后,才会触发被观察者生产事件。

对上述代码进行时序分析,能够清晰的看出这一段代码的运行过程,最终由FromArrayDisposable生产了onNext和onComplete事件,并通知Observer进行消费。异步

与此同时,咱们也看到,简单的一行代码,居然涉及这么多类的交互,若是增长一些其余的操做符,咱们对整个程序把控起来就没那么容易了,下面咱们将经过分析RxJava中的一些主要的设计模式,剖析类与类的关联关系,来更清晰地理解RxJava的工做原理。函数式编程

3、 集大成者Observable

在整个数据处理的过程当中,Observable能够说是最重要的一个对象。从上面的时序图能够看出,客户端(消息的生产者或者消费者)只和Observable进行交互,观察者和被观察者之间关系的建立也是由Observable去实现,而不用咱们显示的编码实现,这大大下降了咱们使用观察者模式的成本。函数

那么Observable主要有哪些做用呢,咱们首先来看下和Observable相关的类图:编码

从图中咱们能够看出:spa

  • Observable实现了ObservableSource接口,从字面意思就能够理解,这是一个提供观察能力的接口,因此Observable的一大能力是供观察者进行事件订阅,而进行事件订阅的方法实现就是调用Observable的subscribe()方法
  • Observable是一个抽象类,它提供了subscribeActual模板方法供子类实现,从源码中能够看出,Observable的subscribe()方法最终会委托子类的subscribeActual()方法实现,这个方法会创建生产者与消费者之间的关联关系。
  • 除此以外,Observable仍是一个工厂类,它提供了静态方法fromArray()、create()等用来建立具体的可观察对象,同时还提供了flatMap()、concatMap()等操做方法对可观察对象进行包装。

Observable的存在让生产者和消费者彻底的解耦了,生产者只需关注本身生成何种Observable对象,而消费者也只需关注本身观察的是哪一种Observable。

在实际的应用中,Rxjava已经提供了各类各样的操做符供咱们使用,生产者只须要调用Observable中相应的方法便可以生成所需的可观察对象,供消费者进行事件订阅。消费者只需调用可观察对象的subscribe()方法便可与生产者创建观察关系,极其方便。

4、 真实的观察

观察者模式是RxJava设计的核心思想,在观察者模式中老是存在观察的对象和被观察的对象,从上文的解析中也能够看出Observable更多的是一个控制器的做用,而并不是真正的事件的来源。那么在RxJava中,什么才是真正的生产者,什么才是真正的消费者呢。

咱们来分析下如下三种常见的Observable:

先简单介绍下这几个Observable的做用,fromArray的做用是将数组中的元素做为onNext事件发送,create的做用是发送自定义事件,just的做用是发送单个事件。

上一小节有讲到实际的订阅行为是由各个Observable类中subscribeActual()方法实现的,咱们来看下这三个类的subscribeActual()方法。

除去细枝末节,这三个方法均可以分红如下三步

  1. 建立被观察者对象,并传入观察者observer,创建二者的关联关系;
  2. 触发onSubscribe事件,观察者响应该事件;
  3. 进行事件的拉取,咱们能够进入到d.run(),source.subscribe(parent),sd.run()这些方法的内部看一些,能够看到这些方法就是在发送onNext(),onError(),onComplete()等事件。

下图是整个流程中的相关类图。实际事件的发送者是FromArrayDisposable等对象,而实际的观察者,则是一个实现了Observer接口的实体类。若是咱们在subscribe时传入的是一个lambda表达式,以后会被包装成一个默认的LambdaObserver对象,进行事件消费。

5、 包装的必要

RxJava中提供了丰富的操做符,好比flatMap,concatMap等能够对事件转换,subscribeOn,observableOn等能够对生产和消费的线程进行控制。这些操做符实际上调用了Observable中的包装方法对原有的可观察对象进行包装,返回了一个加强了的可观察对象。

操做符种类繁多,在这就不一一举例,咱们以flatMap为例,分析一下这些操做符是如何工做的。

首先,flatMap操做会返回一个ObservableFlatMap对象,在建立这个对象时,会将原始的Observable对象做为构造函数的参数传入。

查看其核心方法subscribeActual,

能够看到这一类对象的subscribeActual方法和上一节中的方法不太同样,这里面并无去实际的建立观察关系,而是作了两件事:

  1. 对观察者进行加强,将其包装成为MergeObserver对象,由其对产生的时间进行响应。
  2. 再调用source的subscribe方法,这里source就是前面构造函数中传入的Observable对象,由其再进行观察关系的创建。
    下图是RxJava中装饰器模式的相关类图:全部的包装类都继承了AbstractObservableWithUpstream类,该抽象类有一个类型为ObservableSource的成员函数,用来持有被装饰的对象。

Observable是支持链式操做的,就和Java 8中的Stream同样,咱们来考虑这样一行代码。

咱们在分析上面这串代码时,必定会凌乱很是,在看源码时也会看到前面忘掉后面,可是若是咱们对RxJava的包装流程足够了解的话,就能够很轻松的对上述代码进行分析。

6、 小结

RxJava的封装足够强大,可让咱们很方便的进行使用和扩展,但这也给咱们理解其真实的工做原理带来了难度,若是咱们对整个事件的处理过程处于只知其一;不知其二的状态,那咱们就没法从容的对服务进行异步编排,在实际开发过程当中也难以发现问题的根源。

本文主要分析了RxJava中主要的设计模式,其中有模板模式、工厂模式、观察者模式、装饰器模式,理解了这些设计模式,理解了RxJava中类与类的关系,咱们就可以对整个事件的处理流程了然于胸,分析代码时也可以事半功倍。

相关文章
相关标签/搜索