Spring Boot中使用RSocket

1. 概述

RSocket应用层协议支持 Reactive Streams语义, 例如:用RSocket做为HTTP的一种替代方案。在本教程中, 咱们将看到RSocket用在spring boot中,特别是spring boot 如何帮助抽象出更低级别的RSocket API。java

2. 依赖

让咱们从添加spring-boot-starter-rsocket依赖开始:git

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> </dependency> 

这个依赖会传递性的拉取RSocket相关的依赖,好比:rsocket-core 和 rsocket-transport-nettygithub

3.示例的应用程序

如今继续咱们的简单应用程序。为了突出RSocket提供的交互模式,我打算建立一个交易应用程序, 交易应用程序包括客户端和服务器。web

3.1. 服务器设置

首先,咱们设置由springboot应用程序引导的RSocket server服务器。 由于有spring-boot-starter-rsocket dependency依赖,因此springboot会自动配置RSocket server。 跟日常同样, 能够用属性驱动的方式修改RSocket server默认配置值。例如:经过增长以下配置在application.properties中,来修改RSocket端口spring

spring.rsocket.server.port=7000

也能够根据须要进一步修改服务器的其余属性springboot

3.2.设置客户端

接下来,咱们来设置客户端,也是一个springboot应用程序。虽然springboot自动配置大部分RSocket相关的组件,但还要自定义一些对象来完成设置。bash

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() { return RSocketFactory .connect() .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(7000)) .start() .block(); } @Bean RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) { return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies); } } 

这儿咱们正在建立RSocket客户端而且配置TCP端口为:7000。注意: 该服务端口咱们在前面已经配置过。 接下来咱们定义了一个RSocket的装饰器对象RSocketRequester。 这个对象在咱们跟RSocket server交互时会为咱们提供帮助。 定义这些对象配置后,咱们还只是有了一个骨架。在接下来,咱们将暴露不一样的交互模式, 并看看springboot在这个地方提供帮助的。服务器

4. springboot RSocket 中的 Request/Response

咱们从Request/Response开始,HTTP也使用这种通讯方式,这也是最多见的、最类似的交互模式。 在这种交互模式里, 由客户端初始化通讯并发送一个请求。以后,服务器端执行操做并返回一个响应给客户端--这时通讯完成。 在咱们的交易应用程序里, 一个客户端询问一个给定的股票的当前的市场数据。 做为回复,服务器会传递请求的数据。并发

4.1.服务器

在服务器这边,咱们首先应该建立一个controller 来持有咱们的处理器方法。 咱们会使用 @MessageMapping注解来代替像SpringMVC中的@RequestMapping或者@GetMapping注解app

@Controller
public class MarketDataRSocketController {

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData") public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) { return marketDataRepository.getOne(marketDataRequest.getStock()); } } 

来研究下咱们的控制器。 咱们将使用@Controller注解来定义一个控制器来处理进入RSocket的请求。 另外,注解@MessageMapping让咱们定义咱们感兴趣的路由和如何响应一个请求。 在这个示例中, 服务器监听路由currentMarketData, 并响应一个单一的结果Mono<MarketData>给客户端。

4.2. 客户端

接下来, 咱们的RSocket客户端应该询问一只股票的价格并获得一个单一的响应。 为了初始化请求, 咱们该使用RSocketRequester类,以下:

@RestController
public class MarketDataRestController {

    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}") public Publisher<MarketData> current(@PathVariable("stock") String stock) { return rSocketRequester .route("currentMarketData") .data(new MarketDataRequest(stock)) .retrieveMono(MarketData.class); } } 

注意:在示例中,RSocket客户端也是一个REST风格的controller,以此来访问咱们的RSocket服务器。所以,咱们使用@RestController@GetMapping注解来定义咱们的请求/响应端点。 在端点方法中, 咱们使用的是类RSocketRequester并指定了路由。 事实上,这个是服务器端RSocket所指望的路由,而后咱们传递请求数据。最后,当调用retrieveMono()方法时,springboot会帮咱们初始化一个请求/响应交互。

5. Spring Boot RSocket中的Fire And Forget模式

接下来咱们将查看 Fire And Forget交互模式。正如名字提示的同样,客户端发送一个请求给服务器,可是不指望服务器的返回响应回来。 在咱们的交易程序中, 一些客户端会做为数据资源服务,而且推送市场数据给服务器端。

5.1.服务器端

咱们来建立另一个端点在咱们的服务器应用程序中,以下:

@MessageMapping("collectMarketData") public Mono<Void> collectMarketData(MarketData marketData) { marketDataRepository.add(marketData); return Mono.empty(); } 

咱们又一次定义了一个新的@MessageMapping路由为collectMarketData。此外, Spring Boot自动转换传入的负载为一个MarketData实例。 可是,这儿最大的不一样是咱们返回一个Mono<Void>,由于客户端不须要服务器的返回。

5.2. 客户端

来看看咱们如何初始化咱们的fire-and-forget模式的请求。 咱们将建立另一个REST风格的端点,以下:

@GetMapping(value = "/collect") public Publisher<Void> collect() { return rSocketRequester .route("collectMarketData") .data(getMarketData()) .send(); } 

这儿咱们指定路由和负载将是一个MarketData实例。 因为咱们使用send()方法来代替retrieveMono(),全部交互模式变成了fire-and-forget模式。

6.Spring Boot RSocket中的Request Stream

请求流是一种更复杂的交互模式, 这个模式中客户端发送一个请求,可是在一段时间内从服务器端获取到多个响应。 为了模拟这种交互模式, 客户端会询问给定股票的全部市场数据。

6.1.服务器端

咱们从服务器端开始。 咱们将添加另一个消息映射方法,以下:

@MessageMapping("feedMarketData") public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) { return marketDataRepository.getAll(marketDataRequest.getStock()); } 

正如所见, 这个处理器方法跟其余的处理器方法很是相似。 不一样的部分是咱们返回一个Flux<MarketData>来代替Mono<MarketData>。 最后咱们的RSocket服务器会返回多个响应给客户端。

6.2.客户端

在客户端这边, 咱们该建立一个端点来初始化请求/响应通讯,以下:

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Publisher<MarketData> feed(@PathVariable("stock") String stock) { return rSocketRequester .route("feedMarketData") .data(new MarketDataRequest(stock)) .retrieveFlux(MarketData.class); } 

咱们来研究下RSocket请求。 首先咱们定义了路由和请求负载。 而后,咱们定义了使用retrieveFlux()调用的响应指望。这部分决定了交互模式。 另外注意:因为咱们的客户端也是REST风格的服务器,客户端也定义了响应媒介类型MediaType.TEXT_EVENT_STREAM_VALUE

7.异常的处理

如今让咱们看看在服务器程序中,如何以声明式的方式处理异常。 当处理请求/响应式, 我能够简单的使用@MessageExceptionHandler注解,以下:

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
    return Mono.just(MarketData.fromException(e)); } 

这里咱们给异常处理方法标记注解为@MessageExceptionHandler。做为结果, 这个方法将处理全部类型的异常, 由于Exception是全部其余类型的异常的超类。 咱们也能够明确地建立更多的不一样类型的,不一样的异常处理方法。 这固然是请求/响应模式,而且咱们返回的是Mono<MarketData>。咱们指望这里的响应类型跟咱们的交互模式的返回类型相匹配。

8.总结

在本教程中, 咱们介绍了springboot的RSocket支持,并详细列出了RSocket提供的不一样交互模式。查看全部示例代码在GitHub上。

原文连接:www.baeldung.com/spring-boot…

做者:baeldung

译者:sleeve

 

相关文章
相关标签/搜索