JPA多数据源分布式事务处理-两种事务方案

前言

多数据源的事务处理是个老生常谈的话题,跨两个数据源的事务管理也算是分布式事务的范畴,在同一个JVM里处理多数据源的事务,比较经典的处理方案是JTA(基于XA协议建模的java标准事务抽象)+XA(XA事务协议),常见的JTA实现框架有Atomikos、Bitronix、Narayana,Spring对这些框架都有组件封装,基本能够作到开箱即用程度。本文除了分享XA事务方案外,提供了一种新的多数据源事务解决思路和视角。html

问题背景

在解决mysql字段脱敏处理时,结合sharding-jdbc的脱敏组件功能,为了sql兼容和最小化应用改造,博主给出了一个多数据源融合的字段脱敏解决方案(只把包含脱敏字段表的操做走sharding-jdbc脱敏代理数据源)。这个方案解决了问题的同时,带来了一个新的问题,数据源的事务是独立的,正如我文中所述《JPA项目多数据源模式整合sharding-jdbc实现数据脱敏》,在spring上下文中,每一个数据源对应一个独立的事务管理器,默认的事务管理器的数据源就用业务自己的数据源,因此须要加密的业务使用时,须要指定@Transactional注解里的事务管理器名称为脱敏对应的事务管理器名称。简单的业务场景这样用也就没有问题了,可是通常的业务场景总有一个事务覆盖两个数据源的操做,这个时候单指定哪一个事务管理器都不行,so,这里须要一种多数据源的事务管理器。java

XA事务方案

XA协议采用2PC(两阶段提交)的方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通讯的标准接口。在JDBC的XA事务相关api抽象里,相关接口定义以下mysql

XADataSource,XA协议数据源spring

public interface XADataSource extends CommonDataSource {
  /**
   * 尝试创建物理数据库链接,使用给定的用户名和密码。返回的链接能够在分布式事务中使用
   */
  XAConnection getXAConnection() throws SQLException;
   //省略getLogWriter等非关键方法
 }

XAConnectionsql

public interface XAConnection extends PooledConnection {

    /**
     * 检索一个{@code XAResource}对象,事务管理器将使用该对象管理该{@code XAConnection}对象在分布式事务中的事务行为
     */
    javax.transaction.xa.XAResource getXAResource() throws SQLException;
}

XAResource数据库

public interface XAResource {
    /**
     * 提交xid指定的全局事务
     */
    void commit(Xid xid, boolean onePhase) throws XAException;

    /**
     * 结束表明事务分支执行的工做。资源管理器从指定的事务分支中分离XA资源,并让事务完成。
     */
    void end(Xid xid, int flags) throws XAException;

    /**
     * 通知事务管理器忽略此xid事务分支
     */
    void forget(Xid xid) throws XAException;

    /**
     * 判断是否同一个资源管理器
     */
    boolean isSameRM(XAResource xares) throws XAException;

    /**
     * 指定xid事务准备阶段
     */
    int prepare(Xid xid) throws XAException;

    /**
     * 从资源管理器获取准备好的事务分支的列表。事务管理器在恢复期间调用此方法,
     * 以获取当前处于准备状态或初步完成状态的事务分支的列表。
     */
    Xid[] recover(int flag) throws XAException;

    /**
     * 通知资源管理器回滚表明事务分支完成的工做。
     */
    void rollback(Xid xid) throws XAException;

    /**
     * 表明xid中指定的事务分支开始工做。
     */
    void start(Xid xid, int flags) throws XAException;

    //省略非关键方法
}

相比较普通的事务管理,JDBC的XA协议管理多了一个XAResource资源管理器,XA事务相关的行为(开启、准备、提交、回滚、结束)都由这个资源管理器来控制,这些都是框架内部的行为,体如今开发层面提供的数据源也变成了XADataSource。而JTA的抽象里,定义了UserTransaction、TransactionManager。想要使用JTA事务,必须先实现这两个接口。因此,若是咱们要使用JTA+XA控制多数据源的事务,在sprign boot里以Atomikos为例,api

引入Atomikos依赖app

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

spring boot已经帮咱们把XA事务管理器自动装载类定义好了,如:框架

建立JTA事务管理器分布式

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
@ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
@ConditionalOnMissingBean(PlatformTransactionManager.class)
class AtomikosJtaConfiguration {

    @Bean(initMethod = "init", destroyMethod = "shutdownWait")
    @ConditionalOnMissingBean(UserTransactionService.class)
    UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,
            JtaProperties jtaProperties) {
        Properties properties = new Properties();
        if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {
            properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());
        }
        properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));
        properties.putAll(atomikosProperties.asProperties());
        return new UserTransactionServiceImp(properties);
    }
    @Bean(initMethod = "init", destroyMethod = "close")
    @ConditionalOnMissingBean(TransactionManager.class)
    UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(false);
        manager.setForceShutdown(true);
        return manager;
    }
    @Bean
    @ConditionalOnMissingBean(XADataSourceWrapper.class)
    AtomikosXADataSourceWrapper xaDataSourceWrapper() {
        return new AtomikosXADataSourceWrapper();
    }
    @Bean
    JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,
            ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
        transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));
        return jtaTransactionManager;
    }
    、、、、、、、、、、
}

显然,想要使用XA事务,除了须要提供UserTransaction、TransactionManager的实现。还必需要有一个XADataSource,而sharding-jdbc代理的数据源是DataSource的,咱们须要将XADataSource包装成普通的DataSource,spring已经提供了一个AtomikosXADataSourceWrapper的XA数据源包装器,并且在AtomikosJtaConfiguration里已经注册到Spring上下文中,因此咱们在自定义数据源时能够直接注入包装器实例,而后,由于是JPA环境,因此在建立EntityManagerFactory实例时,须要指定JPA的事务管理类型为JTA,综上,普通的业务默认数据源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{

    @Primary
    @Bean
    public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {
        MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build();
        return wrapper.wrapDataSource(dataSource);
    }

    @Primary
    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("default")
                .jta(true)
                .build();
    }

    @Bean
    @Primary
    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
        //必须使用SharedEntityManagerCreator建立SharedEntityManager实例,不然SimpleJpaRepository中的事务不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }
}

sharding-jdbc加密数据源和普通业务数据源实际上是同一个数据源,只是走加解密逻辑的数据源须要被sharding-jdbc的加密组件代理一层,加上了加解密的处理逻辑。因此配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {

    @Bean
    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
    }

    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("encryptPersistenceUnit")
                .jta(true)
                .build();
    }

    @Bean
    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
        //必须使用SharedEntityManagerCreator建立SharedEntityManager实例,不然SimpleJpaRepository中的事务不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }
}
  • 遇到问题一、:Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.
  • 解决问题:默认AtomikosXADataSourceWrapper包装器初始化的数据源链接池最大为1,因此须要添加配置参数如:
spring.jta.atomikos.datasource.max-pool-size=20
  • 遇到问题二、: XAER_INVAL: Invalid arguments (or unsupported command)
  • 解决问题:这个是mysql实现XA的bug,仅当您在同一事务中屡次访问同一MySQL数据库时,才会发生此问题,在mysql链接url加上以下参数便可,如:
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true

Mysql XA事务行为

在这个场景中,虽然是多数据源,可是底层连接的是同一个mysql数据库,因此XA事务行为为,从第一个执行的sql开始(并非JTA事务begin阶段),生成xid并XA START事务,而后XA END。第二个数据源的sql执行时会判断是否同一个mysql资源,若是是同一个则用刚生成的xid从新XA START RESUME,而后XA END,最终虽然在应用层是两个DataSource,其实最后只会调用XA COMMIT一次。mysql驱动实现的XAResource的start以下:

public void start(Xid xid, int flags) throws XAException {
        StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
        commandBuf.append("XA START ");
        appendXid(commandBuf, xid);

        switch (flags) {
            case TMJOIN:
                commandBuf.append(" JOIN");
                break;
            case TMRESUME:
                commandBuf.append(" RESUME");
                break;
            case TMNOFLAGS:
                // no-op
                break;
            default:
                throw new XAException(XAException.XAER_INVAL);
        }
        dispatchCommand(commandBuf.toString());
        this.underlyingConnection.setInGlobalTx(true);
    }

第一次sql执行时,flags=0,走的TMNOFLAGS逻辑,第二次sql执行时,flags=134217728,走的TMRESUME,从新开启事务的逻辑。以上是Mysql XA的真实事务逻辑,可是博主研究下来发现,msyql xa并不支持XA START RESUME这种语句,并且有不少限制《Mysql XA交易限制》,因此在mysql数据库使用XA事务时,最好了解下mysql xa的缺陷

链式事务方案

链式事务不是我独创的叫法,在spring-data-common项目的Transaction包下,已经有一个默认实现ChainedTransactionManager,前文中《深刻理解spring的@Transactional工做原理》已经分析了Spring的事务抽象,由PlatformTransactionManager(事务管理器)、TransactionStatus(事务状态)、TransactionDefinition(事务定义)等形态组成,ChainedTransactionManager也是实现了PlatformTransactionManager和TransactionStatus。实现原理也很简单,在ChainedTransactionManager内部维护了事务管理器的集合,经过代理编排真实的事务管理器,在事务开启、提交、回滚时,都分别操做集合里的事务。以达到对多个事务的统一管理。这个方案比较简陋,并且有缺陷,在提交阶段,若是异常不是发生在第一个数据源,那么会存在以前的提交不会回滚,因此在使用ChainedTransactionManager时,尽可能把出问题可能性比较大的事务管理器放链的后面(开启事务、提交事务顺序相反)。这里只是抛出了一种新的多数据源事务管理的思路,能用XA尽可能用XA管理。

普通的业务默认数据源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
public class DataSourceConfiguration{

    @Primary
    @Bean
    public DataSource dataSource(DataSourceProperties dataSourceProperties){
       return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
    }

    @Primary
    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("default")
                .build();
    }

    @Bean
    @Primary
    public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
        //必须使用SharedEntityManagerCreator建立SharedEntityManager实例,不然SimpleJpaRepository中的事务不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
        JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(entityManagerFactory);
        return txManager;
    }
}

sharding-jdbc加密数据源配置以下:

/**
 * @author: kl @kailing.pub
 * @date: 2020/5/18
 */
@Configuration
@EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
public class EncryptDataSourceConfiguration {

    @Bean
    public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
        return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
    }

    @Bean(initMethod = "afterPropertiesSet")
    public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
        return factoryBuilder.dataSource(dataSource)
                .packages(Constants.BASE_PACKAGES)
                .properties(jpaProperties.getProperties())
                .persistenceUnit("encryptPersistenceUnit")
                .build();
    }

    @Bean
    public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
        //必须使用SharedEntityManagerCreator建立SharedEntityManager实例,不然SimpleJpaRepository中的事务不生效
        return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
    }

    @Bean
    public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {
        JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();
        encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());
        //使用链式事务管理器包装真正的transactionManager、txManager事务
        ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);
        return chainedTransactionManager;
    }
}

使用这种方案,在涉及到多数据源的业务时,须要指定使用哪一个事务管理器,如:

@PersistenceContext(unitName = "encryptPersistenceUnit")
    private EntityManager entityManager;

    @PersistenceContext
    private EntityManager manager;

    @Transactional(transactionManager = "chainedTransactionManager")
    public AccountModel  save(AccountDTO dto){
        AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);

        entityManager.persist(accountModel);
        entityManager.flush();
        AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);

        manager.persist(accountMode2);
        manager.flush();

        return accountModel;
    }

结语

综上,对于JPA的多数据源分布式事务处理,JTA的事务管理器通过spring boot的封装已经能够开箱即用了。重点在JPA环境下,须要指定EntityManagerFactory的事务使用JTA事务。另本文分享了一种链式事务编排的方式也能够应用在这种场景,可是特殊的场景下不能保证事务的完整性,因此博主推荐使用JtaTransactionManager,有符合的场景也能够试试ChainedTransactionManager。

做者简介:

独立博客KL博客(http://www.kailing.pub)博主。

相关文章
相关标签/搜索