Spring5.2.2 WebFlux之WebClient

    Spring WebFlux为HTTP请求提供了一个reactive的、非阻塞的WebClient 。客户端有一个功能性的、流畅的API,其中包含用于声明性合成的反应式类型,请参阅反应式库。WebFlux客户端和服务器依赖相同的非阻塞编解码器对请求和响应内容进行编码和解码。javascript

      内部WebClient 委托给HTTP客户端库。默认状况下,它使用Reactor Netty,内置了对Jetty reactive HttpClient的支持,其余的能够经过ClientHttpConnector插入。css

一、配置java

     建立WebClient 的最简单方法是经过静态工厂方法之一:react

      WebClient.create()nginx

      WebClient.create(String baseUrl)web

  以上方法使用Reactor Netty HttpClient 和expect的默认设置io.projectreactor.netty:reactor-netty在类路径上。spring

你也可使用WebClient.builder()有更多方式:json

  • uriBuilderFactory:自定义的UriBuilderFactory 用做基本的URL。swift

  • defaultHeader:每一个请求的头。ruby

  • defaultCookie:每一个请求的Cookies。

  • defaultRequestConsumer 定制每一个请求。

  • filter:每一个请求的客户端过滤器。

  • exchangeStrategies: HTTP消息读取器/编写器自定义。

  • clientConnector:HTTP客户端库设置。

如下示例配置HTTP编解码器:

WebClient client = WebClient.builder() .exchangeStrategies(builder -> { return builder.codecs(codecConfigurer -> { //... }); }) .build();

    一旦构建,WebClient 实例是不可变的。可是,能够在不影响原始实例的状况下克隆它并生成修改后的副本,以下例所示:

WebClient client1 = WebClient.builder() .filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();
// client1 有filterA, filterB
// client2 有 filterA, filterB, filterC, filterD


1.一、最大内存大小

    Spring WebFlux配置了在编解码器中缓冲内存数据的限制,以免应用程序内存问题。默认状况下,它被配置为256KB,若是这对于你的用例来讲还不够,你将看到如下内容:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer

你可使用如下代码示例在全部默认编解码器上配置此限制:

WebClient webClient = WebClient.builder() .exchangeStrategies(builder -> builder.codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024) ) ) .build();

1.三、Reactor Netty

    要自定义Reactor Netty设置,只需提供预配置的HttpClient

HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder() .clientConnector(new ReactorClientHttpConnector(httpClient)) .build();

Resources

    默认状况下,HttpClient 参与持有的全局reactor.netty.http.HttpResources,包括事件循环线程和链接池。这是推荐的模式,由于固定的共享资源是事件循环并发的首选。在此模式下,全局资源将保持活动状态,直到进程退出。

        若是服务器与进程同步,则一般不须要显式关闭。可是,若是服务器能够在进程内启动或中止(例如,部署为WAR的Spring MVC应用程序),则可使用globalResources=true(默认值)声明一个Spring管理的bean,ReactorResourceFactory 类型为Reactor Netty global resources,以确保在Spring ApplicationContext 关闭时Reactor Netty全局资源,以下例所示:

@Beanpublic ReactorResourceFactory reactorResourceFactory() { return new ReactorResourceFactory();}

    你也能够选择不参与全局 Reactor Netty资源。可是,在这种模式下,确保全部Reactor Netty客户端和服务端实例使用共享资源的负担就在你身上,以下例所示:

@Beanpublic ReactorResourceFactory resourceFactory() { ReactorResourceFactory factory = new ReactorResourceFactory();    factory.setUseGlobalResources(false); //建立独立于全局资源的资源。 return factory;}
@Beanpublic WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> { // 进一步自定义 };//将ReactorClientHttpConnector构造方法与资源工厂一块儿使用。 ClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory(), mapper);  //连接WebClient.Builder. return WebClient.builder().clientConnector(connector).build(); }

1.四、超时

       要配置链接超时:

import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create() .tcpConfiguration(client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000));

要配置读和/或写超时值:

import io.netty.handler.timeout.ReadTimeoutHandler;import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create() .tcpConfiguration(client -> client.doOnConnected(conn -> conn .addHandlerLast(new ReadTimeoutHandler(10)) .addHandlerLast(new WriteTimeoutHandler(10))));

1.五、Jetty

      如下示例显示如何自定义Jetty HttpClient 设置:

HttpClient httpClient = new HttpClient();httpClient.setCookieStore(...);ClientHttpConnector connector = new JettyClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder().clientConnector(connector).build();

默认状况下,HttpClient 建立本身的资源(ExecutorByteBufferPoolScheduler),这些资源在进程退出或调用stop()以前保持活动状态。

      你能够在Jetty客户端(和服务端)的多个实例之间共享资源,并经过声明JettyResourceFactory类型的Spring托管bean来确保在Spring ApplicationContext 关闭时关闭资源,以下例所示:

@Beanpublic JettyResourceFactory resourceFactory() { return new JettyResourceFactory();}
@Beanpublic WebClient webClient() {
HttpClient httpClient = new HttpClient(); // 进一步自定义   //将JettyClientHttpConnector构造方法与资源工厂一块儿使用。 ClientHttpConnector connector = new JettyClientHttpConnector(httpClient, resourceFactory());    //链接WebClient.Builder. return WebClient.builder().clientConnector(connector).build(); }

二、retrieve()

    retrieve()方法是获取响应体并对其进行解码的最简单方法。下面的示例演示如何执行此操做:

WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Person.class);

你还能够从响应中得到解码的对象流,以下例所示:

Flux<Quote> result = client.get() .uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(Quote.class);

    默认状况下,带有4xx或5xx状态代码的响应将致使WebClientResponseException 或其HTTP状态特定的子类之一,如WebClientResponseException.BadRequestWebClientResponseException.NotFound,以及其余。也可使用onStatus 方法自定义生成的异常,以下例所示:

Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::is4xxClientError, response -> ...) .onStatus(HttpStatus::is5xxServerError, response -> ...) .bodyToMono(Person.class);

使用onStatus 时,若是预期响应包含内容,则onStatus 回调应使用它。不然,内容将被自动清空,以确保资源被释放。

三、exchange()

     exchange()方法比retrieve 方法提供更多的控制。如下示例至关于retrieve(),但也提供了对ClientResponse的访问:

Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .exchange() .flatMap(response -> response.bodyToMono(Person.class));

如此的话,你还能够建立彻底响应:

Mono<ResponseEntity<Person>> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .exchange() .flatMap(response -> response.toEntity(Person.class));

      请注意(与retrieve())不一样,对于exchange(),4xx和5xx响应没有自动错误信号。必须检查状态代码并决定如何继续。

      使用exchange()时,必须确保始终使用或释放主体,即便发生异常(请参见使用DataBuffer)。一般,经过调用ClientResponse 上的bodyTo*toEntity*来将body转换为所需类型的对象,但也能够调用releaseBody()放弃正文内容而不使用它,或调用oBodilessEntity()只获取状态和头(同时丢弃正文)。

     最后,还有bodyToMono(Void.class),只有在不须要响应内容时才应使用。若是响应确实包含内容,则链接将关闭,而且不会放回池中,由于它不会处于可重用状态。

四、Request Body

     请求主体能够从ReactiveAdapterRegistry处理的任何异步类型进行编码,如Mono 协同程序Deferred ,以下例所示:

Mono<Person> personMono = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_JSON) .body(personMono, Person.class) .retrieve() .bodyToMono(Void.class);

还能够对对象流进行编码,以下例所示:

Flux<Person> personFlux = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_STREAM_JSON) .body(personFlux, Person.class) .retrieve() .bodyToMono(Void.class);

或者,若是可使用下面的body值,则可使用下面的方法:

Person person = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_JSON) .bodyValue(person) .retrieve() .bodyToMono(Void.class);

表单数据

      要发送表单数据,能够提供MultiValueMap<String, String>做为主体。注意,内容被FormHttpMessageWriter自动设置为application/x-www-form-urlencoded。下面的示例演示如何使用MultiValueMap<String, String>

MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post() .uri("/path", id) .bodyValue(formData) .retrieve() .bodyToMono(Void.class);

你还可使用BodyInserters以联机方式提供表单数据,以下例所示:

import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post() .uri("/path", id) .body(fromFormData("k1", "v1").with("k2", "v2")) .retrieve() .bodyToMono(Void.class);

Multipart 数据

        要发送多部分数据,你须要提供MultiValueMap<String, ?>其值能够是表示部件内容的Object 实例,也能够是表示部件内容和头的HttpEntity 实例。MultipartBodyBuilder 提供了一个方便的API来准备多部分请求。下面的示例演示如何建立MultiValueMap<String, ?>:

MultipartBodyBuilder builder = new MultipartBodyBuilder();builder.part("fieldPart", "fieldValue");builder.part("filePart1", new FileSystemResource("...logo.png"));builder.part("jsonPart", new Person("Jason"));builder.part("myPart", part); // 来自服务器请求的部分
MultiValueMap<String, HttpEntity<?>> parts = builder.build();

     在大多数状况下,你没必要为每一个部分指定Content-TypeContent-Type是根据选择序列化它的HttpMessageWriter 自动肯定的,若是是资源,则根据文件扩展名肯定。若有必要,你能够经过重载的构建器part 方法之一显式地为每一个部件提供要使用的MediaType 

      一旦准备好MultiValueMap ,将其传递给WebClient 的最简单方法是经过body 方法,以下例所示:

MultipartBodyBuilder builder = ...;
Mono<Void> result = client.post() .uri("/path", id) .body(builder.build()) .retrieve() .bodyToMono(Void.class);

      若是MultiValueMap 至少包含一个非字符串值(也就是说,application/x-www-form-urlencoded),则无需将Content-Type设置为multipart/form-data。使用MultipartBodyBuilder时老是这样,它确保了HttpEntity 封装器。

      做为MultipartBodyBuilder的替代,你还能够经过内置的BodyInserters提供多部份内容,内联样式,以下例所示:

import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post() .uri("/path", id) .body(fromMultipartData("fieldPart", "value").with("filePart", resource)) .retrieve() .bodyToMono(Void.class);

五、客户端Filters

       你能够经过注册客户端筛选器(ExchangeFilterFunctionWebClient.Builder为了拦截和修改请求,以下例所示:

WebClient client = WebClient.builder() .filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request) .header("foo", "bar") .build();
return next.exchange(filtered); }) .build();

      这能够用于交叉关注点,例如身份验证。如下示例使用筛选器经过静态工厂方法进行基自己份验证:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = WebClient.builder() .filter(basicAuthentication("user", "password")) .build();

     过滤器将全局应用于每一个请求。要更改特定请求的筛选器行为,能够向ClientRequest 添加请求属性,而后链中的全部筛选器均可以访问这些属性,以下例所示:

WebClient client = WebClient.builder() .filter((request, next) -> { Optional<Object> usr = request.attribute("myAttribute"); // ... }) .build();
client.get().uri("https://example.org/") .attribute("myAttribute", "...") .retrieve() .bodyToMono(Void.class);
}

      你还能够复制现有的WebClient、插入新的筛选器或删除已注册的筛选器。如下示例在索引0处插入基自己份验证筛选器:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate() .filters(filterList -> { filterList.add(0, basicAuthentication("user", "password")); }) .build();

六、同步使用

     WebClient 能够同步方式使用,方法是在末尾阻塞,结果是:

Person person = client.get().uri("/person/{id}", i).retrieve() .bodyToMono(Person.class) .block();
List<Person> persons = client.get().uri("/persons").retrieve() .bodyToFlux(Person.class) .collectList() .block();

      可是,若是须要进行多个调用,则更有效的方法是避免对每一个响应单独进行阻塞,而是等待组合结果:

Mono<Person> personMono = client.get().uri("/person/{id}", personId) .retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId) .retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> { Map<String, String> map = new LinkedHashMap<>(); map.put("person", person); map.put("hobbies", hobbies); return map; }) .block();

     以上只是一个例子。还有许多其余模式和运算符能够用来组合一个响应式管道,该管道能够进行许多远程调用,可能有些是嵌套的、相互依赖的,直到最后都不会阻塞。

      对于Flux Mono,你永远没必要阻塞Spring MVCSpring WebFlux 控制器。只需从controller方法返回获得的reactive类型,只需在控制器方法中使用挂起函数或返回Flow 

欢迎关注和转发Spring中文社区(加微信群,能够关注后加我微信):

本文分享自微信公众号 - Spring中文社区(gh_81d233bb13a4)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索