(二)Flux入门学习:流的概念,特性和基本操做react
(三)Flux深刻学习:流的高级特性和进阶用法web
(四)reactor-core响应式api如何测试和调试?数据库
(五)Spring reactive: Spring WebFlux的使用编程
(六)Spring reactive: webClient的使用api
Spring framework 5 的一大新特性:响应式编程(Reactive Programming)。那么什么是响应式?他能给咱们带来什么?如何优雅地使用?本系列会从最基础的概念和简单的api讲起,再慢慢深刻探讨响应式的一些高级特性,最后讲解实战内容,例如WebFlux和WebClient等在Spring boot中的使用,如何测试和调试。网络
想要了解原理的话,美团点评的这篇博客 Java NIO浅析 很是适合入门。多线程
简单地说:异步
当咱们调用socket.read()、socket.write()这类阻塞函数的时候,这类函数不能当即返回,也没法中断,须要等待socket可读或者可写,才会返回,所以一个线程只能处理一个请求。在这等待的过程当中,cpu并不干活,(即阻塞住了),那么cpu的资源就没有很好地利用起来。所以对于这种状况,咱们使用多线程来提升cpu资源的利用率:在等待的这段时间,就能够切换到别的线程去处理事件,直到socket可读或可写了,经过中断信号通知cpu,再切换回来继续处理数据。例如线程A正在等待socket可读,而线程B已经就绪了,那么就能够先切换到线程B去处理。虽然上下文切换也会花一些时间,可是远比阻塞在线程A这里空等要好。固然计算机内部实际的状况比这复杂得多。socket
而NIO的读写函数能够马上返回,这就给了咱们不开线程利用CPU的最好机会:若是一个链接不能读写(socket.read()返回0或者socket.write()返回0),咱们能够把这件事记下来。所以只须要一个线程不断地轮询这些事件,一旦有就绪的时间,处理便可。不须要多线程。
阻塞型IO
非阻塞型IO
响应式编程就是基于reactor的思想,当你作一个带有必定延迟的才可以返回的io操做时,不会阻塞,而是马上返回一个流,而且订阅这个流,当这个流上产生了返回数据,能够马上获得通知并调用回调函数处理数据。
咱们首先须要理解响应式编程的基本模型:
Reactor中的发布者(Publisher)由Flux和Mono两个类定义,它们都提供了丰富的操做符(operator)。一个Flux对象表明一个包含0..N个元素的响应式序列,元素能够是普通对象、数据库查询的结果、http响应体,甚至是异常。而一个Mono对象表明一个包含零/一个(0..1)元素的结果。上图就是一个Flux类型的数据流,Flux往流上发送了3个元素,Subscriber经过订阅这个流来接收通知。
如何建立一个流?最简单的方式有如下几种:
//建立一个流,并直接往流上发布一个值为value数据 Flux.just(value); //经过list建立一个流,往流上依次发布list中的数据 Flux.fromIterable(list); //建立一个流,并向流上从i开始连续发布n个数据,数据类型为Integer Flux.range(i, n); //建立一个流,并定时向流上发布一个数据,数据从0开始递增,数据类型为Long Flux.interval(Duration.ofSeconds(n));
既然是“数据流”的发布者,Flux和Mono均可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
subscriber是一个订阅者,他只有很是简单的4个接口:
public interface Subscriber<T> { void onSubscribe(Subscription var1); //收到下一个元素值信号时的行为 void onNext(T var1); //收到错误信号时的行为 void onError(Throwable var1); //收到终止信号时的行为 void onComplete(); }
Subscriber必需要订阅一个Flux才可以接收通知:
flux.subscribe( value -> handleData(value), error -> handleError(error), () -> handleComplete() );
上面这个例子经过lambda表达式,定义了Subscriber分别在收到消息,收到错误,和消息流结束时的行为,当Subscriber接收到一个新数据,就会异步地执行handleData方法处理数据。
接下来咱们建立几个最简单的流来试一下:
首先咱们新建一个maven项目,引入reactor的类库:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.3.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
编写代码以下:
public class ReactorTests { @After public void after() { sleep(30_000); } @Test public void testJust() { Flux.just("hello", "world") .subscribe(System.out::println); } @Test public void testList() { List<String> words = Arrays.asList( "hello", "reactive", "world" ); Flux.fromIterable(words) .subscribe(System.out::println); } @Test public void testRange() { Flux.range(1, 10) .subscribe(System.out::println); } @Test public void testInterval() { Flux.interval(Duration.ofSeconds(1)) .subscribe(System.out::println); } }
订阅这些流,收到数据以后只是简单地把它打印出来,运行这些Test,就可以看到订阅者在接收到流上的数据时,异步地去处理这些数据。