This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and hands-on guidance; please stay tuned. Copyright notice: this Spark source-code analysis and practice series belongs to its author (秦凯新). Reproduction is prohibited; you are welcome to learn from it.
Spark provides the AppendOnlyMap data structure for aggregating task output, and it is quite a weapon. Why? Because Spark is a memory-oriented big-data engine: it not only stores data in memory, it also inserts, updates, aggregates and sorts in memory. AppendOnlyMap is a good illustration of how far Spark pushes in-memory processing.
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
*
* This implementation uses quadratic probing with a power-of-2 hash table
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
* The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
Supports caching a value for the null key (kept outside the hash array in nullValue / haveNullValue).
initialCapacity: passed in through the primary constructor: class AppendOnlyMap[K, V](initialCapacity: Int = 64)
capacity: computed as nextPowerOf2(initialCapacity), i.e. the initial capacity is rounded up to a power of two: if it already equals its highest set bit it is kept as-is, otherwise that bit is shifted left by one (see the sketch after growTable below).
data: the array that holds both the keys and the aggregated values: new Array[AnyRef](2 * capacity)
* Holds keys and values in the same array for memory locality;
* specifically, the order of elements is key0, value0, key1, value1,
* key2, value2, etc.
LOAD_FACTOR: defaults to 0.7
growThreshold: (LOAD_FACTOR * capacity).toInt
growTable: doubles the capacity and re-hashes every key into the new array ("Double the table's size and re-hash everything").
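A small REPL-style sketch of the rounding and growth-trigger logic described above. nextPowerOf2 here mirrors AppendOnlyMap's private helper; the surrounding vals simply illustrate the field values listed above and are not Spark code:

// Round the requested capacity up to the next power of two,
// e.g. nextPowerOf2(64) == 64, nextPowerOf2(65) == 128.
def nextPowerOf2(n: Int): Int = {
  val highBit = Integer.highestOneBit(n)   // keeps only the highest set bit of n
  if (highBit == n) n else highBit << 1    // already a power of two, or shift that bit left by one
}

val initialCapacity = 64
val capacity = nextPowerOf2(initialCapacity)    // 64
val growThreshold = (0.7 * capacity).toInt      // LOAD_FACTOR * capacity = 44
// After each successful insertion, incrementSize() checks this threshold;
// once curSize > growThreshold, growTable() doubles the capacity and re-hashes all keys.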
update: inserts or overwrites a key/value pair. Three cases: 1) the slot at rehash(key.hashCode) & mask is empty, so the pair is inserted directly; 2) the slot holds the same key, so the value is overwritten; 3) the slot holds a different key, so probing moves on to the next position.
def update(key: K, value: V): Unit = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    // A null key is stored outside the array, in nullValue / haveNullValue
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = value
    haveNullValue = true
    return
  }
  var pos = rehash(key.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      // Case 1: empty slot, insert the new key/value pair
      data(2 * pos) = k
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      incrementSize() // Since we added a new key
      return
    } else if (k.eq(curKey) || k.equals(curKey)) {
      // Case 2: same key, overwrite the value
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      return
    } else {
      // Case 3: a different key occupies the slot, quadratic probing to the next position
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
}
changeValue: the cached aggregation routine; it merges values with the caller-supplied updateFunc (an anonymous combine function). Three cases: 1) the slot at rehash(key.hashCode) & mask is empty, so updateFunc is applied to a null old value and the result is inserted; 2) the slot holds the same key, so the new value is aggregated with the existing one; 3) the slot holds a different key, so probing moves on to the next position.
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
  }
  var pos = rehash(k.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      // Case 1: empty slot, the "old value" passed to updateFunc is null
      val newValue = updateFunc(false, null.asInstanceOf[V])
      data(2 * pos) = k
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      incrementSize()
      return newValue
    } else if (k.eq(curKey) || k.equals(curKey)) {
      // Case 2: same key, aggregate with the existing value
      val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      return newValue
    } else {
      // Case 3: a different key occupies the slot, probe the next position
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
  null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
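To see the two methods in context, here is a small usage sketch in the style of a word count. AppendOnlyMap is private[spark], so this only compiles inside Spark's own source tree; the data is made up for illustration:

import org.apache.spark.util.collection.AppendOnlyMap

val counts = new AppendOnlyMap[String, Int]()   // default initialCapacity = 64

// update: plain overwrite semantics
counts.update("spark", 1)

// changeValue: aggregation semantics; hadValue tells us whether the key already existed
val words = Seq("spark", "shuffle", "spark")
words.foreach { w =>
  counts.changeValue(w, (hadValue, oldValue) => if (hadValue) oldValue + 1 else 1)
}
// counts now holds ("spark" -> 3, "shuffle" -> 1): the initial update plus the aggregations above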
destructiveSortedIterator: sorts the entries packed in AppendOnlyMap's data array without allocating extra memory, at the cost of destroying the map's validity as a hash table. The sort is performed by Spark's Sorter, a TimSort variant. The original scaladoc reads:
* return an iterator of the map in sorted order. This provides a way to sort the
* map without using additional memory, at the expense of destroying the validity
* of the map.
The code is as follows:
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
  destroyed = true
  // Pack KV pairs into the front of the underlying array
  var keyIndex, newIndex = 0
  while (keyIndex < capacity) {
    if (data(2 * keyIndex) != null) {
      data(2 * newIndex) = data(2 * keyIndex)
      data(2 * newIndex + 1) = data(2 * keyIndex + 1)
      newIndex += 1
    }
    keyIndex += 1
  }
  assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

  new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

  new Iterator[(K, V)] {
    var i = 0
    var nullValueReady = haveNullValue
    def hasNext: Boolean = (i < newIndex || nullValueReady)
    def next(): (K, V) = {
      if (nullValueReady) {
        nullValueReady = false
        (null.asInstanceOf[K], nullValue)
      } else {
        val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
        i += 1
        item
      }
    }
  }
}
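Continuing the REPL-style sketch from above (again only compilable inside the org.apache.spark package because the class is private[spark]), here is how the destructive iterator might be driven; Ordering.String is used because a Scala Ordering is also a java.util.Comparator:

val map = new AppendOnlyMap[String, Int]()
map.changeValue("b", (had, old) => if (had) old + 1 else 1)
map.changeValue("a", (had, old) => if (had) old + 1 else 1)

// Sorts the packed key/value pairs in place with Spark's TimSort-based Sorter;
// after this call the map must no longer be used as a hash map.
val it = map.destructiveSortedIterator(Ordering.String)
it.foreach(println)   // (a,1) then (b,1)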
SizeTrackingAppendOnlyMap: samples its own size as it is updated and estimates its size in bytes.
An append-only map that keeps track of its estimated size in bytes.
The code of SizeTrackingAppendOnlyMap is remarkably short:
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker {

  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()   // take a size sample when the sampling interval is reached
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()        // the layout changed dramatically, so restart size sampling
  }
}
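The point of mixing in SizeTracker is that estimateSize() can be called cheaply after every record. The sketch below shows the general pattern of using that estimate to decide when to spill; the threshold constant and the insertRecord helper are purely illustrative here and are not Spark's actual Spillable logic:

val buffer = new SizeTrackingAppendOnlyMap[String, Long]()
val myMemoryThreshold = 5L * 1024 * 1024   // illustrative 5 MB budget, not a Spark constant

def insertRecord(key: String, value: Long): Unit = {
  buffer.changeValue(key, (had, old) => if (had) old + value else value)
  // estimateSize() comes from SizeTracker: it extrapolates from periodic SizeEstimator samples
  if (buffer.estimateSize() > myMemoryThreshold) {
    // at this point Spark's spill-capable collections would acquire more memory or spill to disk
    println(s"would spill, estimated bytes = ${buffer.estimateSize()}")
  }
}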
PartitionedAppendOnlyMap: adds partitionedDestructiveSortedIterator, which builds a comparator and then calls AppendOnlyMap's destructiveSortedIterator to compact and sort the underlying array before returning the iterator.
* Implementation of WritablePartitionedPairCollection that wraps a map in which the
* keys are tuples of (partition ID, K)
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V]
  with WritablePartitionedPairCollection[K, V] {

  // Declared by WritablePartitionedPairCollection but left unimplemented there
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    // AppendOnlyMap method: compacts and sorts the underlying data array, then returns an iterator
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)
  }
}
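A short sketch of how sort-shuffle code drives this collection. This only mirrors what ExternalSorter does conceptually; the comparator and record values are simplified for illustration:

val map = new PartitionedAppendOnlyMap[String, Int]()

// Keys are stored as (partitionId, K) tuples, so insert just wraps update
map.insert(partition = 1, key = "b", value = 1)
map.insert(partition = 0, key = "a", value = 1)

// With no key comparator, records come back grouped by partition id only;
// with Some(comparator) they are ordered by (partition id, key)
val ordered = map.partitionedDestructiveSortedIterator(Some(Ordering.String))
ordered.foreach { case ((pid, k), v) => println(s"partition=$pid key=$k value=$v") }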
- 1: When a Spark map task writes its output, it computes results per partition and produces a data file plus an index file.
- 2: The Spark shuffle involves caching, sorting, aggregation, spilling and merging; fetching blocks from remote nodes is, of course, also indispensable.
秦凯新 (Qin Kaixin), Shenzhen