本文基于Spring Cloud Finchley SR4mysql
本文经过几个问题,解析下Spring WebFlux用法最佳实践,并与另外一框架Vertx做对比web
是能够的,这样的依赖是可行的(容器用tomcat和undertow或者其余均可以,这里使用undertow):spring
这个问题,除此运用像WebFlux和Vertx的框架的人,都会对这个有误解。认为仅仅简单的把webFlux的依赖添加进来,以后接口返回Mono就实现了异步背压的Reactor模型。实际上并非这样的。 咱们来举几个例子,分步骤深刻了解下。 首先为了测试方便,咱们将web容器的处理http请求线程池的大小改为惟一一个,对于Tomcat,配置:sql
server.thread.max-thread=1
对于UnderTow(咱们这里用的是underTow):数据库
# 设置IO线程数, 它主要执行非阻塞的任务,它们会负责多个链接, 默认设置每一个CPU核心一个线程 server.undertow.io-threads=1 # 阻塞任务线程池, 当执行相似servlet请求阻塞IO操做, undertow会从这个线程池中取得线程 # 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8 server.undertow.worker-threads=1
以后,配置Log4J2日志格式为:api
<Property name="springAppName">test</Property> <Property name="LOG_ROOT">log</Property> <Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property> <Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property> <Property name="LOG_LEVEL_PATTERN">%5p</Property> <Property name="logFormat"> %d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD} </Property>
这样的格式可使咱们看到线程号,还有sleuth的traceId和spanId(咱们的项目依赖了sleuth)。 首先编写测试代码,看看直接简单调用并just是否实现了异步背压:缓存
@Log4j2 @RestController public class TestController { @Autowired private TestService testService; @RequestMapping("/test") public Mono<String> test() { log.info("test started"); return Mono.just(testService.simulateIOTest()); } @Service public static class TestService { public String simulateIOTest() { try { //simulate io log.info("simulate start"); TimeUnit.SECONDS.sleep(5); log.info("simulate end"); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; } } }
并发调用接口,查看日志,发现:tomcat
2019-11-12 09:05:41.595 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:05:41.596 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start 2019-11-12 09:05:46.598 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end 2019-11-12 09:05:46.635 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:05:46.635 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start 2019-11-12 09:05:51.636 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end 2019-11-12 09:05:51.643 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:05:51.643 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start 2019-11-12 09:05:56.644 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
能够明显看出,请求是串行处理的,由于只有一个线程,而且这个线程还在等待请求处理完。这就不符合Reactor模型,处理http请求的线程XNIO-2 task-1应该不等待请求处理完而直接处理下一个请求才对。 把Mono.just(testService.simulateIOTest())
替换成Mono.fromCallable(() -> testService.simulateIOTest())
等等相似的是同样的效果,这里必须本身用其余的线程池,去处理实际请求,处理结束的时候,将结果填写到最外层的Mono里面。这样的话,考虑到代码整洁性不采用纯回调写法,要求每个调用方法返回的都是Future类型的。这里咱们返回CompletableFuture。并发
@Log4j2 @RestController public class TestController { @Autowired private TestService testService; @RequestMapping("/test") public Mono<String> test() { log.info("test started"); return Mono.create(stringMonoSink -> testService.simulateIOTest().thenApply(s -> { log.info("apply"); //填写成功结果 stringMonoSink.success(s); return s; })); } @Service public static class TestService { public CompletableFuture<String> simulateIOTest() { return CompletableFuture.supplyAsync(() -> { try { //simulate io log.info("simulate start"); TimeUnit.SECONDS.sleep(5); log.info("simulate end"); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); } } }
结果是:app
2019-11-12 09:18:03.457 INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:18:03.458 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start 2019-11-12 09:18:04.155 INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:18:04.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start 2019-11-12 09:18:04.962 INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:18:04.963 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start 2019-11-12 09:18:05.756 INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:18:05.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start 2019-11-12 09:18:08.459 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end 2019-11-12 09:18:08.459 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply 2019-11-12 09:18:09.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end 2019-11-12 09:18:09.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply 2019-11-12 09:18:09.964 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end 2019-11-12 09:18:09.964 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply 2019-11-12 09:18:10.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end 2019-11-12 09:18:10.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
这样,才真正实现了Reactor模型。
CompletableFuture能够指定线程池,亦能够不指定。若是像上面不指定的话,那么使用的线程池就是Java8以后会默认启动一个大小为CPU核数减一的CommonForkJoinPool去执行。须要指定的话,基本上每一个方法均可以额外传入一个线程池做为参数。
最佳实践是,只要涉及到IO的,就交给不一样的线程池去作,不一样种类的IO的线程池不一样。例如,用于数据库IO的线程池,用于RPC的线程池,用于缓存访问的线程池等等。
这里还有一个问题存在,就是异步调用,致使spanId和traceId丢失了,例如上面的例子:
2019-11-12 09:18:03.457 INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started 2019-11-12 09:18:03.458 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
8d6eddc9cc80612f
这个丢失了,致使微服务调用链日志追踪变得不可行,因此,这里咱们对于异步的代码,也须要在异步调用前强制设置下spanId和traceId。
综上以后,修改的代码是:
@Log4j2 @RestController public class TestController { @Autowired private TestService testService; @RequestMapping("/test") public Mono<String> test() { log.info("test started"); return Mono.fromFuture(testService.simulateIOTest()); } @Service public static class TestService { @Autowired private Tracer tracer; ThreadFactory build = (new ThreadFactoryBuilder()).setNameFormat("test_service_executor-%d").build(); private ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(131072), build, new ThreadPoolExecutor.AbortPolicy()); public CompletableFuture<String> simulateIOTest() { Span span = tracer.currentSpan(); return CompletableFuture.supplyAsync(() -> { try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { //simulate io log.info("simulate start"); TimeUnit.SECONDS.sleep(5); log.info("simulate end"); return "hello"; } catch (Exception e) { throw new RuntimeException(e); } }, executorService); } } }
结果是:
2019-11-12 09:44:30.953 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:28]:test started 2019-11-12 09:44:30.991 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start 2019-11-12 09:44:35.991 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
实际上,从设计上看,基本思路是同样的。对于任意一个IO操做,若是有原生的异步客户端(返回是一个Future),则运用Future封装交给其余线程池处理,不影响http请求线程接受其余请求。
主要区别在于:
WorkerExecutor
类。