Spark核心编程的三大数据结构 之 RDD基础编程 (一)

这是我参与更文挑战的第1天,活动详情查看:更文挑战git

1 RDD建立

在Spark中建立RDD的建立方式能够分为四种:github

  • 从集合(内存)中建立RDD
    • 从集合中建立RDD,Spark主要提供了两个方法:parallelize和makeRDD
    • makeRDD底层代码调用的parallelize,因此两个方法同样
//内存建立RDD
    def main(args: Array[String]): Unit = {

        val sc: SparkContext = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-Mem")
        )

        val rdd1: RDD[Int] = sc.makeRDD(
            List(1, 2, 4, 5, 6)
        )
        val rdd2: RDD[Int] = sc.parallelize(
            Array(1, 2, 3, 4, 5, 6)
        )

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)
    }

复制代码
//makeRDD源码
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      parallelize(seq, numSlices)
  }
复制代码
  • 从外部存储(文件)建立RDD
    • 由外部存储系统的数据集建立RDD包括:本地的文件系统,全部Hadoop支持的数据集,好比HDFS、HBase等。
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-File")
        )

        val rdd1: RDD[String] = sc.textFile("data")
        //wholeTextFiles Tuple第一个数据为文件全路径 Tuple第二个为每行数据
        val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)

    }
复制代码
  • 从其余RDD建立
    • 主要是经过一个RDD运算完后,再产生新的RDD。详情请参考后续章节
  • 直接建立RDD(new)
    • 使用new的方式直接构造RDD,通常由Spark框架自身使用。

2 RDD并行度与分区

默认状况下,Spark能够将一个做业切分多个任务后,发送给Executor节点并行计算,而可以并行计算的任务数量咱们称之为并行度。这个数量能够在构建RDD时指定。记住,这里的并行执行的任务数量,并非指的切分任务的数量,不要混淆了。apache

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
    sc.makeRDD(
        List(1,2,3,4),
        4)
val fileRDD: RDD[String] =
    sc.textFile(
        "input",
        2)
fileRDD.collect().foreach(println)
sparkContext.stop()
复制代码
  • 读取内存数据时,数据能够按照并行度的设定进行数据的分区操做,数据分区规则的Spark核心源码以下:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of partitions required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    //计算每一个分区开始位置和结束位置
    //[1,2,3,4,5] 分红两个分区后会成为 [1,2][3,4,5]
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    //下面为具体的拆分代码
    seq match {
      case r: Range =>
        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }.toSeq.asInstanceOf[Seq[Seq[T]]]
      case nr: NumericRange[_] =>
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      case _ =>
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map { case (start, end) =>
            array.slice(start, end).toSeq
        }.toSeq
    }
  }

复制代码
  • 读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差别,具体Spark核心源码以下
override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 分区
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
 ..........

复制代码
// 具体如何分区
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {

    long totalSize = 0;
    for (FileStatus file: files) {
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
      
    ...
    
    for (FileStatus file: files) {
    
        ...
    
    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          ...

  }
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

复制代码

3.RDD序列化

  • 闭包检查

从计算的角度, 算子之外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会致使算子内常常会用到算子外的数据,这样就造成了闭包的效果,若是使用的算子外的数据没法序列化,就意味着没法传值给Executor端执行,就会发生错误,因此须要在执行任务计算前,检测闭包内的对象是否能够进行序列化,这个操做咱们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变编程

  • 序列化方法和属性

从计算的角度, 算子之外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,代码以下:数组

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("测试序列化")
        )

        val dept1 = new Dept(1, "研发部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }
    
    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }
复制代码
//校验的代码
 private def clean(
      func: AnyRef,
      checkSerializable: Boolean,
      cleanTransitively: Boolean,
      accessedFields: Map[Class[_], Set[String]]): Unit = {
    ..............
    // 校验序列化
    if (checkSerializable) {
      ensureSerializable(func)
    }
  }
  private def ensureSerializable(func: AnyRef): Unit = {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }

//不实现序列号接口会跑出以下异常
//Exception in thread "main" org.apache.spark.SparkException: Task not serializable
// at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)

复制代码
  • Kryo序列化框架

项目地址: github.com/EsotericSof…
Java的序列化可以序列化任何的类。可是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化
注意:即便使用Kryo序列化,也要继承Serializable接口。markdown

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local[*]")
                .setAppName("测试序列化")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(Array(classOf[Dept]))
        )

        val dept1 = new Dept(1, "研发部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }

    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }
复制代码
相关文章
相关标签/搜索