实战SpringCloud响应式微服务系列教程(第七章)

本章节继续介绍:Flux和Mono操做符(二)java

1.条件操做符

Reactor中经常使用的条件操做符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。app

一、defaultIfRmpty

defaultIfRmpty操做符返回来自原始数据流的元素,若是原始数据流中没有元素,则返回一个默认元素。函数

defaultIfRmpty操做符在实际开发过程当中应用普遍,一般用在对方法返回值的处理上。以下controller层对service层返回值的处理。微服务

@GetMapper("/article/{id}")
public Mono<ResponseEntity<Article>> findById(@PathVariable String id){
     return articleService.findOne(id)
               .map(ResponseEntity::ok)
               .defaultIfRmpty(ResponseEntity.status(404).body(null));
}

 

二、takeUntil

takeUntil操做符的基本用法是takeUntil(Predicate<? super T>> predicate),其中Predicate表明一种断言条件,takeUntil将提取元素直到断言条件返回true。工具

示例代码以下:url

Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);

 

三、takeWhile

takeWhile操做符的基本用法是takeWhile(Predicate<? super T>> continuePredicate),其中continuePredicate也表明一种断言条件。与takeUntil不一样的是,takeWhile会在continuePredicate条件返回true时才进行元素的提取。spa

示例代码以下:3d

Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);

 

四、skipUntil

与takeUntil相对应,skipUntil的基本用法是skipUntil(Predicate<? super T>> predicate)。skipUntil将丢弃原始数据中的元素,直到Predicate返回true。code

五、skipWhile

与takeWhile相对应,skipWhile操做符的基本用法是skipWhile(Predicate<? super T>> continuePredicate)。当continuePredicate返回true时才进行元素的丢弃。对象

2.数学操做符

Reactor中经常使用的数学操做符有concat、count、reduce等。

一、concat

concat用来合并来自不一样Flux的数据,这种合并采用的是顺序的方式。

二、count

count操做符比较简单,用来统计Flux中元素的个数。

三、reduce

reduce操做符对流中包含的全部元素进行累积操做,获得一个包含计算结果的Mono序列。具体的累计操做也是经过一个BiFunction来实现的。

示例代码以下:

Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);

 

这里BiFunction就是一个求和函数,用来对1到10的数字进行求和,运行结果为55。

与其相似的还有一个reduceWith。

示例代码以下:

Flux.range(1,10).reduceWith(() - >5,(x,y) -> x+y).subscribe(System.out::println);

 

这里使用5来初始化求和过程,获得的结果是60。

3.Observable工具操做符

Reactor中经常使用的Observable操做符有delay、subscribe、timeout等。

一、delay

delay将时间的传递向后延迟一段时间。

二、subscribe

在前面的代码演示了subscribe操做符的用法,咱们能够经过subscribe()方法来添加相应的订阅逻辑。

在前面章节中咱们提到了Reactor中的消息类型有三种,即正常消息,异常消息和完成消息。subscribe操做符能够只处理其中包含的正常消息,也能够同时处理异常消息和完成消息。当咱们用subscribe处理异常消息时能够采用如下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .subscribe(System.out::println,System.err::println);

 

以上代码执行结果以下,咱们获得了一个100,同时也获取了IllegalStateExxeption这个异常。

100
java.lang.IllegalStateExxeption

有时候咱们不想直接抛出异常,而是想采用一个容错策略来返回一个默认值,就能够采用如下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .onErrorReturn(0)
         .subscribe(System.out::println);

 

以上代码执行结果以下。当产生异常时,使用onErrorReturn()方法返回一个默认值0.

100
0

另外容错策略也是经过switchOnError()方法使用另外的流产生元素。如下代码示例演示了这种策略。

与上面的执行结果相同。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .switchOnError(Mono.just(0))
         .subscribe(System.out::println);

 

三、timeout

timeout操做符维持原始被观察者的状态,在特定时间内没有产生任何事件时,将生成一个异常。

四、block

block操做符在没有接收到下一个元素以前一直被阻塞。block操做符一般用来把响应式的数据流转换成传统的数据流。

例如,使用以下方法时,咱们分别将Flux数据流和Mono数据流转变成了普通的List<Order>对象和单个Order对象,一样也能够设置block的等待时间。

public List<Order> getAllOrder(){
      return orderService.getAllOrders().block(Duration.ofSecond(5));
}

public Order getOrderById(Long orderId){
      return orderService.getOrderById(orderId).block(Duration.ofSecond(2));
}

往期

实战SpringCloud响应式微服务系列教程(第一章)

实战SpringCloud响应式微服务系列教程(第二章)

实战SpringCloud响应式微服务系列教程(第三章)

实战SpringCloud响应式微服务系列教程(第四章)

实战SpringCloud响应式微服务系列教程(第五章)

实战SpringCloud响应式微服务系列教程(第六章)

相关文章
相关标签/搜索