sharding-jdbc在分库分表方面提供了很大的便利性,在使用DB的时候,一般都会涉及到事务这个概念,而在分库分表的环境上再加上事务,就会使事情变得复杂起来。本章试图剖析sharding-jdbc在事务方面的解决思路。spring
传统的事务模型以下:sql
Connection conn = getConnection(); try{ Statement stmt1 = conn.parpareStatement(sql1); stmt1.executeUpdate(); Statement stmt2 = conn.parpareStatement(sql2); stmt2.executeUpdate(); conn.commit(); }catch(Exception e){ conn.rollback(); }
对于同一个链接,能够执行多条sql语句,任何一条语句出现错误的时候,整个操做流程均可以回滚,从而达到事务的原子操做。数据库
再来看最基本的spring事务操做:缓存
class ServiceA(){ public void updateA(){...} } class ServiceB(){ public void updateB(){...} } @Transactional class ServiceC(){ public void updateC(){ serviceA.updateA(); serviceB.updateB(); } }
咱们知道,当updateC执行的时候,不论是updateA仍是updateB出现了异常,updateC均可以总体回滚,达到原子操做的效果,其主要缘由是updateA和updateB共享了同一个Connection,这是spring底层经过ThreadLocal缓存了Connection实现的。网络
以上介绍的这两种状况都只是针对单库单表的原子操做,事务的实现并不难理解,那么在跨库的状况下,sharding-jdbc又是如何解决事务问题的呢?异步
在官方文档中,针对弱事务有以下三点说明:分布式
为了理解以上几点,咱们来看看sharding-jdbc默认是如何处理事务的。ide
这是一个很是常见的处理模式,一个总链接处理了多条sql语句,最后一次性提交整个事务,每一条sql语句可能会分为多条子sql分库分表去执行,这意味着底层可能会关联多个真正的数据库链接,咱们先来看看若是一切正常,commit会如何去处理。post
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection { @Override public final void commit() throws SQLException { Collection<SQLException> exceptions = new LinkedList<>(); for (Connection each : cachedConnections.values()) { try { each.commit(); } catch (final SQLException ex) { exceptions.add(ex); } } throwSQLExceptionIfNecessary(exceptions); } }
引擎会遍历底层全部真正的数据库链接,一个个进行commit操做,若是任何一个出现了异常,直接捕获异常,可是也只是捕获而已,而后接着下一个链接的commit,这也就很好的说明了,若是在执行任何一条sql语句出现了异常,整个操做是能够原子性回滚的,由于此时全部链接都不会执行commit,但若是已经到了commit这一步的话,若是有链接commit失败了,是不会影响到其余链接的。google
sharding-jdbc的弱事务并非完美的,有时可能会致使数据的一致性问题,因此针对某些特定的场景,又提出了柔性事务的概念。先来看一张官方的说明图:
这里想表达两个意思:
1. 对于sql的执行,在执行前记录日志,若是执行成功,把日志删除,若是执行失败,重试必定次数(若是未达到最大尝试次数便执行成功了,同样删除日志)。
2. 异步任务不断扫描执行日志,若是重试次数未达到最大上限,尝试从新执行,若是执行成功,删除日志。
从上面两点分析能够看出,因为采用的是重试的模式,也就是说同一条语句,是有可能被屡次执行的,因此官方提到了柔性事务的适用场景:
并且它还有必定的限制: SQL须要知足幂等性,具体为:
在有了一个大概的了解以后,咱们来更加深刻的了解。
sharding-jdbc使用了google的EventBus事件模型,注册了一个Listener,监听器对三种事件进行了处理,以下代码所示:
switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); if (!isValidConnection(conn)) { bedSoftTransaction.getConnection().release(conn); conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); //TODO for batch event need split to 2-level records for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); }
以上代码能够抽取为以下图的描述:
监听器根据三种不一样的事件类型对事务日志进行不一样的操做。有监听 ,必然就有事件的投递,那么引擎是何时产生这些事件的呢?
咱们知道每一条sql语句拆分后有可能对应多条子sql语句,而每一条子sql语句是单独执行的,执行是封装在一个内部方法的:
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { synchronized (baseStatementUnit.getStatement().getConnection()) { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); List<AbstractExecutionEvent> events = new LinkedList<>(); if (parameterSets.isEmpty()) { events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList())); } for (List<Object> each : parameterSets) { events.add(getExecutionEvent(sqlType, baseStatementUnit, each)); } for (AbstractExecutionEvent event : events) { EventBusInstance.getInstance().post(event); } try { result = executeCallback.execute(baseStatementUnit); } catch (final SQLException ex) { for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE); each.setException(Optional.of(ex)); EventBusInstance.getInstance().post(each); ExecutorExceptionHandler.handleException(ex); } return null; } for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS); EventBusInstance.getInstance().post(each); } return result; } }
以上代码能够简化为以下流程:
其实执行流程比较简单,但还有两个重要的细节这里没有体现:
当使用柔性事务的时候,须要建立事务管理器,并获取事务对象,调用事务对象的begin开始一个事务,在这一步,会强制设置链接的autoCommit=true,这会致使全部的sql语句执时后当即提交,想一想若是能回滚,那柔性事务也就失去了意义。
当事务执行begin时,会标记当前不抛出异常,这样当执行sql语句有异常时,会生成相应的EXECUTE_FAILURE事件,从而进行事务日志处理,而不是往外抛出异常,当事务结束时,调用事务对象的end方法,恢复异常的捕获。
一个常见的代码编写模式以下(来自官方的demo)
private static void updateFailure(final DataSource dataSource) throws SQLException { String sql1 = "UPDATE t_order SET status='UPDATE_1' WHERE user_id=10 AND order_id=1000"; String sql2 = "UPDATE t_order SET not_existed_column=1 WHERE user_id=1 AND order_id=?"; String sql3 = "UPDATE t_order SET status='UPDATE_2' WHERE user_id=10 AND order_id=1000"; SoftTransactionManager transactionManager = new SoftTransactionManager(getSoftTransactionConfiguration(dataSource)); transactionManager.init(); BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery); Connection conn = null; try { conn = dataSource.getConnection(); transaction.begin(conn); PreparedStatement preparedStatement1 = conn.prepareStatement(sql1); PreparedStatement preparedStatement2 = conn.prepareStatement(sql2); preparedStatement2.setObject(1, 1000); PreparedStatement preparedStatement3 = conn.prepareStatement(sql3); preparedStatement1.executeUpdate(); preparedStatement2.executeUpdate(); preparedStatement3.executeUpdate(); } finally { transaction.end(); if (conn != null) { conn.close(); } } }
看到这个编写模式,你必定会想,若是我使用MyBatis和spring,这一切可否整合起来,这个话题有兴趣你们能够去尝试。
分布式事务处理起来有必定的难度,sharding-jdbc采用了简单的弱事务模式和特殊场景下的柔性事务模式,没有最好,只有更好,根据自身业务去选择事务模式才是最重要的。