spark RDD

RDD

RDD是Resilient Distributed Dataset的英文缩写,是spark的基本数据抽象,表明着一个不可变的、多分区的、可并行操做的元素集合。java

RDD有5个主要属性:ide

  • 分区列表 (partition list)
  • 计算某个分区函数(compute)
  • 依赖列表 (dependency list)
  • kv类型RDD的分区器(可选的)
  • 计算某个分区最优位置的函数(可选的)
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getDependencies: Seq[Dependency[_]] = deps

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  @transient val partitioner: Option[Partitioner] = None
  
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

可见血统关系是经过deps依赖列表来保存的,若是不指定依赖列表则默认建立一对一的依赖关系OneToOneDependency函数

函数注入

RDD类中定义了一些通用的转换函数如map``fliter``union等同时RDD的伴生对象中经过隐式转换的方式定义了一些额外的转换函数,好比kv类型的RDD一些转换函数:groupByKey cogroupthis

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

依赖关系

Dependency抽象类来描述依赖关系,有两种子类:spa

  • NarrowDependency 这也就是常说的窄依赖,子RDD的每个分区依赖固定个父RDD的分区。这种依赖关系是固定的能够经过def getParents(partitionId: Int): Seq[Int]函数计算出子RDD的某个分区依赖的父RDD的分区。
  • ShuffleDependency 也就是常说的宽依赖,这种依赖关系会触发shuffle,也是spark任务划分stage的标准。

具体实现

  • MapPartitionsRDD 不建立新的分区列表,采用一对一的依赖关系,每一个分区的计算就是在对应父分区上运用传入的转换函数。
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context)
  • ParallelCollectionRDD 做为source类型的RDD,依赖列表为空,会根据传入的数据和并行度计算新的分区列表,用ParallelCollectionPartition对象保存每一个分区的数据。
override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }
  • 把握好5个主要属性很容易实现自定义的RDD
相关文章
相关标签/搜索