先推荐一下码云上的一个GVP(最有价值的开源项目) AgileBPM(下面简称ab),我下面讲解的方案也是它的Bo支持多数据源操做事务管理器,友情连接:http://doc.agilebpm.cn/java
目前是解决的是处理单系统内的多数据源问题,简单来讲就是在单系统中的一个线程内,保护多个数据源事务,这也是ab项目所须要的场景。redis
参考了码云上的开源的lcn分布式事务解决方案,以为再拓展一下也是能够解决微服务间的分布式事务处理,利用redis放一个事务处理的共同空间,而后在共同空间内来统筹事务,不过它处理commit异常的问题也是用通用方式(commit失败不少项目都是采起tcc的方式处理)。spring
ps:以前本人试过使用jta事务管理器,这个性能真看不下去。一会就卡。。因此就想着本身定义个管理器,本身来释放资源。sql
1 用AbstractRoutingDataSource让系统支持多数据源数据库
动态数据源配置:缓存
真正的数据源(druid数据源):分布式
展现一下DynamicDataSource是继承了AbstractRoutingDataSource的实现,这里不是重点。ide
2 实现支持这种路由数据源的事务管理器微服务
先继承AbstractPlatformTransactionManager(事务管理器的抽象类,咱们很经常使用的DataSourceTransactionManager就是继承它的)
性能
里面须要实现几个关键点就行(笔者只考虑了事务传播性为PROPAGATION_REQUIRED的状况,这也是项目最经常使用的,其余我没支持,毕竟是定制化的事务管理器)
package com.dstz.bus.service.impl; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import javax.sql.DataSource; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.ConnectionHolder; import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.ResourceTransactionManager; import org.springframework.transaction.support.TransactionSynchronizationManager; import com.dstz.base.core.util.AppUtil; import com.dstz.base.core.util.ThreadMapUtil; import com.dstz.base.db.datasource.DataSourceUtil; import com.dstz.base.db.datasource.DbContextHolder; import com.dstz.base.db.datasource.DynamicDataSource; /** * <pre> * 描述:ab 结合sys多数据源操做 专门为bo db实例化作的事务管理器 * 它只保护系统数据源(包含dataSourceDefault),不会保护datasource * 其实能够作到,可是这个事务管理器目前只为bo多数据源的保护,因此我没支持 * 做者:aschs * 邮箱:aschs@qq.com * 日期:2018年10月10日 * 版权:summer * </pre> */ public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { private int i = 0; @Override public void afterPropertiesSet() throws Exception { logger.debug("ab的事务管理器已就绪"); } @Override public Object getResourceFactory() { return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE); } /** * <pre> * 生成一个在整个事务处理都用到的资源 * 这里我放了在过程当中的全部链接 Map<数据源别名,链接> * </pre> */ @Override protected Object doGetTransaction() { return new HashMap<String, Connection>(); } /** * 判断是否已存在事务 */ @Override protected boolean isExistingTransaction(Object transaction) { return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false); } /** * <pre> * 必须实现的一个方法,设置线程内的事务为回滚状态。 * 这里实际上是为了预防传播性设置为 让线程内能够屡次管理器操做的状况下,用来通知你们不要只作回滚,别commit了。 * 在该事务管理器只支持PROPAGATION_REQUIRED 的状况下(线程只有一个管理器操做),没多大用,只是必需要实现这个 * 否则抽象类那里会有报错代码。 * </pre> */ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) { ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//标记ab事务管理器在线程内已准备要回滚了 } /** * <pre> * 准备事务,获取连接 * </pre> */ @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { logger.info("分布式事务开始:"+i); Map<String, Connection> conMap = (Map<String, Connection>) transaction; Map<String, DataSource> dsMap = DataSourceUtil.getDataSources(); // 遍历系统中的全部数据源,打开链接 for (Entry<String, DataSource> entry : dsMap.entrySet()) { Connection con = null; try { ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue()); if (conHolder == null) { con = entry.getValue().getConnection(); con.setAutoCommit(false); // 缓存连接 TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con)); } else { con = conHolder.getConnection(); } //系统数据源放进资源里 if(DbContextHolder.getDataSource().equals(entry.getKey())) { DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con)); } conMap.put(entry.getKey(), con); logger.debug("数据源别名[" + entry.getKey() + "]打开链接成功"); } catch (Throwable ex) { doCleanupAfterCompletion(conMap); throw new CannotCreateTransactionException("数据源别名[" + entry.getKey() + "]打开链接错误", ex); } } ThreadMapUtil.put("abTransactionManagerExist", true);//标记ab事务管理器已经在线程内启动了 } @Override protected void doCommit(DefaultTransactionStatus status) { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().commit(); logger.debug("数据源别名[" + entry.getKey() + "]提交事务成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]提交事务失败", ex); } } logger.info("分布式事务提交:"+i); } /** * 回滚 */ @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().rollback(); logger.debug("数据源别名[" + entry.getKey() + "]回滚事务成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("数据源别名[" + entry.getKey() + "]回滚事务失败", ex); } } logger.info("分布式事务回滚:"+i); } /** * 回收连接 */ @Override protected void doCleanupAfterCompletion(Object transaction) { Map<String, Connection> conMap = (Map<String, Connection>) transaction; for (Entry<String, Connection> entry : conMap.entrySet()) { DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey()); TransactionSynchronizationManager.unbindResource(dataSource); DataSourceUtils.releaseConnection(entry.getValue(), dataSource); logger.debug("数据源别名[" + entry.getKey() + "]关闭连接成功"); } //最后把本地资源也释放了 DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.unbindResource(dynamicDataSource); ThreadMapUtil.remove("abTransactionManagerExist"); ThreadMapUtil.remove("abTransactionManagerRollbackOnly"); ThreadMapUtil.remove(); logger.info("分布式事务释放:"+(i++)); } }
事务管理器的方法调用顺序和时机大概说一下:
1 doGetTransaction方法:来初始化事务处理过程当中的公共资源,后面调用的其余方法都是以它为媒介的。
2 doBegin方法:开始事务操做,主要是打开数据源的连接,记得要放到事务资源管理服务中TransactionSynchronizationManager,很是重要,由于这个过程当中用到的jdbc操做是从这里面拿的。
3 doCommit(doRollback):如题,把获取的连接提交或者回滚操做。
4 doCleanupAfterCompletion:回收连接资源
至此,事务管理器的逻辑已经结束了~
最后,实现务必实现isExistingTransaction,用来处理重复线程内屡次触发了事务切面的逻辑
这里笔者用简单的线程变量来标记是否线程内已存在了事务管理,由于我只支持PROPAGATION_REQUIRED传播性,因此没考虑内部嵌入的其余状况,其实也是内部commit一下,资源确定是最后统一释放的。
3 使用自定义事务管理器
先提一下,这里笔者只保护会使用到多数据源的模块,其实大部分系统逻辑仍是用DataSourceTransactionManager就够,不须要保护太多数据源(由于释放和打开连接是有性能损耗的)。
能够看出,主要的逻辑系统仍是使用传统管理器,而后在特定地方声明特殊管理器则可:
5 到这里,整个分布式事务管理已完成了,主要是利用了路由数据源AbstractRoutingDataSource和自定义事务管理器实现的~
6 AgileBPM的多数据源结合展现(可跳过)
这里展现一下,这个开源的流程系统的强大可配置性(让人发指的灵活性-。-)的数据源管理功能。
数据源模板,在这里你可使用定义不一样的数据源实现类,只要在项目import就行
这里有一个内置的阿里的数据源,后面有须要你能够增长其余模板,例如BasicDataSOurce这个,最经常使用的数据源。
有了模板,就能够新建数据源了:
这里的是特殊默认数据源,系统本地数据源,用户能够随便添加。
而后,我就基于AgileBPM的强大数据源管理下,进行了分布式数据源测试。测试逻辑很简单,就是在一个线程内,操做多个数据源,而后看一下会不会一块儿回滚。
这里展现了一下,AgileBPM中使用数据源的便捷性,根据配置的别名,直接拿来代码开发则可,测试代码比较随意了,能保证一致性。
7 这样的实如今压测中的表现
本人用的是jmetter来压测事务处理,它的表现跟传统的DataSourceTransactionManager表现是同样的!!!!(虽然过程遇到了线程变量的坑,但已修复)。
配置,400进程同时施压:
这是压测结果
这是日志输出,我故意输出了每一次获取连接,提交,和释放的事务处理过程
8 挫败:原来行业内的问题主要卡在commit上。
因为对分布式事务产生极大兴趣,因此专研了一下,这么简单的实现为啥别人都以为是打问题呢?原来是由于commit会出错的状况,第一个连接commit成功后,第二个连接commit失败,那么第一个连接已不能回滚了!!!!因此行业内大部分方案都在处理这种状况,虽然到了commit阶段,数据库已经对相关资源产生了写锁,数据也写入磁盘,就等commit刷进去了,产生错误的几率是极少了。做为行业内的大难题,不少方案在处理这个问题。什么2pc原则。。等等,有空我整理一下。大部分主流项目解决方案仍是tcc为主,毕竟这个最通用直接。
8.1顿悟!!
顿悟!!其实我以上这种实现方案就是2pc的实现方案,在jdbc都操做了sql没问题后,再一并提交的方案就是2pc。可是2pc有这个commit提交存在的设计缺陷(这种时机是存在不多可能性的),因此别人就提出tcc和消息队列的解决commit异常的更可靠的方案(可是,只要是串行逻辑就没有百分百可靠的方案,只是下降了可能性罢了)。因此,ab项目关于分布式是采用了2pc的解决方案,顺带提一下jta事务也是相似的逻辑,不过他们的性能主要卡在消息通知上。例如全部连接操做sql都成功了,我须要通知AB连接去提交,我通知了A,A提交成功,而后我通知B,B没收到消息,那么AB资源都会卡住不释放,而后B会超时致使回滚了。因此,jta在消息通知上比较损耗性能……关于2pc的友情连接:
落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859
9关于AB项目的多数据bo场景方案
在经历挫败以后,理性分析了一下,其实当前这种方案已经知足了ab的分布式事务处理的需求了。首先,其实commit失败的场景是少之又少,笔者调整了逻辑,后面把重要的系统数据源放在最后提交,保证了系统数据源的强一致性,也就是说保证了流程数据的一致性。
缘由分析,这里细想一个场景,我先把业务数据从1改为2,而后驱动流程流转,假如个人bo是其余数据源A,A先提交成功,可是系统的本地数据源B提交失败了,那么致使B操做的流程数据会回滚,可是A的数据已提交没法回滚。结果是,流程没有流转,可是业务数据已更新了为2了。这种场景在ab中,至关于,我操做了一下业务数据的保存,由于流程没有变,只是保存了一下数据,对于流程系统自己来讲,有时候仍是好事,由于虽然流程流转失败了,可是业务数据不想再填写一次。因此我说这种方案已经知足ab项目的多数据源下的分布式场景的需求了。
固然,若是用户仍是执着于全部数据源的强一致性,在ab项目中能够在bo保存前,先备份一下bo数据,而后在doCommit时恢复备份数据则能够,ab里有不少时机插件,定义了一些时机插件列表,而后你多实现了插件则会运行,ab的插件代码展现,以下:
ab做为面向技术人员的流程系统,里面内嵌提供了丰富的便捷开发的写法和实现。