分布式事务之JTA原理与实现(三)

著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。

外部(全局)事务-JTA

  • 外部事务管理器提供JTA事务管理
  • JTA事务管理器能够管理多个数据资源
  • 经过2阶段提交实现多数据源的事务

事务机制过程图

外部(全局)事务java

外部(全局)事务-不使用应用服务器

XAsql

JTA

什么是JTA

Java 事务编程接口(JTA:Java Transaction API)和 Java 事务服务 (JTS;Java Transaction Service) 为 J2EE 平台提供了分布式事务服务。分布式事务(Distributed Transaction)包括事务管理器(Transaction Manager)和一个或多个支持 XA 协议的资源管理器 ( Resource Manager )。

咱们能够将资源管理器看作任意类型的持久化数据存储。
事务管理器承担着全部事务所参与单元的协调与控制。数据库

JTA 事务有效的屏蔽了底层事务资源,使应用能够以透明的方式参入到事务处理中;可是与本地事务相比,XA 协议的系统开销大,在系统开发过程当中应慎重考虑是否确实须要分布式事务。若确实须要分布式事务以协调多个事务资源,则应实现和配置所支持 XA 协议的事务管理器,如 JMS、JDBC 数据库链接池等。编程

JTA事务管理的弊端缓存

  • 两阶段提交
  • 事务时间太长、锁数据的时间太长
  • 低性能、低吞吐量 不使用JTA实现多数据源的事务管理
  • Spring 事务同步机制
  • 多个数据源上实现近似事务一致性
  • 高性能、高吞吐量

使用 JTA 处理事务的示例以下(注意:connA 和 connB 是来自不一样数据库的链接)bash

public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection connA = null; 
        Statement stmtA = null; 
                
        Connection connB = null; 
        Statement stmtB = null; 
    
        try{ 
            ## 得到 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("\java:comp/UserTransaction"); 
            ## 从数据库 A 中取得数据库链接
            connA = getDataSourceA().getConnection(); 
            
            ## 从数据库 B 中取得数据库链接
            connB = getDataSourceB().getConnection(); 
      
            ## 启动事务
            userTx.begin();
            
            ## 将 A 帐户中的金额减小 500 
            stmtA = connA.createStatement(); 
            stmtA.execute("update t_account set amount = amount - 500 where account_id = 'A'");
            
            ## 将 B 帐户中的金额增长 500 
            stmtB = connB.createStatement(); 
            stmtB.execute("\update t_account set amount = amount + 500 where account_id = 'B'");
            
            ## 提交事务
            userTx.commit();
            ## 事务提交:转帐的两步操做同时成功(数据库 A 和数据库 B 中的数据被同时更新)
        } catch(SQLException sqle){ 
            try{ 
                 ## 发生异常,回滚在本事务中的操纵
                 userTx.rollback();
                ## 事务回滚:转帐的两步操做彻底撤销 
                ##( 数据库 A 和数据库 B 中的数据更新被同时撤销)
                stmt.close(); 
                conn.close(); 
                ## 剩余业务代码 
            }catch(Exception ignore){ } 
            sqle.printStackTrace(); 
            
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }
复制代码

JTA 实现原理

不少开发人员都会对 JTA 的内部工做机制感兴趣:咱们大多时候编写的代码没有任何与事务资源(如数据库链接)互动的代码,可是个人操做(数据库更新)却实实在在的被包含在了事务中,那 JTA 到底是经过何种方式来实现这种透明性的呢?服务器

JTA的事务管理器(Transaction Manager)和资源管理器(Resource Manager)

要理解 JTA 的实现原理首先须要了解其架构:网络

  • 事务管理器(Transaction Manager)
  • 一个或多个支持 XA 协议的资源管理器 ( Resource Manager )

咱们能够将资源管理器看作任意类型的持久化数据存储介质;
事务管理器则承担着全部事务参与单元的协调与控制。多线程

根据所面向对象的不一样,咱们能够将 JTA的事务管理器和资源管理器理解为两个方面:架构

  • 面向开发人员的使用接口(事务管理器)
  • 面向服务提供商的实现接口(资源管理器)

其中开发接口的主要部分即为上述示例中引用的 UserTransaction 对象,开发人员经过此接口在信息系统中实现分布式事务;而实现接口则用来规范提供商(如数据库链接提供商)所提供的事务服务,它约定了事务的资源管理功能,使得 JTA 能够在异构事务资源之间执行协同沟通。以数据库为例,IBM 公司提供了实现分布式事务的数据库驱动程序,Oracle 也提供了实现分布式事务的数据库驱动程序,在同时使用 DB2 和 Oracle 两种数据库链接时, JTA 便可以根据约定的接口协调者两种事务资源从而实现分布式事务。正是基于统一规范的不一样实现使得 JTA 能够协调与控制不一样数据库或者 JMS 厂商的事务资源,其JTA 体系架构以下图所示:

开发人员使用开发人员接口,实现应用程序对全局事务的支持;各提供商(数据库,JMS 等)依据提供商接口的规范提供事务资源管理功能;事务管理器( TransactionManager )将应用对分布式事务的使用映射到实际的事务资源并在事务资源间进行协调与控制。 下面,本文将对包括 UserTransactionTransactionTransactionManager 在内的三个主要接口以及其定义的方法进行介绍。

UserTransaction 和 Transaction

面向开发人员的接口为 UserTransaction (使用方法如上例所示),开发人员一般只使用此接口实现 JTA 事务管理,其定义了以下的方法:

  • begin()- 开始一个分布式事务,(在后台 TransactionManager 会建立一个 Transaction 事务对象并把此对象经过 * ThreadLocale 关联到当前线程上 )
  • commit()- 提交事务(在后台 TransactionManager 会从当前线程下取出事务对象并把此对象所表明的事务提交)
  • rollback()- 回滚事务(在后台 TransactionManager 会从当前线程下取出事务对象并把此对象所表明的事务回滚)
  • getStatus()- 返回关联到当前线程的分布式事务的状态 (Status 对象里边定义了全部的事务状态,感兴趣的读者能够参考 API 文档 )
  • setRollbackOnly()- 标识关联到当前线程的分布式事务将被回滚

面向提供商的实现接口主要涉及到 TransactionManager 和 Transaction 两个对象;

Transaction 表明了一个物理意义上的事务,在开发人员调用 UserTransaction.begin() 方法时 TransactionManager 会建立一个 Transaction 事务对象(标志着事务的开始)并把此对象经过 ThreadLocale 关联到当前线程。 UserTransaction 接口中的 commit()、rollback(),getStatus() 等方法都将最终委托给 Transaction 类的对应方法执行。

Transaction 接口定义了以下的方法:

  • commit()- 协调不一样的事务资源共同完成事务的提交
  • rollback()- 协调不一样的事务资源共同完成事务的回滚
  • setRollbackOnly()- 标识关联到当前线程的分布式事务将被回滚
  • getStatus()- 返回关联到当前线程的分布式事务的状态
  • enListResource(XAResource xaRes, int flag)- 将事务资源加入到当前的事务中(在上述示例中,在对数据库 A 操做时 其所表明的事务资源将被关联到当前事务中,一样,在对数据库 B 操做时其所表明的事务资源也将被关联到当前事务中)
  • delistResourc(XAResource xaRes, int flag)- 将事务资源从当前事务中删除
  • registerSynchronization(Synchronization sync)- 回调接口

Hibernate 等 ORM 工具都有本身的事务控制机制来保证事务,但同时它们还须要一种回调机制以便在事务完成时获得通知从而触发一些处理工做,如清除缓存等。这就涉及到了 Transaction 的回调接口 registerSynchronization。工具能够经过此接口将回调程序注入到事务中,当事务成功提交后,回调程序将被激活。 TransactionManager 自己并不承担实际的事务处理功能,它更多的是充当用户接口和实现接口之间的桥梁。下面列出了 TransactionManager 中定义的方法,能够看到此接口中的大部分事务方法与 UserTransaction 和 Transaction 相同。
在开发人员调用 UserTransaction.begin() 方法时 TransactionManager 会建立一个 Transaction 事务对象(标志着事务的开始)并把此对象经过 ThreadLocale 关联到当前线程上;一样 UserTransaction.commit() 会调用 TransactionManager.commit(), 方法将从当前线程下取出事务对象 Transaction 并把此对象所表明的事务提交, 即调用 Transaction.commit() TransactionManager定义了以下的方法:

  • begin()- 开始事务
  • commit()- 提交事务
  • rollback()- 回滚事务
  • getStatus()- 返回当前事务状态
  • setRollbackOnly()
  • getTransaction()- 返回关联到当前线程的事务
  • setTransactionTimeout(int seconds)- 设置事务超时时间
  • resume(Transaction tobj)- 继续当前线程关联的事务
  • suspend()- 挂起当前线程关联的事务

在系统开发过程当中会遇到须要将事务资源暂时排除的操做,此时就须要调用 suspend() 方法将当前的事务挂起:在此方法后面所作的任何操做将不会被包括在事务中,在非事务性操做完成后调用 resume() 以继续事务(注: 要进行此操做须要得到 TransactionManager 对象, 其得到方式在不一样的 J2EE 应用服务器上是不同的)

下面将经过具体的代码向读者介绍 JTA 实现原理。下图列出了示例实现中涉及到的 Java 类,其中 UserTransactionImpl 实现了 UserTransaction 接口,TransactionManagerImpl 实现了 TransactionManager 接口,TransactionImpl 实现了 Transaction 接口

JTA 实现类图:

## 开始事务
public class UserTransactionImpl implenments UserTransaction{
    public void begin() throws NotSupportedException, SystemException { 
     ## 将开始事务的操做委托给 TransactionManagerImpl 
         TransactionManagerImpl.singleton().begin(); 
     }
}
复制代码
## 开始事务
public class TransactionManagerImpl implements TransactionManager{
## 此处 transactionHolder 用于将 Transaction 所表明的事务对象关联到线程上
    private static ThreadLocal<TransactionImpl> transactionHolder = new ThreadLocal<TransactionImpl>(); 
    ## TransacationMananger 必须维护一个全局对象,所以使用单实例模式实现
     private static TransactionManagerImpl singleton = new TransactionManagerImpl(); 
     
     private TransactionManagerImpl(){ } 
     
     public static TransactionManagerImpl singleton(){ 
         return singleton; 
     } 
 
     public void begin() throws NotSupportedException, SystemException { 
         ## XidImpl 实现了 Xid 接口,其做用是惟一标识一个事务
         XidImpl xid = new XidImpl(); 
         ## 建立事务对象,并将对象关联到线程
         TransactionImpl tx = new TransactionImpl(xid); 
         transactionHolder.set(tx); 
     }
}
复制代码

如今咱们就能够理解 Transaction 接口上没有定义 begin 方法的缘由了:Transaction 对象自己就表明了一个事务,在它被建立的时候就代表事务已经开始,所以也就不须要额外定义 begin() 方法了。

##提交事务
public class UserTransactionImpl implenments UserTransaction{
    public void commit() throws RollbackException, HeuristicMixedException, 
        HeuristicRollbackException, SecurityException, 
        IllegalStateException, SystemException { 
        
        // 检查是不是 Roll back only 事务,若是是回滚事务
           if(rollBackOnly){ 
            rollback(); 
            return; 
          } else { 
           // 将提交事务的操做委托给 TransactionManagerImpl 
           TransactionManagerImpl.singleton().commit(); 
          } 
    }
}
复制代码
## 提交事务
public class TransactionManagerImpl implenments TransactionManager{
public void commit() throws RollbackException, HeuristicMixedException, 
    HeuristicRollbackException, SecurityException, 
    IllegalStateException, SystemException { 
       ## 取得当前事务所关联的事务并经过其 commit 方法提交
       TransactionImpl tx = transactionHolder.get(); 
       tx.commit(); 
    }
}
复制代码

同理, rollback、getStatus、setRollbackOnly 等方法也采用了与 commit() 相同的方式实现。 UserTransaction 对象不会对事务进行任何控制, 全部的事务方法都是经过 TransactionManager 传递到实际的事务资源即 Transaction 对象上。
上述示例演示了 JTA 事务的处理过程,下面将为您展现事务资源(数据库链接,JMS)是如何以透明的方式加入到 JTA 事务中的。首先须要明确的一点是,在 JTA 事务代码中得到的数据库源 ( DataSource ) 必须是支持分布式事务的。在以下的代码示例中,尽管全部的数据库操做都被包含在了 JTA 事务中,可是由于 MySql 的数据库链接是经过本地方式得到的,对 MySql 的任何更新将不会被自动包含在全局事务中。

## JTA 事务处理
public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection mySqlConnection = null; 
        Statement mySqlStat = null; 
        Connection connB = null; 
        Statement stmtB = null; 
    
        try{ 
            ## 得到 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction");
            ## 以本地方式得到 mySql 数据库链接
            mySqlConnection = DriverManager.getConnection("localhost:1111"); 
            ## 从数据库 B 中取得数据库链接, getDataSourceB 返回应用服务器的数据源
            connB = getDataSourceB().getConnection(); 
            ## 启动事务
            userTx.begin();
            ## 将 A 帐户中的金额减小 500 
            ## mySqlConnection 是从本地得到的数据库链接,不会被包含在全局事务中
            mySqlStat = mySqlConnection.createStatement(); 
            mySqlStat.execute(" update t_account set amount = amount - 500 where account_id = 'A'");
            ## connB 是从应用服务器得的数据库链接,会被包含在全局事务中
            stmtB = connB.createStatement(); 
            stmtB.execute("update t_account set amount = amount + 500 where account_id = 'B'");
            ## 事务提交:connB 的操做被提交,mySqlConnection 的操做不会被提交
            userTx.commit();
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }
复制代码

为何必须从支持事务的数据源中得到的数据库链接才支持分布式事务呢?

  • 其实支持事务的数据源与普通的数据源是不一样的,它实现了额外的 XADataSource 接口。咱们能够简单的将 XADataSource 理解为普通的数据源(继承了 java.sql.PooledConnection),只是它为支持分布式事务而增长了 getXAResource 方法。另外,由 XADataSource 返回的数据库链接与普通链接也是不一样的,此链接除了实现 java.sql.Connection 定义的全部功能以外还实现了XAConnection 接口。咱们能够把 XAConnection 理解为普通的数据库链接,它支持全部 JDBC 规范的数据库操做,不一样之处在于 XAConnection 增长了对分布式事务的支持

事务资源类图

应用程序从支持分布式事务的数据源得到的数据库链接是 XAConnection 接口的实现,而由此数据库链接建立的会话(Statement)也为了支持分布式事务而增长了功能,以下代码所示:

## JTA 事务资源处理
public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection conn = null; 
        Statement stmt = null; 
        
        try{ 
            ##得到 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction"); 
            ##从数据库中取得数据库链接, getDataSourceB 返回支持分布式事务的数据源
            conn = getDataSourceB().getConnection(); 
            ## 会话 stmt 已经为支持分布式事务进行了功能加强
            stmt = conn.createStatement(); 
            ## 启动事务
            userTx.begin();
            stmt.execute("update t_account ... where account_id = 'A'"); 
            userTx.commit();
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }
复制代码

咱们来看一下由 XAConnection 数据库链接建立的会话(Statement)部分的代码实现(不一样的 JTA 提供商会有不一样的实现方式,此处代码示例只是向您演示事务资源是如何被自动加入到事务中)。 咱们以会话对象的 execute 方法为例,经过在方法开始部分增长对 associateWithTransactionIfNecessary 方法的调用,便可以保证在 JTA 事务期间,对任何数据库链接的操做都会被透明的加入到事务中

##将事务资源自动关联到事务对象
public class XAStatement implements Statement{
    public void execute(String sql) { 
        ## 对于每次数据库操做都检查此会话所在的数据库链接是否已经被加入到事务中
        associateWithTransactionIfNecessary(); 
        try{ 
          ## 处理数据库操做的代码
          .... 
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    } 
    public void associateWithTransactionIfNecessary(){ 
    ## 得到 TransactionManager 
        TransactionManager tm = getTransactionManager(); 
        Transaction tx = tm.getTransaction();
        ## 检查当前线程是否有分布式事务
        if(tx != null){ 
        ## 在分布式事务内,经过 tx 对象判断当前数据链接是否已经被包含在事务中,若是不是那么将此链接加入到事务中
        Connection conn = this.getConnection(); 
        ##tx.hasCurrentResource, xaConn.getDataSource() 不是标准的 JTA 
        ## 接口方法,是为了实现分布式事务而增长的自定义方法
        if(!tx.hasCurrentResource(conn)){ 
            XAConnection xaConn = (XAConnection)conn; 
            XADataSource xaSource = xaConn.getDataSource(); 
            ## 调用 Transaction 的接口方法,将数据库事务资源加入到当前事务中
            tx.enListResource(xaSource.getXAResource(), 1);
            } 
        } 
   }
}
复制代码

XAResource 与 Xid

  • XAResource 是 Distributed Transaction Processing: The XA Specification 标准的 Java 实现,它是对底层事务资源的抽象,定义了分布式事务处理过程当中事务管理器和资源管理器之间的协议,各事务资源提供商(如 JDBC 驱动,JMS)将提供此接口的实现。使用此接口,开发人员能够经过本身的编程实现分布式事务处理,但这些一般都是由应用服务器实现的(服务器自带实现更加高效,稳定)。
  • Xid:在使用分布式事务以前,为了区分事务使之不发生混淆,用来标识事务,能够把 Xid 想象成事务的一个标志符,每次在新事务建立是都会为事务分配一个 Xid,Xid 包含三个元素:formatIDgtrid(全局事务标识符)和 bqual(分支修饰词标识符)。 formatID 一般是零,这意味着你将使用 OSI CCR(Open Systems Interconnection Commitment, Concurrency 和 Recovery 标准)来命名;若是你要使用另一种格式,那么 formatID 应该大于零,-1 值意味着 Xid 为无效。 gtrid 和 bqual 分别包含 64 个字节二进制码来分别标识全局事务和分支事务, 惟一的要求是 gtrid 和 bqual 必须是全局惟一的。

XAResource 接口中主要定义了以下方法:

  • commit()- 提交事务
  • isSameRM(XAResource xares)- 检查当前的 XAResource 与参数是否同一事务资源
  • prepare()- 通知资源管理器准备事务的提交工做
  • rollback()- 通知资源管理器回滚事务

在事务被提交时,Transaction 对象会收集全部被当前事务包含的 XAResource 资源,而后调用资源的提交方法,以下代码所示:

## 提交事务
public class TransactionImpl implements Transaction{
    public void commit() throws RollbackException, HeuristicMixedException, 
         HeuristicRollbackException, SecurityException, 
         IllegalStateException, SystemException { 
         ## 获得当前事务中的全部事务资源
         List<XAResource> list = getAllEnlistedResouces(); 
         ## 通知全部的事务资源管理器,准备提交事务
         ## 对于生产级别的实现,此处须要进行额外处理以处理某些资源准备过程当中出现的异常
         for(XAResource xa : list){ 
             xa.prepare(); 
         } 
         ## 全部事务性资源,提交事务
         for(XAResource xa : list){ 
             xa.commit(); 
         } 
   }
}
复制代码

经过如上介绍相信读者对 JTA 的原理已经有所了解,本文中的示例代码都是理想状况下的假设实现。一款完善成熟的 JTA 事务实现须要考虑与处理的细节很是多,如性能(提交事务的时候使用多线程方式并发提交事务)、容错(网络,系统异常)等, 其成熟也须要通过较长时间的积累。感兴趣的读者能够阅读一些开源 JTA 实现以进一步深刻学习。

本文参考:JTA 深度历险 - 原理与实现

相关文章
相关标签/搜索