上篇文章咱们将 Flux 和 Mono 的操做符分了 11 类,咱们来继续学习转换类操做符的第 2 篇。java
转换类的操做符数量最多,日常过程当中也是使用最频繁的。react
将响应式流中元素顺序转换为目标类型的响应式流,以后再将这些流链接起来。该方法提供了 2 个重载方法,传递的第 2 个参数为内部生成响应式流的预取数量。见图知意:git
Flux.range(3, 8) .concatMap(n -> Flux.just(n - 10, n, n + 10), 3) .subscribe(System.out::println);
concatMapDelayError 和 concatMap 区别在于,当内部生成响应式流发出 error 时,是否延迟响应 error 。该方法提供了 3 个重载方法,支持传递参数:是否延迟发出错误和预取数量。github
Flux.range(3, 8) .concatMapDelayError(n -> { if (n == 4) { return Flux.error(new NullPointerException()); } return Flux.just(n - 10, n, n + 10); }) .subscribe(System.out::println, System.err::println);
concatIterable 和 concatMap 的区别在于 内部返回的类型不一样,一个为 Iterable, 一个为 响应式流。见图知意:web
Flux.range(3, 8) .publishOn(Schedulers.single()) .concatMapIterable(n -> { if (n == 4) { throw new NullPointerException(); } return Arrays.asList(n - 10, n, n + 10); }) .onErrorContinue((e, n) -> System.err.println("数据:" + n + ",发生错误:" + e)) .subscribe(System.out::println);
收集响应式流中元素的间隔发出时间,转换为 时间间隔 和 旧元素 组成的 Tuple2 的响应式流。见图知意:spring
Flux.interval(Duration.ofMillis(300)) .take(20) .elapsed(Schedulers.parallel()) .subscribe(System.out::println); Thread.sleep(7000);
从上层节点逐层展开方式递归展开树形节点。学习
Flux.just(16, 18, 20) .expand(n -> { if (n % 2 == 0) { return Flux.just(n / 2); } else { return Flux.empty(); } }) .subscribe(System.out::println);
从上层节点逐个展开方式递归展开树形节点。expand 和 expandDeep 的区别在于展开方式不一样,另外它俩都提供了 capacityHint 指定递归时初始化容器的容量。测试
Flux.just(16, 18, 20) .expandDeep(n -> { if (n % 2 == 0) { return Flux.just(n / 2); } else { return Flux.empty(); } }) .subscribe(System.out::println);
本篇咱们介绍了 Reactor 部分的转换类操做符,讲解示例时都是单个操做符,相信你们都能理解。code
因为最近学习时间不肯定,内容比较少。不管工做仍是生活的困难,咱们只要坚持,终将会被克服解决。今天的内容就学到这里,咱们下篇继续学习 Reactor 的操做符。orm
源码详见:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning 模块下 ReactorTransformOperator02Test 测试类。