本系列为本人Java编程方法论 响应式解读系列的Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址以下:java
Rxjava源码解读与分享:www.bilibili.com/video/av345…react
Reactor源码解读与分享:www.bilibili.com/video/av353…程序员
NIO源码解读相关视频分享: www.bilibili.com/video/av432…web
NIO源码解读视频相关配套文章:数据库
BIO到NIO源码的一些事儿之NIO 下 之 Selector BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上 BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下服务器
Java编程方法论-Spring WebFlux篇 01 为何须要Spring WebFlux 上网络
其中,Rxjava与Reactor做为本人书中内容将不对外开放,你们感兴趣能够花点时间来观看视频,本人对着两个库进行了全面完全细致的解读,包括其中的设计理念和相关的方法论,也但愿你们能够留言纠正我其中的错误。
随着Servlet 3.1
的引入,经过Spring MVC
便可以实现非阻塞行为。 可是,因为Servlet API
依然包含几个阻塞的接口。一样,咱们在应用程序设计的API中也可能会使用到阻塞,而该API原本是被设定为非阻塞。 在这种状况下,相关阻塞API的使用确定会下降应用程序性能。 咱们来看下面这段代码:
@GetMapping
void onResponse(){
try{
//some logic here
}catch(Exception e){
//sendError() is a blocking API
response.sendError(500);
}
}
复制代码
这段代码使用在Spring MVC中,Spring容器针对这个错误而对相应页面的渲染则是阻塞的。以下:
@Controller
public class MyCustomErrorController implements ErrorController {
@RequestMapping(path = "/error")
public String greeting() {
return "myerror";
}
@Override
public String getErrorPath() {
return "/error";
}
}
复制代码
此处渲染的页面为myerror.jsp
,具体代码就不贴了。固然,咱们确定有办法来异步解决这个错误处理问题,但咱们出错的可能性就会变大,要知道,咱们最终仍是要通过Servlet
对象的,而Servlet
相关api有阻塞的也有非阻塞的,咱们来经过一张图来方便理解。
当产生请求访问时事件时,则该事件处理流向如上图所示(咱们只关注进入到Servlet容器的处理阶段),能够知道,这个过程尤为是Filter链这里,都是能够发生IO阻塞的,再根据上一节所讲内容,咱们可使用一张图来展现咱们能够肯定的非阻塞IO。
Spring MVC
中在所写代码逻辑中作到完美的无阻塞,咱们依然没法改变与避免
Servlet 3.1+
中那些架构设计层面的缺陷,
Servlet
的相关阻塞API咱们依然会用到。那么咱们是否是可使用netty来避免这样的情形?因而咱们就能够将目光放到
Spring WebFlux
之上。
咱们业务端来说,绝大多数程序员对于并发的操做并不在行的,也就很难写出性能很好并且符合规范的代码,这也形成了在Spring web MVC
下,咱们很难针对本身的业务进行合理的异步化操做。好比,咱们每每会将I/O
操做与当前执行线程进行绑定到一块儿,也就是生产和消费两种业务绑定在一块儿,这样,即使咱们异步,二者也是在同一个线程中进行,这样,假如并发量很大的状况下,异步化会产生大量的线程,CPU
会在切换线程上消耗更多的性能,这是咱们所不肯看到的,而RxJava
和Reactor
给咱们提供了很好的调度API
,如Reactor中的publishOn
,RxJava中的observeOn
,能够保证咱们将生产和消费分离,同时,做为生产或消费线程所在的线程池,其每每是针对于使用了这个线程池的多个订阅服务,这样,每个线程均可能同时为多个订阅关系服务,一个单独的订阅关系并不会一直占有这个线程,当有元素下发时,将会根据订阅者请求数量和元素产生的速度以及是否有多个线程在处理此订阅关系的下发元素,使用调度器的话,这里拿Reactor中的publishOn
来说,当上游只支持同步的话(FluxPublishOn.PublishOnSubscriber#onSubscribe
内调用源的requestFusion
方法判断),那就始终在同一个线程内消费(FluxPublishOn.PublishOnSubscriber#trySchedule
内进行判断,经过WIP
控制),当咱们定义好publishOn
中队列大小后,每当队列内元素消耗完毕,而后上游元素产生太慢,就会跳出当前消费线程,直到有新元素下发时,就再次从线程池中拿到一个线程消费。读者假如此处有疑问,请回顾本书以前内容(因书并未出版,可回顾本人相关分享视频)。 这样服务器的性能就能够获得最大程度的利用。这个咱们在Spring MVC
中确实很难自行实现,比较复杂。 另外,经过Reactor
对于背压的实现,咱们能够作到相似消息中间件对于消息的积压,不至于数据在网络传输的过程当中丢失,这样就能够更好的应对高并发场景下的访问需求。 接下来,咱们就来对Webflux
下的背压使用进行一波大体的说明。
为了帮助理解Backpressure
在WebFlux
使用时底层的工做原理,咱们有必要回顾一下默认使用的TCP/IP
传输层。咱们知道,浏览器和服务器之间的正常通讯(服务器到服务器之间的通讯一般也是同样)是经过TCP
链接完成的(一样包括WebFlux
中的WebClient
和服务器之间的通讯)。同时,咱们会从Reactive Streams
规范的角度来回顾一下背压的含义,以便更好的针对背压进行控制。
在Reactive Streams
中,背压包括两部分,一部分是接收端的消息积压,另外一部分是消费者能够经过发出通知来表达该消费者能够消耗多少元素,以此来进行需求调节。整个过程是操做的元素对象,那么,在这里,咱们就碰到一个棘手的问题:TCP
是针对字节抽象而不是逻辑元素抽象。 咱们一般所说的背压控制是指制向或者从网络发送或接收的逻辑元素的数量。而TCP本身的流程控制是基于字节而不是逻辑元素。
由上,可知道,在WebFlux
的实现中,背压经过数据传输流程控制来调节,但它不会暴露接收方的实际需求。 咱们能够经过下图来观察其中的交互流程:
上图显示了两个微服务之间的通讯,其中左侧发送数据流,右侧对该流进行消费。接下来对上图整个过程进行简要说明:
WebFlux
中,它将逻辑对象元素转换为字节流并将它们传输到TCP网络或从TCP网络接收字节流并转换为逻辑对象元素。正如咱们从上图中能够看到的那样,接收者的需求与发送者的需求不一样(这里指图中的request
请求的逻辑元素)。这也就意味着二者的需求是相互独立的,也就是说,在WebFlux中,咱们能够经过业务逻辑(服务)交互来展示需求,但不多会暴露服务A与服务B交互的相关背压细节。 也就是说,webflux
中的背压设计并无对数据发送服务端进行按需设计,这点可能与咱们所指望的有所出入,不是那么完美,显得有失公平。
若是咱们想很简单的对背压进行控制,咱们能够经过Reactor
的相关操做来控制请求数量,也能够在自定义订阅者的时候进行限定,这里咱们经过Flux
下的limitRate(n)
来实现。首先咱们先来看下其实现思路,其实就是一个调度操做,只不过咱们以前有讲,publishOn
本身是一个中间存储站,它将上下游进行分离下游的请求数量在这里进行管理,publishOn
本身有一个每次向上游请求的数量限制,关于publishOn
操做源码细节,能够回顾以前相关章节内容(因书并未出版,可回顾本人相关分享视频)。也就是说,咱们只须要在publishOn
之上封装一个API
来实现便可:
//reactor.core.publisher.Flux#limitRate(int)
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
}
复制代码
假如咱们有一个包含questions
的源,由于解决问题的能力有限,想要对其进行限流,因而咱们就能够进行以下操做:
@PostMapping("/questions")
public Mono<Void> postAllQuestions(Flux<Question> questionsFlux) {
return questionService.process(questionsFlux.limitRate(10))
.then();
}
复制代码
咱们熟悉publishOn
后,能够知道limitRate()
操做会首先从上游获取10个元素存到其内定义的队列中。这意味着即便咱们定义的订阅者所设定的请求元素数量为Long.MAX_VALUE
,limitRate
操做也会将此需求拆分为一块一块去请求下发。此处涉及的源码以下,你们可对照理解:
//reactor.core.publisher.FluxPublishOn.PublishOnSubscriber#runAsync
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = REQUESTED.addAndGet(this, -e);
}
s.request(e);
e = 0L;
}
复制代码
上面是提交的数据的分块处理,咱们有时候会涉及到数据库请求数据的处理,好比查询,同时将所发送数据进行限流逐步发送,能够进行以下操做:
@GetMapping("/questions")
public Flux<Question> getAllQuestions() {
return questionService.retreiveAll()
.limitRate(10);
}
复制代码
由此,咱们也能理解背压在webflux
中的做用机制了。对于这些特性,Spring MVC
也就很难提供了。
相信你们也明确感觉到了使用Spring WebFlux
的好处了,也知道为什么会要求使用Servlet 3.1+
,同时对于webflux
中背压的做用有了更清晰的认知。不过,咱们须要注意的是,经过官方文档可知,Spring Webflux
能够在Servlet Container
或Netty
上运行,而本书更关心Spring Webflux
基于Netty
服务器的运行。那么,接下来,咱们将接触Reactor-netty
的内在细节。