RDD is short for Resilient Distributed Dataset. It is Spark's fundamental data abstraction, representing an immutable, partitioned collection of elements that can be operated on in parallel.
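To make that concrete, here is a minimal sketch; the app name and the `local[2]` master are illustrative, not anything the internals below depend on:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("rdd-demo").setMaster("local[2]"))

val nums    = sc.parallelize(1 to 10, 4)  // an immutable collection split into 4 partitions
val doubled = nums.map(_ * 2)             // transformations never mutate; they return a new RDD
println(doubled.reduce(_ + _))            // 110, computed in parallel across the partitions
```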
An RDD has five main properties, all visible in its abstract class definition: a list of partitions, a function for computing each partition, a list of dependencies on other RDDs, optionally a Partitioner, and optionally a list of preferred locations for each partition:
```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // 1. a function that computes the data of a given partition
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // 2. the list of partitions
  protected def getPartitions: Array[Partition]

  // 3. the dependencies on parent RDDs (the lineage)
  protected def getDependencies: Seq[Dependency[_]] = deps

  // 4. optionally, preferred locations for each partition (data locality)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  // 5. optionally, a Partitioner for key-value RDDs
  @transient val partitioner: Option[Partitioner] = None

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  // ...
}
```
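To see these hooks in action, here is a hypothetical minimal subclass (`RangeDemoRDD` is not part of Spark, only a sketch) that yields ten integers per partition:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical example, not part of Spark: each partition yields ten integers.
class RangeDemoRDD(sc: SparkContext, numParts: Int) extends RDD[Int](sc, Nil) {

  // property 2: one Partition object per slice
  override protected def getPartitions: Array[Partition] =
    (0 until numParts).map { i =>
      new Partition { override def index: Int = i }
    }.toArray

  // property 1: compute the elements of one partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    (split.index * 10 until (split.index + 1) * 10).iterator

  // properties 3-5 keep their defaults: deps = Nil, no preferred locations, no partitioner
}

// new RangeDemoRDD(sc, 2).collect()  // => 0 to 19
```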
As the constructor shows, lineage is recorded in the `deps` dependency list; if no dependency list is given, the auxiliary constructor creates a one-to-one dependency (`OneToOneDependency`) on the single parent by default.
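Since `dependencies` and `toDebugString` are public, the lineage can be inspected directly; a small sketch:

```scala
// Sketch: inspecting the deps list that records lineage.
val base   = sc.parallelize(1 to 100)
val mapped = base.map(_ + 1)  // no deps given => OneToOneDependency on base

println(mapped.dependencies)  // List(org.apache.spark.OneToOneDependency@...)
println(mapped.toDebugString) // prints the whole lineage graph
```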
The RDD class itself defines the generic transformations such as `map`, `filter`, and `union`. On top of that, the RDD companion object uses implicit conversions to add extra transformations for specific element types; for example, key-value RDDs gain transformations such as `groupByKey` and `cogroup`:
```scala
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}
```
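With this conversion in scope (in recent Spark versions it lives in the RDD companion object, so no explicit import is needed), any `RDD[(K, V)]` picks up the `PairRDDFunctions` API:

```scala
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey() // resolved through rddToPairRDDFunctions
grouped.collect().foreach(println) // e.g. (a,CompactBuffer(1, 3)), (b,CompactBuffer(2))
```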
Dependencies themselves are modeled by the `Dependency` abstract class, which has two families of subclasses: `NarrowDependency` and `ShuffleDependency`.
A `NarrowDependency` exposes `def getParents(partitionId: Int): Seq[Int]`, which computes the partitions of the parent RDD that a given partition of the child RDD depends on. For example, `MapPartitionsRDD` (the RDD behind `map` and `mapPartitions`) delegates everything to its first parent:

```scala
override val partitioner =
  if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
```
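The `OneToOneDependency` used by the default RDD constructor shown earlier is the simplest narrow dependency: each child partition depends on the parent partition with the same index. Its implementation in Spark is essentially:

```scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // child partition i depends exactly on parent partition i
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
```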
At the data-source end, `ParallelCollectionRDD` (the RDD created by `sc.parallelize`) stores each partition's data in a `ParallelCollectionPartition` object:

```scala
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

override def compute(s: Partition, context: TaskContext): Iterator[T] = {
  new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}

override def getPreferredLocations(s: Partition): Seq[String] = {
  locationPrefs.getOrElse(s.index, Nil)
}
```
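A quick way to observe this slicing is `glom`, which gathers each partition into an array:

```scala
val rdd = sc.parallelize(Seq(10, 20, 30, 40), numSlices = 2) // a ParallelCollectionRDD
println(rdd.getNumPartitions)                        // 2
rdd.glom().collect().foreach(a => println(a.toList)) // List(10, 20) then List(30, 40)
```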