本文会来作一些应用对比Spring MVC和Spring WebFlux,观察线程模型的区别,而后作一下简单的压力测试。前端
先来建立一个新的webflux-mvc的模块:java
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webflux-mvc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webflux-mvc</name>
<description></description>
<parent>
<groupId>me.josephzhu</groupId>
<artifactId>spring101</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
复制代码
而后在项目里定义一个咱们会使用到的POJO:react
package me.josephzhu.spring101webfluxmvc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
@Id
private String id;
private String payload;
private long time;
}
复制代码
这里的@Document和@Id是为Mongodb服务的,咱们定义了MyData将会以mydata做为Collection的名字,而后id字段是Document的Id列。 而后咱们来建立Controller,在这个Controller里面咱们尝试三种不一样的操做:web
package me.josephzhu.spring101webfluxmvc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RestController
public class MyController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private MyRepository myRepository;
@GetMapping("/data")
public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
return IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
.collect(Collectors.toList());
}
@GetMapping("/dbData")
public List<MyData> getDbData() {
return myRepository.findAll();
}
@GetMapping("/saveData")
public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
.queryParam("size", size)
.queryParam("length", length);
ResponseEntity<List<MyData>> responseEntity =
restTemplate.exchange(builder.toUriString(),
HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {});
return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
}
}
复制代码
注意,在这里咱们使用了Java 8的Steam来作一些操做避免使用for循环:spring
这些Stream的代码都是同步处理,也不涉及外部IO,和非阻塞没有任何关系,只是方便代码编写。为了让代码能够运行,咱们还须要继续来配置下Mongodb的Repository:mongodb
package me.josephzhu.spring101webfluxmvc;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyRepository extends MongoRepository<MyData, String> { }
复制代码
由于咱们没有用到复杂的查询,在代码里只是用到了findAll方法,因此这里咱们无需定义额外的方法,只是声明接口便可。 最后,咱们建立主应用程序,顺便配置一下Mongodb和RestTemplate:数据库
package me.josephzhu.spring101webfluxmvc;
import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {
@Bean
MongoClientOptions mongoClientOptions(){
return MongoClientOptions.builder().connectionsPerHost(1000).build();
}
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
}
}
复制代码
这里咱们配置了Mongodb客户端使得以后在进行压力测试的时候能有超过100个链接链接到Mongodb,不然会出现没法获取链接的问题。apache
如今咱们再来新建一个webflux模块:编程
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webflux</name>
<description></description>
<parent>
<groupId>me.josephzhu</groupId>
<artifactId>spring101</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
复制代码
这里能够注意到,咱们引入了webflux这个starter以及data-mongodb-reactive这个starter。在以前的Spring MVC项目中,咱们引入的是mvc和data-mongodb两个starter。 而后,咱们一样须要建立一下MyData类(代码和以前如出一辙,这里省略)。 最关键的一步,咱们来建立三个Controller方法的定义:json
package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Component
public class MyHandler {
@Autowired
private MyReactiveRepository myReactiveRepository;
public Mono<ServerResponse> getData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono<ServerResponse> getDbData(ServerRequest serverRequest) {
Flux<MyData> data = myReactiveRepository.findAll();
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono<ServerResponse> saveData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
Flux<MyData> data = WebClient.create().get()
.uri(builder -> builder
.scheme("http")
.host("localhost")
.port(8080)
.path("data")
.queryParam("size", size)
.queryParam("length", length)
.build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(MyData.class)
.flatMap(myReactiveRepository::save);
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
}
复制代码
这里要说明几点:
刚才有提到,采用函数式声明对外的Endpoint的话除了定义Handler,还须要配置Router来和Handler关联,配置以下:
package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class RouterConfig {
@Autowired
private MyHandler myHandler;
@Bean
public RouterFunction<ServerResponse> config() {
return route(GET("/data"), myHandler::getData)
.andRoute(GET("/dbData"), myHandler::getDbData)
.andRoute(GET("/saveData"), myHandler::saveData);
}
}
复制代码
这段代码没有太多须要说明,这里咱们定义了三个GET请求(至关于MVC的@GetMapping),而后对应到注入的myHandler的三个方法上。 而后咱们还须要建立Mongodb的Repository:
package me.josephzhu.spring101webflux;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
复制代码
以及配置和启动类:
package me.josephzhu.spring101webflux;
import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {
@Bean
MongoClient mongoClient(){
return MongoClients.create(mongoClientSettings());
}
@Bean
MongoClientSettings mongoClientSettings(){
return MongoClientSettings.builder()
.clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
.connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxApplication.class, args);
}
}
复制代码
这里对Mongodb作了一些配置,主要也是但愿放大链接池这块的默认限制,为从此的压测服务。注意,在这里配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下图所示,还有其它两个MongoClient,若是修改了不匹配的MongoClient的话是不会有做用的,我在这个坑里躺了两小时。
下图是官网的一个图说明了二者的关系,而后官网也给出了一些建议:
咱们知道对于阻塞的实现方式,咱们采用线程池来服务请求(线程池中的会维护一组普通的线程,线程池只是节省线程建立的时间),对于每个请求的处理,至始至终都是在一个线程中进行,若是处理的过程当中咱们须要访问外部的网络或数据库,那么线程就处于阻塞状态,这个线程没法服务其它请求,若是当时还有更多的并发的话,就须要建立更多的线程来服务其它请求。这种实现方式是很是简单的,应对压力的增加扩容方式也是粗暴的,那就是增长更多线程。
对于非阻塞的方式,采用的是EventLoop的方式,IO操做的时候是不占用工做线程的,所以只会建立一组和CPU核数至关的工做线程用于工做处理(NodeJS甚至是单线程的,这种就更危险了,就那么一个工做线程,一旦被长时间占用其它请求都没法处理)。因为整个处理过程当中IO请求不占用线程时间,线程不会阻塞等待,再增长超过CPU核数的工做线程也是没有意义的(只会白白增长线程切换的开销)。对于这种方式在压力增加后,由于咱们不须要增长额外的线程,也就没有了绝对的瓶颈。
试想一下在阻塞模型下,对于5000的并发,并且每个并发阻塞的时间很是长,那么咱们其实须要5000个线程来服务(这么多线程99%其实都是在等待,属于空耗系统资源),建立5000的线程不谈其它的,若是线程栈大小是1M的话就须要5GB的内存。对于非阻塞的线程模型在8核机器上仍是8个工做线程,内存占用仍是这么小,能够以最小的开销应对大并发,系统的损耗不多。非阻塞的Reactive模式是内耗很是小的模式,可是这是有代价的,在实现上咱们须要确保处理过程当中没有阻塞产生,不然就会浪费宝贵的数目固定的工做线程,也就是说咱们须要依赖配套的非阻塞IO类库来使用。 在默认状况下tomcat的工做线程池初始化为10,最大200,咱们经过启动本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具来看下初始的状况(35个线程) :
咱们可使用Gatling类库进行压力测试,我我的感受比Jmeter方便。配置很简单,首先咱们要安装Scala的SDK,而后咱们新建一个模块:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webstresstest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webstresstest</name>
<description></description>
<dependencies>
<dependency>
<groupId>io.gatling.highcharts</groupId>
<artifactId>gatling-charts-highcharts</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.gatling</groupId>
<artifactId>gatling-maven-plugin</artifactId>
<version>2.2.4</version>
<configuration>
<simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass>
<resultsFolder>/Users/zyhome/gatling</resultsFolder>
</configuration>
</plugin>
</plugins>
</build>
</project>
复制代码
引入了garling的maven插件,在这里配置了测试结果输出路径以及压测的类。接下去建立一下这个Scala测试类:
package me.josephzhu.spring101.webstresstest
import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
复制代码
这段代码定义了以下的测试行为:
nothingFor(4 seconds), // 1
atOnceUsers(10), // 2
rampUsers(10) over (5 seconds), // 3
constantUsersPerSec(20) during (15 seconds), // 4
constantUsersPerSec(20) during (15 seconds) randomized, // 5
rampUsersPerSec(10) to 20 during (10 minutes), // 6
rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
heavisideUsers(1000) over (20 seconds) // 10
复制代码
先来进行第一个测试,1000并发对data接口进行100次循环(还记得吗,接口有100ms休眠or延迟的):
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("mvc data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(1000)))
}
复制代码
下面两个图分别是MVC和WebFlux的测试结果(由于都是8080端口,因此测试的时候记得切换重启两个应用哦):
如今咱们来访问一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb来初始化100条数据,而后修改一下测试脚本压测dbData接口: class StressTest extends Simulation {
val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }
setUp(scn.inject(atOnceUsers(1000))) } 下面看下此次的测试结果 ,分别是MVC和WebFlux:
再来试一下第三个saveData接口的状况。修改测试代码:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=100000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
复制代码
这里咱们修改并发用户为200,每一个用户进行100次测试,每次测试存入Mongodb 5条100KB的数据,一次测试后总数据量在10万条。此次测试咱们并无使用1000并发用户,缘由是这个测试咱们会先从远端获取数据而后再存入Mongodb,远端的服务也是来自于当前应用程序,咱们的Tomcat最多只有250个线程,在启动1000个用户后,一些线程服务于saveData接口,一些线程服务于data接口(saveData接口用到的),这样至关于形成了循环依赖问题,请求在等待更多的可用线程执行服务data接口的响应,而这个时候线程又都被占了致使没法分配更多的请求,测试几乎所有超时。 下面看下此次的测试结果 ,分别是MVC和WebFlux:
咱们来测试一下下面两种状况下对于WebFlux版本Mongodb侧的状况:
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=1&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
复制代码
以及
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
复制代码
区别就在远程服务返回的Flux是1个仍是5个。在1个的时候运行测试能够看到咱们Mongodb有64个链接(须要把以前链接池的配置最小设置为小一点,好比50):
> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }
复制代码
在size为5的时候,Flux返回的是5个对象,使用这个请求压测的时候Mongodb的链接数以下:
> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }
复制代码
这是由于Flux拿到的数据直接以响应式进入Mongodb,并无等到全部数据拿到以后串行调用方法。 总结一下这几回的测试,咱们发现WebFlux方式对于MVC方式能有略微的性能提高,对于请求阻塞的时候性能优点明显。我本金的测试并无看到现象中的几倍甚至几十倍的性能提高,我猜缘由以下:
若是有条件可使用三台独立服务器在内网进行1万以上并发用户的性能测试或许能够获得更科学的结果。
本文咱们建立了WebFlux和MVC两套应用对比演示了简单返回数据、发出远程请求、使用Mongodb等一些简单的应用场景,而后来看了一下ThreadPerRequest和EventLoop方式线程模型的区别,最后使用Gatling进行了几个Case的压力测试而且观察结果。我以为:
综上所述,使用WebFlux进行响应式编程我我的认为目前只适合作类IO转发的高并发的又看中资源使用效率的应用场景(好比Gateway网关服务),对于复杂的业务逻辑不太适合,在90%的状况下响应式的编程模型和线程模型不会享受大幅性能优点,更不建议盲目把现有的应用使用WebFlux来重写。固然,这确定是一个会持续发展的方向,能够先接触研究起来。