摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/result-merger/ 「芋道源码」欢迎转载,保留摘要,谢谢!java
本文主要基于 Sharding-JDBC 1.5.0 正式版git
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
本文分享查询结果归并的源码实现。算法
正如前文《SQL 执行》提到的**“分表分库,须要执行的 SQL 数量从单条变成了多条”,多个SQL执行**结果必然须要进行合并,例如:sql
SELECT * FROM t_order ORDER BY create_time
在各分片排序完后,Sharding-JDBC 获取到结果后,仍然须要再进一步排序。目前有 分页、分组、排序、聚合列、迭代 五种场景须要作进一步处理。固然,若是单分片SQL执行结果是无需合并的。在《SQL 执行》不知不觉已经分享了插入、更新、删除操做的结果合并,因此下面咱们一块儿看看查询结果归并的实现。数据库
Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会所以,可以覆盖更多的业务场景。传送门
登记吧,骚年!传送门数组
MergeEngine,分片结果集归并引擎。缓存
// MergeEngine.java /** * 数据库类型 */ private final DatabaseType databaseType; /** * 结果集集合 */ private final List<ResultSet> resultSets; /** * Select SQL语句对象 */ private final SelectStatement selectStatement; /** * 查询列名与位置映射 */ private final Map<String, Integer> columnLabelIndexMap; public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType; this.resultSets = resultSets; this.selectStatement = selectStatement; // 得到 查询列名与位置映射 columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0)); } /** * 得到 查询列名与位置映射 * * @param resultSet 结果集 * @return 查询列名与位置映射 * @throws SQLException 当结果集已经关闭 */ private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元数据(包含查询列信息) Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i); } return result; }
resultSets
结果集集合,并根据其得到 columnLabelIndexMap
查询列名与位置映射。经过 columnLabelIndexMap
,能够很方便的使用查询列名得到在返回结果记录列( header )的第几列。MergeEngine 的 #merge()
方法做为入口提供查询结果归并功能。微信
/** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */ public ResultSetMerger merge() throws SQLException { selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); }
#merge()
主体逻辑就两行代码,设置查询列位置信息,并返回合适的归并结果集接口( ResultSetMerger ) 实现。// SelectStatement.java /** * 为选择项设置索引. * * @param columnLabelIndexMap 列标签索引字典 */ public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) { setIndexForAggregationItem(columnLabelIndexMap); setIndexForOrderItem(columnLabelIndexMap, orderByItems); setIndexForOrderItem(columnLabelIndexMap, groupByItems); }
部分查询列是通过推到出来,在 SQL解析 过程当中,未得到到查询列位置,须要经过该方法进行初始化。对这块不了解的同窗,回头能够看下《SQL 解析(三)之查询SQL》。🙂 如今不用回头,皇冠会掉。数据结构
#setIndexForAggregationItem()
处理 AVG聚合计算列 推导出其对应的 SUM/COUNT 聚合计算列的位置:
private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) { for (AggregationSelectItem each : getAggregationSelectItems()) { Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each)); each.setIndex(columnLabelIndexMap.get(each.getColumnLabel())); for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) { Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived)); derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel())); } } }
#setIndexForOrderItem()
处理 ORDER BY / GROUP BY 列不在查询列 推导出的查询列的位置:
private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) { for (OrderItem each : orderItems) { if (-1 != each.getIndex()) { continue; } Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each)); if (columnLabelIndexMap.containsKey(each.getColumnLabel())) { each.setIndex(columnLabelIndexMap.get(each.getColumnLabel())); } } }
ResultSetMerger,归并结果集接口。
咱们先来看看总体的类结构关系:
从 功能 上分红四种:
从 实现方式 上分红三种:
何时该用什么实现方式?
// MergeEngine.java /** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */ public ResultSetMerger merge() throws SQLException { selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); } private ResultSetMerger build() throws SQLException { if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 分组 或 聚合列 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else { return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); } private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException { ResultSetMerger result = resultSetMerger; if (null != selectStatement.getLimit()) { result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit()); } return result; }
AbstractStreamResultSetMerger,流式归并结果集抽象类,提供从当前结果集得到行数据。
public abstract class AbstractStreamResultSetMerger implements ResultSetMerger { /** * 当前结果集 */ private ResultSet currentResultSet; protected ResultSet getCurrentResultSet() throws SQLException { if (null == currentResultSet) { throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next."); } return currentResultSet; } @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { if (Object.class == type) { return getCurrentResultSet().getObject(columnIndex); } if (int.class == type) { return getCurrentResultSet().getInt(columnIndex); } if (String.class == type) { return getCurrentResultSet().getString(columnIndex); } // .... 省略其余数据类型读取相似代码 return getCurrentResultSet().getObject(columnIndex); } }
AbstractMemoryResultSetMerger,内存归并结果集抽象类,提供从内存数据行对象( MemoryResultSetRow ) 得到行数据。
public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger { private final Map<String, Integer> labelAndIndexMap; /** * 内存数据行对象 */ @Setter private MemoryResultSetRow currentResultSetRow; @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) { throw new SQLFeatureNotSupportedException(); } return currentResultSetRow.getCell(columnIndex); } }
public class MemoryResultSetRow { /** * 行数据 */ private final Object[] data; public MemoryResultSetRow(final ResultSet resultSet) throws SQLException { data = load(resultSet); } /** * 加载 ResultSet 当前行数据到内存 * @param resultSet 结果集 * @return 行数据 * @throws SQLException 当结果集关闭 */ private Object[] load(final ResultSet resultSet) throws SQLException { int columnCount = resultSet.getMetaData().getColumnCount(); Object[] result = new Object[columnCount]; for (int i = 0; i < columnCount; i++) { result[i] = resultSet.getObject(i + 1); } return result; } /** * 获取数据. * * @param columnIndex 列索引 * @return 数据 */ public Object getCell(final int columnIndex) { Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1); return data[columnIndex - 1]; } /** * 设置数据. * * @param columnIndex 列索引 * @param value 值 */ public void setCell(final int columnIndex, final Object value) { Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1); data[columnIndex - 1] = value; } }
#load()
方法,将当前结果集的一条行数据加载到内存。AbstractDecoratorResultSetMerger,装饰结果集归并抽象类,经过调用其装饰的归并对象 #getValue()
方法得到行数据。
public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger { /** * 装饰的归并对象 */ private final ResultSetMerger resultSetMerger; @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { return resultSetMerger.getValue(columnIndex, type); } }
OrderByStreamResultSetMerger,基于 Stream 方式排序归并结果集实现。
由于各个分片结果集已经排序完成,使用**《归并算法》**可以充分利用这个优点。
归并操做(merge),也叫归并算法,指的是将两个已经排序的序列合并成一个序列的操做。归并排序算法依赖归并操做。
【迭代法】
- 申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列
- 设定两个指针,最初位置分别为两个已经排序序列的起始位置
- 比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置
- 重复步骤3直到某一指针到达序列尾
- 将另外一序列剩下的全部元素直接复制到合并序列尾
从定义上看,是否是超级符合咱们这个场景。😈 此时此刻,你是否是捂着胸口,感叹:“大学怎么没好好学数据结构与算法呢”?反正我是捂着了,都是眼泪。
public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger { /** * 排序列 */ @Getter(AccessLevel.NONE) private final List<OrderItem> orderByItems; /** * 排序值对象队列 */ private final Queue<OrderByValue> orderByValuesQueue; /** * 默认排序类型 */ private final OrderType nullOrderType; /** * 是否第一个 ResultSet 已经调用 #next() */ private boolean isFirstNext; public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException { this.orderByItems = orderByItems; this.orderByValuesQueue = new PriorityQueue<>(resultSets.size()); this.nullOrderType = nullOrderType; orderResultSetsToQueue(resultSets); isFirstNext = true; } private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException { for (ResultSet each : resultSets) { OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType); if (orderByValue.next()) { orderByValuesQueue.offer(orderByValue); } } // 设置当前 ResultSet,这样 #getValue() 能拿到记录 setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet()); }
属性 orderByValuesQueue
使用的队列实现是优先级队列( PriorityQueue )。有兴趣的同窗能够看看《JDK源码研究PriorityQueue》,本文不展开讲,不是主角戏份很少。咱们记住几个方法的用途:
#offer()
:增长元素。增长时,会将该元素和已有元素们按照优先级进行排序#peek()
:得到优先级第一的元素#pool()
:得到优先级第一的元素并移除一个 ResultSet 构建一个 OrderByValue 用于排序,即上文归并算法提到的**“空间”**。
public final class OrderByValue implements Comparable<OrderByValue> { /** * 已排序结果集 */ @Getter private final ResultSet resultSet; /** * 排序列 */ private final List<OrderItem> orderByItems; /** * 默认排序类型 */ private final OrderType nullOrderType; /** * 排序列对应的值数组 * 由于一条记录可能有多个排序列,因此是数组 */ private List<Comparable<?>> orderValues; /** * 遍历下一个结果集游标. * * @return 是否有下一个结果集 * @throws SQLException SQL异常 */ public boolean next() throws SQLException { boolean result = resultSet.next(); orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList(); return result; } /** * 得到 排序列对应的值数组 * * @return 排序列对应的值数组 * @throws SQLException 当结果集关闭时 */ private List<Comparable<?>> getOrderValues() throws SQLException { List<Comparable<?>> result = new ArrayList<>(orderByItems.size()); for (OrderItem each : orderByItems) { Object value = resultSet.getObject(each.getIndex()); Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable"); result.add((Comparable<?>) value); } return result; } /** * 对比 {@link #orderValues},即二者的第一条记录 * * @param o 对比 OrderByValue * @return -1 0 1 */ @Override public int compareTo(final OrderByValue o) { for (int i = 0; i < orderByItems.size(); i++) { OrderItem thisOrderBy = orderByItems.get(i); int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), nullOrderType); if (0 != result) { return result; } } return 0; } }
OrderByValue#next()
方法时,得到其对应结果集排在第一条的记录,经过 #getOrderValues()
计算该记录的排序字段值。这样两个OrderByValue 经过 #compareTo()
方法能够比较两个结果集的第一条记录。if (orderByValue.next()) {
处,调用 OrderByValue#next()
后,添加到 PriorityQueue。所以,orderByValuesQueue.peek().getResultSet()
可以得到多个 ResultSet 中排在第一的。
经过调用 OrderByStreamResultSetMerger#next()
不断得到当前排在第一的记录。#next()
每次调用后,实际作的是当前 ResultSet 的替换,以及当前的 ResultSet 的记录指向下一条。这样提及来可能比较绕,咱们来看一张图:
// OrderByStreamResultSetMerger.java @Override public boolean next() throws SQLException { if (orderByValuesQueue.isEmpty()) { return false; } if (isFirstNext) { isFirstNext = false; return true; } // 移除上一次得到的 ResultSet OrderByValue firstOrderByValue = orderByValuesQueue.poll(); // 若是上一次得到的 ResultSet还有下一条记录,继续添加到 排序值对象队列 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); } if (orderByValuesQueue.isEmpty()) { return false; } // 设置当前 ResultSet setCurrentResultSet(orderByValuesQueue.peek().getResultSet()); return true; }
orderByValuesQueue.poll()
移除上一次得到的 ResultSet。为何不能 #setCurrentResultSet()
就移除呢?若是该 ResultSet 里面还存在下一条记录,须要继续参加排序。而判断是否有下一条,须要调用 ResultSet#next()
方法,这会致使 ResultSet 指向了下一条记录。于是 orderByValuesQueue.poll()
调用是后置的。
isFirstNext
变量那的判断看着是否是很“灵异”?由于 #orderResultSetsToQueue()
处设置了第一次的 ResultSet。若是不加这个标记,会致使第一条记录“不见”了。
经过不断的 Queue#poll()
、Queue#offset()
实现排序。巧妙!仿佛 Get 新技能了:
// 移除上一次得到的 ResultSet OrderByValue firstOrderByValue = orderByValuesQueue.poll(); // 若是上一次得到的 ResultSet还有下一条记录,继续添加到 排序值对象队列 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); }
在看下,咱们上文 Stream 方式归并的定义:**将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。**是否是可以清晰的对上了?!🙂
GroupByStreamResultSetMerger,基于 Stream 方式分组归并结果集实现。 它继承自 OrderByStreamResultSetMerger,在排序的逻辑上,实现分组功能。实现原理也较为简单:
public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger { /** * 查询列名与位置映射 */ private final Map<String, Integer> labelAndIndexMap; /** * Select SQL语句对象 */ private final SelectStatement selectStatement; /** * 当前结果记录 */ private final List<Object> currentRow; /** * 下一条结果记录 GROUP BY 条件 */ private List<?> currentGroupByValues; public GroupByStreamResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(resultSets, selectStatement.getOrderByItems(), nullOrderType); this.labelAndIndexMap = labelAndIndexMap; this.selectStatement = selectStatement; currentRow = new ArrayList<>(labelAndIndexMap.size()); // 初始化下一条结果记录 GROUP BY 条件 currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues(); } @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { return currentRow.get(columnIndex - 1); } @Override public Object getValue(final String columnLabel, final Class<?> type) throws SQLException { Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel), String.format("Can't find columnLabel: %s", columnLabel)); return currentRow.get(labelAndIndexMap.get(columnLabel) - 1); } }
currentRow
为当前结果记录,使用 #getValue()
、#getCalendarValue()
方法得到当前结果记录的查询列值。
currentGroupByValues
为下一条结果记录 GROUP BY 条件,经过 GroupByValue 生成:
public final class GroupByValue { /** * 分组条件值数组 */ private final List<?> groupValues; public GroupByValue(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException { groupValues = getGroupByValues(resultSet, groupByItems); } /** * 得到分组条件值数组 * 例如,`GROUP BY user_id, order_status` 返回的某条记录结果为 `userId = 1, order_status = 3`,对应的 `groupValues = [1, 3]` * @param resultSet 结果集(单分片) * @param groupByItems 分组列 * @return 分组条件值数组 * @throws SQLException 当结果集关闭 */ private List<?> getGroupByValues(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException { List<Object> result = new ArrayList<>(groupByItems.size()); for (OrderItem each : groupByItems) { result.add(resultSet.getObject(each.getIndex())); // 从结果集得到每一个分组条件的值 } return result; } }
GroupByStreamResultSetMerger 在建立时,当前结果记录实际未合并,须要先调用 #next()
,在使用 #getValue()
等方法获取值,这个和 OrderByStreamResultSetMerger 不一样,多是个 BUG。
AggregationUnit,归并计算单元接口,有两个接口方法:
#merge()
:归并聚合值#getResult()
:获取计算结果一共有三个实现类:
实现都比较易懂,直接点击连接查看源码,咱们就不浪费篇幅贴代码啦。
咱们先看看大致的调用流程:
😈 看起来代码比较多,逻辑其实比较清晰,对照着顺序图顺序往下读便可。
// GroupByStreamResultSetMerger.java @Override public boolean next() throws SQLException { // 清除当前结果记录 currentRow.clear(); if (getOrderByValuesQueue().isEmpty()) { return false; } // if (isFirstNext()) { super.next(); } // 顺序合并下面相同分组条件的记录 if (aggregateCurrentGroupByRowAndNext()) { // 生成下一条结果记录 GROUP BY 条件 currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues(); } return true; } private boolean aggregateCurrentGroupByRowAndNext() throws SQLException { boolean result = false; // 生成计算单元 Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() { @Override public AggregationUnit apply(final AggregationSelectItem input) { return AggregationUnitFactory.create(input.getType()); } }); // 循环顺序合并下面相同分组条件的记录 while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) { // 归并聚合值 aggregate(aggregationUnitMap); // 缓存当前记录到结果记录 cacheCurrentRow(); // 获取下一条记录 result = super.next(); if (!result) { break; } } // 设置当前记录的聚合字段结果 setAggregationValueToCurrentRow(aggregationUnitMap); return result; } private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException { for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) { List<Comparable<?>> values = new ArrayList<>(2); if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) { // SUM/COUNT/MAX/MIN 聚合列 values.add(getAggregationValue(entry.getKey())); } else { for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) { // AVG 聚合列 values.add(getAggregationValue(each)); } } entry.getValue().merge(values); } } private void cacheCurrentRow() throws SQLException { for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) { currentRow.add(getCurrentResultSet().getObject(i + 1)); } } private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException { Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex()); Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable"); return (Comparable<?>) result; } private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) { for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) { currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult()); // 获取计算结果 } }
GroupByMemoryResultSetMerger,基于 内存 分组归并结果集实现。
区别于 GroupByStreamResultSetMerger,其没法使用每一个分片结果集的有序的特色,只能在内存中合并后,进行整个从新排序。于是,性能和内存都较 GroupByStreamResultSetMerger 会差。
主流程以下:
public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger { /** * Select SQL语句对象 */ private final SelectStatement selectStatement; /** * 默认排序类型 */ private final OrderType nullOrderType; /** * 内存结果集 */ private final Iterator<MemoryResultSetRow> memoryResultSetRows; public GroupByMemoryResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(labelAndIndexMap); this.selectStatement = selectStatement; this.nullOrderType = nullOrderType; memoryResultSetRows = init(resultSets); } private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException { Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024); // 分组条件值与内存记录映射 Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024); // 分组条件值与聚合列映射 // 遍历结果集 for (ResultSet each : resultSets) { while (each.next()) { // 生成分组条件 GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems()); // 初始化分组条件到 dataMap、aggregationMap 映射 initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap); // 归并聚合值 aggregate(each, groupByValue, aggregationMap); } } // 设置聚合列结果到内存记录 setAggregationValueToMemoryRow(dataMap, aggregationMap); // 内存排序 List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap); // 设置当前 ResultSet,这样 #getValue() 能拿到记录 if (!result.isEmpty()) { setCurrentResultSetRow(result.get(0)); } return result.iterator(); } }
#initForFirstGroupByValue()
初始化分组条件到 dataMap
,aggregationMap
映射中,这样能够调用 #aggregate()
将聚合值归并到 aggregationMap
里的该分组条件。
private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException { // 初始化分组条件到 dataMap if (!dataMap.containsKey(groupByValue)) { dataMap.put(groupByValue, new MemoryResultSetRow(resultSet)); } // 初始化分组条件到 aggregationMap if (!aggregationMap.containsKey(groupByValue)) { Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() { @Override public AggregationUnit apply(final AggregationSelectItem input) { return AggregationUnitFactory.create(input.getType()); } }); aggregationMap.put(groupByValue, map); } }
聚合完每一个分组条件后,将聚合列结果 aggregationMap
合并到 dataMap
。
private void setAggregationValueToMemoryRow(final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) { for (Entry<GroupByValue, MemoryResultSetRow> entry : dataMap.entrySet()) { // 遍 历内存记录 for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { // 遍历 每一个聚合列 entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult()); } } }
调用 #getMemoryResultSetRows()
方法对内存记录进行内存排序。
// GroupByMemoryResultSetMerger.java private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) { List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values()); Collections.sort(result, new GroupByRowComparator(selectStatement, nullOrderType)); // 内存排序 return result; } // GroupByRowComparator.java private int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2, final List<OrderItem> orderItems) { for (OrderItem each : orderItems) { Object orderValue1 = o1.getCell(each.getIndex()); Preconditions.checkState(null == orderValue1 || orderValue1 instanceof Comparable, "Order by value must implements Comparable"); Object orderValue2 = o2.getCell(each.getIndex()); Preconditions.checkState(null == orderValue2 || orderValue2 instanceof Comparable, "Order by value must implements Comparable"); int result = ResultSetUtil.compareTo((Comparable) orderValue1, (Comparable) orderValue2, each.getType(), nullOrderType); if (0 != result) { return result; } } return 0; }
@Override public boolean next() throws SQLException { if (memoryResultSetRows.hasNext()) { setCurrentResultSetRow(memoryResultSetRows.next()); return true; } return false; }
memoryResultSetRows
不断得到下一条记录。IteratorStreamResultSetMerger,基于 Stream 迭代归并结果集实现。
public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger { /** * ResultSet 数组迭代器 */ private final Iterator<ResultSet> resultSets; public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) { this.resultSets = resultSets.iterator(); // 设置当前 ResultSet,这样 #getValue() 能拿到记录 setCurrentResultSet(this.resultSets.next()); } @Override public boolean next() throws SQLException { // 当前 ResultSet 迭代下一条记录 if (getCurrentResultSet().next()) { return true; } if (!resultSets.hasNext()) { return false; } // 得到下一个ResultSet, 设置当前 ResultSet setCurrentResultSet(resultSets.next()); boolean hasNext = getCurrentResultSet().next(); if (hasNext) { return true; } while (!hasNext && resultSets.hasNext()) { setCurrentResultSet(resultSets.next()); hasNext = getCurrentResultSet().next(); } return hasNext; } }
LimitDecoratorResultSetMerger,基于 Decorator 分页结果集归并实现。
public final class LimitDecoratorResultSetMerger extends AbstractDecoratorResultSetMerger { /** * 分页条件 */ private final Limit limit; /** * 是否所有记录都跳过了,即无符合条件记录 */ private final boolean skipAll; /** * 当前已返回行数 */ private int rowNumber; public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException { super(resultSetMerger); this.limit = limit; skipAll = skipOffset(); } private boolean skipOffset() throws SQLException { // 跳过 skip 记录 for (int i = 0; i < limit.getOffsetValue(); i++) { if (!getResultSetMerger().next()) { return true; } } // 行数 rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue(); return false; } @Override public boolean next() throws SQLException { if (skipAll) { return false; } // 得到下一条记录 if (limit.getRowCountValue() > -1) { return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next(); } // 部分db 能够直 offset,不写 limit 行数,例如 oracle return getResultSetMerger().next(); } }
#next()
不断得到下一条记录。诶?应该是有蛮多地方解释的不是很清晰,若是让您阅读误解或是阻塞,很是抱歉。代码读起来比较易懂,使用文字来解释,对表述能力较差的本身,可能就绞尽脑汁,一脸懵逼。
恩,若是能够,还烦请把读起来不太爽的地方告诉我,谢谢。
厚着脸皮,道友,分享一波朋友圈可好?
以下是小礼包,嘿嘿
归并结果集接口 | SQL |
---|---|
OrderByStreamResultSetMerger | SELECT * FROM t_order ORDER BY id |
GroupByStreamResultSetMerger | SELECT uid, AVG(id) FROM t_order GROUP BY uid |
GroupByMemoryResultSetMerger | SELECT uid FROM t_order GROUP BY id ORDER BY id DESC |
IteratorStreamResultSetMerger | SELECT * FROM t_order |
LimitDecoratorResultSetMerger | SELECT * FROM t_order ORDER BY id LIMIT 10 |