咱们先来看下面这个Demo。java
pom.xml中maven依赖:web
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.14.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
启动类SpringBootAsyncApplication.javaspring
@SpringBootApplication @EnableAsync public class SpringBootAsyncApplication { public static void main(String[] args) { SpringApplication.run(SpringBootAsyncApplication.class, args); } }
DemoController.javac#
@RestController @RequestMapping(value = "/demos") @Slf4j public class DemoController { @Autowired private IDemoService demoService; @GetMapping("") public String test(@RequestParam String name) { long start = System.currentTimeMillis(); log.info("start send. ThreadName: {}", Thread.currentThread().getName()); demoService.send(name); long end = System.currentTimeMillis(); log.info("send end, time: {}", (end - start)); return "success"; } }
IDemoService接口实现类DemoServiceImpl.javasegmentfault
@Service @Slf4j public class DemoServiceImpl implements IDemoService { @Async @Override public void send(String name) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName()); } }
启动项目后,访问GET http://127.0.0.1:8002/demos?name=test,获得以下结果:app
2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : start send. ThreadName: http-nio-8002-exec-1 2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either 2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : send end, time: 5 2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl : async name=test, ThreadName: SimpleAsyncTaskExecutor-1
经过控制台日志打印,咱们能够看到有两个线程,一个是主线程,一个是异步的线程。没等异步的线程执行完,主线程就直接执行完毕,返回响应结果了,这就是异步的效果。less
为何会抛出下面这段日志?异步
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
没有发现用于异步处理的任务执行器,既没有TaskExecutor类型的bean,也没有名为“taskExecutor”的bean。什么意思?并且异步开启的线程为啥前缀是SimpleAsyncTaskExecutor?带着这些疑问,咱们深刻到源码中,看看到底发生了什么。async
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.javamaven
/** * Annotation that marks a method as a candidate for <i>asynchronous</i> execution. * Can also be used at the type level, in which case all of the type's methods are * considered as asynchronous. * * <p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@link java.util.concurrent.Future}. In the latter case, you may declare the * more specific {@link org.springframework.util.concurrent.ListenableFuture} or * {@link java.util.concurrent.CompletableFuture} types which allow for richer * interaction with the asynchronous task and for immediate composition with * further processing steps. * * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous * {@code Future} that can be used to track the result of the asynchronous method * execution. However, since the target method needs to implement the same signature, * it will have to return a temporary {@code Future} handle that just passes a value * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult}, * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}. * * @author Juergen Hoeller * @author Chris Beams * @since 3.0 * @see AnnotationAsyncExecutionInterceptor * @see AsyncAnnotationAdvisor */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Async { /** * 指定异步操做的限定符值 * A qualifier value for the specified asynchronous operation(s). * <p>May be used to determine the target executor to be used when executing this * method, matching the qualifier value (or the bean name) of a specific * {@link java.util.concurrent.Executor Executor} or * {@link org.springframework.core.task.TaskExecutor TaskExecutor} * bean definition. * <p>When specified on a class level {@code @Async} annotation, indicates that the * given executor should be used for all methods within the class. Method level use * of {@code Async#value} always overrides any value set at the class level. * @since 3.1.2 */ String value() default ""; }
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java
package org.springframework.scheduling.annotation; import java.lang.annotation.Annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.context.annotation.AdviceMode; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.Ordered; /** * Enables Spring's asynchronous method execution capability, similar to functionality * found in Spring's {@code <task:*>} XML namespace. * * <p>To be used together with @{@link Configuration Configuration} classes as follows, * enabling annotation-driven async processing for an entire Spring application context: * * <pre class="code"> * @Configuration * @EnableAsync * public class AppConfig { * * }</pre> * * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} * annotation, or any custom annotation specified via the {@link #annotation} attribute. * The aspect is added transparently for any registered bean, for instance via this * configuration: * * <pre class="code"> * @Configuration * public class AnotherAppConfig { * * @Bean * public MyAsyncBean asyncBean() { * return new MyAsyncBean(); * } * }</pre> * * <p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * will be used to process async method invocations. Besides, annotated methods having a * {@code void} return type cannot transmit any exception back to the caller. By default, * such uncaught exceptions are only logged. * * <p>To customize all this, implement {@link AsyncConfigurer} and provide: * <ul> * <li>your own {@link java.util.concurrent.Executor Executor} through the * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li> * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler * getAsyncUncaughtExceptionHandler()} * method.</li> * </ul> * * <pre class="code"> * @Configuration * @EnableAsync * public class AppConfig implements AsyncConfigurer { * * @Override * public Executor getAsyncExecutor() { * ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); * executor.setCorePoolSize(7); * executor.setMaxPoolSize(42); * executor.setQueueCapacity(11); * executor.setThreadNamePrefix("MyExecutor-"); * executor.initialize(); * return executor; * } * * @Override * public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { * return MyAsyncUncaughtExceptionHandler(); * } * }</pre> * * <p>If only one item needs to be customized, {@code null} can be returned to * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport} * when possible. * * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method * if you want a fully managed bean. In such circumstances it is no longer necessary to * manually call the {@code executor.initialize()} method as this will be invoked * automatically when the bean is initialized. * * <p>For reference, the example above can be compared to the following Spring XML * configuration: * * <pre class="code"> * {@code * <beans> * * <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/> * * <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/> * * <bean id="asyncBean" class="com.foo.MyAsyncBean"/> * * <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/> * * </beans> * }</pre> * * The above XML-based and JavaConfig-based examples are equivalent except for the * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because * the {@code <task:executor>} element does not expose such an attribute. This * demonstrates how the JavaConfig-based approach allows for maximum configurability * through direct access to actual componentry. * * <p>The {@link #mode} attribute controls how advice is applied: If the mode is * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior * of the proxying. Please note that proxy mode allows for interception of calls through * the proxy only; local calls within the same class cannot get intercepted that way. * * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in * this case the {@code spring-aspects} module JAR must be present on the classpath, with * compile-time weaving or load-time weaving applying the aspect to the affected classes. * There is no proxy involved in such a scenario; local calls will be intercepted as well. * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @author Sam Brannen * @since 3.1 * @see Async * @see AsyncConfigurer * @see AsyncConfigurationSelector */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * <p>This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. */ Class<? extends Annotation> annotation() default Annotation.class; /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>. * <p>The default is {@code false}. * <p>Note that setting this attribute to {@code true} will affect <em>all</em> * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another — for example, in tests. */ boolean proxyTargetClass() default false; /** * Indicate how async advice should be applied. * <p><b>The default is {@link AdviceMode#PROXY}.</b> * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. */ AdviceMode mode() default AdviceMode.PROXY; /** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. */ int order() default Ordered.LOWEST_PRECEDENCE; }
@EnableAsync启用Spring异步方法执行功能,相似于Spring的<task:*>XML命名空间,和@Configuration注解类一块儿使用,为整个Spring应用程序上下文启用注释驱动的异步处理。
默认状况下,Spring将会搜索一个关联的线程池定义:要么是一个在上下文中惟一的TaskExecutor类型的bean,要么是一个名叫"taskExecutor"的Executor类型的bean;若是二者都没法解决,SimpleAsyncTaskExecutor将用于处理异步方法调用。此外,具备void返回类型的带注释的方法不能将任何异常传回调用者。默认状况下,此类未捕获异常只会被记录下来。
为了定制全部这些,须要实现AsyncConfigurer类,并重写AsyncConfigurer类中getAsyncExecutor()方法和getAsyncUncaughtExceptionHandler()方法。
@Configuration @EnableAsync public class AppConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor(); executor.setCorePoolSize(7); executor.setMaxPoolSize(42); executor.setQueueCapacity(11); executor.setThreadNamePrefix("MyExecutor-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return MyAsyncUncaughtExceptionHandler(); } }
若是只须要定制一个项目,则能够返回null以保持默认设置,若是可能的话还能够考虑从AsyncConfigurerSupport扩展。
注意:在上面的示例中,ThreadPoolTaskExecutor不是一个彻底受管理Spring bean。若是你想要一个彻底受管理的bean,能够将@Bean注解添加到getAsyncExecutor()方法上;在这种状况下,就没有必要再手动调用executor.initialize()方法进行bean初始化了。
@Bean @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor(); executor.setCorePoolSize(7); executor.setMaxPoolSize(42); executor.setQueueCapacity(11); executor.setThreadNamePrefix("MyExecutor-"); return executor; }
为了便于参考,能够将上面的示例与下面的Spring XML配置进行比较:
<beans> <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/> <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/> <bean id="asyncBean" class="com.foo.MyAsyncBean"/> <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/> </beans>
上述基于xml和基于Java配置的示例除了设置执行器的线程名前缀外,其他都是等价的,这是由于<task:executor>元素没有暴露这样的属性。这演示了基于Java配置的方法如何经过直接访问实际组件来实现最大的可配置性。
#mode属性控制应用如何被通知:若是mode是AdviceMode#PROXY(默认值),那么其余属性控制代理的行为。请注意,代理模式只容许拦截经过代理进行的调用,同一类内的本地调用不能以这种方式被拦截。
注意,若是#mode被设置为AdviceMode#ASPECTJ,那么#proxyTargetClass属性值将被忽略。还要注意,在这种状况下,spring-aspects模块JAR必须出如今类路径上,编译时或加载时应用切面到受影响的类上。在这种状况下不涉及代理,本地调用也会被拦截。
上面这些都是EnableAsync类注释,具体是如何实现的?且看下面。
有没有发现,EnableAsync类惟一的核心注解就是@Import(AsyncConfigurationSelector.class),咱们来看下它的源码:
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java
/** * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class. * * @author Chris Beams * @since 3.1 * @see EnableAsync * @see ProxyAsyncConfiguration */ public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * {@inheritDoc} * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively */ @Override public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] { ProxyAsyncConfiguration.class.getName() }; case ASPECTJ: return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME }; default: return null; } } }
根据导入@Configuration类上EnableAsync#mode的值选择AbstractAsyncConfiguration的哪一个实现类应该被使用。
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java
/** * {@code @Configuration} class that registers the Spring infrastructure beans necessary * to enable proxy-based asynchronous method execution. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1 * @see EnableAsync * @see AsyncConfigurationSelector */ @Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { /** * 定义了一个AsyncAnnotationBeanPostProcessor类bean */ @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); // 新建一个异步注解bean后处理器 AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); // 获取@EnableAsync中用户自定义annotation Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); // 若是不是默认注解,则设置异步注解配置 if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } // 设置线程任务执行器 if (this.executor != null) { bpp.setExecutor(this.executor); } // 设置异常处理器 if (this.exceptionHandler != null) { bpp.setExceptionHandler(this.exceptionHandler); } // 设置是否升级到CGLIB子类代理,默认不开启 bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); // 设置执行优先级,默认最后执行 bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java
/** * Abstract base {@code Configuration} class providing common structure for enabling * Spring's asynchronous method execution capability. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1 * @see EnableAsync */ @Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { // 注解属性 protected AnnotationAttributes enableAsync; // 线程任务执行器 protected Executor executor; // 异常处理器 protected AsyncUncaughtExceptionHandler exceptionHandler; @Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } /** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer.getAsyncExecutor(); this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler(); } }
能够看到接口org.springframework.scheduling.annotation.AsyncConfigurer的惟一实现类org.springframework.scheduling.annotation.AsyncConfigurerSupport:
/** * A convenience {@link AsyncConfigurer} that implements all methods * so that the defaults are used. Provides a backward compatible alternative * of implementing {@link AsyncConfigurer} directly. * * @author Stephane Nicoll * @since 4.1 */ public class AsyncConfigurerSupport implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { return null; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
这就是上面注释讲的,能够经过实现AsyncConfigurer接口实现默认线程池和异常处理的定制化。
回到ProxyAsyncConfiguration类,这是一个Configuration类,在其中经过@bean注入了AsyncAnnotationBeanPostProcessor 类。
AsyncAnnotationBeanPostProcessor这个类的Bean初始化时,重写了BeanFactoryAware接口setBeanFactory方法,对AsyncAnnotationAdvisor异步注解切面进行了构造。
@Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
它会在普通bean属性以后、初始化回调(如InitializingBean#afterPropertiesSet() 或者一个自定义初始化方法)以前被调用。
AsyncAnnotationBeanPostProcessor的后置bean处理是经过其父类AbstractAdvisingBeanPostProcessor来实现的,该类实现了BeanPostProcessor接口,重写postProcessAfterInitialization方法。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } //把Advisor添加进bean ProxyFactory-》AdvisedSupport-》Advised if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } //构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No async proxy needed. return bean; }
JDK动态代理类JdkDynamicAopProxy实现AopProxy接口,最终执行的是InvocationHandler接口的invoke方法。
/** * Implementation of {@code InvocationHandler.invoke}. * <p>Callers will see exactly the exception thrown by the target, * unless a hook method throws an exception. */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodInvocation invocation; Object oldProxy = null; boolean setProxyContext = false; TargetSource targetSource = this.advised.targetSource; Class<?> targetClass = null; Object target = null; try { if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) { // The target does not implement the equals(Object) method itself. return equals(args[0]); } else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) { // The target does not implement the hashCode() method itself. return hashCode(); } else if (method.getDeclaringClass() == DecoratingProxy.class) { // There is only getDecoratedClass() declared -> dispatch to proxy config. return AopProxyUtils.ultimateTargetClass(this.advised); } else if (!this.advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { // Service invocations on ProxyConfig with the proxy config... return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args); } Object retVal; if (this.advised.exposeProxy) { // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // May be null. Get as late as possible to minimize the time we "own" the target, // in case it comes from a pool. target = targetSource.getTarget(); if (target != null) { targetClass = target.getClass(); } // Get the interception chain for this method. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. retVal = invocation.proceed(); } // Massage return value if necessary. Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. retVal = proxy; } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
@Async注解的拦截器是AsyncExecutionInterceptor,它继承了MethodInterceptor接口。而MethodInterceptor就是AOP规范中的Advice(切点的处理器)。chain不为空,执行第二个分支,构造ReflectiveMethodInvocation,而后执行proceed方法。
@Override public Object proceed() throws Throwable { // We start with an index of -1 and increment early. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); } Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); } }
核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),实际就是执行了AsyncExecutionInterceptor.invoke。
/** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; } }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java
/** * Determine the specific executor to use when executing the given method. * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. * @return the executor to use (or {@code null}, but just if no default executor is available) */ protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
@Aync注解有个value能够标注使用哪一个executor,这里的getExecutorQualifier就是寻找这个标识。这里若是defaultExecutor为null的话,则获取找默认的executor。
/** * Retrieve or build a default executor for this advice instance. * An executor returned from here will be cached for further use. * <p>The default implementation searches for a unique {@link TaskExecutor} bean * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable, this implementation will return {@code null}. * @param beanFactory the BeanFactory to use for a default executor lookup * @return the default executor, or {@code null} if none available * @since 4.2.6 * @see #findQualifiedExecutor(BeanFactory, String) * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ protected Executor getDefaultExecutor(BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; }
若是工程里头没有定义默认的task executor的话,则获取bean的时候会抛出NoSuchBeanDefinitionException。这就是为何上面例子中会抛出以下错误的缘由。
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java
/** * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor} * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all), * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance * for local use if no default could be found. * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ @Override protected Executor getDefaultExecutor(BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
AsyncExecutionInterceptor重写了getDefaultExecutor方法,先调用AsyncExecutionAspectSupport的getDefaultExecutor,若是默认的找不到,这里new一个SimpleAsyncTaskExecutor。这也是为何上面例子中出现SimpleAsyncTaskExecutor线程前缀的缘由。
总体流程大致可梳理为两条线:
1.从注解开始:@EnableAsync--》ProxyAsyncConfiguration类构造一个bean(类型:AsyncAnnotationBeanPostProcessor)
2.从AsyncAnnotationBeanPostProcessor这个类的bean的生命周期走:AOP-Advisor切面初始化(setBeanFactory())--》AOP-生成代理类AopProxy(postProcessAfterInitialization())--》AOP-切点执行(InvocationHandler.invoke)
参考
https://segmentfault.com/a/1190000011339882
https://blog.csdn.net/qq_39470742/article/details/83382338
https://blog.csdn.net/fenglongmiao/article/details/82429460
https://www.baeldung.com/spring-async