在以前的文章中已经介绍过Seata的整体介绍,如何使用以及Seata-Server的原理分析,有兴趣的能够阅读下面的文章:spring
这篇文章会介绍Seata中另外两个重要的角色TM
(事务管理器)和RM
(资源管理器),首先仍是来看看下面这张图:sql
TC
的原理已经作了详细介绍,对于TM和RM咱们看见在图中都是属于
client
的角色,他们分别的功能以下:
TM
(事务管理器):用来控制整个分布式事务的管理,发起全局事务的Begin/Commit/Rollback
。RM(资源管理器)
:用来注册本身的分支事务,接受TC
的Commit
或者Rollback
请求.首先咱们来介绍一些Seata-client
中Spring
模块,Seata
经过这个模块对本身的TM
和RM
进行初始化以及扫描AT模式和TCC模式的注解并初始化这些模式须要的资源。 在Seata
的项目中有一个spring
模块,里面包含了咱们和spring
相关的逻辑,GlobalTransactionScanner
是其中的核心类:数据库
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean,ApplicationContextAware,
DisposableBean
复制代码
上面代码是类的定义,首先它继承了AbstractAutoProxyCreator
实现了wrapIfNecessary
方法实现咱们的方法的切面代理,实现了InitializingBean
接口用于初始化咱们的客户端,实现了ApplicationContextAware
用于保存咱们的spring
容器,实现了DisposableBean
用于优雅关闭。bash
首先来看继承AbstractAutoProxyCreator实现的wrapIfNecessarymybatis
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
复制代码
beanName
是否已经处理过 若是处理过本次就不处理。Inteceptor
,这里有三种状况第一个TCC
,第二个是全局事务管理TM的拦截器,第三个是没有注解,若是没有那么直接返回便可。interceptor
添加进入当前Bean
。而后再看从InitializingBean
中实现的afterPropertiesSet
,也就是对Seata
的初始化:app
public void afterPropertiesSet() {
initClient();
}
private void initClient() {
//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);
registerSpringShutdownHook();
}
private void registerSpringShutdownHook() {
if (applicationContext instanceof ConfigurableApplicationContext) {
((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
ShutdownHook.removeRuntimeShutdownHook();
}
ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
}
复制代码
上面的代码逻辑比较清楚:框架
TM
客户端,这里会向Server
注册该TM
。RM
客户端,这里会向Server注册该RM
。ShutdownHook
,后续将TM
和RM
优雅关闭。注意这里初始化的时候会初始化两个客户端,分别是TM
客户端和RM
客户端,不少人认为TM
和RM
是用的同一个客户端,这里须要注意一下。异步
再上面的第一部分逻辑中咱们看到咱们有两个业务核心Interceptor
,一个是GlobalTransactionalInterceptor
用来处理全局事务的管理(开启,提交,回滚),另一个是TccActionInterceptor
用来处理TCC模式。熟悉Seata的朋友会问AT模式呢,为何只有TCC模式,这里AT模式表明着就是自动处理事务,咱们不须要有切面async
首先来看看GlobalTransactionalInterceptor#invoke:分布式
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
复制代码
Method
Method
中的注解@GlobalTransactional
注解执行handleGlobalTransaction切面逻辑,这个也是咱们全局事务的逻辑。@GlobalLock
注解,则执行handleGlobalLock切面逻辑,这个注解是用于一些非AT模式的数据库加锁,加上这个注解以后再执行Sql语句以前会查询对应的数据是否加锁,可是他不会加入全局事务。handleGlobalTransaction
逻辑以下:
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
});
}
TransactionalTemplate#execute
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
复制代码
在handleGlobalTransaction
中将具体的实现交给了TransactionalTemplate#execute
去作了,其中具体的步骤以下:
TccActionInterceptor
咱们先看看TccActionInterceptor是如何使用:
@TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
复制代码
通常来讲会定义三个方法一个是阶段的try方法,另一个是二阶段的commit和rollback,每一个方法的第一个参数是咱们事务上下文,这里咱们不须要关心他在咱们切面中会自行填充处理。
接下来咱们再看看TCC相关的拦截器是如何处理的:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Method method = getActionInterfaceMethod(invocation);
TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
//try method
if(businessAction != null) {
if(StringUtils.isBlank(RootContext.getXID())){
//not in distribute transaction
return invocation.proceed();
}
Object[] methodArgs = invocation.getArguments();
//Handler the TCC Aspect
Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, businessAction, new Callback<Object>(){
@Override
public Object execute() throws Throwable {
return invocation.proceed();
}
});
//return the final result
return ret.get(Constants.TCC_METHOD_RESULT);
}
return invocation.proceed();
}
复制代码
Method
。GlobalTransactionalInterceptor
。若是再也不直接执行便可。TCC
切面,核心逻辑在actionInterceptorHandler#proceed
中。再来看看actionInterceptorHandler#proceed
这个方法:
public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(16);
//TCC name
String actionName = businessAction.name();
String xid = RootContext.getXID();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName)
//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}
复制代码
TCC
名字,本次事务XID
等。Branch
事务,一个是在本地的context
上下文中将它的commit
和rollback
信息保存起来,另外一个是向咱们的Seata-Server
注册分支事务,用于后续的管理。BusinessActionContext
。Spring的几个总要的内容已经剖析完毕,核心类主要是三个,一个Scanner
,两个Interceptor
。总体来讲比较简单,Spring作的基本上也是咱们客户端一些初始化的事,接下来咱们深刻了解一下TM这个角色。
在上面章节中咱们讲了GlobalTransactionalInterceptor
这个切面拦截器,咱们知道了这个拦截器中作了咱们TM应该作的事,事务的开启,事务的提交,事务的回滚。这里只是咱们总体逻辑的发起点,其中具体的客户端逻辑在咱们的DefaultTransactionManager中,这个类中的代码以下所示:
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
复制代码
在DefaultTransactionManager
中总体逻辑比较简单有四个方法:
begin
:向Server
发起GlobalBeginRequest
请求,用于开启全局事务。commit
:向Server
发起GlobalCommitRequest
请求,用于提交全局事务。rollback
:向Server
发起GlobalRollbackRequest
请求,用于回滚全局事务。getStatus
:向Server
发起GlobalStatusRequest
请求,用于查询全局事务状态信息。在Seata
中目前管理RM
有两种模式:一种是AT
模式,须要事务性数据库支持,会自动记录修改前快照和修改后的快照,用于提交和回滚;还有一种是TCC
模式,也能够看做是MT
模式,用于AT模式不支持的状况,手动进行提交和回滚。接下来将会深刻剖析一下这两种模式的实现原理。
AT
模式下须要使用Seata
提供的数据源代理,其总体实现逻辑以下图所示:
在咱们的程序中执行一个sql
语句,不管你是使用mybatis
,仍是直接使用jdbcTemplate
,都会遵循下面的步骤:
Statement
。sql
语句因此咱们能够将DataSource
,Connection
,Statement
代理起来而后执行咱们的一些特殊的逻辑,完成咱们的AT模式。
在DataSourceProxy中没有太多的业务逻辑,只是简单的将获取Connection
用咱们的ConnectionProxy
代理类进行了封装,代码以下:
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
复制代码
首先经过咱们代理以前的DataSource
获取链接,而后用ConnectionProxy
将其代理起来。
ConnectionProxy
主要作三件事,第一个是生成代理的Statement
,第二个是保存咱们的链接上下文:加锁的Key,undoLog等,第三个是代理执行咱们的本地事务的commit
和rollback
。
首先来看看代理生成的Statement
:
@Override
public Statement createStatement() throws SQLException {
Statement targetStatement = getTargetConnection().createStatement();
return new StatementProxy(this, targetStatement);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
复制代码
这里也是经过咱们原来的链接直接生成Statement
,而后将其进行代理。
接下来看看对咱们上下文的管理,你们都知道咱们的一个事务其实对应的是一个数据库链接,在这个事务中的全部sql
的undolog
和lockKey
都会在链接的上下文中记录。以下面代码所示:
/**
* append sqlUndoLog
*
* @param sqlUndoLog the sql undo log
*/
public void appendUndoLog(SQLUndoLog sqlUndoLog) {
context.appendUndoItem(sqlUndoLog);
}
/**
* append lockKey
*
* @param lockKey the lock key
*/
public void appendLockKey(String lockKey) {
context.appendLockKey(lockKey);
}
复制代码
这里的代码很简单,lockKey
和undolog
都是用list
保存,直接add
便可。
当咱们的本地事务完成的时候,须要调用Connection
的commit
或rollback
来进行事务的提交或回滚。这里咱们也须要代理这两个方法来完成咱们对分支事务的处理,先来看看commit
方法。
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}
try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw new SQLException(ex);
}
}
report(true);
context.reset();
}
复制代码
context
是否再全局事务中,若是在则进行提交,到Step2。context
中有undolog
,那么将Unlog
刷至数据库。上面介绍了提交事务的流程,当context
在全局锁的流程中,会进行全局锁的查询,这里比较简单就不作赘述,若是context
都没有在上述的状况中那么会直接进行事务提交。
对于咱们rollback
来讲代码比较简单:
public void rollback() throws SQLException {
targetConnection.rollback();
if (context.inGlobalTransaction()) {
if (context.isBranchRegistered()) {
report(false);
}
}
context.reset();
}
复制代码
细心的小伙伴可能发现若是咱们的本地事务提交或者回滚以后失败,那咱们的分布式事务运行结果还能正确吗?这里彻底不用担忧,再咱们的服务端有完善的超时检测,重试等机制,来帮助咱们应对这些特殊状况。
咱们通常用statement
会调用executeXXX
方法来执行咱们的sql
语句,因此在咱们的Proxy
中能够利用这个方法,再执行sql
的时候作一些咱们须要作的逻辑,下面看看execute
方法的代码:
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
@Override
public Boolean execute(T statement, Object... args) throws SQLException {
return statement.execute((String) args[0]);
}
}, sql);
}
复制代码
这里直接将逻辑交给咱们的ExecuteTemplate
去执行,有以下代码:
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
if (sqlRecognizer == null) {
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor = null;
if (sqlRecognizer == null) {
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
} else {
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
break;
}
}
T rs = null;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException)ex;
}
return rs;
}
}
复制代码
这里是咱们代理执行sql
的核心逻辑,步骤以下:
Statement
。sql
识别器,那么咱们须要生成sql
识别器,这里咱们会借用Druid中对sql
的解析,咱们获取sql
的识别器,咱们经过这个识别器能够获取到不一样类型的sql
语句的一些条件,好比说SQLUpdateRecognizer
是用于update
的sql
识别器,咱们能够直接获取到表名,条件语句,更新的字段,更新字段的值等。sql
识别器的类型,来生成咱们不一样类型的执行器。这里有五种Executor
:INSERT,UPDATE,DELETE
的执行器会进行undolog记录而且记录全局锁,SELECT_FOR_UPDATE
只会进行查询全局锁,有一个默认的表明咱们如今还不支持,什么都不会作直接执行咱们的sql
语句。
对于INSERT,UPDATE,DELETE的执行器会继承咱们的AbstractDMLBaseExecutor
:
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
protected abstract TableRecords beforeImage() throws SQLException;
protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
复制代码
在AbstractDMLBaseExecutor
中执行逻辑在executeAutoCommitFalse
这个方法,步骤以下:
sql
以前所受影响行的快照,这里beforeImage
会被不一样类型的sql语句从新实现。sql
语句,并获取结果。sql
以后的快照,这里的afterIamge
也会被不一样类型的sql语句从新实现。undolog
准备好,这里会保存到咱们的ConnectionContext
中。protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
return;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
复制代码
准备UndoLog
的时候会获取咱们的ConnectionProxy
,将咱们的Undolog
和LockKey
保存起来,给后面的本地事务commit
和rollback
使用,上面已经讲过。
上面的4.1.1-4.1.3都是说的是咱们分布式事务的第一阶段,也就是将咱们的分支事务注册到Server
,而第二阶段分支提交和分支回滚都在咱们的DataSourceManager
中,对于分支事务提交有以下代码:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}
复制代码
这里将咱们的分支事务提交的信息,放到一个队列中,异步去处理,也就是异步删除咱们的undolog
数据,由于提交以后undolog
数据没用了。
这里有人可能会问若是当你将这个信息异步提交到队列中的时候,机器宕机,那么就不会执行异步删除undolog
的逻辑,那么这条undolog
是否是就会成为永久的脏数据呢?这里Seata
为了防止这种事出现,会定时扫描某些较老的undolog数据而后进行删除,不会污染咱们的数据。
对于咱们的分支事务回滚有以下代码:
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
复制代码
这里会先获取到咱们的数据源,接下来调用咱们的重作日志管理器的undo
方法进行日志重作,undo
方法较长这里就不贴上来了,其核心逻辑是查找到咱们的undolog
而后将里面的快照在咱们数据库进行重作。
TCC
没有AT
模式资源管理这么复杂,部分核心逻辑在以前的Interceptor
中已经讲解过了,好比二阶段方法的保存等。这里主要看看TCC
的分支事务提交和分支事务回滚,在TCCResourceManager
中有:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext =
getBusinessActionContext(xid, branchId, resourceId, applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
LOGGER.info("TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:" +
resourceId);
if (ret != null && ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult) ret).isSuccess();
} else {
result = (boolean) ret;
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
复制代码
步骤以下:
TCC
资源,若是没有抛出异常。commit
方法。commit
方法。Server
,由Server
决定是否重试。这里的branchRollback
方法也比较简单,这里就不作过多分析了。
经过上面分析咱们知道,Seata
的初始化是依赖Spring
去进行,咱们的全局事务的开启/提交/回滚都是依赖咱们的TM事务管理器,而咱们的分支事务的管理是依靠咱们的RM
,其中提供了两个模式AT
和TCC
,AT
模式必须使用数据库,其核心实现是实现数据源的代理,将咱们本身的逻辑注入进去。而咱们的TCC能弥补咱们没有使用数据库的状况,将提交和回滚都交由咱们本身实现,其核心实现逻辑是依赖将一个资源的二阶段的方法和咱们的目标对象在咱们的资源上下文中保存下来,方便咱们后续使用。
最后若是你们对分布式事务感兴趣,欢迎你们使用并阅读Seata
的代码,并给咱们提出建议。
若是你们以为这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O: