疯狂创客圈 经典图书 : 《Netty Zookeeper Redis 高并发实战》 面试必备 + 面试必备 + 面试必备 【博客园总入口 】html
疯狂创客圈 经典图书 : 《SpringCloud、Nginx高并发核心编程》 大厂必备 + 大厂必备 + 大厂必备 【博客园总入口 】前端
入大厂+涨工资必备: 高并发【 亿级流量IM实战】 实战系列 【 SpringCloud Nginx秒杀】 实战系列 【博客园总入口 】java
网盘地址 和 提取码:请参见 疯狂创客圈 的 百度网盘小视频和小工具react
Stephane Maldini @smaldini Simon Baslé @simonbasle3.2.0.BUILD-SNAPSHOTgit
Appendix A: 我须要哪一个操做符?github
Appendix B: FAQ,最佳实践,以及“我如何…?”面试
Appendix C: Reactor-Extra算法
(译者加)本文档的一些典型的名词以下:Publisher (发布者)、Subscriber (订阅者)、Subscription (订阅 n.)、subscribe (订阅 v.)。event /signal (事件/信号,原文常甚至在一个句子将两个词来回用,但表示的意思是基本相同的, 所以若是你看到本文翻译有时候用事件,有时候用信号,在本文档内基本能够认为一个意思)。sequence /stream (序列/流,两个词意思类似,本文介绍的是响应式流的内容,可是出现比较多的是 sequence这个词,主要翻译为“序列”,有些地方为了更加契合且方便理解翻译为“流序列”)。element /item (主要指序列中的元素,文中两个词基本翻译为“元素”)。emit /produce /generate (发出/产生/生成,文中这三个英文词也有类似之处,对于 emit 多翻译为 “发出”,对于后两个多翻译为“生成”)、consume (消费)。Processor (未作翻译,保留英文)。operator (译做操做符,声明式的可组装的响应式方法,其组装成的链译做“操做链”)。 |
|
---|---|
本节是对 Reactor参考文档(译者加:原文估计是多我的写的,时而“document”时而“guide”,不影响理解的状况下, 翻译就一概用“文档”了) 的简要概述。你并不须要从头至尾阅读该文档。每一节的内容都是独立的,不过会有其余章节的连接。spring
本Reactor参考文档也提供HTML形式。最新版本见 http://projectreactor.io/docs/core/release/reference/docs/index.html。数据库
本文档的副本你能够自用,亦可分发给他人。不过不管是打印版仍是电子版,请免费提供。
本参考文档用 Asciidoc 编写, 其源码见 https://github.com/reactor/reactor-core/tree/master/src/docs/asciidoc (译者加:本翻译源码见 https://github.com/get-set/reactor-core/tree/master-zh/src/docs/asciidoc )。
若有任何补充,欢迎你提交 pull request。
咱们建议你将源码 checkout 到本地,这样能够使用 gradle 的 asciidoctor
任务检查文档渲染效果。 有些章节会包含其余文件,Github 并不必定可以渲染出来。
为了方便读者的反馈,多数章节在结尾都提供一个连接,这个连接能够打开一个 Github 上的 编辑界面,从而能够编辑相应章节的源码。这些连接在 HTML5 的版本中可以看到,就像这样: 翻译建议 - 关于本文档。 | |
---|---|
Reactor项目有多种方式但愿能帮助到你:
project-reactor
进行提问。全部 Reactor 项目都是开源的, 包括本文档。 若是你发现本文档有问题,或但愿补充一些内容,请参考 这里 进行了解。 | |
---|---|
Flux
, 包含 0-N 个元素的异步序列" 和 "Mono
, 异步的 0-1 结果";reactor-test
项目,参考 测试。这一节的内容可以帮助你上手使用 Reactor。包括以下内容:
Reactor 是一个用于JVM的彻底非阻塞的响应式编程框架,具有高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,好比 CompletableFuture
, Stream
, 以及 Duration
。它提供了异步序列 API Flux
(用于[N]个元素)和 Mono
(用于 [0|1]个元素),并彻底遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc
组件还支持非阻塞的进程间通讯(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。而且完整支持响应式编解码(reactive encoding and decoding)。
Reactor Core 运行于 Java 8
及以上版本。
依赖 org.reactive-streams:reactive-streams:1.0.2
。
Andriod 支持方面:Reactor 3 并不正式支持 Andorid(若是须要能够考虑使用 RxJava 2)。可是,在 Android SDK 26(Android 0)及以上版本应该没问题。咱们但愿可以最大程度兼顾对 Android 的支持,可是咱们并不能做出保证,具体状况具体分析。 | |
---|---|
自从 reactor-core 3.0.4
,随着 Aluminium
版本发布上车(release train)以来,Reactor 3 使用了 BOM(Bill of Materials,一种标准的 Maven artifact)。
使用 BOM 能够管理一组良好集成的 maven artifacts,从而无需操心不一样版本组件的互相依赖问题。
BOM 是一系列有版本信息的 artifacts,经过“列车发布”(release train)的发布方式管理, 每趟发布列车由一个“代号+修饰词”组成,好比:
Aluminium-RELEASE Carbon-BUILD-SNAPSHOT Aluminium-SR1 Bismuth-RELEASE Carbon-SR32
代号替代了传统的“主版本.次版本”的数字形式。这些代号主要来自 Periodic Table of Elements, 按首字母顺序依次选取。
修饰词有(按照时间顺序):
BUILD-SNAPSHOT
M1
..N
: 里程碑号RELEASE
: 第一次 GA (General Availability) 发布SR1
..N
: 后续的 GA 发布(相似于 PATCH 号或 SR(Service Release))。前边提到,使用 Reactor 的最简单方式是在你的项目中配置 BOM 以及相关依赖。 注意,当你这样添加依赖的时候,要省略版本(
固然,若是你但愿使用某个版本的 artifact,仍然能够指定。甚至彻底不使用 BOM,逐个配置 artifact 的版本也是能够的。
Maven 原生支持 BOM。首先,你须要在 pom.xml
内经过添加下边的代码引入 BOM。若是 (dependencyManagement
) 已经存在,只须要添加其内容便可。
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意 dependencyManagement 标签用来补充一般使用的 dependencies 配置。 |
|
---|---|
而后,在 dependencies
中添加相关的 reactor 项目,省略 <version>
,以下:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies>
依赖 Core 库 | |
---|---|
没有 version 标签 | |
reactor-test 提供了对 reactive streams 的单测 |
Gradle 没有对 Maven BOM 的支持,可是你能够使用 Spring 的 gradle-dependency-management 插件。
首先,apply 插件。
plugins { id "io.spring.dependency-management" version "1.0.1.RELEASE" }
编写本文档时,插件最新版本为 1.0.1.RELEASE,请自行使用合适的版本。 | |
---|---|
而后用它引入 BOM:
dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } }
Finally add a dependency to your project, without a version number:
dependencies { compile 'io.projectreactor:reactor-core' }
无需第三个 : 添加版本号。 |
|
---|---|
里程碑版(Milestones)和开发预览版(developer previews)经过 Spring Milestones repository 而不是 Maven Central 来发布。 须要添加到构建配置文件中,如:
Milestones in Maven
<repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones Repository</name> <url>https://repo.spring.io/milestone</url> </repository> </repositories>
gradle 使用下边的配置:
Milestones in Gradle
repositories { maven { url 'http://repo.spring.io/milestone' } mavenCentral() }
相似的,snapshot 版也须要配置专门的库:
BUILD-SNAPSHOTs in Maven
<repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshot Repository</name> <url>https://repo.spring.io/snapshot</url> </repository> </repositories>
BUILD-SNAPSHOTs in Gradle
repositories { maven { url 'http://repo.spring.io/snapshot' } mavenCentral() }
Reactor 是响应式编程范式的实现,总结起来有以下几点:
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它能够用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
在响应式编程方面,微软跨出了第一步,它在 .NET 生态中建立了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow
类)。
响应式编程一般做为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 由于其中也有 Iterable
-Iterator
这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
使用 iterator 是一种“命令式”(imperative)编程范式,即便访问元素的方法是 Iterable
的惟一职责。关键在于,何时执行 next()
获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber
,可是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操做 是经过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者经过 描述“控制流程”来定义对数据流的处理逻辑。
除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher
能够推送新的值到它的 Subscriber
(调用 onNext
方法), 一样也能够推送错误(调用 onError
方法)和完成(调用 onComplete
方法)信号。 错误和完成信号均可以终止响应式流。能够用下边的表达式描述:
onNext x 0..N [onError | onComplete]
这种方式很是灵活,不管是有/没有值,仍是 n 个值(包括有无限个值的流,好比时钟的持续读秒),均可处理。
那么咱们为何须要这样的异步响应式开发库呢?
现代应用须要应对大量的并发用户,并且即便现代硬件的处理能力飞速发展,软件性能仍然是关键因素。
广义来讲咱们有两种思路来提高程序性能:
一般,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 咱们能够增长处理线程,线程中一样是阻塞的代码。可是这种使用资源的方式会迅速面临 资源竞争和并发问题。
更糟糕的是,阻塞会浪费资源。具体来讲,好比当一个程序面临延迟(一般是I/O方面, 好比数据库读写请求或网络调用),所在线程须要进入 idle 状态等待数据,从而浪费资源。
因此,并行化方式并不是银弹。这是挖掘硬件潜力的方式,可是却带来了复杂性,并且容易形成浪费。
第二种思路——提升执行效率——能够解决资源浪费问题。经过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另外一个 使用一样底层资源 的活跃任务,而后等 异步调用返回结果再去处理。
可是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
callback
做为参数(lambda 或匿名类),当结果出来后回调这个 callback
。常见的例子好比 Swings 的 EventListener
。Future<T>
,该异步方法要返回结果的是 T
类型,经过 Future
封装。这个结果并非 马上 能够拿到,而是等实际处理结束才可用。好比, ExecutorService
执行 Callable<T>
任务时会返回 Future
对象。这些技术够用吗?并不是对于每一个用例都是如此,两种方式都有局限性。
回调很难组合起来,由于很快就会致使代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
考虑这样一种情景:在用户界面上显示用户的5个收藏,或者若是没有任何收藏提供5个建议。这须要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子
userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() { public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
基于回调的服务使用一个匿名 Callback 做为参数。后者的两个方法分别在异步执行成功 或异常时被调用。 |
|
---|---|
获取到收藏ID的list后调用第一个服务的回调方法 onSuccess 。 |
|
若是 list 为空, 调用 suggestionService 。 |
|
服务 suggestionService 传递 List<Favorite> 给第二个回调。 |
|
既然是处理 UI,咱们须要确保消费代码运行在 UI 线程。 | |
使用 Java 8 Stream 来限制建议数量为5,而后在 UI 中显示。 |
|
在每一层,咱们都以一样的方式处理错误:在一个 popup 中显示错误信息。 | |
回到收藏 ID 这一层,若是返回 list,咱们须要使用 favoriteService 来获取 Favorite 对象。因为只想要5个,所以使用 stream 。 |
|
再一次回调。此次对每一个ID,获取 Favorite 对象在 UI 线程中推送到前端显示。 |
这里有很多代码,稍微有些难以阅读,而且还有重复代码,咱们再来看一下用 Reactor 实现一样功能:
使用 Reactor 实现以上回调方式一样功能的例子
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
咱们获取到收藏ID的流 | |
---|---|
咱们 异步地转换 它们(ID) 为 Favorite 对象(使用 flatMap ),如今咱们有了 Favorite 流。 |
|
一旦 Favorite 为空,切换到 suggestionService 。 |
|
咱们只关注流中的最多5个元素。 | |
最后,咱们但愿在 UI 线程中进行处理。 | |
经过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe )。 |
若是你想确保“收藏的ID”的数据在800ms内得到(若是超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增长一个 timeout
的操做符便可。
Reactor 中增长超时控制的例子
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
若是流在超时时限没有发出(emit)任何值,则发出错误(error)。 | |
---|---|
一旦收到错误,交由 cacheService 处理。 |
|
处理链后边的内容与上例相似。 |
Futures 比回调要好一点,但即便在 Java 8 引入了 CompletableFuture
,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future
还有一个问题:当对 Future
对象最终调用 get()
方法时,仍然会致使阻塞,而且缺少对多个值以及更进一步对错误的处理。
考虑另一个例子,咱们首先获得 ID 的列表,而后经过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。
CompletableFuture
处理组合的例子
CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip = l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); List<String> results = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
以一个 Future 开始,其中封装了后续将获取和处理的 ID 的 list。 | |
---|---|
获取到 list 后边进一步对其启动异步处理任务。 | |
对于 list 中的每个元素: | |
异步地获得相应的 name。 | |
异步地获得相应的 statistics。 | |
将两个结果一一组合。 | |
咱们如今有了一个 list,元素是 Future(表示组合的任务,类型是 CompletableFuture ),为了执行这些任务, 咱们须要将这个 list(元素构成的流) 转换为数组(List )。 |
|
将这个数组传递给 CompletableFuture.allOf ,返回一个 Future ,当因此任务都完成了,那么这个 Future 也就完成了。 |
|
有点麻烦的地方在于 allOf 返回的是 CompletableFuture<Void> ,因此咱们遍历这个 Future 的List , ,而后使用 join() 来手机它们的结果(不会致使阻塞,由于 AllOf 确保这些 Future 所有完成) |
|
一旦整个异步流水线被触发,咱们等它完成处理,而后返回结果列表。 |
因为 Reactor 内置许多组合操做,所以以上例子能够简单地实现:
Reactor 实现与 Future 一样功能的代码
Flux<String> ids = ifhrIds(); Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
这一次,咱们从一个异步方式提供的 ids 序列(Flux<String> )开始。 |
|
---|---|
对于序列中的每个元素,咱们异步地处理它(flatMap 方法内)两次。 |
|
获取相应的 name。 | |
获取相应的 statistic. | |
异步地组合两个值。 | |
随着序列中的元素值“到位”,它们收集一个 List 中。 |
|
在生成流的环节,咱们能够继续异步地操做 Flux 流,对其进行组合和订阅(subscribe)。 最终咱们极可能获得一个 Mono 。因为是测试,咱们阻塞住(block() ),等待流处理过程结束, 而后直接返回集合。 |
|
Assert 结果。 |
回调或 Future 遇到的窘境是相似的,这也是响应式编程要经过 Publisher-Suscriber
方式来解决的。
相似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
可编排性,指的是编排多个异步任务的能力。好比咱们将前一个任务的结果传递给后一个任务做为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务做为离散的组件在系统中 进行重用。
这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提升,编写和阅读代码都变得愈来愈困难。就像咱们刚才看到的,回调模式是简单的,可是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,致使 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。
Reactor 提供了丰富的编排操做,从而代码直观反映了处理流程,而且全部的操做保持在同一层次 (尽可能避免了嵌套)。
你能够想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher
)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber
)。
原材料会通过不一样的中间处理过程,或者做为半成品与其余半成品进行组装。若是某处有齿轮卡住, 或者某件产品的包装过程花费了过久时间,相应的工位就能够向上游发出信号来限制或中止发出原材料。
在 Reactor 中,操做符(operator)就像装配线中的工位(操做员或装配机器人)。每个操做符 对 Publisher
进行相应的处理,而后将 Publisher
包装为一个新的 Publisher
。就像一个链条, 数据源自第一个 Publisher
,而后顺链条而下,在每一个环节进行相应的处理。最终,一个订阅者 (Subscriber
)终结这个过程。请记住,在订阅者(Subscriber
)订阅(subscribe)到一个 发布者(Publisher
)以前,什么都不会发生。
理解了操做符会建立新的 Publisher 实例这一点,可以帮助你避免一个常见的问题, 这种问题会让你以为处理链上的某个操做符没有起做用。相关内容请参考 item 。 |
|
---|---|
虽然响应式流规范(Reactive Streams specification)没有规定任何操做符, 相似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操做符。包括基础的转换操做, 到过滤操做,甚至复杂的编排和错误处理操做。
subscribe()
以前什么都不会发生在 Reactor 中,当你建立了一条 Publisher
处理链,数据还不会开始生成。事实上,你是建立了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscrib)”的时候,你须要将 Publisher
关联到一个 Subscriber
上,而后 才会触发整个链的流动。这时候,Subscriber
会向上游发送一个 request
信号,一直到达源头 的 Publisher
。
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度若是慢于流水线 速度,会对上游发送反馈信号同样。
在响应式流规范中实际定义的机制同刚才的类比很是接近:订阅者能够无限接受数据并让它的源头 “满负荷”推送全部的数据,也能够经过使用 request
机制来告知源头它一次最多可以处理 n
个元素。
中间环节的操做也能够影响 request
。想象一个可以将每10个元素分批打包的缓存(buffer
)操做。 若是订阅者请求一个元素,那么对于源头来讲能够生成10个元素。此外预取策略也能够使用了, 好比在订阅前预先生成元素。
这样可以将“推送”模式转换为“推送+拉取”混合的模式,若是下游准备好了,能够从上游拉取 n 个元素;可是若是上游元素尚未准备好,下游仍是要等待上游的推送。
在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
Subscriber
,都会收到从头开始全部的数据。若是源头 生成了一个 HTTP 请求,对于每个订阅都会建立一个新的 HTTP 请求。Subscriber
,只能获取从它开始 订阅 以后 发出的数据。不过注意,有些“热”的响应式流能够缓存部分或所有历史数据。 一般意义上来讲,一个“热”的响应式流,甚至在即便没有订阅者接收数据的状况下,也能够 发出数据(这一点同 “Subscribe()
以前什么都不会发生”的规则有冲突)。更多关于 Reactor 中“热”vs“冷”的内容,请参考 this reactor-specific section。
Reactor 项目的主要 artifact 是 reactor-core
,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor 引入了实现 Publisher
的响应式类 Flux
和 Mono
,以及丰富的操做方式。 一个 Flux
对象表明一个包含 0..N 个元素的响应式序列,而一个 Mono
对象表明一个包含 零/一个(0..1)元素的结果。
这种区别为这俩类型带来了语义上的信息——代表了异步处理逻辑所面对的元素基数。好比, 一个 HTTP 请求产生一个响应,因此对其进行 count
操做是没有多大意义的。表示这样一个 结果的话,应该用 Mono<HttpResponse>
而不是 Flux<HttpResponse>
,由于要置于其上的 操做一般只用于处理 0/1 个元素。
有些操做能够改变基数,从而须要切换类型。好比,count
操做用于 Flux
,可是操做 返回的结果是 Mono<Long>
。
Flux
, 包含 0-N 个元素的异步序列Flux<T>
是一个可以发出 0 到 N 个元素的标准的 Publisher<T>
,它会被一个“错误(error)” 或“完成(completion)”信号终止。所以,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext
,onComplete
和onError
方法。
因为多种不一样的信号可能性,Flux
能够做为一种通用的响应式类型。注意,全部的信号事件, 包括表明终止的信号事件都是可选的:若是没有 onNext
事件可是有一个 onComplete
事件, 那么发出的就是 空的 有限序列,可是去掉 onComplete
那么获得的就是一个 无限的 空序列。 固然,无限序列也能够不是空序列,好比,Flux.interval(Duration)
生成的是一个 Flux<Long>
, 这就是一个无限地周期性发出规律 tick 的时钟序列。
Mono
, 异步的 0-1 结果Mono<T>
是一种特殊的 Publisher<T>
, 它最多发出一个元素,而后终止于一个 onComplete
信号或一个 onError
信号。
它只适用其中一部分可用于 Flux
的操做。好比,(两个 Mono
的)结合类操做能够忽略其中之一 而发出另外一个 Mono
,也能够将两个都发出,对于后一种状况会切换为一个 Flux
。
例如,Mono#concatWith(Publisher)
返回一个 Flux
,而 Mono#then(Mono)
返回另外一个 Mono
。
注意,Mono
能够用于表示“空”的只有完成概念的异步处理(好比 Runnable
)。这种用 Mono<Void>
来建立。
最简单的上手 Flux
和 Mono
的方式就是使用相应类提供的多种工厂方法之一。
好比,若是要建立一个 String
的序列,你能够直接列举它们,或者将它们放到一个集合里而后用来建立 Flux,以下:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable);
工厂方法的其余例子以下:
Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
注意,即便没有值,工厂方法仍然采用通用的返回类型。 | |
---|---|
第一个参数是 range 的开始,第二个参数是要生成的元素个数。 |
在订阅(subscribe)的时候,Flux
和 Mono
使用 Java 8 lambda 表达式。 .subscribe()
方法有多种不一样的方法签名,你能够传入各类不一样的 lambda 形式的参数来定义回调。以下所示:
基于 lambda 的对 Flux
的订阅(subscribe)
subscribe(); subscribe(Consumer<? super T> consumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
订阅并触发序列。 | |
---|---|
对每个生成的元素进行消费。 | |
对正常元素进行消费,也对错误进行响应。 | |
对正常元素和错误均有响应,还定义了序列正常完成后的回调。 | |
对正常元素、错误和完成信号均有响应, 同时也定义了对该 subscribe 方法返回的 Subscription 执行的回调。 |
以上方法会返回一个 Subscription 的引用,若是再也不须要更多元素你能够经过它来取消订阅。 取消订阅时, 源头会中止生成新的数据,并清理相关资源。取消和清理的操做在 Reactor 中是在 接口 Disposable 中定义的。 |
|
---|---|
subscribe
方法示例这一小节包含了对 subscribe
的5个不一样签名的方法的示例,以下是一个无参的基本方法的使用:
Flux<Integer> ints = Flux.range(1, 3); ints.subscribe();
配置一个在订阅时会产生3个值的 Flux 。 |
|
---|---|
最简单的订阅方式。 |
第二行代码没有任何输出,可是它确实执行了。Flux
产生了3个值。若是咱们传入一个 lambda, 咱们就能够看到这几个值,以下一个列子:
Flux<Integer> ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i));
配置一个在订阅时会产生3个值的 Flux 。 |
|
---|---|
订阅它并打印值。 |
第二行代码会输入以下内容:
1 2 3
为了演示下一个方法签名,咱们故意引入一个错误,以下所示:
Flux<Integer> ints = Flux.range(1, 4) .map(i -> { if (i <= 3) return i; throw new RuntimeException("Got to 4"); }); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error: " + error));
配置一个在订阅时会产生4个值的 Flux 。 |
|
---|---|
为了对元素进行处理,咱们须要一个 map 操做。 | |
对于多数元素,返回值自己。 | |
对其中一个元素抛出错误。 | |
订阅的时候定义如何进行错误处理。 |
如今咱们有两个 lambda 表达式:一个是用来处理正常数据,一个用来处理错误。 刚才的代码输出以下:
1 2 3 Error: java.lang.RuntimeException: Got to 4
下一个 subscribe
方法的签名既有错误处理,还有一个完成后的处理,以下:
Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done");});
配置一个在订阅时会产生4个值的 Flux 。 |
|
---|---|
订阅时定义错误和完成信号的处理。 |
错误和完成信号都是终止信号,而且两者只会出现其中之一。为了可以最终所有正常完成,你必须处理错误信号。
用于处理完成信号的 lambda 是一对空的括号,由于它实际上匹配的是 Runnalbe
接口中的 run
方法, 不接受参数。刚才的代码输出以下:
1 2 3 4 Done
最后一个 subscribe
方法签名包含一个自定义的 subscriber
(下一节会介绍到):
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>(); Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done");}, s -> ss.request(10)); ints.subscribe(ss);
上面这个例子中,咱们把一个自定义的 Subscriber
做为 subscribe
方法的最后一个参数。 下边的例子是这个自定义的 Subscriber
,这是一个对 Subscriber
的最简单实现:
package io.projectreactor.samples; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; public class SampleSubscriber<T> extends BaseSubscriber<T> { public void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); } public void hookOnNext(T value) { System.out.println(value); request(1); } }
SampleSubscriber
类继承自 BaseSubscriber
,在 Reactor 中, 推荐用户扩展它来实现自定义的 Subscriber
。这个类提供了一些 hook 方法,咱们能够经过重写它们来调整 subscriber 的行为。 默认状况下,它会触发一个无限个数的请求,可是当你想自定义请求元素的个数的时候,扩展 BaseSubscriber
就很方便了。
扩展的时候一般至少要覆盖 hookOnSubscribe(Subscription subscription)
和 hookOnNext(T value)
这两个方法。这个例子中, hookOnSubscribe
方法打印一段话到标准输出,而后进行第一次请求。 而后 hookOnNext
一样进行了打印,同时逐个处理剩余请求。
SampleSubscriber
输出以下:
Subscribed 1 2 3 4
建议你同时重写 hookOnError 、hookOnCancel ,以及 hookOnComplete 方法。 你最好也重写 hookFinally 方法。SampleSubscribe 确实是一个最简单的实现了 请求有限个数元素的 Subscriber 。 |
|
---|---|
本文档后边还会再讨论 BaseSubscriber
。
响应式流规范定义了另外一个 subscribe
方法的签名,它只接收一个自定义的 Subscriber
, 没有其余的参数,以下所示:
subscribe(Subscriber<? super T> subscriber);
若是你已经有一个 Subscriber
,那么这个方法签名仍是挺有用的。何况,你可能还会用到它 来作一些订阅相关(subscription-related)的回调。好比,你想要自定义“背压(backpressure)” 而且本身来触发请求。
在这种状况下,使用 BaseSubscriber
抽象类就很方便,由于它提供了很好的配置“背压” 的方法。
使用 BaseSubscriber
来配置“背压”
Flux<String> source = someStringSource(); source.map(String::toUpperCase) .subscribe(new BaseSubscriber<String>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); } @Override protected void hookOnNext(String value) { request(1); } });
BaseSubscriber 是一个抽象类,因此咱们建立一个匿名内部类。 |
|
---|---|
BaseSubscriber 定义了多种用于处理不一样信号的 hook。它还定义了一些捕获 Subscription 对象的现成方法,这些方法能够用在 hook 中。 |
|
request(n) 就是这样一个方法。它可以在任何 hook 中,经过 subscription 向上游传递 背压请求。这里咱们在开始这个流的时候请求1个元素值。 |
|
随着接收到新的值,咱们继续以每次请求一个元素的节奏从源头请求值。 | |
其余 hooks 有 hookOnComplete , hookOnError , hookOnCancel , and hookFinally (它会在流终止的时候被调用,传入一个 SignalType 做为参数)。 |
当你修改请求操做的时候,你必须注意让 subscriber 向上提出足够的需求, 不然上游的 Flux 可能会被“卡住”。因此 BaseSubscriber 在进行扩展的时候要覆盖 hookOnSubscribe 和 onNext ,这样你至少会调用 request 一次。 |
|
---|---|
BaseSubscriber
还提供了 requestUnbounded()
方法来切换到“无限”模式(等同于 request(Long.MAX_VALUE)
)。
在这一小节,咱们介绍如何经过定义相对应的事件(onNext
、onError
和onComplete
) 建立一个 Flux
或 Mono
。全部这些方法都经过 API 来触发咱们叫作 sink(池) 的事件。 sink 的类型很少,咱们快速过一下。
最简单的建立 Flux
的方式就是使用 generate
方法。
这是一种 同步地, 逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink
并且其 next()
方法在每次回调的时候最多只能被调用一次。你也能够调用 error(Throwable)
或者 complete()
,不过是可选的。
最有用的一种方式就是同时可以记录一个状态值(state),从而在使用 sink 发出下一个元素的时候可以 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>
, 其中 <S>
是状态对象的类型。你须要提供一个 Supplier<S>
来初始化状态值,而生成器须要 在每一“回合”生成元素后返回新的状态值(供下一回合使用)。
例如咱们使用一个 int
做为状态值。
基于状态值的 generate
示例
Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; });
初始化状态值(state)为0。 | |
---|---|
咱们基于状态值 state 来生成下一个值(state 乘以 3)。 | |
咱们也能够用状态值来决定何时终止序列。 | |
返回一个新的状态值 state,用于下一次调用。 |
上面的代码生成了“3 x”的乘法表:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30
咱们也能够使用可变(mutable)类型(译者注:如上例,原生类型及其包装类,以及String等属于不可变类型) 的 <S>
。上边的例子也能够用 AtomicLong
做为状态值,在每次生成后改变它的值。
可变类型的状态变量
Flux<String> flux = Flux.generate( AtomicLong::new, (state, sink) -> { long i = state.getAndIncrement(); sink.next("3 x " + i + " = " + 3*i); if (i == 10) sink.complete(); return state; });
此次咱们初始化一个可变类型的状态值。 | |
---|---|
改变状态值。 | |
返回 同一个 实例做为新的状态值。 |
若是状态对象须要清理资源,能够使用 generate(Supplier<S>, BiFunction, Consumer<S>) 这个签名方法来清理状态对象(译者注:Comsumer 在序列终止才被调用)。 |
|
---|---|
下面是一个在 generate 方法中增长 Consumer
的例子:
Flux<String> flux = Flux.generate( AtomicLong::new, (state, sink) -> { long i = state.getAndIncrement(); sink.next("3 x " + i + " = " + 3*i); if (i == 10) sink.complete(); return state; }, (state) -> System.out.println("state: " + state)); }
一样,初始化一个可变对象做为状态变量。 | |
---|---|
改变状态。 | |
返回 同一个 实例做为新的状态。 | |
咱们会看到最后一个状态值(11)会被这个 Consumer lambda 输出。 |
若是 state 使用了数据库链接或者其余须要最终进行清理的资源,这个 Consumer
lambda 能够用来在最后关闭链接或完成相关的其余清理任务。
做为一个更高级的建立 Flux
的方式, create
方法的生成方式既能够是同步, 也能够是异步的,而且还能够每次发出多个元素。
该方法用到了 FluxSink
,后者一样提供 next
,error
和 complete
等方法。 与 generate
不一样的是,create
不须要状态值,另外一方面,它能够在回调中触发 多个事件(即便是在将来的某个时间)。
create 有个好处就是能够将现有的 API 转为响应式,好比监听器的异步方法。 |
|
---|---|
假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。以下:
interface MyEventListener<T> { void onDataChunk(List<T> chunk); void processComplete(); }
你能够使用 create
方法将其转化为响应式类型 Flux<T>
:
Flux<String> bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); });
桥接 MyEventListener 。 |
|
---|---|
每个 chunk 的数据转化为 Flux 中的一个元素。 |
|
processComplete 事件转换为 onComplete 。 |
|
全部这些都是在 myEventProcessor 执行时异步执行的。 |
此外,既然 create
能够是异步地,而且可以控制背压,你能够经过提供一个 OverflowStrategy
来定义背压行为。
IGNORE
: 彻底忽略下游背压请求,这可能会在下游队列积满的时候致使 IllegalStateException
。ERROR
: 当下游跟不上节奏的时候发出一个 IllegalStateException
的错误信号。DROP
:当下游没有准备好接收新的元素的时候抛弃这个元素。LATEST
:让下游只获得上游最新的元素。BUFFER
:(默认的)缓存全部下游没有来得及处理的元素(这个不限大小的缓存可能致使 OutOfMemoryError
)。Mono 也有一个用于 create 的生成器(generator)—— MonoSink ,它不能生成多个元素, 所以会抛弃第一个元素以后的全部元素。 |
|
---|---|
create
的一个变体是 push
,适合生成事件流。与 create
相似,push
也能够是异步地, 而且可以使用以上各类溢出策略(overflow strategies)管理背压。每次只有一个生成线程能够调用 next
,complete
或 error
。
Flux<String> bridge = Flux.push(sink -> { myEventProcessor.register( new SingleThreadEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } public void processError(Throwable e) { sink.error(e); } }); });
桥接 SingleThreadEventListener API。 |
|
---|---|
在监听器所在线程中,事件经过调用 next 被推送到 sink。 |
|
complete 事件也在同一个线程中。 |
|
error 事件也在同一个线程中。 |
不像 push
,create
能够用于 push
或 pull
模式,所以适合桥接监听器的 的 API,由于事件消息会随时异步地到来。回调方法 onRequest
能够被注册到 FluxSink
以便跟踪请求。这个回调能够被用于从源头请求更多数据,或者经过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 由于下游能够从上游拉取已经就绪的数据,上游也能够在数据就绪的时候将其推送到下游。
Flux<String> bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener<String>() { public void onMessage(List<String> messages) { for(String s : messages) { sink.next(s); } } }); sink.onRequest(n -> { List<String> messages = myMessageProcessor.request(n); for(String s : message) { sink.next(s); } });
当有请求的时候取出一个 message。 | |
---|---|
若是有就绪的 message,就发送到 sink。 | |
后续异步到达的 message 也会被发送给 sink。 |
onDispose
和 onCancel
这两个回调用于在被取消和终止后进行清理工做。 onDispose
可用于在 Flux
完成,有错误出现或被取消的时候执行清理。 onCancel
只用于针对“取消”信号执行相关操做,会先于 onDispose
执行。
Flux<String> bridge = Flux.create(sink -> { sink.onRequest(n -> channel.poll(n)) .onCancel(() -> channel.cancel()) .onDispose(() -> channel.close()) });
onCancel 在取消时被调用。 |
|
---|---|
onDispose 在有完成、错误和取消时被调用。 |
handle
方法有些不一样,它在 Mono
和 Flux
中都有。然而,它是一个实例方法 (instance method),意思就是它要连接在一个现有的源后使用(与其余操做符同样)。
它与 generate
比较相似,由于它也使用 SynchronousSink
,而且只容许元素逐个发出。 然而,handle
可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,能够把它当作 map
与 filter
的组合。handle
方法签名以下:
handle(BiConsumer<T, SynchronousSink<R>>)
举个例子,响应式流规范容许 null
这样的值出如今序列中。假如你想执行一个相似 map
的操做,你想利用一个现有的具备映射功能的方法,可是它会返回 null,这时候怎么办呢?
例如,下边的方法能够用于 Integer 序列,映射为字母或 null 。
public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; }
咱们能够使用 handle
来去掉其中的 null。
将 handle
用于一个 "映射 + 过滤 null" 的场景
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println);
映射到字母。 | |
---|---|
若是返回的是 null … | |
就不会调用 sink.next 从而过滤掉。 |
输出以下:
M I T
Reactor, 就像 RxJava,也能够被认为是 并发无关(concurrency agnostic) 的。意思就是, 它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它仍是提供了一些方便 进行并发执行的库。
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler
。 Scheduler
是一个拥有普遍实现类的抽象接口。 Schedulers
类提供的静态方法用于达成以下的执行环境:
Schedulers.immediate()
)Schedulers.single()
)。注意,这个方法对全部调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。若是你想使用专注的线程,就对每个调用使用 Schedulers.newSingle()
。Schedulers.elastic()
。它根据须要建立一个线程池,重用空闲线程。线程池若是空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic()
可以方便地给一个阻塞 的任务分配它本身的线程,从而不会妨碍其余任务和资源,见 如何包装一个同步阻塞的调用?。Schedulers.parallel()
)。所建立线程池的大小与 CPU 个数等同。此外,你还能够使用 Schedulers.fromExecutorService(ExecutorService)
基于现有的 ExecutorService
建立 Scheduler
。(虽然不太建议,不过你也能够使用 Executor
来建立)。你也能够使用 newXXX
方法来建立不一样的调度器。好比 Schedulers.newElastic(yourScheduleName)
建立一个新的名为 yourScheduleName
的弹性调度器。
操做符基于非阻塞算法实现,从而能够利用到某些调度器的工做窃取(work stealing) 特性的好处。 | |
---|---|
一些操做符默认会使用一个指定的调度器(一般也容许开发者调整为其余调度器)例如, 经过工厂方法 Flux.interval(Duration.ofMillis(300))
生成的每 300ms 打点一次的 Flux<Long>
, 默认状况下使用的是 Schedulers.parallel()
,下边的代码演示了如何将其装换为 Schedulers.single()
:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Reactor 提供了两种在响应式链中调整调度器 Scheduler
的方法:publishOn
和 subscribeOn
。 它们都接受一个 Scheduler
做为参数,从而能够改变调度器。可是 publishOn
在链中出现的位置 是有讲究的,而 subscribeOn
则无所谓。要理解它们的不一样,你首先要理解 nothing happens until you subscribe()。
在 Reactor 中,当你在操做链上添加操做符的时候,你能够根据须要在 Flux
和 Mono
的实现中包装其余的 Flux
和 Mono
。一旦你订阅(subscribe)了它,一个 Subscriber
的链 就被建立了,一直向上到第一个 publisher 。这些对开发者是不可见的,开发者所能看到的是最外一层的 Flux
(或 Mono
)和 Subscription
,可是具体的任务是在中间这些跟操做符相关的 subscriber 上处理的。
基于此,咱们仔细研究一下 publishOn
和 subscribeOn
这两个操做符:
publishOn
的用法和处于订阅链(subscriber chain)中的其余操做符同样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler
的某个工做线程上的回调。 它会 改变后续的操做符的执行所在线程 (直到下一个 publishOn
出如今这个链上)。subscribeOn
用于订阅(subscription)过程,做用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。因此,不管你把 subscribeOn
至于操做链的什么位置, 它都会影响到源头的线程执行环境(context)。 可是,它不会影响到后续的 publishOn
,后者仍可以切换其后操做符的线程执行环境。只有操做链中最先的 subscribeOn 调用才算数。 |
|
---|---|
Flux
和 Mono
不会建立线程。一些操做符,好比 publishOn
,会建立线程。同时,做为一种任务共享形式, 这些操做符可能会从其余任务池(work pool)——若是其余任务池是空闲的话——那里“偷”线程。所以, 不管是 Flux
、Mono
仍是 Subscriber
都应该精于线程处理。它们依赖这些操做符来管理线程和任务池。
publishOn
强制下一个操做符(极可能包括下一个的下一个…)来运行在一个不一样的线程上。 相似的,subscribeOn
强制上一个操做符(极可能包括上一个的上一个…)来运行在一个不一样的线程上。 记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor 能够使用这些规则来决定如何执行操做链。而后,一旦你订阅了,整个流程就开始工做了。
下边的例子演示了支持任务共享的多线程模型:
Flux.range(1, 10000) .publishOn(Schedulers.parallel()) .subscribe(result)
建立一个有 10,000 个元素的 Flux 。 |
|
---|---|
建立等同于 CPU 个数的线程(最小为4)。 | |
subscribe() 以前什么都不会发生。 |
Scheduler.parallel()
建立一个基于单线程 ExecutorService
的固定大小的任务线程池。 由于可能会有一个或两个线程致使问题,它老是至少建立 4 个线程。而后 publishOn 方法便共享了这些任务线程, 当 publishOn
请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。Reactor 还提供了好几种共享资源的方式,请参考 Schedulers。
Scheduler.elastic()
也能建立线程,它可以很方便地建立专门的线程(以便跑一些可能会阻塞资源的任务, 好比一个同步服务),请见 如何包装一个同步阻塞的调用?。
内部机制保证了这些操做符可以借助自增计数器(incremental counters)和警惕条件(guard conditions) 以线程安全的方式工做。例如,若是咱们有四个线程处理一个流(就像上边的例子),每个请求会让计数器自增, 这样后续的来自不一样线程的请求就能拿到正确的元素。
若是想了解有哪些可用于错误处理的操做符,请参考 the relevant operator decision tree。 | |
---|---|
在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会致使流序列中止, 而且错误信号会沿着操做链条向下传递,直至遇到你定义的 Subscriber
及其 onError
方法。
这样的错误仍是应该在应用层面解决的。好比,你可能会将错误信息显示在用户界面,或者经过某个 REST 端点(endpoint)发出。所以,订阅者(subscriber)的 onError
方法是应该定义的。
若是没有定义,onError 会抛出 UnsupportedOperationException 。你能够接下来再 检测错误,并经过 Exceptions.isErrorCallbackNotImplemented 方法捕获和处理它。 |
|
---|---|
Reactor 还提供了其余的用于在链中处理错误的方法,即错误处理操做(error-handling operators)。
在你了解错误处理操做符以前,你必须牢记 响应式流中的任何错误都是一个终止事件。 即便用了错误处理操做符,也不会让源头流序列继续。而是将 onError 信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。 |
|
---|---|
如今咱们来逐个看看错误处理的方法。须要的时候咱们会同时用到命令式编程风格的 try
代码块来做比较。
你也许熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括以下几种:
业务相关的异常
,而后再抛出业务异常。finally
来清理资源,或使用 Java 7 引入的 "try-with-resource"。以上全部这些在 Reactor 都有相应的基于 error-handling 操做符处理方式。
在开始研究这些操做符以前,咱们先准备好响应式链(reactive chain)方式和 try-catch 代码块方式(以便对比)。
当订阅的时候,位于链结尾的 onError
回调方法和 catch
块相似,一旦有异常,执行过程会跳入到 catch:
Flux<String> s = Flux.range(1, 10) .map(v -> doSomethingDangerous(v)) .map(v -> doSecondTransform(v)); s.subscribe(value -> System.out.println("RECEIVED " + value), error -> System.err.println("CAUGHT " + error) );
执行 map 转换,有可能抛出异常。 | |
---|---|
若是没问题,执行第二个 map 转换操做。 | |
全部转换成功的值都打印出来。 | |
一旦有错误,序列(sequence)终止,并打印错误信息。 |
这与 try/catch 代码块是相似的:
try { for (int i = 1; i < 11; i++) { String v1 = doSomethingDangerous(i); String v2 = doSecondTransform(v1); System.out.println("RECEIVED " + v2); } } catch (Throwable t) { System.err.println("CAUGHT " + t); }
若是这里抛出异常… | |
---|---|
…后续的代码跳过… | |
…执行过程直接到这。 |
既然咱们准备了两种方式作对比,咱们就来看一下不一样的错误处理场景,以及相应的操做符。
与第 (1) 条(捕获并返回一个静态的缺省值)对应的是 onErrorReturn
:
Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn("RECOVERED");
你还能够经过判断错误信息的内容,来筛选哪些要给出缺省值,哪些仍然让错误继续传递下去:
Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
若是你不仅是想要在发生错误的时候给出缺省值,而是但愿提供一种更安全的处理数据的方式, 能够使用 onErrorResume
。这与第 (2) 条(捕获并执行一个异常处理方法)相似。
假设,你会尝试从一个外部的不稳定服务获取数据,但仍然会在本地缓存一份 可能 有些过时的数据, 由于缓存的读取更加可靠。能够这样来作:
Flux.just("key1", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(e -> getFromCache(k));
对于每个 key, 异步地调用一个外部服务。 | |
---|---|
若是对外部服务的调用失败,则再去缓存中查找该 key。注意,这里不管 e 是什么,都会执行异常处理方法。 |
就像 onErrorReturn
,onErrorResume
也有能够用于预先过滤错误内容的方法变体,能够基于异常类或 Predicate
进行过滤。它其实是用一个 Function
来做为参数,还能够返回一个新的流序列。
Flux.just("timeout1", "unknown", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(error -> { if (error instanceof TimeoutException) return getFromCache(k); else if (error instanceof UnknownKeyException) return registerNewEntry(k, "DEFAULT"); else return Flux.error(error); });
这个函数式容许开发者自行决定如何处理。 | |
---|---|
若是源超时,使用本地缓存。 | |
若是源找不到对应的 key,建立一个新的实体。 | |
不然, 将问题“从新抛出”。 |
有时候并不想提供一个错误处理方法,而是想在接收到错误的时候计算一个候补的值。这相似于第 (3) 条(捕获并动态计算一个候补值)。
例如,若是你的返回类型自己就有可能包装有异常(好比 Future.complete(T success)
vs Future.completeExceptionally(Throwable error)
),你有可能使用流中的错误包装起来实例化 返回值。
这也能够使用上一种错误处理方法的方式(使用 onErrorResume
)解决,代码以下:
erroringFlux.onErrorResume(error -> Mono.just( myWrapper.fromError(error) ));
在 onErrorResume 中,使用 Mono.just 建立一个 Mono 。 |
|
---|---|
将异常包装到另外一个类中。 |
在“错误处理方法”的例子中,基于 flatMap
方法的最后一行,咱们能够猜到如何作到第 (4) 条(捕获,包装到一个业务相关的异常,而后抛出业务异常):
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorResume(original -> Flux.error( new BusinessException("oops, SLA exceeded", original) );
然而还有一个更加直接的方法—— onErrorMap
:
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
若是对于错误你只是想在不改变它的状况下作出响应(如记录日志),并让错误继续传递下去, 那么能够用 doOnError
方法。这对应第 (5) 条(捕获,记录错误日志,并继续抛出)。 这个方法与其余以 doOn
开头的方法同样,只起反作用("side-effect")。它们对序列都是只读, 而不会带来任何改动。
以下边的例子所示,咱们会记录错误日志,而且还经过变量自增统计错误发生个数。
LongAdder failureStat = new LongAdder(); Flux<String> flux = Flux.just("unknown") .flatMap(k -> callExternalService(k)) .doOnError(e -> { failureStat.increment(); log("uh oh, falling back, service failed for key " + k); }) .onErrorResume(e -> getFromCache(k));
对外部服务的调用失败… | |
---|---|
…记录错误日志… | |
…而后回调错误处理方法。 |
最后一个要与命令式编程对应的对比就是使用 Java 7 "try-with-resources" 或 finally
代码块清理资源。这是第 (6) 条(使用 finally
代码块清理资源或使用 Java 7 引入的 "try-with-resource")。在 Reactor 中都有对应的方法: using
和 doFinally
:
AtomicBoolean isDisposed = new AtomicBoolean(); Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; } }; Flux<String> flux = Flux.using( () -> disposableInstance, disposable -> Flux.just(disposable.toString()), Disposable::dispose );
第一个 lambda 生成资源,这里咱们返回模拟的(mock) Disposable 。 |
|
---|---|
第二个 lambda 处理资源,返回一个 Flux<T> 。 |
|
第三个 lambda 在 2) 中的资源 Flux 终止或取消的时候,用于清理资源。 |
|
在订阅或执行流序列以后, isDisposed 会置为 true 。 |
另外一方面, doFinally
在序列终止(不管是 onComplete
、onError
仍是取消)的时候被执行, 而且可以判断是什么类型的终止事件(完成、错误仍是取消?)。
LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doFinally(type -> { if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);
咱们想进行统计,因此用到了 LongAdder 。 |
|
---|---|
doFinally 用 SignalType 检查了终止信号的类型。 |
|
若是只是取消,那么统计数据自增。 | |
take(1) 可以在发出 1 个元素后取消流。 |
onError
为了演示当错误出现的时候如何致使上游序列终止,咱们使用 Flux.interval
构造一个更加直观的例子。 这个 interval 操做符会在每 x 单位的时间发出一个自增的 Long
值。
Flux<String> flux = Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .onErrorReturn("Uh oh"); flux.subscribe(System.out::println); Thread.sleep(2100);
注意 interval 默认基于一个 timer Scheduler 来执行。 若是咱们想在 main 方法中运行, 咱们须要调用 sleep ,这样程序就能够在尚未产生任何值的时候就退出了。 |
|
---|---|
每 250ms 打印出一行信息,以下:
tick 0 tick 1 tick 2 Uh oh
即便多给了 1 秒钟时间,也没有更多的 tick 信号由 interval
产生了,因此序列确实被错误信号终止了。
还有一个用于错误处理的操做符你可能会用到,就是 retry
,见文知意,用它能够对出现错误的序列进行重试。
问题是它对于上游 Flux
是基于重订阅(re-subscribing)的方式。这实际上已经一个不一样的序列了, 发出错误信号的序列仍然是终止了的。为了验证这一点,咱们能够在继续用上边的例子,增长一个 retry(1)
代替 onErrorReturn
来重试一次。
Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .elapsed() .retry(1) .subscribe(System.out::println, System.err::println); Thread.sleep(2100);
elapsed 会关联从当前值与上个值发出的时间间隔(译者加:以下边输出的内容中的 259/249/251…)。 |
|
---|---|
咱们仍是要看一下 onError 时的内容。 |
|
确保咱们有足够的时间能够进行 4x2 次 tick。 |
输出以下:
259,tick 0 249,tick 1 251,tick 2 506,tick 0 248,tick 1 253,tick 2 java.lang.RuntimeException: boom
一个新的 interval 从 tick 0 开始。多出来的 250ms 间隔来自于第 4 次 tick, 就是致使出现异常并执行 retry 的那次(译者加:我在机器上测试的时候 elapsed “显示”的时间间隔没有加倍,可是确实有第 4 次的间隔)。 |
|
---|---|
可见, retry(1)
不过是再一次重新订阅了原始的 interval
,从 tick 0 开始。第二次, 因为异常再次出现,便将异常传递到下游了。
还有一个“高配版”的 retry
(retryWhen
),它使用一个伴随("companion") Flux
来判断对某次错误是否要重试。这个伴随 Flux
是由操做符建立的,可是由开发者包装它, 从而实现对重试操做的配置。
这个伴随 Flux
是一个 Flux<Throwable>
,它做为 retryWhen
的惟一参数被传递给一个 Function
,你能够定义这个 Function
并让它返回一个新的 Publisher<?>
。重试的循环 会这样运行:
Flux
,后者已经被你用 Function
包装。Flux
发出元素,就会触发重试。Flux
完成(complete),重试循环也会中止,而且原始序列也会 完成(complete)。Flux
产生一个错误,重试循环中止,原始序列也中止 或 完成,而且这个错误会致使 原始序列失败并终止。了解前两个场景的区别是很重要的。若是让伴随 Flux
完成(complete)等于吞掉了错误。以下代码用 retryWhen
模仿了 retry(3)
的效果:
Flux<String> flux = Flux .<String>error(new IllegalArgumentException()) .doOnError(System.out::println) .retryWhen(companion -> companion.take(3));
持续产生错误。 | |
---|---|
在 retry 以前 的 doOnError 可让咱们看到错误。 |
|
这里,咱们认为前 3 个错误是能够重试的(take(3) ),再有错误就放弃。 |
事实上,上边例子最终获得的是一个 空的 Flux
,可是却 成功 完成了。反观对同一个 Flux
调用 retry(3)
的话,最终是以最后一个 error 终止 Flux
,故而 retryWhen
与之不一样。
实现一样的效果须要一些额外的技巧:
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) );
技巧一:使用 zip 和一个“重试个数 + 1”的 range 。 |
|
---|---|
zip 方法让你能够在对重试次数计数的同时,仍掌握着原始的错误(error)。 |
|
容许三次重试,小于 4 的时候发出一个值。 | |
为了使序列以错误结束。咱们将原始异常在三次重试以后抛出。 |
相似的代码也能够被用于实现 exponential backoff and retry 模式 (译者加:重试指定的次数, 且每一次重试之间停顿的时间逐渐增长),参考 FAQ。 | |
---|---|
整体来讲,全部的操做符自身均可能包含触发异常的代码,或自定义的可能致使失败的代码, 因此它们都自带一些错误处理方式。
通常来讲,一个 不受检异常(Unchecked Exception) 老是由 onError
传递。例如, 在一个 map
方法中抛出 RuntimeException
会被翻译为一个 onError
事件,以下:
Flux.just("foo") .map(s -> { throw new IllegalArgumentException(s); }) .subscribe(v -> System.out.println("GOT VALUE"), e -> System.out.println("ERROR: " + e));
上边代码输出以下:
ERROR: java.lang.IllegalArgumentException: foo
Exception 能够在其被传递给 onError 以前,使用 hook 进行调整。 |
|
---|---|
Reactor,定义了一系列的可以致使“严重失败”的错误(好比 OutOfMemoryError
),也可参考 Exceptions.throwIfFatal
方法。这些错误意味着 Reactor 无力处理只能抛出,没法传递下去。
还有些状况下不受检异常仍然没法传递下去(多数处于subscribe 和 request 阶段), 由于可能因为多线程竞争致使两次 onError 或 onComplete 的状况。当这种竞争发生的时候, 没法传递下去的错误信号就被“丢弃”了。这些状况仍然能够经过自定义的 hook 来搞定,见 丢弃事件的 Hooks。 |
|
---|---|
你可能会问:“那么 受检查异常(Checked Exceptions)?”
若是你须要调用一个声明为 throws
异常的方法,你仍然须要使用 try-catch
代码块处理异常。 有几种方式:
Exceptions
可用于这种方式(咱们立刻会讲到)。Flux
(例如在 flatMap
中),将异常包装在一个产生错误的 Flux
中: return Flux.error(checkedException)
(流序列也会终止)。Reactor 有一个工具类 Exceptions
,能够确保在收到受检异常的时候将其包装(wrap)起来。
Exceptions.propagate
方法来包装异常,它一样会首先调用 throwIfFatal
, 而且不会包装 RuntimeException
。Exceptions.unwrap
方法来获得原始的未包装的异常(追溯最初的异常)。下面是一个 map
的例子,它使用的 convert 方法会抛出 IOException
:
public String convert(int i) throws IOException { if (i > 3) { throw new IOException("boom " + i); } return "OK " + i; }
如今想象你将这个方法用于一个 map
中,你必须明确捕获这个异常,而且你的 map
方法不能再次抛出它。 因此你能够将其以 RuntimeException
的形式传递给 onError
:
Flux<String> converted = Flux .range(1, 10) .map(i -> { try { return convert(i); } catch (IOException e) { throw Exceptions.propagate(e); } });
当后边订阅上边的这个 Flux
并响应错误(好比在用户界面)的时候,若是你想处理 IOException, 你还能够再将其转换为原始的异常。以下:
converted.subscribe( v -> System.out.println("RECEIVED: " + v), e -> { if (Exceptions.unwrap(e) instanceof IOException) { System.out.println("Something bad happened with I/O"); } else { System.out.println("Something bad happened"); } } );
Processors 既是一种特别的发布者(Publisher
)又是一种订阅者(Subscriber
)。 那意味着你能够 订阅一个 Processor
(一般它们会实现 Flux
),也能够调用相关方法来手动 插入数据到序列,或终止序列。
Processor 有多种类型,它们都有特别的语义规则,可是在你研究它们以前,最好问一下 本身以下几个问题:
多数状况下,你应该进行避免使用 Processor
,它们较难正确使用,主要用于一些特殊场景下。
若是你以为 Processor
适合你的使用场景,请首先看一下是否尝试过如下两种替代方式:
Processor
)若是看了以上替代方案,你仍然以为须要一个 Processor
,阅读 现有的 Processors 总览 这一节来了解一下不一样的实现吧。
Sink
门面对象来线程安全地生成流比起直接使用 Reactor 的 Processors
,更好的方式是经过调用一次 sink()
来获得 Processor
的 Sink
。
FluxProcessor
的 sink 是线程安全的“生产者(producer)”,所以可以在应用程序中 多线程并发地生成数据。例如,一个线程安全的序列化(serialized)的 sink 可以经过 UnicastProcessor
建立:
UnicastProcessor<Integer> processor = UnicastProcessor.create(); FluxSink<Integer> sink = processor.sink(overflowStrategy);
多个生产者线程能够并发地生成数据到如下的序列化 sink。
sink.next(n);
根据 Processor
及其配置,next
产生的溢出有两种可能的处理方式:
IGNORE
策略,或将 overflowStrategy
应用于 sink
。Reactor Core 内置多种 Processor
。这些 processor 具备不一样的语法,大概分为三类。 下边简要介绍一下这三种 processor:
DirectProcessor
和 UnicastProcessor
):这些 processors 只能经过直接 调用 Sink
的方法来推送数据。EmitterProcessor
和 ReplayProcessor
):这些 processors 既能够 直接调用 Sink
方法来推送数据,也能够经过订阅到一个上游的发布者来同步地产生数据。WorkQueueProcessor
和 TopicProcessor
):这些 processors 能够将从多个上游发布者获得的数据推送下去。因为使用了 RingBuffer
的数据结构来 缓存多个来自上游的数据,所以更加有健壮性。异步的 processor 在实例化的时候最复杂,由于有许多不一样的选项。所以它们暴露出一个 Builder
接口。 而简单的 processors 有静态的工厂方法。
DirectProcessor
能够将信号分发给零到多个订阅者(Subscriber
)。它是最容易实例化的,使用静态方法 create()
便可。另外一方面,它的不足是没法处理背压。因此,当 DirectProcessor
推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个 IllegalStateException
。
一旦 Processor
终止(一般经过调用它的 Sink
的 error(Throwable)
或 complete()
方法), 虽然它容许更多的订阅者订阅它,可是会当即向它们从新发送终止信号。
UnicastProcessor
能够使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者。
UnicastProcessor
有多种选项,所以提供多种不一样的 create
静态方法。例如,它默认是 无限的(unbounded) :若是你在在订阅者尚未请求数据的状况下让它推送数据,它会缓存全部数据。
能够经过提供一个自定义的 Queue
的具体实现传递给 create
工厂方法来改变默认行为。若是给出的队列是 有限的(bounded), 而且缓存已满,并且未收到下游的请求,processor 会拒绝推送数据。
在上边 有限的 例子中,还能够在构造 processor 的时候提供一个回调方法,这个回调方法能够在每个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。
EmitterProcessor
可以向多个订阅者发送数据,而且能够对每个订阅者进行背压处理。它自己也能够订阅一个 Publisher
并同步得到数据。
最初若是没有订阅者,它仍然容许推送一些数据到缓存,缓存大小由 bufferSize
定义。 以后若是仍然没有订阅者订阅它并消费数据,对 onNext
的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。
所以第一个订阅者会收到最多 bufferSize
个元素。然而以后, processor 不会从新发送(replay) 数据给后续的订阅者。这些后续接入的订阅者只能获取到它们开始订阅 以后 推送的数据。这个内部的 缓存会继续用于背压的目的。
默认状况下,若是全部的订阅者都取消了(基本意味着它们都再也不订阅(un-subscribed)了), 它会清空内部缓存,而且再也不接受更多的订阅者。这一点能够经过 create
静态工厂方法的 autoCancel
参数来配置。
ReplayProcessor
会缓存直接经过自身的 Sink
推送的元素,以及来自上游发布者的元素, 而且后来的订阅者也会收到重发(replay)的这些元素。
能够经过多种配置方式建立它:
cacheLast
)。create(int)
),全部的历史元素(create()
)。createTimeout(Duration)
)。createSizeOrTimeout(int, Duration)
)。TopicProcessor
是一个异步的 processor,它可以重发来自多个上游发布者的元素, 这须要在建立它的时候配置 shared
(见 build()
的 share(boolean)
配置)。
注意,若是你企图在并发环境下经过并发的上游 Publisher 调用 TopicProcessor
的 onNext
、 onComplete
,或 onError
方法,就必须配置 shared。
不然,并发调用就是非法的,从而 processor 是彻底兼容响应式流规范的。
TopicProcessor
可以对多个订阅者发送数据。它经过对每个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出 onError
或 onComplete
信号,或关联的订阅者被取消。 最多能够接受的订阅者个数由构造者方法 executor
指定,经过提供一个有限线程数的 ExecutorService
来限制这一个数。
这个 processor 基于一个 RingBuffer
数据结构来存储已发送的数据。每个订阅者线程 自行管理其相关的数据在 RingBuffer
中的索引。
这个 processor 也有一个 autoCancel
构造器方法:若是设置为 true
(默认的),那么当 全部的订阅者取消以后,源 Publisher
(s) 也就被取消了。
WorkQueueProcessor
也是一个异步的 processor,也可以重发来自多个上游发布者的元素, 一样在建立时须要配置 shared
(它多数构造器配置与 TopicProcessor
相同)。
它放松了对响应式流规范的兼容,可是好处就在于相对于 TopicProcessor
来讲须要更少的资源。 它仍然基于 RingBuffer
,可是再也不要求每个订阅者都关联一个线程,所以相对于 TopicProcessor
来讲更具扩展性。
代价在于分发模式有些区别:来自订阅者的请求会汇总在一块儿,而且这个 processor 每次只对一个 订阅者发送数据,所以须要循环(round-robin)对订阅者发送数据,而不是一次所有发出的模式。
没法保证彻底公平的循环分发。 | |
---|---|
WorkQueueProcessor
多数构造器方法与 TopicProcessor
相同,好比 autoCancel
、share
, 以及 waitStrategy
。下游订阅者的最大数目一样由构造器 executor
配置的 ExecutorService
决定。
你最好注意不要有太多订阅者订阅 WorkQueueProcessor ,由于这 会锁住 processor。 若是你须要限制订阅者数量,最好使用一个 ThreadPoolExecutor 或 ForkJoinPool 。这个 processor 可以检测到(线程池)容量并在订阅者过多时抛出异常。 |
|
---|---|
翻译建议 - "Reactor 核心特性"
Kotlin 是一种运行于 JVM(及其余平台)上的静态(statically-typed)语言。 使用它能够在拥有与现有 Java 库良好https://kotlinlang.org/docs/reference/java-interop.html[互操做性] 的同时编写简介优雅的代码。
本小节介绍了 Reactor 3.1 如何可以完美支持 Kotlin。
Kotlin 支持 Kotlin 1.1+ 及依赖 kotlin-stdlib
(或 kotlin-stdlib-jre7
/ kotlin-stdlib-jre8
之一)
多亏了其良好的 Java 互操做性 以及 Kotlin 扩展(extensions), Reactor Kotlin APIs 既可以使用 Java APIs,还可以收益于一些 Reactor 内置的专门支持 Kotlin 的 APIs。
注意 Kotlin 的扩展须要 import 才可以使用。因此好比 Throwable.toFlux 的 Kotlin 扩展必须在 import reactor.core.publisher.toFlux 后才可以使用。多数场景下 IDE 应该可以自动给出这种相似 static import 的建议。 |
|
---|---|
例如,https://kotlinlang.org/docs/reference/inline-functions.html#reified-type-parameters[Kotlin 参数类型推导(reified type parameters)] 对于 JVM 的 通用类型擦除(generics type erasure)提供了一种变通解决方案, Reactor 就能够经过扩展(extension)来应用到这种特性。
下面是对“Reactor with Java”和“Reactor with Kotlin + extensions”的比较:
Java | Kotlin + extensions |
---|---|
Mono.just("foo") |
"foo".toMono() |
Flux.fromIterable(list) |
list.toFlux() |
Mono.error(new RuntimeException()) |
RuntimeException().toMono() |
Flux.error(new RuntimeException()) |
RuntimeException().toFlux() |
flux.ofType(Foo.class) |
flux.ofType<Foo>() or flux.ofType(Foo::class) |
StepVerifier.create(flux).verifyComplete() |
flux.test().verifyComplete() |
可参考 Reactor KDoc API 中详细的关于 Kotlin 扩展的文档。
Kotlin的一个关键特性就是 null 值安全 ——从而能够在编译时处理 null
值,而不是在运行时抛出著名的 NullPointerException
。 这样,经过“可能为空(nullability)”的声明,以及可以代表“有值或空值”的语法(避免使用相似 Optional
来进行包装),使得应用程序更加安全。(Kotlin容许在函数参数中使用可能为空的值, 请参考 comprehensive guide to Kotlin null-safety)
尽管 Java 的类型系统不容许这样的 null 值安全的表达方式, Reactor now provides null-safety 对全部 Reactor API 经过工具友好的(tooling-friendly)注解(在 reactor.util.annotation
包中定义)来支持。 默认状况下,Java APIs 用于 Kotlin 的话会被做为 平台类型(platform types) 而放松对 null 的检查。 Kotlin 对 JSR 305 注解的支持 + Reactor 可为空(nullability)的注解,为全部 Reactor API 和 Kotlin 开发者确保“null 值安全”的特性 (在编译期处理 null 值)。
JSR 305 的检查能够经过增长 -Xjsr305
编译参数进行配置: -Xjsr305={strict|warn|ignore}
。
对于 kotlin 1.1.50+,默认的配置为 -Xjsr305=warn
。若是但愿 Reactor API 可以全面支持 null 值安全 则须要配置为 strict
。不过你能够认为这是实验性的(experimental),由于 Reactor API “可能为空” 的声明可能甚至在小版本的发布中都会不断改进,并且未来也可能增长新的检查。
目前尚不支持通用类型参数、可变类型以及数组元素的“可为空”。不过应该包含在接下来的发布中,最新信息请看 这个issues。 | |
---|---|
翻译建议 - "对 Kotlin 的支持"
不管你是编写了一个简单的 Reactor 操做链,仍是开发了自定义的操做符,对它进行 自动化的测试老是一个好主意。
Reactor 内置一些专门用于测试的元素,放在一个专门的 artifact 里: reactor-test
。 你能够在 on Github 的 reactor-core 库里找到这个项目。
若是要用它来进行测试,添加 scope 为 test 的依赖。
reactor-test 用 Maven 配置 <dependencies>
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>
若是你使用了 BOM,你不须要指定 <version> 。 |
|
---|---|
reactor-test 用 Gradle 配置 dependencies
dependencies { testCompile 'io.projectreactor:reactor-test' }
reactor-test
的两个主要用途:
StepVerifier
一步一步地测试一个给定场景的序列。TestPublisher
生成数据来测试下游的操做符(包括你本身的operator)。StepVerifier
来测试最多见的测试 Reactor 序列的场景就是定义一个 Flux
或 Mono
,而后在订阅它的时候测试它的行为。
当你的测试关注于每一次的事件的时候,就很是容易转化为使用 StepVerifier
的测试场景: 下一个指望的事件是什么?你是否指望使用 Flux
来发出一个特别的值?或者接下来 300ms 什么都不作?全部这些均可以使用 StepVerifier
API 来表示。
例如,你可能会使用以下的工具方法来包装一个 Flux
:
public <T> Flux<T> appendBoomError(Flux<T> source) { return source.concatWith(Mono.error(new IllegalArgumentException("boom"))); }
要测试它的话,你须要校验以下内容:
我但愿这个
Flux
先发出foo
,而后发出bar
,而后 生成一个内容为 boom 的错误。 最后订阅并校验它们。
使用 StepVerifier
API 来表示以上的验证过程:
@Test public void testAppendBoomError() { Flux<String> source = Flux.just("foo", "bar"); StepVerifier.create( appendBoomError(source)) .expectNext("foo") .expectNext("bar") .expectErrorMessage("boom") .verify(); }
因为被测试方法须要一个 Flux ,定义一个简单的 Flux 用于测试。 |
|
---|---|
建立一个 StepVerifier 构造器来包装和校验一个 Flux 。 |
|
传进来须要测试的 Flux (即待测方法的返回结果)。 |
|
第一个咱们指望的信号是 onNext ,它的值为 foo 。 |
|
最后咱们指望的是一个终止信号 onError ,异常内容应该为 boom 。 |
|
不要忘了使用 verify() 触发测试。 |
API 是一个构造器,经过传入一个要测试的序列来建立一个 StepVerifier
。从而你能够:
AssertionError
)。例如你可能会用到 expectNext(T...)
或 expectNextCount(long)
。assertion
的时候会用到它(好比要校验是否有一个 onNext
信号,并校验对应发出的元素是不是一个 size 为 5 的 List)。你可能会用到 consumeNextWith(Consumer<T>)
。thenAwait(Duration)
和 then(Runnable)
。对于终止事件,相应的指望方法(expectComplete()
、expectError()
,及其全部的变体方法) 使用以后就不能再继续增长别的指望方法了。最后你只能对 StepVerifier
进行一些额外的配置并 触发校验(一般调用 verify()
及其变体方法)。
从 StepVerifier 内部来看,它订阅了待测试的 Flux
或 Mono
,而后将序列中的每一个信号与测试 场景的指望进行比对。若是匹配的话,测试成功。若是有不匹配的状况,则抛出 AssertionError
异常。
请记住是 verify() 触发了校验过程。这个 API 还有一些结合了 verify() 与指望的终止信号 的方法:verifyComplete() 、verifyError() 、verifyErrorMessage(String) 等。 |
|
---|---|
注意,若是有一个传入 lambda 的指望方法抛出了 AssertionError
,会被报告为测试失败。 这可用于自定义 assertion。
默认状况下,verify() 方法(及同源的 verifyThenAssertThat 、verifyComplete() 等) 没有超时的概念。它可能会永远阻塞住。你能够使用 StepVerifier.setDefaultTimeout(Duration) 来设置一个全局的超时时间,或使用 verify(Duration) 指定。 |
|
---|---|
StepVerifier
能够用来测试基于时间的操做符,从而避免测试的长时间运行。能够使用构造器 StepVerifier.withVirtualTime
达到这一点。
示例以下:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1))) //... 继续追加指望方法
虚拟时间(virtual time) 的功能会在 Reactor 的调度器(Scheduler
)工厂方法中插入一个自定义的 调度器。这些基于时间的操做符一般默认使用 Schedulers.parallel()
调度器。(虚拟时间的) 技巧在于使用一个 VirtualTimeScheduler
来代替默认调度器。然而一个重要的前提就是,只有在初始化 虚拟时间调度器以后的操做符才会起做用。
为了提升 StepVerifier
正常起做用的几率,它通常不接收一个简单的 Flux
做为输入,而是接收 一个 Supplier
,从而能够在配置好订阅者 以后 “懒建立”待测试的 flux。
要注意的是,Supplier<Publisher<T>> 可用于“懒建立”,不然不能保证虚拟时间 能彻底起做用。尤为要避免提早实例化 Flux ,要在 Supplier 中用 lambda 建立并返回 Flux 变量。 |
|
---|---|
有两种处理时间的指望方法,不管是否配置虚拟时间都是可用的:
thenAwait(Duration)
暂停校验步骤(容许信号延迟发出)。expectNoEvent(Duration)
一样让序列持续必定的时间,期间若是有 任何 信号发出则测试失败。两个方法都会基于给定的持续时间暂停线程的执行,若是是在虚拟时间模式下就相应地使用虚拟时间。
expectNoEvent 将订阅(subscription )也认做一个事件。假设你用它做为第一步,若是检测 到有订阅信号,也会失败。这时候能够使用 expectSubscription().expectNoEvent(duration) 来代替。 |
|
---|---|
为了快速校验前边提到的 Mono.delay
,咱们能够这样完成代码:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1))) .expectSubscription() .expectNoEvent(Duration.ofDays(1)) .expectNext(0) .verifyComplete();
如上 tip。 | |
---|---|
期待一天内没有信号发生。 | |
而后期待一个 next 信号为 0 。 |
|
而后期待完成(同时触发校验)。 |
咱们也能够使用 thenAwait(Duration.ofDays(1))
,可是 expectNoEvent
的好处是 可以验证在此以前不会发生什么。
注意 verify()
返回一个 Duration
,这是整个测试的 真实时间。
虚拟时间并不是银弹。请记住 全部的 调度器都会被替换为 VirtualTimeScheduler 。 有些时候你能够锁定校验过程,由于虚拟时钟在遇到第一个指望校验以前并不会开始,因此对于 “无数据“的指望校验也必须可以运行在虚拟时间模式下。在无限序列中,虚拟时间模式的发挥 空间也颇有限,由于它可能致使线程(序列的发出和校验的运行都在这个线程上)卡住。 |
|
---|---|
StepVerifier
进行“后校验”当配置完你测试场景的最后的指望方法后,你能够使用 verifyThenAssertThat()
来代替 verify()
触发执行后的校验。
verifyThenAssertThat()
返回一个 StepVerifier.Assertions
对象,你能够用它来校验 整个测试场景成功刚结束后的一些状态(它也会调用 verify())。典型应用就是校验有多少 元素被操做符丢弃(参考 Hooks)。
Context
更多关于 Context
的内容请参考 增长一个 Context 到响应式序列。
StepVerifier
有一些指望方法能够用来测试 Context
:
expectAccessibleContext
: 返回一个 ContextExpectations
对象,你能够用它来在 Context
上配置指望校验。必定记住要调用 then()
来返回到对序列的指望校验上来。expectNoAccessibleContext
: 是对“没有Context
”的校验。一般用于 被测试的 Publisher
并非一个响应式的,或没有任何操做符可以传递 Context
(好比一个 generate
的 Publisher
).此外,还能够用 StepVerifierOptions
方法传入一个测试用的初始 Context
给 StepVerifier
, 从而能够建立一个校验(verifier)。
这些特性经过下边的代码演示:
StepVerifier.create(Mono.just(1).map(i -> i + 10), StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) .expectAccessibleContext() .contains("foo", "bar") .then() .expectNext(11) .verifyComplete();
使用 StepVerifierOptions 建立 StepVerifier 并传入初始 Context 。 |
|
---|---|
开始对 Context 进行校验,这里只是确保 Context 正常传播了。 |
|
对 Context 进行校验的例子:好比验证是否包含一个 "foo" - "bar" 键值对。 |
|
使用 then() 切换回对序列的校验。 |
|
不要忘了用 verify() 触发整个校验过程。 |
TestPublisher
手动发出元素对于更多高级的测试,若是可以彻底掌控源发出的数据就会方便不少,由于这样就能够在测试的 时候更加有的放矢地发出想测的数据。
另外一种状况就是你实现了本身的操做符,而后想校验它的行为——尤为是在源不稳定的时候——是否符合响应式流规范。
reactor-test
提供了 TestPublisher
类来应对这两种需求。这个类本质上是一个 Publisher<T>
, 你能够经过可编程的方式来用它发出各类信号:
next(T)
以及 next(T, T...)
发出 1-n 个 onNext
信号。emit(T...)
起一样做用,而且会执行 complete()
。complete()
会发出终止信号 onComplete
。error(Throwable)
会发出终止信号 onError
。使用 create
工厂方法就能够获得一个正常的 TestPublisher
。而使用 createNonCompliant
工厂方法能够建立一个“不正常”的 TestPublisher
。后者须要传入由 TestPublisher.Violation
枚举指定的一组选项,这些选项可用于告诉 publisher 忽略哪些问题。枚举值有:
REQUEST_OVERFLOW
: 容许 next
在请求不足的时候也能够调用,而不会触发 IllegalStateException
。ALLOW_NULL
: 容许 next
可以发出一个 null
值而不会触发 NullPointerException
。CLEANUP_ON_TERMINATE
: 能够重复屡次发出终止信号,包括 complete()
、error()
和 emit()
。最后,TestPublisher
还能够用不一样的 assert*
来跟踪其内部的订阅状态。
使用转换方法 flux()
和 mono()
能够将其做为 Flux
和 Mono
来使用。
PublisherProbe
检查执行路径当构建复杂的操做链时,可能会有多个子序列,从而致使多个执行路径。
多数时候,这些子序列会生成一个足够明确的 onNext
信号,你能够经过检查最终结果 来判断它是否执行。
考虑下边这个方法,它构建了一条操做链,并使用 switchIfEmpty
方法在源为空的状况下, 替换成另外一个源。
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) { return source .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+"))) .switchIfEmpty(fallback); }
很容易就能够测试出 switchIfEmpty 的哪个逻辑分支被使用了,以下:
@Test public void testSplitPathIsUsed() { StepVerifier.create(processOrFallback(Mono.just("just a phrase with tabs!"), Mono.just("EMPTY_PHRASE"))) .expectNext("just", "a", "phrase", "with", "tabs!") .verifyComplete(); } @Test public void testEmptyPathIsUsed() { StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE"))) .expectNext("EMPTY_PHRASE") .verifyComplete(); }
可是若是例子中的方法返回的是一个 Mono<Void>
呢?它等待源发送结束,执行一个额外的任务, 而后就结束了。若是源是空的,则执行另外一个备用的相似于 Runnable 的任务,以下:
private Mono<String> executeCommand(String command) { return Mono.just(command + " DONE"); } public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) { return commandSource .flatMap(command -> executeCommand(command).then()) .switchIfEmpty(doWhenEmpty); }
then() 方法会忽略 command,它只关心是否结束。 |
|
---|---|
两个都是空序列,这个时候如何区分(哪边执行了)呢? |
为了验证执行路径是通过了 doWhenEmpty
的,你须要编写额外的代码,好比你须要一个这样的 Mono<Void>
:
在 3.1 版本之前,你须要为每一种状态维护一个 AtomicBoolean
变量,而后在相应的 doOn*
回调中观察它的值。这须要添加很多的额外代码。好在,版本 3.1.0 以后能够使用 PublisherProbe
来作, 以下:
@Test public void testCommandEmptyPathIsUsed() { PublisherProbe<Void> probe = PublisherProbe.empty(); StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) .verifyComplete(); probe.assertWasSubscribed(); probe.assertWasRequested(); probe.assertWasNotCancelled(); }
建立一个探针(probe),它会转化为一个空序列。 | |
---|---|
在须要使用 Mono<Void> 的位置调用 probe.mono() 来替换为探针。 |
|
序列结束以后,你能够用这个探针来判断序列是如何使用的,你能够检查是它从哪(条路径)被订阅的… | |
…对于请求也是同样的… | |
…以及是否被取消了。 |
你也能够在使用 Flux<T>
的位置经过调用 .flux()
方法来放置探针。若是你既须要用探针检查执行路径 还须要它可以发出数据,你能够用 PublisherProbe.of(Publisher)
方法包装一个 Publisher<T>
来搞定。
从命令式和同步式编程切换到响应式和异步式编程有时候是使人生畏的。 学习曲线中最陡峭的异步就是出错时如何分析和调试。
在命令式世界,调试一般都是很是直观的:直接看 stack trace 就能够找到问题出现的位置, 以及:是否问题责任所有出在你本身的代码?问题是否是发生在某些库代码?若是是, 那你的哪部分代码调用了库,是否是传参不合适致使的问题?
当你切换到异步代码,事情就变得复杂的多了。
看一下下边的 stack trace:
一段典型的 Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120) at reactor.core.publisher.FluxFlatMap$FlatMapMain.emitScalar(FluxFlatMap.java:380) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:349) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:119) at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:144) at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:99) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:316) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94) at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67) at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:98) at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58) at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668) at reactor.core.publisher.Mono.subscribe(Mono.java:2629) at reactor.core.publisher.Mono.subscribe(Mono.java:2604) at reactor.core.publisher.Mono.subscribe(Mono.java:2582) at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:722)
这里边有好多信息,咱们获得了一个 IndexOutOfBoundsException
,内容是 "源发出了 不止一个元素"。
咱们也许能够很快假定这个源是一个 Flux/Mono,并经过下一行提到的 MonoSingle
肯定是 Mono。 看上去是来自一个 single
操做符的抱怨。
查看 Javadoc 中关于操做符 Mono#single
的说明,咱们看到 single
有一个规定: 源必须只能发出一个元素。看来是有一个源发出了多于一个元素,从而违反了这一规定。
咱们能够更进一步找出那个源吗?下边的这些内容帮不上什么忙,只是打印了一些内部的彷佛是一个响应式链的信息, 主要是一些 subscribe
和 request
的调用。
粗略过一下这些行,咱们至少能够勾画出一个大体的出问题的链:大概涉及一个 MonoSingle
、一个 FluxFlatMap
,以及一个 FluxRange
(每个都对应 trace 中的几行,但整体涉及这三个类)。 因此难道是 range().flatMap().single()
这样的链?
可是若是在咱们的应用中多处都用到这一模式,那怎么办?经过这些仍是不能肯定什么, 搜索 single
也找不到问题所在。最后一行指向了咱们的代码。咱们彷佛终于接近真相了。
不过,等等… 当咱们找到源码文件,咱们只能找到一个已存在的 Flux
被订阅了,以下:
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
全部这些都发生在订阅时,可是 Flux
自己没有在这里 声明 。更糟的是, 当咱们找到变量声明的地方,咱们看到:
public Mono<String> toDebug; //请忽略 public 的属性
变量声明的地方并无 实例化 。咱们必须作最坏的打算,那就是在这个应用中, 可能在几个不一样的代码路径上对这个变量赋了值,但咱们不肯定是哪个致使了问题。
这是一种 Reactor 运行时错误,而不是编译错误。 | |
---|---|
咱们但愿找到的是操做符在哪里添加到操做链上的 —— 也就是 Flux
在哪里 声明的。咱们一般说这个 Flux
是被 组装(assembly) 的。
即使 stack trace 可以对有些许经验的开发者传递一些信息,可是在一些复杂的状况下, 这并非一种理想的方式。
幸运的是,Reactor 内置了一种面向调试的能力—— 操做期测量(assembly-time instrumentation)。
这经过 在应用启动的时候 (或至少在有问题的 Flux
或 Mono
实例化以前) 加入自定义的 Hook.onOperator
钩子(hook),以下:
Hooks.onOperatorDebug();
这行代码——经过包装操做符的构造方法,并在此捕捉 stack trace——来监测对这个 Flux
(或 Mono
)的操做符的调用(也就是“组装”链的地方)。因为这些在 操做链被声明的地方就搞定,这个 hook 应该在 早于 声明的时候被激活, 最保险的方式就是在你程序的最开始就激活它。
以后,若是发生了异常,致使失败的操做符可以找到捕捉点并补充 stack trace。
在下一小节,咱们看一下 stack trace 会有什么不一样,以及如何对其进行分析。
咱们在对上边的例子激活 operatorStacktrace
调试功能后,stack trace 以下:
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120) at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:314) ... ... at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668) at reactor.core.publisher.Mono.subscribe(Mono.java:2629) at reactor.core.publisher.Mono.subscribe(Mono.java:2604) at reactor.core.publisher.Mono.subscribe(Mono.java:2582) at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:727) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Flux.single(Flux.java:5335) reactor.guide.GuideTests.scatterAndGather(GuideTests.java:689) reactor.guide.GuideTests.populateDebug(GuideTests.java:702) org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) org.junit.rules.RunRules.evaluate(RunRules.java:20) Error has been observed by the following operator(s): |_ Flux.single(TestWatcher.java:55)
这一条是新的:能够发现外层操做符捕捉到了 stack trace。 | |
---|---|
第一部分的 stack trace 多数与上边同样,显示了操做符内部的信息(因此省略了这一块)。 | |
从这里开始,是在调试模式下显示的内容。 | |
首先咱们得到了关于操做符组装的信息。 | |
以及错误沿着操做链传播的轨迹(从错误点到订阅点)。 | |
每个看到这个错误的操做符都会列出,包括类和行信息。若是操做符是在 Reactor 源码内部组装的,行信息会被忽略。 |
可见,捕获的 stack trace 做为 OnAssemblyException
添加到原始错误信息的以后。有两部分, 可是第一部分更加有意思。它显示了操做符触发异常的路径。这里显示的是 scatterAndGather
方法中的 single
致使的问题,而 scatterAndGather
方法是在 JUnit 中被 populateDebug
方法调用的。
既然咱们已经有足够的信息来查出罪魁祸首,咱们就来看一下 scatterAndGather
方法吧:
private Mono<String> scatterAndGather(Flux<String> urls) { return urls.flatMap(url -> doRequest(url)) .single(); }
找到了,就是这个 single 。 |
|
---|---|
如今咱们能够发现错误的根源是将多个 HTTP 请求转化为 URLs 的 flatMap
方法后边接的是 single
, 这太严格了。使用 git blame
找到代码做者,并同他讨论事后,发现他是原本是想用不那么严格的 take(1)
方法的。
咱们解决了问题。
错误被如下这些操做符观察(observed)了:
调试信息的第二部分在这个例子中意义不大,由于错误实际发生在最后一个操做符上(离 subscribe
最近的一个)。 另外一个例子可能更加清楚:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane")) .transform(FakeUtils1.applyFilters) .transform(FakeUtils2.enrichUser) .blockLast();
如今想象一下在 findAllUserByName
内部有个 map
方法报错了。咱们可能会看到以下的 trace:
Error has been observed by the following operator(s): |_ Flux.map(FakeRepository.java:27) |_ Flux.map(FakeRepository.java:28) |_ Flux.filter(FakeUtils1.java:29) |_ Flux.transform(GuideDebuggingExtraTests.java:41) |_ Flux.elapsed(FakeUtils2.java:30) |_ Flux.transform(GuideDebuggingExtraTests.java:42)
这与链上收到错误通知的操做符是一致:
map
。map
看到(都在 findAllUserByName
方法中)。filter
和一个 transform
看到,说明链的这部分是由一个可重复使用的转换方法组装的 (这里是 applyFilters
工具方法)。elapsed
和一个 transform
看到,相似的, elapsed
由第二个转换方法(enrichUser
) 组装。用这种形式的检测方式构造 stack trace 是成本较高的。也所以这种调试模式做为最终大招, 只应该在可控的方式下激活。
checkpoint()
方式替代调试模式是全局性的,会影响到程序中每个组装到一个 Flux
或 Mono
的操做符。好处在于能够进行 过后调试(after-the-fact debugging):不管错误是什么,咱们都会获得足够的调试信息。
就像前边见到的那样,这种全局性的调试会由于成本较高而影响性能(其影响在于生成的 stack traces 数量)。 若是咱们能大概定位到疑似出问题的操做符的话就能够不用花那么大的成本。然而,问题出现后, 咱们一般没法定位到哪个操做符可能存在问题,由于缺乏一些 trace 信息,咱们得修改代码, 打开调试模式,指望可以复现问题。
这种状况下,咱们须要切换到调试模式,并进行一些必要的准备工做以便可以更好的发现复现的问题, 并捕捉到全部的信息。(译者加:这两段感受有点废话。。。)
若是你能肯定是在你的代码中组装的响应式链存在问题,并且程序的可服务性又是很重要的, 那么你能够 使用 checkpoint() 操做符,它有两种调试技术可用。
你能够把这个操做符加到链中。这时 checkpoint
操做符就像是一个 hook,但只对它所在的链起做用。
还有一个 checkpoint(String)
的方法变体,你能够传入一个独特的字符串以方便在 assembly traceback 中识别信息。 这样会省略 stack trace,你能够依赖这个字符串(如下改称“定位描述符”)来定位到组装点。checkpoint(String)
比 checkpoint
有更低的执行成本。
checkpoint(String)
在它的输出中包含 "light" (能够方便用于搜索),以下所示:
... Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly site of producer [reactor.core.publisher.FluxElapsed] is identified by light checkpoint [light checkpoint identifier].
最后的但一样重要的是,若是你既想经过 checkpoint 添加定位描述符,同时又依赖于 stack trace 来定位组装点,你能够使用 checkpoint("description", true)
来实现这一点。这时回溯信息又出来了, 同时附加了定位描述符,以下例所示:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:174) reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescription(FluxOnAssemblyTest.java:159) Error has been observed by the following operator(s): |_ ParallelFlux.checkpointnull
descriptionCorrelation1234 是经过 checkpoint 给出的定位描述符。 |
|
---|---|
定位描述符能够是静态的字符串、或人类可读的描述、或一个 correlation ID(例如, 来自 HTTP 请求头的信息)。
当全局调试模式和 checkpoint() 都开启的时候,checkpoint 的 stacks 输出会做为 suppressed 错误输出,按照声明顺序添加在操做符图(graph)的后面。 |
|
---|---|
除了基于 stack trace 的调试和分析,还有一个有效的工具能够跟踪异步序列并记录日志。
就是 log()
操做符。将其加到操做链上以后,它会读(只读,peek)每个 在其上游的 Flux
或 Mono
事件(包括 onNext
、onError
、 onComplete
, 以及 订阅、 取消、和 请求)。
边注:关于 logging 的具体实现
log
操做符经过 SLF4J 使用相似 Log4J 和 Logback 这样的公共的日志工具, 若是 SLF4J 不存在的话,则直接将日志输出到控制台。
控制台使用 System.err
记录 WARN
和 ERROR
级别的日志,使用 System.out
记录其余级别的日志。
若是你喜欢使用 JDK java.util.logging
,在 3.0.x 你能够设置 JDK 的系统属性 reactor.logging.fallback
。
假设咱们配置并激活了 logback,以及一个形如 range(1,10).take(3)
的操做链。经过将 log()
放在 take 以前, 咱们就能够看到它内部是如何运行的,以及什么样的事件会向上游传播给 range,以下所示:
Flux<Integer> flux = Flux.range(1, 10) .log() .take(3); flux.subscribe();
输出以下(经过 logger 的 console appender):
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel()
这里,除了 logger 本身的格式(时间、线程、级别、消息),log()
操做符 还输出了其余一些格式化的东西:
reactor.Flux.Range.1 是自动生成的日志 类别(category),以防你在操做链中屡次使用 同一个操做符。经过它你能够分辨出来是哪一个操做符的事件(这里是 range 的)。 你能够调用 log(String) 方法用自定义的类别替换这个标识符。在几个用于分隔的字符以后, 打印出了实际的事件。这里是一个 onSubscribe 调用、一个 request 调用、三个 onNext 调用, 以及一个 cancel 调用。对于第一行的 onSubscribe ,咱们知道了 Subscriber 的具体实现, 一般与操做符指定的实现是一致的,在方括号内有一些额外信息,包括这个操做符是否可以 经过同步或异步融合(fusion,具体见附录 [microfusion])的方式进行自动优化。 |
|
---|---|
第二行,咱们能够看到是一个由下游传播上来的个数无限的请求。 | |
而后 range 一下发出三个值。 | |
最后一行,咱们看到了 cancel() 。 |
最后一行,(4),最有意思。咱们看到 take
在这里发挥做用了。在它拿到足够的元素以后, 就将序列切断了。简单来讲,take()
致使源在发出用户请求的数量后 cancel()
了。
翻译建议 - "调试 Reactor"
这一章涉及以下的 Reactor 的高级特性与概念:
ConnectableFlux
对多个订阅者进行广播ParallelFlux
进行并行处理Schedulers
从代码整洁的角度来讲,重用代码是一个好办法。Reactor 提供了几种帮你打包重用代码的方式, 主要经过使用操做符或者经常使用的“操做符组合”的方法来实现。若是你以为一段操做链很经常使用, 你能够将这段操做链打包封装后备用。
transform
操做符transform
操做符能够将一段操做链封装为一个函数式(function)。这个函数式能在操做期(assembly time) 将被封装的操做链中的操做符还原并接入到调用 transform
的位置。这样作和直接将被封装的操做符 加入到链上的效果是同样的。示例以下:
Function<Flux<String>, Flux<String>> filterAndMap = f -> f.filter(color -> !color.equals("orange")) .map(String::toUpperCase); Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .transform(filterAndMap) .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
上边例子的输出以下:
blue Subscriber to Transformed MapAndFilter: BLUE green Subscriber to Transformed MapAndFilter: GREEN orange purple Subscriber to Transformed MapAndFilter: PURPLE
compose
操做符compose
操做符与 transform
相似,也可以将几个操做符封装到一个函数式中。 主要的区别就是,这个函数式做用到原始序列上的话,是 基于每个订阅者的(on a per-subscriber basis) 。这意味着它对每个 subscription 能够生成不一样的操做链(经过维护一些状态值)。 以下例所示:
AtomicInteger ai = new AtomicInteger(); Function<Flux<String>, Flux<String>> filterAndMap = f -> { if (ai.incrementAndGet() == 1) { return f.filter(color -> !color.equals("orange")) .map(String::toUpperCase); } return f.filter(color -> !color.equals("purple")) .map(String::toUpperCase); }; Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .compose(filterAndMap); composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d)); composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));
上边的例子输出以下:
blue Subscriber 1 to Composed MapAndFilter :BLUE green Subscriber 1 to Composed MapAndFilter :GREEN orange purple Subscriber 1 to Composed MapAndFilter :PURPLE blue Subscriber 2 to Composed MapAndFilter: BLUE green Subscriber 2 to Composed MapAndFilter: GREEN orange Subscriber 2 to Composed MapAndFilter: ORANGE purple
到目前为止,咱们一直认为 Flux
(和 Mono
)都是这样的:它们都表明了一种异步的数据序列, 在订阅(subscribe)以前什么都不会发生。
可是实际上,广义上有两种发布者:“热”与“冷”(hot and cold)。
(本文档)到目前介绍的其实都是 cold 家族的发布者。它们为每个订阅(subscription) 都生成数据。若是没有建立任何订阅(subscription),那么就不会生成数据。
试想一个 HTTP 请求:每个新的订阅者都会触发一个 HTTP 调用,可是若是没有订阅者关心结果的话, 那就不会有任何调用。
另外一方面,热 发布者,不依赖于订阅者的数量。即便没有订阅者它们也会发出数据, 若是有一个订阅者接入进来,那么它就会收到订阅以后发出的元素。对于热发布者, 在你订阅它以前,确实已经发生了什么。
just
是 Reactor 中少数几个“热”操做符的例子之一:它直接在组装期(assembly time) 就拿到数据,若是以后有谁订阅它,就从新发送数据给订阅者。再拿 HTTP 调用举例,若是给 just
传入的数据是一个 HTTP 调用的结果,那么以后在初始化 just 的时候才会进行惟一的一次网络调用。
若是想将 just
转化为一种 冷 的发布者,你能够使用 defer
。它可以将刚才例子中对 HTTP 的请求延迟到订阅时(这样的话,对于每个新的订阅来讲,都会发生一次网络调用)。
Reactor 中多数其余的 热 发布者是扩展自 Processor 的。 |
|
---|---|
考虑其余两个例子,以下是第一个例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .filter(s -> s.startsWith("o")) .map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: "+d)); source.subscribe(d -> System.out.println("Subscriber 2: "+d));
第一个例子输出以下:
blue green orange Subscriber 1: ORANGE purple blue green orange Subscriber 2: ORANGE purple
两个订阅者都触发了全部的颜色,由于每个订阅者都会让构造 Flux
的操做符运行一次。
将下边的例子与第一个例子对比:
UnicastProcessor<String> hotSource = UnicastProcessor.create(); Flux<String> hotFlux = hotSource.publish() .autoConnect() .map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d)); hotSource.onNext("blue"); hotSource.onNext("green"); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d)); hotSource.onNext("orange"); hotSource.onNext("purple"); hotSource.onComplete();
第二个例子输出以下:
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 2 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: PURPLE
第一个订阅者收到了全部的四个颜色,第二个订阅者因为是在前两个颜色发出以后订阅的, 故而收到了以后的两个颜色,在输出中有两次 "ORANGE" 和 "PURPLE"。从这个例子可见, 不管是否有订阅者接入进来,这个 Flux 都会运行。
ConnectableFlux
对多个订阅者进行广播有时候,你不只想要延迟到某一个订阅者订阅以后才开始发出数据,可能还但愿在多个订阅者 到齐 以后 才开始。
ConnectableFlux
的用意便在于此。Flux
API 中有两种主要的返回 ConnectableFlux
的方式:publish
和 replay
。
publish
会尝试知足各个不一样订阅者的需求(背压),并综合这些请求反馈给源。 尤为是若是有某个订阅者的需求为 0
,publish 会 暂停 它对源的请求。replay
将对第一个订阅后产生的数据进行缓存,最多缓存数量取决于配置(时间/缓存大小)。 它会对后续接入的订阅者从新发送数据。ConnectableFlux
提供了多种对下游订阅的管理。包括:
connect
当有足够的订阅接入后,能够对 flux 手动执行一次。它会触发对上游源的订阅。autoConnect(n)
与 connect 相似,不过是在有 n
个订阅的时候自动触发。refCount(n)
不只可以在订阅者接入的时候自动触发,还会检测订阅者的取消动做。若是订阅者数量不够, 会将源“断开链接”,再有新的订阅者接入的时候才会继续“连上”源。refCount(int, Duration)
增长了一个 "优雅的倒计时":一旦订阅者数量过低了,它会等待 Duration
的时间,若是没有新的订阅者接入才会与源“断开链接”。示例以下:
Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("subscribed to source")); ConnectableFlux<Integer> co = source.publish(); co.subscribe(System.out::println, e -> {}, () -> {}); co.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("done subscribing"); Thread.sleep(500); System.out.println("will now connect"); co.connect();
The preceding code produces the following output:
done subscribing will now connect subscribed to source 1 1 2 2 3 3
使用 autoConnect
:
Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("subscribed to source")); Flux<Integer> autoCo = source.publish().autoConnect(2); autoCo.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("subscribed first"); Thread.sleep(500); System.out.println("subscribing second"); autoCo.subscribe(System.out::println, e -> {}, () -> {});
以上代码输出以下:
subscribed first subscribing second subscribed to source 1 1 2 2 3 3
当你有许多的元素,而且想将他们分批处理,Reactor 整体上有三种方案:分组(grouping)、 窗口(windowing)(译者注:感受这个不翻译更明白。。。)、缓存(buffering)。 这三种在概念上相似,由于它们都是将 Flux<T>
进行汇集。分组和分段操做都会建立一个 Flux<Flux<T>>
,而缓存操做获得的是一个 Collection<T>
(译者注:应该是一个 Flux<Collection<T>>
)。
Flux<GroupedFlux<T>>
进行分组分组可以根据 key 将源 Flux<T>
拆分为多个批次。
对应的操做符是 groupBy
。
每一组用 GroupedFlux<T>
类型表示,使用它的 key()
方法能够获得该组的 key。
在组内,元素并不须要是连续的。当源发出一个新的元素,该元素会被分发到与之匹配的 key 所对应的组中(若是尚未该 key 对应的组,则建立一个)。
这意味着组:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .groupBy(i -> i % 2 == 0 ? "even" : "odd") .concatMap(g -> g.defaultIfEmpty(-1) //若是组为空,显示为 -1 .map(String::valueOf) //转换为字符串 .startWith(g.key())) //以该组的 key 开头 ) .expectNext("odd", "1", "3", "5", "11", "13") .expectNext("even", "2", "4", "6", "12") .verifyComplete();
分组操做适用于分组个数很少的场景。并且全部的组都必须被消费,这样 groupBy 才能持续从上游获取数据。有时候这两种要求在一块儿——好比元素数量超多, 可是并行的用来消费的 flatMap 又太少的时候——会致使程序卡死。 |
|
---|---|
Flux<Flux<T>>
进行 window 操做window 操做是 根据个数、时间等条件,或可以定义边界的发布者(boundary-defining Publisher
), 把源 Flux<T>
拆分为 windows。
对应的操做符有 window
、windowTimeout
、windowUntil
、windowWhile
,以及 windowWhen
。
与 groupBy
的主要区别在于,窗口操做可以保持序列顺序。而且同一时刻最多只能有两个 window 是开启的。
它们 能够 重叠。操做符参数有 maxSize
和 skip
,maxSize
指定收集多少个元素就关闭 window,而 skip
指定收集多数个元素后就打开下一个 window。因此若是 maxSize > skip
的话, 一个新的 window 的开启会先于当前 window 的关闭, 从而两者会有重叠。
重叠的 window 示例以下:
StepVerifier.create( Flux.range(1, 10) .window(5, 3) //overlapping windows .concatMap(g -> g.defaultIfEmpty(-1)) //将 windows 显示为 -1 ) .expectNext(1, 2, 3, 4, 5) .expectNext(4, 5, 6, 7, 8) .expectNext(7, 8, 9, 10) .expectNext(10) .verifyComplete();
若是将两个参数的配置反过来(maxSize < skip ),序列中的一些元素就会被丢弃掉, 而不属于任何 window。 |
|
---|---|
对基于判断条件的 windowUntil
和 windowWhile
,若是序列中的元素不匹配判断条件, 那么可能致使 空 windows,以下例所示:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .windowWhile(i -> i % 2 == 0) .concatMap(g -> g.defaultIfEmpty(-1)) ) .expectNext(-1, -1, -1) //分别被奇数 1 3 5 触发 .expectNext(2, 4, 6) // 被 11 触发 .expectNext(12) // 被 13 触发 .expectNext(-1) // 空的 completion window,若是 onComplete 前的元素可以匹配上的话就没有这个了 .verifyComplete();
Flux<List<T>>
进行缓存缓存与窗口相似,不一样在于:缓存操做以后会发出 buffers (类型为Collection<T>
, 默认是 List<T>
),而不是 windows (类型为 Flux<T>
)。
缓存的操做符与窗口的操做符是对应的:buffer
、bufferTimeout
、bufferUntil
、bufferWhile
, 以及bufferWhen
。
若是说对于窗口操做符来讲,是开启一个窗口,那么对于缓存操做符来讲,就是建立一个新的集合, 而后对其添加元素。而窗口操做符在关闭窗口的时候,缓存操做符则是发出一个集合。
缓存操做也会有丢弃元素或内容重叠的状况,以下:
StepVerifier.create( Flux.range(1, 10) .buffer(5, 3) // 缓存重叠 ) .expectNext(Arrays.asList(1, 2, 3, 4, 5)) .expectNext(Arrays.asList(4, 5, 6, 7, 8)) .expectNext(Arrays.asList(7, 8, 9, 10)) .expectNext(Collections.singletonList(10)) .verifyComplete();
不像窗口方法,bufferUntil
和 bufferWhile
不会发出空的 buffer,以下例所示:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .bufferWhile(i -> i % 2 == 0) ) .expectNext(Arrays.asList(2, 4, 6)) // 被 11 触发 .expectNext(Collections.singletonList(12)) // 被 13 触发 .verifyComplete();
ParallelFlux
进行并行处理现在多核架构已然普及,可以方便的进行并行处理是很重要的。Reactor 提供了一种特殊的类型 ParallelFlux
来实现并行,它可以将操做符调整为并行处理方式。
你能够对任何 Flux
使用 parallel()
操做符来获得一个 ParallelFlux
. 不过这个操做符自己并不会进行并行处理,而是将负载划分到多个“轨道(rails)”上 (默认状况下,轨道个数与 CPU 核数相等)。
为了配置 ParallelFlux 如何并行地执行每个轨道,你须要使用 runOn(Scheduler)
。 注意,Schedulers.parallel()
是推荐的专门用于并行处理的调度器。
下边有两个用于比较的例子,第一个以下:
Flux.range(1, 10) .parallel(2) .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
咱们给定一个轨道数字,而不是依赖于 CPU 核数。 | |
---|---|
下边是第二个例子:
Flux.range(1, 10) .parallel(2) .runOn(Schedulers.parallel()) .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
第一个例子输出以下:
main -> 1 main -> 2 main -> 3 main -> 4 main -> 5 main -> 6 main -> 7 main -> 8 main -> 9 main -> 10
第二个例子在两个线程中并行执行,输出以下:
parallel-1 -> 1 parallel-2 -> 2 parallel-1 -> 3 parallel-2 -> 4 parallel-1 -> 5 parallel-2 -> 6 parallel-1 -> 7 parallel-1 -> 9 parallel-2 -> 8 parallel-2 -> 10
若是在并行地处理以后,须要退回到一个“正常”的 Flux
而使后续的操做链按非并行模式执行, 你能够对 ParallelFlux
使用 sequential()
方法。
注意,当你在对 ParallelFlux 使用一个 Subscriber
而不是基于 lambda 进行订阅(subscribe()
) 的时候,sequential()
会自动地被偷偷应用。
注意 subscribe(Subscriber<T>)
会合并全部的执行轨道,而 subscribe(Consumer<T>)
会在全部轨道上运行。 若是 subscribe()
方法中是一个 lambda,那么有几个轨道,lambda 就会被执行几回。
你还能够使用 groups()
做为 Flux<GroupedFlux<T>>
进入到各个轨道或组里边, 而后能够经过 composeGroup()
添加额外的操做符。
Schedulers
就像咱们在 调度器(Schedulers) 这一节看到的那样, Reactor Core 内置许多 Scheduler
的具体实现。 你能够用形如 new*
的工厂方法来建立调度器,每一种调度器都有一个单例对象,你能够使用单例工厂方法 (好比 Schedulers.elastic()
而不是 Schedulers.newElastic()
)来获取它。
当你不明确指定调度器的时候,那些须要调度器的操做符会使用这些默认的单例调度器对象。例如, Flux#delayElements(Duration)
使用的是 Schedulers.parallel()
调度器对象。
然而有些状况下,你可能须要“一刀切”(就不用对每个操做符都传入你本身的调度器做为参数了) 地调整这些默认调度器。 一个典型的例子就是,假设你须要对每个被调度的任务统计执行时长, 就想把默认的调度器包装一下,而后添加计时功能。
那么能够使用 Schedulers.Factory
类来改变默认的调度器。默认状况下,一个 Factory
会使用一些“命名比较直白” 的方法来建立全部的标准 Scheduler
。每个方法你均可以用本身的实现方式来重写。
此外,Factory
还提供一个额外的自定义方法 decorateExecutorService
。它会在建立每个 reactor-core 调度器——内部有一个 ScheduledExecutorService
(即便是好比用 Schedulers.newParallel()
方法建立的这种非默认的调度器)——的时候被调用。
你能够经过调整 ScheduledExecutorService
来改变调度器:(译者加:decorateExecutorService
方法)经过一个 Supplier
参数暴露出来,你能够直接绕过这个 supplier 返回你本身的调度器实例,或者用 (译者加: Schedulers.ScheduledExecutorService
的)get()
获得默认实例,而后包装它, 这取决于配置的调度器类型。
当你搞定了一个定制好的 Factory 后,你必须使用 Schedulers.setFactory(Factory) 方法来安装它。 |
|
---|---|
最后,对于调度器来讲,有一个可自定义的 hook:onHandleError
。这个 hook 会在提交到这个调度器的 Runnable
任务抛出异常的时候被调用(注意,若是还设置了一个 UncaughtExceptionHandler
, 那么它和 hook 都会被调用)。
Reactor 还有另一类可配置的应用于多种场合的回调,它们都在 Hooks
类中定义,整体来讲有三类:
当生成源的操做符不听从响应式流规范的时候,Dropping hooks(用于处理丢弃事件的 hooks)会被调用。 这种类型的错误是处于正常的执行路径以外的(也就是说它们不能经过 onError
传播)。
典型的例子是,假设一个发布者即便在被调用 onCompleted
以后仍然能够经过操做符调用 onNext
。 这种状况下,onNext
的值会被 丢弃,若是有多余的 onError
的信号亦是如此。
相应的 hook,onNextDropped
以及 onErrorDropped
,能够提供一个全局的 Consumer
, 以便可以在丢弃的状况发生时进行处理。例如,你能够使用它来对丢弃事件记录日志,或进行资源清理 (使用资源的值可能压根没有到达响应式链的下游)。
连续设置两次 hook 的话都会起做用:提供的每个 consumer 都会被调用。使用 Hooks.resetOn*Dropped()
方法能够将 hooks 所有重置为默认。
若是操做符在执行其 onNext
、onError
以及 onComplete
方法的时候抛出异常,那么 onOperatorError
这一个 hook 会被调用。
与上一类 hook 不一样,这个 hook 仍是处在正常的执行路径中的。一个典型的例子就是包含一个 map 函数式的 map
操做符抛出的异常(好比零做为除数),这时候仍是会执行到 onError
的。
首先,它会将异常传递给 onOperatorError
。利用这个 hook 你能够检查这个错误(以及有问题的相关数据), 并能够 改变 这个异常。固然你还能够作些别的事情,好比记录日志或返回原始异常。
注意,onOperatorError
hook 也能够被屡次设置:你能够提供一个 String
为一个特别的 BiFunction
类型的函数式设置识别符,不一样识别符的函数式都会被执行,固然,重复使用一个识别符的话, 则后来的设置会覆盖前边的设置。
所以,默认的 hook 能够使用 Hooks.resetOnOperatorError()
方法重置,而提供识别符的 hook 能够使用 Hooks.resetOnOperatorError(String)
方法来重置。
这些组装(assembly) hooks 关联了操做符的生命周期。它们会在一个操做链被组装起来的时候(即实例化的时候) 被调用。每个新的操做符组装到操做链上的时候,onEachOperator
都会返回一个不一样的发布者, 从而能够利用它动态调整操做符。onLastOperator
与之相似,不过只会在被操做链上的最后一个 (subscribe
调用以前的)操做符调用。
相似于 onOperatorError
,也能够叠加,而且经过识别符来标识。也是用相似的方式重置所有或部分 hooks。
Hooks
工具类还提供了一些预置的 hooks。利用他们能够改变一些默认的处理方式,而不用本身 编写 hook:
onNextDroppedFail()
:onNextDropped
一般会抛出 Exceptions.failWithCancel()
异常。 如今它默认还会以 DEBUG 级别对被丢弃的值记录日志。若是想回到原来的只是抛出异常的方式,使用 onNextDroppedFail()
。onOperatorDebug()
: 这个方法会激活 debug mode。它与 onOperatorError
hook 关联,因此调用 resetOnOperatorError()
同时也会重置它。不过它内部也用到了特别的识别符, 你能够经过 resetOnOperatorDebug()
方法来重置它。当从命令式编程风格切换到响应式编程风格的时候,一个技术上最大的挑战就是线程处理。
与习惯作法不一样的是,在响应式编程中,一个线程(Thread
)能够被用于处理多个同时运行的异步序列 (其实是非阻塞的)。执行过程也会常常从一个线程切换到另外一个线程。
这样的状况下,对于开发者来讲,若是依赖线程模型中相对“稳定”的特性——好比 ThreadLocal
——就会变得很难。由于它会让你将数据绑定到一个 线程 上,因此在响应式环境中使用就变得 比较困难。所以,将使用了 ThreadLocal
的库应用于 Reactor 的时候就会带来新的挑战。一般会更糟, 它用起来效果会更差,甚至会失败。 好比,使用 Logback 的 MDC 来存储日志关联的 ID,就是一个很是符合 这种状况的例子。
一般的对 ThreadLocal
的替代方案是将环境相关的数据 C
,同业务数据 T
一块儿置于序列中, 好比使用 Tuple2<T, C>
。这种方案看起来并很差,何况会在方法和 Flux
泛型中暴露环境数据信息。
自从版本 3.1.0
,Reactor 引入了一个相似于 ThreadLocal
的高级功能:Context
。它做用于一个 Flux
或一个 Mono
上,而不是应用于一个线程(Thread
)。
为了说明,这里有个读写 Context
的简单例子:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete();
接下来的几个小节,咱们来了解 Context
是什么以及如何用,从而最终能够理解上边的例子。
这是一个主要面向库开发人员的高级功能。这须要开发者对 Subscription 的生命周期 充分理解,而且明白它主要用于 subscription 相关的库。 |
|
---|---|
Context
APIContext
是一个相似于 Map
(这种数据结构)的接口:它存储键值(key-value)对,你须要经过 key 来获取值:
Object
类型,因此 Context
能够包含任意数量的任意对象。Context
是 不可变的(immutable)。put(Object key, Object value)
方法来存储一个键值对,返回一个新的 Context
对象。 你也能够用 putAll(Context)
方法将两个 context 合并为一个新的 context。hasKey(Object key)
方法检查一个 key 是否已经存在。getOrDefault(Object key, T defaultValue)
方法取回 key 对应的值(类型转换为 T
), 或在找不到这个 key 的状况下返回一个默认值。getOrEmpty(Object key)
来获得一个 Optional<T>
(context 会尝试将值转换为 T
)。delete(Object key)
来删除 key 关联的值,并返回一个新的 Context
。建立一个 Context 时,你能够用静态方法 Context.of 预先存储最多 5 个键值对。 它接受 2, 4, 6, 8 或 10 个 Object 对象,两两一对做为键值对添加到 Context 。 你也能够用 Context.empty() 方法来建立一个空的 Context 。 |
|
---|---|
Context
绑定到 Flux
and Writing为了使用 context,它必需要绑定到一个指定的序列,而且链上的每一个操做符均可以访问它。 注意,这里的操做符必须是 Reactor 内置的操做符,由于 Context
是 Reactor 特有的。
实际上,一个 Context
是绑定到每个链中的 Subscriber
上的。 它使用 Subscription
的传播机制来让本身对每个操做符均可见(从最后一个 subscribe
沿链向上)。
为了填充 Context
——只能在订阅时(subscription time)——你须要使用 subscriberContext
操做符。
subscriberContext(Context)
方法会将你提供的 Context
与来自下游(记住,Context
是从下游 向上游传播的)的 Context
合并。 这经过调用 putAll
实现,最后会生成一个新的 Context
给上游。
你也能够用更高级的 subscriberContext(Function<Context, Context>) 。它接受来自下游的 Context ,而后你能够根据须要添加或删除值,而后返回新的 Context 。你甚至能够返回一个彻底不一样 的对象,虽然不太建议这样(这样可能影响到依赖这个 Context 的库)。 |
|
---|---|
填充 Context
是一方面,读取数据一样重要。多数时候,添加内容到 Context
是最终用户的责任, 可是利用这些信息是库的责任,由于库一般是客户代码的上游。
读取 context 数据使用静态方法 Mono.subscriberContext()
。
本例的初衷是为了让你对如何使用 Context
有个更好的理解。
让咱们先回头看一下最初的例子:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete();
操做链以调用 subscriberContext(Function) 结尾,将 "World" 做为 "message" 这个 key 的 值添加到 Context 中。 |
|
---|---|
对源调用 flatMap 用 Mono.subscriberContext() 方法拿到 Context 。 |
|
而后使用 map 读取关联到 "message" 的值,而后与原来的值链接。 |
|
最后 Mono<String> 确实发出了 "Hello World" 。 |
上边的数字顺序并非按照代码行顺序排的,这并不是错误:它表明了执行顺序。虽然 subscriberContext 是链上的最后一个环节,但确实最早执行的(缘由在于 subscription 信号 是从下游向上的)。 |
|
---|---|
注意在你的操做链中,写入 与 读取 Context
的 相对位置 很重要:由于 Context
是不可变的,它的内容只能被上游的操做符看到,以下例所示:
String key = "message"; Mono<String> r = Mono.just("Hello") .subscriberContext(ctx -> ctx.put(key, "World")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger"))); StepVerifier.create(r) .expectNext("Hello Stranger") .verifyComplete();
写入 Context 的位置太靠上了… |
|
---|---|
因此在 flatMap 就没有 key 关联的值,使用了默认值。 |
|
结果 Mono<String> 发出了 "Hello Stranger" 。 |
下面的例子一样说明了 Context
的不可变性(Mono.subscriberContext()
老是返回由 subscriberContext
配置的 Context
):
String key = "message"; Mono<String> r = Mono.subscriberContext() .map( ctx -> ctx.put(key, "Hello")) .flatMap( ctx -> Mono.subscriberContext()) .map( ctx -> ctx.getOrDefault(key,"Default")); StepVerifier.create(r) .expectNext("Default") .verifyComplete();
拿到 Context 。 |
|
---|---|
在 map 方法中咱们尝试修改它。 |
|
在 flatMap 中再次获取 Context 。 |
|
读取 Context 中可能的值。 |
|
值历来没有被设置为 "Hello" 。 |
相似的,若是屡次对 Context
中的同一个 key 赋值的话,要看 写入的相对顺序 : 读取 Context
的操做符只能拿到下游最近的一次写入的值,以下例所示:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete();
写入 "message" 的值。 |
|
---|---|
另外一次写入 "message" 的值。 |
|
map 方法值能拿到下游最近的一次写入的值: "Reactor" 。 |
这里,首先 Context
中的 key 被赋值 "World"
。而后订阅信号(subscription signal)向上游 移动,又发生了另外一次写入。此次生成了第二个不变的 Context
,里边的值是 "Reactor"
。以后, 数据开始流动, flatMap
拿到最近的 Context
,也就是第二个值为 Reactor
的 Context
。
你可能会以为 Context
是与数据信号一块传播的。若是是那样的话,在两次写入操做中间加入的一个 flatMap
会使用最上游的这个 Context
。但并非这样的,以下:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor World") .verifyComplete();
这里是第一次赋值。 | |
---|---|
这里是第二次赋值。 | |
第一个 flatMap 看到的是第二次的赋值。 |
|
第二个 flatMap 将上一个的结果与 第一次赋值 的 context 值链接。 |
|
Mono 发出的是 "Hello Reactor World" 。 |
缘由在于 Context
是与 Subscriber
关联的,而每个操做符访问的 Context
来自其下游的 Subscriber
。
最后一个有意思的传播方式是,对 Context
的赋值也能够在一个 flatMap
内部,以下:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) ) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) .subscriberContext(ctx -> ctx.put(key, "Reactor")) ) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World Reactor") .verifyComplete();
这个 subscriberContext 不会影响所在 flatMap 以外的任何东西。 |
|
---|---|
这个 subscriberContext 会影响主序列的 Context 。 |
上边的例子中,最后发出的值是 "Hello World Reactor"
而不是 "Hello Reactor World",由于赋值 "Reactor" 的 subscriberContext
是做用于第二个 flatMap
的内部序列的。因此不会在主序列可见/ 传播,第一个 flatMap
也看不到它。传播(Propagation) + 不可变性(immutability)将相似 flatMap
这样的操做符中的建立的内部序列中的 Context
与外部隔离开来。
让咱们来看一个实际的从 Context
中读取值的例子:一个响应式的 HTTP 客户端将一个 Mono<String>
(用于 PUT
请求)做为数据源,同时经过一个特定的 key 使用 Context 将关联的ID信息放入请求头中。
从用户角度,是这样调用的:
doPut("www.example.com", Mono.just("Walter"))
为了传播一个关联ID,应该这样调用:
doPut("www.example.com", Mono.just("Walter")) .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
由上可见,用户代码使用了 subscriberContext
来为 Context
的 HTTP_CORRELATION_ID
赋值。上游的操做符是一个由 HTTP 客户端库返回的 Mono<Tuple2<Integer, String>>
(一个简化的 HTTP 响应)。因此可以正确将信息从用户代码传递给库代码。
下边的例子演示了从库的角度由 context 读取值的模拟代码,若是可以找到关联ID,则“增长请求”:
static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId"; Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) { Mono<Tuple2<String, Optional<Object>>> dataAndContext = data.zipWith(Mono.subscriberContext() .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))); return dataAndContext .<String>handle((dac, sink) -> { if (dac.getT2().isPresent()) { sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get()); } else { sink.next("PUT <" + dac.getT1() + "> sent to " + url); } sink.complete(); }) .map(msg -> Tuples.of(200, msg)); }
用 Mono.subscriberContext() 拿到 Context 。 |
|
---|---|
提取出关联ID的值——是一个 Optional 。 |
|
若是值存在,那么就将其加入请求头。 |
在这段库代码片断中,你能够看到它是如何将 Mono
和 Mono.subscriberContext()
zip 起来的。 返回的是一个 Tuple2<String, Context>
,这个 Context
包含来自下游的 HTTP_CORRELATION_ID
的值。
库代码接着用 map
读取出那个 key 的值 Optional<String>
,若是值存在,将其做为 X-Correlation-ID
请求头。 最后一块而用 handle
来处理。
用来验证上边的库代码的测试程序以下:
@Test public void contextForLibraryReactivePut() { Mono<String> put = doPut("www.example.com", Mono.just("Walter")) .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf")) .filter(t -> t.getT1() < 300) .map(Tuple2::getT2); StepVerifier.create(put) .expectNext("PUT <Walter> sent to www.example.com with header X-Correlation-ID = 2-j3r9afaf92j-afkaf") .verifyComplete(); }
虽然 Java 的类型系统没有表达空值安全(null-safety)的机制,可是 Reactor 如今提供了基于注解的用于声明 “可能为空(nullability)”的 API,相似于 Spring Framework 5 中提供的 API。
Reactor 自身就用到了这些注解,你也能够将其用于任何基于 Reactor 的本身的空值安全的 Java API 中。 不过,在 方法体内部 对“可能为空”的类型的使用就不在这一特性的范围内了。
这些注解是基于 JSR 305 的注解(是受相似 IntelliJ IDEA 这样的工具支持的 JSR)做为元注解(meta-annotated)的。当 Java 开发者在编写空值安全的代码时, 它们可以提供有用的警告信息,以便避免在运行时(runtime)出现 NullPointerException
异常。 JSR 305 元注解使得工具提供商能够以一种通用的方式提供对空值安全的支持,从而 Reactor 的注解就不用重复造轮子了。
对于 Kotlin 1.1.5+,须要(同时也推荐)在项目 classpath 中添加对 JSR 305 的依赖。 | |
---|---|
它们也可在 Kotlin 中使用,Kotlin 原生支持 空值安全。具体请参考 this dedicated section 。
reactor.util.annotation
包提供如下注解:
@NonNull
代表一个具体的参数、返回值或域值不能为 null
。 (若是参数或返回值应用了 @NonNullApi
则无需再加它)。@Nullable
代表一个参数、返回值或域值能够为 null
。@NonNullApi
是一个包级别的注解,代表默认状况下参数或返回值不能为 null
。(Reactor 的空值安全的注解)对于通用类型参数(generic type arguments)、可变参数(varargs),以及数组元素(array elements) 尚不支持。参考 issue #878 查看最新信息。 | |
---|---|
TIP:在这一节,若是一个操做符是专属于 Flux
或 Mono
的,那么会给它注明前缀。 公共的操做符没有前缀。若是一个具体的用例涉及多个操做符的组合,这里以方法调用的方式展示, 会以一个点(.)开头,并将参数置于圆括号内,好比: .methodCall(parameter)
。
我想搞定:
T
,我已经有了:just
Optional<T>
:Mono#justOrEmpty(Optional<T>)
null
的 T:Mono#justOrEmpty(T)
T
,且仍是由 just
方法返回
Mono#fromSupplier
或用 defer
包装 just
T
,这些元素我能够明确列举出来:Flux#just(T...)
Flux#fromArray
Flux#fromIterable
Flux#range
Stream
提供给每个订阅:Flux#fromStream(Supplier<Stream>)
Supplier<T>
:Mono#fromSupplier
Mono#fromCallable
,Mono#fromRunnable
CompletableFuture<T>
:Mono#fromFuture
empty
error
Throwable
:error(Supplier<Throwable>)
never
defer
using
Flux#generate
Flux#create
(Mono#create
也是异步的,只不过只能发一个)map
cast
Flux#index
flatMap
+ 使用一个工厂方法handle
flatMap
+ 一个异步的返回类型为 Publisher
的方法
Mono.empty()
Flux#flatMapSequential
(对每一个元素的异步任务会当即执行,但会将结果按照原序列顺序排序)Mono#flatMapMany
Flux#startWith(T...)
Flux#concatWith(T...)
Flux
转化为集合(一下都是针对 Flux
的)
collectList
,collectSortedList
collectMap
,collectMultiMap
collect
count
reduce
scan
all
any
hasElements
hasElement
Flux#concat
或 .concatWith(other)
Flux#concatDelayError
Flux#mergeSequential
Flux#merge
/ .mergeWith(other)
Flux#zip
/ Flux#zipWith
Tuple2
:Mono#zipWith
Mono#zip
Mono<Void>
:Mono#and
Mono<Void>
:Mono#when
Flux#zip
Flux#combineLatest
Flux#first
,Mono#first
,mono.or (otherMono).or(thirdMono)
,`flux.or(otherFlux).or(thirdFlux)flatMap
,不过“喜新厌旧”):switchMap
switchOnNext
repeat
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
defaultIfEmpty
switchIfEmpty
ignoreElements
Mono
来表示序列已经结束:then
thenEmpty
Mono
:Mono#then(mono)
Mono#thenReturn(T)
Flux
:thenMany
Mono#delayUntilOther
Mono#delayUntil(Function)
expand(Function)
expandDeep(Function)
doOnNext
Flux#doOnComplete
,Mono#doOnSuccess
doOnError
doOnCancel
doOnSubscribe
doOnRequest
doOnTerminate
(Mono的方法可能包含有结果)
doAfterTerminate
Signal
):Flux#doOnEach
doFinally
log
single
对象:
doOnEach
single
对象:materialize
dematerialize
log
filter
filterWhen
ofType
ignoreElements
Flux#distinct
Flux#distinctUntilChanged
Flux#take(long)
Flux#take(Duration)
Mono
中返回:Flux#next()
request(N)
而不是取消:Flux#limitRequest(long)
Flux#takeLast
Flux#takeUntil
(基于判断条件),Flux#takeUntilOther
(基于对 publisher 的比较)Flux#takeWhile
Flux#elementAt
.takeLast(1)
Flux#last()
Flux#last(T)
Flux#skip(long)
Flux#skip(Duration)
Flux#skipLast
Flux#skipUntil
(基于判断条件),Flux#skipUntilOther
(基于对 publisher 的比较)Flux#skipWhile
Flux#sample(Duration)
sampleFirst
Flux#sample(Publisher)
Flux#sampleTimeout
(每个元素会触发一个 publisher,若是这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)Flux#single()
Flux#single(T)
Flux#singleOrEmpty
error
…
Flux
:.concat(Flux.error(e))
Mono
:.then(Mono.error(e))
timeout
error(Supplier<Throwable>)
error
onErrorReturn
Flux
或 Mono
:onErrorResume
.onErrorMap(t -> new RuntimeException(t))
doFinally
using
工厂方法onErrorReturn
Publisher
:Flux#onErrorResume
和 Mono#onErrorResume
retry
retryWhen
IllegalStateException
:Flux#onBackpressureError
Flux#onBackpressureDrop
Flux#onBackpressureLatest
Flux#onBackpressureBuffer
Flux#onBackpressureBuffer
带有策略 BufferOverflowStrategy
Tuple2<Long, T>
…
elapsed
timestamp
timeout
Flux#interval
0
:static Mono.delay
.Mono#delayElement
,Flux#delayElements
delaySubscription
Flux
Flux<T>
拆分为一个 Flux<Flux<T>>
:
window(int)
window(int, int)
window(Duration)
window(Duration, Duration)
windowTimeout(int, Duration)
windowUntil
cutBefore
变量):.windowUntil(predicate, true)
windowWhile
(不知足条件的元素会被丢弃)window(Publisher)
,windowWhen
Flux<T>
的元素拆分到集合…
List
:
buffer(int)
buffer(int, int)
buffer(Duration)
buffer(Duration, Duration)
bufferTimeout(int, Duration)
bufferUntil(Predicate)
.bufferUntil(predicate, true)
bufferWhile(Predicate)
buffer(Publisher)
,bufferWhen
buffer(int, Supplier<C>)
Flux<T>
中具备共同特征的元素分组到子 Flux:groupBy(Function<T,K>)
TIP:注意返回值是 Flux<GroupedFlux<K, T>>
,每个 GroupedFlux
具备相同的 key 值 K
,能够经过 key()
方法获取。Flux<T>
,我想:
Flux#blockFirst
Flux#blockFirst(Duration)
Flux#blockLast
Flux#blockLast(Duration)
Iterable<T>
:Flux#toIterable
Stream<T>
:Flux#toStream
Mono<T>
,我想:
Mono#block
Mono#block(Duration)
CompletableFuture<T>
:Mono#toFuture
翻译建议 - "我须要哪一个操做符?"
不少时候,信息源是同步和阻塞的。在 Reactor 中,咱们用如下方式处理这种信息源:
Mono blockingWrapper = Mono.fromCallable(() -> { return /* make a remote synchronous call */ }); blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());
使用 fromCallable 方法生成一个 Mono; |
|
---|---|
返回同步、阻塞的资源; | |
使用 Schedulers.elastic() 确保对每个订阅来讲运行在一个专门的线程上。 |
由于调用返回一个值,因此你应该使用 Mono。你应该使用 Schedulers.elastic
由于它会建立一个专门的线程来等待阻塞的调用返回。
注意 subscribeOn
方法并不会“订阅”这个 Mono
。它只是指定了订阅操做使用哪一个 Scheduler
。
Flux
上的操做符好像没起做用,为啥?请确认你确实对调用 .subscribe()
的发布者应用了这个操做符。
Reactor 的操做符是装饰器(decorators)。它们会返回一个不一样的(发布者)实例, 这个实例对上游序列进行了包装并增长了一些的处理行为。因此,最推荐的方式是将操做符“串”起来。
对比下边的两个例子:
没有串起来(不正确的)
Flux<String> flux = Flux.just("foo", "chain"); flux.map(secret -> secret.replaceAll(".", "*")); flux.subscribe(next -> System.out.println("Received: " + next));
问题在这, flux 变量并无改变。 |
|
---|---|
串起来(正确的)
Flux<String> flux = Flux.just("foo", "chain"); flux = flux.map(secret -> secret.replaceAll(".", "*")); flux.subscribe(next -> System.out.println("Received: " + next));
下边的例子更好(由于更简洁):
串起来(最好的)
Flux<String> secrets = Flux .just("foo", "chain") .map(secret -> secret.replaceAll(".", "*")) .subscribe(next -> System.out.println("Received: " + next));
第一个例子的输出:
Received: foo Received: chain
后两个例子的输出:
Received: *** Received: *****
Mono
zipWith
/zipWhen
没有被调用例子
myMethod.process("a") // 这个方法返回 Mono<Void> .zipWith(myMethod.process("b"), combinator) //没有被调用 .subscribe();
若是源 Mono
为空或是一个 Mono<Void>
(Mono<Void>
一般用于“空”的场景), 下边的组合操做就不会被调用。
对于相似 zipWith
的用于转换的操做符来讲,这是比较典型的场景。 这些操做符依赖于数据元素来转换为输出的元素。 若是任何一个序列是空的,则返回的就是一个空序列,因此请谨慎使用。 例如在 then()
以后使用 zipWith()
就会致使这一问题。
对于以 Function
做为参数的 and
更是如此,由于返回的 Mono 是依赖于收到的数据懒加载的(而对于空序列或 Void
的序列来讲是没有数据发出来的)。
你能够使用 .defaultIfEmpty(T)
将空序列替换为包含 T
类型缺省值的序列(而不是 Void
序列), 从而能够避免相似的状况出现。举例以下:
在 zipWhen
前使用 defaultIfEmpty
myMethod.emptySequenceForKey("a") // 这个方法返回一个空的 Mono<String> .defaultIfEmpty("") // 将空序列转换为包含字符串 "" 的序列 .zipWhen(aString -> myMethod.process("b")) // 当 "" 发出时被调用 .subscribe();
retryWhen
来实现 retry(3)
的效果?retryWhen
方法比较复杂,但愿下边的一段模拟 retry(3)
的代码可以帮你更好地理解它的工做方式:
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) );
技巧一:使用 zip 和一个“重试个数 + 1”的 range 。 |
|
---|---|
zip 方法让你能够在对重试次数计数的同时,仍掌握着原始的错误(error)。 |
|
容许三次重试,小于 4 的时候发出一个值。 | |
为了使序列以错误结束。咱们将原始异常在三次重试以后抛出。 |
retryWhen
进行 exponential backoff?Exponential backoff 的意思是进行的屡次重试之间的间隔愈来愈长, 从而避免对源系统形成过载,甚至宕机。基本原理是,若是源产生了一个错误, 那么已是处于不稳定状态,可能不会马上复原。因此,若是马上就重试可能会产生另外一个错误, 致使源更加不稳定。
下面是一段实现 exponential backoff 效果的例子,每次重试的间隔都会递增 (伪代码: delay = attempt number * 100 milliseconds):
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) );
记录错误出现的时间; | |
---|---|
使用 retryWhen + zipWith 的技巧实现重试3次的效果; |
|
经过 flatMap 来实现延迟时间递增的效果; |
|
一样记录重试的时间。 |
订阅它,输出以下:
java.lang.IllegalArgumentException at 18:02:29.338 retried at 18:02:29.459 java.lang.IllegalArgumentException at 18:02:29.460 retried at 18:02:29.663 java.lang.IllegalArgumentException at 18:02:29.663 retried at 18:02:29.964 java.lang.IllegalArgumentException at 18:02:29.964
第一次重试延迟大约 100ms | |
---|---|
第二次重试延迟大约 200ms | |
第三次重试延迟大约 300ms |
publishOn()
?如 Schedulers 所述,publishOn()
能够用来切换执行线程。 publishOn
可以影响到其以后的操做符的执行线程,直到有新的 publishOn
出现。 因此 publishOn
的位置很重要。
好比下边的例子, map()
中的 transform
方法是在 scheduler1
的一个工做线程上执行的, 而 doOnNext()
中的 processNext
方法是在 scheduler2
的一个工做线程上执行的。 单线程的调度器可能用于对不一样阶段的任务或不一样的订阅者确保线程关联性。
EmitterProcessor<Integer> processor = EmitterProcessor.create(); processor.publishOn(scheduler1) .map(i -> transform(i)) .publishOn(scheduler2) .doOnNext(i -> processNext(i)) .subscribe();
reactor-extra
为知足 reactor-core
用户的更高级需求,提供了一些额外的操做符和工具。
因为这是一个单独的包,使用时须要明确它的依赖:
dependencies { compile 'io.projectreactor:reactor-core' compile 'io.projectreactor.addons:reactor-extra' }
添加 reactor-extra 的依赖。参考 获取 Reactor 了解为何使用BOM的状况下不须要指定 version。 | |
---|---|
TupleUtils
以及函数式接口在 Java 8 提供的函数式接口基础上,reactor.function
包又提供了一些支持 3 到 8 个值的 Function
、Predicate
和 Consumer
。
TupleUtils
提供的静态方法能够方便地用于将相应的 Tuple
函数式接口的 lambda 转换为更简单的接口。
这使得咱们在使用 Tuple
中各成员的时候更加容易,好比:
.map(tuple -> { String firstName = tuple.getT1(); String lastName = tuple.getT2(); String address = tuple.getT3(); return new Customer(firstName, lastName, address); });
能够用下面的方式代替:
.map(TupleUtils.function(Customer::new));
(由于 Customer 的构造方法符合 Consumer3 的函数式接口标签) |
|
---|---|
MathFlux
的数学操做符Treactor.math
包的 MathFlux
提供了一些用于数学计算的操做符,如 max
、min
、sumInt
、averageDouble
…
reactor.retry
包中有一些可以帮助实现 Flux#repeatWhen
和 Flux#retryWhen
的工具。入口点(entry points)就是 Repeat
和 Retry
接口的工厂方法。
两个接口均可用做可变的构建器(mutative builder),而且相应的实现(implementing) 均可做为 Function
用于对应的操做符。
Reactor-extra 提供了若干专用的调度器: - ForkJoinPoolScheduler
,位于 reactor.scheduler.forkjoin
包; - SwingScheduler
,位于 reactor.swing
包; - SwtScheduler
,位于 reactor.swing
包。