单系统下的分布式数据库事务方案(拓展spring的事务管理器)AgileBPM多数据的解决方案

先推荐一下码云上的一个GVP(最有价值的开源项目) AgileBPM(下面简称ab),我下面讲解的方案也是它的Bo支持多数据源操做事务管理器,友情连接:http://doc.agilebpm.cn/java

目前是解决的是处理单系统内的多数据源问题,简单来讲就是在单系统中的一个线程内,保护多个数据源事务,这也是ab项目所须要的场景。redis

参考了码云上的开源的lcn分布式事务解决方案,以为再拓展一下也是能够解决微服务间的分布式事务处理,利用redis放一个事务处理的共同空间,而后在共同空间内来统筹事务,不过它处理commit异常的问题也是用通用方式(commit失败不少项目都是采起tcc的方式处理)。spring

ps:以前本人试过使用jta事务管理器,这个性能真看不下去。一会就卡。。因此就想着本身定义个管理器,本身来释放资源。sql

1 用AbstractRoutingDataSource让系统支持多数据源数据库

动态数据源配置:缓存

 

真正的数据源(druid数据源):分布式

展现一下DynamicDataSource是继承了AbstractRoutingDataSource的实现,这里不是重点。ide

2 实现支持这种路由数据源的事务管理器微服务

先继承AbstractPlatformTransactionManager(事务管理器的抽象类,咱们很经常使用的DataSourceTransactionManager就是继承它的)
性能

里面须要实现几个关键点就行(笔者只考虑了事务传播性为PROPAGATION_REQUIRED的状况,这也是项目最经常使用的,其余我没支持,毕竟是定制化的事务管理器)

package com.dstz.bus.service.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.ResourceTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.dstz.base.core.util.AppUtil;
import com.dstz.base.core.util.ThreadMapUtil;
import com.dstz.base.db.datasource.DataSourceUtil;
import com.dstz.base.db.datasource.DbContextHolder;
import com.dstz.base.db.datasource.DynamicDataSource;

/**
 * <pre>
 * 描述:ab 结合sys多数据源操做 专门为bo db实例化作的事务管理器
 * 它只保护系统数据源(包含dataSourceDefault),不会保护datasource
 * 其实能够作到,可是这个事务管理器目前只为bo多数据源的保护,因此我没支持
 * 做者:aschs
 * 邮箱:aschs@qq.com
 * 日期:2018年10月10日
 * 版权:summer
 * </pre>
 */
public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    private int i = 0;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.debug("ab的事务管理器已就绪");
    }

    @Override
    public Object getResourceFactory() {
        return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
    }

    /**
     * <pre>
     * 生成一个在整个事务处理都用到的资源
     * 这里我放了在过程当中的全部链接 Map<数据源别名,链接>
     * </pre>
     */
    @Override
    protected Object doGetTransaction() {
        return new HashMap<String, Connection>();
    }
    
    /**
     * 判断是否已存在事务
     */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
    }
    
    /**
     * <pre>
     * 必须实现的一个方法,设置线程内的事务为回滚状态。
     * 这里实际上是为了预防传播性设置为 让线程内能够屡次管理器操做的状况下,用来通知你们不要只作回滚,别commit了。
     * 在该事务管理器只支持PROPAGATION_REQUIRED 的状况下(线程只有一个管理器操做),没多大用,只是必需要实现这个
     * 否则抽象类那里会有报错代码。
     * </pre>
     */
    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//标记ab事务管理器在线程内已准备要回滚了
    }
    
    /**
     * <pre>
     * 准备事务,获取连接
     * </pre>
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
        logger.info("分布式事务开始:"+i);
        
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
        // 遍历系统中的全部数据源,打开链接
        for (Entry<String, DataSource> entry : dsMap.entrySet()) {
            Connection con = null;
            try {
                ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
                if (conHolder == null) {
                    con = entry.getValue().getConnection();
                    con.setAutoCommit(false);
                    // 缓存连接
                    TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
                } else {
                    con = conHolder.getConnection();
                }
                
                //系统数据源放进资源里
                if(DbContextHolder.getDataSource().equals(entry.getKey())) {
                    DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
                    TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
                }
                
                conMap.put(entry.getKey(), con);
                logger.debug("数据源别名[" + entry.getKey() + "]打开链接成功");
            } catch (Throwable ex) {
                doCleanupAfterCompletion(conMap);
                throw new CannotCreateTransactionException("数据源别名[" + entry.getKey() + "]打开链接错误", ex);
            }
        }
        
        ThreadMapUtil.put("abTransactionManagerExist", true);//标记ab事务管理器已经在线程内启动了
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().commit();
                logger.debug("数据源别名[" + entry.getKey() + "]提交事务成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]提交事务失败", ex);
            }
        }
        logger.info("分布式事务提交:"+i);
    }
    
    /**
     * 回滚
     */
    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().rollback();
                logger.debug("数据源别名[" + entry.getKey() + "]回滚事务成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]回滚事务失败", ex);
            }
        }
        logger.info("分布式事务回滚:"+i);
    }
    
    /**
     * 回收连接
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
            TransactionSynchronizationManager.unbindResource(dataSource);
            DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
            logger.debug("数据源别名[" + entry.getKey() + "]关闭连接成功");
        }
        
        //最后把本地资源也释放了
        DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
        TransactionSynchronizationManager.unbindResource(dynamicDataSource);
        
        ThreadMapUtil.remove("abTransactionManagerExist");
        ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
        ThreadMapUtil.remove();
        
        logger.info("分布式事务释放:"+(i++));
    }
}

 

 事务管理器的方法调用顺序和时机大概说一下:

1 doGetTransaction方法:来初始化事务处理过程当中的公共资源,后面调用的其余方法都是以它为媒介的。

2 doBegin方法:开始事务操做,主要是打开数据源的连接,记得要放到事务资源管理服务中TransactionSynchronizationManager,很是重要,由于这个过程当中用到的jdbc操做是从这里面拿的。

3 doCommit(doRollback):如题,把获取的连接提交或者回滚操做。

4 doCleanupAfterCompletion:回收连接资源

至此,事务管理器的逻辑已经结束了~

最后,实现务必实现isExistingTransaction,用来处理重复线程内屡次触发了事务切面的逻辑

这里笔者用简单的线程变量来标记是否线程内已存在了事务管理,由于我只支持PROPAGATION_REQUIRED传播性,因此没考虑内部嵌入的其余状况,其实也是内部commit一下,资源确定是最后统一释放的。

3 使用自定义事务管理器

先提一下,这里笔者只保护会使用到多数据源的模块,其实大部分系统逻辑仍是用DataSourceTransactionManager就够,不须要保护太多数据源(由于释放和打开连接是有性能损耗的)。

能够看出,主要的逻辑系统仍是使用传统管理器,而后在特定地方声明特殊管理器则可:

5 到这里,整个分布式事务管理已完成了,主要是利用了路由数据源AbstractRoutingDataSource和自定义事务管理器实现的~

6 AgileBPM的多数据源结合展现(可跳过)

这里展现一下,这个开源的流程系统的强大可配置性(让人发指的灵活性-。-)的数据源管理功能。

数据源模板,在这里你可使用定义不一样的数据源实现类,只要在项目import就行

这里有一个内置的阿里的数据源,后面有须要你能够增长其余模板,例如BasicDataSOurce这个,最经常使用的数据源。

有了模板,就能够新建数据源了:

这里的是特殊默认数据源,系统本地数据源,用户能够随便添加。

而后,我就基于AgileBPM的强大数据源管理下,进行了分布式数据源测试。测试逻辑很简单,就是在一个线程内,操做多个数据源,而后看一下会不会一块儿回滚。

 

 这里展现了一下,AgileBPM中使用数据源的便捷性,根据配置的别名,直接拿来代码开发则可,测试代码比较随意了,能保证一致性。

 7 这样的实如今压测中的表现

本人用的是jmetter来压测事务处理,它的表现跟传统的DataSourceTransactionManager表现是同样的!!!!(虽然过程遇到了线程变量的坑,但已修复)。

配置,400进程同时施压:

这是压测结果

 这是日志输出,我故意输出了每一次获取连接,提交,和释放的事务处理过程

 8 挫败:原来行业内的问题主要卡在commit上。

因为对分布式事务产生极大兴趣,因此专研了一下,这么简单的实现为啥别人都以为是打问题呢?原来是由于commit会出错的状况,第一个连接commit成功后,第二个连接commit失败,那么第一个连接已不能回滚了!!!!因此行业内大部分方案都在处理这种状况,虽然到了commit阶段,数据库已经对相关资源产生了写锁,数据也写入磁盘,就等commit刷进去了,产生错误的几率是极少了。做为行业内的大难题,不少方案在处理这个问题。什么2pc原则。。等等,有空我整理一下。大部分主流项目解决方案仍是tcc为主,毕竟这个最通用直接。

8.1顿悟!!

顿悟!!其实我以上这种实现方案就是2pc的实现方案,在jdbc都操做了sql没问题后,再一并提交的方案就是2pc。可是2pc有这个commit提交存在的设计缺陷(这种时机是存在不多可能性的),因此别人就提出tcc和消息队列的解决commit异常的更可靠的方案(可是,只要是串行逻辑就没有百分百可靠的方案,只是下降了可能性罢了)。因此,ab项目关于分布式是采用了2pc的解决方案,顺带提一下jta事务也是相似的逻辑,不过他们的性能主要卡在消息通知上。例如全部连接操做sql都成功了,我须要通知AB连接去提交,我通知了A,A提交成功,而后我通知B,B没收到消息,那么AB资源都会卡住不释放,而后B会超时致使回滚了。因此,jta在消息通知上比较损耗性能……关于2pc的友情连接:

落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859

 

 

9关于AB项目的多数据bo场景方案

在经历挫败以后,理性分析了一下,其实当前这种方案已经知足了ab的分布式事务处理的需求了。首先,其实commit失败的场景是少之又少,笔者调整了逻辑,后面把重要的系统数据源放在最后提交,保证了系统数据源的强一致性,也就是说保证了流程数据的一致性。

 

缘由分析,这里细想一个场景,我先把业务数据从1改为2,而后驱动流程流转,假如个人bo是其余数据源A,A先提交成功,可是系统的本地数据源B提交失败了,那么致使B操做的流程数据会回滚,可是A的数据已提交没法回滚。结果是,流程没有流转,可是业务数据已更新了为2了。这种场景在ab中,至关于,我操做了一下业务数据的保存,由于流程没有变,只是保存了一下数据,对于流程系统自己来讲,有时候仍是好事,由于虽然流程流转失败了,可是业务数据不想再填写一次。因此我说这种方案已经知足ab项目的多数据源下的分布式场景的需求了。

固然,若是用户仍是执着于全部数据源的强一致性,在ab项目中能够在bo保存前,先备份一下bo数据,而后在doCommit时恢复备份数据则能够,ab里有不少时机插件,定义了一些时机插件列表,而后你多实现了插件则会运行,ab的插件代码展现,以下:

ab做为面向技术人员的流程系统,里面内嵌提供了丰富的便捷开发的写法和实现。

相关文章
相关标签/搜索