Spark入门基础(1)——RDD【转】

RDD是存储数据的最小单位,spark在并行计算的时候会将任务细化到rdd的维度,分到不一样的cluster上计算。html

生成RDD

// @param numSlices number of partitions to divide the collection into
// parallelize() 的第二个参数是slices的数目,它指定了将数据集切分的份数。
sc.parallelize(Array("one", "two", "three", "four"), 2)
sc.parallelize(List("one", "two", "three", "four"), 2)
sc.textFile("hdfs:///user/test.txt")

map

map 会操做 RDD 中的每个元素,map 中的每一次循环都是RDD中的某一个元素,map 对数据的操做互不影响,因此 map 操做是天生并行化的。apache

// 测试数据 => 姓名#语文成绩#数学成绩#英语成绩
// 小红#70#80#80
// 小明#10#20#30
// 小张#30#40#80
// 小李#80#90#90
// 小亮#30#80#80
// 小慧#80#20#10
// 小黑#10#20#10
// 小红#30#20#20
// 小黑#90#80#90

// map操做数据集的每一行数据,先按照 "#" 来把数据分割成 List,而后再操做每一个 List 将 List 中的数据按照须要取出来进行整合
val data = sc.textFile(student_grade).map(_.split("#", -1)).map {
      line =>
        val name = line(0)
        val chinese = line(1)
        val math = line(2)
        val english = line(3)
        (name, chinese, math, english)
    }

RDD全称叫作弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能经过其余RDD转换而建立,为此,RDD支持丰富的转换操做(如map, join, filter, groupBy等),经过这种转换操做,新的RDD则包含了如何从其余RDDs衍生所必需的信息,因此说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会造成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是经过血缘关系(Lineage)一鼓作气的,即便出现数据分区丢失,也能够经过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组肯定性操做构成的DAG,而后写回稳定存储。另外RDD还能够将数据集缓存到内存中,使得在多个操做之间能够重用数据集,基于这个特色能够很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。能够说Spark最初也就是实现RDD的一个分布式系统,后面经过不断发展壮大成为如今较为完善的大数据生态系统,简单来说,Spark-RDD的关系相似于Hadoop-MapReduce关系。编程

RDD特色

RDD表示只读的分区的数据集,对RDD进行改动,只能经过RDD的转换操做,由一个RDD获得一个新的RDD,新的RDD包含了从其余RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。若是血缘关系较长,能够经过持久化RDD来切断血缘关系。缓存

分区

以下图所示,RDD逻辑上是分区的,每一个分区的数据是抽象存在的,计算的时候会经过一个compute函数获得每一个分区的数据。若是RDD是经过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,若是RDD是经过其余RDD转换而来,则compute函数是执行转换逻辑将其余RDD的数据进行转换。 输入图片说明机器学习

只读

以下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上建立新的RDD。分布式

输入图片说明

由一个RDD转换到另外一个RDD,能够经过丰富的操做算子实现,再也不像MapReduce那样只能写map和reduce了,以下图所示。ide

输入图片说明

RDD的操做算子包括两类,一类叫作transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另外一类叫作actions,它是用来触发RDD的计算,获得RDD的相关计算结果或者将RDD保存的文件系统中。下图是RDD所支持的操做算子列表。函数

输入图片说明

依赖

RDDs经过操做算子进行转换,转换获得的新RDD包含了从其余RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。以下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另外一种是宽依赖,下游RDD的每一个分区与上游RDD(也称之为父RDD)的每一个分区都有关,是多对多的关系。oop

输入图片说明

经过RDDs之间的这种依赖关系,一个任务流能够描述为DAG(有向无环图),以下图所示,在实际执行过程当中宽依赖对应于Shuffle(图中的reduceByKey和join),窄依赖中的全部转换操做能够经过相似于管道的方式一鼓作气执行(图中map和union能够一块儿执行)。post

输入图片说明

缓存

若是在应用程序中屡次使用同一个RDD,能够将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系获得分区的数据,在后续其余地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。以下图所示,RDD-1通过一系列的转换后获得RDD-n并保存到hdfs,RDD-1在这一过程当中会有个中间结果,若是将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程当中,就不会计算其以前的RDD-0了。

输入图片说明

checkpoint

虽然RDD的血缘关系自然地能够实现容错,当RDD的某个分区数据失败或丢失,能够经过血缘关系重建。可是对于长时间迭代型应用来讲,随着迭代的进行,RDDs之间的血缘关系会愈来愈长,一旦在后续迭代过程当中出错,则须要经过很是长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就能够切断以前的血缘关系,由于checkpoint后的RDD不须要知道它的父RDDs了,它能够从checkpoint处拿到数据。

小结

总结起来,给定一个RDD咱们至少能够知道以下几点信息:一、分区数以及分区方式;二、由父RDDs衍生而来的相关依赖信息;三、计算每一个分区的数据,计算步骤为:1)若是被缓存,则从缓存中取的分区的数据;2)若是被checkpoint,则从checkpoint处恢复数据;3)根据血缘关系计算分区的数据。

编程模型

在Spark中,RDD被表示为对象,经过对象上的方法调用来对RDD进行转换。通过一系列的transformations定义RDD以后,就能够调用actions触发RDD的计算,action能够是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时能够经过管道的方式传输多个转换。

要使用Spark,开发者须要编写一个Driver程序,它被提交到集群以调度运行Worker,以下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。

输入图片说明

应用举例

下面介绍一个简单的spark应用程序实例WordCount,统计一个数据集中每一个单词出现的次数,首先将从hdfs中加载数据获得原始RDD-0,其中每条记录为数据中的一行句子,通过一个flatMap操做,将一行句子切分为多个独立的词,获得RDD-1,再经过map操做将每一个词映射为key-value形式,其中key为词自己,value为初始计数值1,获得RDD-2,将RDD-2中的全部记录归并,统计每一个词的计数,获得RDD-3,最后将其保存到hdfs。

import org.apache.spark._
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: WordCount <inputfile> <outputfile>");
      System.exit(1);
    }
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val result = sc.textFile(args(0))
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    result.saveAsTextFile(args(1))
  }
}

输入图片说明

小结

基于RDD实现的Spark相比于传统的Hadoop MapReduce有什么优点呢?总结起来应该至少有三点:1)RDD提供了丰富的操做算子,再也不是只有map和reduce两个操做了,对于描述应用程序来讲更加方便;2)经过RDDs之间的转换构建DAG,中间结果不用落地;3)RDD支持缓存,能够在内存中快速完成计算。

原文连接

相关文章
相关标签/搜索