阅读PDF版本前端
本文会来作一些应用对比Spring MVC和Spring WebFlux,观察线程模型的区别,而后作一下简单的压力测试。java
先来建立一个新的webflux-mvc的模块:react
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webflux-mvc</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webflux-mvc</name> <description></description> <parent> <groupId>me.josephzhu</groupId> <artifactId>spring101</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
而后在项目里定义一个咱们会使用到的POJO:web
package me.josephzhu.spring101webfluxmvc; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @Data @AllArgsConstructor @NoArgsConstructor @Document(collection = "mydata") public class MyData { @Id private String id; private String payload; private long time; }
这里的@Document和@Id是为Mongodb服务的,咱们定义了MyData将会以mydata做为Collection的名字,而后id字段是Document的Id列。
而后咱们来建立Controller,在这个Controller里面咱们尝试三种不一样的操做:spring
package me.josephzhu.spring101webfluxmvc; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; @RestController public class MyController { @Autowired private RestTemplate restTemplate; @Autowired private MyRepository myRepository; @GetMapping("/data") public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) { try { Thread.sleep(100); } catch (InterruptedException e) { } String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining()); return IntStream.rangeClosed(1, size) .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis())) .collect(Collectors.toList()); } @GetMapping("/dbData") public List<MyData> getDbData() { return myRepository.findAll(); } @GetMapping("/saveData") public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){ UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data") .queryParam("size", size) .queryParam("length", length); ResponseEntity<List<MyData>> responseEntity = restTemplate.exchange(builder.toUriString(), HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {}); return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList()); } }
注意,在这里咱们使用了Java 8的Steam来作一些操做避免使用for循环:mongodb
package me.josephzhu.spring101webfluxmvc; import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; @Repository public interface MyRepository extends MongoRepository<MyData, String> { }
由于咱们没有用到复杂的查询,在代码里只是用到了findAll方法,因此这里咱们无需定义额外的方法,只是声明接口便可。
最后,咱们建立主应用程序,顺便配置一下Mongodb和RestTemplate:数据库
package me.josephzhu.spring101webfluxmvc; import com.mongodb.MongoClientOptions; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestTemplate; @SpringBootApplication @Configuration public class Spring101WebfluxMvcApplication { @Bean MongoClientOptions mongoClientOptions(){ return MongoClientOptions.builder().connectionsPerHost(1000).build(); } @Bean public RestTemplate restTemplate(RestTemplateBuilder builder) { return builder.build(); } public static void main(String[] args) { SpringApplication.run(Spring101WebfluxMvcApplication.class, args); } }
这里咱们配置了Mongodb客户端使得以后在进行压力测试的时候能有超过100个链接链接到Mongodb,不然会出现没法获取链接的问题。apache
如今咱们再来新建一个webflux模块:编程
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webflux</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webflux</name> <description></description> <parent> <groupId>me.josephzhu</groupId> <artifactId>spring101</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
这里能够注意到,咱们引入了webflux这个starter以及data-mongodb-reactive这个starter。在以前的Spring MVC项目中,咱们引入的是mvc和data-mongodb两个starter。
而后,咱们一样须要建立一下MyData类(代码和以前如出一辙,这里省略)。
最关键的一步,咱们来建立三个Controller方法的定义:json
package me.josephzhu.spring101webflux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Component public class MyHandler { @Autowired private MyReactiveRepository myReactiveRepository; public Mono<ServerResponse> getData(ServerRequest serverRequest) { int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10")); int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100")); String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining()); Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size) .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100)); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } public Mono<ServerResponse> getDbData(ServerRequest serverRequest) { Flux<MyData> data = myReactiveRepository.findAll(); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } public Mono<ServerResponse> saveData(ServerRequest serverRequest) { int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10")); int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100")); Flux<MyData> data = WebClient.create().get() .uri(builder -> builder .scheme("http") .host("localhost") .port(8080) .path("data") .queryParam("size", size) .queryParam("length", length) .build()) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToFlux(MyData.class) .flatMap(myReactiveRepository::save); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } }
这里要说明几点:
package me.josephzhu.spring101webflux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RouterFunctions.route; @Configuration public class RouterConfig { @Autowired private MyHandler myHandler; @Bean public RouterFunction<ServerResponse> config() { return route(GET("/data"), myHandler::getData) .andRoute(GET("/dbData"), myHandler::getDbData) .andRoute(GET("/saveData"), myHandler::saveData); } }
这段代码没有太多须要说明,这里咱们定义了三个GET请求(至关于MVC的@GetMapping),而后对应到注入的myHandler的三个方法上。
而后咱们还须要建立Mongodb的Repository:
package me.josephzhu.spring101webflux; import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.stereotype.Repository; @Repository public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
以及配置和启动类:
package me.josephzhu.spring101webflux; import com.mongodb.ConnectionString; import com.mongodb.async.client.MongoClientSettings; import com.mongodb.connection.ClusterSettings; import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @SpringBootApplication @Configuration public class Spring101WebfluxApplication { @Bean MongoClient mongoClient(){ return MongoClients.create(mongoClientSettings()); } @Bean MongoClientSettings mongoClientSettings(){ return MongoClientSettings.builder() .clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build()) .connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build()) .build(); } public static void main(String[] args) { SpringApplication.run(Spring101WebfluxApplication.class, args); } }
这里对Mongodb作了一些配置,主要也是但愿放大链接池这块的默认限制,为从此的压测服务。注意,在这里配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下图所示,还有其它两个MongoClient,若是修改了不匹配的MongoClient的话是不会有做用的,我在这个坑里躺了两小时。
完成后能够打开浏览器测试一下接口:
下图是官网的一个图说明了二者的关系,而后官网也给出了一些建议:
咱们知道对于阻塞的实现方式,咱们采用线程池来服务请求(线程池中的会维护一组普通的线程,线程池只是节省线程建立的时间),对于每个请求的处理,至始至终都是在一个线程中进行,若是处理的过程当中咱们须要访问外部的网络或数据库,那么线程就处于阻塞状态,这个线程没法服务其它请求,若是当时还有更多的并发的话,就须要建立更多的线程来服务其它请求。这种实现方式是很是简单的,应对压力的增加扩容方式也是粗暴的,那就是增长更多线程。
对于非阻塞的方式,采用的是EventLoop的方式,IO操做的时候是不占用工做线程的,所以只会建立一组和CPU核数至关的工做线程用于工做处理(NodeJS甚至是单线程的,这种就更危险了,就那么一个工做线程,一旦被长时间占用其它请求都没法处理)。因为整个处理过程当中IO请求不占用线程时间,线程不会阻塞等待,再增长超过CPU核数的工做线程也是没有意义的(只会白白增长线程切换的开销)。对于这种方式在压力增加后,由于咱们不须要增长额外的线程,也就没有了绝对的瓶颈。
试想一下在阻塞模型下,对于5000的并发,并且每个并发阻塞的时间很是长,那么咱们其实须要5000个线程来服务(这么多线程99%其实都是在等待,属于空耗系统资源),建立5000的线程不谈其它的,若是线程栈大小是1M的话就须要5GB的内存。对于非阻塞的线程模型在8核机器上仍是8个工做线程,内存占用仍是这么小,能够以最小的开销应对大并发,系统的损耗不多。非阻塞的Reactive模式是内耗很是小的模式,可是这是有代价的,在实现上咱们须要确保处理过程当中没有阻塞产生,不然就会浪费宝贵的数目固定的工做线程,也就是说咱们须要依赖配套的非阻塞IO类库来使用。
在默认状况下tomcat的工做线程池初始化为10,最大200,咱们经过启动本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具来看下初始的状况(35个线程):
在项目的application.properties文件中咱们配置tomcat的最大线程数:
server.tomcat.max-threads=250
在压力的状况下,咱们再来观察一下线程的状况(272个线程):
的确是建立多达250个工做线程。这里看到大部分线程都在休眠,由于咱们这里运行的是刚才的data()方法,在方法内咱们休眠了100毫秒。对于一样的压力,咱们再来看一下Spring101WebfluxApplication程序的线程状况(44个线程):
能够看到用于处理HTTP的Reactor线程只有8个,和本机CPU核数量一致(下面有十个Thread打头的线程是处理和Mongodb交互的,忽略),只须要这8个线程处理HTTP请求足以,由于HTTP请求的IO处理不会占用线程。
咱们可使用Gatling类库进行压力测试,我我的感受比Jmeter方便。配置很简单,首先咱们要安装Scala的SDK,而后咱们新建一个模块:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webstresstest</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webstresstest</name> <description></description> <dependencies> <dependency> <groupId>io.gatling.highcharts</groupId> <artifactId>gatling-charts-highcharts</artifactId> <version>2.3.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>io.gatling</groupId> <artifactId>gatling-maven-plugin</artifactId> <version>2.2.4</version> <configuration> <simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass> <resultsFolder>/Users/zyhome/gatling</resultsFolder> </configuration> </plugin> </plugins> </build> </project>
引入了garling的maven插件,在这里配置了测试结果输出路径以及压测的类。接下去建立一下这个Scala测试类:
package me.josephzhu.spring101.webstresstest import io.gatling.core.Predef._ import io.gatling.core.scenario.Simulation import io.gatling.http.Predef._ class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/data?size=10&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
这段代码定义了以下的测试行为:
nothingFor(4 seconds), // 1 atOnceUsers(10), // 2 rampUsers(10) over (5 seconds), // 3 constantUsersPerSec(20) during (15 seconds), // 4 constantUsersPerSec(20) during (15 seconds) randomized, // 5 rampUsersPerSec(10) to 20 during (10 minutes), // 6 rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7 splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8 splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9 heavisideUsers(1000) over (20 seconds) // 10
先来进行第一个测试,1000并发对data接口进行100次循环(还记得吗,接口有100ms休眠or延迟的):
class StressTest extends Simulation { val scn = scenario("data").repeat(100) { exec( http("mvc data") .get("http://localhost:8080/data?size=10&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(1000))) }
下面两个图分别是MVC和WebFlux的测试结果(由于都是8080端口,因此测试的时候记得切换重启两个应用哦):
能够看到WebFlux的吞吐几乎是MVC的翻倍,平均响应时间少了两倍不止,很明显,在等待的时候,2000个并发用户大大超过了咱们配置的250个线程池的线程数量,这个时候只能排队,对于非阻塞的方式,延迟是不会占用处理线程的,在延迟结束后才会去占用处理线程的资源进行处理,不会收到并发用户数受限于线程池线程数的状况。
咱们把Sleep相关代码注释再进行一次测试看看状况,分别是MVC和WebFlux:
这个时候WebFlux优点没有那么明显了。
如今咱们来访问一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb来初始化100条数据,而后修改一下测试脚本压测dbData接口:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/dbData")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(1000)))
}
下面看下此次的测试结果 ,分别是MVC和WebFlux:
吞吐量没有太多提升,平均响应时间快很多。
再来试一下第三个saveData接口的状况。修改测试代码:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=100000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
这里咱们修改并发用户为200,每一个用户进行100次测试,每次测试存入Mongodb 5条100KB的数据,一次测试后总数据量在10万条。此次测试咱们并无使用1000并发用户,缘由是这个测试咱们会先从远端获取数据而后再存入Mongodb,远端的服务也是来自于当前应用程序,咱们的Tomcat最多只有250个线程,在启动1000个用户后,一些线程服务于saveData接口,一些线程服务于data接口(saveData接口用到的),这样至关于形成了循环依赖问题,请求在等待更多的可用线程执行服务data接口的响应,而这个时候线程又都被占了致使没法分配更多的请求,测试几乎所有超时。
下面看下此次的测试结果 ,分别是MVC和WebFlux:
WebFlux也是并发略高,性能略好的优点。对于响应时间的分布咱们再来细看下下面的图:
第一个图是MVC版本的响应时间分布,能够看到抖动比第二个图的WebFlux的大很多。
最后来看看测试过程当中MVC的JVM状况(263个线程):
以及WebFlux的(41线程):
咱们来测试一下下面两种状况下对于WebFlux版本Mongodb侧的状况:
class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/saveData?size=1&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
以及
class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/saveData?size=5&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
区别就在远程服务返回的Flux
> db.serverStatus().connections { "current" : 64, "available" : 3212, "totalCreated" : 8899 }
在size为5的时候,Flux返回的是5个对象,使用这个请求压测的时候Mongodb的链接数以下:
> db.serverStatus().connections { "current" : 583, "available" : 2693, "totalCreated" : 10226 }
这是由于Flux拿到的数据直接以响应式进入Mongodb,并无等到全部数据拿到以后串行调用方法。
总结一下这几回的测试,咱们发现WebFlux方式对于MVC方式能有略微的性能提高,对于请求阻塞的时候性能优点明显。我本金的测试并无看到现象中的几倍甚至几十倍的性能提高,我猜缘由以下:
本文咱们建立了WebFlux和MVC两套应用对比演示了简单返回数据、发出远程请求、使用Mongodb等一些简单的应用场景,而后来看了一下ThreadPerRequest和EventLoop方式线程模型的区别,最后使用Gatling进行了几个Case的压力测试而且观察结果。我以为: