spring+atomikos+mytatis+jpa实现分布式事务管理实战篇

最近在工做中,因为业务的发展需求,须要搭建一个相似中台的项目(拥有独立的数据库),那么跟咱们项目中本来存在库,一共两个库,而数据库服务器为MYSQL,这时候就涉及到分布式事务的管理了。我也在网上找了不少解决方案,但貌似aomikos+mybatis+jta的解决方案比较少,只能参考网上的案例来集成代码到咱们的项目当中,运行起来发现不定时抛出一些莫名其妙的异常,凭着本身的感受一步步把这些坑给修补了,跑了两天也没看到异常抛出,程序也正常执行,注:我是在dubbo服务接口层以及spring quartz应用中都集成了atomikos+jta。下面来看一下集成的步骤java

1)首先引入JAR包,我使用的是MAVEN来管理项目以及JAR包node

<!-- transaction -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
		</dependency>
		<dependency>
			<groupId>javax.transaction</groupId>
			<artifactId>jta</artifactId>
			<version>1.1</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>atomikos-util</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jta</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jdbc</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-api</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>cglib</groupId>
			<artifactId>cglib-nodep</artifactId>
		</dependency>
		<!-- transaction -->

2)在你的项目class包中加入jta.properties配置文件,因为MAVEN JAVA项目,src/main/resources下面存放的在编译时,会放到class里,因此咱们放在mysql

而jta.properties的配置内容以下,按需调整吧spring

com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.console_file_name = ${log.dir}jta/tm.out
com.atomikos.icatch.log_base_name = tmlog
com.atomikos.icatch.tm_unique_name = com.atomikos.spring.jdbc.tm
com.atomikos.icatch.console_log_level=ERROR
com.atomikos.icatch.enable_logging=false

配置完这个以后,下面就是数据源以及事务的配置了,数据源配置以下:sql

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="poolSize" value="10" />
		<!--min-pool-size 最小链接数-->
		<property name="minPoolSize" value="10" />
		<!--max-pool-size 最大链接数 -->
		<property name="maxPoolSize" value="30" />
		<!--获取链接失败从新获等待最大时间,在这个时间内若是有可用链接,将返回-->
		<property name="borrowConnectionTimeout" value="60" />
		 <!-- 若是不设置这个值,Atomikos使用默认的300秒(即5分钟),那么在处理大批量数据读取的时候,
		 一旦超过5分钟,就会抛出相似 Resultset is close 的错误 -->
		<property name="reapTimeout" value="20" />
		<!-- max-idle-time 最大闲置时间,超过最小链接池链接的链接将将关闭 -->
		<property name="maxIdleTime" value="60" />
		<!-- maintenance-interval 链接回收时间  -->
		<property name="maintenanceInterval" value="60" />
		<!-- login-timeout java数据库链接池,最大可等待获取datasouce的时间  -->
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
		<!-- max-lifetime 链接最大存活时间 -->
		<property name="maxLifetime" value="60"></property>
	</bean>

	<!-- WS数据源配置, 使用DBCP数据库链接池 -->
	<bean id="saleDataSource" parent="abstractXADataSource">
	    <property name="uniqueResourceName" value="saleDB" />  
	    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
	    <property name="xaProperties">
            <props>
                <prop key="url">${jdbc.url}</prop>
                <prop key="password">${jdbc.password}</prop>
                <prop key="user">${jdbc.username}</prop>
				<prop key="autoReconnect">true</prop>
				<prop key="pinGlobalTxToPhysicalConnection">true</prop>
            </props>
        </property>
	</bean>
	
	<!-- 主数据源配置, 使用DBCP数据库链接池 -->
	<bean id="masterDataSource" parent="abstractXADataSource">
	    <property name="uniqueResourceName" value="masterDB" /> 
	    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" /> 
	    <property name="xaProperties">
            <props>
                <prop key="url">${csmjdbc.url}</prop>
                <prop key="password">${jdbc.password}</prop>
                <prop key="user">${jdbc.username}</prop>
				<prop key="autoReconnect">true</prop>
				<prop key="pinGlobalTxToPhysicalConnection">true</prop>
            </props>
        </property>
	</bean>
	
	<!-- 主库MyBatis配置 -->
	<bean id="sqlSessionFactoryMain" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="masterDataSource" />
		<!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
		<property name="typeAliasesPackage" value="${type.aliases.package:com.csair}" />
		<!-- 显式指定Mapper文件位置 -->
		<property name="mapperLocations" value="classpath*:/main/dao/mapper/**/*.xml" />
		<property name="typeHandlersPackage"
			value="${type.handlers.package:com.csair.diamond.repository.mybatis.handler}" />
		<property name="configLocation" value="classpath:/META-INF/mybatis-config.xml" />
	</bean>
	
	
	<!-- WS库的MyBatis配置 -->
	<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="saleDataSource" />
		<!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
		<property name="typeAliasesPackage" value="${type.aliases.package:com.csair}" />
		<!-- 显式指定Mapper文件位置 -->
		<property name="mapperLocations" value="classpath*:/dao/mapper/**/*.xml" />
		<property name="typeHandlersPackage"
			value="${type.handlers.package:com.csair.diamond.repository.mybatis.handler}" />
		<property name="plugins">
			<list>
				<ref bean="mybatisSqlInjectionHandlerInterceptor" />
				<ref bean="mybatisStatementHandlerInterceptor" />
				<ref bean="mybatisResultSetHandlerInterceptor" />
			</list>
		</property>
		<property name="configLocation" value="classpath:/META-INF/mybatis-config.xml" />
	</bean>

	<bean id="mybatisStatementHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.StatementHandlerInterceptor">
		<property name="dialectClass"
			value="com.csair.diamond.repository.mybatis.MySqlDialect" />
	</bean>

	<bean id="mybatisResultSetHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.ResultSetHandlerInterceptor">
	</bean>

	<bean id="mybatisSqlInjectionHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.SqlInjectionHandlerInterceptor">
	</bean>

	<!-- 扫描basePackage下全部以@Repository标识的 接口 -->
	<bean  name = "mapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage"
			value="${mapper.scanner.base.package:com.csair.csm.dao}" />
		<property name="annotationClass"
			value="com.csair.diamond.repository.annotation.Repository" />
		<!-- <property name="sqlSessionFactory" ref="sqlSessionFactory" /> -->
		<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
	</bean>
	
	<bean name = "mainMapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage"
			value="com.csair.csm.main.dao" />
		<property name="annotationClass"
			value="com.csair.diamond.repository.annotation.Repository" />
		<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryMain" />
	</bean>

事务的配置以下数据库

<!-- atomikos事务管理器 -->
	<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
         <!-- close()时是否强制终止事务 -->
        <property name="forceShutdown">
            <value>true</value>
        </property>
    </bean>
 
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
      <property name="transactionTimeout" value="300" />
    </bean>
 	<!-- spring 事务管理器 -->  
    <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager" />
        <property name="userTransaction" ref="atomikosUserTransaction" />
        <!-- 必须设置,不然程序出现异常 JtaTransactionManager does not support custom isolation levels by default -->
        <property name="allowCustomIsolationLevels" value="true"/> 
    </bean>
  	<tx:annotation-driven transaction-manager="springTransactionManager" proxy-target-class="true"  />

 

spring-quartz配置文件以下:api

<bean id="quartzSchedulerSupplier"
		class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="dataSource">
			<ref bean="saleDataSource" />
		</property>
		<property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
		<property name="configLocation" value="classpath:quartz.properties" />
		<property name="triggers">
			<list>
				<ref bean="triggerSupplier" />
				<ref bean="triggerProduct" />
				<ref bean="triggerPicture" />
			</list>
		</property>
	</bean>

 

我是使用事务的注解形式回滚事务,我本来是采用切入点来定义某个包下面的全部类以哪些方法开头是有事务而且回滚,哪些是没有事务的,可是集成spring quartz会发现事务开启不一致的问题,spring quartz也有本身的数据源,一套本身的事务,若是不用事务注解,定时任务会开启本身的本地事务,而定时任务调用服务又是一个XA的事务,这时候就会抛事务不统一。服务器

Caused by: com.atomikos.datasource.ResourceException: XA resource 'mysql/first': resume for XID '3137322E31362E33312E38332E746D30303030313030303131:3137322E31362E33312E38332E746D31' raised -9: the XA resource is currently involved in a local (non-XA) transactionmybatis

解决方案:采用事务注解,而且在定时任务入口类标注上注解。app

若是发现如下这种错误异常问题,主要是系统有两个事务,一个事务提交,一个事务未提交,致使事务冲突,建议采用事务注解在最外层方法标注上。

Cannot call method 'commit' while a global transaction is runing(由于我是继承了定时任务,在方法内部手动开启事务,当我方法内部事务还未提交时,定时任务本身内部的事务就提交了,这时候就会抛这个异常,若是集成定时任务,建议不要手动开启事务)

 

 

若是在运行期抛[ERROR][2016-11-03 10:17:30,771][com.atomikos.recovery.imp.CachedRepository]Corrupted log file - restart JVM
com.atomikos.recovery.LogReadException: java.lang.ArrayIndexOutOfBoundsException: 1 at ,解决方案就是把atomikos的jar包版本升级到4.0.4便可。

若是在运行期发现一开始是正常,后期频繁抛以下的错误,根据个人填坑经验,这个是数据库链接池选型不对的问题,以及配置链接池属性跟数据库的配置不一致。

2019-05-06 17:57:37.865 [ Atomikos:2 ] - [ WARN  ] [com.atomikos.recovery.xa.XaResourceRecoveryManager : 40] - Error while retrieving xids from resource - will retry later...
com.mysql.jdbc.jdbc2.optional.MysqlXAException: No operations allowed after connection closed.
        at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.mapXAExceptionFromSQLException(MysqlXAConnection.java:608)
        at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.recover(MysqlXAConnection.java:335)
        at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.recover(MysqlXAConnection.java:255)
        at com.atomikos.datasource.xa.RecoveryScan.recoverXids(RecoveryScan.java:32)
        at com.atomikos.recovery.xa.XaResourceRecoveryManager.retrievePreparedXidsFromXaResource(XaResourceRecoveryManager.java:158)
        at com.atomikos.recovery.xa.XaResourceRecoveryManager.recover(XaResourceRecoveryManager.java:67)
        at com.atomikos.datasource.xa.XATransactionalResource.recover(XATransactionalResource.java:451)
        at com.atomikos.icatch.imp.TransactionServiceImp.performRecovery(TransactionServiceImp.java:490)
        at com.atomikos.icatch.imp.TransactionServiceImp.access$000(TransactionServiceImp.java:56)
        at com.atomikos.icatch.imp.TransactionServiceImp$1.alarm(TransactionServiceImp.java:471)
        at com.atomikos.timing.PooledAlarmTimer.notifyListeners(PooledAlarmTimer.java:95)
        at com.atomikos.timing.PooledAlarmTimer.run(PooledAlarmTimer.java:82)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

一开始xaDataSourceClassName属性值我是采用com.alibaba.druid.pool.xa.DruidXADataSource

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.alibaba.druid.pool.xa.DruidXADataSource" />
		<property name="poolSize" value="10" />
		<property name="minPoolSize" value="10" />
		<property name="maxPoolSize" value="30" />
		<property name="borrowConnectionTimeout" value="60" />
		<property name="reapTimeout" value="20" />
		<property name="maxIdleTime" value="60" />
		<property name="maintenanceInterval" value="60" />
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
	</bean>

后来采用xaDataSourceClassName为com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="poolSize" value="10" />
		<!--min-pool-size 最小链接数-->
		<property name="minPoolSize" value="10" />
		<!--max-pool-size 最大链接数 -->
		<property name="maxPoolSize" value="30" />
		<!--获取链接失败从新获等待最大时间,在这个时间内若是有可用链接,将返回-->
		<property name="borrowConnectionTimeout" value="60" />
		 <!-- 若是不设置这个值,Atomikos使用默认的300秒(即5分钟),那么在处理大批量数据读取的时候,
		 一旦超过5分钟,就会抛出相似 Resultset is close 的错误 -->
		<property name="reapTimeout" value="20" />
		<!-- max-idle-time 最大闲置时间,超过最小链接池链接的链接将将关闭 -->
		<property name="maxIdleTime" value="60" />
		<!-- maintenance-interval 链接回收时间  -->
		<property name="maintenanceInterval" value="60" />
		<!-- login-timeout java数据库链接池,最大可等待获取datasouce的时间  -->
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
		<!-- max-lifetime 链接最大存活时间 -->
		<property name="maxLifetime" value="60"></property>
	</bean>
相关文章
相关标签/搜索