Spark编程指南《Spark 官方文档》

《Spark 官方文档》Spark编程指南

spark-1.6.0 [原文地址]html

Spark编程指南

概述

整体上来讲,每一个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各类并行操做。java

Spark最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD),RDD 是一个可分区的元素集合,其包含的元素能够分布在集群各个节点上,而且能够执行一些分布式并行操做。RDD一般是经过,HDFS(或者其余Hadoop支 持的文件系统)上的文件,或者驱动器中的Scala集合对象,来建立或转换获得;其次,用户也能够请求Spark将RDD持久化到内存里,以便在不一样的并 行操做里复用之;最后,RDD具有容错性,能够从节点失败中自动恢复数据。python

Spark第二个重要抽象概念是共享变量,共享变量是一种能够在并行操做之间共享使用的变量。默认状况下,当Spark把一系列任务 调度到不一样节点上运行时,Spark会同时把每一个变量的副本和任务代码一块儿发送给各个节点。但有时候,咱们须要在任务之间,或者任务和驱动器之间共享一些 变量。Spark提供了两种类型的共享变量:广播变量累加器,广播变量能够用来在各个节点上缓存数据,而累加器则是用来执行跨节点的“累加”操做,例如:计数和求和。git

本文将会使用Spark所支持的全部语言来展现Spark的特性。若是你能启动Spark的交互式shell动手实验一下,效果会更好(对scala请使用bin/spark-shell,而对于python,请使用bin/pyspark)。github

 

连接Spark

Spark 1.6.0 使用了Scala 2.10。用Scala写应用的话,你须要使用一个兼容的Scala版本(如:2.10.X)web

同时,若是你须要在maven中依赖Spark,能够用以下maven工件标识:算法

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0

另外,若是你须要访问特定版本的HDFS,那么你可能须要增长相应版本的hadoop-client依赖项,其maven工件标识以下:shell

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,你须要以下,在你的代码里导入一些Spark class:apache

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0以前,你须要显示的 import org.apache.spark.SparkContext._ 来启用这些重要的隐式转换)编程

初始化Spark

Spark应用程序须要作的第一件事就是建立一个 SparkContext 对象,SparkContext对象决定了Spark如何访问集群。而要新建一个SparkContext对象,你还得须要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。

每一个JVM进程中,只能有一个活跃(active)的SparkContext对象。若是你非要再新建一个,那首先必须将以前那个活跃的SparkContext 对象stop()掉。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName参数值是你的应用展现在集群UI上的应用名称。master参数值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。实际上,通常不该该将master参数值硬编码到代码中,而是应该用spark-submit脚本的参数来设置。然而,若是是本地测试或单元测试中,你能够直接在代码里给master参数写死一个”local”值。

使用shell

在Spark shell中,默认已经为你新建了一个SparkContext对象,变量名为sc。因此spark-shell里不能自建SparkContext对 象。你能够经过–master参数设置要链接到哪一个集群,并且能够给–jars参数传一个逗号分隔的jar包列表,以便将这些jar包加到 classpath中。你还能够经过–packages设置逗号分隔的maven工件列表,以便增长额外的依赖项。一样,还能够经过 –repositories参数增长maven repository地址。下面是一个示例,在本地4个CPU core上运行的实例:

$ ./bin/spark-shell –master local[4]

或者,将code.jar添加到classpath下:

$ ./bin/spark-shell --master local[4] --jars code.jar

经过maven标识添加依赖:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

spark-shell –help能够查看完整的选项列表。实际上,spark-shell是在后台调用spark-submit来实现其功能的(spark-submit script.)

弹性分布式数据集(RDD)

Spark的核心概念是弹性分布式数据集(RDD),RDD是一个可容错、可并行操做的分布式元素集合。整体上有两种方法能够建立RDD对象:由驱 动程序中的集合对象经过并行化操做建立,或者从外部存储系统中数据集加载(如:共享文件系统、HDFS、HBase或者其余Hadoop支持的数据源)。

并行化集合

并行化集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize() 方法建立获得的RDD。集合对象中全部的元素都将被复制到一个可并行操做的分布式数据集中。例如,如下代码将一个1到5组成的数组并行化成一个RDD:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦建立成功,该分布式数据集(上例中的distData)就能够执行一些并行操做。如,distData.reduce((a, b) => a + b),这段代码会将集合中全部元素加和。后面咱们还会继续讨论分布式数据集上的各类操做。

并行化集合的一个重要参数是分区(partition),即这个分布式数据集能够分割为多少片。Spark中每一个任务(task)都 是基于分区的,每一个分区一个对应的任务(task)。典型场景下,通常每一个CPU对应2~4个分区。而且通常而言,Spark会基于集群的状况,自动设置 这个分区数。固然,你仍是能够手动控制这个分区数,只需给parallelize方法再传一个参数便可(如:sc.parallelize(data, 10) )。注意:Spark代码里有些地方仍然使用分片(slice)这个术语,这只不过是分区的一个别名,主要为了保持向后兼容。

外部数据集

Spark 能够经过Hadoop所支持的任何数据源来建立分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其余 Hadoop 支持的输入格式(InputFormat)。

文本文件建立RDD能够用 SparkContext.textFile 方法。这个方法输入参数是一个文件的URI(本地路径,或者 hdfs://,s3n:// 等),其输出RDD是一个文本行集合。如下是一个简单示例:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

建立后,distFile 就能够执行数据集的一些操做。好比,咱们能够把全部文本行的长度加和:distFile.map(s => s.length).reduce((a, b) => a + b)

如下是一些Spark读取文件的要点:

  • 若是是本地文件系统,那么这个文件必须在全部的worker节点上可以以相同的路径访问到。因此要么把文件复制到全部worker节点上同一路径下,要么挂载一个共享文件系统。
  • 全部Spark基于文件输入的方法(包括textFile)都支持输入参数为:目录,压缩文件,以及通配符。例如:textFile(“/my /directory”), textFile(“/my/directory/*.txt”), 以及 textFile(“/my/directory/*.gz”)
  • textFile方法同时还支持一个可选参数,用以控制数据的分区个数。默认地,Spark会为文件的每个block建立一个分区(HDFS上默认block大小为64MB),你能够经过调整这个参数来控制数据的分区数。注意,分区数不能少于block个数。

除了文本文件以外,Spark的Scala API还支持其余几种数据格式:

  • SparkContext.wholeTextFiles 能够读取一个包含不少小文本文件的目录,而且以 (filename, content) 键值对的形式返回结果。这与textFile 不一样,textFile只返回文件的内容,每行做为一个元素。
  • 对于SequenceFiles,能够调用 SparkContext.sequenceFile[K, V],其中 K 和 V 分别是文件中key和value的类型。这些类型都应该是 Writable 接口的子类, 如:IntWritable and Text 等。另外,Spark 容许你为一些经常使用Writable指定原生类型,例如:sequenceFile[Int, String] 将自动读取 IntWritable 和 Text。
  • 对于其余的Hadoop InputFormat,你能够用 SparkContext.hadoopRDD 方法,并传入任意的JobConf 对象和 InputFormat,以及key class、value class。这和设置Hadoop job的输入源是一样的方法。你还可使用 SparkContext.newAPIHadoopRDD,该方法接收一个基于新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat做为参数。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将RDD中元素以Java对象序列化的格式保存成文件。虽然这种序列化方式不如Avro效率高,却为保存RDD提供了一种简便方式。

RDD算子

RDD支持两种类型的算子(operation):transformation算子 和 action算子;transformation 算子能够将已有RDD转换获得一个新的RDD,而action算子则是基于数据集计算,并将结果返回给驱动器(driver)。例如,map是一个 transformation算子,它将数据集中每一个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce是一个action算子,它能够将RDD中全部元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个reduceByKey算 子,其返回的聚合结果是一个数据集)。

Spark中全部transformation算子都是懒惰的,也就是说,这些算子并不当即计算结果,而是记录下对基础数据集(如: 一个数据文件)的转换操做。只有等到某个action算子须要计算一个结果返回给驱动器的时候,transformation算子所记录的操做才会被计 算。这种设计使Spark能够运行得更加高效 – 例如,map算子建立了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回reduce的最终聚合结果(单独的一个数据) 给驱动器,而不是将map所产生的数据集整个返回给驱动器。

默认状况下,每次调用action算子的时候,每一个由transformation转换获得的RDD都会被从新计算。然而,你也能够经过调用 persist(或者cache)操做来持久化一个RDD,这意味着Spark将会把RDD的元素都保存在集群中,所以下一次访问这些元素的速度将大大提 高。同时,Spark还支持将RDD元素持久化到内存或者磁盘上,甚至能够支持跨节点多副本。

基础

如下简要说明一下RDD的基本操做,参考以下代码:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

其中,第一行是从外部文件加载数据,并建立一个基础RDD。这时候,数据集并无加载进内存除非有其余操做施加于lines,这时候的lines RDD其实能够说只是一个指向 data.txt 文件的指针。第二行,用lines经过map转换获得一个lineLengths RDD,一样,lineLengths也是懒惰计算的。最后,咱们使用 reduce算子计算长度之和,reduce是一个action算子。此时,Spark将会把计算分割为一些小的任务,分别在不一样的机器上运行,每台机器 上都运行相关的一部分map任务,并在本地进行reduce,并将这些reduce结果都返回给驱动器。

若是咱们后续须要重复用到 lineLengths RDD,咱们能够增长一行:

lineLengths.persist()

这一行加在调用 reduce 以前,则 lineLengths RDD 首次计算后,Spark会将其数据保存到内存中。

将函数传给Spark

Spark的API 不少都依赖于在驱动程序中向集群传递操做函数。如下是两种建议的实现方式:

  • 匿名函数(Anonymous function syntax),这种方式代码量比较少。
  • 全局单件中的静态方法。例如,你能够按以下方式定义一个 object MyFunctions 并传递其静态成员函数 MyFunctions.func1:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意,技术上来讲,你也能够传递一个类对象实例上的方法(不是单件对象),不过这回致使传递函数的同时,须要把相应的对象也发送到集群中各节点上。例如,咱们定义一个MyClass以下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

若是咱们 new MyClass 建立一个实例,并调用其 doStuff 方法,同时doStuff中的 map算子引用了该MyClass实例上的 func1 方法,那么接下来,这个MyClass对象将被发送到集群中全部节点上。rdd.map(x => this.func1(x)) 也会有相似的效果。

相似地,若是应用外部对象的成员变量,也会致使对整个对象实例的引用:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的代码对field的引用等价于 rdd.map(x => this.field + x),这将致使应用整个this对象。为了不相似问题,最简单的方式就是,将field执拗到一个本地临时变量中,而不是从外部直接访问之,以下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解闭包

Spark里一个比较难的事情就是,理解在整个集群上跨节点执行的变量和方法的做用域以及生命周期。Spark里一个频繁出现的问题就是RDD算子 在变量做用域以外修改了其值。下面的例子,咱们将会以foreach() 算子为例,来递增一个计数器counter,不过相似的问题在其余算子上也会出现。

示例

考虑以下例子,咱们将会计算RDD中原生元素的总和,若是不是在同一个JVM中执行,其表现将有很大不一样。例如,这段代码若是使用Spark本地模 式(–master=local[n])运行,和在集群上运行(例如,用spark-submit提交到YARN上)结果彻底不一样。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地模式 v.s. 集群模式

上面这段代码其行为是不肯定的。在本地模式下运行,全部代码都在运行于单个JVM中,因此RDD的元素都可以被累加并保存到counter变量中,这是由于本地模式下,counter变量和驱动器节点在同一个内存空间中。

然而,在集群模式下,状况会更复杂,以上代码的运行结果就不是所预期的结果了。为了执行这个做业,Spark会将RDD算子的计算过程分割成多个独立的任务(task)- 每一个任务分发给不一样的执行器(executor)去执行。而执行以前,Spark须要计算闭包闭包是 由执行器执行RDD算子(本例中的foreach())时所须要的变量和方法组成的。闭包将会被序列化,并发送给每一个执行器。因为本地模式下,只有一个执 行器,全部任务都共享一样的闭包。而在其余模式下,状况则有所不一样,每一个执行器都运行于不一样的worker节点,而且都拥有独立的闭包副本。

在上面的例子中,闭包中的变量会跟随不一样的闭包副本,发送到不一样的执行器上,因此等到foreach真正在执行器上运行时,其引用的counter 已经再也不是驱动器上所定义的那个counter副本了,驱动器内存中仍然会有一个counter变量副本,可是这个副本对执行器是不可见的!执行器只能看 到其所收到的序列化闭包中包含的counter副本。所以,最终驱动器上获得的counter将会是0。

为了确保相似这样的场景下,代码能有肯定的行为,这里应该使用累加器Accumulator。累加器是Spark中专门用于集群跨节点分布式执行计算中,安全地更新同一变量的机制。本指南中专门有一节详细说明累加器。

一般来讲,闭包(由循环或本地方法组成),不该该改写全局状态。Spark中改写闭包以外对象的行为是未定义的。这种代码,有可能在本地模式下能正常工做,但这只是偶然状况,一样的代码在分布式模式下其行为极可能不是你想要的。因此,若是须要全局聚合,请记得使用累加器(Accumulator

打印RDD中的元素

另外一种常见习惯是,试图用 rdd.foreach(println) 或者 rdd.map(println) 来打印RDD中全部的元素。若是是在单机上,这种写法可以如预期同样,打印出RDD全部元素。而后,在集群模式下,这些输出将会被打印到执行器的标准输出 (stdout)上,所以驱动器的标准输出(stdout)上神马也看不到!若是真要在驱动器上把全部RDD元素都打印出来,你能够先调用collect 算子,把RDD元素先拉倒驱动器上来,代码多是这样:rdd.collect().foreach(println)。不过若是RDD很大的话,有可能 致使驱动器内存溢出,由于collect会把整个RDD都弄到驱动器所在单机上来;若是你只是须要打印一部分元素,那么take是不更安全的选 择:rdd.take(100).foreach(println)

使用键值对

大部分Spark算子都能在包含任意类型对象的RDD上工做,但也有一部分特殊的算子要求RDD包含的元素必须是键值对(key-value pair)。这种算子常见于作分布式混洗(shuffle)操做,如:以key分组或聚合。

在Scala中,这种操做在包含 Tuple2 (内建与scala语言,能够这样建立:(a, b) )类型对象的RDD上自动可用。键值对操做是在 PairRDDFunctions 类上可用,这个类型也会自动包装到包含tuples的RDD上。

例如,如下代码将使用reduceByKey算子来计算文件中每行文本出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

一样,咱们还能够用 counts.sortByKey() 来对这些键值对按字母排序,最后再用 counts.collect() 将数据以对象数据组的形式拉到驱动器内存中。

注意:若是使用自定义类型对象作键值对中的key的话,你须要确保自定义类型实现了 equals() 方法(一般须要同时也实现hashCode()方法)。完整的细节能够参考:Object.hashCode() documentation

转换算子 – transformation

如下是Spark支持的一些经常使用transformation算子。详细请参考 RDD API doc (ScalaJavaPythonR) 以及 键值对 RDD 函数 (ScalaJava) 。

transformation算子 做用
map(func) 返回一个新的分布式数据集,其中每一个元素都是由源RDD中一个元素经func转换获得的。
filter(func) 返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果
flatMap(func) 相似于map,但每一个输入元素能够映射到0到n个输出元素(因此要求func必须返回一个Seq而不是单个元素)
mapPartitions(func) 相似于map,但基于每一个RDD分区(或者数据block)独立运行,因此若是RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。
mapPartitionsWithIndex(func) 相似于 mapPartitions,只是func 多了一个整型的分区索引值,所以若是RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。
sample(withReplacementfractionseed) 采样部分(比例取决于 fraction )数据,同时能够指定是否使用回置采样(withReplacement),以及随机数种子(seed)
union(otherDataset) 返回源数据集和参数数据集(otherDataset)的并集
intersection(otherDataset) 返回源数据集和参数数据集(otherDataset)的交集
distinct([numTasks])) 返回对源数据集作元素去重后的新数据集
groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。
注意:若是你须要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以得到更好的性能。
注意:默认状况下,输出计算的并行度取决于源RDD的分区个数。固然,你也能够经过设置可选参数 numTasks 来指定并行任务的个数。
reduceByKey(func, [numTasks]) 若是源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每一个key对应的value是通过func聚合后的结果,而func自己是一个 (V, V) => V 的映射函数。
另外,和 groupByKey 相似,能够经过可选参数 numTasks 指定reduce任务的个数。
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 若是源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每一个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合获得。容许聚合后value类型和输入value类型不一样,避免了没必要要的开销。和 groupByKey 相似,能够经过可选参数 numTasks 指定reduce任务的个数。
sortByKey([ascending], [numTasks]) 若是源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序仍是降序取决于 ascending 参数)
join(otherDataset, [numTasks]) 若是源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操做请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 若是源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。该算子还有个别名:groupWith
cartesian(otherDataset) 若是源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前两者的笛卡尔积,其元素类型为 (T, U) 对。
pipe(command[envVars]) 以shell命令行管道处理RDD的每一个分区,如:Perl 或者 bash 脚本。
RDD中每一个元素都将依次写入进程的标准输入(stdin),而后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
coalesce(numPartitions) 将RDD的分区数减小到numPartitions。当之后大数据集被过滤成小数据集后,减小分区数,能够提高效率。
repartition(numPartitions) 将RDD数据从新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子老是须要经过网络混洗全部数据。
repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)从新分区RDD,而且在每 个结果分区中按key作排序。这是一个组合算子,功能上等价于先 repartition 再在每一个分区内排序,但这个算子内部作了优化(将排序过程下推到混洗同时进行),所以性能更好。

动做算子 – action

如下是Spark支持的一些经常使用action算子。详细请参考 RDD API doc (ScalaJavaPythonR) 以及 键值对 RDD 函数 (ScalaJava) 。

Action算子 做用
reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,而且func须要知足 交换律 和 结合律 以便支持并行计算)
collect() 将数据集中全部元素以数组形式返回驱动器(driver)程序。一般用于,在RDD进行了filter或其余过滤操做后,将一个足够小的数据子集返回到驱动器内存中。
count() 返回数据集中元素个数
first() 返回数据集中首个元素(相似于 take(1) )
take(n) 返回数据集中前 个元素
takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
takeOrdered(n[ordering]) 按元素排序(能够经过 ordering 自定义排序规则)后,返回前 n 个元素
saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其余任何Hadoop支持的文件系统。
保存过程当中,Spark会调用每一个元素的toString方法,并将结果保存成文件中的一行。
saveAsSequenceFile(path)
(Java and Scala)
将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其余任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,一样也适用于可以被隐式转换为 Writable的类型(Spark实现了全部基本类型的隐式转换,如:Int,Double,String 等)
saveAsObjectFile(path)
(Java and Scala)
将RDD元素以Java序列化的格式保存成文件,保存结果文件可使用 SparkContext.objectFile 来读取。
countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每一个key的个数。
foreach(func) 在RDD的每一个元素上运行 func 函数。一般被用于累加操做,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操做。
注意:用 foreach 操做出累加器以外的变量可能致使未定义的行为。更详细请参考前面的“理解闭包”(Understanding closures )这一小节。

混洗操做

有一些Spark算子会触发众所周知的混洗(Shuffle)事件。Spark中的混洗机制是用于将数据从新分布,其结果是全部数据将在各个分区间从新分组。通常状况下,混洗须要跨执行器(executor)或跨机器复制数据,这也是混洗操做通常都比较复杂并且开销大的缘由。

背景

为了理解混洗阶段都发生了哪些事,我首先以 reduceByKey 算 子为例来看一下。reduceByKey算子会生成一个新的RDD,将源RDD中一个key对应的多个value组合进一个tuple - 而后将这些values输入给reduce函数,获得的result再和key关联放入新的RDD中。这个算子的难点在于对于某一个key来讲,并不是其对 应的全部values都在同一个分区(partition)中,甚至有可能都不在同一台机器上,可是这些values又必须放到一块儿计算reduce结 果。

在Spark中,一般是因为为了进行某种计算操做,而将数据分布到所须要的各个分区当中。而在计算阶段,单个任务(task)只会操做单个分区中的 数据 – 所以,为了组织好每一个reduceByKey中reduce任务执行时所需的数据,Spark须要执行一个多对多操做。即,Spark须要读取RDD的所 有分区,并找到全部key对应的全部values,而后跨分区传输这些values,并将每一个key对应的全部values放到同一分区,以便后续计算各 个key对应values的reduce结果 – 这个过程就叫作混洗(Shuffle)。

虽然混洗好后,各个分区中的元素和分区自身的顺序都是肯定的,可是分区中元素的顺序并不是肯定的。若是须要混洗后分区内的元素有序,能够参考使用如下混洗操做:

  • mapPartitions 使用 .sorted 对每一个分区排序 
  • repartitionAndSortWithinPartitions 重分区的同时,对分区进行排序,比自行组合repartition和sort更高效
  • sortBy 建立一个全局有序的RDD

会致使混洗的算子有:重分区(repartition)类算子,如: repartition 和 coalesceByKey 类算子(除了计数类的,如 countByKey) 如:groupByKey 和 reduceByKey;以及Join类算子,如:cogroup 和 join.

性能影响

混洗(Shuffle)之因此开销大,是由于混洗操做须要引入磁盘I/O,数据序列化以及网络I/O等操做。为了组织好混洗数据,Spark须要生 成对应的任务集 – 一系列map任务用于组织数据,再用一系列reduce任务来聚合数据。注意这里的map、reduce是来自MapReduce的术语,和Spark的 map、reduce算子并无直接关系。

在Spark内部,单个map任务的输出会尽可能保存在内存中,直至放不下为止。而后,这些输出会基于目标分区从新排序,并写到一个文件里。在reduce端,reduce任务只读取与之相关的并已经排序好的blocks。

某些混洗算子会致使很是明显的内存开销增加,由于这些算子须要在数据传输先后,在内存中维护组织数据记录的各类数据结构。特别 地,reduceByKey和aggregateByKey都会在map端建立这些数据结构,而ByKey系列算子都会在reduce端建立这些数据结 构。若是数据在内存中存不下,Spark会把数据吐到磁盘上,固然这回致使额外的磁盘I/O以及垃圾回收的开销。

混洗还会再磁盘上生成不少临时文件。以Spark-1.3来讲,这些临时文件会一直保留到其对应的RDD被垃圾回收才删除。之因此这样作,是由于如 果血统信息须要从新计算的时候,这些混洗文件能够没必要从新生成。若是程序持续引用这些RDD或者垃圾回收启动频率较低,那么这些垃圾回收可能须要等较长的 一段时间。这就意味着,长时间运行的Spark做业可能会消耗大量的磁盘。Spark的临时存储目录,是由spark.local.dir 配置参数指定的。

混洗行为能够由一系列配置参数来调优。参考Spark配置指南(Spark Configuration Guide)中“混洗行为”这一小节。

RDD持久化

Spark的一项关键能力就是它能够持久化(或者缓存)数据集在内存中,从而跨操做复用这些数据集。若是你持久化了一个RDD,那么每一个节点上都会 存储该RDD的一些分区,这些分区是由对应的节点计算出来并保持在内存中,后续能够在其余施加在该RDD上的action算子中复用(或者从这些数据集派 生新的RDD)。这使得后续动做的速度提升不少(一般高于10倍)。所以,缓存对于迭代算法和快速交互式分析是一个很关键的工具。

你能够用persist() 或者 cache() 来标记一下须要持久化的RDD。等到该RDD首次被施加action算子的时候,其对应的数据分区就会被保留在内存里。同时,Spark的缓存具有必定的 容错性 – 若是RDD的任何一个分区丢失了,Spark将自动根据其原来的血统信息从新计算这个分区。

另外,每一个持久化的RDD可使用不一样的存储级别,好比,你能够把RDD保存在磁盘上,或者以java序列化对象保存到内存里(为了省空间),或者跨节点多副本,或者使用 Tachyon 存 到虚拟机之外的内存里。这些存储级别均可以由persist()的参数StorageLevel对象来控制。cache() 方法自己就是一个使用默认存储级别作持久化的快捷方式,默认存储级别是 StorageLevel.MEMORY_ONLY(以java序列化方式存到内存里)。完整的存储级别列表以下:

存储级别 含义
MEMORY_ONLY 以未序列化的Java对象形式将RDD存储在JVM内存中。若是RDD不能所有装进内存,那么将一部分分区缓存,而另外一部分分区将每次用到时从新计算。这个是Spark的RDD的默认存储级别。
MEMORY_AND_DISK 以未序列化的Java对象形式存储RDD在JVM中。若是RDD不能所有装进内存,则将不能装进内存的分区放到磁盘上,而后每次用到的时候从磁盘上读取。
MEMORY_ONLY_SER 以序列化形式存储RDD(每一个分区一个字节数组)。一般这种方式比未序列化存储方式要更省空间,尤为是若是你选用了一个比较好的序列化协议(fast serializer),可是这种方式也相应的会消耗更多的CPU来读取数据。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER相似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都从新计算。
DISK_ONLY RDD数据只存储于磁盘上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面没有”_2″的级别相对应,只不过每一个分区数据会在两个节点上保存两份副本。
OFF_HEAP (实验性的) 将RDD以序列化格式保存到Tachyon。与 MEMORY_ONLY_SER相比,OFF_HEAP减小了垃圾回收开销,而且使执行器(executor)进程更小且能够共用同一个内存池,这一特性 在须要大量消耗内存和多Spark应用并发的场景下比较吸引人。并且,由于RDD存储于Tachyon中,因此一个执行器挂了并不会致使数据缓存的丢失。 这种模式下Tachyon 的内存是可丢弃的。所以,Tachyon并不会重建一个它逐出内存的block。若是你打算用Tachyon作为堆外存储,Spark和Tachyon具 有开箱即用的兼容性。请参考这里,有建议使用的Spark和Tachyon的匹配版本对:page

注意:在Python中存储的对象老是会使用 Pickle 作序列化,因此这时是否选择一个序列化级别已经可有可无了。

Spark会自动持久化一些混洗操做(如:reduceByKey)的中间数据,即使用户根本没有调用persist。这么作是为了不一旦有一个节点在混洗过程当中失败,就要重算整个输入数据。固然,咱们仍是建议对须要重复使用的RDD调用其persist算子。

如何选择存储级别?

Spark的存储级别主要可于在内存使用和CPU占用之间作一些权衡。建议根据如下步骤来选择一个合适的存储级别:

  • 若是RDD能使用默认存储级别(MEMORY_ONLY),那就尽可能使用默认级别。这是CPU效率最高的方式,全部RDD算子都能以最快的速度运行。
  • 若是步骤1的答案是否(不适用默认级别),那么能够尝试MEMORY_ONLY_SER级别,并选择一个高效的序列化协议(selecting a fast serialization library),这回大大节省数据对象的存储空间,同时速度也还不错。
  • 尽可能不要把数据吐到磁盘上,除非:1.你的数据集从新计算的代价很大;2.你的数据集是从一个很大的数据源中过滤获得的结果。不然的话,重算一个分区的速度极可能和从磁盘上读取差很少。
  • 若是须要支持容错,能够考虑使用带副本的存储级别(例如:用Spark来服务web请求)。全部的存储级别都可以以重算丢失数据的方式来提供容错性,可是带副本的存储级别可让你的应用持续的运行,而没必要等待重算丢失的分区。
  • 在一些须要大量内存或者并行多个应用的场景下,实验性的OFF_HEAP会有如下几个优点:
    • 这个级别下,能够容许多个执行器共享同一个Tachyon中内存池。
    • 能够有效地减小垃圾回收的开销。
    • 即便单个执行器挂了,缓存数据也不会丢失。

删除数据

Spark可以自动监控各个节点上缓存使用率,而且以LRU(最近常用)的方式将老数据逐出内存。若是你更喜欢手动控制的话,能够用RDD.unpersist() 方法来删除无用的缓存。

共享变量

通常而言,当咱们给Spark算子(如 map 或 reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,而且这些函数所引用的变量都是各个节点上的独立副本。这些变量都会以副本的形式复制 到各个机器节点上,若是更新这些变量副本的话,这些更新并不会传回到驱动器(driver)程序。一般来讲,支持跨任务的可读写共享变量是比较低效的。不 过,Spark仍是提供了两种比较通用的共享变量:广播变量和累加器。

广播变量

广播变量提供了一种只读的共享变量,它是把在每一个机器节点上保存一个缓存,而不是每一个任务保存一份副本。一般能够用来在每一个节点上保存一个较大的输 入数据集,这要比常规的变量副本更高效(通常的变量是每一个任务一个副本,一个节点上可能有多个任务)。Spark还会尝试使用高效的广播算法来分发广播变 量,以减小通讯开销。

Spark的操做有时会有多个阶段(stage),不一样阶段之间的分割线就是混洗操做。Spark会自动广播各个阶段用到的公共数据。这些方式广播的数据都是序列化过的,而且在运行各个任务前须要反序列化。这也意味着,显示地建立广播变量,只有在跨多个阶段(stage)的任务须要一样的数据 或者 缓存数据的序列化和反序列化格式很重要的状况下 才是必须的。

广播变量能够经过一个变量v来建立,只需调用 SparkContext.broadcast(v)便可。这个广播变量是对变量v的一个包装,要访问其值,能够调用广播变量的 value 方法。代码示例以下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

广播变量建立以后,集群中任何函数都不该该再使用原始变量v,这样才能保证v不会被屡次复制到同一个节点上。另外,对象v在广播后不该该再被更新, 这样才能保证全部节点上拿到一样的值(例如,更新后,广播变量又被同步到另外一新节点,新节点有可能获得的值和其余节点不同)。

累加器

累加器是一种只支持知足结合律的“累加”操做的变量,所以它能够很高效地支持并行计算。利用累加器能够实现计数(相似MapReduce中的计数 器)或者求和。Spark原生支持了数字类型的累加器,开发者也能够自定义新的累加器。若是建立累加器的时候给了一个名字,那么这个名字会展现在 Spark UI上,这对于了解程序运行处于哪一个阶段很是有帮助(注意:Python尚不支持该功能)。

创捷累加器时须要赋一个初始值v,调用 SparkContext.accumulator(v) 能够建立一个累加器。后续集群中运行的任务可使用 add 方法 或者 += 操做符 (仅Scala和Python支持)来进行累加操做。不过,任务自己并不能读取累加器的值,只有驱动器程序能够用 value 方法访问累加器的值。

如下代码展现了如何使用累加器对一个元素数组求和:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

以上代码使用了Spark内建支持的Int型累加器,开发者也能够经过子类化 AccumulatorParam 来 自定义累加器。累加器接口(AccumulatorParam )主要有两个方法:1. zero:这个方法为累加器提供一个“零值”,2.addInPlace 将收到的两个参数值进行累加。例如,假设咱们须要为Vector提供一个累加机制,那么可能的实现方式以下:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

若是使用Scala,Spark还支持几种更通用的接口:1.Accumulable,这个接口能够支持所累加的数据类型与结果类型不一样(如:构建一个收集元素的list);2.SparkContext.accumulableCollection 方法能够支持经常使用的Scala集合类型。

对于在action算子中更新的累加器,Spark保证每一个任务对累加器的更新只会被应用一次,例如,某些任务若是重启过,则不会再次更新累加器。 而若是在transformation算子中更新累加器,那么用户须要注意,一旦某个任务由于失败被从新执行,那么其对累加器的更新可能会实施屡次。

累加器并不会改变Spark懒惰求值的运算模型。若是在RDD算子中更新累加器,那么其值只会在RDD作action算子计算的时候被更新一次。因 此,在transformation算子(如:map)中更新累加器,其值并不能保证必定被更新。如下代码片断说明了这一特性:

也不会进行实际的计算val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// 这里,accum任然是0,由于没有action算子,因此map

部署到集群

应用提交指南(application submission guide)中描述了如何向集群提交应用。换句话说,就是你须要把你的应用打包成 JAR文件(Java/Scala)或者一系列 .py 或 .zip 文件(Python),而后再用 bin/spark-submit 脚本将其提交给Spark所支持的集群管理器。

从Java/Scala中启动Spark做业

org.apache.spark.launcher 包提供了简明的Java API,能够将Spark做业做为子进程启动。

单元测试

Spark对全部常见的单元测试框架提供友好的支持。你只须要在测试中建立一个SparkContext对象,而后吧master URL设为local,运行测试操做,最后调用 SparkContext.stop() 来中止测试。注意,必定要在 finally 代码块或者单元测试框架的 tearDown方法里调用SparkContext.stop(),由于Spark不支持同一程序中有多个SparkContext对象同时运行。

从1.0以前版本迁移过来

Spark 1.0 冻结了Spark Core 1.x 系列的核心API,只要是没有标记为 “experimental” 或者 “developer API”的API,在将来的版本中会一直支持。对于Scala用户来讲,惟一的变化就是分组相关的算子,如:groupByKey, cogroup, join,这些算子的返回类型由 (Key, Seq[Value]) 变为 (Key, Iterable[Value])。

更详细迁移向导请参考这里:Spark StreamingMLlib 以及 GraphX.

下一步

你能够去Spark的官网上看看示例程序(example Spark programs)。另外,Spark代码目录下也自带了很多例子,见 examples 目录(Scala,JavaPythonR)。你能够把示例中的类名传给 bin/run-example 脚原本运行这些例子;例如:

./bin/run-example SparkPi

若是须要运行Python示例,则须要使用 spark-submit 脚本:

./bin/spark-submit examples/src/main/python/pi.py

对R语言,一样也须要使用 spark-submit:

./bin/spark-submit examples/src/main/r/dataframe.R

配置(configuration)和调优(tuning)指南提供了很多最佳实践的信息,能够帮助你优化程序,特别是这些信息能够帮助你确保数据以一种高效的格式保存在内存里。集群模式概览(cluster mode overview)这篇文章描述了分布式操做中相关的组件,以及Spark所支持的各类集群管理器。

最后,完整的API文件见:ScalaJavaPython 以及 R.

 

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文连接地址: 《Spark 官方文档》Spark编程指南

相关文章
相关标签/搜索