最近读了Spring声明式事务相关源码,如今将相关原理及本人注释过的实现源码整理到博客上并对一些工做中的案例与事务源码中的参数进行总结。java
名词 | 概念 |
---|---|
PlatformTransactionManager | 事务管理器,管理事务的各生命周期方法,下文简称TxMgr |
TransactionAttribute | 事务属性, 包含隔离级别,传播行为,是否只读等信息,下文简称TxAttr |
TransactionStatus | 事务状态,下文简称TxStatus |
TransactionInfo | 事务信息,内含TxMgr, TxAttr, TxStatus等信息,下文简称TxInfo |
TransactionSynchronization | 事务同步回调,内含多个钩子方法,下文简称TxSync / transaction synchronization |
TransactionSynchronizationManager | 事务同步管理器,维护当前线程事务资源,信息以及TxSync集合 |
Spring中的Propagation枚举和TransactionDefinition接口定义了7种事务传播行为:mysql
这里介绍如下Spring声明式事务的套路。若是能知晓大体套路会对接下来看源码有很大的帮助。文笔不是太好,下面的描述若有不清,还请指出。git
咱们给一个bean的方法加上@Transactional
注解后,Spring容器给咱们的是一个代理的bean。当咱们对事务方法调用时,会进入Spring的ReflectiveMethodInvocation#proceed方法。这是AOP的主要实现,在进入业务方法前会调用各类方法拦截器,咱们须要关注的拦截器是org.springframework.transaction.interceptor.TransactionInterceptor。
TransactionInterceptor的职责相似于一个“环绕切面”,在业务方法调用前根据状况开启事务,在业务方法调用完回到拦截器后进行善后清理。github
事务切面在源码中具体的实现方法是TransactionAspectSupport#invokeWithinTransaction,相信平时你们debug的时候在调用栈中常常能看到此方法。事务切面关注的是TransactionInfo(TxInfo),TxInfo是一个“很是大局观”的东西(里面啥都有:TxMgr, TxAttr, TxStatus还有前一次进入事务切面的TransactionInfo)。web
所以事务切面会调用createTransactionIfNecessary方法来建立事务并拿到一个TxInfo(不管是否真的物理建立了一个事务)。若是事务块内的代码发生了异常,则会根据TxInfo里面的TxAttr配置的rollback规则看看这个异常是否是须要回滚,不须要回滚就尝试提交,不然就尝试回滚。若是未发生异常,则尝试提交。spring
事务切面对于尝试提交会判断是否到了最外层事务(某个事务边界)。举个例子:有四个事务方法依次调用,传播行为分别是 方法1:REQUIRED, 方法2:REQUIRED, 方法3: REQUIRES_NEW, 方法4: REQUIRED。很显然这其中包含了两个独立的物理事务,当退栈到方法4的事务切面时,会发现没有到事务最外层,因此不会有真正的物理提交。而在退栈到了方法3对应的事务切面时会发现是外层事务,此时会发生物理提交。同理,退栈到方法1的事务切面时也会触发物理提交。sql
那么问题来了,Spring是怎么判断这所谓“最外层事务”的呢。
答案是TxStatus中有个属性叫newTransaction用于标记是不是新建事务(根据事务传播行为得出,好比加入已有事务则会是false),以及一个名为transaction的Object用于表示物理事务对象(由具体TxMgr子类负责给出)。Spring会根据每一层事务切面建立的TxStatus内部是否持有transaction对象以及newTransaction标志位判断是否属于外层事务。session
相似的,Spring对于回滚事务也是会在最外层事务方法对应的切面中进行物理回滚。而在非最外层事务的时候会由具体txMgr子类给对应的事务打个的标记用于标识这个事务该回滚,这样的话在全部同一物理事务方法退栈过程当中在事务切面中都能读取到事务被打了应该回滚的标记。能够说这是同一物理事务方法之间进行通讯的机制。ide
Spring事务代码中用ThreadLocal来进行资源与事务的生命周期的同步管理。函数
在事务切面层面,TransactionAspectSupport里面有个transactionInfoHolder的ThreadLocal对象,用于把TxInfo绑定到线程。那么这样在咱们的业务代码或者其余切面中,咱们能够拿到TxInfo,也能拿到TxStatus。拿到TxStatus咱们就能够调用setRollbackOnly来打标以手动控制事务必须回滚。
TransactionSynchronizationManager是Spring事务代码中对ThreadLocal使用最多的类,目前它内部含有6个ThreadLocal,分别是:
Map<Object, Object>
用于保存事务相关资源,好比咱们经常使用的DataSourceTransactionManager会在开启物理事务的时候把<DataSource, ConnectionHolder>
绑定到线程。Set<TransactionSynchronization>
用于保存transaction synchronization,这个能够理解为是回调钩子对象,内部含有beforeCommit, afterCommit, beforeCompletion等钩子方法。下面是我作的几张图,比较丑陋。举了三个不一样的事务传播状况,列一下TxInfo的信息(TxInfo中主要列了TxStatus的一些关键字段以及oldTransactionInfo字段)
最多见的REQUIRED调用REQUIRED
REQUIRED调用REQUIRES_NEW
REQUIRED调用NOT_SUPPORTED
直接进入源码实现部分,最好须要对Spring AOP,拦截器,代理等有个基本认知阅读起来会比较轻松。
TransactionInterceptor是Spring实现声明式事务的拦截器,它实现了AOP联盟的MethodInterceptor接口,它的父类TransactionAspectSupport封装了一些用于实现事务切面对事务进行管理的基本代码。下面来看一下TransactionInterceptor的继承关系。
TransactionInterceptor对于MethodInterceptor#invoke的实现很简单,就是调用父类的的invokeWithinTransaction,并传递给此方法一个回调用于继续后续的拦截调用。
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // 由于这里的invocation.getThis多是一个代理类,须要获取目标原生class。 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // 调用父类TransactionAspectSupport的invokeWithinTransaction方法,第三个参数是一个简易回调实现,用于继续方法调用链。 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
这里就是上面TransactionInterceptor调用的invokeWithinTransaction实现,能够将之看做是一个大的环绕切面,将事务的建立与提交/回滚包在事务方法的外围。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 获取TransactionAttribute、PlatformTransactionManager、以及链接点方法信息。 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 根据上面抓取出来的txAttribute, tm, 链接点方法等信息判断是否须要开启事务。 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 执行回调,若是没有后续拦截器的话,就进入事务方法了。 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 事务发生异常。 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 把上一层事务的TxInfo从新绑到ThreadLocal中。 cleanupTransactionInfo(txInfo); } // 事务未发生异常。 commitTransactionAfterReturning(txInfo); return retVal; } else { try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // 若是事务属性中name为null,则建立一个简易委托类,name为链接点方法标识。 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 根据事务属性判断是否须要开启事务,并返回状态。 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); // 事务方法。 if (txAttr != null) { if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } else { // 非事务方法。 if (logger.isTraceEnabled()) logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } // 不管是否建立了新事务,这里都会把当前的txInfo对象经过threadLocal变量绑定到当前线程。 txInfo.bindToThread(); return txInfo; }
AbstractPlatformTransactionManager是各类事务管理器的抽象基类,也能够说是骨架。它封装了不少事务管理的流程代码,子类须要实现一些模板方法。下面列出一些主要的模板方法。
咱们首先从上面createTransactionIfNecessary方法中调用到的getTransaction方法开始看起。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { // 根据具体的tm实现获取一个transaction对象。 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { definition = new DefaultTransactionDefinition(); } // 已经存在事务的状况。 if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } // timeout不能小于-1。 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 若是传播行为是MANDATORY,则应该抛出异常(由于此时不存在事务) if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 传播行为是REQUIRED, REQUIRES_NEW, NESTED。 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 注意这里的newTransaction标识位是true。 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 调用供子类实现的模板方法doBegin来开启事务。 doBegin(transaction, definition); // 调用TransactionSynchronizationManager来保存一些事务上下文信息。 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } /* * 这里的else分支说明传播行为是SUPPORTS或NOT_SUPPORTED或NEVER,这几种状况对于当前无事务的逻辑都是直接继续运行。 */ else { // 若是有指定事务隔离级别,则能够打warn日志报出指定隔离级别开启但没有事务的警告。 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 注意这里的transaction传的是null,这在尝试commit的时候会判断出其实没有实际须要提交的事务。 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } /** * prepareSynchronization方法根据status是否须要维护新的事务相关资源,信息与回调。 */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
下面咱们来看一下对于当前已经有事务的状况下,Spring是如何处理的:
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 传播行为为NEVER,抛出异常。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // 传播行为为NOT_SUPPORTED,则挂起当前事务。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 传播行为为REQUIRES_NEW,则挂起当前事务并开启新事务运行。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开启事务,由具体子类TxMgr实现。 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } // 传播行为为NESTED。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 若是不支持嵌套事务抛出异常。 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 是否对嵌套事务建立还原点。 if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 这里最终会委托SavePointManager去建立还原点。 status.createAndHoldSavepoint(); return status; } else { // 一般tm若是是JTA的话会走到这里来,这种状况就经过嵌套的begin和commit/rollback实现。 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // 此时说明传播行为为SUPPORTS或REQUIRED或MANDATORY,则直接加入当前事务便可。 if (debugEnabled) { logger.debug("Participating in existing transaction"); } /* * validateExistingTransaction是用来对SUPPORTS和REQUIRED的传播行为进行事务定义校验的开关。 * 默认是不开启的,此时在加入事务的时候内层注解的一些设定至关于会被忽略。 */ if (isValidateExistingTransaction()) { // 若是自定义了隔离级别校验与已有事务是否匹配。 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); // 若是已有事务隔离级别为null(说明是默认级别)或者已有事务隔离级别不等于当前事务定义的隔离级别抛出异常。 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } // 若是已有事务为只读,但本方法事务定义为非只读,则抛出异常。 if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
/** * 本方法作的事情主要就是抓取当前事务资源,事务基本信息以及事务回调transaction synchronization等塞到SuspendedResourcesHolder中返回。 */ protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException { /* * 在TransactionSynchronizationManager的JavaDoc上已经写明了在须要进行transaction synchronization注册的时候须要先检查当前线程是否激活 * isSynchronizationActive是去读TransactionSynchronizationManager中当前线程绑定的TransactionSynchronization集合是否为null。 */ if (TransactionSynchronizationManager.isSynchronizationActive()) { // 对全部本线程当前注册的transaction synchronization调用suspend方法。 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 挂起当前事务,由具体的TxMgr子类实现。 suspendedResources = doSuspend(transaction); } /* * 将当前线程绑定的事务名,是否只读,隔离级别,是否有实际事务等信息抓取出来。 * 与刚才抓取出来的transaction synchronization集合一块儿包到SuspendedResourcesHolder中返回。 */ String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { // 恢复transaction synchronization。 doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { // 恢复transaction synchronization。 doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null) { // 挂起当前事务。 Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // 啥都没有。 return null; } } private List<TransactionSynchronization> doSuspendSynchronization() { // getSynchronizations返回的是一个不可变的快照。 List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations(); // 逐一挂起。 for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.suspend(); } // 清理以后除非从新init不然没法再register新的transaction synchronization了。 TransactionSynchronizationManager.clearSynchronization(); return suspendedSynchronizations; }
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { // 恢复挂起的事务资源,由具体的TxMgr子类实现。 doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { // 还原挂起的事务的一些信息。 TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); // 恢复挂起的transaction syncrhonization。 doResumeSynchronization(suspendedSynchronizations); } } } /** * 从新初始化当前线程维护的synchronization集合,逐一对这些synchronization调用resume方法并加入到集合中。 */ private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) { TransactionSynchronizationManager.initSynchronization(); for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.resume(); TransactionSynchronizationManager.registerSynchronization(synchronization); } }
commit与rollback两个方法是PlatformTransactionManager接口两个关键方法。
通常在事务切面加强的方法成功状况下会调用commit方法。在事务发生异常后,completeTransactionAfterThrowing方法会根据异常与事务规则是否匹配来决定是否须要回滚。若是须要回滚则调用rollback方法。
须要注意的是commit/rollback方法只是尝试,Spring会根据事务状态信息来具体处理,不表明必定会物理提交/回滚,Spring会在事务最外层边界才可能触发物理提交/回滚,甚至也有可能调用commit后发现须要rollback。
public final void commit(TransactionStatus status) throws TransactionException { // 若是事务已经完成了,则抛出异常。 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; /* * 经过前文的源码分析能够知道不管事务怎么传播,每一次进入事务拦截器,在进入业务方法以前都会把一个TransactionInfo对象塞到 * transactionInfoHolder这个线程本地变量中。而TransactionInfo包含了一个TransactionStatus对象。 * commit方法是在业务方法正常完成后调用的,所谓isLocalRollbackOnly就是读当前TransactionStatus对象中的rollbackOnly标志位。 * 正如其名,它是一个局部的标志位,只有建立该status的那一层在业务方法执行完毕后会读到本层status的这个局部标志位。 * * 咱们能够在用户代码(业务方法或者切面)中经过TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); * 来置当前事务层的status对象的rollbackOnly标志位为true以手动控制回滚。 */ if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } /* * shouldCommitOnGlobalRollbackOnly默认实现是false。 * 这里判断的语义就是若是发现事务被标记全局回滚而且在全局回滚标记状况下不该该提交事务的话,则进行回滚。 * * 咱们一般用的DataSourceTransactionManager对于isGlobalRollbackOnly的判断是去读status中transaction对象的ConnectionHolder的rollbackOnly标志位。 */ if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); /* * isNewTransaction用来判断是不是最外层(事务边界)。 * 举个例子:传播行为REQUIRE方法调REQUIRE方法再调REQUIRE方法,第三个方法抛出异常,第二个方法捕获,第一个方法走到这里会发现到了最外层事务边界。 * 而failEarlyOnGlobalRollbackOnly是一个标志位,若是开启了则会尽早抛出异常。 * * 默认状况下failEarlyOnGlobalRollbackOnly开关是关闭的。这样若是内层事务发生了异常,退栈到外层事务后,代码走到这里回滚完后会抛出UnexpectedRollbackException。 */ if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus); } private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { // 钩子函数,TxMgr子类能够覆盖默认的空实现。 prepareForCommit(status); // 回调transaction synchronization的beforeCommit方法。 triggerBeforeCommit(status); // 回调transaction synchronization的beforeCompletion方法。 triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } // 最外层事务边界。 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 由具体TxMgr子类实现。 doCommit(status); } /* * 咱们通常用的DataSourceTransactionManager是不会走到这里的。 * 由于默认shouldCommitOnGlobalRollbackOnly开关是关闭的,检测到golobalRollbackOnly是不会走到processCommit方法的。 * * 但shouldCommitOnGlobalRollbackOnly这个开关对于JtaTransactionManager来讲是默认开启的,这里主要是须要针对检测到globalRollbackOnly可是doCommit没有抛出异常的状况。 */ if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // 回调transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // doCommit发生异常后,根据rollbackOnCommitFailure开关决定是否回滚,此开关默认关闭。 if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { // 回调transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } try { // 回调transaction synchronization的afterCommit方法。 triggerAfterCommit(status); } finally { // 回调transaction synchronization的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 后续工做:status标记completed,在最外层清空transaction synchronization集合,恢复挂起事务资源等等。 cleanupAfterCompletion(status); } }
AbstractPlatformTransactionManager#rollback方法,不然调用commit方法。
public final void rollback(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus); } private void processRollback(DefaultTransactionStatus status) { try { try { // 回调transaction synchronization对象的beforeCompletion方法。 triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } // 在最外层事务边界进行回滚。 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 由具体TxMgr子类实现回滚。 doRollback(status); } else if (status.hasTransaction()) { /* * 内层事务被标记为rollBackOnly或者globalRollbackOnParticipationFailure开关开启时,给当前事务标记须要回滚。 * * 若是内层事务显式打上了rollBackOnly的标记,最终全事务必定是回滚掉的。 * * 但若是没有被打上rollBackOnly标记,则globalRollbackOnParticipationFailure开关就很重要了。 * globalRollbackOnParticipationFailure开关默认是开启的,也就是说内层事务挂了,最终的结果只能是全事务回滚。 * 但若是globalRollbackOnParticipationFailure开关被关闭的话,内层事务挂了,外层事务业务方法中能够根据状况控制是否回滚。 */ if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } // 由具体TxMgr子类实现回滚。 doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } // 回调transaction synchronization对象的afterCompletion方法。 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { // 后续工做:status标记completed,在最外层清空transaction synchronization集合,恢复挂起事务资源等等。 cleanupAfterCompletion(status); } }
这个问题也是使用Spring声明式事务很常见的问题。
首先保证配置都是正确的,而且开启了Spring事务(好比@EnableTransactionManagement
)。
要明白进事务的本质就是进到事务切面的代理方法中,最多见的是同一个类的非事务方法调用一个加了事务注解的方法没进入事务。
咱们以cglib代理为例,因为Spring的对于cglib AOP代理的实现,进入被代理方法的时候实际上已经离开了“代理这一层壳子”,能够认为代码走到的是一个朴素的bean,调用同一个bean中方法天然与代理没有半毛钱关系了。
通常对于声明式事务都是以调用另外一个类的加了@Transactional
注解的public方法做为入口的。
这个案例是工做中同事让我看的一个线上真实案例。
下面是我将原问题用简化代码的形式模拟业务场景的简单demo,只有core Java代码和Spring相关注解的可运行代码。
首先建立一个表。
create table t (value varchar(20));
新建三个类, ServiceA, ServiceB和Response。
/** * 模拟加了事务注解的外层service。 */ @Service public class ServiceA { @Autowired private DataSource dataSource; @Autowired private ServiceB serviceB; @Transactional public Response insert() { executeSql("insert into t select 'serviceA 开始'"); try { serviceB.insert(); } catch (Exception e) { System.out.println("serviceB#insert挂了,缘由: " + e); return Response.FAIL; } return Response.SUCC; } private void executeSql(String sql) { Connection connection = DataSourceUtils.getConnection(dataSource); try { connection.createStatement().execute(sql); } catch (SQLException e) { throw new RuntimeException(e); } } }
/** * 被外层ServiceA调用的ServiceB,抛出异常模拟来模拟挂了的状况。 */ @Service public class ServiceB { @Autowired private DataSource dataSource; // 用于控制是否模拟insert方法挂了的状况。 private boolean flag = true; @Transactional public void insert() { executeSql("insert into t select '这里是ServiceB挂以前'"); if (true) { throw new RuntimeException("模拟内层事务某条语句挂了的状况"); } executeSql("insert into t select '这里是ServiceB挂以后'"); } private void executeSql(String sql) { Connection connection = DataSourceUtils.getConnection(dataSource); try { connection.createStatement().execute(sql); } catch (SQLException e) { throw new RuntimeException(e); } } }
/** * Response类。 */ public class Response { public static final Response SUCC = new Response(); public static final Response FAIL = new Response(); }
若是调用ServiceA#insert能够观察到控制台输出
serviceB#insert挂了,缘由: java.lang.RuntimeException: 模拟内层事务挂了的状况
的报错日志。
但ServiceA#insert没有返回Response.FAIL而且Spring还抛出了异常
org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
观察MySQL通用日志,结果以下:
2017-10-03T12:46:21.791139Z 382 Connect root@localhost on test using SSL/TLS 2017-10-03T12:46:21.796530Z 382 Query /* mysql-connector-java-5.1.42 ( Revision: 1f61b0b0270d9844b006572ba4e77f19c0f230d4 ) */SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout 2017-10-03T12:46:21.821206Z 382 Query SET character_set_results = NULL 2017-10-03T12:46:21.821757Z 382 Query SET autocommit=1 2017-10-03T12:46:21.826013Z 382 Query SET autocommit=0 2017-10-03T12:46:21.837792Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.840326Z 382 Query insert into t select 'serviceA 开始' 2017-10-03T12:46:21.850478Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.853736Z 382 Query insert into t select '这里是ServiceB挂以前' 2017-10-03T12:46:21.854520Z 382 Query rollback 2017-10-03T12:46:21.855058Z 382 Query SET autocommit=1 2017-10-03T12:46:21.855514Z 382 Query select @@session.tx_read_only 2017-10-03T12:46:21.856284Z 382 Quit
能够很清楚的看到整个事务最终被回滚掉了, ServiceB#insert并无执行insert into t select '这里是ServiceB挂以后'
。
其实对于Spring事务来讲,这样的结果是正确的,但对于开发者来讲,这个结果确实看似有些“不能理解”。
咱们不妨来分析一下缘由:
首先ServiceB#insert自己是直接抛出RuntimeException的,那么退栈到事务切面后,事务切面会发现须要回滚但由于ServiceB#insert还不是事务的最外层边界,因此在AbstractPlatformTransactionManager#processRollback方法仅仅会调用doSetRollbackOnly(status);
,子类DataSourceTransactionManager会拿出TxStatus中的transaction对象打上回滚标记,具体来讲就是transaction对象(对于DataSourceTransactionManager来讲类型是DataSourceTransactionObject)会取出ConnectionHolder,调用setRollbackOnly。咱们知道这样就至关于标记是一个全局的标记了,由于只要是隶属于同一个物理事务的Spring事务都可以读到同一个ConnectionHolder。
好了,接下来到了ServiceA在catch块准备返回Response.FAIL的时候,退栈到事务切面,在AbstractPlatformTransactionManager#commit方法读到if(!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly())
条件成立,接下来调用processRollback
,因为在事务最外层边界会物理回滚掉,而且也正是到了事务最外层边界,Spring抛出UnexpectedRollbackException。
至此缘由已经分析完毕。
那么问题怎么解决呢,这个问题有好几种解决办法,可是得根据具体状况决定。
第一种: 根据实际代码与业务状况看看ServiceB#insert是否有必要加事务。若是不加事务的话,其实就事务角度来分析,ServiceB#insert至关于被内联到了ServiceA#insert中。就上面的示例而言,若是咱们把ServiceB#insert的事务注解拿掉,则事务是能够顺利提交的,Spring也不会抛出UnexpectedRollbackException。可是ServiceB#insert实际上并无完整执行,因此这样的解决思路很容易致使出现不完整的脏数据。固然仍是要看具体业务需求,若是能够接受的话也无所谓。
第二种:手动控制是否回滚。若是不能接受ServiceB挂的话,能够在catch块里加上TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
用于显式控制回滚。这样Spring就明白你本身要求回滚事务,而不是unexpected了。Spring也不会抛出UnexpectedRollbackException了。那么若是在ServiceA中捕获到异常,真的就是不想回滚,即使ServiceA发生了异常,也想要最终提交整个事务呢?若是有这样的需求的话,能够给TxMgr配置一个参数setGlobalRollbackOnParticipationFailure(false);
。这样只要没有显式在代码里经过给TxStatus设置回滚标记,Spring在内层事务挂了的状况,不会去给该事务打上须要回滚的标记。换句话说,这时候处于彻底手动挡来控制内层事务挂了的状况到底整个事务的提交/回滚了。
第三种:继续向上抛出异常。能够考虑继续向上抛出异常,通常在web项目都会配置一个大的异常处理切面,统一返回失败Response。Service层不须要考虑Response。所以能够考虑重构掉ServiceA的方法签名,改成void,不关心Response。也不用去捕捉ServiceB的异常了。
本文主要以Spring声明式事务为切入点,介绍了Spring事务的实现原理与源码。因为在前文的套路简介中也以文字描述了Spring声明式事务的大体套路,这里再也不赘述。这里顺便提一句,阅读Spring源码除了看注释,调试代码,其实很个东西很容易忽视——Spring打印log的语句,那些语句的内容不少时候都是颇有启发的,会让你忽然明白整个分支逻辑究竟是想干什么。
这里再整理一下整个事务切面的流程:
下面总结一下Spring事务控制的一些重要参数。掌握这些参数能够更灵活地配置TxMgr。
Spring事务在控制提交和回滚中用了很多判断条件,了解其中一些关键参数的含义对debug问题颇有帮助。下文描述的一些控制参数的默认是指在AbstractPlatformTransactionManager中的默认值。
TransactionSynchronizationManager.getCurrentTransactionName()
也能拿到当前的事务名(为此NOT_SUPPORTED事务方法名)。同理,在没有事务的状况下进入SUPPORTS传播行为的方法也可以读到当前事务名currentTransactionName。在阅读源码的过程当中,避免不了阅读Java Doc注释,发现有一处写了个"PROPAGATION_REQUIRES",按照TransactionDefinition中定义的常量应该是"PROPAGATION_REQUIRED"以及"PROPAGATION_REQUIRES_NEW",前者是"REQUIRED"而不是"REQUIRES"。我用正则在Spring全源码中搜了一下把"PROPAGATION_REQUIRED"写成"PROPAGATION_REQUIRES"的有三处,而后给Spring发了一个PR。已经合入Spring主干。
Spring 4.3.5 源码 StackOverflow