Spring Transaction 事务的使用和实现原理php
业务系统的数据,通常最后都会落入到数据库中,例如 MySQL
、Oracle
等主流数据库,不可避免的,在数据更新时,有可能会遇到错误,这时须要将以前的数据更新操做撤回,避免错误数据。html
Spring
的声明式事务能帮咱们处理回滚操做,让咱们不须要去关注数据库底层的事务操做,能够不用在出现异常状况下,在 try / catch / finaly 中手写回滚操做。java
Spring
的事务保证程度比行业中其它技术(例如 TCC
/ 2PC
/ 3PC
等)稍弱一些,但使用 Spring
事务已经知足大部分场景,因此它的使用和配置规则也是值得学习的。mysql
接下来一块儿学习 Spring
事务是如何使用以及实现原理吧。git
1.建立数据库表github
create table test.user(
id int auto_increment
primary key,
name varchar(20) null, age int(3) null)
engine=InnoDB charset=utf8;
复制代码
2.建立对应数据库表的 POspring
public class JdbcUser {
private Integer id;
private String name;
private Integer age;
...(使用 ctrl + N 进行代码补全 setter 和 getter)
}
复制代码
3.建立表与实体间的映射sql
在使用 JdbcTemplate
时很纠结,在 Java
类中写了不少硬编码 SQL
,与 MyBatis
使用方法不同,为了示例简单,使用了 JdbcTemplate
,不过仍是建议朋友们用 MyBatis
,让代码风格整洁。数据库
public class UserRowMapper implements RowMapper {
@Override
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
JdbcUser user = new JdbcUser();
user.setId(rs.getInt("id"));
user.setName(rs.getString("name"));
user.setAge(rs.getInt("age"));
return user;
}
}
复制代码
4.建立数据操做接口编程
public interface UserDao {
/** * 插入 * @param user 用户信息 */
void insertUser(JdbcUser user);
/** * 根据 id 进行删除 * @param id 主键 */
void deleteById(Integer id);
/** * 查询 * @return 所有 */
List<JdbcUser> selectAll();
}
复制代码
5.建立数据操做接口实现类
跟书中例子不同,没有在接口上加入事务注解,而是在公共方法上进行添加,能够在每一个方法上自定义传播事件、隔离级别。
public class UserJdbcTemplate implements UserDao {
private DataSource dataSource;
private JdbcTemplate jdbcTemplate;
@Override
@Transactional(propagation = Propagation.REQUIRED)
public void insertUser(JdbcUser user) {
String sql = "insert into user (id, name, age) values (?, ?, ?)";
jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());
System.out.println("Create record : " + user.toString());
}
@Override
@Transactional
public void deleteById(Integer id) {
String sql = "delete from user where id = ?";
jdbcTemplate.update(sql, id);
System.out.println("Delete record, id = " + id);
// 事务测试,抛出异常,让上面的插入操做回滚
throw new RuntimeException("aa");
}
@Override
public List<JdbcUser> selectAll() {
String sql = "select * from user";
List<JdbcUser> users = jdbcTemplate.query(sql, new UserRowMapper());
return users;
}
public void setDataSource(DataSource dataSource) {
// 使用 setter 注入参数时,同时初始化 jdbcTemplate
this.dataSource = dataSource;
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
}
复制代码
6.建立配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- 数据源 MySQL -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/test?characterEncoding=utf8"/>
<property name="username" value="root"/>
<property name="password" value="12345678"/>
</bean>
<bean id="userJdbcTemplate" class="transaction.UserJdbcTemplate">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 开启事务,若是将这行去掉,将不会建立事务 -->
<tx:annotation-driven/>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
</beans>
复制代码
7.添加依赖
记得添加数据库链接和 jdbc
、tx
这两个 spring
模块的依赖
optional(project(":spring-jdbc")) // for Quartz support
optional(project(":spring-tx")) // for Quartz support
compile group: 'mysql', name: 'mysql-connector-java', version: '5.1.6'
复制代码
8.启动代码
public class TransactionBootstrap {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("transaction/transaction.xml");
UserJdbcTemplate jdbcTemplate = (UserJdbcTemplate) context.getBean("userJdbcTemplate");
System.out.println("--- Records Creation start ----");
JdbcUser user = new JdbcUser(4, "test", 21);
jdbcTemplate.insertUser(user);
}
}
复制代码
经过上面的代码,我作了两个测试:
<tx:annotation-driven/>
这一行被注释了,虽然咱们执行的方法中抛出了 RuntimeExcepton
,可是数据库中依旧被插入了数据。Spring
成功执行了事务,回滚了插入操做。具体位置在:org.springframework.transaction.annotation.Transactional
属性 | 类型 | 做用 |
---|---|---|
value | String | 可选的限定描述符,指定使用的事务管理器 |
propagation | 枚举:Propagation | 可选的事务传播行为 |
isolation | 枚举:Isolation | 可选的事务隔离级别设置 |
readOnly | boolean | 设置读写或只读事务,默认是只读 |
rollbackFor | Class 数组,必须继承自 Throwable | 致使事务回滚的异常类数组 |
rollbackForClassName | 类名称数组,必须继承自 Throwable | |
noRollbackFor | Class 数组,必须继承自 Throwable | 不会致使事务回滚的异常类数组 |
noRollbackForClassName | 类名称数组,必须继承自 Throwable |
这是默认的传播属性,若是外部调用方有事务,将会加入到事务,没有的话新建一个。
若是当前存在事务,则加入到该事务;若是当前没有事务,则以非事务的方式继续运行。
以非事务方式运行,若是当前存在事务,则把当前事务挂起。
以非事务方式运行,若是当前存在事务,则抛出异常。
最低级别,只能保证不读取 物理上损害的数据,容许脏读
只能读到已经提交的数据
可重复读
串行化读,读写相互阻塞
这里只是简单描述了一下这两个主要属性,由于底层与数据库相关,能够看下我以前整理过的 MySQL锁机制
介绍完如何使用还有关键属性设定,本着知其然,知其因此然的学习精神,来了解代码是如何实现的吧。
以前在解析自定义标签时提到,AOP
和 TX
都使用了自定义标签,按照咱们上一篇 AOP
的学习,再来一遍解析自定义标签的套路:事务自定义标签。
定位到 TxNamespaceHandler
类的初始化方法:
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
// 使用 AnnotationDrivenBeanDefinitionParser 解析器,解析 annotation-driven 标签
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
复制代码
根据上面的方法,Spring
在初始化时候,若是遇到诸如 <tx:annotation-driven>
开头的配置后,将会使用 AnnotationDrivenBeanDefinitionParser
解析器的 parse
方法进行解析。
public BeanDefinition parse(Element element, ParserContext parserContext) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
// AspectJ 另外处理
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {
registerJtaTransactionAspect(element, parserContext);
}
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
复制代码
Spring
中的事务默认是以 AOP
为基础,若是须要使用 AspectJ
的方式进行事务切入,须要在 mode
属性中配置:
<tx:annotation-driven mode="aspectj"/>
复制代码
本篇笔记主要围绕着默认实现方式,动态 AOP
来学习,若是对于 AspectJ
实现感兴趣请查阅更多资料~
与 AOP
同样,在解析时,会建立一个自动建立代理器,在事务 TX
模块中,使用的是 InfrastructureAdvisorAutoProxyCreator
。
首先来看,在默认配置状况下,AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext)
作了什么操做:
private static class AopAutoProxyConfigurer {
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
// 注册 InfrastructureAdvisorAutoProxyCreator 自动建立代理器
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
// txAdvisorBeanName = org.springframework.transaction.config.internalTransactionAdvisor
String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
Object eleSource = parserContext.extractSource(element);
// Create the TransactionAttributeSource definition.
// 建立 TransactionAttributeSource 的 bean
RootBeanDefinition sourceDef = new RootBeanDefinition(
"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
// 注册 bean,并使用 Spring 中的定义规则生成 beanName
String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
// 建立 TransactionInterceptor 的 bean
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
// 建立 TransactionAttributeSourceAdvisor 的 bean
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
// 将 sourceName 的 bean 注入 advisor 的 transactionAttributeSource 属性中
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
// 将 interceptorName 的 bean 注入到 advisor 的 adviceBeanName 属性中
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {
// 若是配置了 order 属性,则加入到 bean 中
advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
// 以 txAdvisorBeanName 名字注册 advisorDef
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
// 建立 CompositeComponentDefinition
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
}
}
}
复制代码
在这里注册了代理类和三个 bean
,这三个关键 bean
支撑了整个事务功能,为了待会更好的理解这三者的关联关系,咱们先来回顾下 AOP
的核心概念:
org.aopalliance.aop.Advice
。还有其它继承接口,例如 MethodBeforeAdvice
,特定指方法执行前的加强。Advice
和 Pointcut
的适配器。回顾完 AOP
的概念后,继续来看下这三个关键 bean
:
Advice
接口,在这里定义了拦截行为。Pointcut
接口,可是在后面目标方法判断的时候,实际上仍是委托给了 AnnotationTransactionAttributeSource.getTransactionAttributeSource
,经过适配器模式,返回了 Pointcut
切点信息。Advisor
接口,包装了上面两个信息。这三个 bean
组成的结构与 AOP
切面环绕实现的结构一致,因此先学习 AOP
的实现,对事务的了解会有所帮助
接着看咱们的自动建立代理器是如何建立的:
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element)
public static void registerAutoProxyCreatorIfNecessary( ParserContext parserContext, Element sourceElement) {
BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
parserContext.getRegistry(), parserContext.extractSource(sourceElement));
useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
registerComponentIfNecessary(beanDefinition, parserContext);
}
private static void registerComponentIfNecessary(@Nullable BeanDefinition beanDefinition, ParserContext parserContext) {
if (beanDefinition != null) {
// 注册的 beanName 是 org.springframework.aop.config.internalAutoProxyCreator
parserContext.registerComponent(
new BeanComponentDefinition(beanDefinition, AopConfigUtils.AUTO_PROXY_CREATOR_BEAN_NAME));
}
}
复制代码
在这一步中,注册了一个 beanName
是 org.springframework.aop.config.internalAutoProxyCreator
的 bean
:InfrastructureAdsivorAutoProxyCreator
,下图是它的继承体系图:
能够看到,它实现了 InstantiationAwareBeanPostProcessor
这个接口,也就是说在 Spring
容器中,全部 bean
实例化时,Spring
都会保证调用其 postProcessAfterInitialization
方法。
与上一篇介绍的 AOP
代理器同样,在实例化 bean
的时候,调用了代理器父类 AbstractAutoProxyCreator
的 postProcessAfterInitialization
方法:
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
// 组装 key
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
// 若是适合被代理,则须要封装指定的 bean
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
复制代码
其中关于 wrapIfNecessory
方法,在上一篇 AOP
中已经详细讲过,这里讲下这个方法作了什么工做:
bean
对应的加强器与建立 AOP
代理类似的过程就再也不重复说,讲下它们的不一样点:
AopUtils#canApply(Advisor, Class<?>, boolean)
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
}
else if (advisor instanceof PointcutAdvisor) {
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
}
else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}
复制代码
咱们在前面看到,TransactionAttributeSourceAdvisor
的父类是 PointcutAdvisor
,因此在目标方法判断的时候,会取出切点信息 pca.getPointcut()
。
咱们以前注入的切面类型 bean
是 AnnotationTransactionAttributeSource
,经过下面的方法包装,最后返回对象类型是 TransactionAttributeSourcePointcut
的切点信息
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
// 实现父类的方法,在子类中进行了扩展,返回以前在标签注册时的AnnotationTransactionAttributeSource
return transactionAttributeSource;
}
};
复制代码
在匹配 match
操做中,区别的是 AOP
识别的是 @Before
、@After
,而咱们的事务 TX
识别的是 @Transactional
标签。
判断是不是事务方法的入口方法在这:
org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut#matches
@Override
public boolean matches(Method method, Class<?> targetClass) {
// 事务切点匹配的方法
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
复制代码
那它到底到哪一步解析事务注解的呢,继续跟踪代码,答案是:
AnnotationTransactionAttributeSource#determineTransactionAttribute
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
TransactionAttribute attr = parser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}
复制代码
在这一步中,遍历注册的注解解析器进行解析,因为咱们关注的是事务解析,因此直接定位到事务注解的解析器:
SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
// 解析事务注解的属性
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
复制代码
首先判断是否含有 @Transactional
注解,若是有的话,才继续调用 parse
解析方法:
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
// 注释 9.4 解析事务注解的每个属性
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
复制代码
经过上面的步骤,完成了对应类或者方法的事务属性解析。
主要步骤在于寻找加强器,以及判断这些加强器是否与方法或者类匹配。
若是某个 bean
属于可被事务加强时,也就是适用于加强器 BeanFactoryTransactionAttributeSourceAdvisor
进行加强。
以前咱们注入了 TransactionInterceptor
到 BeanFactoryTransactionAttributeSourceAdvisor
中,因此在调用事务加强器加强的代理类时,会执行 TransactionInterceptor
进行加强。同时,也就是在 TransactionInterceptor
类中的 invoke
方法中完成整个事务的逻辑。
TransactionInterceptor
支撑着整个事务功能的架构。跟以前 AOP
的 JDK
动态代理 分析的同样,TransactionInterceptor
拦截器继承于 MethodInterceptor
,因此咱们要从它的关键方法 invoke()
看起:
public Object invoke(MethodInvocation invocation) throws Throwable {
// 注释 9.5 执行事务拦截器,完成整个事务的逻辑
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
复制代码
实际调用了父类的方法:TransactionAspectSupport#invokeWithinTransaction
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 若是transaction属性为null,则该方法是非事务性的
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取对应事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 构造方法惟一标识(类.方法)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 声明式事务处理
// 标准事务划分 : 使用 getTransaction 和 commit / rollback 调用
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
//传入的是回调函数对象: invocation.proceed。 执行被加强的方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// 编程式事务处理
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
...
return result;
}
}
}
复制代码
贴出的代码有删减,简略了错误异常的 try / catch
和编程式事务处理的逻辑。由于咱们更多使用到的是声明式事务处理,就是在 XML
文件配置或者 @Transactional
注解编码,实际经过 AOP
实现,而编程式事务处理是经过 Transaction Template
实现,比较少使用到,因此省略了这部分处理代码。
经过该方法,肯定要用于给定事务的特定事务管理器
TransactionAspectSupport#determineTransactionManager
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
// 注释 9.6 寻找事务管理器
if (txAttr == null || this.beanFactory == null) {
// 若是没有事务属性或者 BeanFactory 为空时,从缓存里面寻找
return asPlatformTransactionManager(getTransactionManager());
}
String qualifier = txAttr.getQualifier();
// 若是注解配置中指定了事务管理器,直接取出使用
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
// 上面步骤都没找到,最后才去容器中,根据 className 来寻找
PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
...
return defaultTransactionManager;
}
}
复制代码
因为最开始咱们在 XML
文件中配置过 transactionManager
属性,因此该方法在咱们例子中将会返回类型是 DataSourceTransactionManager
的事务管理器,下面是 DataSourceTransactionManager
的继承体系:
它实现了 InitializingBean
接口,不过只是在 afterPropertiesSet()
方法中,简单校验 dataSource
是否为空,不细说这个类。
TransactionAspectSupport#createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// 若是没有名称指定则使用方法惟一标识,并使用 DelegatingTransactionAttribute 包装 txAttr
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 获取 TransactionStatus
status = tm.getTransaction(txAttr);
}
}
// 根据指定的属性与 status 准备一个 TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
复制代码
在建立事务方法中,主要完成如下三件事:
DelegatingTransactionAttribute
包装 txAttr
实例tm.getTransaction(txAttr)
prepareTransactionInfo(tm, txAttr, joinpointIdentification, status)
核心方法在第二点和第三点,分别摘取核心进行熟悉。
status = tm.getTransaction(txAttr);
因为代码较长,直接来总结其中几个关键点
建立对应的事务实例,咱们使用的是 DataSourceTransactionManager
中的 doGetTransaction
方法,建立基于 JDBC
的事务实例。
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 若是当前线程已经记录数据库连接则使用原有连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// false 表示非新建立链接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
复制代码
其中在同一个线程中,判断是否有重复的事务,是在 TransactionSynchronizationManager.getResource(obtainDataSource())
中完成的,关键判断逻辑是下面这个:
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
复制代码
结论:resources
是一个 ThreadLocal
线程私有对象,每一个线程独立存储,因此判断是否存在事务,判断的依据是当前线程、当前数据源(DataSource)中是否存在活跃的事务 - map.get(actualKey)
。
根据前面说的,判断当前线程是否存在事务,判断依据为当前线程记录的链接不为空且链接中(connectionHolder)中的 transactionActive
属性不为空,若是当前线程存在事务,将根据不一样的事务传播特性进行处理。具体代码逻辑以下:
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 当前线程存在事务,分状况进行处理
return handleExistingTransaction(def, transaction, debugEnabled);
}
复制代码
在配置中配置设定为 PROPAGATION_NEVER
,表示该方法须要在非事务的环境下运行,但处于事务处理的状态(多是外部带事务的方法调用了非事务的方法),将会抛出异常:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
复制代码
若是有事务存在,将事务挂起,而不是抛出异常:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
复制代码
对于挂起操做,主要目的是记录原有事务的状态,以便于后续操做对事务的恢复:
实际上,suspend()
方法调用的是事务管理器 DataSourceTransactionManager
中的 doSuspend()
方法
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 将数据库链接设置为 null
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
复制代码
最后调用的关键方法是 TransactionSynchronizationManager#doUnbindResource
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
Thread.currentThread().getName() + "]");
}
return value;
}
复制代码
看了第七条参考资料中的文章,结合代码理解了事务挂起的操做:移除当前线程、数据源活动事务对象的一个过程
那它是如何实现事务挂起的呢,答案是在 doSuspend()
方法中的 txObject.setConnectionHolder(null)
,将 connectionHolder
设置为 null
。
一个 connectionHolder
表示一个数据库链接对象,若是它为 null
,表示在下次须要使用时,得从缓存池中获取一个链接,新链接的自动提交是 true
。
表示当前方法必须在它本身的事务里运行,一个新的事务将被启动,而若是有一个事务正在运行的话,则这个方法运行期间被挂起。
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 新事务的创建
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
复制代码
与前一个方法相同的是,在 PROPAGATION_REQUIRES_NEW
广播特性下,也会使用 suspend
方法将原事务挂起。方法 doBegin()
,是事务开启的核心。
表示若是当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,被嵌套的事务能够独立于封装事务进行提交或者回滚,若是封装事务不存在,行为就像 PROPAGATION_REQUIRES_NEW
。
在代理处理上,有两个分支,与 PROPAGATION_REQUIRES_NEW
类似的不贴出来,讲下使用 savepoint
保存点的方式事务处理:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 嵌入式事务的处理
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 建立 savepoint
status.createAndHoldSavepoint();
return status;
}
}
复制代码
学习过数据库的朋友应该清楚 savepoint
,能够利用保存点回滚部分事务,从而使事务处理更加灵活和精细。跟踪代码,发现建立保存点调用的方法是 org.hsqldb.jdbc.JDBCConnection#setSavepoint(java.lang.String)
,感兴趣的能够往下继续深刻学习~
其实在前面方法中,都出现过这个方法 doBegin()
,在这个方法中建立事务,顺便设置数据库的隔离级别、timeout
属性和设置 connectionHolder
:
DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// configured the connection pool to set it already).
// 更改自动提交设置,由 spring 进行控制
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
// 准备事务链接
prepareTransactionalConnection(con, definition);
// 设置判断当前线程是否存在事务的依据
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 将当前获取到的链接绑定到当前线程
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
复制代码
结论:Spring 事务的开启,就是将数据库自动提交属性设置为 false
在声明式的事务处理中,主要有如下几个处理步骤:
tas.getTransactionAttribute(method, targetClass)
TransactionManager
:determineTransactionManager(txAttr);
createTransactionIfNecessary(tm, txAttr, joinpointIdentification)
invocation.proceed()
completeTransactionAfterThrowing(txInfo, ex);
cleanupTransactionInfo(txInfo)
commitTransactionAfterReturning(txInfo)
这两步操做,主要调用了底层数据库链接的 API
,因此没有细说。
本篇文章简单记录了如何使用 Spring
的事务,以及在代码中如何实现。
在以前的使用场景中,只用到了默认配置的声明式事务 @Transactional
,不了解其它属性设置的含义,也不知道在默认配置下,若是是同一个类中的方法自调用是不支持事务。
因此,通过这一次学习和总结,在下一次使用时,就可以知道不一样属性设置能解决什么问题,例如修改广播特性 PROPAGATION
,让事务支持方法自调用,还有设置事务超时时间 timeout
、隔离级别等属性。
因为我的技术有限,若是有理解不到位或者错误的地方,请留下评论,我会根据朋友们的建议进行修正
Gitee 地址 https://gitee.com/vip-augus/spring-analysis-note.git
Github 地址 https://github.com/Vip-Augus/spring-analysis-note