响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它能够用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。html
在响应式编程方面,微软跨出了第一步,它在 .NET 生态中建立了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在 JVM 上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(Flow 类)。java
响应式编程一般做为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 由于其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。react
此外,对推送来的数据的操做是经过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者经过描述“控制流程”来定义对数据流的处理逻辑。git
除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。一个 Publisher 能够推送新的值到它的 Subscriber(调用 onNext 方法), 一样也能够推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号均可以终止响应式流。能够用下边的表达式描述:github
onNext x 0..N [onError | onComplete]
复制代码
这种方式很是灵活,不管是有/没有值,仍是 n 个值(包括有无限个值的流,好比时钟的持续读秒),均可处理。编程
以上来自 projectreactor.io/docs/core/r… 翻译数组
Reactive Streams 是上面提到的一套标准的响应式编程规范。它由四个核心概念构成:bash
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
复制代码
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
复制代码
public interface Subscription {
// request(n)用来发起请求数据,其中n表示请求数据的数量,它必须大于0,
// 不然会抛出IllegalArgumentException,并触发onError,request的调用会
// 累加,若是没有终止,最后会触发相应次数的onNext方法.
public void request(long n);
// cancel至关于取消订阅,调用以后,后续不会再收到订阅,onError 和
// onComplete也不会被触发
public void cancel();
}
复制代码
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
复制代码
Reactive Streams 经过上面的四个核心概念和相关的函数,对响应式流进行了一个框架性的约定,它没有具体实现。简单来讲,它只提供通用的、合适的解决方案,你们都按照这个规约来实现就行了。app
Java 的 Reactive Programming 类库主要有三个,分别是 Akka-Streams ,RxJava 和 Project Reactor。Spring 5 开始支持 Reactive Programming,其底层使用的是 Project Reactor。本篇主要是对 Project Reactor 中的一些点进行学习总结。框架
Project Reactor 是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor 引入了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操做方式。 一个 Flux 对象表明一个包含 0..N 个元素的响应式序列,而一个 Mono 对象表明一个包含零或者一个(0..1)元素的结果。
Flux 是生产者,即咱们上面提到的 Publisher,它表明的是一个包含 0-N 个元素的异步序列,Mono能够看作 Flux 的有一个特例,表明 0-1 个元素,若是不须要生产任何元素,只是须要一个完成任务的信号,可使用 Mono。
先来看这张图,这里是直接从官方文档上贴过来的。就这张图作下说明,先来关注几个点:
那总体来看就是 Flux 产生元数据,经过一系列 operator 操做获得转换结果,正常成功就是 onCompleted,出现错误就是 onError。看下面的一个小例子:
Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
// subscription 表示订阅关系
System.out.println("onSubscribe,"+ subscription.getClass());
// subscription 经过 request 来触发 onNext
subscription.request(2);
}
@Override
public void onNext(String s) {
System.out.println("currrent value is = " + s);
}
@Override
public void onError(Throwable throwable) {
System.out.println("it's error.");
}
@Override
public void onComplete() {
System.out.println("it's completed.");
}
});
复制代码
执行结果:
onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed. 复制代码
若是在 onSubscribe 方法中咱们不执行 request,则不会有后续任何操做。关于 request 下面看。
Flux 是一个可以发出 0 到 N 个元素的标准的 Publisher,它会被一个 "error" 或 "completion" 信号终止。所以,一个 Flux 的结果多是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的
onNext
,onComplete
和onError
方法。
这张图也来自官方文档,和上面 Flux 的区别就是,Mono 最多只能 emitted 一个元素。
Mono.just("glmapper").subscribe(System.out::println);
复制代码
经过上面两段小的代码来看,最直观的感觉是,Flux 至关于一个 List,Mono 至关于 Optional。其实在编程中全部的结果咱们均可以用 List 来 表示,可是当只返回一个或者没有结果时,用 Optional 可能会更精确些。
Optional 相关概念可自行搜索 jdk Optional
另外,Mono 和 Flux 都提供了一些工厂方法,用于建立相关的实例,这里简单罗列一下:
// 能够指定序列中包含的所有元素。建立出来的 Flux
// 序列在发布这些元素以后会自动结束。
Flux.just("glmapper", "leishu");
// 从一个Iterable 对象中建立 Flux 对象,固然还能够是数组、Stream对象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 建立一个只包含错误消息的序列。
Flux.error(new IllegalStateException());
// 建立一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间
// 隔来发布。除了间隔时间以外,还能够指定起始元素发布以前的延迟时间。
Flux.interval(Duration.ofMillis(100)).take(10);
// 建立一个不包含任何消息通知的序列。
Flux.never();
// 建立一个不包含任何元素,只发布结束消息的序列。
Flux.empty();
// 建立包含从 start 起始的 count 个数量的 Integer 对象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());
复制代码
上面的这些静态方法适合于简单的序列生成,当序列的生成须要复杂的逻辑时,则应该使用 generate() 或 create() 方法。
Reactor 的核心调用过程大体能够分为图中的几个阶段
当须要处理 Flux 或 Mono 中的消息时,能够经过 subscribe 方法来添加相应的订阅逻辑。在调用 subscribe 方法时能够指定须要处理的消息类型。能够只处理其中包含的正常消息,也能够同时处理错误消息和完成消息。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
复制代码
结果:
1
2
java.lang.IllegalStateException
复制代码
正常的消息处理相对简单。当出现错误时,有多种不一样的处理策略:
经过 onErrorReturn() 方法返回一个默认值
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);
复制代码
结果:
1
2
0
复制代码
经过 onErrorResume()方法来根据不一样的异常类型来选择要使用的产生元素的流
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
}).subscribe(System.out::println);
复制代码
结果:
1
2
-1
复制代码
经过 retry 操做符来进行重试,重试的动做是经过从新订阅序列来实现的。在使用 retry 操做符时能够指定重试的次数。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);
复制代码
结果:
1
2
1
2
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
Caused by: java.lang.IllegalStateException
at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)
复制代码
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler,Scheduler 是一个拥有普遍实现类的抽象接口,Schedulers 类提供的静态方法用于达成以下的执行环境:
Schedulers.immediate().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// main-11
复制代码
Schedulers.single().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// single-1-11
复制代码
Schedulers.elastic().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// elastic-2-11
复制代码
Schedulers.parallel().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// parallel-1-11
复制代码
ExecutorService executorService = Executors.newSingleThreadExecutor();
Schedulers.fromExecutorService(executorService).schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// pool-4-thread-1-11
复制代码
Schedulers.newElastic("test-elastic").schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// test-elastic-4-11
复制代码
一些操做符默认会使用一个指定的调度器(一般也容许开发者调整为其余调度器)例如, 经过工厂方法 Flux.interval(Duration.ofMillis(100)) 生成的每 100ms 打点一次的 Flux, 默认状况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single()
Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
Schedulers.newSingle("test"))
.map(i -> Thread.currentThread().getName() +"@"+i);
intervalResult.subscribe(System.out::println);
复制代码
结果:
test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略
复制代码
Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。 它们都接受一个 Scheduler 做为参数,从而能够改变调度器。可是 publishOn 在链中出现的位置是有讲究的,而 subscribeOn 则无所谓。
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
复制代码
结果:
[elastic-2] [single-1] parallel-1
复制代码
上面这段代码使用 create() 方法建立一个新的 Flux 对象,其中包含惟一的元素是当前线程的名称。
接着是两对 publishOn() 和 map()方法,其做用是先切换执行时的调度器,再把当前的线程名称做为前缀添加。
最后经过 subscribeOn()方法来改变流产生时的执行方式。
最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操做以前的 Schedulers.single() 调度器,最外层的线程名字 elastic-2 来自第二个 map 操做以前的 Schedulers.elastic()调度器。
先到这里,剩下的想到再补充...