Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范的响应式库。html
Reactor 提供了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操做符。一个 Flux 表明 0...N 个元素的响应式流;一个 Mono 表明 0|1 个元素的响应式流。java
Flux 和 Mono 之间能够转换,好比 Flux 的 count 操做(计算流中元素个数)返回 Mono,Mono 的 concatWith 操做(链接另外一个响应式流)返回 Flux。react
Flux<T> 是一个可以发出 0 到 N 个元素的标准 Publisher<T>,它会被一个完成(completion)或错误(error)信号终止。所以,一个 Flux 的可能结果是 value、completion
或 value,这三个分别会传递给订阅者中的 onNext、onComplete、onError 方法。git
注意:全部的信号事件,包括表明终止的信号事件都是可选的。若是没有 onNext 事件,可是有 onComplete 事件,那么发出的就是空的有限流;若是去掉 onComplete
就获得一个 无限的空数据流。无限的数据流能够不是空的,好比 Flux.interval(Duration) 生成的是一个 Flux,这是一个无限周期性发出规律整数的时钟数据流。 github
下图展现的是 Flux 基于时间线的弹珠交互图,经过操做符转换 Flux 中元素:web
后续在学习操做符的过程当中,咱们将见到不少相似的弹珠图,请你们详细了解清楚该图各部分的含义。spring
Mono<T> 是一种特殊的Publisher<T>,它最多只能发出一个元素,而后(可选的)终止于 onComplete 或 onError 信号。编程
Mono 中的操做符是 Flux 中操做符的子集,即 Flux 中只有部分操做符适用于 Mono,有些操做符是将 Mono 和另外一个 Publisher 链接转换为 Flux。例如,Mono#concatWith(Publisher
) 转换为 Flux,Mono#then(Mono) 返回另外一个 Mono。数组
注意:可使用 Mono
来建立一个只有完成概念的空值异步处理过程(相似于 Runnable)。 异步
下图展现的是 Mono 基于时间线的弹珠交互图:
如同建立 Java Stream 同样,Reactor 也为咱们提供了 多个工厂方法用来建立 Flux 和 Mono,有了 Stream 的基础,建立的基本方法咱们来快速过一下。
下面的建立方法,若是是 Flux 或 Mono 独有的,会在方法名前增长类名前缀。
下面的示例代码中都有用到 subscribe 方法,下面会讲到,你们先了解它是响应式流的订阅方法,用于触发流,相似于 Java Stream 中的终端操做。
使用提供的元素发出数据而后结束的流。
Mono.just("hello, world").subscribe(System.out::println); Mono.justOrEmpty(str).subscribe(System.out::println); Mono.justOrEmpty(optional).subscribe(System.out::println); Flux.just("hello", "world").subscribe(System.out::println); Flux.just("hello").subscribe(System.out::println);
Flux 提供了 fromArray(从数组)、fromIterable(从迭代器)、fromStream(从 Java Stream 流) 的方式来建立 Flux。
String[] array = new String[]{"hello", "reactor", "flux"}; List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux.fromArray(array).subscribe(System.out::println); Flux.fromIterable(iterable).subscribe(System.out::println); Flux.fromStream(Arrays.stream(array)).subscribe(System.out::println);
从 start 开始构建一个 Flux,该 Flux 仅发出一系列递增计数的整数。 也就是说,在 start(包括)和 start + count(排除)之间发出整数,而后完成。见图识意:
Flux.range(3, 5).subscribe(System.out::println);
在全局计时器上建立一个 Flux,该 Flux 在初始延迟后,发出从0开始并以指定的时间间隔递增的长整数。 若是未及时产生,则会经过溢出 IllegalStateException 发出 onError
信号,详细说明没法发出的缘由。 在正常状况下,Flux 将永远不会完成。interval 提供了 3 个重载方法,三者的区别主要在因而否延迟发出、以及使用的调度器。
interval 生成的是一个无限数据流。
Flux<Long> interval(Duration period) Flux<Long> interval(Duration delay, Duration period) Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
见图识意:
Flux.interval(Duration.ofMillis(30), Duration.ofMillis(500)).subscribe(System.out::println);
生成一个空的有限流。见图识意:
Flux.empty().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));
生成一个空的无限流。见图识意:
Flux.never().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));
生成一个错误流。error 有 3 个重载方法,它们的主要区别是否当即生成错误及是否由 Supplier 提供,见图识意:
Flux.error(new IllegalStateException(), true) .log() .subscribe(System.out::println, System.err::println);
Flux 和 Mono 还提供了编程式的建立数据流的方法,诸如 create、generate、push、handle 等的方式,这些内容暂时不是咱们的重点,这里咱们不细展开,感兴趣的可看 Api 进行研究下。
在上面建立 Flux 和 Mono 笔记的示例代码中,咱们已经提到了 subscribe 订阅,在 subscribe 订阅中,Flux 和 Mono 支持 Java 8 Lambda 表达式。下面咱们来看看 Reactor
为咱们提供了哪些订阅方法。
subscribe(); // ① subscribe(Consumer<? super T> consumer); // ② subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); // ③ subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); // ④ subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); // ⑤ subscribe(Subscriber<? super T> actual); // ⑥
注意:序号⑤ 变量传递一个 Subscription 的引用,若是再也不须要更多元素时,能够经过它来取消订阅。取消订阅时,源头会中止生成数据,并清理相关的资源。取消和清理的操做是在 Disposable 接口中定义的。
来看下序号 ⑤ 的 subscribe 的弹珠图:
Flux.range(1, 4) .subscribe(System.out::println, error -> System.err.println("发生错误:" + error), () -> System.out.println("完成"), sub -> { System.out.println("已订阅"); // 理解背压 // 尝试修改下 request 中的值,看看有啥变化 sub.request(10); });
注意:序号⑥ 的方式支持背压等操做,不在咱们本次笔记的范畴,咱们仍是先略过,后期在学习。
在上节咱们讲解 Reactor 调试部分时,遗漏了记录数据流的日志方法,再此作下补充:除了基于 stack trace 的方式调试分析,咱们还可使用 log
操做符,来跟踪响应式流并记录日志。将它添加到操做链上以后,它会读取每个再其上游的 Flux 和 Mono 事件(包括 onNext、onError、onComplete、Subscribe、Cancel 和 Request)。
// 尝试交换下 take 和 log 的顺序,看看有啥变化 Flux.range(1, 10) // .log() .take(3) .log() .subscribe();
本篇咱们介绍了 Reactor 的基础知识:先是了解了 Reactor 为咱们提供的响应式流类 Flux 和 Mono,以后学习了如何建立他们和订阅他们,由于有以前 Stream
的基础,想来你们对这些知识点都好理解和接受。
今天的内容就学到这里,咱们下篇开始学习 Reactor 的操做符。
源码详见:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模块下 ReactorBasicLearningTest 测试类。