摘要: 原创出处 www.iocoder.cn/Sharding-JD… 「芋道源码」欢迎转载,保留摘要,谢谢!html
本文主要基于 Sharding-JDBC 1.5.0 正式版 java
🙂🙂🙂关注微信公众号:【芋道源码】有福利: mysql
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
- 掘金Java QQ 群:217878901
越过千山万水(SQL 解析、SQL 路由、SQL 改写),咱们终于来到了 SQL 执行。开森不开森?!git
本文主要分享SQL 执行的过程,不包括结果聚合。《结果聚合》 东半球第二良心笔者会更新,关注微信公众号【芋道源码】完稿后第一时间通知您哟。github
绿框部分 SQL 执行主流程。sql
Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会所以,可以覆盖更多的业务场景。传送门
登记吧,骚年!传送门数据库
ExecutorEngine,SQL执行引擎。编程
分表分库,须要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:数组
前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。
后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。安全
👼 ExecutorEngine 固然采用的是后者,并行执行 SQL。
Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供建立 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有以下好处:
咱们强烈地建议你在代码中多使用ListenableFuture来代替JDK的 Future, 由于:
- 大多数Futures 方法中须要它。
- 转到ListenableFuture 编程比较容易。
- Guava提供的通用公共类封装了公共的操做方方法,不须要提供Future和ListenableFuture的扩展方法。
传统JDK中的Future经过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result。
ListenableFuture能够容许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后当即执行。这样简单的改进,使得能够明显的支持更多的操做,这样的功能在JDK concurrent中的Future是不支持的。
如上内容来自《Google Guava包的ListenableFuture解析
》,文章写的很棒。下文你会看到 Sharding-JDBC 是如何经过 ListenableFuture 简化并发编程的。
下面看看 ExecutorEngine 如何初始化 ListeningExecutorService
// ShardingDataSource.java
public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
// .... 省略部分代码
shardingProperties = new ShardingProperties(props);
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
executorEngine = new ExecutorEngine(executorSize);
// .... 省略部分代码
}
// ExecutorEngine
public ExecutorEngine(final int executorSize) {
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}复制代码
MoreExecutors#listeningDecorator()
建立 ListeningExecutorService,这样 #submit()
,#invokeAll()
能够返回 ListenableFuture。#setNameFormat()
并发编程时,必定要对线程名字作下定义,这样排查问题会方便不少。MoreExecutors#addDelayedShutdownHook()
,应用关闭时,等待全部任务所有完成再关闭。默认配置等待时间为 60 秒,建议将等待时间作成可配的。数据源关闭时,会调用 ExecutorEngine 也进行关闭。
// ShardingDataSource.java
@Override
public void close() {
executorEngine.close();
}
// ExecutorEngine
@Override
public void close() {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ignored) {
}
if (!executorService.isTerminated()) {
throw new ShardingJdbcException("ExecutorEngine can not been terminated");
}
}复制代码
#shutdownNow()
尝试使用 Thread.interrupt()
打断正在执行中的任务,未执行的任务再也不执行。建议打印下哪些任务未执行,由于 SQL 未执行,可能数据未能持久化。#awaitTermination()
由于 #shutdownNow()
打断不是当即结束,须要一个过程,所以这里等待了 5 秒。ExecutorEngine 对外暴露 #executeStatement()
,#executePreparedStatement()
,#executeBatch()
三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是 #execute()
私有方法。
// ExecutorEngine.java
/** * 执行Statement. * @param sqlType SQL类型 * @param statementUnits 语句对象执行单元集合 * @param executeCallback 执行回调函数 * @param <T> 返回值类型 * @return 执行结果 */
public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);
}
/** * 执行PreparedStatement. * @param sqlType SQL类型 * @param preparedStatementUnits 语句对象执行单元集合 * @param parameters 参数列表 * @param executeCallback 执行回调函数 * @param <T> 返回值类型 * @return 执行结果 */
public <T> List<T> executePreparedStatement( final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}
/** * 执行Batch. * @param sqlType SQL类型 * @param batchPreparedStatementUnits 语句对象执行单元集合 * @param parameterSets 参数列表集 * @param executeCallback 执行回调函数 * @return 执行结果 */
public List<int[]> executeBatch(
final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
}复制代码
#execute()
执行过程大致流程以下图:
/** * 执行 * * @param sqlType SQL 类型 * @param baseStatementUnits 语句对象执行单元集合 * @param parameterSets 参数列表集 * @param executeCallback 执行回调函数 * @param <T> 返回值类型 * @return 执行结果 */
private <T> List<T> execute( final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
// 第二个任务开始全部 SQL任务 提交线程池【异步】执行任务
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
// 第一个任务【同步】执行任务
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
// 等待第二个任务开始全部 SQL任务完成
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
return null;
}
// 返回结果
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}复制代码
#executeInternal()
执行任务。private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
// 【同步】执行任务
return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}复制代码
#executeInternal()
执行任务。private <T> ListenableFuture<List<T>> asyncExecute(
final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
for (final BaseStatementUnit each : baseStatementUnits) {
// 提交线程池【异步】执行任务
result.add(executorService.submit(new Callable<T>() {
@Override
public T call() throws Exception {
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
}
}));
}
// 返回 ListenableFuture
return Futures.allAsList(result);
}复制代码
Futures.allAsList(result);
和 restOutputs = restFutures.get();
。神器 Guava 简化并发编程 的好处就提现出来了。ListenableFuture#get()
当全部任务都成功时,返回全部任务执行结果;当任何一个任务失败时,立刻抛出异常,无需等待其余任务执行完成。_😮 Guava 真她喵神器,公众号:【芋道源码】会更新 Guava 源码分享的一个系列哟!老司机还不赶忙上车?_
// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
synchronized (baseStatementUnit.getStatement().getConnection()) {
T result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<AbstractExecutionEvent> events = new LinkedList<>();
// 生成 Event
if (parameterSets.isEmpty()) {
events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
} else {
for (List<Object> each : parameterSets) {
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
}
// EventBus 发布 EventExecutionType.BEFORE_EXECUTE
for (AbstractExecutionEvent event : events) {
EventBusInstance.getInstance().post(event);
}
try {
// 执行回调函数
result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
// EventBus 发布 EventExecutionType.EXECUTE_FAILURE
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
each.setException(Optional.of(ex));
EventBusInstance.getInstance().post(each);
ExecutorExceptionHandler.handleException(ex);
}
return null;
}
// EventBus 发布 EventExecutionType.EXECUTE_SUCCESS
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(each);
}
return result;
}
}复制代码
result = executeCallback.execute(baseStatementUnit);
执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 经过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。public interface ExecuteCallback<T> {
/** * 执行任务. * * @param baseStatementUnit 语句对象执行单元 * @return 处理结果 * @throws Exception 执行期异常 */
T execute(BaseStatementUnit baseStatementUnit) throws Exception;
}复制代码
synchronized (baseStatementUnit.getStatement().getConnection())
原觉得 Connection 非线程安全,所以须要用同步,后翻查资料《数据库链接池为何要创建多个链接》,Connection 是线程安全的。等跟张亮大神请教确认缘由后,咱会进行更新。
解答:MySQL、Oracle 的 Connection 实现是线程安全的。数据库链接池实现的 Connection 不必定是线程安全,例如 Druid 的线程池 Connection 非线程安全
FROM github.com/dangdangdot…
druid的数据源的stat这种filter在并发使用同一个connection连接时没有考虑线程安全的问题,故形成多个线程修改filter中的状态异常。
改造这个问题时,考虑到mysql驱动在执行statement时对同一个connection是线程安全的。也就是说同一个数据库连接的会话是串行执行的。故在sjdbc的executor对于多线程执行的状况也进行了针对数据库连接级别的同步。故该方案不会下降sjdbc的性能。
同时jdk1.7版本的同步采用了锁升级技术,在碰撞较低的状况下开销也是很小的。
ExecutionEvent 这里先不解释,在本文第四节【EventBus】分享。
Executor,执行器,目前一共有三个执行器。不一样的执行器对应不一样的执行单元 (BaseStatementUnit)。
执行器类 | 执行器名 | 执行单元 |
---|---|---|
StatementExecutor | 静态语句对象执行单元 | StatementUnit |
PreparedStatementExecutor | 预编译语句对象请求的执行器 | PreparedStatementUnit |
BatchPreparedStatementExecutor | 批量预编译语句对象请求的执行器 | BatchPreparedStatementUnit |
StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:
#executeQuery()
// StatementExecutor.java
/** * 执行SQL查询. * @return 结果集列表 */
public List<ResultSet> executeQuery() {
Context context = MetricsContext.start("ShardingStatement-executeQuery");
List<ResultSet> result;
try {
result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
@Override
public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
} finally {
MetricsContext.stop(context);
}
return result;
}复制代码
#executeUpdate()
由于有四个不一样状况的#executeUpdate()
,因此抽象了 Updater 接口,从而达到逻辑重用。// StatementExecutor.java
/** * 执行SQL更新. * @return 更新数量 */
public int executeUpdate() {
return executeUpdate(new Updater() {
@Override
public int executeUpdate(final Statement statement, final String sql) throws SQLException {
return statement.executeUpdate(sql);
}
});
}
private int executeUpdate(final Updater updater) {
Context context = MetricsContext.start("ShardingStatement-executeUpdate");
try {
List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Integer>() {
@Override
public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
return accumulate(results);
} finally {
MetricsContext.stop(context);
}
}
/** * 计算总的更新数量 * @param results 更新数量数组 * @return 更新数量 */
private int accumulate(final List<Integer> results) {
int result = 0;
for (Integer each : results) {
result += null == each ? 0 : each;
}
return result;
}复制代码
#execute()
由于有四个不一样状况的#execute()
,因此抽象了 Executor 接口,从而达到逻辑重用。/** * 执行SQL请求. * @return true表示执行DQL语句, false表示执行的DML语句 */
public boolean execute() {
return execute(new Executor() {
@Override
public boolean execute(final Statement statement, final String sql) throws SQLException {
return statement.execute(sql);
}
});
}
private boolean execute(final Executor executor) {
Context context = MetricsContext.start("ShardingStatement-execute");
try {
List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Boolean>() {
@Override
public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
if (null == result || result.isEmpty() || null == result.get(0)) {
return false;
}
return result.get(0);
} finally {
MetricsContext.stop(context);
}
}复制代码
PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了 parameters
参数,方法逻辑上基本一致,就不重复分享啦。
BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。
// BatchPreparedStatementExecutor.java
/** * 执行批量SQL. * * @return 执行结果 */
public int[] executeBatch() {
Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");
try {
return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {
@Override
public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeBatch();
}
}));
} finally {
MetricsContext.stop(context);
}
}
/** * 计算每一个语句的更新数量 * * @param results 每条 SQL 更新数量 * @return 每一个语句的更新数量 */
private int[] accumulate(final List<int[]> results) {
int[] result = new int[parameterSets.size()];
int count = 0;
// 每一个语句按照顺序,读取到其对应的每一个分片SQL影响的行数进行累加
for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
}
count++;
}
return result;
}复制代码
眼尖的同窗会发现,为何有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操做,只能进行 PreparedStatement 的批操做。
// PreparedStatement 批量操做,不会报错
PreparedStatement ps = conn.prepareStatement(sql)
ps.addBatch();
ps.addBatch();
// Statement 批量操做,会报错
ps.addBatch(sql); // 报错:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch复制代码
AbstractExecutionEvent,SQL 执行事件抽象接口。
public abstract class AbstractExecutionEvent {
/** * 事件编号 */
private final String id;
/** * 数据源 */
private final String dataSource;
/** * SQL */
private final String sql;
/** * 参数 */
private final List<Object> parameters;
/** * 事件类型 */
private EventExecutionType eventExecutionType;
/** * 异常 */
private Optional<SQLException> exception;
}复制代码
AbstractExecutionEvent 有两个实现子类:
EventExecutionType,事件触发类型。
那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文 ExecutorEngine#executeInternal()
咱们能够看到每一个分片 SQL 执行的过程当中会发布相应事件:
怎么订阅事件呢?很是简单,例子以下:
EventBusInstance.getInstance().register(new Runnable() {
@Override
public void run() {
}
@Subscribe // 订阅
@AllowConcurrentEvents // 是否容许并发执行,即线程安全
public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
@Subscribe // 订阅
@AllowConcurrentEvents // 是否容许并发执行,即线程安全
public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent
System.out.println("DQLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
});复制代码
#register()
任何类均可以,并不是必定须要使用 Runnable 类。此处例子单纯由于方便@Subscribe
注解在方法上,实现对事件的订阅@AllowConcurrentEvents
注解在方法上,表示线程安全,容许并发执行#listen()
订阅了 DMLExecutionEvent 事件EventBus#post()
发布事件,同步调用订阅逻辑Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会所以,可以覆盖更多的业务场景。传送门
登记吧,骚年!传送门
BestEffortsDeliveryListener,最大努力送达型事务监听器。
本文暂时暂时不分析其实现,仅仅做为另一个订阅者的例子。咱们会在《柔性事务》进行分享。
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
if (!isProcessContinuously()) {
return;
}
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
//TODO 对于批量执行的SQL须要解析成两层列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS:
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
if (!isValidConnection(conn)) {
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO 对于批量事件须要解析成两层列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess = true;
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
}复制代码
本文完,但也未完。
跨分片事务问题。例如:
UPDATE t_order SET nickname = ? WHERE user_id = ?复制代码
A 节点 connection.commit()
时,应用忽然挂了!B节点 connection.commit()
还来不及执行。
咱们一块儿去《柔性事务》寻找答案。
道友,分享一波朋友圈可好?