Spark编程指南分享

 

转载自:https://www.2cto.com/kf/201604/497083.htmlhtml

一、概述

在高层的角度上看,每个Spark应用都有一个驱动程序(driver program)。驱动程序就是运行用户的main主程序并在集群上执行各类并行操做的程序。Spark中的一个主要的抽象概念就是弹性分布数据集(resilient distributed dataset,RDD),RDD是分布在多个节点构成的集群上的元素的集合,并支持并行操做。RDD能够由Hadoop的分布式文件系统(或其余支持Hadoop分布式系统的文件系统)中的文件建立,也能够经过在驱动程序中的Scala集合建立,同时,对一个RDD进行转换操做(transform)也能够建立一个新的RDD。用户也能够将RDD存储在内存中,这样RDD就能够在后序的并行操做中高效地重复使用。最后,RDD可以从某个节点的实效中进行恢复。java

Spark的第二个抽象概念就是共享变量(shared variables)。共享变量应用于并行操做中。默认状况下,当Spark在几个节点构成的集群上并行执行一系列任务时,Spark会携带函数中使用的每个变量到每个任务中。有时,一些变量须要在几个任务间共享,或者在任务和驱动程序间共享。Spark支持两种共享变量:将数据缓存中全部节点中的广播变量(broadcast variables),和只能增长的累加器(accumulators),好比计数器和总和(sums)。算法

这里我只使用了Scala的版本。Spark还支持Java和Python。若是打开Spark的交互式脚本,很容易理解这个知道的内容。shell

二、链接Spark

Spark 1.6.1使用Scala 2.10版本。若是使用Scala编写Spark应用,应该使用兼容的版本(好比2.10.x)。apache

在Intellij Idea搭建Spark开发环境中介绍了使用Idea+Maven搭建Spark开发环境。若是编写Spark应用,应该添加Spark的依赖,具体的信息以下:数组

 
 
  1. groupId = org.apache.spark
  2. artifactId = spark-core_2.10
  3. version = 1.6.1 
 

一样,若是使用HDFS的分布式文件系统,也要添加hadoop-client的依赖:缓存

 
 
  1. groupId = org.apache.hadoop
  2. artifactId = hadoop-client
  3. version = <your-hdfs-version></your-hdfs-version> 
 

最后,须要在Scala中添加以下import语句:安全

 
 
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkConf 

 这里要注意的是,在Spark 1.3.0版本之前,Scala中须要显示添加import org.apache.spark.SparkContext._来使用重要的隐式转换。不过1.3.0之后的版本就不须要了。markdown

三、初始化Spark

编写Spark的第一件事就是建立SparkContext对象,来告诉Spark如何使用一个集群。建立SparkContext对象须要使用SparkConf对象,这个对象包含一些关于应用程序的信息。网络

注意,每个JVM上只能有一个活跃的SparkContext对象。因此,必须调用stop()来终止SparkContext对象才能建立另外一个新的SparkContext对象。

 
 
  1. val conf = new SparkConf().setAppName(appName).setMaster(master)
  2. new SparkContext(conf) 
 

其中,appName参数是应用程序在集群中的名字。master参数指定主节点的位置,它能够是一个Spark,Mesos或者YARN集群的URL地址,也可使用本地模式的“local”。通常来讲,当应用程序运行在集群上时,在代码上硬编码master并不方便,而是在使用spark-submti提交应用的时候使用参数指定master。对于本地测试来讲,可使用“local”来运行Spark程序。

3.一、使用交互式shell

Spark提供了交互式的shell,在这个交互式shell中,已经建立了一个SparkContext对象,这个变量就是sc,不用建立直接使用便可,本身建立的反而不能用。可使用--master参数指定sc连接到哪一个master,还有不少参数能够选择,这里给出几个例子。

下面使用本地模式4个核:

 
 
  1. ./bin/spark-shell --master local[4] 
 

下面使用--jar指定了要运行的jar包:

 
 
  1. ./bin/spark-shell --master local[4] --jars code.jar 

 下面使用--packages添加了依赖:

 
 
 
  1. ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" 
 

上面仅仅是一些例子,Spark还有不少启动参数,能够运行spark-shell --help得到更多的信息。下面是master可选的值:

 

Master URLs
Master URL 含义
local 在本地运行Spark而且只有一个worker线程(也就是说没有并行)
local[K] 在本地运行Spark并使用K个worker线程(基本上设置K值为本地机器的核心数)
local[*] 在本地运行Spark,而且使用本地机器尽量多的核心数
spark://HOST:PORT 链接到给定的Spark standalone集群上。端口号必须是主节点设置使用的端口号,默认使用7077
mesos://HOST:PORT 链接到给定的Mesos集群。端口号默认使用5050
yarn 链接到yarn集群,能够经过--deploy-mode参数设置使用client或cluster两种模式
yarn-client 和使用--deploy-mode client链接到yarn等价
yarn-cluster 和使用--deploy-mode cluster链接到yarn等价

 

四、弹性分布数据集(RDD)

Spark的一个核心抽象概念就是弹性分布数据集(resilient distributed dataset,RDD)。RDD是一个能够并行操做的可容错的元素集合。有两种方法能够建立一个RDD:将驱动程序中已存在的集合进行并行化操做,或者从外部存储系统中建立RDD。事实上,还能够经过对已有的RDD进行转化操做建立一个新的RDD。

4.一、对集合序列化

能够经过调用SparkContext对象的parallelize方法把一个在驱动程序中已存在的集合序列化为RDD。集合中的元素会被复制为分布式数据集来支持并行计算。下面的例子将一个1到5的数组序列化操做为一个RDD:

 
 
  1. val data = Array(1, 2, 3, 4, 5)
  2. val distData = sc.parallelize(data) 
 

RDD一旦建立,就能够并行操做。例如,可使用下面的操做计算全部元素的和:

 
 
  1. val sum=distData.reduce((a,b)=>a+b) 

 稍后会介绍RDD的一些操做。

 

在将一个集合序列化时一个重要的参数就是partitions,也就是说要将这个集合分红几个部分。Spark会对每一个部分执行一个任务来达到并行操做的效果。通常来讲,集群中的每一个CPU分配2到4个部分。一般,Spark会基于集群的配置自动设置这个值。然而,用户也能够本身设置这个值:

 
 
  1. val distData=sc.parallelize(data,10) 
 
 

这样,就把data分为10个部分。

 

4.二、外部数据集

Spark能够经过任何支持Hadoop的存储系统建立分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持文本文件,SequenceFiles,和任何实现了Hadoop InputFormat接口的文件。

文本文件的RDD能够经过SparkContext对象的textFile方法建立。这个方法传递一个URI参数来指定文件,URI参数可使本地文件路径或者hdfs://等,而后将文件读取为行的集合。例子以下:

 
 
  1. scala> val distFile = sc.textFile("data.txt")
  2. distFile: RDD[String] = MappedRDD@1d4cee08 
 

一旦建立,distFile就能够进行数据集操做。例如,下面的代码使用map和reduce操做计算全部行的大小:

 
 
  1. distFile.map(=> s.length).reduce((a, b) => a + b) 

 

 

使用Spark读取文件时的一些注意事项:

若是传递本地文件的路径,那么这个文件也必须在集群的全部worker节点中的相同路径上。或者将这个文件复制到全部worker上的相同路径上;Spark中全部关于文件输入的方法,包括textFile,都支持目录,压缩文件和通配符。好比textFile("/my/directory"),textFile("/my/directory/*.txt"), 和textFile("/my/directory/*.gz");textFile方法也能够经过第二个参数来指定这个文件被分为几个部分。默认状况下,文件基于块来进行划分(在HDFS中块默认为64MB或128MB,我这里是128MB)。不过能够指定的分区数比块数多,但不能比块数少;

除了文本文件,Spark的Scala API还支持另外的数据格式:

SparkContext.wholeTextFiles容许读取一个目录下全部的小文本文件,而后返回(文件名,内容)键值对。这和textFile返回文件中的全部行不一样;对于SequenceFiles,可使用SparkContext对象的sequenceFile[K,V]方法,其中K和V是文件中键和值的类型。这些文件必须是实现Hadoop的Writable接口的类,好比IntWritable和Text。并且,Spark也容许使用基本类型,好比sequenceFile[Int,String]就会自动读取IntWritable和Text;对于其它的Hadoop输入格式,可使用SparkContext.hadoopRDD方法;RDD.saveAsObjectFile和SparkContext.objectFiles方法支持将一个RDD中的Java对象序列化为对象文件。不过这个方法并不像Avro那样高效;

4.三、RDD操做

RDD支持两种类型的操做:转换操做(transformations)和行动操做(actions)。转换操做从一个已有的RDD建立一个新的RDD;行动操做对这个RDD数据集进行一系列运算后返回驱动程序一个结果。区分两种操做的办法就是看返回结果的类型,若是返回的是一个RDD,那么就是转换操做,不然就是行动操做。好比,map就是一个转换操做,它把数据集中的每个元素都调用一个函数,将结果做为新的RDD的元素。而reduce就是一个行动操做,它对数据集的全部元素调用一个聚合函数,而后把最终结果返回驱动程序(尽管还存在一个并行的方法reduceByKey返回一个分布数据集)。

Spark中全部的转换操做都是惰性求值的。所谓惰性求值,是说并不立刻计算结果,而仅仅记住对这个RDD进行的转换操做序列。只有当对这个RDD调用一个须要返回给驱动程序一个结果的行动操做时才计算结果。Spark的这个设计使得程序运行得更有效。好比,使用map方法建立的一个RDD极可能调用reduce来计算结果并将结果返回个驱动程序,惰性求值使得只需给驱动程序返回reduce计算的结果,不然要返回一个map操做建立的RDD,显然这个代价太大了。

默认状况下,对经过转换操做造成的RDD执行行动操做时都会从新计算这个RDD。这种状况下,能够将RDD经过persist或者cache方法存储在内存中,Spark会把数据集中的元素存在集群中的全部节点上,这样下次计算的时候就能够快速获得结果。一样,Spark也能够把RDD存储在磁盘上,或者在多个节点间进行复制。

4.3.一、基础

下面的程序给出了RDD的基础操做:

 
 
  1. val lines = sc.textFile("data.txt")
  2. val lineLengths = lines.map(=> s.length)
  3. val totalLength = lineLengths.reduce((a, b) => a + b) 
 

第一行经过SparkContext对象的textFile读取文件建立了一个RDD变量lines,lines数据集并不会加载到内存中或采起其它行动,它仅仅是一个指向文件的指针。第二行定义了一个变量lineLengths保存map操做的结果,这是一个转换操做,所以lineLengths也是一个RDD,map方法须要一个函数参数,对象Spark传递一个参数后面会介绍,这里的函数参数将每一行映射为一个数值,这个数值就是这一行的长度。注意,因为Spark的惰性求值程序进行到这里并无计算lineLengths。最终,对lineLengths调用reduce方法,这是一个行动操做。这时,Spark将计算分解为多个任务运行在集群的多个机器上,而后每一个机器执行本身那部分的map和reduce操做,计算完后将本身这部分的结果返回给驱动程序。

 

若是在后序的操做中还会用到lineLengths,就能够将它存储在内存中:

 
 
  1. lineLengths.persist() 
 

这样,下一次调用reduce的时候就不用再计算了。

 

4.3.二、给Spark传递函数

Spark的API严重依赖从驱动程序传递函数给集群。有两种方式来传递函数:

 

匿名函数语法,能够减小代码;在一个单独的object中定义一个函数。好比,能够定义一个object MyFunctions,而后传递MyFunctions:

 
 
  1. object MyFunctions {
  2.   def func1(s: String): String = { ... }
  3. }
  4.  
  5. myRdd.map(MyFunctions.func1) 

 

 

 

注意还能够传递一个类实例的方法引用(和object相反),不过这须要把包含这个类的对象同这个方法传递过去。好比,考虑下面的代码:

 
 
  1. class MyClass {
  2.   def func1(s: String): String = { ... }
  3.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. } 
 

这里,若是咱们建立一个新的MyClass实例而后调用doStuff方法,里面的map引用这个MyClass实例里面的func1方法,因此整个对象须要被传递到集群中。这和rdd.map(x => this.func1(x))类似。

 

一样,若是访问外部属性也须要传递整个对象:

 
 
  1. class MyClass {
  2.   val field = "Hello"
  3.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(=> field + x) }
  4. } 
 

这和rdd.map(x => this.field + x)类似。为了不这样,能够将属性复制到方法内部而不是在外部访问:

 
 
  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2.   val field_ = this.field
  3.   rdd.map(=> field_ + x)
  4. } 

 

 

4.3.三、理解闭包

Spark的一个难点就是理解在集群执行代码时变量和方法的做用域和声明周期。对在RDD做用域外部的变量进行修改操做很让人迷惑。在下面的例子中,咱们考察使用foreach()方法增长的代码,不过一样的事情也会发生在其它的操做中。

4.3.3.一、一个例子

考虑下面的RDD元素相加的例子,这个例子是否运行在同一个JVM上会产生不一样的结果。好比,运行在本地模式(--master=local[n])和将Spark应用部署到集群上(spark-submit):

 
 
  1. var counter = 0
  2. var rdd = sc.parallelize(data)
  3.  
  4. // Wrong: Don't do this!!
  5. rdd.foreach(=> counter += x)
  6.  
  7. println("Counter value: " + counter) 
 


4.3.3.二、本地模式与集群模式

上面代码的行为是不肯定的,可能不会获得预期结果。执行工做(job)时,Spark会把RDD的操做分解为多个任务(task),每个任务由一个执行者(executor)执行。在执行以前,Spark会计算任务的闭包(closure)。闭包就是执行者在对RDD进行本身那部分计算时须要可见的变量和方法(在这个例子中是foreach)。这些闭包会被序列化并传递到每一个执行者上。

传递到每一个执行者上的闭包中的变量如今被赋值,如今foreach方法中的counter再也不是驱动程序中的counter。在驱动节点的内存中仍然有一个counter,但这个counter并不能被执行者访问到,执行者只能访问到在闭包的序列化中复制到执行者上的counter。所以,counter的最终结果仍然是0,由于对counter的全部操做都是在闭包序列化中的counter值。

在本地模式,一些状况下foreach方法会像驱动程序同样运行在同一个JVM上,所以会引用原始的counter,并正确更新它的值。

在这种状况下为了写出良好行为的代码,应该使用累加器(accumulator)。Spark中的累加器在特殊状况下使用,它提供一种在集群中执行被分解的状况下安全更新变量值的机制。累加器的细节会在这个指导的累加器部分讨论。

通常的,闭包,不该该被用来改变全局状态。Spark不定义也不保证对从闭包外部引用的对象的作出改变的行为。一些代码可能在本地模式下可以运行,不过这仅仅是巧合,在分布式模式下就不会获得指望的结果。若是须要全局的聚合操做就使用累加器。

4.3.3.三、打印RDD中的元素

另外一个常见用法就是使用rdd.foreach(println)或rdd.map(println)方法打印RDD中的元素。在一台机器上,这个操做会获得预期的结果,打印出RDD中的全部元素。然而在集群模式下,执行者调用的打印方法会在执行者的标准输出(stdout)上打印结果,而不是驱动程序的标准输出上,所以驱动程序的标准输出上并无结果。若是要将结果打印在驱动程序的标准输出上,须要使用collect()方法先使执行者将各自的RDD部分返回给驱动程序,而后调用foreach或map,即:

 
 
  1. rdd.collect().map(println) 
 

不过,这样可能会耗尽驱动程序的内存,由于大多数状况下RDD很大。若是只想查看RDD中的部分元素 ,可使用take()方法:

 
 
  1. rdd.take(100).foreach(println) 

这样只取RDD中的100个元素。

 

4.3.四、键值对操做

尽管Spark中的大多数操做都支持任何类型的RDD,不过Spark为包含键值对类型的RDD提供了一些特殊的操做。最多见的操做就是“混洗”(shuffle),好比根据键来对元素进行分组或聚合。

在Scala中,这些操做自动支持包含Tuple2(Scala中内置的元组类型,可使用(a,b)来建立)对象的RDD。

好比,下面的代码使用reduceByKey方法操做键值对RDD来计算每一行出现的次数:

 
 
  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s =>(s,1))
  3. val counts = pairs.reduceByKey((a, b)=> a + b) 
 

第一行,使用textFile方法从文本文件建立一个RDD变量lines,第二行使用map操做,将lines中的每个元素(即一行)映射为一个元组(line,1),构成一个键值对RDD,即pairs,第三行调用行动操做reduceByKey,根据键进行reduce操做,将键相同的值相加,就获得了每一行元素出现的次数。

 

咱们也可使用counts.sortByKey()来对结果进行排序,还可使用counts.collect()将结果以数组的形式返回给驱动程序。

注意:当在键值对中的键使用自定义类型时,必须保证这个自定义类型的equals方法和hashCode方法匹配。也就是说,若是两个自定义类型的变量a,b的hashCode方法的返回值相同,那么a.equals(b)也必定返回true。

4.3.五、转换操做

下表列出了Spark支持的转换操做。具体的细节能够查看RDD的API文档(包括Scala,Java,Python,R),和键值对RDD的函数文档(Scala和R):

 

转换操做 含义
map(func) 对RDD中的每一个元素调用func函数,而后返回结果构成新的RDD
filter(func) 返回一个由经过传给filter的函数的元素组成的RDD
flatMap(func) 将函数应用于RDD中的每个元素,将返回的迭代器的全部内容构成新的RDD
mapPartitions(func) 和map相似,不过运行在RDD的不一样分块上,所以func的类型必须是Iterator=>Iterator
mapPartitionsWithIndex(func) 和mapPartitions相似,不过func函数提供一个整数值表示分块的下标,因此函数的类型是(Int,Iterator=>Iterator)
sample(withReplacement,fraction,seed) 对RDD采样,以及是否替换
union(otherDataset) 生成一个包含两个RDD中全部元素的RDD
intersection(otherDataset) 返回由两个RDD中共同元素组成的RDD
distinct([numTasks]) 返回去除原RDD中重复元素的新的RDD
groupByKey([numTasks]) 对具备相同键的值进行分组。注意若是仅仅是为了聚合,使用reduceByKey或aggregateByKey性能更好
reduceByKey(func,[numTasks]) 合并既有相同键的值
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) 和reduceByKey相似,不过须要提供一个初始值
sortByKey([ascending],[numTasks]) 返回一个根据键排序的RDD
join(otherDataset,[numTasks]) 对两个RDD进行内链接。其它的链接操做还有leftOuterJoin,rightOuterJoin和fullOuterJoin
cogroup(otherDataset,[numTasks]) 也叫groupWith,对类型(K,V)和(K,W)的RDD进行操做,返回(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 对类型T和U的RDD进行操做,返回(T,U)类型的RDD
pipe(command,[envVars]) 将RDD的每一个分区经过管道传给一个shell脚本
coalesce(numPartitions) 减小RDD的分区数量。当对一个大的RDD执行filter操做后使用会有效
repartition(numPartitions) 对RDD从新分区
repartitionAndSortWithinPartitions(partitioner) 根据给定的partitioner对RDD从新分区,在每一个分区再根据键排序

4.3.六、行动操做

下面列出了RDD的行动操做:

 

 

行动操做 含义
reduce(func) 使用func函数并行整合RDD中的全部元素
collect() 返回RDD中的全部元素
count() 返回RDD中的元素个数
first() 返回RDD中的第一个元素
take(n) 返回RDD中的n个元素
takeSample(withReplacement,num,[seed]) 从RDD中返回任意一些元素,结果不肯定
takeOrdered(n,[ordering]) 从RDD中按照提供的顺序返回最前面的n个元素
saveAsTextFile(path) 将RDD中的元素写入文本文件,Spark会调用元素的toString方法
saveAsSequenceFile(path) 将RDD中的元素保存为Hadoop的SequenceFile文件
saveAsObjectFile(path) 将RDD中的元素使用Java中的序列化保存为对象文件,可使用SparkContext.objectFile()读取
countByKey() 操做键值对RDD,根据键值分别计数
foreach(func) 对RDD中的每一个元素调用给定的函数func

 

4.3.七、混洗操做(Shuffle)

在Spark中,一些操做会触发一个叫作混洗(shuffle)的事件。混洗是Spark的机制,经过对数据进行从新分组使得同一组的在同一个分区。这一般会致使在执行者和机器之间数据的复制与传递,所以混洗操做是一个复杂并消耗性能的操做。

4.3.7.一、背景

咱们以reduceByKey操做来理解混洗操做期间发生了什么。reduceByKey操做会生成一个新的RDD,原键值对RDD中的全部元素根据键的不一样分组后使用reduce操做获得一个结果,由全部的键和这个结果构成的键值对元素构成了这个新的RDD。问题在于并非每全部键相同的元素都在同一个分区上,甚至不在同一个机器上,但为了计算结果,它们必须从新存储到同一个位置。

在Spark中,数据通常不会为了某个操做而具体地根据须要进行分组存储。在计算过程当中,每个执行者对本身的分区进行计算,所以,为了给执行者组织对应的分区,Spark须要执行一个满射操做来从新组织数据。Spark必须读取全部的分区来获得全部键,而后对每一个键将键相同的元素组织到一块儿执行reduce操做来计算结果。这就是混洗。

尽管通过混洗后每一个分区的元素集合分区自己都是肯定的,可是元素的顺序不肯定。若是要是数据具备肯定的顺序,可使用下面的混洗方法:

 

使用mapPartitions来对分区排序;使用reparttitionAndSortWithinPartitions高效的在重分区的同时排序分区;使用sortBy排序一个RDD;

 

能够致使混洗的操做有重分区操做好比reparation和coalesce,ByKey操做(除了计数counting)好比groupByKey和reduceByKey,还有链接操做(join)好比cogroup和join。

4.3.7.二、性能影响

因为涉及到磁盘I/O,数据序列化和网络I/O,因此混洗操做性能消耗较大。为了从新组织数据,Spark会产生一些map任务来组织数据,一些reduce任务来进行聚合。这一名称来自于MapRedece,但并不直接和Spark的map和reduce操做相关。

本质上,map任务的结果会存在内存中直到存不下为止。而后,这些结果根据所在的分区进行排序,写入单一的文件中。在reduce阶段,任务会读取这些排好序的相关分块。

一些混洗操做会消耗大量的堆空间,由于它们在转化记录以前或以后会之内存数据结构组织记录。具体来讲,reduceByKey和aggregateByKey在map阶段构造这些数据结构,而后ByKey系列操做在reduce阶段生成数据。当内存中存不下这些数据时,Spark会将这些数据存到磁盘中,致使额外的磁盘I/O并增长垃圾收集。

混洗也会在磁盘上产生大量的中间数据。在Spark 1.3中,这些数据会一直保存到相关的RDD不会再次使用,而后被当作垃圾收集。这对于操做谱系还会从新计算的时候是有益的。若是应用常用这些RDD或者垃圾收集机制没能常常收集,垃圾收集会通过很长一段时间才发生。这意味着长时间运行的应用会占用大量的磁盘空间。在建立SparkContext时可使用spark.local.dir来指定这个临时存储路径。

经过调整各类参数配置能够设置混洗的行为。这会在Spark配置里介绍。

4.四、RDD的持久化(缓存)

Spark中的一个重要特征就是在操做过程当中将数据集缓存在内存中。当缓存一个RDD后,计算RDD的节点会分别保存它们所求出的分区数据,而后在随后的操做中重复使用。这使得后序的操做执行的更快(一般快10几倍)。持久化是迭代式算法和快速交互式使用的关键。

可使用persist或cache方法持久化一个RDD。当对一个RDD第一次执行行动操做时,RDD会保存在节点的内存中。Spark的缓存是可容错的,意味着若是某个分区丢失了,RDD会自动根据在建立这个RDD时的转换操做从新计算。

并且,每个持久化的RDD可使用不一样的持久化级别,容许将数据持久化到磁盘,做为Java序列化对象存储在内存,在节点中备份,或者存在堆外空间上。这些持久化级别能够经过传递一个StorageLevel对象给persist方法。cache方法只能使用默认的StorageLevel.MEMORY_ONLY这一个持久化级别。下面是全部的级别:

 

持久化级别 含义
MEMORY_ONLY 在JVM上存储非序列化的Java对象。若是内存不够,一些分区不会存储,直到须要的时候从新计算。这是默认的级别。
MEMORY_AND_DISK 若是内存不够,会把剩余的分区存储在磁盘上。
MEMORY_ONLY_SER 存储序列化的Java对象。这通常比序列化空间效率高,不过读取的时候消耗CPU较多。
MEMORY_AND_DISK_SER 和上一个类似,不过若是内存不够会存储到磁盘上,内存中存放序列化后的数据。
DISK_ONLY 仅存储在磁盘上。
MEMORY_ONLY_2,MEMORY_AND_DISK_2 和上一个类似,只不过会在集群中的两个节点上备份。
OFF_HEAP(实验中)  

 

注意,在Python中,会始终序列化要存储的数据,因此持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。

Spark会在混洗操做(好比reduceByKey)中自动持久化一些中间数据,尽管用户并无调用persist。这就避免了在混洗过程当中若是某个节点发生故障而从新计算整个数据集。咱们仍然建议若是打算重复使用RDD就使用persist对其进行持久化。

4.4.一、选择哪一个持久化级别呢?

Spark的持久化级别是为了提供在内存使用和CPU效率间不一样的平衡选择。咱们建议经过如下步骤进行级别选择:

 

若是RDD适合默认的级别(MEMORY_ONLY),那么就使用默认值。这是CPU效率最高的选项,使得对RDD的操做尽量的快;若是RDD不适合MEMORY_ONLY,尝试使用MEMORY_ONLY_SER,而后选择一个快的序列化库对数据进行序列化来高效使用空间,不过访问仍是很快;除非计算数据集的函数很是耗时,或者这些函数过滤掉大多数的数据,不然不要将数据持久化到磁盘上。否则,从新计算数据可能会和从磁盘中读取同样快;若是想出错时尽快恢复,就是用备份。全部的级别都使用从新计算保证容错性,但备份级别能够保证程序继续执行而不用等待从新计算丢失的分区;在有大量内存空间和多应用程序的实验中,实验中的OFF_HEAP模式有以下的优势:容许多个执行者能共享Tachyon的内存池;有效的下降了垃圾收集的消耗;若是单个执行者发生故障缓存的数据不会丢失;

 

4.4.二、删除数据

Spark会自动跟踪每一个节点的缓存使用状况,而且会根据最近最少使用原则(LRU)将最老的分区从内存中删除。若是想手动删除数据,使用unpersist方法。

五、共享变量

一般,当一个传递给一个Spark操做(好比map或reduce)的函数执行在远程的集群节点上时,它是对函数中使用的变量的另外一份副本进行操做的。这些变量会被复制到每个机器上,而且全部对这些变量的更新不会返回到驱动程序那里。在任务间支持通用的、读写共享的变量并不有效。然而,Spark提供两种经常使用形式的有限类型的共享变量:广播变量(broadcast)和累加器(accumulators)。

5.一、广播变量

广播变量容许开发者在每一个机器上缓存一个只读变量而不是把它在任务间复制。它们给每个节点一个大规模数据的一个副本,并经过高效的方式完成。Spark也会试图使用更好的广播算法来分布式存储广播变量来减小网络流量。

Spark的行动操做在一系列阶段(stage)执行,经过混洗操做进行分割。Spark会自动广播每一个阶段任务都须要的数据。这些数据以序列化的形式缓存而后再每一个任务使用以前反序列化。这意味着只有当任务须要在跨多个阶段执行过程当中使用同一个数据时,或者以反序列化缓存数据是重要的时候,显示广播数据才有用。

能够经过使用SparkContext.broadcast()方法来对变量v建立一个广播变量。广播变量将v包裹起来,能够经过调用value方法获取v。下面的代码演示了广播变量的用法:

 
 
  1. scala> val broadcastVar = sc.broadcast(Array(1,2,3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)
  3.  
  4. scala> broadcastVar.value
  5. res0:Array[Int]=Array(1,2,3) 
 


当广播变量建立后,在集群中全部使用变量v的方法都应该使用这个广播变量,所以v不会被复制到节点超过一次。并且,对象v在广播以后不该该改变,这样全部的节点都会得到广播变量中相同的值。

 

5.二、累加器

累加器就是在相应操做中只能增长的变量,所以能更有效的支持并行操做。累加器能够用来做为计数器和求和。Spark支持数字类型的累加器,开发者能够增长新类型的支持。若是累加器在建立时设置了一个名字,那么名字就会在Spark的UI中显示。这对于理解阶段的执行过程有必定的帮助。

能够经过调用SparkContext.accumulator(v)对变量v建立一个累加器。以后集群上的任务就能够经过add方法或+=操做符增长累加器。然而,任务并不能读取累加器的值。只用驱动程序才能够经过value方法读取累加器的值。

下面的代码演示了使用累加器计算数组的和:

 
 
  1. scala> val accum = sc.accumulator(0,"My Accumulator")
  2. accum: spark.Accumulator[Int]=0
  3.  
  4. scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
  5. ...
  6. 10/09/2918:41:08 INFO SparkContext:Tasks finished in0.317106 s
  7.  
  8. scala> accum.value
  9. res2:Int=10 
 


上面的代码使用内置支持的Int类型建立了累加器,开发者能够经过实现AccumulatorParam接口来对本身的类型增长累加器支持。AccumulatorParam接口有两个方法:zero方法提供一个本身的类型的零值,addInPlace方法定义两个值相加的操做。好比,假设咱们有一个能够表明数学上向量的类型Vector,能够这样写:

 
 
  1. objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{
  2.   def zero(initialValue:Vector):Vector={
  3.     Vector.zeros(initialValue.size)
  4.   }
  5.   def addInPlace(v1:Vector, v2:Vector):Vector={
  6.     v1 += v2
  7.   }
  8. }
  9.  
  10. // Then, create an Accumulator of this type:
  11. val vecAccum = sc.accumulator(newVector(...))(VectorAccumulatorParam) 

 

 


Scala中,Spark也支持更经常使用的A吃醋姆拉不了接口来累加那些结果类型和元素类型不一样的数据(好比经过收集数据构成一个list列表),还有一个SparkContext.accumulableCollection方法累加常见的Scala集合类型。

 

对累加器来讲,更新老是在行动操做中执行,Spark保证每个任务对累加器的更新只有一次,好比重启的任务不会更新这个值。在转换操做中,用户应该意识到若是任务或者job阶段重复执行,那每一个任务的更新操做可能执行屡次。

累加器并无改变Spark的惰性求值策略。若是一个RDD的一个累加器被更新了,RDD在行动操做中累加器的值只更新一次。所以,当在惰性的转换操做好比map中,累加器的更新并不能保证会执行。下面的代码片断演示了这个属性:

 
 
  1. val accum = sc.accumulator(0)
  2. data.map { x => accum += x; f(x)}
  3. // Here, accum is still 0 because no actions have caused the map to be computed. 
 

六、部署到集群上

 

在应用提交指导中介绍如何提交应用到集群上。简单来讲,一旦应用打包成jar文件,spark-submit能够将你的应用部署到任何集群上。

七、更多

在Spark的网站上有一些Spark应用的例子。并且,在Spark的examples目录下也有一些程序实例。你能够经过run-example脚本运行例子:

 
 
  1. ./bin/run-example SparkPi
相关文章
相关标签/搜索