说在前面sql
本文转自“天河聊技术”微信公众号数据库
sql路由这里的内容比较多,包含单表路由或者绑定表路由、多库多表路由、笛卡尔积路由,分三部分来介绍,今天先介绍单表或绑定表路由。微信
sql路由源码解析app
com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine、com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine两个sql路由引擎类,预编译的用的比较多,咱们以预编译的Statement的引擎类来跟踪下sharding-jdbc是对sql怎么进行路由的。ide
上层sql执行器接收到逻辑sql后再进行sql路由的时候会建立预编译statement对象的路由器,所以会调用其构造器性能
/** * 预解析的SQL路由器. * * @author zhangliang */ public final class PreparedStatementRoutingEngine { // 逻辑sql private final String logicSQL; //sql路由器 private final SQLRouter sqlRouter; // sql语句对象 private SQLStatement sqlStatement; public PreparedStatementRoutingEngine(final String logicSQL, final ShardingContext shardingContext) { this.logicSQL = logicSQL; sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext); }
/** * 路由引擎工厂. * * @author zhangiang */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class SQLRouterFactory { /** * 建立SQL路由器. * * @param shardingContext 数据源运行期上下文 * @return SQL路由器 */ // 这里是静态工厂方法实现 public static SQLRouter createSQLRouter(final ShardingContext shardingContext) { return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext); } }
接下来会建立ParsingSQLRouter对象ui
** * 须要解析的SQL路由器. * * @author zhangiang */ public final class ParsingSQLRouter implements SQLRouter { // 分库分表配置对象 private final ShardingRule shardingRule; // 支持的数据库类型 private final DatabaseType databaseType; // 是否要展现sql private final boolean showSQL; private final List<Number> generatedKeys; // 上面这些属性值都是存储在分片上下文中 public ParsingSQLRouter(final ShardingContext shardingContext) { shardingRule = shardingContext.getShardingRule(); databaseType = shardingContext.getDatabaseType(); showSQL = shardingContext.isShowSQL(); generatedKeys = new LinkedList<>(); }
这个方法是sql路由的入口方法this
/** * SQL路由. * 当第一次路由时进行SQL解析,以后的路由复用第一次的解析结果. * * @param parameters SQL中的参数 * @return 路由结果 */ public SQLRouteResult route(final List<Object> parameters) {//sql路由业务方法 if (null == sqlStatement) { sqlStatement = sqlRouter.parse(logicSQL, parameters.size()); } return sqlRouter.route(logicSQL, parameters, sqlStatement); }
进入到这个parse方法对象
sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
@Override public SQLStatement parse(final String logicSQL, final int parametersSize) { // 建立sql解析引擎 SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule); // 开启度量上下文 Context context = MetricsContext.start("Parse SQL"); // sql解析器解析得到sql语句对象 SQLStatement result = parsingEngine.parse(); if (result instanceof InsertStatement) { ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize); } MetricsContext.stop(context); return result; }
进入下面的sql路由方法,返回路由结果ip
return sqlRouter.route(logicSQL, parameters, sqlStatement);
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) { Collection<String> tableNames = sqlStatement.getTables().getTableNames(); RoutingEngine routingEngine; // 若是表集合是1,或者是绑定表路由就走简单路由规则 if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) { routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//单表路由 } else { // TODO 可配置是否执行笛卡尔积 routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement); } return routingEngine.route();//tianhe TODO 笛卡尔积 }
建立简单路由引擎 routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//单表路由
/** * 简单路由引擎. * * @author zhangliang */ @RequiredArgsConstructor public final class SimpleRoutingEngine implements RoutingEngine { // 分库分表配置对象 private final ShardingRule shardingRule; // sql参数 private final List<Object> parameters; // 逻辑表名 private final String logicTableName; // sql语句对象 private final SQLStatement sqlStatement;
不是单表路由,就走多库多表路由引擎,建立多库多表路由对象
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
/** * 混合多库表路由引擎. * * @author gaohongtao * @author zhangliang */ @RequiredArgsConstructor @Slf4j public final class ComplexRoutingEngine implements RoutingEngine { private final ShardingRule shardingRule; private final List<Object> parameters; private final Collection<String> logicTables; private final SQLStatement sqlStatement;
return routingEngine.route();//tianhe TODO 笛卡尔积
这里是路由逻辑,这里有三种实现,一种是单表或者绑定表路由,一种是多库多表路由,一种是笛卡尔积路由
单表或者绑定表路由
@Override public RoutingResult route() { // 根据逻辑表名得到表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根据表规则配置对象得到数据源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根据逻辑表名得到表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);
/** * 根据逻辑表名称查找分片规则. * * @param logicTableName 逻辑表名称 * @return 该逻辑表的分片规则 */ public TableRule getTableRule(final String logicTableName) { // 根据逻辑表返回表规则配置对象 Optional<TableRule> tableRule = tryFindTableRule(logicTableName); if (tableRule.isPresent()) { return tableRule.get(); } // 若是默认数据源不为空就根据默认数据源建立表配置规则对象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); } throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName); }
// 若是默认数据源不为空就根据默认数据源建立表配置规则对象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); }
// 根据默认数据源建立部分库数据分片策略,数据表不分表分片策略对象,并建立表配置规则对象进行装载 private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) { Map<String, DataSource> defaultDataSourceMap = new HashMap<>(1); defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get()); return TableRule.builder(logicTableName) .dataSourceRule(new DataSourceRule(defaultDataSourceMap)) .databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm())) .tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build(); }
返回到这里
@Override public RoutingResult route() { // 根据逻辑表名得到表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根据表规则配置对象得到数据源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 根据表规则配置对象得到数据源集合 Collection<String> routedDataSources = routeDataSources(tableRule);
根据分片列获取分片值
getShardingValues(strategy.getShardingColumns());
// 根据真实的数据源名称和分片值计算静态分片 Collection<String> result = strategy.doStaticSharding(tableRule.getActualDatasourceNames(), shardingValues);
/** * 计算静态分片. * * @param availableTargetNames 全部的可用分片资源集合 * @param shardingValues 分片值集合 * @return 分库后指向的数据源名称集合 */ public Collection<String> doStaticSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) { Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); // 若是没有解析到传入的数据源分片值,要走全库路由 if (shardingValues.isEmpty()) { result.addAll(availableTargetNames); } else { // 若是传入分片值,根据分片值去获取具体的数据源 result.addAll(doSharding(shardingValues, availableTargetNames)); } return result; }
注意上面的数据库路由的默认实现,若是不传入数据库分片值会走全库路由的,数据量大的话是会影响性能的,因此建议必需要传入分片值,阿里的TDDL这里的实现是直接报错的。
// 若是传入分片值,根据分片值去获取具体的数据源 result.addAll(doSharding(shardingValues, availableTargetNames));
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) { // 若是没分片 if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) { return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next())); } // 若是按一个分片值分片 if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) { SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm; ShardingValue shardingValue = shardingValues.iterator().next(); switch (shardingValue.getType()) { case SINGLE: // = 元算符分片 return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue)); case LIST: // in运算符分片 return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue); case RANGE: // between运算符分片 return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue); default: // 如今只支持这三种运算符分片 throw new UnsupportedOperationException(shardingValue.getType().getClass().getName()); } } // 若是是多个分片值 if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) { return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues); } // 其余方式的分片不支持 throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName()); }
返回到这里
@Override public RoutingResult route() { // 根据逻辑表名得到表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根据表规则配置对象得到数据源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根据数据源和表配置规则组装路由map
routedMap.put(each, routeTables(tableRule, each));
下面这个方法是获取路由的表的集合
private Collection<String> routeTables(final TableRule tableRule, final String routedDataSource) { // 获取表分片策略 TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule); // 获取分片值 List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns());//doDynamicSharding // 若是是动态分片走动态分片,若是是静态分片走静态分片 Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(tableRule.getActualTableNames(routedDataSource), shardingValues); Preconditions.checkState(!result.isEmpty(), "no table route info"); return result; }
/** * 计算动态分片. * * @param shardingValues 分片值集合 * @return 分库后指向的分片资源集合 */ public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {//doDynamicSharding Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); Collection<String> availableTargetNames = Collections.emptyList(); Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); result.addAll(doSharding(shardingValues, availableTargetNames)); return result; }
返回到这里
@Override public RoutingResult route() { // 根据逻辑表名得到表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根据表规则配置对象得到数据源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 生成路由结果 return generateRoutingResult(tableRule, routedMap);
private RoutingResult generateRoutingResult(final TableRule tableRule, final Map<String, Collection<String>> routedMap) { RoutingResult result = new RoutingResult(); // 遍历roadMap,roadMap里面key值存储的是数据源名称,value值是物理数据表集合 for (Entry<String, Collection<String>> entry : routedMap.entrySet()) { // 获取最下数据单元,每一个数据单元是一个DataNode Collection<DataNode> dataNodes = tableRule.getActualDataNodes(entry.getKey(), entry.getValue()); for (DataNode each : dataNodes) { // 组装数据表单元装载到路由结果中 result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName())); } } return result; }
数据模型
/** * SQL路由结果. * * @author gaohongtao * @author zhangliang */ @RequiredArgsConstructor @Getter public final class SQLRouteResult { // sql语句对象 private final SQLStatement sqlStatement; // 最小sql执行单元集合 private final Set<SQLExecutionUnit> executionUnits = new LinkedHashSet<>(); private final List<Number> generatedKeys = new LinkedList<>(); }
/** * SQL最小执行单元. * * @author gaohongtao */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class SQLExecutionUnit { // 具体的数据源 private final String dataSource; // 具体要执行的物理sql语句 private final String sql; }
/** * 路由表单元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class TableUnit { // 数据源名 private final String dataSourceName; // 逻辑表名 private final String logicTableName; // 物理表名 private final String actualTableName; }
/** * 路由表单元集合. * * @author zhangliang */ @Getter @ToString public final class TableUnits { // 路由表单元集合 private final List<TableUnit> tableUnits = new LinkedList<>();
/** * 路由结果. * * @author zhangliang */ @Getter public class RoutingResult { // 表路由单元集合 private final TableUnits tableUnits = new TableUnits();
/** * 路由表单元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class TableUnit { // 数据源名 private final String dataSourceName; // 逻辑表名 private final String logicTableName; // 物理表名 private final String actualTableName; }
/** * 分库分表数据单元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public class DataNode { private static final String DELIMITER = "."; private final String dataSourceName;//数据库名 private final String tableName;//表名
说到最后
以上介绍,仅供参考。