目录html
Java多线程相关说明java
异步调用分为两类web
多线程调用方法spring
多线程线程池apache
Springboot实现线程池配置tomcat
@Async注解app
Java多线程相关说明
异步调用分为两类
- 有返回值:用户须要Callable、Futrue等来接收最终的处理结果,但这个过程是异步非阻塞的。
- 无返回值:用户提交请求到接口以后不须要任何返回值,请求到达服务端以后就没有任何关系了,用户能够不等待而去作其余的业务操做。
多线程调用方法
- Callable:有返回值的线程方法,call 将会对用户请求作出结果反馈。
- Runnable:线程的接口,须要实现run方法。
- Thread:经过Thread类继承重写run方法实现。
多线程线程池
- Executor(Executors) 提供多种线程调度的线程池方案。
- AsyncTaskExecutor 异步线程池方案。
- ThreadPoolTaskExecutor 任务线程池方案。
注意:线程池合理分配能够实现服务器CPU高效利用,初始化大小,最大大小,排队队列的大小都会影响系统业务吞吐量,能够使用Jmeter压测接口来验证。切记:接口异步了并不表明全部的任务线程池都能吸纳,若是排队满了,线程池会抛异常,因此为了不抛异常咱们在提交任务到线程池的时候须要加一个本地队列,切不可将全部请求都怼到线程池。线程池的大小有限制,早晚会到达这个边界,到达边界就会异常,因此应该避免。
Springboot实现线程池配置
AsyncTaskExecutor
异步任务线程池,这个从名字上看一眼就知道是Executor的系列接口:
public interface AsyncTaskExecutor extends TaskExecutor { long TIMEOUT_IMMEDIATE = 0L; long TIMEOUT_INDEFINITE = 9223372036854775807L; void execute(Runnable var1, long var2); Future<?> submit(Runnable var1); <T> Future<T> submit(Callable<T> var1); } @FunctionalInterface public interface TaskExecutor extends Executor { void execute(Runnable var1); } /** @since 1.5 * @author Doug Lea */ public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
在Springboot中能够做为配置使用:
@Configuration public class AsyncTaskExecutorConfig extends WebMvcConfigurerAdapter { @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { configurer.setDefaultTimeout(-1); configurer.setTaskExecutor(asyncTaskExecutor()); } @Bean("asyncTaskExecutor") public AsyncTaskExecutor asyncTaskExecutor() { return new SimpleAsyncTaskExecutor("async"); } }
ThreadPoolTaskExecutor
这个类是比较经常使用的有界任务线程池。在Springboot中也做为配置类使用:
@Configuration public class ThreadPoolTaskExecutorConfig { @Value("${executors.threadPoolTaskExecutor.corePoolSize:10}") private Integer corePoolSize; @Value("${executors.threadPoolTaskExecutor.maxPoolSize:500}") private Integer maxPoolSize; @Value("${executors.threadPoolTaskExecutor.queueCapacity:500}") private Integer queueCapacity; @Value("${executors.threadPoolTaskExecutor.keepAliveSeconds:60}") private Integer keepAliveSeconds; @Value("${executors.threadPoolTaskExecutor.allowCoreThreadTimeOut:true}") private Boolean allowCoreThreadTimeOut; @Value("${executors.threadPoolTaskExecutor.threadNamePrefix:ThreadPoolTaskExecutor}") private String threadNamePrefix; @Value("${executors.threadPoolTaskExecutor.awaitTerminationSeconds:10}") private Integer awaitTerminationSeconds; @Value("${executors.threadPoolTaskExecutor.rejectedExecutionHandler:AbortPolicy}") private String rejectedExecutionHandler; /** * 线程池配置【不能将全部任务都提交到线程会产生线程排队,即便线程排队足够大也不能这样处理】 * * @return */ @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // 核心线程数 taskExecutor.setCorePoolSize(corePoolSize); // 最大线程数 taskExecutor.setMaxPoolSize(maxPoolSize); // 队列容量大小 taskExecutor.setQueueCapacity(queueCapacity); // 是否容许核心线程超时 taskExecutor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut); // 线程保活时间 taskExecutor.setKeepAliveSeconds(keepAliveSeconds); // 线程命名前缀规则 taskExecutor.setThreadNamePrefix(threadNamePrefix); // 等待终止时间(秒) taskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds); /** * @see https://blog.csdn.net/pfnie/article/details/52755769 * 线程池满了以后如何处理:默认是 new AbortPolicy(); * * (1) ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException; * (2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 自己。此策略提供简单的反馈控制机制,可以减缓新任务的提交速度 * (3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除; * (4) ThreadPoolExecutor.DiscardOldestPolicy 若是执行程序还没有关闭,则位于工做队列头部的任务将被删除,而后重试执行程序(若是再次失败,则重复此过程)。 */ switch (rejectedExecutionHandler){ case "AbortPolicy": taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); break; case "CallerRunsPolicy": taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); break; case "DiscardPolicy": taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); break; case "DiscardOldestPolicy": taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); break; } // 初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
上面的参数配置的比较详细了,这个类全部参数基本都在上面了很简单。后面的示例用的就是该线程池。
Rest风格的异步请求接口
@Async注解
示例以下:
@Slf4j @RestController @RequestMapping("/test") public class AsyncController { /** * HTTP心跳发送接口 * * @return */ @RequestMapping("/async/heartBeat.do") @Async public void heartBeat(XhyPosition position) { long start = System.currentTimeMillis(); try { //mobileService.savePositionBySync(position); log.info("HTTP心跳发送成功!"); } catch (Exception e) { e.printStackTrace(); } finally { long end = System.currentTimeMillis(); log.info("HTTP心跳耗时=" + (end - start)); } } }
须要在Springboot启动入口添加@EnableAsync注解。
Callable异步返回值
Callable 类型的返回Spring在底层已经作了代理进行处理,不须要经过线程池提交。
@Slf4j @RestController @RequestMapping("/test") public class CallableController { @Autowired MobileWebService mobileService; /** * HTTP心跳发送接口 * * @return */ @RequestMapping("/callable/heartBeat.do") public Callable<ResponseEntity> heartBeat(XhyPosition position) { Callable<ResponseEntity> responseEntityCallable = new Callable<ResponseEntity>() { @Override public ResponseEntity call() throws Exception { //mobileService.savePositionBySync(position); log.info("HTTP心跳发送成功!"); return ResponseEntity.ok("HTTP心跳发送成功!"); } }; return responseEntityCallable; } }
但值得注意的是一样的写法,没有返回值的回调线程方法是不执行的,须要经过线程池进行提交。
/** * HTTP心跳发送接口 * * @return */ @RequestMapping("/callable/heartBeat2.do") public void heartBeat2(XhyPosition position) { Callable<ResponseEntity> responseEntityCallable = new Callable<ResponseEntity>() { @Override public ResponseEntity call() throws Exception { //mobileService.savePositionBySync(position); log.info("HTTP心跳发送成功!"); return ResponseEntity.ok("HTTP心跳发送成功!"); } }; // 必须进行手动提交 threadPoolExecutor.submit(responseEntityCallable); }
DeferredResult
DeferredResult是一个结果的封装类型,Spring MVC推荐使用DeferredResult、Callable、CompletableFuture的返回值来实现异步接口调用。下面是本节示例:
@RestController @RequestMapping("/test") public class DeferredResultController { /** * HTTP心跳发送接口 * * @return */ @RequestMapping("/deferred/heartBeat.do") public DeferredResult<StatusCode> heartBeat(XhyPosition position) { DeferredResult<StatusCode> deferredResult = new DeferredResult<>(); deferredResult.setResult(StatusCode.SUCCESS); return deferredResult; } }
WebAsyncTask
WebAsyncTask提供了多环节的回调操做,能够知足大多数需求,但Spring并不推荐此种方式的异步。
/** * @Copyright: 2019-2021 * @FileName: WebAsyncTaskController.java * @Author: PJL * @Date: 2020/4/14 10:04 * @Description: 异步接口调用之WebAsyncTask[操做完善:但Spring不推荐使用,推荐使用:DeferredResultController、Callable、CompletableFuture] */ @Slf4j @RestController @RequestMapping("/test") public class WebAsyncTaskController { /** * 异步线程超时设置 */ @Value("${system.asyncRequestTimeout:10}") private Integer asyncRequestTimeout; @Autowired MobileWebService mobileService; @Autowired AggregationRedisService aggregationRedisService; @Qualifier("asyncTaskExecutor") @Autowired AsyncTaskExecutor executor; /** * HTTP[异步请求]心跳发送接口 * @return */ @RequestMapping("/webAsyncTask/blank.do") public WebAsyncTask<StatusCode> blank(){ WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> { return StatusCode.SUCCESS; }); return webAsyncTask; } /** * HTTP[异步请求]心跳发送接口 * @return */ @RequestMapping("/webAsyncTask/heartBeat.do") public WebAsyncTask<StatusCode> heartBeat(XhyPosition position){ WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> { position.setT(System.currentTimeMillis()); MobileServiceDataQueue.addToPositionQueue(position); return StatusCode.SUCCESS; }); // 处理完成异步回调 webAsyncTask.onCompletion(()->{ log.debug("HTTP心跳发送........完成!"); // aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,"HTTP心跳处理完成!"); }); // 超时处理 webAsyncTask.onTimeout(()->{ //log.debug("HTTP心跳发送........超时!"); return StatusCode.TIMEOUT; }); // 错误处理 webAsyncTask.onError(()->{ log.debug("HTTP心跳发送........错误!"); return StatusCode.ERROR; }); return webAsyncTask; } /** * HTTP[线程池]心跳发送接口 * @return */ @RequestMapping("/webAsyncTask/heartBeatWithThreadPool.do") public WebAsyncTask<StatusCode> heartBeatWithThreadPool(XhyPosition position){ WebAsyncTask webAsyncTaskByPool= new WebAsyncTask(asyncRequestTimeout*1000L,executor,()-> { position.setT(System.currentTimeMillis()); MobileServiceDataQueue.addToPositionQueue(position); return StatusCode.SUCCESS; }); // 处理完成异步回调 webAsyncTaskByPool.onCompletion(()->{ log.debug("HTTP心跳发送....by pool....完成!"); // aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,"HTTP心跳处理完成!"); }); // 超时处理 webAsyncTaskByPool.onTimeout(()->{ log.debug("HTTP心跳发送........超时!"); return StatusCode.TIMEOUT; }); // 错误处理 webAsyncTaskByPool.onError(()->{ log.debug("HTTP心跳发送........错误!"); return StatusCode.ERROR; }); return webAsyncTaskByPool; } }
多线程异步并发异常
cannot dispatch without an AsyncContext:
[org.apache.catalina.core.AsyncListenerWrapper] java.lang.IllegalArgumentException: Cannot dispatch without an AsyncContext at org.springframework.util.Assert.notNull(Assert.java:198) at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.dispatch(StandardServletAsyncWebRequest.java:131) at org.springframework.web.context.request.async.WebAsyncManager.setConcurrentResultAndDispatch(WebAsyncManager.java:391) at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$2(WebAsyncManager.java:315) at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.lambda$onError$0(StandardServletAsyncWebRequest.java:146) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onError(StandardServletAsyncWebRequest.java:146) at org.apache.catalina.core.AsyncListenerWrapper.fireOnError(AsyncListenerWrapper.java:49) at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:410) at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:239) at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:242) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
由于HTTP底层包含了TCP的链接,客户端发送Jmeter方法请求会分配系统临时端口用于客户端请求处理。Jmeter 接口请求参数keep_alive须要去掉勾选否则端口不会及时释放。
这个问题能够总结为是:Spring容器异步分发工做机制客户端与服务端在链接断开时无法处理异步请求的目标servlet没法分发所致。这个问题不是程序自己业务代码报错,能够忽略。
模拟Servlet异步请求分发
参考:https://www.concretepage.com/java-ee/jsp-servlet/asynccontext-example-in-servlet-3
package com.concretepage.servlet; import java.io.IOException; import java.io.PrintWriter; import javax.servlet.AsyncContext; import javax.servlet.ServletRequest; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @WebServlet(asyncSupported = true, value = "/AsyncContextExample", loadOnStartup = 1) public class AsyncContextExample extends HttpServlet { private static final long serialVersionUID = 1L; public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException{ doGet(request,response); } public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException{ response.setContentType("text/html"); PrintWriter out = response.getWriter(); AsyncContext asyncContext = request.startAsync(); asyncContext.setTimeout(0); ServletRequest servReq = asyncContext.getRequest(); boolean b = servReq.isAsyncStarted(); out.println("isAsyncStarted : "+b); asyncContext.dispatch("/asynctest.jsp"); out.println("<br/>asynchronous task finished."); } }