响应式编程简介之:Reactor

简介

Reactor是reactivex家族的一个很是重要的成员,Reactor是第四代的reactive library,它是基于Reactive Streams标准基础上开发的,主要用来构建JVM环境下的非阻塞应用程序。java

今天给你们介绍一下Reactor。react

Reactor简介

Reactor是基于JVM的非阻塞API,他直接跟JDK8中的API相结合,好比:CompletableFuture,Stream和Duration等。git

它提供了两个很是有用的异步序列API:Flux和Mono,而且实现了Reactive Streams的标准。程序员

而且还能够和reactor-netty相结合,做为一些异步框架的底层服务,好比咱们很是熟悉的Spring MVC 5中引入的WebFlux。github

咱们知道WebFlux的底层使用的是reactor-netty,而reactor-netty又引用了Reactor。因此,若是你在POM中引入了webFlux依赖:web

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
复制代码

那么项目将会自动引入Reactor。spring

若是你用的不是Spring webflux,不要紧,你能够直接添加下面的依赖来使用Reactor:编程

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
复制代码

reactive programming的发展史

最最开始的时候微软为.NET平台建立了Reactive Extensions (Rx) library。接着RxJava实现了JVM平台的Reactive。markdown

而后Reactive Streams标准出现了,它定义了Java平台必须知足的的一些规范。而且已经集成到JDK9中的java.util.concurrent类中。框架

在Flow中定义了实现Reactive Streams的四个很是重要的组件,分别是Publisher,Subscriber,Subscription和Processor。

Iterable-Iterator 和Publisher-Subscriber的区别

通常来讲reactive在面向对象的编程语言中是以观察者模式的扩展来使用的。

咱们来具体看一下这个观察者模式的实现,以Publisher和Subscriber为例:

public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
复制代码
public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }
复制代码

上面定义了两个接口,Publisher和Subscriber,Publisher的做用就是subscribe到subscriber。

而subscriber定义了4个on方法,用来触发特定的事件。

那么Publisher中的subscribe是怎么触发Subscriber的onSubscribe事件呢?

很简单,咱们看一个具体的实现:

public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Subscription sub;
        if (throwable != null) {
            assert iterable == null : "non-null iterable: " + iterable;
            sub = new Subscription(subscriber, null, throwable);
        } else {
            assert throwable == null : "non-null exception: " + throwable;
            sub = new Subscription(subscriber, iterable.iterator(), null);
        }
        subscriber.onSubscribe(sub);

        if (throwable != null) {
            sub.pullScheduler.runOrSchedule();
        }
    }
复制代码

上面的例子是PullPublisher的subscribe实现。咱们能够看到,在这个subscribe中触发了subscriber.onSubscribe方法。而这就是观察者模式的秘密。

或者说,当Publisher调用subscribe的时候,是主动push subscriber的onSubscribe方法。

熟悉Iterable-Iterator模式的朋友应该都知道,Iterator模式,实际上是一个主动的pull模式,由于须要不断的去调用next()方法。因此它的控制权是在调用方。

为何要使用异步reactive

在现代应用程序中,随着用户量的增多,程序员须要考虑怎么才能提高系统的处理能力。

传统的block IO的方式,由于须要占用大量的资源,因此是不适合这样的场景的。咱们须要的是NO-block IO。

JDK中提供了两种异步编程的模型:

第一种是Callbacks,异步方法能够经过传入一个Callback参数的形式来在Callback中执行异步任务。比较典型的像是java Swing中的EventListener。

第二中就是使用Future了。咱们使用Callable来提交一个任务,而后经过Future来拿到它的运行结果。

这两种异步编程会有什么问题呢?

callback的问题就在于回调地狱。熟悉JS的朋友应该很理解这个回调地狱的概念。

简单点讲,回调地狱就是在callback中又使用了callback,从而形成了这种callback的层级调用关系。

而Future主要是对一个异步执行的结果进行获取,它的 get()其实是一个block操做。而且不支持异常处理,也不支持延迟计算。

当有多个Future的组合应该怎么处理呢?JDK8 实际上引入了一个CompletableFuture类,这个类是Future也是一个CompletionStage,CompletableFuture支持then的级联操做。不过CompletableFuture提供的方法不是那么的丰富,可能知足不了个人需求。

因而咱们的Reactor来了。

Flux

Reactor提供了两个很是有用的操做,他们是 Flux 和 Mono。 其中Flux 表明的是 0 to N 个响应式序列,而Mono表明的是0或者1个响应式序列。

咱们看一个Flux是怎么transfer items的:

先看下Flux的定义:

public abstract class Flux<T> implements Publisher<T> 复制代码

能够看到Flux其实就是一个Publisher,用来产生异步序列。

Flux提供了很是多的有用的方法,来处理这些序列,而且提供了completion和error的信号通知。

相应的会去调用Subscriber的onNext, onComplete, 和 onError 方法。

Mono

咱们看下Mono是怎么transfer items的:

看下Mono的定义:

public abstract class Mono<T> implements Publisher<T> 复制代码

Mono和Flux同样,也是一个Publisher,用来产生异步序列。

Mono由于只有0或者1个序列,因此只会触发Subscriber的onComplete和onError方法,没有onNext。

另外一方面,Mono其实能够看作Flux的子集,只包含Flux的部分功能。

Mono和Flux是能够互相转换的,好比Mono#concatWith(Publisher)返回一个Flux,而 Mono#then(Mono)返回一个Mono.

Flux和Mono的基本操做

咱们看下Flux建立的例子:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
复制代码

能够看到Flux提供了不少种建立的方式,咱们能够自由选择。

再看看Flux的subscribe方法:

Disposable subscribe(); 

Disposable subscribe(Consumer<? super T> consumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
复制代码

subscribe能够一个参数都没有,也能够多达4个参数。

看下没有参数的状况:

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

numbersFromFiveToSeven.subscribe();
复制代码

注意,没有参数并不表示Flux的对象不被消费,只是不可见而已。

看下带参数的状况:consumer用来处理on each事件,errorConsumer用来处理on error事件,completeConsumer用来处理on complete事件,subscriptionConsumer用来处理on subscribe事件。

前面的3个参数很好理解,咱们来举个例子:

Flux<Integer> ints3 = Flux.range(1, 4);
        ints3.subscribe(System.out::println,
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"),
                sub -> sub.request(2));
复制代码

咱们构建了从1到4的四个整数的Flux,on each就是打印出来,若是中间有错误的话,就输出Error,所有完成就输出Done。

那么最后一个subscriptionConsumer是作什么用的呢?

subscriptionConsumer accept的是一个Subscription对象,咱们看下Subscription的定义:

public interface Subscription {

    public void request(long n);
    public void cancel();
}
复制代码

Subscription 定义了两个方法,用来作初始化用的,咱们能够调用request(n)来决定此次subscribe获取元素的最大数目。

好比上面咱们的例子中,虽然构建了4个整数,可是最终输出的只有2个。

上面全部的subscribe方法,都会返回一个Disposable对象,咱们能够经过Disposable对象的dispose()方法,来取消这个subscribe。

Disposable只定义了两个方法:

public interface Disposable {

	void dispose();

	default boolean isDisposed() {
		return false;
	}
复制代码

dispose的原理是向Flux 或者 Mono发出一个中止产生新对象的信号,可是并不能保证对象产生立刻中止。

有了Disposable,固然要介绍它的工具类Disposables。

Disposables.swap() 能够建立一个Disposable,用来替换或者取消一个现有的Disposable。

Disposables.composite(…​)能够将多个Disposable合并起来,在后面统一作处理。

总结

本文介绍了Reactor的基本原理和两很是重要的组件Flux和Mono,下一篇文章咱们会继续介绍Reactor core的一些更加高级的用法。敬请期待。

本文的例子learn-reactive

本文做者:flydean程序那些事

本文连接:www.flydean.com/introductio…

本文来源:flydean的博客

欢迎关注个人公众号:「程序那些事」最通俗的解读,最深入的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

相关文章
相关标签/搜索