本系列文章索引《响应式Spring的道法术器》
前情提要 lambda与函数式 | Reactor 3快速上手
本文源码html
Spring WebFlux是随Spring 5推出的响应式Web框架。java
1)服务端技术栈react
Spring提供了完整的支持响应式的服务端技术栈。linux
如上图所示,左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈,git
@Controller
、@RequestMapping
)的开发模式;由此看来,Spring WebFlux与Vert.x有一些相通之处,都是创建在非阻塞的异步I/O和事件驱动的基础之上的。github
2)响应式Http客户端web
此外,Spring WebFlux也提供了一个响应式的Http客户端API WebClient
。它能够用函数式的方式异步非阻塞地发起Http请求并处理响应。其底层也是由Netty提供的异步支持。ajax
咱们能够把WebClient
看作是响应式的RestTemplate
,与后者相比,前者:spring
固然,与服务端对应的,Spring WebFlux也提供了响应式的Websocket客户端API。mongodb
简单介绍这些,让咱们来Coding吧~
本节,咱们经过如下几个例子来逐步深刻地了解它的使用方法:
** 1. 先介绍一下使用Spring WebMVC风格的基于注解的方式如何编写响应式的Web服务,这几乎没有学习成本,很是赞。虽然这种方式在开发上与Spring WebMVC变化不大,可是框架底层已是彻底的响应式技术栈了;
WebClient
与前几步作好的服务端进行通讯;Spring Boot 2是基于Spring 5的,其中一个比较大的更新就在于支持包括spring-webflux和响应式的spring-data在内的响应式模块。Spring Boot 2即将发布正式版,不过目前的版本从功能上已经完备,下边的例子咱们就用Spring Boot 2在进行搭建。
咱们首先用Spring WebMVC开发一个只有Controller层的简单的Web服务,而后仅仅作一点点调整就可切换为基于Spring WebFlux的具备一样功能的Web服务。
咱们使用Spring Boot 2搭建项目框架。
如下截图来自IntelliJ IDEA,不过其余IDE也都是相似的。
1)基于Spring Initializr建立项目
本节的例子很简单,不涉及Service层和Dao层,所以只选择spring-webmvc便可,也就是“Web”的starter。
也可使用网页版的https://start.spring.io来建立项目:
建立后的项目POM中,包含下边的依赖,即表示基于Spring WebMVC:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
2)建立Controller和Endpoint
建立Controller类HelloController
,仅提供一个Endpoint:/hello
:
@RestController public class HelloController { @GetMapping("/hello") public String hello() { return "Welcome to reactive world ~"; } }
3)启动应用
OK了,一个简单的基于Spring WebMVC的Web服务。咱们新增了HelloController.java
,修改了application.properties
。
使用IDE启动应用,或使用maven命令:
mvn spring-boot:run
经过打印的log能够看到,服务运行于Tomcat的8080端口:
测试Endpoint。在浏览器中访问http://localhost:8080/hello
,或运行命令:
curl http://localhost:8080/hello
返回Welcome to reactive world ~
。
基于Spring WebFlux的项目与上边的步骤一致,仅有两点不一样。咱们此次偷个懒,就不重新建项目了,修改一下上边的项目:
4)依赖“Reactive Web”的starter而不是“Web”
修改项目POM,调整依赖使其基于Spring WebFlux:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-WebFlux</artifactId> <!--【改】增长“flux”四个字符--> </dependency>
5)Controller中处理请求的返回类型采用响应式类型
@RestController public class HelloController { @GetMapping("/hello") public Mono<String> hello() { // 【改】返回类型为Mono<String> return Mono.just("Welcome to reactive world ~"); // 【改】使用Mono.just生成响应式数据 } }
6)启动应用
仅须要上边两步就改完了,是否是很简单,一样的方法启动应用。启动后发现应用运行于Netty上:
访问http://localhost:8080/hello
,结果与Spring WebMVC的相同。
7)总结
从上边这个很是很是简单的例子中能够看出,Spring真是用心良苦,WebFlux提供了与以前WebMVC相同的一套注解来定义请求的处理,使得Spring使用者迁移到响应式开发方式的过程变得异常轻松。
虽然咱们只修改了少许的代码,可是其实这个简单的项目已经脱胎换骨了。整个技术栈从命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】变成了响应式的、异步非阻塞的【spring-webflux + Reactor + Netty】。
Netty是一套异步的、事件驱动的网络应用程序框架和工具,可以开发高性能、高可靠性的网络服务器和客户端程序,所以与一样是异步的、事件驱动的响应式编程范式一拍即合。
下边的内容了解便可,就不实战了。
在Java 7推出异步I/O库,以及Servlet3.1增长了对异步I/O的支持以后,Tomcat等Servlet容器也随后开始支持异步I/O,而后Spring WebMVC也增长了对Reactor库的支持,因此上边第4)步若是不是将spring-boot-starter-web
替换为spring-boot-starter-WebFlux
,而是增长reactor-core
的依赖的话,仍然能够用注解的方式开发基于Tomcat的响应式应用。
既然是响应式编程了,有些朋友可能会想统一用函数式的编程风格,WebFlux知足你。WebFlux提供了一套函数式接口,能够用来实现相似MVC的效果。咱们先接触两个经常使用的。
再回头瞧一眼上边例子中咱们用Controller
定义定义对Request的处理逻辑的方式,主要有两个点:
@RequestMapping
注解定义好这个方法对什么样url进行响应。在WebFlux的函数式开发模式中,咱们用HandlerFunction
和RouterFunction
来实现上边这两点。
HandlerFunction
至关于Controller
中的具体处理方法,输入为请求,输出为装在Mono
中的响应:Mono<T extends ServerResponse> handle(ServerRequest request);
RouterFunction
,顾名思义,路由,至关于@RequestMapping
,用来判断什么样的url映射到那个具体的HandlerFunction
,输入为请求,输出为装在Mono里边的Handlerfunction
:Mono<HandlerFunction<T>> route(ServerRequest request);
咱们看到,在WebFlux中,请求和响应再也不是WebMVC中的ServletRequest
和ServletResponse
,而是ServerRequest
和ServerResponse
。后者是在响应式编程中使用的接口,它们提供了对非阻塞和回压特性的支持,以及Http消息体与响应式类型Mono和Flux的转换方法。
下面咱们用函数式的方式开发两个Endpoint:
/time
返回当前的时间;/date
返回当前的日期。对于这两个需求,HandlerFunction很容易写:
// 返回包含时间字符串的ServerResponse HandlerFunction<ServerResponse> timeFunction = request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body( Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); // 返回包含日期字符串的ServerResponse HandlerFunction<ServerResponse> dateFunction = request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body( Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
那么RouterFunction为:
RouterFunction<ServerResponse> router = RouterFunctions.route(GET("/time"), timeFunction) .andRoute(GET("/date"), dateFunction);
按照常见的套路,
RouterFunctions
是工具类。
不过这么写在业务逻辑复杂的时候不太好组织,咱们一般采用跟MVC相似的代码组织方式,将同类业务的HandlerFunction放在一个类中,而后在Java Config中将RouterFunction配置为Spring容器的Bean。咱们继续在第一个例子的代码上开发:
1)建立统一存放处理时间的Handler类
建立TimeHandler.java
:
import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Component public class TimeHandler { public Mono<ServerResponse> getTime(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); } public Mono<ServerResponse> getDate(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class); } }
因为出现次数一般比较多,这里静态引入
ServerResponse.ok()
方法。
2)在Spring容器配置RouterFunction
咱们采用Spring如今比较推荐的Java Config的配置Bean的方式,建立用于存放Router的配置类RouterConfig.java
:
import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RouterFunctions.route; @Configuration public class RouterConfig { @Autowired private TimeHandler timeHandler; @Bean public RouterFunction<ServerResponse> timerRouter() { return route(GET("/time"), req -> timeHandler.getTime(req)) .andRoute(GET("/date"), timeHandler::getDate); // 这种方式相对于上一行更加简洁 } }
3)重启服务试一试
重启服务测试一下吧:
$ curl http://localhost:8080/date Today is 2018-02-26 $ curl http://localhost:8080/time Now is 21:12:53
咱们可能会遇到一些须要网页与服务器端保持链接(起码看上去是保持链接)的需求,好比相似微信网页版的聊天类应用,好比须要频繁更新页面数据的监控系统页面或股票看盘页面。咱们一般采用以下几种技术:
既然响应式编程是一种基于数据流的编程范式,天然在服务器推送方面驾轻就熟,咱们基于函数式方式再增长一个Endpoint /times
,能够每秒推送一次时间。
1)增长Handler方法
TimeHandler.java
:
public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_EVENT_STREAM).body( // 1 Flux.interval(Duration.ofSeconds(1)). // 2 map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); }
MediaType.TEXT_EVENT_STREAM
表示Content-Type
为text/event-stream
,即SSE;2)配置router
RouterConfig.java
:
@Bean public RouterFunction<ServerResponse> timerRouter() { return route(GET("/time"), timeHandler::getTime) .andRoute(GET("/date"), timeHandler::getDate) .andRoute(GET("/times"), timeHandler::sendTimePerSec); // 增长这一行 }
3)重启服务试一下
重启服务后,测试一下:
curl http://localhost:8080/times data:21:32:22 data:21:32:23 data:21:32:24 data:21:32:25 data:21:32:26 <Ctrl+C>
就酱,访问这个url会收到持续不断的报时数据(时间数据是在data
中的)。
那么用注解的方式如何进行服务端推送呢,这个演示就融到下一个例子中吧~
开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据可以顺畅流过每一个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,因此咱们也须要响应式的持久层API和支持异步的数据库驱动。就像从自来水厂到家里水龙头这个管道中,若是任何一个环节发生了阻塞,那就可能形成总体吞吐量的降低。
各个数据库都开始陆续推出异步驱动,目前Spring Data支持的能够进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。今天咱们用MongoDB来写一个响应式demo。
咱们这个例子很简单,就是关于User
的增删改查,以及基于注解的服务端推送。
1)编写User
既然是举例,咱们随便定义几个属性吧~
public class User { private String id; private String username; private String phone; private String email; private String name; private Date birthday; }
而后为了方便开发,咱们引入lombok库,它可以经过注解的方式为咱们添加必要的Getter/Setter/hashCode()/equals()/toString()/构造方法等,添加依赖(版本可自行到http://search.maven.org搜索最新):
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> </dependency>
而后为User
添加注解:
@Data // 生成无参构造方法/getter/setter/hashCode/equals/toString @AllArgsConstructor // 生成全部参数构造方法 @NoArgsConstructor // @AllArgsConstructor会致使@Data不生成无参构造方法,须要手动添加@NoArgsConstructor,若是没有无参构造方法,可能会致使好比com.fasterxml.jackson在序列化处理时报错 public class User { ...
咱们能够利用IDE看一下生成的方法(以下图黄框所示):
可能须要先在IDE中进行少许配置以便支持lombok的注解,好比IntelliJ IDEA:
- 安装“lombok plugin”:
- 开启对注解编译的支持:
lombok对于Java开发者来讲绝对算是个福音了,但愿使用Kotlin的朋友不要笑话咱们土哦~
2)增长Spring Data的依赖
在POM中增长Spring Data Reactive Mongo的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
MongoDB是文档型的NoSQL数据库,所以,咱们使用@Document
注解User
类:
@Data @AllArgsConstructor @Document public class User { @Id private String id; // 注解属性id为ID @Indexed(unique = true) // 注解属性username为索引,而且不能重复 private String username; private String name; private String phone; private Date birthday; }
OK,这样咱们的模型就准备好了。MongoDB会自动建立collection,默认为类名首字母小写,也就是user
。
3)配置数据源
Spring Boot为咱们搞定了几乎全部的配置,太赞了,下边是MongoDB的默认配置:
# MONGODB (MongoProperties) spring.data.mongodb.authentication-database= # Authentication database name. spring.data.mongodb.database=test # Database name. spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use. spring.data.mongodb.grid-fs-database= # GridFS database name. spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri. spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri. spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri. spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories. spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials. spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri.
请根据须要添加自定义的配置,好比个人MongoDB是跑在IP为192.168.0.101的虚拟机的Docker中的,就可在application.properties
中增长一条:
spring.data.mongodb.host=192.168.0.101
4)增长DAO层repository
与非响应式Spring Data的CrudReposity
对应的,响应式的Spring Data也提供了相应的Repository库:ReactiveCrudReposity
,固然,咱们也可使用它的子接口ReactiveMongoRepository
。
咱们增长UserRepository
:
public interface UserRepository extends ReactiveCrudRepository<User, String> { // 1 Mono<User> findByUsername(String username); // 2 Mono<Long> deleteByUsername(String username); }
ReactiveCrudRepository
的泛型分别是User
和ID
的类型;ReactiveCrudRepository
已经提供了基本的增删改查的方法,根据业务须要,咱们增长四个方法(在此膜拜一下Spring团队的牛人们,使得咱们仅需按照规则定义接口方法名便可完成DAO层逻辑的开发,牛~)5)Service层
因为业务逻辑几乎为零,只是简单调用了DAO层,直接贴代码:
@Service public class UserService { @Autowired private UserRepository userRepository; /** * 保存或更新。 * 若是传入的user没有id属性,因为username是unique的,在重复的状况下有可能报错, * 这时找到以保存的user记录用传入的user更新它。 */ public Mono<User> save(User user) { return userRepository.save(user) .onErrorResume(e -> // 1 userRepository.findByUsername(user.getUsername()) // 2 .flatMap(originalUser -> { // 4 user.setId(originalUser.getId()); return userRepository.save(user); // 3 })); } public Mono<Long> deleteByUsername(String username) { return userRepository.deleteByUsername(username); } public Mono<User> findByUsername(String username) { return userRepository.findByUsername(username); } public Flux<User> findAll() { return userRepository.findAll(); } }
onErrorResume
进行错误处理;User -> Publisher
,因此用flatMap
。6)Controller层
直接贴代码:
@RestController @RequestMapping("/user") public class UserController { @Autowired private UserService userService; @PostMapping("") public Mono<User> save(User user) { return this.userService.save(user); } @DeleteMapping("/{username}") public Mono<Long> deleteByUsername(@PathVariable String username) { return this.userService.deleteByUsername(username); } @GetMapping("/{username}") public Mono<User> findByUsername(@PathVariable String username) { return this.userService.findByUsername(username); } @GetMapping("") public Flux<User> findAll() { return this.userService.findAll(); } }
7)启动应用测试一下
因为涉及到POST和DELETE方法的请求,建议用支持RESTful的client来测试,好比“Restlet client”:
如图,增长操做是成功的,只要username不变,再次发送请求会更新该记录。
图中birthday的时间差8小时,不去管它。
用一样的方法增长一个李四,以后咱们再来测试一下查询。
1) 根据用户名查询(METHOD:GET URL:http://localhost:8080/user/zhangsan),下边输出是格式化的JSON:
{ "id": "5a9504a167646d057051e229", "username": "zhangsan", "name": "张三", "phone": "18610861861", "birthday": "1989-12-31T16:00:00.000+0000" }
2) 查询所有(METHOD:GET URL:http://localhost:8080/user)
[{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"张三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"},{"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}]
测试一下删除(METHOD:DELETE URL:http://localhost:8080/user/zhangsan),返回值为1,再查询所有,发现张三已经被删除了,OK。
8)stream+json
看到这里细心的朋友可能会有点嘀咕,怎么看是否是异步的呢?毕竟查询所有的时候,结果都用中括号括起来了,这和原来返回List<User>
的效果彷佛没多大区别。假设一下查询100个数据,若是是异步的话,以咱们对“异步响应式流”的印象彷佛应该是一个一个至少是一批一批的到达客户端的嘛。咱们加个延迟验证一下:
@GetMapping("") public Flux<User> findAll() { return this.userService.findAll().delayElements(Duration.ofSeconds(1)); }
每一个元素都延迟1秒,如今咱们在数据库里弄三条记录,而后请求查询所有的那个URL,发现并非像/times
同样一秒一个地出来,而是3秒以后一起出来的。果真如此,这一点都不响应式啊!
与/times
相似,咱们也加一个MediaType,不过因为这里返回的是JSON,所以不能使用TEXT_EVENT_STREAM
,而是使用APPLICATION_STREAM_JSON
,即application/stream+json
格式。
@GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<User> findAll() { return this.userService.findAll().delayElements(Duration.ofSeconds(2)); }
produces
后边的值应该是application/stream+json
字符串,所以用APPLICATION_STREAM_JSON_VALUE
。重启服务再次请求,发现三个user是一秒一个的速度出来的,中括号也没有了,而是一个一个独立的JSON值构成的json stream:
{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"张三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"} {"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"} {"id":"5a955f08fa10b93ec48df37f","username":"wangwu","name":"王五","phone":"18610861865","birthday":"1995-05-04T16:00:00.000+0000"}
9)总结
若是有Spring Data开发经验的话,切换到Spring Data Reactive的难度并不高。跟Spring WebFlux相似:原来返回User
的话,那如今就返回Mono<User>
;原来返回List<User>
的话,那如今就返回Flux<User>
。
对于稍微复杂的业务逻辑或一些必要的异常处理,好比上边的save方法,请必定采用响应式的编程方式来定义,从而一切都是异步非阻塞的。以下图所示,从HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的针对不一样server的适配器),到咱们编写的Controller和DAO,以及异步数据库驱动,构成了一个完整的异步非阻塞的管道,里边流动的就是响应式流。
WebClient
开发响应式Http客户端下面,咱们用WebClient测试一下前边几个例子的成果。
1) /hello,返回Mono
@Test public void webClientTest1() throws InterruptedException { WebClient webClient = WebClient.create("http://localhost:8080"); // 1 Mono<String> resp = webClient .get().uri("/hello") // 2 .retrieve() // 3 .bodyToMono(String.class); // 4 resp.subscribe(System.out::println); // 5 TimeUnit.SECONDS.sleep(1); // 6 }
WebClient
对象并指定baseUrl;CountDownLatch
。运行效果以下:
2) /user,返回Flux
为了多演示一些不一样的实现方式,下边的例子咱们调整几个地方,可是效果跟上边是同样的:
@Test public void webClientTest2() throws InterruptedException { WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build(); // 1 webClient .get().uri("/user") .accept(MediaType.APPLICATION_STREAM_JSON) // 2 .exchange() // 3 .flatMapMany(response -> response.bodyToFlux(User.class)) // 4 .doOnNext(System.out::println) // 5 .blockLast(); // 6 }
Content-Type: application/stream+json
;ClientResponse
,retrive()
能够看作是exchange()
方法的“快捷版”;flatMap
来将ClientResponse映射为Flux;blockLast
方法,顾名思义,在收到最后一个元素前会阻塞,响应式业务场景中慎用。运行效果以下:
3) /times,服务端推送
@Test public void webClientTest3() throws InterruptedException { WebClient webClient = WebClient.create("http://localhost:8080"); webClient .get().uri("/times") .accept(MediaType.TEXT_EVENT_STREAM) // 1 .retrieve() .bodyToFlux(String.class) .log() // 2 .take(10) // 3 .blockLast(); }
Content-Type: text/event-stream
,即SSE;log()
代替doOnNext(System.out::println)
来查看每一个元素;/times
是一个无限流,这里取前10个,会致使流被取消;运行效果以下:
许多朋友看到这个题目会想到Websocket,的确,Websocket确实能够实现全双工通讯,但它的数据传输并不是是彻底基于HTTP协议的,关于Websocket咱们后边再聊。
下面咱们实现一个这样两个Endpoint:
/events
,“源源不断”地收集数据,并存入数据库;/events
,“源源不断”将数据库中的记录发出来。0)准备
1、数据模型MyEvent
:
@Data @AllArgsConstructor @NoArgsConstructor @Document(collection = "event") // 1 public class MyEvent { @Id private Long id; // 2 private String message; }
event
;2、DAO层:
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { // 1 }
insert(Flux)
方法,这个方法是在ReactiveMongoRepository
中定义的。3、简单起见就不要Service层了,直接Controller:
@RestController @RequestMapping("/events") public class MyEventController { @Autowired private MyEventRepository myEventRepository; @PostMapping(path = "") public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) { // 1 // TODO return null; } @GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<MyEvent> getEvents() { // 2 // TODO return null; } }
Mono<Void>
做为方法返回值,表示若是传输完的话只给一个“完成信号”就OK了;Flux<MyEvent>
,不要忘了注解上produces = MediaType.APPLICATION_STREAM_JSON_VALUE
。准备到此为止,类以下。咱们来完成上边的两个TODO吧。
1)接收数据流的Endpoint
在客户端,WebClient
能够接收text/event-stream
和application/stream+json
格式的数据流,也能够在请求的时候上传一个数据流到服务器;
在服务端,WebFlux也支持接收一个数据流做为请求参数,从而实现一个接收数据流的Endpoint。
咱们先看服务端。Controller中的loadEvents
方法:
@PostMapping(path = "", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1 public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) { return this.myEventRepository.insert(events).then(); // 2 }
application/stream+json
,与getEvents
方法的区别在于这个方法是consume
这个数据流;insert
返回的是保存成功的记录的Flux,但咱们不须要,使用then
方法表示“忽略数据元素,只返回一个完成信号”。服务端写好后,启动之,再看一下客户端怎么写(仍是放在src/test
下):
@Test public void webClientTest4() { Flux<MyEvent> eventFlux = Flux.interval(Duration.ofSeconds(1)) .map(l -> new MyEvent(System.currentTimeMillis(), "message-" + l)).take(5); // 1 WebClient webClient = WebClient.create("http://localhost:8080"); webClient .post().uri("/events") .contentType(MediaType.APPLICATION_STREAM_JSON) // 2 .body(eventFlux, MyEvent.class) // 3 .retrieve() .bodyToMono(Void.class) .block(); }
take
的话表示无限个元素的数据流;application/stream+json
;body
方法设置请求体的数据。运行一下这个测试,根据控制台数据能够看到是一条一条将数据发到/events
的,看一下MongoDB中的数据:
2)发出无限流的Endpoint
回想一下前边/user
的例子,当数据库中全部的内容都查询出来以后,这个流就结束了,由于其后跟了一个“完成信号”,咱们能够经过在UserService
的findAll()
方法的流上增长log()
操做符来观察更详细的日志:
咱们能够看到在三个onNext
信号后是一个onComplete
信号。
这样的流是有限流,这个时候若是在数据库中再新增一个User的话,已经结束的请求也不会再有新的内容出现了。
反观/times
请求,它会无限地发出SSE,而不会有“完成信号”出现,这是无限流。
咱们但愿的状况是不管是请求GET的/events
以后,当全部数据都发完以后,不要结束,而是挂起等待新的数据。若是咱们用上边的POST的/events
传入新的数据到数据库后,新的数据会自动地流到客户端。
这能够在DAO层配置实现:
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { @Tailable // 1 Flux<MyEvent> findBy(); // 2 }
@Tailable
注解的做用相似于linux的tail
命令,被注解的方法将发送无限流,须要注解在返回值为Flux这样的多个元素的Publisher的方法上;findAll()
是想要的方法,可是在ReactiveMongoRepository
中咱们够不着,因此使用findBy()
代替。而后完成Controller中的方法:
@GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<MyEvent> getEvents() { return this.myEventRepository.findBy(); }
不过,这还不够,@Tailable
仅支持有大小限制的(“capped”)collection,而自动建立的collection是不限制大小的,所以咱们须要先手动建立。Spring Boot提供的CommandLineRunner
能够帮助咱们实现这一点。
Spring Boot应用程序在启动后,会遍历CommandLineRunner接口的实例并运行它们的run方法。
@Bean // 1 public CommandLineRunner initData(MongoOperations mongo) { // 2 return (String... args) -> { // 3 mongo.dropCollection(MyEvent.class); // 4 mongo.createCollection(MyEvent.class, CollectionOptions.empty().size(200).capped()); // 5 }; }
WebFluxDemoApplication
了;MongoOperations
提供对MongoDB的操做方法,由Spring注入的mongo实例已经配置好,直接使用便可;CommandLineRunner
也是一个函数式接口,其实例能够用lambda表达;启动应用,咱们检查一下event
collection:
OK,这个时候咱们请求一下http://localhost:8080/events
,发现立马返回了,并无挂起。缘由在于collection中一条记录都没有,而@Tailable
起做用的前提是至少有一条记录。
跑一下WebClient测试程序插入5条数据,而后再次请求:
请求是挂起的,这没错,可是只有两条数据,看WebClient测试程序的控制台明明发出了5个请求啊。
缘由定义的CollectionOptions.empty().size(200).capped()
中,size
指的是以字节为单位的大小,而且会向上取到256的整倍数,因此咱们刚才定义的是256byte大小的collection,因此最多容纳两条记录。咱们能够这样改一下:
CollectionOptions.empty().maxDocuments(200).size(100000).capped()
maxDocuments
限制了记录条数,size
限制容量且是必须定义的,由于MongoDB不像关系型数据库有严格的列和字段大小定义,鬼知道会存多大的数据进来,因此容量限制是必要的。
好了,再次启动应用,先插入5条数据,而后请求/events
,收到5条记录后请求仍然挂起,在插入5条数据,curl客户端又会陆续收到新的数据。
咱们用代码搭建了图中箭头所表示的“管道”,看效果仍是很畅通的嘛。如今再回想咱们最初的那个Excel的例子,是否是感受这个demo颇有响应式的“范儿”了呢?
这一节,咱们对WebFlux作了一个简单的基于实例的介绍,相信你对响应式编程及其在WEB应用中如何发挥做用有了更多的体会,本章的实战是比较基础的,初衷是但愿可以经过上手编写代码体会响应式编程的感受,由于切换到响应式思惟方式并不是易事。
这一章的核心关键词其实翻来覆去就是:“异步非阻塞的响应式流”。咱们了解了异步非阻塞的好处,也知道如何让数据流动起来,下面咱们就经过对实例的性能测试,借助实实在在的数据,真切感觉一下异步非阻塞的“丝滑”。