(11)照虎画猫深刻理解响应式流规范——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手
本文源码html

2 响应式编程之法

上一章在响应式编程库方面,本着“快速上手”的原则,介绍了响应式流的概念,以及Reactor 3的使用。这一章,咱们基于对Reactor 3源码的模仿,从《响应式流规范》入手,深刻了解响应式流开发库。java

2.1 响应式流规范

现代软件对近乎实时地处理数据的需求愈来愈强烈,对不断变化的信息的即时响应,意味着更大的商业价值,流处理是一种快速将数据转换为有用信息的手段。react

数据流中的元素能够是一个一个的待计算的数据,也能够是一个一个待响应的事件。前者多用于大数据处理,好比Storm、Spark等产品,后者经常使用于响应式编程,好比Netflix在使用的RxJava、Scala编程语言的发明者Typesafe公司(已改名为Lightbend)的Akka Stream、Java开发者都熟悉的Pivotal公司的Project Reactor、走在技术前沿的Vert.x等。git

软件行业是一个很是注重分享和交流的行业。随着对响应式编程技术的讨论与沟通逐渐深刻,2013年底的时候,Netflix、Pivotal、Typesafe等公司的工程师们共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议和讨论,并在github上建立了reactive-streams-jvm项目。到2015年5月份,1.0版本的规范出炉,项目README就是规范正文。github

各个响应式开发库都要遵循这个规范,其好处也是显而易见的。之因此咱们编写的Java代码能够在Hotspot、J9和Zing等JVM运行,是由于它们都遵循Java虚拟机规范。相似的,因为各个响应式开发库都遵循响应式流规范,所以互相兼容,不一样的开发库之间能够进行交互,咱们甚至能够同时在项目中使用多个响应式开发库。对于Spring WebFlux来讲,也可使用RxJava做为响应式库。编程

<img height=300em src="https://leanote.com/api/file/getImage?fileId=5a9e3ed2ab644159cf00128e"/>;api

虽然响应式流规范是用来约束响应式开发库的,做为使用者的咱们若是可以了解这一规范对于咱们理解开发库的使用也是颇有帮助的,由于规范的内容都是对响应式编程思想的精髓的呈现。访问reactive-streams-jvm项目,能够浏览规范的细节,包括其中定义的响应式流的特色:数组

  1. 具备处理无限数量的元素的能力;
  2. 按序处理;
  3. 异步地传递元素;
  4. 必须实现非阻塞的回压(backpressure)。

2.1.1 响应式流接口

响应式流规范定义了四个接口,以下:安全

1.Publisher是可以发出元素的发布者。并发

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

2.Subscriber是接收元素并作出响应的订阅者。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

当执行subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,一般订阅者会借助传入的Subscription向发布者请求n个数据。而后发布者经过不断调用订阅者的onNext方法向订阅者发出最多n个数据。若是数据所有发完,则会调用onComplete告知订阅者流已经发完;若是有错误发生,则经过onError发出错误数据,一样也会终止流。

title

订阅后的回调用表达式表示就是onSubscribe onNext* (onError | onComplete)?,即以一个onSubscribe开始,中间有0个或多个onNext,最后有0个或1个onErroronComplete事件。

PublisherSubscriber融合了迭代器模式和观察者模式。

咱们常常用到的IterableIterator就是迭代器模式的体现,能够知足上边第1和2个特色关于按需处理数据流的要求;而观察者模式基于事件的回调机制有助于知足第3个特色关于异步传递元素的要求。

3.SubscriptionPublisherSubscriber的“中间人”。

public interface Subscription {
    public void request(long n);
    public void cancel();
}

当发布者调用subscribe方法注册订阅者时,会经过订阅者的回调方法onSubscribe传入Subscription对象,以后订阅者就可使用这个Subscription对象的request方法向发布者“要”数据了。回压机制正是基于此来实现的,所以第4个特色也可以实现了。

4.ProcessorPublisherSubscriber于一身。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

这四个接口在JEP 266跟随Java 9版本被引入了Java SDK

这四个接口是实现各开发库之间互相兼容的桥梁,响应式流规范也仅仅聚焦于此,而对诸如转换、合并、分组等等的操做一律未作要求,所以是一个很是抽象且精简的接口规范。

若是这时候有人要造轮子,再写一套响应式开发库,如何基于这几个接口展开呢?

2.1.2 照虎画猫,理解订阅后发生了什么

Reactor 3是遵循响应式流规范的实现,所以,小撸一把Reactor的源码有助于咱们理解规范中定义的接口的使用。

Reactor中,咱们最早接触的生成Publisher的方法就是Flux.just(),下面咱们来动手写代码模拟一下Reactor的实现方式。不过具有生产能力开发库会考虑性能、并发安全性等诸多因素,所谓“照虎画猫”,咱们的代码只是模拟出实现思路,代码少的多,但五脏俱全。

源码位于:https://github.com/get-set/get-reactive/tree/master/my-reactor

首先,引入响应式流规范的四个接口定义,基于Java 9的话能够直接使用java.util.concurrent.Flow

<dependency>
        <groupId>org.reactivestreams</groupId>
        <artifactId>reactive-streams</artifactId>
        <version>1.0.2</version>
    </dependency>

首先建立最最基础的类Flux,它是一个Publisher

package reactor.core.publisher;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public abstract class Flux<T> implements Publisher<T> {
    public abstract void subscribe(Subscriber<? super T> s);
}

在Reactor中,Flux既是一个发布者,又充当工具类的角色,当咱们用Flux.just()Flux.range()Flux.interval()等工厂方法生成Flux时,会new一个新的Flux,好比Flux.just会返回一个FluxArray对象。

public static <T> Flux<T> just(T... data) {
        return new FluxArray<>(data);
    }

返回的FluxArray对象是Flux.just生成的Publisher,它继承自Flux,并实现了subscribe方法。

public class FluxArray<T> extends Flux<T> {
    private T[] array;  // 1

    public FluxArray(T[] data) {
        this.array = data;
    }

    @Override
    public void subscribe(Subscriber<? super T> actual) {
        actual.onSubscribe(new ArraySubscription<>(actual, array)); // 2
    }
}
  1. FluxArray内部使用一个数组来保存数据;
  2. subscribe方法一般会回调SubscriberonSubscribe方法,该方法须要传入一个Subscription对象,从而订阅者以后能够经过回调传回的Subscriptionrequest方法跟FluxArray请求数据,这也是回压的应有之义。

继续编写ArraySubscription

public class FluxArray<T> extends Flux<T> {
    ...
    static class ArraySubscription<T> implements Subscription { // 1
        final Subscriber<? super T> actual;
        final T[] array;    // 2
        int index;
        boolean canceled;

        public ArraySubscription(Subscriber<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (canceled) {
                return;
            }
            long length = array.length;
            for (int i = 0; i < n && index < length; i++) {
                actual.onNext(array[index++]);  // 3
            }
            if (index == length) {
                actual.onComplete();    // 4
            }
        }

        @Override
        public void cancel() {  // 5
            this.canceled = true;
        }
    }
}
  1. ArraySubscription是一个静态内部类。静态内部类是最简单的一种内部类,你尽能够把它当成普通的类,只不过刚好定义在其余类的内部;
  2. 可见在Subscription内也有一份数据;
  3. 当有能够发出的元素时,回调订阅者的onNext方法传递元素;
  4. 当全部的元素都发完时,回调订阅者的onComplete方法;
  5. 订阅者可使用Subscription取消订阅。

到此为止,发布者就开发完了。咱们测试一下:

@Test
public void fluxArrayTest() {
    Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("onSubscribe");
            s.request(6);   // 2
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext:" + integer);
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
}
  1. Subscriber经过匿名内部类定义,其中须要实现接口的四个方法;
  2. 订阅时请求6个元素。

测试方法运行以下:

1
2
3
4
5
Completed.

若是请求3个元素呢?输出以下:

1
2
3

没有完成事件,OK,一个简单的Flux.just就完成了,经过这个例子咱们可以初步摸出Flux工厂方法的一些“套路”:

  • 工厂方法返回的是Flux子类的实例,如FluxArray
  • FluxArraysubscribe方法会返回给订阅者一个Subscription实现类的对象,这个ArraySubscriptionFluxArray的静态内部类,定义了“如何发布元素”的逻辑;
  • 订阅者能够经过这个ArraySubscription对象向发布者请求n个数据;发布者也能够借助这个ArraySubscription对象向订阅者传递数据元素(onNext/onError/onComplete)。

用图来表示以下(因为Subscription是静态内部类,能够看作普通类,就单独放一边了):

title

上图的这个过程基本适用于大多数的用于生成Flux/Mono的静态工厂方法,如Flux.justFlux.range等。

首先,使用相似Flux.just的方法建立发布者后,会建立一个具体的发布者(Publisher),如FluxArray

  1. 当使用.subscribe订阅这个发布者时,首先会new一个具备相应逻辑的Subscription(如ArraySubscription,这个Subscription定义了如何处理下游的request,以及如何“发出数据”);
  2. 而后发布者将这个Subscription经过订阅者的.onSubscribe方法传给订阅者;
  3. 在订阅者的.onSubscribe方法中,须要经过Subscription发起第一次的请求.request
  4. Subscription收到请求,就能够经过回调订阅者的onNext方法发出元素了,有多少发多少,但不能超过请求的个数;
  5. 订阅者在onNext中一般定义对元素的处理逻辑,处理完成以后,能够继续发起请求;
  6. 发布者根据继续知足订阅者的请求;
  7. 直至发布者的序列结束,经过订阅者的onComplete予以告知;固然序列发送过程当中若是有错误,则经过订阅者的onError予以告知并传递错误信息;这两种状况都会致使序列终止,订阅过程结束。

以上从1~7这些阶段称为订阅期(subscribe time)

2.1.3 照虎画猫——操做符“流水线”

响应式开发库的一个很赞的特性就是能够像组装流水线同样将操做符串起来,用来声明复杂的处理逻辑。好比:

Flux ff = Flux.just(1, 2, 3, 4, 5)
    .map(i -> i * i)
    .filter(i -> (i % 2) == 0);
ff.subscribe(...)

经过源码,咱们能够了解这种“流水线”的实现机制。下面咱们仍然是经过照虎画猫的方式模拟一下Reactor中Flux.map的实现方式。

Flux.map用于实现转换,转换后元素的类型可能会发生变化,转换的逻辑由参数Function决定。方法自己返回的是一个转换后的Flux,基于此,该方法实现以下:

public abstract class Flux<T> implements Publisher<T> {
    ...
    public <V> Flux<V> map(Function<? super T, ? extends V> mapper) {   // 1
        return new FluxMap<>(this, mapper); // 2
    }
}
  1. 泛型方法,经过泛型表示可能出现的类型的变化(T → V);
  2. FluxMap就是新的Flux。

既然FluxMap是一个新的Flux,那么与2.1.2中FluxArray相似,其内部定义有MapSubscription,这是一个Subscription,可以根据其订阅者的请求发出数据。

public class FluxMap<T, R> extends Flux<R> {

    private final Flux<? extends T> source;
    private final Function<? super T, ? extends R> mapper;

    public FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Subscriber<? super R> actual) {
        source.subscribe(new MapSubscriber<>(actual, mapper));
    }

    static final class MapSubscription<T, R> implements Subscription {
        private final Subscriber<? super R> actual;
        private final Function<? super T, ? extends R> mapper;

        MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void request(long n) {   // 1
            // TODO 收到请求,发出元素
        }

        @Override
        public void cancel() {
            // TODO 取消订阅
        }
    }
}
  1. 可是map操做符并不产生数据,只是数据的搬运工。收到request后要发出的数据来自上游。

因此MapSubscription同时也应该是一个订阅者,它订阅上游的发布者,并将数据处理后传递给下游的订阅者(为了跟Reactor源码一致,将MapSubscription更名为MapSubscriber,其实没差)。

title

如图,对下游是做为发布者,传递上游的数据到下游;对上游是做为订阅者,传递下游的请求到上游。

static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { // 1
    ...
}
  1. 实现了SubscriberSubscription

这样,总共有5个方法要实现:来自Subscriber接口的onSubscribeonNextonErroronComplete,和来自Subscription接口的requestcancel。下面咱们本着“搬运工”的角色实现这几个方法便可。

static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
    private final Subscriber<? super R> actual;
    private final Function<? super T, ? extends R> mapper;

    boolean done;

    Subscription subscriptionOfUpstream;

    MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.subscriptionOfUpstream = s;    // 1
        actual.onSubscribe(this);           // 2
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        actual.onNext(mapper.apply(t));     // 3
    }

    @Override
    public void onError(Throwable t) { 
        if (done) {
            return;
        }
        done = true;
        actual.onError(t);                  // 4
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        actual.onComplete();                // 5
    }

    @Override
    public void request(long n) {
        this.subscriptionOfUpstream.request(n);     // 6
    }

    @Override
    public void cancel() {
        this.subscriptionOfUpstream.cancel();       // 7
    }
}
  1. 拿到来自上游的Subscription;
  2. 回调下游的onSubscribe,将自身做为Subscription传递过去;
  3. 收到上游发出的数据后,将其用mapper进行转换,而后接着发给下游;
  4. 将上游的错误信号原样发给下游;
  5. 将上游的完成信号原样发给下游;
  6. 将下游的请求传递给上游;
  7. 将下游的取消操做传递给上游。

从这个对源码的模仿,能够体会到,当有多个操做符串成“操做链”的时候:

  • 向下:很天然地,数据和信号(onSubscribeonNextonErroronComplete)是经过每个操做符向下传递的,传递的过程当中进行相应的操做处理,这一点并不难理解;
  • 向上:然而在内部咱们看不到的是,有一个自下而上的“订阅链”,这个订阅链能够用来传递request,所以回压(backpressure)能够实现从下游向上游的传递。

这一节最开头的那一段代码的执行过程以下图所示:

title

2.1.4 LambdaSubscriber

在1.3.2节的时候,介绍了.subscribe的几个不一样方法签名的变种:

subscribe(  Consumer<? super T> consumer) 

subscribe(  @Nullable Consumer<? super T> consumer, 
            Consumer<? super Throwable> errorConsumer) 

subscribe(  @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer) 

subscribe(  @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer)

用起来很是方便,可是响应式流规范中只定义了一个订阅方法subscribe(Subscriber subscriber)。实际上,这几个方法最终都是调用的subscribe(LambdaSubscriber subscriber),并经过LambdaSubscriber实现了对不一样个数参数的组装。以下图所示:

title

所以,

flux.subscribe(System.out::println, System.err::println);

是调用的:

flux.subscribe(new LambdaSubscriber(System.out::println, System.err::println, null, null));
相关文章
相关标签/搜索