Reactor
是第四代响应式框架,跟RxJava 2 有些类似。Reactor 项目由Pivotal 启动,以响应式流规范、Java8 和ReactiveX 术语表为基础。它的设计是Reactor 2(上一个主要版本)和RxJava 核心贡献者共同努力的结果。java
在以前的同系列文章 RxJava 实例解析 和测试RxJava里,咱们已经了解了响应式编程的基础:数据流的概念、Observable 类和它的各类操做以及经过工厂方法建立静态和动态的Observable 对象。react
Observable 是事件的源头,Observer 提供了一组简单的接口,并经过订阅事件源来消费 Observable 的事件。Observable 经过 onNext 向 Observer 通知事件的到达,后面可能会跟上 onError 或 onComplete 来表示事件的结束。web
RxJava 提供了 TestSubscriber 来测试 Observable,TestSubscriber 是一个特别的 Observer,能够用它断言流事件。spring
在这篇文章里,咱们将会对 Reactor
和 RxJava
进行比较,包括它们的相同点和不一样点。数据库
Reactor 有两种类型, Flux<T>
和 Mono<T>
。编程
由于这两种类型之间的简单区别,咱们能够很容易地区分响应式 API 的类型:从返回的类型咱们就能够知道一个方法会发射并忘记或请求并等待(Mono),仍是在处理一个包含多个数据项的流(Flux)。后端
Flux
和 Mono
的一些操做利用了这个特色在这两种类型间互相转换。例如,调用 Flux
Reactor
设计的原则之一是要保持 API 的精简,而对这两种响应式类型的分离,是表现力与 API 易用性之间的折中。数组
正如RxJava 实例解析
里所说的,从设计概念方面来看,RxJava
有点相似 Java 8 Streams API
。而 Reactor
看起来有点像 RxJava
,不过这决不仅是个巧合。这样的设计是为了可以给复杂的异步逻辑提供一套原生的具备 Rx 操做风格的响应式流 API。因此说 Reactor 扎根于响应式流,同时在 API 方面尽量地与 RxJava 靠拢。并发
Reactive Streams (如下简称为 RS)是一种规范,它为基于非阻塞回压的异步流处理提供了标准。它是一组包含了 TCK 工具套件和四个简单接口(Publisher、Subscriber、Subscription 和 Processor)的规范,这些接口将被集成到 Java 9.
RS 主要跟响应式回压(稍后会详细介绍)以及多个响应式事件源之间的交互操做有关。它并不提供任何操做方法,它只关注流的生命周期。
Reactor 不一样于其它框架的最关键一点就是 RS。Flux 和 Mono 这二者都是 RS 的 Publisher 实现,它们都具有了响应式回压的特色。
在 RxJava 1
里,只有少部分操做支持回压,RxJava 1 的 Observable 并无实现 RS 里的任何类型,不过它有一些 RS 类型的适配器。能够说,RxJava 1 实际上比 RS 规范出现得更早,并且在 RS 规范设计期间,RxJava 1 充当了函数式工做者的角色。
因此,你在使用那些 Publisher 适配器时,它们并不会为你提供任何操做。为了能作一些有用的操做,你可能须要用回 Observable,而这个时候你须要另外一个适配器。这种视觉上的混乱会破坏代码的可读性,特别是像 Spring 5 这样的框架,若是整个框架创建在这样的 Publisher 之上,那么就更是杂乱不堪。
RS 规范不支持 null 值,因此在从 RxJava 1 迁移到 Reactor 或 RxJava 2 时要注意这点。若是你在代码里把 null 用做特殊用途,那么就更是要注意了。
RxJava 2
是在 RS 规范以后出现的,因此它直接在 Flowable 类型里实现了 Publisher。不过除了 RS 类型,RxJava 2 还保留了 RxJava 1 的遗留类型(Observable、Completable 和 Single)而且引入了其它一些可选类型——Maybe。这些类型提供了不一样的语义,不过它们并无实现 RS 接口,这是它们的不足之处。跟 RxJava 1 不同,RxJava 2 的 Observable 不支持 RxJava 2 的回压协议(只有 Flowable 具有这个特性)。之因此这样设计是为了可以为一些场景提供一组丰富且流畅的 API,好比用户界面发出的事件,在这样的场景里是不须要用到回压的,并且也不可能用到。Completable、Single 和 Maybe 不须要支持回压,不过它们也提供了一组丰富的 API,并且在被订阅以前不会作任何事情。
在响应式领域,Reactor 变得越发精益,它的 Mono 和 Flux 两种类型都实现了 Publisher,而且都支持回压。虽然把 Mono 做为一个 Publisher 须要付出一些额外的开销,不过 Mono 在其它方面的优点弥补了它的缺点。在后续部分咱们将看到对 Mono 来讲回压意味着什么。
ReactiveX 和 RxJava 的操做术语表有时候真的难以掌握,由于历史缘由,有些操做的名字让人感到困惑。Reactor 尽可能把 API 设计得紧凑,在给 API 取名时尽可能选择好一点的名字,不过总的来讲,这两套 API 看起来仍是很相像。在最新的 RxJava 2 迭代版本中,RxJava 2 借鉴了 Reactor 的一些术语,这预示着这两个项目之间可能会有愈来愈紧密的合做。一些操做和概念老是先出如今其中的一个项目里,而后互相借鉴,最后会同时渗透到两个项目里。
例如,Flux 也有常见的 just 工厂方法(虽然只有两种变形:接受一个参数或变长参数)。不过 from 方法有不少个变种,最值得一提的是 fromIterable。固然,Flux 也包含了那些常规的操做:map、merge、concat、flatMap、take,等等。
Reactor 把 RxJava 里使人困惑的 amb 操做改为了看起来更加中肯的 firstEmitting。另外,为了保持 API 的一致,toList 被从新命名为 collectList。实际上,全部以 collect 开头的操做都会把值聚合到一个特定类型的集合里,不过只会为每一个集合生成一个 Mono。而全部以 to 开头的操做被保留用于类型转换,转换以后的类型能够用于非响应式编程,例如 toFuture()。
在类初始化和资源使用方面,Reactor 之因此也能表现得如此精益,要得益于它的融合特性:Reactor 能够把多个串行的操做(例如调用 concatWith 两次)合并成单个操做,这样就能够只对这个操做的内部类作一次初始化(也就是 macro-fusion)。这个特性包含了基于数据源的优化,抵消了 Mono 在实现 Publisher 时的一些额外开销。它还能在多个相关的操做之间共享资源(也就是 micro-fusion),好比内部队列。这些特性让 Reactor 成为彻彻底底的的第四代响应式框架,不过这个超出了这篇文章的讨论范围。
下面让咱们来看看几个 Reactor 的操做。
(这一小节包含了一些代码片断,咱们建议你动手去运行它们,深刻体验一下 Reactor。因此你须要打开 IDE,并建立一个测试项目,把 Reactor 加入到依赖项里。)
对于 Maven,能够把下面的依赖加到 pom.xml 里:
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.0.3.RELEASE</version> </dependency>
对于 Gradle,要把 Reactor 做为依赖项,相似这样:
dependencies { compile "io.projectreactor:reactor-core:3.0.3.RELEASE" }
咱们来重写前面几篇同系列文章里的例子!
Observable 的建立跟在 RxJava 里有点相似,在 Reactor 里可使用 just(T...) 和 fromIterator(Iterable
public class ReactorSnippets { private static List<String> words = Arrays.asList( "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog" ); @Test public void simpleCreation() { Flux<String> fewWords = Flux.just("Hello", "World"); Flux<String> manyWords = Flux.fromIterable(words); fewWords.subscribe(System.out::println); System.out.println(); manyWords.subscribe(System.out::println); } }
跟在 RxJava 里同样,上面的代码会打印出:
Hello World the quick brown fox jumped over the lazy dog
为了打印句子里的每个字母,咱们还须要 flatMap 方法(跟在 RxJava 里同样),不过在 Reactor 里咱们使用 fromArray 来代替 from。而后咱们会用 distinct 过滤掉重复的字母,并用 sort 对它们进行排序。最后,咱们使用 zipWith 和 range 输出每一个字母的次序:
@Test public void findingMissingLetter() { Flux<String> manyLetters = Flux .fromIterable(words) .flatMap(word -> Flux.fromArray(word.split(""))) .distinct() .sort() .zipWith(Flux.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string)); manyLetters.subscribe(System.out::println); }
咱们能够很容易地看到s被遗漏了:
1. a 2. b ... 18. r 19. t 20. u ... 25. z
咱们能够经过纠正单词数组来修复这个问题,不过也可使用 concat/concatWith 和一个 Mono 来手动往字母 Flux 里添加“s”:
@Test public void restoringMissingLetter() { Mono<String> missing = Mono.just("s"); Flux<String> allLetters = Flux .fromIterable(words) .flatMap(word -> Flux.fromArray(word.split(""))) .concatWith(missing) .distinct() .sort() .zipWith(Flux.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string)); allLetters.subscribe(System.out::println); }
这样,在去重和排序后,遗漏的 s 字母就被添加进来了:
1. a 2. b ... 18. r 19. s 20. t ... 26. z
上一篇文章提到了 Rx 和 Streams API 之间的类似之处,而实际上,在数据就绪的时候,Reactor 也会像 Java Steams 那样开始简单地推送数据事件(能够参看下面关于回压的内容)。只是在主线程里对事件源进行订阅没法完成更加复杂的异步操做,主要是由于在订阅完成以后,控制权会立刻返回到主线程,并退出整个程序。例如:
@Test public void shortCircuit() { Flux<String> helloPauseWorld = Mono.just("Hello") .concatWith(Mono.just("world") .delaySubscriptionMillis(500)); helloPauseWorld.subscribe(System.out::println); }
这个单元测试会打印出Hello,但没法打印出world,由于程序会过早地退出。在作简单测试的时候,若是你只是像这样写一个简单的主类,你一般会掉入陷阱。做为补救,你能够建立一个 CountDownLatch 对象,并在 Subscriber(包括 onError 和 onComplete)里调用 countDown 方法。不过这样就变得不那么响应式了,不是吗?(万一你忘了调用 countDown 方法,而恰好发生错误了该怎么办?)
解决这个问题的第二种方法是使用一些操做转换到非响应式模式。toItetable 和 toStream 会生成阻塞实例。咱们在例子里使用 toStream:
@Test public void blocks() { Flux<String> helloPauseWorld = Mono.just("Hello") .concatWith(Mono.just("world") .delaySubscriptionMillis(500)); helloPauseWorld.toStream() .forEach(System.out::println); }
正如你所期待的那样,在打印出Hello以后有一个短暂的停顿,而后打印出“world”并退出。咱们以前也提过,RxJava 的 amb 操做在 Reactor 里被重命名为 firstEmitting(正如它的名字所表达的:选择第一个 Flux 来触发)。在下面的例子里,咱们会建立一个 Mono,这个 Mono 会有 450 毫秒的延迟,还会建立一个 Flux,这个 Flux 以 400 毫秒的间隔触发事件。在使用 firstEmitting() 对它们进行合并时,由于 Flux 的第一个值先于 Mono 的值出现,因此最后 Flux 会被采用:
@Test public void firstEmitting() { Mono<String> a = Mono.just("oops I'm late") .delaySubscriptionMillis(450); Flux<String> b = Flux.just("let's get", "the party", "started") .delayMillis(400); Flux.firstEmitting(a, b) .toIterable() .forEach(System.out::println); }
这个单元测试会打印出句子的全部部分,它们之间有 400 毫秒的时间间隔。
这个时候你可能会想,若是我写的测试使用的是 4000 毫秒的间隔而不是 400 毫秒,那会怎样?你不会想在一个单元测试里等待 4 秒钟的!在后面的部分,咱们会看到 Reactor 提供了一些测试工具能够很好地解决这个问题。
咱们已经经过例子比较了 Reactor 的一些经常使用操做,如今咱们要回头看看这个框架其它方面的不一样点。
Reactor 选择 Java 8 做为运行基础而不是以前的任何版本,这再一次与它简化 API 的目标不谋而合:RxJava 选择了 Java 6,而 Java 6 里没有 java.util.function 包,RxJava 也就没法利用这个包下面的 Functino 类和 Consumer 类,因此它必须建立不少相似 Func一、Func二、Action0、Action1 这样的类。RxJava 2 使用相似 Reactor 2 的方式把这些类做为 java.util.function 的镜像,由于它还得支持 Java 7。
Reactor API 还使用了 Java 8 里新引入的一些类型。由于大部分基于时间的操做都跟时间段有关系(例如超时、时间间隔、延迟,等等),因此直接就使用了 Java 8 里的 Duration 类。
Java 8 Stream API 和 CompletableFuture 跟 Flux/Mono 之间能够很容易地进行互相转换。那么通常状况下咱们是否要把 Stream 转成 Flux?不必定。虽说 Flux 或 Mono 对 IO 和内存相关操做的封装所产生的开销微不足道,不过 Stream 自己也并不会带来很大延迟,因此直接使用 Stream API 是没有问题的。对于上述状况,在 RxJava 2 里须要使用 Observable,由于 Observable 不支持回压,因此一旦对其进行订阅,它就成为事件推送的来源。Reactor 是基于 Java 8 的,因此在大部分状况下,Stream API 已经可以知足需求了。要注意的是,尽管 Flux 和 Mono 的工厂模式也支持简单类型,但它们的主要用途仍是在于把对象合并到更高层次的流里面。因此通常来讲,在现有代码上应用响应式模式时,你不会但愿把“long getCount()”这样的方法转成“Mono
回压是 RS 规范和 Reactor 主要关注点之一(若是还有其它关注点的话)。回压的原理是说,在一个推送场景里,生产者的生产速度比消费者的消费速度快,消费者会向生产者发出信号说“嘿,慢一点,我处理不过来了。”生产者能够借机控制数据生成的速度,而不是抛弃数据或者冒着产生级联错误的风险继续生成数据。
你也许会想,在 Mono 里为何也须要回压:什么样的消费者会被一个单独的触发事件压垮?答案是“应该不会有这样的消费者”。不过,在 Mono 和 CompletableFuture 工做原理之间仍然有一个关键的不一样点。后者只有推送:若是你持有一个 Future 的引用,那么说明一个异步任务已经在执行了。另外一方面,回压的 Flux 或 Mono 会启动延迟的拉取 - 推送迭代:
对 Mono 来讲,subscribe() 方法就至关于一个按钮,按下它就等于说我准备好接收数据了。Flux 也有一个相似的按钮,不过它是 request(n) 方法,这个方法是 subscribe() 的通常化用法。
Mono 做为一个 Publisher,它每每表明着一个耗费资源的任务(在 IO、延迟等方面),意识到这点是理解回压的关键:若是不对其进行订阅,你就不须要为之付出任何代价。由于 Mono 常常跟具备回压的 Flux 一块儿被编排到一个响应式链上,来自多个异步数据源的结果有可能被组合到一块儿,这种按需触发的能力是避免阻塞的关键。
咱们可使用回压来区分 Mono 的不一样使用场景,相比上述的例子,Mono 有另一个常见的使用场景:把 Flux 的数据异步地聚合到 Mono 里。reduce 和 hasElement 能够消费 Flux 里的每个元素,再把这些数据以某种形式聚合起来(分别是 reduce 函数的调用结果和一个 boolean 值),做为一个 Mono 对外暴露数据。在这种状况下,使用 Long.MAX_VALUE 向上游发出回压信号,上游会以彻底推送的方式工做。
关于回压另外一个有意思的话题是它如何对存储在内存里的流的对象数量进行限制。做为一个 Publisher,数据源颇有可能出现生成数据缓慢的问题,而来自下游的请求超出了可用数据项。在这种状况下,整个流很天然地进入到推送模式,消费者会在有新数据到达时收到通知。当生产高峰来临,或者在生产速度加快的状况下,整个流又回到了拉取模式。在以上两种状况下,最多有 N 项数据(request() 请求的数据量)会被保留在内存里。
你能够对内存的使用状况进行更精确的推算,把 N 项数据跟每项数据须要消耗的内存 W 结合起来:这样你就能够推算出最多须要消耗 W*N 的内存。实际上,Reactor 在大多数状况下会根据 N 来作出优化:根据状况建立内部队列,并应用预取策略,每次自动请求 75% 的数据量。
Reactor 的操做有时候会根据它们所表明的语义和调用者的指望来改变回压信号。例如对于操做 buffer(10):下游请求 N 项数据,而这个操做会向上游请求 10N 的数据量,这样就能够填满缓冲区,为订阅者提供足够的数据。这一般被称为“主动式回压”,开发人员能够充分利用这种特性,例如在微批次场景里,能够显式地告诉 Reactor 该如何从一个输入源切换到一个输出地。
Reactor 是 Spring 整个生态系统的基础,特别是 Spring 5(经过 Spring Web Reactive)和 Spring Data “kay”(跟 spring-data-commons 2.0 相对应的)。
这两个项目的响应式版本是很是有用的,咱们所以能够开发出彻底响应式的 Web 应用:异步地处理请求,一直到数据库,最后异步地返回结果。Spring 应用所以能够更有效地利用资源,避免为每一个请求单独分配一个线程,还要等待 I/O 阻塞。
Reactor 将被用于将来 Spring 应用的内部响应式核心组件,以及这些 Spring 组件暴露出来的 API。通常状况下,它们能够处理 RS Publisher,不过大多数时候它们要面对的是 Flux/Mono,须要用到 Reactor 的丰富特性。固然,你也能够自行选择其它响应式框架,Reactor 提供了能够用来适配其它 Reactor 类型和 RxJava 类型甚至简单的 RS 类型的钩子接口。
目前,你能够经过 Spring Boot 2.0.0.BUILD-SNAPSHOT 和 spring-boot-starter-web-reactive 依赖项(能够在 start.spring.io 上生成一个这样的项目)来体验 Spring Web Reactive:
<dependency> <groupId>org.springframework.boot.experimental</groupId> <artifactId>spring-boot-starter-web-reactive</artifactId> </dependency>
你能够像往常那样写你的 @Controller,只不过把 Spring MVC 的底层变成响应式的,把大部分 Spring MVC 的契约换成了响应式非阻塞的契约。响应式层默认运行在 Tomcat 8.5 上,你也能够选择使用 Undertow 或 Netty。
{% asset_img 1.jpg %}
另外,虽然 Spring API 是以 Reactor 类型为基础的,不过在 Spring Web Reactive 模块里能够为请求和响应使用各类各样的响应式类型:
下面是使用 Spring Web Reactive 的例子:
@Controller public class ExampleController { private final MyReactiveLibrary reactiveLibrary; public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) { this.reactiveLibrary = reactiveLibrary; } @RequestMapping("hello/{who}") @ResponseBody public Mono<String> hello(@PathVariable String who) { return Mono.just(who) .map(w -> "Hello " + w + "!"); } @RequestMapping(value = "heyMister", method = RequestMethod.POST) @ResponseBody public Flux<String> hey(@RequestBody Mono<Sir> body) { return Mono.just("Hey mister ") .concatWith(body .flatMap(sir -> Flux.fromArray(sir.getLastName().split(""))) .map(String::toUpperCase) .take(1) ).concatWith(Mono.just(". how are you?")); } }
第一个端点含有一个路径变量,它被转成 Mono,并被映射到一个问候语里返回给客户端。
一个发到 /hello/SImon 的 GET 请求会获得“Hello Simon!”的文本响应。
第二个端点相对复杂一些:它异步地接收序列化 Sir 对象(一个包含了 firstName 和 lastName 属性的类)并使用 flatMap 方法把它映射到一个字母流里,这个字母流包含了 lastName 的全部字母。而后它选取流里的第一个字母,把它转成大写,并跟问候语串在一块儿。
因此向 /heyMister POST 一个 JSON 对象
{ "firstName": "Paul", "lastName": "tEsT" }
会返回字符串“Hello mister T. How are you?”。
响应式 Spring Data 目前也在开发当中,它被做为 Kay 发布的一部分,代码在 spring-data-commons 2.0.x 分支上。如今已经有一个里程碑版本可使用:
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-releasetrain</artifactId> <version>Kay-M1</version> <scope>import</scope> <type>pom</type> </dependency> </dependencies> </dependencyManagement>
而后简单地添加 Spring Data Commons 的依赖(它会自动从上面的 BOM 里获取版本号):
<dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-commons</artifactId> </dependency>
Spring Data 对响应式的支持主要表如今新的 ReactiveCrudRepository 接口,它扩展了 Repository。这个接口暴露了 CRUD 方法,使用的是 Reactor 类型的输入和返回值。还有一个 RxJava 1 的版本,叫做 RxJava1CrudRepository。要在 CrudRepository 里经过 id 获取一个实体,能够调用“T findOne(ID id)”方法,而在 ReactiveCrudRepository 和 RxJava1CrudRepository 里要分别调用“Mono
假设有一个响应式的后端存储(或者 mock 的 ReactiveCrudRepository bean),下面的 controller 将从前到后都是响应式的:
@Controller public class DataExampleController { private final ReactiveCrudRepository<Sir, String> reactiveRepository; public DataExampleController( @Autowired ReactiveCrudRepository<Sir, String> repo) { this.reactiveRepository = repo; } @RequestMapping("data/{who}") @ResponseBody public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) { return reactiveRepository.findOne(who) .map(ResponseEntity::ok) .defaultIfEmpty(ResponseEntity.status(404) .body(null)); } }
请注意整个流程:咱们异步地获取实体并用 map 把它包装成 ResponseEntity,取得一个能够立刻返回的 Mono。若是 Spring Data repository 找不到这个 key 的数据,会返回一个空的 Mono。咱们使用 defaultIfEmpty 显式地返回 404。
测试 RxJava这篇文章里提到了如何测试 Observable。正如咱们所看到的,RxJava 提供了 TestScheduler,咱们能够把它跟 RxJava 的操做一块儿使用,这些操做接受一个 Scheduler 参数,TestScheduler 会为这些操做启动虚拟的时钟。RxJava 还提供了一个 TestSubscriber 类,能够用它等待 Observable 执行完毕,也能够用它对每一个事件进行断言(onNext 的值和它的数量、触发的 onError,等等)。在 RxJava 2 里,TestSubscriber 就是 RS Subscriber,你能够用它测试 Reactor 的 Flux 和 Mono!
在 Reactor 里,上述两个使用普遍的特性被组合到了 StepVerifier 类里。从 reactor-addons 仓库的 reactor-test 模块里能够获取到 StepVerifier。在建立 Publisher 实例时,调用 StepVerifier.create 方法能够初始化一个 StepVerifier。若是要使用虚拟时钟,能够调用 StepVerifier.withVirtualTime 方法,这个方法接受一个 Supplier 做为参数。之因此这样设计,是由于它会首先保证建立一个 VirtualTimeScheduler 对象,并把它做为默认的 Scheduler 传给旧有的操做。StepVerifier 会对在 Supplier 里建立的 Flux/Mono 进行配置,把基于时间的操做转为“虚拟时间操做”。接下来你就能够编写各类你所指望的用例:下一个元素应该是什么,是否应该出现错误,是否应该及时向前移动,等等。借助其它方法,好比事件与 Predicate 的匹配或者对 onNext 事件的消费,你能够与那些值之间作一些更高级的交互(犹如在使用断言框架)。任何地方抛出的 AssertionError 都会在最终的结果里反应出来。最后,调用 verify() 对你的用例进行测试,这个方法会经过 StepVerifier.create 或 StepVerifier.withVirtualTime 方法对预约义的事件源进行订阅。
让咱们来举一些简单的例子来讲明 StepVerifier 时如何工做的。首先要添加依赖到 POM 里:
<dependency> <groupId>io.projectreactor.addons</groupId> <artifactId>reactor-test</artifactId> <version>3.0.3.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.5.2</version> <scope>test</scope> </dependency>
假设你有一个叫做 MyReactiveLibrary 的类,你要对这个类生成的一些 Flux 进行测试:
@Component public class MyReactiveLibrary { public Flux<String> alphabet5(char from) { return Flux.range((int) from, 5) .map(i -> "" + (char) i.intValue()); } public Mono<String> withDelay(String value, int delaySeconds) { return Mono.just(value) .delaySubscription(Duration.ofSeconds(delaySeconds)); } }
第一个方法将返回给定字母以后的 5 个字母。第二个方法返回一个 Flux,它会以给定的时间间隔触发给定值,其中的时间间隔以秒为单位。第一个测试是要保证使用 x 调用 alphabet5 的输出被限定在 x、y、z。使用 StepVerifier 看起来是这样的:
@Test public void testAlphabet5LimitsToZ() { MyReactiveLibrary library = new MyReactiveLibrary(); StepVerifier.create(library.alphabet5('x')) .expectNext("x", "y", "z") .expectComplete() .verify(); }
第二个测试要保证 alphabet5 返回的每一个值都是字母。在这里咱们使用断言框架 AssertJ :
@Test public void testAlphabet5LastItemIsAlphabeticalChar() { MyReactiveLibrary library = new MyReactiveLibrary(); StepVerifier.create(library.alphabet5('x')) .consumeNextWith(c -> assertThat(c) .as("first is alphabetic").matches("[a-z]")) .consumeNextWith(c -> assertThat(c) .as("second is alphabetic").matches("[a-z]")) .consumeNextWith(c -> assertThat(c) .as("third is alphabetic").matches("[a-z]")) .consumeNextWith(c -> assertThat(c) .as("fourth is alphabetic").matches("[a-z]")) .expectComplete() .verify(); }
结果这些测试都运行失败。让咱们检查一下 StepVirifier 的输出,看看能不能找出 bug:
java.lang.AssertionError: expected: onComplete(); actual: onNext({) java.lang.AssertionError: [fourth is alphabetic] Expecting: "{" to match pattern: "[a-z]"
看起来咱们的方法并无在 z 的时候停住,而是继续发出 ASCII 字符。咱们能够加入.take(Math.min(5,'z'-from+1)) 来修复这个 bug,或者把 Math.min 做为 range 的第二个参数。
咱们要作的最后一个测试须要用到虚拟时钟:咱们使用 withVirtualTime 构造器来测试方法的延迟,而不须要真的等待指定的时间:
@Test public void testWithDelay() { MyReactiveLibrary library = new MyReactiveLibrary(); Duration testDuration = StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30)) .expectSubscription() .thenAwait(Duration.ofSeconds(10)) .expectNoEvent(Duration.ofSeconds(10)) .thenAwait(Duration.ofSeconds(10)) .expectNext("foo") .expectComplete() .verify(); System.out.println(testDuration.toMillis() + "ms"); }
这个测试用例测试一个将被延迟 30 秒的 Flux:在订阅以后的 30 秒内不会发生任何事情,而后发生一个 onNext("foo") 事件后结束。
System.out 会打印出验证所须要的时间,在最近的一次测试中它用掉了 8 毫秒。
若是调用构造器的 create 方法,thenAwait 和 expectNoEvent 方法仍然可使用,不过它们会阻塞指定的时间。
在RxJava 实例解析一文中提到的动态和静态 Observable 对 Reactor 来讲也是适用的。
若是你要建立一个自定义的 Flux,须要使用 Reactor 的 FluxSink。这个类将会为你考虑全部跟异步有关的状况,你只须要把注意力集中在触发事件上。
使用 Flux.create 并从回调中得到的 FluxSink 能够用于后续的触发事件。这个自定义的 Flux 是静态的,为了把它变成动态的,可使用 publish() 和 connect() 方法。基于上一篇文章中的例子,咱们几乎能够把它逐字逐句地翻译成 Reactor 的版本:
SomeFeed<PriceTick> feed = new SomeFeed<>(); Flux<PriceTick> flux = Flux.create(emitter -> { SomeListener listener = new SomeListener() { @Override public void priceTick(PriceTick event) { emitter.next(event); if (event.isLast()) { emitter.complete(); } } @Override public void error(Throwable e) { emitter.error(e); }}; feed.register(listener); }, FluxSink.OverflowStrategy.BUFFER); ConnectableFlux<PriceTick> hot = flux.publish();
在链接到动态 Flux 以前,能够作两次订阅:一个订阅将打印每一个 tick 的细节,另外一个订阅会打印出 instrument:
hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick .getDate(), priceTick.getInstrument(), priceTick.getPrice())); hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));
接下来咱们链接到动态 Flux,并在程序结束前让它运行 5 秒钟:
hot.connect(); Thread.sleep(5000);
(要注意,若是 PriceTick 的 isLast() 方法改变了,那么 feed 自己也会结束)。
FluxSink 经过 isCancelled() 来检查下游的订阅是否已取消。你还能够经过 requestedFromDownstream() 来得到请求数,这个在遵循回压策略时很管用。最后,你能够经过 setCancellation 方法释放全部使用过的资源。
要注意,FluxSink 使用了回压,因此你必须提供一个 OverflowStrategy 来显式地处理回压。这个等价于使用 onBackpressureXXX 操做(例如,FluxSink.OverflowStrategy.BUFFER 等价于.onBackpressureBuffer()),它们会覆盖来自下游的回压信号。
在这篇文章里,咱们学习了 Reactor,一个运行在 Java 8 之上并以 Rx 规范和 Reactive Streams 规范为基础的第四代响应式框架。咱们展现了 RxJava 中的设计理念是如何被应用在 Reactor 上的,尽管它们之间有一些 API 设计上的差异。咱们还展现了 Reactor 如何成为 Spring 5 的基础,还提供了一些跟测试 Publisher、Flux 和 Mono 有关的资源。
关注公众号:JAVA九点半课堂,这里有一批优秀的程序猿,加入咱们,一块儿探讨技术,共同进步!回复“资料”获取 2T 行业最新资料!