EasyTransaction主要源码分析

EasyTransaction是一个全功能的分布式事务框架,如下特性摘抄自其首页:https://github.com/QNJR-GROUP/EasyTransactionhtml

  • 一个框架包含多种事务形态,一个框架搞定全部类型的事务
  • 多种事务形态可混合使用
  • 高性能,大多数业务系统瓶颈在业务数据库,若不启用框架的幂等功能,对业务数据库的额外消耗仅为写入25字节的一行
  • 可选的框架自带幂等实现及调用错乱次序处理,大幅减轻业务开发工做量,但启用的同时会在业务数据库增长一条幂等控制行
  • 业务代码可实现彻底无入侵
  • 支持嵌套事务
  • 无需额外部署协调者,不一样APP的服务协调自身发起的事务,也避免了单点故障
  • 分布式事务ID可关联业务ID,业务类型,APPID,便于监控各个业务的分布式事务执行状况

本文主要分享EasyTransaction core中各个package的做用其主要实现。git

请先阅读 Seata架构的比对思考 http://www.javashuo.com/article/p-xetvolvp-u.html ,再结合 代码以及demo调试过程看这篇,直接看的话这里的点太零碎了github

1、context包

主要类数据库

LogProcessContext

其用于存储ET事务的上下文信息。在开启ET事务(第一次ET远程调用,或者主动调用startSoftTrans方法)时,将建立本类的实例并将其与Spring的本地事务上下文绑定,经过:架构

TransactionSynchronizationManager.bindResource()

执行绑定。当须要取得ET上下文时,经过并发

TransactionSynchronizationManager.getResource()

取得。app

ET上下文中包含的主要内容有:框架

  • 最终事务状态
  • 所有的全局事务日志
  • 未Flush到外部的全局事务日志
  • 事务ID等内容

2、包core

本包主要类为异步

EasyTransFacade
TransactionHook
ConsistentGuardian
ExecuteCacheManager

类EasyTransFacade

其定义了业务调用方的接口,只包含两个:分布式

public void startEasyTrans(String busCode,long trxId);

public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor, R extends Serializable> Future<R> execute(P params);

第一个用于开启全局事务,主要的操做为:

  • 挂载TransactionHook到当前的Spring本地事务中,使得能够在关键节点(如本地事务提交前、本地事务回滚后等等)嵌入ET的代码
  • 将 LogProcessContext 绑定到当前的Spring本地事务,使得ET能够在当前Spring本地事务中随时取得ET全局事务的状态。
  • 在当前已开启的本地事务中,写入一条事务执行记录到业务库中,其对Crash恢复时识别全局事务的状态起关键做用

第二个表示执行某个远程事务方法。

  • 经过调用参数Object对应的Class获取对应的处理器(如TCC处理器,可靠消息处理器等)并执行调用,具体调用的形态后续专门的章节再继续

基于注解的接口调用也是经过这两个方法封装而成。

类TransactionHook

其为ET框架代码与Spring原生事务的主要交界点,ET经过TransactionSynchronization定义的方法,在Spring本地事务执行过程当中,扩展支持了全局事务。主要扩展了如下两个方法

beforeCommit(boolean readOnly)
afterCompletion(int status)

beforeCommit方法将会

  • 在Spring本地事务提交前将全部未落盘的全局事务日志落盘
  • 并执行全部未执行的远程调用(ET会尽可能延后全局事务以此堆积并批量执行)
  • 如有不成功的全局事务,则抛出异常,回滚事务(包括本地以及全局)

afterCompletion方法将会

  • 获取本地事务的最终结果(提交/回滚/未知)以及 ET父级事务的状态(提交/回滚/未知)来决定本级ET事务的最终状态(提交/回滚/未知)
  • 得到最终的本级ET事务状态后,异步执行最终一致处理(调用consistentGuardian.process)

类ConsistentGuardian

本类用于处理ET全局事务的最终一致,例如TCC的Conifrim/Cancel,可靠消息的发送消息。

最终一致处理一般会在同步操做(TCC的TRY等)对应的本地事务执行完成后抛到线程池异步执行,但执行失败的话,会有兜底的补偿(recovery包),后续再详细讲述

该类的主要工做机制是根据以前写入的全局事务日志,获取日志对应的处理器(如从TCC的事务日志获取对应的TCC日志处理器),以此

  • 判断当前ET事务的最终状态(若当前ET事务状态仍未肯定的话)
  • 传入最终ET事务状态到日志处理器,依次处理对应的事务日志,处理的典型过程例子:
    • 若存在TRY方法对应的日志
    • 而且找不到TRY对应的CONFRIM/CANCEL日志
    • 则根据ET最终事务状态,调用对应CONFIRM/CANCEL方法

类ExecuteCacheManager

本类主要服务于ET的如下指望

  • 批量写入ET事务日志(以减小IO)
  • 批量并发执行远程业务调用(以减小串行等待远程相应时间)

其主要实现的是,

  • 对每一个传入的Calleble对象都返回一个通过改写的Futrure对象
  • 当任意一个Futrue的get方法都没有被调用前,全部以前传入的Callable对象都不会执行。
  • 当任意一个Future的get被调用时,全部callable都会被批量执行,这里包含了批量写入日志以及批量并发执行远程调用

3、包datasource

主要包含如下两个接口,其主要做用于业务数据源。

DataSourceSelector
TransStatusLogger

类DataSourceSelector

该类主要用于获取当前事务/请求对应的数据源及其事务管理器,若应用有多个业务数据源,则须要自行实现对应的数据源选择器,主要包含如下方法

DataSource selectDataSource(String appId,String busCode,long trxId);
DataSource selectDataSource(String appId,String busCode,EasyTransRequest<?, ?> request);

第一个方法是开启ET事务时候选择对应的数据源

第二个方法是被调用方接受到请求时选择对应的数据源(用于幂等、防悬挂处理,若不须要可忽略)

该接口包含一个默认实现,当只有单数据源时,能够直接用该实现

SingleDataSourceSelector

类TransStatusLogger

该类主要用来读写用于判断ET事务状态的记录,该记录会在ET事务开启时,写入当前的数据库表中,随着业务对应事务(Spring本地事务)提交而提交,回滚而回滚。

更具体请直接看实现

4、包executor

该包存储的是事务发起方(远程服务调用方)相关处理类的位置,不一样的事务类型(TCC,可靠事务等)有不一样的Executor,以TCC为例讲解,其余的事务类型实现都相似。

TccMethodExecutor

该类实现了三个接口

  • EasyTransExecutor
  • LogProcessor
  • DemiLogEventHandler

EasyTransExecutor接口定义了方法

<P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R  extends Serializable> Future<R> execute(Integer sameBusinessCallSeq, P params);

该方法供类EasyTransFacade.execute使用,其对应的是执行TCC里的TRY方法,具体的,它

  • 将TRY方法调用对应的RPC请求包装成Runnable类
  • 构建本次调用对应的全局事务日志(主要包含本次调用的具体参数、对应远程方法等)
  • 而后传入上面章节提到的类ExecuteCacheManager方法中

LogProcessor接口定义了如何处理事务日志,其包含一个主要方法

boolean logProcess(LogProcessContext ctx, Content currentContent)

该方法将会判断,若是传入的日志类型是PreTccCallContent(TCC TRY请求对应的日志)的话,将会监听该日志最终的配对信息(类ConsistentGuardian会在处理当前ET事务的日志后,发送消息,告知全部须要配对的日志的配对结果),若是

  • 监听到成功配对(找到CONFIRM或者CANCEL对应的日志)的消息,则再也不作后续处理
  • 监听到配对失败(没有存在对应的CONFIRM/CANCEL日志)的消息,则根据当前的ET事务状态执行对应的CONFIRM或者CANCEL操做,并记录对应的日志

其余的事务形态的实现也相似,再也不赘述

5、包recovery

用于兜底恢复事务,实现最终一致。

代码不复杂,能够自行查看。

6、包Filter

该包主要用于实现ET对应的Filter,该Filter做用于被调用端。咱们能够经过实现ET的Filter扩展被调用端的功能,如处理幂等、处理嵌套事务、增长调用上下文的处理等等。

7、包idempotent

实现幂等、方悬挂等处理对应的包

幂等及防悬挂处理的主要原理:

  • 当远程调用过来时,写入调用日志到当前的开启的业务日志中,并记录 调用对应的ID,调用参数对应的MD5
  • 有结果返回时就将结果更新存储到日志中
  • 当有重复请求过来时,就检查ID对应的记录是否存在,若存在则检查参数的MD5是否一致,若一致则返回以前的存储结果
  • 防悬挂也相似,在上述的日志中,将会记录调用的方法是什么,如
    • 当找不到请求对应日志时,但当前为cancel操做的话,框架将直接返回成功
    • 上述cancel已经成功执行后,try方法再来到时,发现cancel已经执行,就直接将try报错返回

8、包idgen

用于生成ET的分布式事务ID,当自行制定ID时,本包对应的方法不会被调用。当不指定ID时,将会自动生成一个。

9、包log

定义ET事务日志对应的Class,以及其读写接口。
事务日志在以前TccExecutor等章节已提到,再也不赘述。

须要扩展事务日志存储实现的,直接实现如下接口便可

TransactionLogReader
TransactionLogWritter

(ET事务日志的读写不须要与业务事务在同一个事务中,也不能在同一个事务中)

10、包master

用于在同一个appId中选择一个做为master进行兜底最终一致补偿的包。

实际上选择不须要太精确,任意一个appId下的实例都可,也能够同时有多个master存在(但目前没有意义,也会浪费性能)

11、包monitor

用于提供ET实例状态的包,可供Dashboard,监控等扩展使用

12、包protocol

供客户直接定义分布式事务服务的包,其包含一些客户直接使用的 父类、配置接口、配置注解等

十3、包provider.factory

从Spring中获取并存储对应的bean实现,以便于快速方便地经过ET对应的定义,取得对应bean

十4、包queue

若要扩展新增对应的消息队列实现,则实现这个包对应的接口

十5、包rpc

同上

十6、包serialization

ET框架所使用的序列化形式,可自行扩展

十7、包stingcodec

用于压缩字符串,将字符串替换为数字id,以提升存储效率

相关文章
相关标签/搜索