本专题大纲:java
本文为本专题倒数第二篇文章。程序员
在上篇文章中咱们一块儿学习了Spring中的事务抽象机制以及动手模拟了一下Spring中的事务管理机制,那么本文咱们就经过源码来分析一下Spring中的事务管理究竟是如何实现的,本文将选用Spring5.2.x
版本。sql
Spring事务管理的入口就是@EnableTransactionManagement
注解,因此咱们直接从这个注解入手,其源码以下:数据库
public @interface EnableTransactionManagement { // 是否使用cglib代理,默认是jdk代理 boolean proxyTargetClass() default false; // 使用哪一种代理模式,Spring AOP仍是AspectJ AdviceMode mode() default AdviceMode.PROXY; // 为了完成事务管理,会向容器中添加通知 // 这个order属性表明了通知的执行优先级 // 默认是最低优先级 int order() default Ordered.LOWEST_PRECEDENCE; }
须要注意的是,@EnableTransactionManagement
的proxyTargetClass
会影响Spring中全部经过自动代理生成的对象。若是将proxyTargetClass
设置为true,那么意味经过@EnableAspectJAutoProxy
所生成的代理对象也会使用cglib进行代理。关于@EnableTransactionManagement
跟@EnableAspectJAutoProxy
混用时的一些问题等咱们在对@EnableTransactionManagement
有必定了解后再专门作一个比较,如今咱们先来看看这个注解到底在作了什么?缓存
从上图中能够看出这个注解作的就是向容器中注册了AutoProxyRegistrar
跟一个ProxyTransactionManagementConfiguration
(这里就不考虑AspectJ了,咱们日常都是使用SpringAOP),微信
AutoProxyRegistrar
用于开启自动代理,其源码以下:mybatis
这个类实现了ImportBeanDefinitionRegistrar
,它的做用是向容器中注册别的BeanDefinition
,咱们直接关注它的registerBeanDefinitions
方法便可app
// AnnotationMetadata,表明的是AutoProxyRegistrar的导入类的元信息 // 既包含了类元信息,也包含了注解元信息 public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { boolean candidateFound = false; // 获取@EnableTransactionManagement所在配置类上的注解元信息 Set<String> annTypes = importingClassMetadata.getAnnotationTypes(); // 遍历注解 for (String annType : annTypes) { // 能够理解为将注解中的属性转换成一个map AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (candidate == null) { continue; } // 直接从map中获取对应的属性 Object mode = candidate.get("mode"); Object proxyTargetClass = candidate.get("proxyTargetClass"); // mode,代理模型,通常都是SpringAOP // proxyTargetClass,是否使用cglib代理 if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() && Boolean.class == proxyTargetClass.getClass()) { // 注解中存在这两个属性,而且属性类型符合要求,表示找到了合适的注解 candidateFound = true; // 实际上会往容器中注册一个InfrastructureAdvisorAutoProxyCreator if (mode == AdviceMode.PROXY) { AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); if ((Boolean) proxyTargetClass) { AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry); return; } } } } // ...... }
若是对AOP比较了解的话,那么应该知道@EnableAspectJAutoProxy
注解也向容器中注册了一个能实现自动代理的bd,那么当@EnableAspectJAutoProxy
跟@EnableTransactionManagement
同时使用会有什么问题吗?答案你们确定知道,不会有问题,那么为何呢?咱们查看源码会发现,@EnableAspectJAutoProxy
最终调用的是异步
AopConfigUtils#registerAspectJAnnotationAutoProxyCreatorIfNecessary
,其源码以下ide
public static BeanDefinition registerAspectJAnnotationAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(AnnotationAwareAspectJAutoProxyCreator.class, registry, source); }
@EnableTransactionManagement
最终调用的是,AopConfigUtils#registerAutoProxyCreatorIfNecessary
,其源码以下
public static BeanDefinition registerAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source); }
它们最终都会调用registerOrEscalateApcAsRequired
方法,只不过传入的参数不同而已,一个是AnnotationAwareAspectJAutoProxyCreator
,另外一个是InfrastructureAdvisorAutoProxyCreator
。
registerOrEscalateApcAsRequired
源码以下:
private static BeanDefinition registerOrEscalateApcAsRequired( Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) { if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) { BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME); if (!cls.getName().equals(apcDefinition.getBeanClassName())) { // 当前已经注册到容器中的Bean的优先级 int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName()); // 当前准备注册到容器中的Bean的优先级 int requiredPriority = findPriorityForClass(cls); // 谁的优先级大就注册谁,AnnotationAwareAspectJAutoProxyCreator是最大的 // 因此AnnotationAwareAspectJAutoProxyCreator会覆盖别的Bean if (currentPriority < requiredPriority) { apcDefinition.setBeanClassName(cls.getName()); } } return null; } // 注册bd RootBeanDefinition beanDefinition = new RootBeanDefinition(cls); beanDefinition.setSource(source); beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition); return beanDefinition; }
InfrastructureAdvisorAutoProxyCreator
跟AnnotationAwareAspectJAutoProxyCreator
的优先级是如何定义的呢?咱们来看看AopConfigUtils
这个类中的一个静态代码块
static { APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class); APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class); APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class); }
实际上它们的优先级就是在APC_PRIORITY_LIST
这个集合中的下标,下标越大优先级越高,因此AnnotationAwareAspectJAutoProxyCreator
的优先级最高,因此AnnotationAwareAspectJAutoProxyCreator
会覆盖InfrastructureAdvisorAutoProxyCreator
,那么这种覆盖会不会形成问题呢?答案确定是不会的,由于你用了这么久了也没出过问题嘛~那么再思考一个问题,为何不会出现问题呢?这是由于InfrastructureAdvisorAutoProxyCreator
只会使用容器内部定义的Advisor
,可是AnnotationAwareAspectJAutoProxyCreator
会使用全部实现了Advisor
接口的通知,也就是说AnnotationAwareAspectJAutoProxyCreator
的做用范围大于InfrastructureAdvisorAutoProxyCreator
,所以这种覆盖是没有问题的。限于篇幅缘由这个问题我不作详细解答了,有兴趣的同窗能够看下两个类的源码。
@EnableTransactionManagement
除了注册了一个AutoProxyRegistrar
外,还向容器中注册了一个ProxyTransactionManagementConfiguration
。
那么这个ProxyTransactionManagementConfiguration
有什么做用呢?
若是你们对我文章的风格有一些了解的话就会知道,分析一个类通常有两个切入点
- 它的继承关系
- 它提供的API
你们本身在阅读源码时也能够参考这种思路,分析一个类的继承关系可让咱们了解它从抽象到实现的过程,即便不去细看API也能知道它的大致做用。仅仅知道它的大体做用是不够的,为了更好了解它的细节咱们就须要进一步去探索它的具体实现,也就是它提供的API。这算是我看了这么就源码的一点心得,正好想到了因此在这里分享下,以后会专门写一篇源码心得的文章
这个类的继承关系仍是很简单的,只有一个父类AbstractTransactionManagementConfiguration
源码以下:
@Configuration public abstract class AbstractTransactionManagementConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableTx; @Nullable protected TransactionManager txManager; // 这个方法就是获取@EnableTransactionManagement的属性 // importMetadata:就是@EnableTransactionManagement这个注解所在类的元信息 @Override public void setImportMetadata(AnnotationMetadata importMetadata) { // 将EnableTransactionManagement注解中的属性对存入到map中 // AnnotationAttributes实际上就是个map this.enableTx = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false)); // 这里能够看到,限定了导入的注解必须使用@EnableTransactionManagement if (this.enableTx == null) { throw new IllegalArgumentException( "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName()); } } // 咱们能够配置TransactionManagementConfigurer // 经过TransactionManagementConfigurer向容器中注册一个事务管理器 // 通常不会这么使用,更多的是经过@Bean的方式直接注册 @Autowired(required = false) void setConfigurers(Collection<TransactionManagementConfigurer> configurers) { // ..... TransactionManagementConfigurer configurer = configurers.iterator().next(); this.txManager = configurer.annotationDrivenTransactionManager(); } // 向容器中注册一个TransactionalEventListenerFactory // 这个类用于处理@TransactionalEventListener注解 // 能够实现对事件的监听,而且在事务的特定阶段对事件进行处理 @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public static TransactionalEventListenerFactory transactionalEventListenerFactory() { return new TransactionalEventListenerFactory(); } }
上面的代码中你们可能比较不熟悉的就是TransactionalEventListenerFactory
,这个类主要是用来处理@TransactionalEventListener
注解的,咱们来看一个实际使用的例子
@Component public class DmzListener { // 添加一个监听器 // phase = TransactionPhase.AFTER_COMMIT意味着这个方法在事务提交后执行 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void listen(DmzTransactionEvent transactionEvent){ System.out.println("事务已提交"); } } // 定义一个事件 public class DmzTransactionEvent extends ApplicationEvent { public DmzTransactionEvent(Object source) { super(source); } } @Component public class DmzService { @Autowired ApplicationContext applicationContext; // 一个须要进行事务管理的方法 @Transactional public void invokeWithTransaction() { // 发布一事件 applicationContext.publishEvent(new DmzTransactionEvent(this)); // 以一条sout语句提代sql执行过程 System.out.println("sql invoked"); } } // 测试方法 public class Main { public static void main(String[] args) { AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class); DmzService dmzService = ac.getBean(DmzService.class); dmzService.invokeWithTransaction(); } } // 最后程序会按顺序输出 // sql invoked // 事务已提交
经过上面的例子咱们能够看到,虽然咱们在invokeWithTransaction
方法中一开始就发布了一个事件,可是监听事件的方法倒是在invokeWithTransaction
才执行的,正常事件的监听是同步的,假设咱们将上述例子中的@TransactionalEventListener
注解替换成为@EventListener
注解,以下:
@Component public class DmzListener { // 添加一个监听器 @EventListener public void listen(DmzTransactionEvent transactionEvent){ System.out.println("事务已提交"); } }
这个时候程序的输出就会是
// 事务已提交 // sql invoked
那么@TransactionalEventListener
注解是实现这种看似异步(实际上并非)的监听方式的呢?
你们按照上面这个调用链能够找到这么一段代码
经过上面的代码,咱们能够发现最终会调用到TransactionalEventListenerFactory
的createApplicationListener
方法,经过这个方法建立一个事件监听器而后添加到容器中,createApplicationListener
方法源码以下:
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); }
就是建立了一个ApplicationListenerMethodTransactionalAdapter
,这个类自己就是一个事件监听器(实现了ApplicationListener
接口)。咱们直接关注它的事件监听方法,也就是onApplicationEvent
方法,其源码以下:
@Override public void onApplicationEvent(ApplicationEvent event) { // 激活了同步,而且真实存在事务 if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { // 其实是依赖事务的同步机制实现的事件监听 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } // 在没有开启事务的状况下是否处理事件 else if (this.annotation.fallbackExecution()) { // .... // 若是注解中的fallbackExecution为true,意味着没有事务开启的话 // 也会执行监听逻辑 processEvent(event); } else { // .... } }
到这一步逻辑已经清楚了,@TransactionalEventListener
所标注的方法在容器启动时被解析成了一个ApplicationListenerMethodTransactionalAdapter
,这个类自己就是一个事件监听器,当容器中的组件发布了一个事件后,若是事件匹配,会进入它的onApplicationEvent
方法,这个方法并无直接执行咱们所定义的监听逻辑,而是给当前事务注册了一个同步的行为,当事务到达某一个阶段时,这个行为会被触发。经过这种方式,实现一种伪异步。实际上注册到事务的的同步就是TransactionSynchronizationEventAdapter
,这个类的源码很是简单,这里就单独取它一个方法看下
// 这个方法会在事务提交前执行 public void beforeCommit(boolean readOnly) { // 在执行时会先判断在@TransactionalEventListener注解中定义的phase是否是BEFORE_COMMIT // 若是不是的话,什么事情都不作 if (this.phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } }
别看上面这么多内容,到目前为止咱们仍是只对ProxyTransactionManagementConfiguration
的父类作了介绍,接下来咱们就来看看ProxyTransactionManagementConfiguration
自身作了什么事情。
// proxyBeanMethods=false,意味着不对配置类生成代理对象 @Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration { // 注册了一个BeanFactoryTransactionAttributeSourceAdvisor // advisor就是一个绑定了切点的通知 // 能够看到通知就是TransactionInterceptor // 切点会经过TransactionAttributeSource去解析@Transacational注解 // 只会对有这个注解的方法进行拦截 @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) // BeanDefinition的角色是一个基础设施类 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor( TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) { BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource); advisor.setAdvice(transactionInterceptor); if (this.enableTx != null) { advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } return advisor; } // 注册一个AnnotationTransactionAttributeSource // 这个类的主要做用是用来解析@Transacational注解 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); } // 事务是经过AOP实现的,AOP的核心就是拦截器 // 这里就是注册了实现事务须要的拦截器 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource); if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; } }
实现事务管理的核心就是SpringAOP,而完成SpringAOP的核心就是通知(Advice),通知的核心就是拦截器,关于SpringAOP、切点、通知在以前的文章中已经作过详细介绍了,因此对这一块本文就跳过了,咱们直接定位到事务管理的核心TransactionInterceptor
。
TransactionInterceptor
实现了MethodInterceptor
,核心方法就是invoke
方法,咱们直接定位到这个方法的具体实现逻辑
// invocation:表明了要进行事务管理的方法 public Object invoke(MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // 核心方法就是invokeWithinTransaction return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
这个方法很长,可是主要能够分为三段
这里咱们只分析标准事务管理
,下面的源码也只保留标准事务管理相关代码
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 以前在配置类中注册了一个AnnotationTransactionAttributeSource // 这里就是直接返回了以前注册的那个Bean,经过它去获取事务属性 TransactionAttributeSource tas = getTransactionAttributeSource(); // 解析@Transactional注解获取事务属性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 获取对应的事务管理器 final TransactionManager tm = determineTransactionManager(txAttr); // ... // 忽略响应式的事务管理 // ... // 作了个强转PlatformTransactionManager PlatformTransactionManager ptm = asPlatformTransactionManager(tm); // 切点名称(类名+方法名),会被做为事务的名称 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 建立事务 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // 这里执行真正的业务逻辑 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 方法执行出现异常,在异常状况下完成事务 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 清除线程中的事务信息 cleanupTransactionInfo(txInfo); } // ... // 省略不重要代码 // ... // 提交事务 commitTransactionAfterReturning(txInfo); return retVal; } // .... // 省略回调实现事务管理相关代码 // .... return result; } }
经过上面这段代码能够概括出事务管理的流程以下:
tas.getTransactionAttribute
createTransactionIfNecessary
invocation.proceedWithInvocation
completeTransactionAfterThrowing
cleanupTransactionInfo
commitTransactionAfterReturning
接下来咱们一步步分析
// 获取事务对应的属性,实际上返回一个AnnotationTransactionAttributeSource // 以后再调用AnnotationTransactionAttributeSource的getTransactionAttribute // getTransactionAttribute:先从拦截的方法上找@Transactional注解 // 若是方法上没有的话,再从方法所在的类上找,若是类上尚未的话尝试从接口或者父类上找 public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { if (method.getDeclaringClass() == Object.class) { return null; } // 在缓存中查找 Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { // 这里真正的去执行解析 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // 缓存解析的结果,若是为事务属性为null,也放入一个标志 // 表明这个方法不须要进行事务管理 if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } }
真正解析注解时调用了computeTransactionAttribute
方法,其代码以下:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 默认状况下allowPublicMethodsOnly为true // 这意味着@Transactional若是放在非public方法上不会生效 if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // method是接口中的方法 // specificMethod是具体实现类的方法 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // 如今目标类方法上找 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // 再在目标类上找 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } // 降级到接口跟接口中的方法上找这个注解 if (specificMethod != method) { txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
能够看到在computeTransactionAttribute
方法中又进一步调用了findTransactionAttribute
方法,咱们一步步跟踪最终会进入到SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)
这个方法中
整个调用链我这里也画出来了,感兴趣的你们能够跟一下
对于本文,咱们直接定位到最后一步,对应源码以下:
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { // 最终返回的是一个RuleBasedTransactionAttribute // 在上篇文章分析过了,定义了在出现异常时如何回滚 RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; }
到这一步很明确了,解析了@Transactional
注解,并将这个注解的属性封装到了一个RuleBasedTransactionAttribute
对象中返回。
对应代码以下:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 若是没有为事务指定名称,使用切点做为事务名称 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 调用事务管理器的方法,获取一个事务并返回事务的状态 status = tm.getTransaction(txAttr); } // ....省略日志 } // 将事务相关信息封装到TransactionInfo对象中 // 并将TransactionInfo绑定到当前线程 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
从上面方法签名上咱们能够看到,建立事务实际上就是建立了一个TransactionInfo
。一个TransactionInfo
对象包含了事务相关的全部信息,例如实现事务使用的事务管理器(PlatformTransactionManager
),事务的属性(TransactionAttribute
),事务的状态(transactionStatus
)以及与当前建立的关联的上一个事务信息(oldTransactionInfo
)。咱们能够经过TransactionInfo
对象的hasTransaction
方法判断是否真正建立了一个事务。
上面的核心代码只有两句
tm.getTransaction
,经过事务管理器建立事务prepareTransactionInfo
,封装TransactionInfo
并绑定到线程上咱们先来看看getTransaction
干了啥,对应代码以下:
这个代码应该是整个Spring实现事务管理里面最难的了,由于牵涉到事务的传播机制,不一样传播级别是如何进行处理的就是下面这段代码决定的,比较难,但愿你们能耐心看完
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 事务的属性(TransactionAttribute),经过解析@Transacational注解de'dao TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 获取一个数据库事务对象(DataSourceTransactionObject), // 这个对象中封装了一个从当前线程上下文中获取到的链接 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 判断是否存在事务 // 若是以前获取到的链接不为空,而且链接上激活了事务,那么就为true if (isExistingTransaction(transaction)) { // 若是已经存在了事务,须要根据不一样传播机制进行不一样的处理 return handleExistingTransaction(def, transaction, debugEnabled); } // 校验事务的超时设置,默认为-1,表明不进行超时检查 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 检查隔离级别是否为mandatory(强制性要求必须开启事务) // 若是为mandatory,可是没有事务存在,那么抛出异常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 目前为止没有事务,而且隔离级别不是mandatory else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 当隔离级别为required,required_new,nested时均须要新建事务 // 若是存在同步,将注册的同步挂起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 开启一个新事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // 建立一个空事务,没有实际的事务提交以及回滚机制 // 会激活同步:将数据库链接绑定到当前线程上 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
上面这段代码能够分为两种状况分析
应用程序直接调用了一个被事务管理的方法(直接调用)
在一个须要事务管理的方法中调用了另一个须要事务管理的方法(嵌套调用)
用代码表示以下:
@Service public class IndexService { @Autowired DmzService dmzService; // 直接调用 @Transactional public void directTransaction(){ // ...... } // 嵌套调用 @Transactional public void nestedTransaction(){ dmzService.directTransaction(); } }
ps:咱们暂且不考虑自调用的状况,由于自调用可能会出现事务失效,在下篇文章咱们专门来聊一聊事务管理中的那些坑
咱们先来看看直接调用的状况下上述代码时如何执行的
doGetTransaction
源码分析protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); // 在建立DataSourceTransactionManager将其设置为了true // 标志是否容许 txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 从线程上下文中获取到对应的这个链接池中的链接 // 获取对应数据源下的这个绑定的链接 // 当咱们将数据库链接绑定到线程上时,实际上绑定到当前线程的是一个map // 其中key是对应的数据源,value是经过这个数据源获取的一个链接 ConnectionHolder conHolder =(ConnectionHolder)TransactionSynchronizationManager .getResource(obtainDataSource()); // 若是当前上下文中已经有这个链接了,那么将newConnectionHolder这个标志设置为false // 表明复用了以前的链接(不是一个新链接) txObject.setConnectionHolder(conHolder, false); return txObject; }
直接调用的状况下,获取到的链接确定为空,因此这里返回的是一个没有持有数据库链接的DataSourceTransactionObject
。
isExistingTransaction
源码分析protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 只有在存在链接而且链接上已经激活了事务才会返回true return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
直接调用的状况下,这里确定是返回fasle
MANDATORY
的方法将会抛出异常required、requires_new、nested
时,会真正开启事务,对应代码以下else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 若是存在同步,将注册的同步挂起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 开启一个新事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } }
咱们先来看看它的挂起操做干了什么,对应代码以下:
// 直接调用时,传入的transaction为null protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // 在以前的代码中没有进行任何激活同步的操做,因此不会进入下面这个判断 if (TransactionSynchronizationManager.isSynchronizationActive()) { // ... } // 传入的transaction为null,这个判断也不进 else if (transaction != null) { // ... } else { return null; } }
从上面能够看出,这段代码其实啥都没干,OK,减负,直接跳过。
接着,就是真正的开启事务了,会调用一个startTransaction
方法,对应代码以下:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { // 默认状况下,getTransactionSynchronization方法会返回SYNCHRONIZATION_ALWAYS // 因此这里是true boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 根据以前的事务定义等相关信息构造一个事务状态对象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 真正开启事务,会从数据源中获取链接并绑定到线程上 doBegin(transaction, definition); // 在这里会激活同步 prepareSynchronization(status, definition); return status; }
newTransactionStatus
实际上就是调用了DefaultTransactionStatus
的构造函数,咱们来看一看每一个参数的含义以及实际传入的是什么。对应代码以下:
// definition:事务的定义,解析@Transactional注解获得的 // transaction:经过前面的doGetTransaction方法获得的,关联了一个数据库链接 // newTransaction:是不是一个新的事务 // debug:是否开启debug级别日志 // newSynchronization:是否须要一个新的同步 // suspendedResources:表明了执行当前事务时挂起的资源 protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { // 是不是一个真实的新的同步 // 除了传入的标志以外还须要判断当前线程上的同步是否激活 // 没有激活才算是一个真正的新的同步 boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); // 返回一个事务状态对象 // 包含了事务的定于、事务使用的链接、事务是否要开启一个新的同步、事务挂起的资源等 return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
在完成事务状态对象的构造以后,就是真正的开启事务了,咱们也不难猜出所谓开启事务其实就是从数据源中获取一个一个链接并设置autoCommit为false。对应代码以下:
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 判断txObject中是否存在链接而且链接上已经激活了事务 // txObject是经过以前的doGetTransaction方法获得的 // 直接调用的状况下,这个判断确定为true if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 从数据源中获取一个链接 Connection newCon = obtainDataSource().getConnection(); } // 将链接放入到txObject中 // 第二个参数为true,标志这是一个新链接 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // 标志链接资源跟事务同步 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 应用事务定义中的read only跟隔离级别 // 实际就是调用了Connection的setReadOnly跟setTransactionIsolation方法 // 若是事务定义中的隔离级别跟数据库默认的隔离级别不一致会返回的是数据库默认的隔离级别 // 不然返回null // 主要是为了在事务完成后能将链接状态恢复 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // 设置autoCommit为false,显示开启事务 if (con.getAutoCommit()) { // 表明在事务完成后须要将链接重置为autoCommit=true // 跟以前的previousIsolationLevel做用同样,都是为了恢复链接 txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } // 是否经过显示的语句设置read only,默认是不须要的 prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); // 设置超时时间 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // 将链接绑定到当前线程上下文中,实际存入到线程上下文的是一个map // 其中key为数据源,value为从该数据源中获取的一个链接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { // 出现异常的话,须要归还链接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
概括起来,doBegin
作了这么几件事
在经过doBegin
开启了事务后,接下来调用了prepareSynchronization
,这个方法的主要目的就是为了准备这个事务须要的同步,对应源码以下:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { // 到这里真正激活了事务 TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); // 隔离级别 // 只有在不是默认隔离级别的状况下才会绑定到线程上,不然绑定一个null TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); // 是否只读 TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); // 初始化同步行为 TransactionSynchronizationManager.initSynchronization(); } }
主要对一些信息进行了同步,例如事务真正激活了事务,事务隔离级别,事务名称,是否只读。同时初始化了同步事务过程当中要执行的一些回调(也就是一些同步的行为)
在前面咱们已经介绍了在直接调用的状况下,若是传播级别为mandatory
会直接抛出异常,传播级别为required、requires_new、nested
时,会调用startTransaction
真正开启一个事务,可是除了这几种传播级别以外还有supports、not_supported、never
。这几种传播级别在直接调用时会作什么呢?前面那张图我其实已经画出来了,会开启一个空事务("empty" transaction
),对应代码以下:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // ...... // mandatory抛出异常 // required、requires_new、nested调用startTransaction开启事务 // supports、not_supported、never会进入下面这个判断 else { // 默认同步级别就是always boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 直接调用prepareTransactionStatus返回一个事务状态对象 return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
prepareTransactionStatus
代码以下
// 传入的参数为:def, null, true, newSynchronization, debugEnabled, null protected final DefaultTransactionStatus prepareTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { // 调用这个方法时 // definition:传入的是解析@Transactional注解获得的事务属性 // transaction:写死的传入为null,意味着没有真正的事务() // newTransaction:写死的传入为true // newSynchronization:默认同步级别为always,在没有真正事务的时候也进行同步 // suspendedResources:写死了,传入为null,不挂起任何资源 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
能够看到这个方法跟以前介绍的startTransaction
方法相比较下就有几点不一样,最明显的是少了一个步骤,在prepareTransactionStatus
方法中没有调用doBegin
方法,这意味这个这个方法不会去获取数据库链接,更不会绑定数据库链接到上下文中,仅仅是作了一个同步的初始化。
其次,startTransaction
方法在调用newTransactionStatus
传入的第二个参数是从doGetTransaction
方法中获取的,不可能为null,而调用prepareTransactionStatus
方法时,写死的传入为null。这也表明了prepareTransactionStatus
不会真正开启一个事务。
虽然不会真正开启一个事务,只是开启了一个“空事务”,可是当这个空事务完成时仍然会触发注册的回调。
前面已经介绍了在直接调用下七种不一样隔离级别在建立事务时的不一样表现,代码看似不少,实际仍是比较简单的,接下来咱们要介绍的就是嵌套调用
,也就是已经存在事务的状况下,调用了另一个被事务管理的方法(而且事务管理是生效的)。咱们须要关注的是下面这段代码
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 事务的属性(TransactionAttribute) TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 从线程上下文中获取到一个链接,并封装到一个DataSourceTransactionObject对象中 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 判断以前获取到的事务上是否有链接,而且链接上激活了事务 if (isExistingTransaction(transaction)) // 嵌套调用处理在这里 return handleExistingTransaction(def, transaction, debugEnabled); } // ... // 下面是直接调用的状况,前文已经分析过了 // 省略直接调用的相关代码 }
处理嵌套调用的核心代码其实就是handleExistingTransaction
。可是进入这个方法前首先isExistingTransaction
这个方法得返回true才行,因此咱们先来看看isExistingTransaction
这个方法作了什么,代码以下:
protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 首先判断txObject中是否已经绑定了链接 // 其次判断这个链接上是否已经激活了事务 return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
结合咱们以前分析的直接调用的代码逻辑,能够看出,只有外围的事务的传播级别为required、requires_new、nested
时,这个判断才会成立。由于只有在这些传播级别下才会真正的开启事务,才会将链接绑定到当前线程,doGetTransaction
方法才能返回一个已经绑定了数据库链接的事务对象。
在知足了isExistingTransaction
时,会进入嵌套调用的处理逻辑,也就是handleExistingTransaction
方法,其代码以下:
代码很长,可是你们跟着我一步步梳理就能够了,其实逻辑也不算特别复杂
整个代码逻辑其实就是为了实现事务的传播,在不一样的传播级别下作不一样的事情
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 若是嵌套的事务的传播级别为never,那么直接抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // 若是嵌套的事务的传播级别为not_soupported,那么挂起外围事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 挂起外围事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 开启一个新的空事务 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 若是嵌套的事务传播级别为requires_new,那么挂起外围事务,而且新建一个新的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // 若是嵌套事务的传播级别为nested,会获取当前线程绑定的数据库链接 // 并经过数据库链接建立一个保存点(save point) // 其实就是调用Connection的setSavepoint方法 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 默认是容许的,因此这个判断不会成立 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } // 默认是true if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // JTA进行事务管理才会进入这这里,咱们不作考虑 return startTransaction(definition, transaction, debugEnabled, null); } } if (debugEnabled) { logger.debug("Participating in existing transaction"); } // 嵌套事务传播级别为supports、required、mandatory时,是否须要校验嵌套事务的属性 // 主要校验的是个隔离级别跟只读属性 // 默认是不须要校验的 // 若是开启了校验,那么会判断若是外围事务的隔离级别跟嵌套事务的隔离级别是否一致 // 若是不一致,直接抛出异常 if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; // 这里会抛出异常 } } // 嵌套事务的只读为false if (!definition.isReadOnly()) { // 可是外围事务的只读为true,那么直接抛出异常 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { // 这里会抛出异常 } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
观察上面的代码咱们能够发现,返回值只有两个状况(不考虑抛出异常)
startTransaction
方法prepareTransactionStatus
方法在前面咱们已经介绍了这两个方法的区别,相比较于startTransaction
方法,prepareTransactionStatus
并不会再去数据源中获取链接。
另外咱们还能够发现,只有传播级别为required_new
的状况才会去调用startTransaction
方法(不考虑JTA),也就是说,只有required_new
才会真正的获取一个新的数据库链接,其他的状况若是支持事务的话都是复用了外围事务获取到的链接,也就是说它们实际上是加入了外围的事务中,例如supports、required、mandatory、nested
,其中nested
又比较特殊,由于它不只仅是单纯的加入了外围的事务,并且在加入前设置了一个保存点,若是仅仅是嵌套事务发生了异常,会回滚到以前设置的这个保存点上。另外须要注意的是,由于是直接复用了外部事务的链接,因此supports、required、mandatory、nested
这几种传播级别下,嵌套的事务会随着外部事务的提交而提交,同时也会跟着外部事物的回滚而回滚。
接下来咱们开始细节性的分析上边的代码,对于传播级别为never,没啥好说的,直接抛出异常,由于不支持在事务中运行嘛~
当传播级别为not_supported
时会进入下面这段代码
// 若是嵌套的事务的传播级别为not_soupported,那么挂起外围事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 挂起外围事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 开启一个新的空事务 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); }
主要就是两个动做,首先挂起外围事务。不少同窗可能不是很理解挂起这个词的含义,挂起实际上作了什么呢?
其中第一步对应的代码以下:
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // 嵌套调用下,同步是已经被激活了的 if (TransactionSynchronizationManager.isSynchronizationActive()) { // 解绑线程上绑定的同步回调,并返回 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 这里实际上就是解绑线程上绑定的数据库链接 // 同时返回这个链接 suspendedResources = doSuspend(transaction); } // 解绑线程上绑定的事务属性并返回 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); // 最后集中封装为一个SuspendedResourcesHolder返回 return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // 出现异常的话,恢复 doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // 若是没有激活同步,那么也须要将链接挂起 Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
在上面的代码咱们要注意到一个操做,它会清空线程绑定的数据库链接,同时在后续操做中也不会再去获取一个数据库链接从新绑定到当前线程上,因此not_supported
传播级别下每次执行SQL均可能使用的不是同一个数据库链接对象(依赖于业务中获取链接的方式)。这点你们要注意,跟后面的几种有很大的区别。
获取到须要挂起的资源后,调用了prepareTransactionStatus
,这个方法咱们以前分析过了,可是在这里传入的参数是不一样的
// definition:非null,解析事务注解得来的 // transaction:null // newTransaction:false // newSynchronization:true // suspendedResources:表明挂起的资源,包括链接以及同步 protected final DefaultTransactionStatus prepareTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
最终会返回一个这样的事务状态对象,其中的transaction
为null、newTransaction
为false,这两者表明了不存在一个真正的事务。在后续的事务提交跟回滚时会根据事务状态对象中的这两个属性来判断是否须要真正执行回滚,若是不存在真正的事务,那么也就没有必要去回滚(固然,这只是针对内部的空事务而言,若是抛出的异常同时中断了外部事务,那么外部事务仍是会回滚的)。除了这两个属性外,还有newSynchronization
,由于在挂起同步时已经将以前的同步清空了,因此newSynchronization
仍然为true,这个属性会影响后续的一些同步回调,只有为true的时候才会执行回调操做。最后就是suspendedResources
,后续须要根据这个属性来恢复外部事务的状态。
当传播级别为requires_new
时,会进入下面这段代码
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { // 第一步,也是挂起 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { // 第二步,开启一个新事务,会绑定一个新的链接到当前线程上 return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { // 出现异常,恢复外部事务状态 resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } }
很简单吧,挂起事务,而后新开一个事务,而且新事务的链接跟外围事务的不同。也就是说这两个事务互不影响。它也会返回一个事务状态对象,可是不一样的是,transaction
不为null、newTransaction
为true。也就是说它有本身的提交跟回滚机制,也不难理解,毕竟是两个不一样的数据库链接嘛~
当传播级别为nested
进入下面这段代码
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // JTA相关,不考虑 return startTransaction(definition, transaction, debugEnabled, null); } }
前面其实已经介绍过了,也很简单。仍是把重点放在返回的事务状态对象中的那几个关键属性上,transaction
不为null,可是newTransaction
为false,也就是说它也不是一个新事务,另外须要注意的是,它没有挂起任何事务相关的资源,仅仅是建立了一个保存点而已。这个事务在回滚时,只会回滚到指定的保存点。同时由于它跟外围事务共用一个链接,因此它会跟随外围事务的提交而提交,回滚而回滚。
剩下的supports、required、mandatory
这几种传播级别都会进入下面这段代码
// 省略了校验相关代码,前面已经介绍过了,默认是关闭校验的 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
咱们会发现相比较于nested
只是少了一个建立保存点的动做。最终返回的事务状态对象中的属性,transaction
不为null,可是newTransaction
为false,也就是说它也不是一个新事务。同时由于没有挂起外围事务的同步,因此它也不是一个新的同步(newSynchronization为false
)。
对于每一个隔离级别下返回的事务状态对象中的属性但愿你们有必定了解,由于后续的回滚、提交等操做都依赖于这个事务状态对象。
到目前为止,咱们就介绍完了事务的建立,紧接着就是真正的执行业务代码了,要保证业务代码能被事务管理,最重要的一点是保证在业务代码中执行SQL时仍然是使用咱们在开启事务时绑定到线程上的数据库链接。那么是如何保证的呢?咱们分为两种状况讨论
JdbcTemplate
访问数据库Mybatis
访问数据库对于第二种,可能须要你对Mybatis
稍微有些了解
咱们要讨论的只是
JdbcTemplate
是如何获取到以前绑定在线程上的链接这个问题,不要想的太复杂
在事务专题的第一篇我就对JdbcTemplate
作了一个简单的源码分析,它底层获取数据库链接实际上就是调用了DataSourceUtils#doGetConnection
方法,代码以下:
关键代码我已经标出来了,实际上获取链接时也是从线程上下文中获取
Mybatis相对来讲比较复杂,你们目前作为了解便可。当Spring整合Mybatis时,事务是交由Spring来管理的,那么Spring是如何接管Mybatis的事务的呢?核心代码位于SqlSessionFactoryBean#buildSqlSessionFactory
方法中。其中有这么一段代码
在这里替换掉了Mybatis的事务工厂(Mybatis
依赖事务工厂建立的事务对象来获取链接),使用了Spring本身实现的一个事务工厂SpringManagedTransactionFactory
。经过它能够获取一个事务对象SpringManagedTransaction
。咱们会发现这个事务对象在获取链接时调用的也是DataSourceUtils
的方法
private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); }
因此它也能保证使用的是开启事务时绑定在线程上的链接,从而保证事务的正确性。
出现异常时其实分为两种状况,并非必定会回滚
对应代码以下:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { // transactionAttribute是从@Transactional注解中解析得来的 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // 回滚 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } // 省略异常处理 } else { try { // 即便出现异常仍然提交事务 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } // 省略异常处理 } } }
能够看到,只有在知足txInfo.transactionAttribute.rollbackOn(ex)
这个条件时才会真正执行回滚,不然即便出现了异常也会先提交事务。这个条件取决于@Transactiona
注解中的rollbackFor
属性是如何配置的,若是不进行配置的话,默认只会对RuntimeException
或者Error
进行回滚。
在进行回滚时会调用事务管理器的rollback
方法,对应代码以下:
public final void rollback(TransactionStatus status) throws TransactionException { // 事务状态为已完成的时候调用回滚会抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 这里真正处理回滚 processRollback(defStatus, false); }
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { // 传入时写死的为false boolean unexpectedRollback = unexpected; try { // 触发以前注册的同步回调 triggerBeforeCompletion(status); // 存在保存点,根据咱们以前的分析,说明这是一个嵌套调用的事务 // 而且内部事务的传播级别为nested if (status.hasSavepoint()) { // 这里会回滚到定义的保存点 status.rollbackToHeldSavepoint(); } // 根据咱们以前的分析有两种状况会知足下面这个判断 // 1.直接调用,传播级别为nested、required、requires_new // 2.嵌套调用,而且内部事务的传播级别为requires_new else if (status.isNewTransaction()) { // 直接获取当前线程上绑定的数据库链接并调用其rollback方法 doRollback(status); } else { // 到这里说明存在事务,可是不是一个新事务而且没有保存点 // 也就是嵌套调用而且内部事务的传播级别为supports、required、mandatory if (status.hasTransaction()) { // status.isLocalRollbackOnly,表明事务的结果只能为回滚 // 默认是false的,在整个流程中没有看到修改这个属性 // isGlobalRollbackOnParticipationFailure // 这个属性的含义是在加入的事务失败时是否回滚整个事务,默认为true if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { // 从这里能够看出,但内部的事务发生异常时会将整个大事务标记成回滚 doSetRollbackOnly(status); } else { // 进入这个判断说明修改了全局配置isGlobalRollbackOnParticipationFailure // 内部事务异常并不影响外部事务 } } else { // 不存在事务,回滚不作任何操做 logger.debug("Should roll back transaction but cannot - no transaction available"); } // isFailEarlyOnGlobalRollbackOnly这个参数默认为false // unexpectedRollback的值一开始就被赋值成了false if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } // 触发同步回调 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // unexpectedRollback是false // 这个值若是为true,说明 if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 在事务完成后须要执行一些清理动做 cleanupAfterCompletion(status); } }
上面的代码结合注释看起来应该都很是简单,咱们最后关注一下cleanupAfterCompletion
这个方法,对应代码以下
private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 将事务状态修改成已完成 status.setCompleted(); // 是不是新的同步 // 清理掉线程绑定的全部同步信息 // 直接调用时,在任意传播级别下这个条件都是知足的 // 嵌套调用时,只有传播级别为not_supported、requires_new才会知足 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } // 是不是一个新的事务 // 直接调用下,required、requires_new、nested都是新开的一个事务 // 嵌套调用下,只有requires_new会新起一个事务 if (status.isNewTransaction()) { // 真正执行清理 doCleanupAfterCompletion(status.getTransaction()); } // 若是存在挂起的资源,将挂起的资源恢复 // 恢复的操做跟挂起的操做正好相反 // 就是将以前从线程解绑的资源(数据库链接等)已经同步回调从新绑定到线程上 if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }
再来看看doCleanupAfterCompletion
到底作了什么,源码以下:
protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 先判断是不是一个新链接 // 直接调用,若是真正开启了一个事务一定是个链接 // 可是嵌套调用时,只有requires_new会新起一个链接,其他的都是复用外部事务的链接 // 这种状况下不能将链接从线程上下文中清除,由于外部事务还须要使用 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); } // 恢复链接的状态 // 1.从新将链接设置为自动提交 // 2.恢复隔离级别 // 3.将read only从新设置为false Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } // 最后,由于事务已经完成了因此归还链接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); } // 将事务对象中绑定的链接相关信息也清空掉 txObject.getConnectionHolder().clear(); }
提交事务有两种状况
- 正常执行完成,提交事务
- 出现异常,可是不知足回滚条件,仍然提交事务
可是无论哪一种清空最终都会调用
AbstractPlatformTransactionManager#commit
方法,因此咱们就直接分析这个方法
代码以下:
public final void commit(TransactionStatus status) throws TransactionException { // 跟处理回滚时是同样的,都会先校验事务的状态 // 若是事务已经完成了,那么直接抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } // 这里会检查事务的状态是否被设置成了只能回滚 // 这里检查的是事务状态对象中的rollbackOnly属性 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { processRollback(defStatus, false); return; } // 这里会检查事务对象自己是否被设置成了rollbackOnly // 以前咱们在分析回滚的代码时知道,当内部的事务发生回滚时(supports、required) // 默认状况下会将整个事务对象标记为回滚,实际上在外部事务提交时就会进入这个判断 // shouldCommitOnGlobalRollbackOnly:在全局被标记成回滚时是否还要提交,默认为false if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { processRollback(defStatus, true); return; } // 真正处理提交 processCommit(defStatus); }
能够看到,即便进入了提交事务的逻辑仍是可能回滚。
rollbackOnly
rollbackOnly
两者在回滚时都是调用了processRollback
方法,可是稍有区别,经过事务状态对象形成的回滚最终在回滚后并不会抛出异常,可是事务对象自己会抛出异常给调用者。
真正处理提交的逻辑在processCommit
方法中,代码以下:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; // 留给子类复写的一个方法,没有作实质性的事情 prepareForCommit(status); // 触发同步回调 triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; // 存在保存点,将事务中的保存点清理掉 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } // 直接调用,传播级别为required、nested、requires_new // 嵌套调用,且传播级别为requires_new else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 虽然前面已经检查过rollbackOnly了,在shouldCommitOnGlobalRollbackOnly为true时 // 仍然须要提交事务,将unexpectedRollback设置为true,意味着提交事务后仍要抛出异常 unexpectedRollback = status.isGlobalRollbackOnly(); // 提交的逻辑很简单,调用了connection的commit方法 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // unexpectedRollback为true时,进入这个catch块 // 触发同步回调 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // connection的doCommit方法抛出SqlException时进入这里 if (isRollbackOnCommitFailure()) { // 在doCommit方法失败后是否进行回滚,默认为false doRollbackOnCommitException(status, ex); } else { // 触发同步回调 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { // 剩余其它异常进入这个catch块 if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // 触发同步回调 try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 跟回滚时同样,作一些清理动做 cleanupAfterCompletion(status); } }
整个逻辑仍是比较简单的,最核心的步骤就是调用了数据库链接对象(Connection)的commit方法。
本文主要分析了Spring中事务的实现机制,从事务实现的入口---->EnableTransactionManagement
注解,到事务实现的核心TransactionInterceptor
都作了详细的分析。其中最复杂的一块在于事务的建立,也就是createTransactionIfNecessary
,既要考虑直接调用的状况,也要考虑嵌套调用的状况,而且还须要针对不一样的传播级别作不一样的处理。对于整个事务管理,咱们须要分清楚下面这几个对象的含义
TransactionDefinition
表明的是咱们经过@Transactional
注解指定的事务的属性,包括是否只读、超时、回滚等。
TransactionStatus
表明的是事务的状态,这个状态分为不少方面,好比事务的运行状态(是否完成、是否被标记为rollbakOnly),事务的同步状态(这个事务是否开启了一个新的同步),还有事务是否设置了保存点。更准确的来讲,TransactionStatus
表明的是被@Transactional
注解修饰的方法的状态,只要被@Transactional
注解修饰了,在执行通知逻辑时就会生成一个TransactionStatus
对象,即便这个方法没有真正的开启一个数据库事务
DataSourceTransactionObject
表明的是一个数据库事务对象,实际上保存的就是一个数据库链接以及这个链接的状态。
TransactionInfo
表明的是事务相关的全部信息,组合了事务管理器,事务状态,事务定义并持有一个旧的TransactionInfo
引用,这个对象在事务管理的流程中其实没有实际的做用,主要的目的是为了让咱们在事务的运行过程当中获取到事务的相关信息,咱们能够直接调用TransactionAspectSupport
的currentTransactionInfo
方法获取到当前线程绑定的TransactionInfo
对象。
另外,看完本文但愿你们对于事务的传播机制能有更深一点的认知,而不是停留在概念上,要讲清楚事务的传播机制咱们先作如下定。
凡是在执行代理方法的过程当中从数据源从新获取了链接并调用了其setAtuoCommit(false)
方法,并且还将这个链接绑定到了线程上下文中,咱们就认为它新建了一个事务。注意了,有三个要素
链接是从数据源中获取到的
调用了链接的setAtuoCommit(false)
方法
这个链接被绑定到了线程上下文中
凡是在执行代理方法的过程当中,挂起了外部事务,而且没有新建事务,那么咱们认为这个这个方法是以非事务的方式运行的。
凡是在执行代理方法的过程当中,没有挂起外部事务,可是也没有新建事务,那么咱们认为这个方法加入了外部的事务
挂起事务的表如今于清空了线程上下文中绑定的链接!
同时咱们分为两种状况讨论来分析传播机制
直接调用
不存在加入外部事务这么一说,要么就是新建事务,要么就是以非事务的方式运行,固然,也可能抛出异常。
传播级别 | 运行方式 |
---|---|
requires_new | 新建事务 |
nested | 新建事务 |
required | 新建事务 |
supports | 非事务的方式运行 |
not_supported | 非事务的方式运行 |
never | 非事务的方式运行 |
mandatory | 抛出异常 |
间接调用
时咱们要分两种状况讨论
在外部方法新建事务的状况下时(说明外部事务的传播级别为requires_new
,nested
或者required
),当前方法的运行方式以下表所示
传播级别 | 运行方式 |
---|---|
requires_new | 新建事务 |
nested | |
required | 直接加入到外部事务 |
supports | 直接加入到外部事务 |
not_supported | 挂起外部事务,以非事务的方式运行 |
never | 抛出异常 |
mandatory | 直接加入到外部事务 |
当外部方法没有新建事务时,其实它的运行方式跟直接调用是同样的,这里就不赘述了。
本文到这里就结束了,文章很长,但愿你能够耐心看完哈~
码到手酸,求个三连啊!啊!啊!
若是本文对你由帮助的话,记得点个赞吧!也欢迎关注个人公众号,微信搜索:程序员DMZ,或者扫描下方二维码,跟着我一块儿认认真真学Java,踏踏实实作一个coder。
我叫DMZ,一个在学习路上匍匐前行的小菜鸟!