前端的宝宝会用ajax,用异步编程到快乐的不行~ 咱们java也有异步,用起来比他们还快乐~ 咱们bia~ji~一个注(gǒupí)解(gāoyào)
,也是快乐风男...前端
注册一个用户,给他的帐户初始化积分(也能够想象成注册奖励),再给用户发个注册通知短信,再发个邮件,(只是举栗子,切莫牛角大法
),这样一个流程,短信和邮件我以为彻底能够拆分出来,不必拖在在主流程上来(再补充上事务[ACID:原子性,一致性,隔离性,持久性
]就行了): java
个人首发原创博客地址:你的@Async就真的异步吗 ☞ 异步历险奇遇记 里面有gitHub项目地址,关注我,里面实战多多哦~git
看code:ajax
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserService userService;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public int save(UserDTO userDTO) {
User user = new User();
BeanCopyUtils.copy(userDTO, user);
int insert = userMapper.insert(user);
System.out.println("User 保存用户成功:" + user);
userService.senMsg(user);
userService.senEmail(user);
return insert;
}
@Async
@Override
public Boolean senMsg(User user) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("发送短信中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
return true;
}
@Async
@Override
public Boolean senEmail(User user) {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("发送邮件中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
return true;
}
复制代码
结果:启动不起来,Spring循环依赖问题。 Spring不是解决了循环依赖问题吗,它是支持循环依赖的呀?怎么会呢?spring
不能否认,在这以前我也是这么坚信的,假若你目前也和我有同样坚挺的想法,那就让异常UnsatisfiedDependencyException
,has been injected into other beans [userServiceImpl] in its raw version as part of a circular reference,
,来鼓励你,拥抱你, 就是这么的不给面子,赤裸裸的circular reference
。数据库
谈到Spring Bean的循环依赖,有的小伙伴可能比较陌生,毕竟开发过程当中好像对循环依赖这个概念无感知。其实否则,你有这种错觉,那是由于你工做在Spring的襁褓中,从而让你“高枕无忧”~ 其实咱们的代码中确定被咱们写了循环依赖,好比像这样:编程
@Service
public class AServiceImpl implements AService {
@Autowired
private BService bService;
...
}
@Service
public class BServiceImpl implements BService {
@Autowired
private AService aService;
...
}
复制代码
经过实验总结出,出现使用@Async致使循环依赖问题的必要条件:springboot
- 已开启@EnableAsync的支持
- @Async注解所在的Bean被循环依赖了
那么既然不能循环依赖,咱们就不循环依赖,咱们这么来:bash
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserMapper userMapper;
@Autowired
SendService sendService;
@Override
@Transactional()
public int save(UserDTO userDTO) {
User user = new User();
BeanCopyUtils.copy(userDTO, user);
int insert = userMapper.insert(user);
System.out.println("User 保存用户成功:" + user);
this.senMsg(user);
this.senEmail(user);
return insert;
}
@Async
@Override
public Boolean senMsg(User user) {
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
return true;
}
@Async
@Override
public Boolean senEmail(User user) {
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
return true;
}
复制代码
结果咱们测试了几把,我打印一下结果:并发
2019-08-05 21:59:32.304 INFO 14360 --- [nio-8080-exec-3] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited
2019-08-05 21:59:32.346 DEBUG 14360 --- [nio-8080-exec-3] c.b.lea.mybot.mapper.UserMapper.insert : ==> Preparing: insert into t_user (username, sex, mobile,email) values (?, ?, ?,?)
2019-08-05 21:59:32.454 DEBUG 14360 --- [nio-8080-exec-3] c.b.lea.mybot.mapper.UserMapper.insert : ==> Parameters: 王麻子(String), 男(String), 18820158833(String), 22qq@qq.com(String)
2019-08-05 21:59:32.463 DEBUG 14360 --- [nio-8080-exec-3] c.b.lea.mybot.mapper.UserMapper.insert : <== Updates: 1
User 保存用户成功:User(id=101, username=王麻子, mobile=18820158833, email=22qq@qq.com, sex=男, password=123435, createTime=Mon Aug 05 12:20:51 CST 2019, updateTime=null)
发送短信中:.....
http-nio-8080-exec-3给用户id:101,手机号:18820158833发送短信成功
发送邮件中:.....
http-nio-8080-exec-3给用户id:101,邮箱:22qq@qq.com发送邮件成功
复制代码
这不白瞎了吗?感知不到个人爱,白写了,难受~~线程依然是http-nio-8080-exec-3
,那么为何了呢? 下面会讲的哦,先说结论:
经过实验总结出,出现使用@Async致使异步失效的缘由:
- 在本类中使用了异步是不支持异步的
- 调用者实际上是this,是当前对象,不是真正的代理对象
userService
,spring没法截获这个方法调用 因此不在不在本类中去调用,网上的解决方法有applicationContext.getBean(UserService.class)
和AopContext.currentProxy()
那么,既然不能用当前对象,那咱们用代理,AopContext.currentProxy()
,然鹅,你发现,报错了,对Cannot find current proxy: Set 'exposeProxy' property on Advised to 'true' to make it available.
就他:
@EnableAspectJAutoProxy(exposeProxy = true)
复制代码
我也这么搞,可是又报错了,细细的看报错内容,才发现少了个jar包这东西要配合切面织入,要配合,懂吗?,来上包
<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.2</version>
</dependency>
复制代码
再来看为撒: 这是一个性感的话题,exposeProxy = true
它的做用就是启用切面的自动代理,说人话就是暴露当前代理对象到当前线程绑定, 看个报错Cannot find current proxy: Set 'exposeProxy' property on Advised to 'true' to make it available.
就是AopContext搞得鬼.
public final class AopContext {
private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal<>("Current AOP proxy");
private AopContext() {
}
// 该方法是public static方法,说明能够被任意类进行调用
public static Object currentProxy() throws IllegalStateException {
Object proxy = currentProxy.get();
// 它抛出异常的缘由是当前线程并无绑定对象
// 而给线程绑定对象的方法在下面:特别有意思的是它的访问权限是default级别,也就是说只能Spring内部去调用
if (proxy == null) {
throw new IllegalStateException("Cannot find current proxy: Set 'exposeProxy' property on Advised to 'true' to make it available.");
}
return proxy;
}
// 它最有意思的地方是它的访问权限是default的,表示只能给Spring内部去调用
// 调用它的类有CglibAopProxy和JdkDynamicAopProxy
@Nullable
static Object setCurrentProxy(@Nullable Object proxy) {
Object old = currentProxy.get();
if (proxy != null) {
currentProxy.set(proxy);
} else {
currentProxy.remove();
}
return old;
}
}
复制代码
因此咱们要作启用代理设置,让代理生效,来走起,主线程的方法使用来调用异步方法,来测试走起: no code said niao:
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserMapper userMapper;
@Override@Transactional(propagation = Propagation.REQUIRED)
public int save(UserDTO userDTO) {
User user = new User();
BeanCopyUtils.copy(userDTO, user);
int insert = userMapper.insert(user);
System.out.println("User 保存用户成功:" + user);
UserService currentProxy = UserService.class.cast(AopContext.currentProxy());
currentProxy.senMsg(user);
currentProxy.senEmail(user);
int i = 1 / 0;
return insert;
}
@Async @Override @Transactional(propagation = Propagation.REQUIRES_NEW)
public void senMsg(User user) {
user.setUsername(Thread.currentThread().getName()+"发短信测试事务...."+ new Random().nextInt());
userMapper.insert(user);
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
}
@Async @Override @Transactional(propagation = Propagation.REQUIRES_NEW)
public void senEmail(User user) {
user.setUsername("发邮件测试事务...."+ new Random().nextInt());
userMapper.insert(user);
int i = 1 / 0;
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
}
}
复制代码
测试结果:
1. 异步线程SimpleAsyncTaskExecutor-1和SimpleAsyncTaskExecutor-2分别发短信和邮件,主线程保存用户
2. 实际结果,主线程保存的那个用户失败,名字叫'发邮件'的也保存失败,只有叫'发送短信'的用户插入成功
3. 那么就作到了事务的线程隔离,事务的互不影响,完美
4. 亲,你这么写了吗?这么写不优美,虽然有用,可是写的另辟蹊径啊
复制代码
来,咱们看个更骚气的,异步中嵌套异步,来上code:
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserMapper userMapper;
@Override@Transactional(propagation = Propagation.REQUIRED)
public int save(UserDTO userDTO) {
User user = new User();
BeanCopyUtils.copy(userDTO, user);
int insert = userMapper.insert(user);
System.out.println("User 保存用户成功:" + user);
UserService currentProxy = UserService.class.cast(AopContext.currentProxy());
currentProxy.send(user);
return insert;
}
@Async @Override @Transactional(propagation = Propagation.REQUIRES_NEW)
public void send(User user) {
//发短信
user.setUsername(Thread.currentThread().getName()+"发短信测试事务...."+ new Random().nextInt());
userMapper.insert(user);
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
//发邮件
UserService currentProxy = UserService.class.cast(AopContext.currentProxy());
currentProxy.senEmail(user);
}
@Async @Override @Transactional(propagation = Propagation.REQUIRES_NEW)
public void senEmail(User user) {
user.setUsername("发邮件测试事务...."+ new Random().nextInt());
userMapper.insert(user);
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
}
}
复制代码
看咱们猜下结果? 数据库会新增几个数据?3个?2个?1个?0个?纳尼报错?
哈哈``` 上结果:
Set 'exposeProxy' property on Advised to 'true'
磨人的小妖精
经过实验总结出,出现致使异步嵌套使用失败的缘由:
- 在本类中使用了异步嵌套异步是不支持的
- 调用者其实被代理过一次了,再嵌套会出现'二次代理',实际上是达不到代理了效果,由于已经异步了.即时你在嵌套中不使用代理去获取,即便不保存,可是事务和异步效果都会跟随当前的代理,即嵌套的效果是达不到再次异步的.
- 解决办法应该有,可是我以为我还没找到.这个写法是咱们应该规避的,咱们应该遵循规范,启用新的服务类去完成咱们的异步工做
下面咱们举个栗子:正确的写法,优雅的写法
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserMapper userMapper;
@Autowired
SendService sendService;
@Override@Transactional(propagation = Propagation.REQUIRED)
public int save(UserDTO userDTO) {
User user = new User();
BeanCopyUtils.copy(userDTO, user);
int insert = userMapper.insert(user);
System.out.println("User 保存用户成功:" + user);
sendService.senMsg(user);
sendService.senEmail(user);
return insert;
}
}
---------------无责任分割线--------------------
@Service
public class SendServiceImpl implements SendService {
@Override
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Boolean senMsg(User user) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("发送短信中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 1 / 0;
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
return true;
}
@Async
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Boolean senEmail(User user) {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("发送邮件中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
return true;
}
}
复制代码
结果确定完美:
所以当你看到你同事就在本类写个方法标注上@Async而后调用,请制止他吧,作的无用功~~~(关键本身还觉得有用,这是最可怕的深坑~)
那我补充点: @EnableAspectJAutoProxy(exposeProxy = true)
的做用: 此注解它导入了AspectJAutoProxyRegistrar,最终设置此注解的两个属性的方法为:
public abstract class AopConfigUtils {
...正在加(sheng)载(lue)代码中 请稍后....
public static void forceAutoProxyCreatorToUseClassProxying(BeanDefinitionRegistry registry) {
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
BeanDefinition definition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
definition.getPropertyValues().add("proxyTargetClass", Boolean.TRUE);
}
}
public static void forceAutoProxyCreatorToExposeProxy(BeanDefinitionRegistry registry) {
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
BeanDefinition definition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
definition.getPropertyValues().add("exposeProxy", Boolean.TRUE);
}
}
}
复制代码
看到此注解标注的属性值最终都被设置到了internalAutoProxyCreator
身上,也就是它:自动代理建立器。 首先咱们须要明晰的是:@Async的代理对象并非由自动代理建立器来建立的,而是由AsyncAnnotationBeanPostProcessor
一个单纯的BeanPostProcessor
实现的,很显然当执行AopContext.currentProxy()这句代码的时候报错了。 @EnableAsync
给容器注入的是AsyncAnnotationBeanPostProcessor
,它用于给@Async
生成代理,可是它仅仅是个BeanPostProcessor
并不属于自动代理建立器,所以exposeProxy = true
对它无效。 因此AopContext.setCurrentProxy(proxy);
这个set
方法确定就不会执行,因此,所以,但凡只要业务方法中调用AopContext.currentProxy()
方法就铁定抛异常~~
看嘛,发短信实际上是一些网关调用,我想写个看短信,邮件发送成功的标志,是否调用成功的状态,来走起
....省略...UserService
---------------无责任分割线--------------------
@Service
public class SendServiceImpl implements SendService {
@Override
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean senMsg(User user) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("发送短信中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",手机号:" + user.getMobile() + "发送短信成功");
return true;
}
@Async
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean senEmail(User user) {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("发送邮件中:.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "给用户id:" + user.getId() + ",邮箱:" + user.getEmail() + "发送邮件成功");
return true;
}
}
复制代码
瞪大眼睛看,个人返回结果是boolean,属于基本类型,虽然没有用,可是报错了:
org.springframework.aop.AopInvocationException: Null return value from advice does not match primitive return type for:
public boolean com.boot.lea.mybot.service.impl.SendServiceImpl.senMsg(com.boot.lea.mybot.entity.User)
复制代码
致使个人数据库一条数据都没有,影响到主线程了,可见问题发生在主线程触发异步线程的时候,那咱们找缘由: 是走代理触发的:我先找这个类 CglibAopProxy
再顺藤摸瓜
/** * Process a return value. Wraps a return of {@code this} if necessary to be the * {@code proxy} and also verifies that {@code null} is not returned as a primitive. */
private static Object processReturnType(Object proxy, Object target, Method method, Object retVal) {
// Massage return value if necessary
if (retVal != null && retVal == target &&
!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
// Special case: it returned "this". Note that we can't help
// if the target sets a reference to itself in another returned object.
retVal = proxy;
}
Class<?> returnType = method.getReturnType();
if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
return retVal;
}
复制代码
在这retVal == null && returnType != Void.TYPE && returnType.isPrimitive()
,由于咱们的这种异步实际上是不支持友好的返回结果的,咱们的结果应该是void,由于这个异步线程被主线程触发后其实被当作一个任务提交到Spring的异步的一个线程池中进行异步的处理任务了,线程之间的通讯是不能之间返回的,其实用这种写法咱们就应该用void去异步执行,不要有返回值,并且咱们的返回值是isPrimitive()
,是基本类型,恰好达标....
那么我大声喊出,使用异步的时候尽可能不要有返回值,实在要有你也不能用基本类型.
有些人就是难受,就是想要返回结果,那么也是能够滴:可是要借助Furtrue
小姐姐的get()
来进行线程之间的阻塞通讯,毕竟小姐姐⁄(⁄ ⁄•⁄ω⁄•⁄ ⁄)⁄害羞.
酱紫写,你就能够阻塞等到执行任务有结果的时候去获取真正的结果了,这个写法和我以前的文章 JAVA并发异步编程 原来十个接口的活如今只须要一个接口就搞定!是同样的道理了
import com.boot.lea.mybot.service.AsyncService;
import com.boot.lea.mybot.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
@Service
@Async("taskExecutor")
public class AsyncServiceImpl implements AsyncService {
@Autowired
private UserService userService;
@Override
public Future<Long> queryUserMsgCount(final Long userId) {
System.out.println("当前线程:" + Thread.currentThread().getName() + "=-=====queryUserMsgCount");
long countByUserId = userService.countMsgCountByUserId(userId);
return new AsyncResult<>(countByUserId);
}
@Override
public Future<Long> queryCollectCount(final Long userId) {
System.out.println("当前线程:" + Thread.currentThread().getName() + "=-====queryCollectCount");
long collectCount = userService.countCollectCountByUserId(userId);
return new AsyncResult<>(collectCount);
}
复制代码
@Async的异步:
奸笑...嘻嘻嘻嘻...
)。SpringBoot环境中,要使用@Async注解,咱们须要先在启动类上加上@EnableAsync注解。这个与在SpringBoot中使用@Scheduled注解须要在启动类中加上@EnableScheduling是同样的道理(固然你使用古老的XML配置也是能够的,可是在SpringBoot环境中,建议的是全注解开发),具体原理下面会分析。加上@EnableAsync注解后,若是咱们想在调用一个方法的时候开启一个新的线程开始异步操做,咱们只须要在这个方法上加上@Async注解,固然前提是,这个方法所在的类必须在Spring环境中。
示例:非spingboot项目
<task:annotation-driven executor="annotationExecutor" />
<!-- 支持 @Async 注解 -->
<task:executor id="annotationExecutor" pool-size="20"/>
复制代码
执行流程:
EnableAsync
,@EnableAsync注解上有个@Import(AsyncConfigurationSelector.class)
,springboot的注入老套路了AsyncConfigurationSelector
,看到selectImports
方法了没,这里使用的是默认使用的是ProxyAsyncConfiguration
这个配置类ProxyAsyncConfiguration
继承AbstractAsyncConfiguration
,它里面的的setConfigurers
说明了咱们能够经过实现AsyncConfigurer
接口来完成线程池以及异常处理器的配置,并且在Spring环境中只能配置一个实现类,不然会抛出异常。 上一点代码:/** * Collect any {@link AsyncConfigurer} beans through autowiring. */
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
//AsyncConfigurer用来配置线程池配置以及异常处理器,并且在Spring环境中最多只能有一个,在这里咱们知道了,若是想要本身去配置线程池,只须要实现AsyncConfigurer接口,而且不能够在Spring环境中有多个实现AsyncConfigurer的类。
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer.getAsyncExecutor();
this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
}
复制代码
ProxyAsyncConfiguration
注入的bean AsyncAnnotationBeanPostProcessor
,这个BeanPostBeanPostProcessor
很显然会对带有可以引起异步操做的注解(好比@Async
)的Bean进行处理AsyncAnnotationBeanPostProcessor
有重写父类的setBeanFactory
,这个方法是否是有点熟悉呢,它是BeanFactoryAware
接口中的方法,AsyncAnnotationBeanPostProcessor
的父类实现了这个接口,在咱们好久以前分析过的Bean的初始化中,是有提到过这个接口的,实现了Aware类型接口的Bean,会在初始化Bean的时候调用相应的初始化方法,具体能够查看AbstractAutowireCapableBeanFactory#initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd)
方法postProcessAfterInitialization
方法在祖先类AbstractAdvisingBeanPostProcessor
中。从源码中能够看到。AsyncAnnotationBeanPostProcessor
是对Bean进行后置处理的BeanPostProcessor
JdkDynamicAopProxy
的invoke方法中,是用了责任链模式:List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
,将代理进行拦截来执行,通知链会包含setBeanFactory()方法生成的通知,执行链会用于建立ReflectiveMethodInvocation
对象,最终是调用ReflectiveMethodInvocation
的proceed()
来完成对方法的加强处理,proceed()
方法在这里会执行最后一个分支AsyncExecutionInterceptor
的invoke()
AsyncConfigurer
实现类,可是不意味着,在Spring环境中只能配置一个线程池,在Spring环境中是能够配置多个线程池,并且咱们能够在使用@Async注解进行异步操做的时候,经过在value属性上指定线程池BeanName,这样就能够指定相应的线程池来做为任务的载体,参见:determineAsyncExecutor
当咱们想要在SpringBoot中方便的使用@Async注解开启异步操做的时候,只须要实现AsyncConfigurer接口(这样就配置了默认线程池配置,固然该类须要在Spring环境中,由于是默认的,因此只能有一个,没有多个实现类排优先级的说法),实现对线程池的配置,并在启动类上加上@EnableAsync注解,便可使得@Async注解生效。
咱们甚至能够不显式的实现AsyncConfigurer,咱们能够在Spring环境中配置多个Executor类型的Bean,在使用@Async注解时,将注解的value指定为你Executor类型的BeanName,就可使用指定的线程池来做为任务的载体,这样就使用线程池也更加灵活。