1、前言java
Sharding-JDBC 是一款优秀的分库分表框架,从3.0开始,Sharding-JDBC改名为Sharding-Sphere,以前用Sharding-JDBC 2时,对于同库分表而言,sql执行是串行的,由于同数据源的connection只会获取一个,而且对于connection加上了synchronized,因此对于同库分表而言,整个执行过程彻底是串行的。最后为了同库分表能够并行,不得不为同一个库配置多个链接池。Sharding-Sphere 3.0对执行引擎进行了优化,引入内存限制模式和链接限制模式来动态控制并行度。算法
本篇博客主要剖析如下两个问题:sql
一、内存限制模式和链接限制模式是如何控制同一数据源串行和并行的多线程
二、执行引擎优雅的设计框架
2、Sharding-Sphere的两种模式的差异异步
内存限制模式:对于同一数据源,若是有10张分表,那么执行时,会获取10个链接并行async
链接限制模式:对于同一数据源,若是有10张分表,那么执行时,只会获取1个链接串行ide
控制链接模式的算法以下:函数
更多设计的细节能够仔细阅读Sharding-Sphere官网:http://shardingsphere.io/document/current/cn/features/sharding/principle/execute/优化
3、jdbc知识点回顾
对于一个庞大分库分表框架,咱们应该从哪一个入口看进去呢?对于基于JDBC规范实现的分库分表框架,咱们只要理一下jdbc的执行过程,就知道了这个庞大框架的脉络,下面一块儿来回顾jdbc的执行过程。
一、加载驱动:Class.forName()
二、获取链接connection
三、由connection建立Statement或者PreparedStatement
四、用Statement或者PreparedStatement执行SQL获取结果集
五、关闭资源,流程结束
那么要看懂Sharding-Sphere的SQL执行过程,从Statement或者PreparedStatement看进去就够了。
4、源码解析
从PreparedStatement为入口,看进去,主要有以下5个类
一、ShardingPreparedStatement 实现了PreparedStatement接口
二、PreparedStatementExecutor继承于AbstractStatementExecutor,是SQL的执行器
三、SQLExecutePrepareTemplate用于获取分片执行单元,以及肯定链接模式(内存限制模式和链接限制模式)
四、ShardingExecuteEngine是执行引擎,提供一个多线程的执行环境,本质上而言,ShardingExecuteEngine不作任何业务相关的事情,只是提供多线程执行环境,执行传入的回调函数(很是巧妙的设计)
类的关系以下,一目了然:
接下来,咱们从ShardingPreparedStatement的executeQuery方法看进去,代码以下:
@Override public ResultSet executeQuery() throws SQLException { ResultSet result; try { clearPrevious(); sqlRoute(); initPreparedStatementExecutor(); MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getShardingRule(), preparedStatementExecutor.executeQuery(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable()); result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergeEngine.merge(), this); } finally { clearBatch(); } currentResultSet = result; return result; }
其中,initPreparedStatementExecutor用于初始化preparedStatementExecutor,初始化作了以下操做,根据路由单元获取statement执行单元
public void init(final SQLRouteResult routeResult) throws SQLException { setSqlType(routeResult.getSqlStatement().getType()); getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); cacheStatements(); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); }
那么获取statement执行单元时,是如何肯定链接模式的呢?getSqlExecutePrepareTemplate().getExecuteUnitGroups点进去看,SQLExecutePrepareTemplate作了什么操做?
private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups( final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1); List<List<SQLUnit>> sqlUnitGroups = Lists.partition(sqlUnits, desiredPartitionSize); ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitGroups.size()); int count = 0; for (List<SQLUnit> each : sqlUnitGroups) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; }
上面这段代码就是文章开头的公式,经过 maxConnectionsSizePerQuery来控制链接模式,当maxConnectionsSizePerQuery小于本数据源执行单元时,选择链接限制模式,反之,则选择内存限制模式
当preparedStatementExecutor被初始化完成,即可进行查询
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }
这里,callback是一个很是巧妙的设计,executeSQL便是须要执行的sql,这里能够根据须要去灵活实现,例如select、update等等操做,而executeCallback(executeCallback)即是真正的执行者,executeCallback调用sqlExecuteTemplate的executeGroup,把执行分组传入ShardingExecuteEngine执行引擎。
@SuppressWarnings("unchecked") protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException { return sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); } public final class SQLExecuteTemplate { private final ShardingExecuteEngine executeEngine; /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param callback SQL execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> callback) throws SQLException { return executeGroup(sqlExecuteGroups, null, callback); } /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param firstCallback first SQL execute callback * @param callback SQL execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ @SuppressWarnings("unchecked") public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException { try { return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback); } catch (final SQLException ex) { ExecutorExceptionHandler.handleException(ex); return Collections.emptyList(); } } }
接下来,精彩的时刻到了,执行引擎作了哪些事情呢?请继续往下看。
public <I, O> List<O> groupExecute( final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next(); Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) { Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>(); for (ShardingExecuteGroup<I> each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; } private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) { final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap(); return executorService.submit(new Callable<Collection<O>>() { @Override public Collection<O> call() throws SQLException { ShardingExecuteDataMap.setDataMap(dataMap); return callback.execute(inputGroup.getInputs(), false); } }); } private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true); }
sqlExecuteTemplate调用了ShardingExecuteEngine的groupExecute,groupExecute分为两个主要方法,asyncGroupExecute异步执行方法和syncGroupExecute同步执行方法,乍一看,不是多线程吗?怎么出现了一个同步,这里的多线程运用很是巧妙,先从执行分组中取出第一个元素firstInputs,剩下的丢进asyncGroupExecute的线程池,第一个任务让当前线程执行,不浪费一个线程。
这里执行引擎真正执行的是传入的回调函数,那么这个回调源于哪里呢?咱们再回头去看看PreparedStatementExecutor的executeQuery方法,回调函数由此建立。
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }
全部的逻辑一鼓作气,易于扩展,设计之巧妙,可贵的好代码。
最后,Sharding-Sphere是一个很是优秀的分库分表框架。
---------------------------------------------------------------------------------------------------------
快乐源于分享。
此博客乃做者原创, 转载请注明出处