摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/transaction-bed/ 「芋道源码」欢迎转载,保留摘要,谢谢!java
本文主要基于 Sharding-JDBC 1.5.0 正式版git
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
数据库表分库后,业务场景下的单库本地事务可能变成跨库分布式事务。虽然咱们能够经过合适的分库规则让操做的数据在同库下,继续保证单库本地事务,这也是很是推崇的,但不是全部场景下都能适用。若是这些场景对事务的一致性有要求,咱们就不得不解决分布式事务的“麻烦”。sql
分布式事务是个很大的话题,咱们来看看 Sharding-JDBC 对她的权衡:数据库
Sharding-JDBC因为性能方面的考量,决定不支持强一致性分布式事务。咱们已明确规划线路图,将来会支持最终一致性的柔性事务。微信
Sharding-JDBC 提供了两种 柔性事务:架构
本文分享 最大努力送达型 的实现。建议前置阅读:《Sharding-JDBC 源码分析 —— SQL 执行》。运维
Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会所以,可以覆盖更多的业务场景。传送门
登记吧,骚年!传送门dom
概念异步
在分布式数据库的场景下,相信对于该数据库的操做最终必定能够成功,因此经过最大努力反复尝试送达操做。
从概念看,可能不是很直白的理解是什么意思,本文会最大努力让你干净理解。
架构图
执行过程有 四种 状况:
总体成漏斗倒三角,上一个阶段失败,交给下一个阶段重试:
整个过程经过以下 组件 完成:
下面,咱们逐节分享每一个组件。
柔性事务管理器,SoftTransactionManager 实现,负责对柔性事务配置( SoftTransactionConfiguration ) 、柔性事务( AbstractSoftTransaction )的管理。
调用 #init()
初始化柔性管理器:
// SoftTransactionManager.java /** * 柔性事务配置对象 */ @Getter private final SoftTransactionConfiguration transactionConfig; // SoftTransactionManager.java /** * 初始化事务管理器. */ public void init() throws SQLException { // 初始化 最大努力送达型事务监听器 EventBusInstance.getInstance().register(new BestEffortsDeliveryListener()); // 初始化 事务日志数据库存储表 if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) { Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource()); createTable(); } // 初始化 内嵌的最大努力送达型异步做业 if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) { new NestedBestEffortsDeliveryJobFactory(transactionConfig).init(); } }
transaction_log
)**不存在则进行建立。在『事务日志存储器』小节会详细分享SoftTransactionConfiguration
SoftTransactionConfiguration,柔性事务配置对象。
public class SoftTransactionConfiguration { /** * 事务管理器管理的数据源. */ @Getter(AccessLevel.NONE) private final DataSource targetDataSource; /** * 同步的事务送达的最大尝试次数. */ private int syncMaxDeliveryTryTimes = 3; /** * 事务日志存储类型. */ private TransactionLogDataSourceType storageType = RDB; /** * 存储事务日志的数据源. */ private DataSource transactionLogDataSource; /** * 内嵌的最大努力送达型异步做业配置对象. */ private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent(); }
在 Sharding-JDBC 里,目前柔性事务分红两种:
继承 AbstractSoftTransaction
public abstract class AbstractSoftTransaction { /** * 分片链接原自动提交状态 */ private boolean previousAutoCommit; /** * 分片链接 */ @Getter private ShardingConnection connection; /** * 事务类型 */ @Getter private SoftTransactionType transactionType; /** * 事务编号 */ @Getter private String transactionId; }
AbstractSoftTransaction 实现了开启柔性事务、关闭柔性事务两个方法提供给子类调用:
#beginInternal()
/** * 开启柔性 * * @param conn 分片链接 * @param type 事务类型 * @throws SQLException */ protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException { // TODO 判断若是在传统事务中,则抛异常 Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction."); // 设置执行错误,不抛出异常 ExecutorExceptionHandler.setExceptionThrown(false); connection = (ShardingConnection) conn; transactionType = type; // 设置自动提交状态 previousAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); // 生成事务编号 // TODO 替换UUID为更有效率的id生成器 transactionId = UUID.randomUUID().toString(); }
调用 ExecutorExceptionHandler.setExceptionThrown(false)
设置执行 SQL 错误时,也不抛出异常。
调用 connection.setAutoCommit(true);
,设置执行自动提交。使用最大努力型事务时,上层业务执行 SQL 会立刻提交,即便调用 Connection#rollback()
也是没法回滚的,这点必定要注意。
#end()
/** * 结束柔性事务. */ public final void end() throws SQLException { if (connection != null) { ExecutorExceptionHandler.setExceptionThrown(true); connection.setAutoCommit(previousAutoCommit); SoftTransactionManager.closeCurrentTransactionManager(); } } // SoftTransactionManager.java /** * 关闭当前的柔性事务管理器. */ static void closeCurrentTransactionManager() { ExecutorDataMap.getDataMap().put(TRANSACTION, null); ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null); }
#end()
清理线程变量。不然,下次请求使用到该线程,会继续在这个柔性事务内。BEDSoftTransaction
BEDSoftTransaction,最大努力送达型柔性事务。
public class BEDSoftTransaction extends AbstractSoftTransaction { /** * 开启柔性事务. * * @param connection 数据库链接对象 */ public void begin(final Connection connection) throws SQLException { beginInternal(connection, SoftTransactionType.BestEffortsDelivery); } }
TCCSoftTransaction
TCCSoftTransaction,TCC 型柔性事务,暂未实现。实现后,会更新到 《Sharding-JDBC 源码分析 —— 分布式事务(二)之事务补偿型》。
经过调用 SoftTransactionManager#getTransaction()
建立柔性事务对象:
/** * {@link ExecutorDataMap#dataMap} 柔性事务对象 key */ private static final String TRANSACTION = "transaction"; /** * {@link ExecutorDataMap#dataMap} 柔性事务配置 key */ private static final String TRANSACTION_CONFIG = "transactionConfig"; // SoftTransactionManager.java /** * 建立柔性事务. * * @param type 柔性事务类型 * @return 柔性事务 */ public AbstractSoftTransaction getTransaction(final SoftTransactionType type) { AbstractSoftTransaction result; switch (type) { case BestEffortsDelivery: result = new BEDSoftTransaction(); break; case TryConfirmCancel: result = new TCCSoftTransaction(); break; default: throw new UnsupportedOperationException(type.toString()); } // TODO 目前使用不支持嵌套事务,之后这里须要可配置 if (getCurrentTransaction().isPresent()) { throw new UnsupportedOperationException("Cannot support nested transaction."); } ExecutorDataMap.getDataMap().put(TRANSACTION, result); ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig); return result; }
后续能够从 ExecutorDataMap 中获取当前线程的柔性事务和柔性事务配置:
// SoftTransactionManager.java /** * 获取当前线程的柔性事务配置. * * @return 当前线程的柔性事务配置 */ public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() { Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG); return (null == transactionConfig) ? Optional.<SoftTransactionConfiguration>absent() : Optional.of((SoftTransactionConfiguration) transactionConfig); } /** * 获取当前的柔性事务. * * @return 当前的柔性事务 */ public static Optional<AbstractSoftTransaction> getCurrentTransaction() { Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION); return (null == transaction) ? Optional.<AbstractSoftTransaction>absent() : Optional.of((AbstractSoftTransaction) transaction); }
柔性事务执行过程当中,会经过事务日志( TransactionLog ) 记录每条 SQL 执行状态:
经过实现事务日志存储器接口( TransactionLogStorage ),提供存储功能。目前有两种实现:
本节只分析 RdbTransactionLogStorage。对 MemoryTransactionLogStorage 感兴趣的同窗能够点击连接传送到达。
TransactionLogStorage 有五个接口方法,下文每一个小标题都是一个方法。
// TransactionLogStorage.java /** * 存储事务日志. * * @param transactionLog 事务日志 */ void add(TransactionLog transactionLog); // RdbTransactionLogStorage.java @Override public void add(final TransactionLog transactionLog) { String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);"; try ( // ... 省略你熟悉的代码 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
#add()
和下文的 #remove()
异常时,都打印下异常日志都文件系统TransactionLog (transaction_log) 数据库表结构以下:
字段 | 名字 | 数据库类型 | 备注 |
---|---|---|---|
id | 事件编号 | VARCHAR(40) | EventBus 事件编号,非事务编号 |
transaction_type | 柔性事务类型 | VARCHAR(30) | |
data_source | 真实数据源名 | VARCHAR(255) | |
sql | 执行 SQL | TEXT | 已经改写过的 SQL |
parameters | 占位符参数 | TEXT | JSON 字符串存储 |
creation_time | 记录时间 | LONG | |
async_delivery_try_times | 已异步重试次数 | INT |
// TransactionLogStorage.java /** * 根据主键删除事务日志. * * @param id 事务日志主键 */ void remove(String id); // RdbTransactionLogStorage.java @Override public void remove(final String id) { String sql = "DELETE FROM `transaction_log` WHERE `id`=?;"; try ( // ... 省略你熟悉的代码 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
// TransactionLogStorage.java /** * 读取须要处理的事务日志. * * <p>须要处理的事务日志为: </p> * <p>1. 异步处理次数小于最大处理次数.</p> * <p>2. 异步处理的事务日志早于异步处理的间隔时间.</p> * * @param size 获取日志的数量 * @param maxDeliveryTryTimes 事务送达的最大尝试次数 * @param maxDeliveryTryDelayMillis 执行送达事务的延迟毫秒数. */ List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis); // RdbTransactionLogStorage.java @Override public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) { List<TransactionLog> result = new ArrayList<>(size); String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` " + "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;"; try (Connection conn = dataSource.getConnection()) { // ... 省略你熟悉的代码 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } return result; }
// TransactionLogStorage.java /** * 增长事务日志异步重试次数. * * @param id 事务主键 */ void increaseAsyncDeliveryTryTimes(String id); // RdbTransactionLogStorage.java @Override public void increaseAsyncDeliveryTryTimes(final String id) { String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;"; try ( // ... 省略你熟悉的代码 } catch (final SQLException ex) { throw new TransactionLogStorageException(ex); } }
// TransactionLogStorage.java /** * 处理事务数据. * * @param connection 业务数据库链接 * @param transactionLog 事务日志 * @param maxDeliveryTryTimes 事务送达的最大尝试次数 */ boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes); // RdbTransactionLogStorage.java @Override public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) { // 重试执行失败 SQL try ( Connection conn = connection; PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) { for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); } catch (final SQLException ex) { // 重试失败,更新事务日志,增长已异步重试次数 increaseAsyncDeliveryTryTimes(transactionLog.getId()); throw new TransactionCompensationException(ex); } // 移除重试执行成功 SQL 对应的事务日志 remove(transactionLog.getId()); return true; }
#processData()
是带有一些逻辑的。根据事务日志( TransactionLog )重试执行失败的 SQL,若成功,移除事务日志;若失败,更新事务日志,增长已异步重试次数最大努力送达型事务监听器,BestEffortsDeliveryListener,负责记录事务日志、同步重试执行失败 SQL。
// BestEffortsDeliveryListener.java @Subscribe @AllowConcurrentEvents public void listen(final DMLExecutionEvent event) { if (!isProcessContinuously()) { return; } SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get(); TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource()); BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get(); switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: // 执行前,插入事务日志 //TODO 对于批量执行的SQL须要解析成两层列表 transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: // 执行成功,移除事务日志 transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: // 执行失败,同步重试 boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【屡次】重试 if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { // 得到数据库链接 conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); if (!isValidConnection(conn)) { // 由于可能执行失败是数据库链接异常,因此判断一次,若是无效,从新获取数据库链接 bedSoftTransaction.getConnection().release(conn); conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); // 同步重试 //TODO 对于批量事件须要解析成两层列表 for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; // 同步重试成功,移除事务日志 transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); } }
BestEffortsDeliveryListener 经过 EventBus 实现监听 SQL 的执行。Sharding-JDBC 如何实现 EventBus 的,请看《Sharding-JDBC 源码分析 —— SQL 执行》
调用 #isProcessContinuously()
方法判断是否处于最大努力送达型事务中,当且仅当处于该状态才进行监听事件处理
SQL 执行前,插入事务日志
SQL 执行成功,移除事务日志
SQL 执行失败,根据柔性事务配置( SoftTransactionConfiguration )同步的事务送达的最大尝试次数( syncMaxDeliveryTryTimes
)进行屡次重试直到成功。整体逻辑和 RdbTransactionLogStorage#processData()
方法逻辑相似,区别在于获取分片数据库链接的特殊处理:此处调用失败,数据库链接多是异常无效的,所以调用了 #isValidConnection()
判断链接的有效性。若无效,则从新获取分片数据库链接。另外,如果从新获取分片数据库链接,须要进行关闭释放 (Connection#close()
):
// BestEffortsDeliveryListener.java /** * 经过 SELECT 1 校验数据库链接是否有效 * * @param conn 数据库链接 * @return 是否有效 */ private boolean isValidConnection(final Connection conn) { try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) { try (ResultSet rs = preparedStatement.executeQuery()) { return rs.next() && 1 == rs.getInt("1"); } } catch (final SQLException ex) { return false; } } /** * 关闭释放预编译SQL对象和数据库链接 * * @param isNewConnection 是否新建立的数据库链接,是的状况下才释放 * @param conn 数据库链接 * @param preparedStatement 预编译SQL */ private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) { if (null != preparedStatement) { try { preparedStatement.close(); } catch (final SQLException ex) { log.error("PreparedStatement closed error:", ex); } } if (isNewConnection && null != conn) { try { conn.close(); } catch (final SQLException ex) { log.error("Connection closed error:", ex); } } }
当最大努力送达型事务监听器( BestEffortsDeliveryListener )屡次同步重试失败后,交给最大努力送达型异步做业进行屡次异步重试,而且屡次执行有固定间隔。
Sharding-JDBC 提供了两个最大努力送达型异步做业实现:
二者实现代码逻辑基本一致。前者相比后者,用于开发测试,去除对 Zookeeper 依赖,没法实现高可用,所以生产环境下不适合使用。
BestEffortsDeliveryJob 所在 Maven 项目为 sharding-jdbc-transaction-async-job
,基于当当开源的 Elastic-Job 实现。以下是官方对该 Maven 项目的简要说明:
因为柔性事务采用异步尝试,须要部署独立的做业和Zookeeper。sharding-jdbc-transaction采用elastic-job实现的sharding-jdbc-transaction-async-job,经过简单配置便可启动高可用做业异步送达柔性事务,启动脚本为start.sh。
BestEffortsDeliveryJob
public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> { /** * 最大努力送达型异步做业配置对象 */ @Setter private BestEffortsDeliveryConfiguration bedConfig; /** * 事务日志存储器对象 */ @Setter private TransactionLogStorage transactionLogStorage; @Override public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) { return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(), bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis()); } @Override public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) { try ( Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) { transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes()); } catch (final SQLException | TransactionCompensationException ex) { log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage())); return false; } return true; } @Override public boolean isStreamingProcess() { return false; } }
#fetchData()
方法获取须要处理的事务日志 (TransactionLog),内部调用了 TransactionLogStorage#findEligibleTransactionLogs()
方法#processData()
方法处理事务日志,重试执行失败的 SQL,内部调用了 TransactionLogStorage#processData()
#fetchData()
和 #processData()
调用是 Elastic-Job 控制的。每一轮定时调度,每条事务日志只执行一次。当超过最大异步调用次数后,该条事务日志再也不处理,因此生产使用时,最好增长下相应监控超过最大异步重试次数的事务日志。AsyncSoftTransactionJobConfiguration,异步柔性事务做业配置对象。
public class AsyncSoftTransactionJobConfiguration { /** * 做业名称. */ private String name = "bestEffortsDeliveryJob"; /** * 触发做业的cron表达式. */ private String cron = "0/5 * * * * ?"; /** * 每次做业获取的事务日志最大数量. */ private int transactionLogFetchDataCount = 100; /** * 事务送达的最大尝试次数. */ private int maxDeliveryTryTimes = 3; /** * 执行事务的延迟毫秒数. * * <p>早于此间隔时间的入库事务才会被做业执行.</p> */ private long maxDeliveryTryDelayMillis = 60 * 1000L; }
Sharding-JDBC 提供的最大努力送达型异步做业实现( BestEffortsDeliveryJob ),经过与 Elastic-Job 集成,能够很便捷而且有质量保证的高可用、高性能使用。一部分团队,可能已经引入或自研了相似 Elastic-Job 的分布式做业中间件解决方案,每多一个中间件,就是多一个学习与运维成本。那么是否可使用本身的分布式做业解决方案?答案是,能够的。参考 BestEffortsDeliveryJob 的实现,经过调用 TransactionLogStorage 来实现:
// 伪代码(不考虑性能、异常) List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....); for (TransactionLog transactionLog : transactionLogs) { transactionLogStorage.processData(conn, log, maxDeliveryTryTimes); }
固然,我的仍是很推荐 Elastic-Job。
😈 笔者要开始写《Elastic-Job 源码分析》。
另外,若是有支持事务消息的分布式队列系统,能够经过 TransactionLogStorage 实现存储事务消息存储成消息。为何要支持事务消息?若是 SQL 执行是成功的,须要回滚(删除)事务消息。
哈哈哈
算是坚持把这个系列写完了,给本身 32 個赞。
知足!
《Elastic-Job 源码分析》 走起!不 High 不结束!