朱晔和你聊Spring系列S1E5:Spring WebFlux小探

本文会来作一些应用对比Spring MVC和Spring WebFlux,观察线程模型的区别,而后作一下简单的压力测试。前端

建立一个传统的Spring MVC应用

先来建立一个新的webflux-mvc的模块:java

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux-mvc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux-mvc</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
复制代码

而后在项目里定义一个咱们会使用到的POJO:react

package me.josephzhu.spring101webfluxmvc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
    @Id
    private String id;
    private String payload;
    private long time;
}
复制代码

这里的@Document和@Id是为Mongodb服务的,咱们定义了MyData将会以mydata做为Collection的名字,而后id字段是Document的Id列。 而后咱们来建立Controller,在这个Controller里面咱们尝试三种不一样的操做:web

  1. Sleep 100ms的纯获取数据的方法。从请求中得到length参数做为payload字符串的长度,从请求中得到size参数做为MyData的个数。咱们在以后的测试过程当中能够随意调节这两个参数来调整咱们的数据量。
  2. 从Mongodb获取数据的方法,获取到数据后直接返回。
  3. 复合逻辑。先走HTTP请求从data方法获取数据,而后把数据保存进入Mongodb,最后返回这些数据。
package me.josephzhu.spring101webfluxmvc;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@RestController
public class MyController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private MyRepository myRepository;

    @GetMapping("/data")
    public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {

        }
        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        return IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
                .collect(Collectors.toList());
    }

    @GetMapping("/dbData")
    public List<MyData> getDbData() {
        return myRepository.findAll();
    }

    @GetMapping("/saveData")
    public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
                .queryParam("size", size)
                .queryParam("length", length);
        ResponseEntity<List<MyData>> responseEntity =
                restTemplate.exchange(builder.toUriString(),
                        HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {});
        return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
    }
}
复制代码

注意,在这里咱们使用了Java 8的Steam来作一些操做避免使用for循环:spring

  1. 经过length参数构建payload(payload由length个字符a构成)。
  2. 经过size参数构建MyData的List。
  3. 在RestTemplate获取到MyData的List后,把每个对象交由myRepository的save方法来处理,而后统一收集返回结果。

这些Stream的代码都是同步处理,也不涉及外部IO,和非阻塞没有任何关系,只是方便代码编写。为了让代码能够运行,咱们还须要继续来配置下Mongodb的Repository:mongodb

package me.josephzhu.spring101webfluxmvc;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyRepository extends MongoRepository<MyData, String> { }
复制代码

由于咱们没有用到复杂的查询,在代码里只是用到了findAll方法,因此这里咱们无需定义额外的方法,只是声明接口便可。 最后,咱们建立主应用程序,顺便配置一下Mongodb和RestTemplate:数据库

package me.josephzhu.spring101webfluxmvc;

import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {

   @Bean
   MongoClientOptions mongoClientOptions(){
       return MongoClientOptions.builder().connectionsPerHost(1000).build();
   }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
    }
}
复制代码

这里咱们配置了Mongodb客户端使得以后在进行压力测试的时候能有超过100个链接链接到Mongodb,不然会出现没法获取链接的问题。apache

建立WebFlux版本的应用

如今咱们再来新建一个webflux模块:编程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
复制代码

这里能够注意到,咱们引入了webflux这个starter以及data-mongodb-reactive这个starter。在以前的Spring MVC项目中,咱们引入的是mvc和data-mongodb两个starter。 而后,咱们一样须要建立一下MyData类(代码和以前如出一辙,这里省略)。 最关键的一步,咱们来建立三个Controller方法的定义:json

package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class MyHandler {
    @Autowired
    private MyReactiveRepository myReactiveRepository;

    public Mono<ServerResponse> getData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> getDbData(ServerRequest serverRequest) {
        Flux<MyData> data = myReactiveRepository.findAll();
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> saveData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        Flux<MyData> data = WebClient.create().get()
                .uri(builder -> builder
                        .scheme("http")
                        .host("localhost")
                        .port(8080)
                        .path("data")
                        .queryParam("size", size)
                        .queryParam("length", length)
                        .build())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux(MyData.class)
                .flatMap(myReactiveRepository::save);

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

}
复制代码

这里要说明几点:

  1. 在WebFlux中,咱们能够采用传统的@Controller方式来定义Controller,也能够采用函数式方式来声明对外的Endpoint,也就是声明Handler+Router。咱们这里采用的是更有特点的后者来演示。
  2. 请你比较一下三个方法的实现对于两个版本的区别。最主要的区别,咱们返回的实际数据是Mono<>和Flux<>,分别表明0~1个对象和0~N对象的响应式流。
  3. 在saveData方法中,对于Spring MVC咱们使用的是阻塞的RestTemplate来从远端获取数据,对于Spring WebFlux咱们使用的是非阻塞的WebClient来获取数据。获取数据后,咱们直接使用flatMap获取到了全部的MyData转给咱们的响应式的Mongodb Repository来处理数据。
  4. 对于saveData方法中插入Mongodb的操做,这里和MVC的例子有很大的不一样须要注意。在MVC中,咱们把远程服务返回的结果转为Stream数据流,同步依次调用save方法,整个过程只会有占用一个Mongodb的链接。而在这里,直接对Flux流进行了Map,整个过程至关于并发进行了Mongodb的调用。在以后作压测的时候,咱们会再次提到这点。

刚才有提到,采用函数式声明对外的Endpoint的话除了定义Handler,还须要配置Router来和Handler关联,配置以下:

package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {
    @Autowired
    private MyHandler myHandler;

    @Bean
    public RouterFunction<ServerResponse> config() {
        return route(GET("/data"), myHandler::getData)
                .andRoute(GET("/dbData"), myHandler::getDbData)
                .andRoute(GET("/saveData"), myHandler::saveData);
    }
}
复制代码

这段代码没有太多须要说明,这里咱们定义了三个GET请求(至关于MVC的@GetMapping),而后对应到注入的myHandler的三个方法上。 而后咱们还须要建立Mongodb的Repository:

package me.josephzhu.spring101webflux;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
复制代码

以及配置和启动类:

package me.josephzhu.spring101webflux;

import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {

    @Bean
    MongoClient mongoClient(){
        return MongoClients.create(mongoClientSettings());
    }

    @Bean
    MongoClientSettings mongoClientSettings(){
        return MongoClientSettings.builder()
                .clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
                .connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxApplication.class, args);
    }
}
复制代码

这里对Mongodb作了一些配置,主要也是但愿放大链接池这块的默认限制,为从此的压测服务。注意,在这里配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下图所示,还有其它两个MongoClient,若是修改了不匹配的MongoClient的话是不会有做用的,我在这个坑里躺了两小时。

完成后能够打开浏览器测试一下接口:

Spring MVC仍是WebFlux?

下图是官网的一个图说明了二者的关系,而后官网也给出了一些建议:

  1. 若是你如今的Spring MVC运行的没啥问题的话就别改了,有大量的类库可使用,实现简单易于理解。
  2. 若是你但愿实现轻量级的,函数式Web框架,那么能够考虑WebFlux的函数Web端点。
  3. 若是你依赖阻塞的持久化API好比JPA和JDBC那么也就只能选择Spring MVC了。目前对于非阻塞的JDBC实现有一些早期的项目在探索,可是没有到能够上生产的成熟度。
  4. 在Spring MVC应用程序中进行远程调用也是可使用响应式的WebClient的。Spring MVC也可使用其它的响应式组件。每次调用延迟越厉害受益越大。
  5. 对于大型应用程序要考虑到非阻塞方式实现的学习曲线。最简单的起步方式就是使用WebClient,彻底切换到非阻塞须要花时间熟悉函数式声明式的编程API。
    官方的意思也是能够在一些小引用上尝试WebFlux,对于大型应用不建议冒然转到WebFlux。

观察线程模型

咱们知道对于阻塞的实现方式,咱们采用线程池来服务请求(线程池中的会维护一组普通的线程,线程池只是节省线程建立的时间),对于每个请求的处理,至始至终都是在一个线程中进行,若是处理的过程当中咱们须要访问外部的网络或数据库,那么线程就处于阻塞状态,这个线程没法服务其它请求,若是当时还有更多的并发的话,就须要建立更多的线程来服务其它请求。这种实现方式是很是简单的,应对压力的增加扩容方式也是粗暴的,那就是增长更多线程。

对于非阻塞的方式,采用的是EventLoop的方式,IO操做的时候是不占用工做线程的,所以只会建立一组和CPU核数至关的工做线程用于工做处理(NodeJS甚至是单线程的,这种就更危险了,就那么一个工做线程,一旦被长时间占用其它请求都没法处理)。因为整个处理过程当中IO请求不占用线程时间,线程不会阻塞等待,再增长超过CPU核数的工做线程也是没有意义的(只会白白增长线程切换的开销)。对于这种方式在压力增加后,由于咱们不须要增长额外的线程,也就没有了绝对的瓶颈。

试想一下在阻塞模型下,对于5000的并发,并且每个并发阻塞的时间很是长,那么咱们其实须要5000个线程来服务(这么多线程99%其实都是在等待,属于空耗系统资源),建立5000的线程不谈其它的,若是线程栈大小是1M的话就须要5GB的内存。对于非阻塞的线程模型在8核机器上仍是8个工做线程,内存占用仍是这么小,能够以最小的开销应对大并发,系统的损耗不多。非阻塞的Reactive模式是内耗很是小的模式,可是这是有代价的,在实现上咱们须要确保处理过程当中没有阻塞产生,不然就会浪费宝贵的数目固定的工做线程,也就是说咱们须要依赖配套的非阻塞IO类库来使用。 在默认状况下tomcat的工做线程池初始化为10,最大200,咱们经过启动本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具来看下初始的状况(35个线程) :

在项目的application.properties文件中咱们配置tomcat的最大线程数: server.tomcat.max-threads=250 在压力的状况下,咱们再来观察一下线程的状况(272个线程):
的确是建立多达250个工做线程。这里看到大部分线程都在休眠,由于咱们这里运行的是刚才的data()方法,在方法内咱们休眠了100毫秒。对于一样的压力,咱们再来看一下Spring101WebfluxApplication程序的线程状况(44个线程):
能够看到用于处理HTTP的Reactor线程只有8个,和本机CPU核数量一致(下面有十个Thread打头的线程是处理和Mongodb交互的,忽略),只须要这8个线程处理HTTP请求足以,由于HTTP请求的IO处理不会占用线程。

使用Gatling进行压力测试

咱们可使用Gatling类库进行压力测试,我我的感受比Jmeter方便。配置很简单,首先咱们要安装Scala的SDK,而后咱们新建一个模块:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webstresstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webstresstest</name>
    <description></description>

    <dependencies>
        <dependency>
            <groupId>io.gatling.highcharts</groupId>
            <artifactId>gatling-charts-highcharts</artifactId>
            <version>2.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>io.gatling</groupId>
                <artifactId>gatling-maven-plugin</artifactId>
                <version>2.2.4</version>
                <configuration>
                    <simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass>
                    <resultsFolder>/Users/zyhome/gatling</resultsFolder>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
复制代码

引入了garling的maven插件,在这里配置了测试结果输出路径以及压测的类。接下去建立一下这个Scala测试类:

package me.josephzhu.spring101.webstresstest

import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
复制代码

这段代码定义了以下的测试行为:

  1. 声明一个data测试场景,重复进行1000次测试,发起一个远程调用,验证调用结果的响应状态码是200而且返回的结果包含字符串payload。
  2. 测试启动的时候直接压上去200个用户,每个用户运行完这1000次测试后结束了,因此这种方式一开始会是200用户到测试最后阶段用户数会慢慢减小。固然还有其它一些测试方式(好比慢慢递增用户的方式),详见官网:
nothingFor(4 seconds), // 1
    atOnceUsers(10), // 2
    rampUsers(10) over (5 seconds), // 3
    constantUsersPerSec(20) during (15 seconds), // 4
    constantUsersPerSec(20) during (15 seconds) randomized, // 5
    rampUsersPerSec(10) to 20 during (10 minutes), // 6
    rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
    heavisideUsers(1000) over (20 seconds) // 10
复制代码

压力测试一

先来进行第一个测试,1000并发对data接口进行100次循环(还记得吗,接口有100ms休眠or延迟的):

class StressTest extends Simulation {

  val scn = scenario("data").repeat(100) {
    exec(
      http("mvc data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(1000)))
}
复制代码

下面两个图分别是MVC和WebFlux的测试结果(由于都是8080端口,因此测试的时候记得切换重启两个应用哦):

能够看到WebFlux的吞吐几乎是MVC的翻倍,平均响应时间少了两倍不止,很明显,在等待的时候,2000个并发用户大大超过了咱们配置的250个线程池的线程数量,这个时候只能排队,对于非阻塞的方式,延迟是不会占用处理线程的,在延迟结束后才会去占用处理线程的资源进行处理,不会收到并发用户数受限于线程池线程数的状况。 咱们把Sleep相关代码注释再进行一次测试看看状况,分别是MVC和WebFlux:
这个时候WebFlux优点没有那么明显了。

性能测试二

如今咱们来访问一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb来初始化100条数据,而后修改一下测试脚本压测dbData接口: class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(1000))) } 下面看下此次的测试结果 ,分别是MVC和WebFlux:

吞吐量没有太多提升,平均响应时间快很多。

性能测试三

再来试一下第三个saveData接口的状况。修改测试代码:

class StressTest extends Simulation {

  val scn = scenario("data").repeat(100) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=5&length=100000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
复制代码

这里咱们修改并发用户为200,每一个用户进行100次测试,每次测试存入Mongodb 5条100KB的数据,一次测试后总数据量在10万条。此次测试咱们并无使用1000并发用户,缘由是这个测试咱们会先从远端获取数据而后再存入Mongodb,远端的服务也是来自于当前应用程序,咱们的Tomcat最多只有250个线程,在启动1000个用户后,一些线程服务于saveData接口,一些线程服务于data接口(saveData接口用到的),这样至关于形成了循环依赖问题,请求在等待更多的可用线程执行服务data接口的响应,而这个时候线程又都被占了致使没法分配更多的请求,测试几乎所有超时。 下面看下此次的测试结果 ,分别是MVC和WebFlux:

WebFlux也是并发略高,性能略好的优点。对于响应时间的分布咱们再来细看下下面的图:

第一个图是MVC版本的响应时间分布,能够看到抖动比第二个图的WebFlux的大很多。 最后来看看测试过程当中MVC的JVM状况(263个线程):
以及WebFlux的(41线程):

性能测试四:

咱们来测试一下下面两种状况下对于WebFlux版本Mongodb侧的状况:

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=1&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
复制代码

以及

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=5&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
复制代码

区别就在远程服务返回的Flux是1个仍是5个。在1个的时候运行测试能够看到咱们Mongodb有64个链接(须要把以前链接池的配置最小设置为小一点,好比50):

> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }
复制代码

在size为5的时候,Flux返回的是5个对象,使用这个请求压测的时候Mongodb的链接数以下:

> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }
复制代码

这是由于Flux拿到的数据直接以响应式进入Mongodb,并无等到全部数据拿到以后串行调用方法。 总结一下这几回的测试,咱们发现WebFlux方式对于MVC方式能有略微的性能提高,对于请求阻塞的时候性能优点明显。我本金的测试并无看到现象中的几倍甚至几十倍的性能提高,我猜缘由以下:

  1. 本机有性能瓶颈了,压测客户端、Mongodb服务器、服务端都在本机运行,干扰因素太多,CPU的使用你争我夺,测试不公平
  2. 测试的时候CPU永远是100%还死机好几回,我根本没法测试更高的并发,没法彻底把非阻塞的性能压出来
  3. 我本机测试的时候走的是localhost而不是内网,不通过物理网卡,可能没法体现非阻塞的性能

若是有条件可使用三台独立服务器在内网进行1万以上并发用户的性能测试或许能够获得更科学的结果。

总结

本文咱们建立了WebFlux和MVC两套应用对比演示了简单返回数据、发出远程请求、使用Mongodb等一些简单的应用场景,而后来看了一下ThreadPerRequest和EventLoop方式线程模型的区别,最后使用Gatling进行了几个Case的压力测试而且观察结果。我以为:

  1. 非阻塞模型确定是好东西,在IO压力和IO延迟很大的状况下,非阻塞模型由于不须要更多的线程,内耗小,性能略好,并且也稳定,因此更利于高并发
  2. WebFlux的函数式和声明方式实现须要有很高的API熟悉使用门槛,对于复杂的逻辑这种方式的实现比回调地狱更容易绕晕,并且容易产生Bug(或许之后有可能响应式的编程在API上有可能和传统方式进行统一)
  3. 目前和WebFlux配套的其它一些Reactive的库还不是很全面成熟,要对复杂的业务逻辑全面启用响应式编程有点难,阻塞调用不是不能在WebFlux中混用,可是这种方式仍是采用了线程池来处理,如今容器也是NIO的了,有又多大区别
  4. 采用阻塞方式实现,由阻塞的线程进行自然背压进行流控,非阻塞方式很直接一竿子到底,从外部请求直接到最底层存储,须要作好流控,这是很是容易产生问题的一个点,当请求的处理无需经过线程来承载的时候,前端压力会直通最底层数据源,不收任何扩容方面的限制,直接击溃底层
  5. 对于阻塞的方式,多线程的调度自然就是一个任务的负载均衡,并不会出现太严重的卡死工做线程的问题,非阻塞应用编程咱们要有意识代码在哪一个线程上运行,若是是reactor线程的话千万不能长时间阻塞

综上所述,使用WebFlux进行响应式编程我我的认为目前只适合作类IO转发的高并发的又看中资源使用效率的应用场景(好比Gateway网关服务),对于复杂的业务逻辑不太适合,在90%的状况下响应式的编程模型和线程模型不会享受大幅性能优点,更不建议盲目把现有的应用使用WebFlux来重写。固然,这确定是一个会持续发展的方向,能够先接触研究起来。

相关文章
相关标签/搜索