基于 XA 协议实现一个分布式事务处理框架

引言

现目前处理分布式事务的方案有不少,好比java

  • 基于 XA 协议的方案
  • 基于 TCC 协议方案
  • 基于 SAGA 协议的方案

而实现了对应的协议的有mysql

  • 在 Java 中基于 XA 协议实现的框架有 Atomikos,JOTM 等框架
  • TCC 协议的框架有 EasyTransaction、tcc-transaction 等框架
  • Sharding Sphere 能够经过 SPI 注入 Atomikos 等来实现 XA 分布式事务

对于 XA 协议来讲,它使用的是 2PC 协议的方式,是阻塞式的,而且它还依赖于数据库自身提供的 XA 接口的可靠性,对于大部分商业数据库来讲作的都还蛮不错,在 Mysql 中只有 InnoDB 引擎支持 XA 分布式事务。git

基于 XA 的分布式事务方案更适用于单机应用中跨库事务,它也能够作到远程调用事务支持github

XA 协议

XA 协议由 Tuxedo 首先提出的,并交给X/Open组织,这个组织随即定义了一套分布式事务的标准即 X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 即一些列的接口各个厂商须要遵循这个标准来实现sql

在 DTP 模型中定义了五个组成元素数据库

  • AP(Application Program):也就是应用程序,能够理解为使用 DTP 的程序
  • TM(Transaction Manager):事务管理器,负责协调和管理事务,提供给 AP 编程接口以及管理资源管理器
  • RM(Resource Manager):资源管理器,能够理解为一个 DBMS 系统,或者消息服务器管理系统,应用程序经过资源管理器对资源进行控制,资源必须实现 XA 定义的接口(好比 Mysql 就实现了对 XA 协议的支持)
  • 通讯资源管理器(Communication Resource Manager):它规定了,对于须要跨应用的分布式事务,事务管理器彼此之间须要进行通讯,就须要经过这个组件来完成
  • 通讯协议(Communication Protocol)
    • XA 协议:应用或应用服务器与事务管理之间通讯的协议
    • TX 协议:全局事务管理器与资源管理器之间通讯的接口

其中在 DTP 中又定义了如下几个概念编程

  • 全局事务:资源管理器会操做多个分支事务,根据分支事务的准备状况决定是提交仍是回滚
  • 分支事务:一个全局事务中会有多个分支事务,统一由资源管理器来管理,好比购买商品这个方法,可能涉及到扣款事务方法,生成订单事务方法,增长财务流水事务方法调用等
  • 控制线程:须要表示全局事务以及分支事务的关系

DTP 模型主要是经过 2PC 来控制事务管理器和资源管理之间的交互的api

  • 第一阶段:准备阶段,事务管理器询问资源管理器分支事务是否正常执行完毕,资源管理返回是或者否
  • 第二阶段:提交阶段,事务管理器根据上一阶段全部资源管理器的反馈结果,若是都是那么提交事务,若是否一个失败,那么回滚事务

XA 优缺点

优势:能够作到对业务无侵入,可是对事务管理器和资源管理器要求比较高服务器

缺点:网络

  • 同步阻塞:在二阶段提交的过程当中直到 commit 结束为止,全部节点都须要等到其它节点完成后才可以释放事务资源,这种同步阻塞极大的限制了分布式系统的性能
    • 好比本来在一个购买商品的场景中,一个扣款服务能够直接完成提交事务释放锁资源,如今须要等到订单生成,帐户流水记录,积分增长扣减等操做完成后才能释放
    • 而且若是说在这个操做中,资源管理器由于网络等缘由没有办法收到事务管理器的指令,那么它会一直处于阻塞状态状态而不释放锁资源
  • 单点问题:协调者(事务管理器)若是一旦出现故障那么整个服务就会不可用,因此须要实现对应的协调者(资源管理器)选举操做
  • 数据不一致:若是说全部资源管理器都反馈的准备好了,这时候进入 commit 阶段,可是因为网络问题或者说某个资源管理器挂了,那么就会存在一部分机器执行了事务,另一部分没有执行致使数据不一致
    • 固然这种状况若是说存在协调者选举的话能够必定程度的避免,好比说协调者同时向 A、B、C 三个服务器发出 commit,这时 A 和 B commit 成功,可是 C 在收到命令前就挂了,那么这个时候 C 若是不须要恢复了就不须要管它了(之后启用的时候能够经过好比 mysql binlog 等同步数据)
    • 若是说 C 过一会就恢复了的话,协调者能够保存以前提交的状态,C 去询问协调者该 commit 仍是 rollback,协调者就去检查本身的记录去看一下 A 和 B 上次是 commit 成功了仍是失败了,而后给 C 发送指令去执行完成这个事务,最终让数据保持一致
    • 可是说,若是说协调者再发送指令给 A、B、C 后它本身立马就挂了,这个时候正好 C 正好执行完了操做后(多是 commit 多是 rollback)后也挂了,碰巧的是,协调者立刻选举完成了而后 A 和 B 返回成功了,这个时候若是 A 和 B 都 commit 了,而 C rollback 了那么就会形成数据不一致了(由于 C 已经执行完了这个事务,不像上一个场景没有执行事务或者事务执行失败,它能够再执行或者回滚了)
  • 容错性很差:若是说在提交询问阶段,参与者挂了那么这个时候协调者就只能依靠超时机制来处理是否须要中断事务

能够看出 2PC 协议存在阻塞低效和数据不一致的问题,因此在大型应用须要较高的吞吐量的应用是不多使用这种方案的

JTA 事务规范

JTA(Java Transaction API)即 Java 事务 API 和 JTS(Java Transaction Service)即 Java 事务服务,他们为 JAVA 平台提供了分布式事务服务,能够把 JTA 理解为是 XA 协议规范的 Java 版本。它也采用的是 DTP 模型

咱们知道 JTA 它只是一个规范,定义了如何去于实现了 XA 协议的资源管理器交互的接口,可是并无对应的实现,官方推荐 Atomikos 来实现 XA 分布式事务,感谢的能够直接取研究 Atomikos 源码。

基于 XA 协议实现一个分布式事务处理框架

能够经过对 XA 协议的实际应用来加深咱们的理解,代码以下

public static void test() {
        try {
            // 开启全局事务
            transactionManager.begin();
            // 向服务器 A 数据库写入数据
            saveDB1();
            // 向服务器 B 数据库写入数据
            saveDB2();
            // 询问 RM 分支事务是否准备就绪
            boolean prepareSuccess = transactionManager.prepare();
            // 目前没有涉及到远程事务的支持,在本地都是同步的方式调用因此此处没有作作阻塞等待而是返回马上知道是否成功
            // 若是涉及到远程事务的支持,那么此处应该就有一个阻塞唤醒机制
            if (prepareSuccess) {
                // 开始提交分支事务
                transactionManager.commit();
            } else {
                // 回滚
                transactionManager.rollback();
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 若是出错了就进行回滚各分支事务
            try {
                transactionManager.rollback();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        } finally {
            // 资源回收
            GlobalInfo.remove();
        }
    }
    
    private static void saveDB1() throws XAException, SQLException {
        // 由于存在多个数据源,因此须要指定是使用哪个数据源
        XAResourceManager xaResourceManager = RMUtil.getResourceManager(dbPool1);
        // 分支事务开启
        xaResourceManager.begin();
        xaResourceManager.execute("insert into test1(name, age) values('pt', 21)");
        // 事务执行完毕处于准备阶段等待 TM 下达 commit 指令
        xaResourceManager.prepare();
    }

    private static void saveDB2() throws XAException, SQLException {
        XAResourceManager xaResourceManager = RMUtil.getResourceManager(dbPool2);
        // 分支事务开启
        xaResourceManager.begin();
        xaResourceManager.execute("insert into test2(name, age) values('tom', 22)");
        // 事务执行完毕处于准备阶段等待 TM 下达 commit 指令
        xaResourceManager.prepare();
        // 测试回滚
        // throw new RuntimeException("xx");
    }
复制代码

包结构

  • db:中存放了数据库链接池的实现,能够实现多数据源的切换
  • rm:中存放了资源管理器的实现,主要是针对分支事务的处理与隔离
  • tm:中存放了事务管理器的实现,它主要是去操做 rm 包来完成全局事务的提交或者回滚

代码放在了 github 上面感兴趣的能够了解一下

github 代码连接

参考:

相关文章
相关标签/搜索