三丰 soft张三丰 java
一个能够无需等待被调用函数的返回值就让操做继续进行的方法。web
异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去作别的了。同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,大家一块儿去。spring
jdk并发包里的Future表明了将来的某个结果,当咱们向线程池中提交任务的时候会返回该对象,能够经过future得到执行的结果,可是jdk1.8以前的Future有点鸡肋,并不能实现真正的异步,须要阻塞的获取结果,或者不断的轮询。promise
一般咱们但愿当线程执行完一些耗时的任务后,可以自动的通知咱们结果,很遗憾这在原生jdk1.8以前是不支持的,可是咱们能够经过第三方的库实现真正的异步回调。多线程
public class JavaFuture { public static void main(String[] args) throws Throwable, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(1); Future<String> f = executor.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("task started!"); longTimeMethod(); System.out.println("task finished!"); return "hello"; } }); //此处get()方法阻塞main线程 System.out.println(f.get()); System.out.println("main thread is blocked"); } }
若是想得到耗时操做的结果,能够经过get()方法获取,可是该方法会阻塞当前线程,咱们能够在作完剩下的某些工做的时候调用get()方法试图去获取结果。并发
也能够调用非阻塞的方法isDone来肯定操做是否完成,isDone这种方式有点儿相似下面的过程:app
直到jdk1.8才算真正支持了异步操做,jdk1.8中提供了lambda表达式,使得java向函数式语言又靠近了一步。借助jdk原生的CompletableFuture能够实现异步的操做,同时结合lambada表达式大大简化了代码量。代码例子以下:异步
package netty_promise; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Supplier; public class JavaPromise { public static void main(String[] args) throws Throwable, ExecutionException { // 两个线程的线程池 ExecutorService executor = Executors.newFixedThreadPool(2); //jdk1.8以前的实现方式 CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("task started!"); try { //模拟耗时操做 longTimeMethod(); } catch (InterruptedException e) { e.printStackTrace(); } return "task finished!"; } }, executor); //采用lambada的实现方式 future.thenAccept(e -> System.out.println(e + " ok")); System.out.println("main thread is running"); } }
实现方式相似下图:async
先把longTimeMethod 封装到Spring的异步方法中,这个异步方法的返回值是Future的实例。这个方法必定要写在Spring管理的类中,注意注解@Async。ide
@Service public class AsynchronousService{ @Async public Future springAsynchronousMethod(){ Integer result = longTimeMethod(); return new AsyncResult(result); } }
其余类调用这个方法。这里注意,必定要其余的类,若是在同类中调用,是不生效的。
@Autowired private AsynchronousService asynchronousService; public void useAsynchronousMethod(){ Future future = asynchronousService.springAsynchronousMethod(); future.get(1000, TimeUnit.MILLISECONDS); }
其实Spring只不过在原生的Future中进行了一次封装,咱们最终得到的仍是Future实例。
当咱们须要实现并发、异步等操做时,一般都会使用到ThreadPoolTaskExecutor。
当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务,否就选择一条线程执行任务,是就执行第二步。查看核心线程池是否已满,不满就建立一条线程执行任务,不然执行第三步。查看任务队列是否已满,不满就将任务存储在任务队列中,不然执行第四步。查看线程池是否已满,不满就建立一条线程执行任务,不然就按照策略处理没法执行的任务。
在ThreadPoolExecutor中表现为:
若是当前运行的线程数小于corePoolSize,那么就建立线程来执行任务(执行时须要获取全局锁)。若是运行的线程大于或等于corePoolSize,那么就把task加入BlockQueue。若是建立的线程数量大于BlockQueue的最大容量,那么建立新线程来执行该任务。若是建立线程致使当前运行的线程数超过maximumPoolSize,就根据饱和策略来拒绝该任务。
public interface TaskDecorator A callback interface for a decorator to be applied to any Runnable about to be executed. Note that such a decorator is not necessarily being applied to the user-supplied Runnable/Callable but rather to the actual execution callback (which may be a wrapper around the user-supplied task). The primary use case is to set some execution context around the task's invocation, or to provide some monitoring/statistics for task execution.
意思就是说这是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。看上去正好能够应用于咱们这种场景。多线程的场景下要多注意。
上文中的错误信息涉及到RequestAttributes 和SecurityContext,他们都是经过ThreadLocal来保存线程数据,在同步方法中没有问题,使用线程池异步调用时,咱们能够经过配合线程池的TaskDecorator装饰器拷贝上下文传递。
注意 线程池中的线程是可复用的,使用ThreadLocal须要注意内存泄露问题,因此线程执行完成后须要在finally方法中移除上下文对象。
代码以下
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import javax.annotation.Nonnull; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Bean("ttlExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置线程池核心容量 executor.setCorePoolSize(20); // 设置线程池最大容量 executor.setMaxPoolSize(100); // 设置任务队列长度 executor.setQueueCapacity(200); // 设置线程超时时间 executor.setKeepAliveSeconds(60); // 设置线程名称前缀 executor.setThreadNamePrefix("ttl-executor-"); // 设置任务丢弃后的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置任务的装饰 executor.setTaskDecorator(new ContextCopyingDecorator()); executor.initialize(); return executor; } static class ContextCopyingDecorator implements TaskDecorator { @Nonnull @Override public Runnable decorate(@Nonnull Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); SecurityContext securityContext = SecurityContextHolder.getContext(); return () -> { try { RequestContextHolder.setRequestAttributes(context); SecurityContextHolder.setContext(securityContext); runnable.run(); } finally { SecurityContextHolder.clearContext(); RequestContextHolder.resetRequestAttributes(); } }; } } }