Spark开发指南
简介
总的来讲,每个Spark的应用,都是由一个驱动程序(driver program)构成,它运行用户的main函数,在一个集群上执行各类各样的并行操做。Spark提出的最主要抽象概念是弹性分布式数据集 (resilient distributed dataset,RDD),它是一个元素集合,划分到集群的各个节点上,能够被并行操做。RDDs的建立能够从HDFS(或者任意其余支持Hadoop文件系统) 上的一个文件开始,或者经过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可让Spark保留一个RDD在内存中,使其能在并行操做中被有效的重复使用。最后,RDD能自动从节点故障中恢复。
Spark的第二个抽象概念是共享变量(shared variables),能够在并行操做中使用。在默认状况下,Spark经过不一样节点上的一系列任务来运行一个函数,它将每个函数中用到的变量的拷贝传递到每个任务中。有时候,一个变量须要在任务之间,或任务与驱动程序之间被共享。Spark 支持两种类型的共享变量:广播变量,能够在内存的全部的结点上缓存变量;累加器:只能用于作加法的变量,例如计数或求和。
本指南将展现这些特性,并给出一些例子。读者最比如较熟悉Scala,尤为是闭包的语法。请留意,你也能够经过spark-shell脚本,来交互式地运行Spark。咱们建议你在接下来的步骤中这样作。
接入Spark
Spark 0.8.1 须要搭配使用 Scala 2.9.3. 若是你用Scala 来编写应用,你须要使用相同版本的Scala,更新的大版本极可能不兼容。
要写一个Spark 应用,你须要给它加上Spark的依赖。若是你使用SBT或者Maven,Spark能够经过Maven中心库来得到:
1 2 3 |
groupId = org.apache.spark artifactId = spark-core_2.9.3 version = 0.8.1-incubating |
另外,若是你想访问一个HDFS集群,你须要根据你的HDFS版本,添加一个hadoop-client的依赖:
1 2 3 |
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version> |
对于其余编译系统,你能够经过运行sbt/sbt assembly来把Spark及其依赖打包到一个JAR(assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop*.jar)中,而后将其加入到你的CLASSPATH中。并按照这里的描述设置HDFS版本。
最后,你须要将一些Spark的类和隐式转换导入到你的程序中。经过以下语句:
1 2 |
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ |
Spark初始化
Spark程序须要作的第一件事情,就是建立一个SparkContext对象,它将告诉Spark如何访问一个集群。这个一般是经过下面的构造器来实现的:
1 |
new SparkContext(master, appName, [sparkHome], [jars]) |
master参数,是一个用于指定所链接的Spark or Mesos 集群URL的字符串,也能够是一个以下面所描述的用于在local模式运行的特殊字符串“local”。appName是你的应用的名称,将会在集群的Web监控UI中显示。最后,若是部署到集群,在分布式模式下运行,最后两个参数是必须的。后面会有具体的描述。
在Spark shell中,一个特殊的解释器感知的SparkContext已经为你建立好了,变量名是sc。建立你本身的SparkContext是不会生效的。你能够用MASTER环境变量来设置SparkContext链接到的master。也能够用ADD_JARS变量来将JARs加入到你的classpath。例如,若是在四核CPU上运行spark-shell,使用:
1 |
$ MASTER=local[4] ./spark-shell |
或者,同时在classpath中加入code.jar,使用:
1 |
$ MASTER=local[4] ADD_JARS=code.jar ./spark-shell |
Master URLs
传递给Spark的master URL能够是如下任一种形式:
Master URL | 含义 |
local | 使用一个Worker线程本地化运行SPARK(彻底不并行) |
local[K] | 使用K个Worker线程本地化运行Spark(理想状况下,K应该根据运行机器的CPU核数设定) |
spark://HOST:PORT | 链接到指定的Spark单机版集群(Spark standalone cluster)master。必须使用master所配置的接口,默认接口是7077. |
mesos://HOST:PORT | 链接到指定的Mesos集群。host参数是Moses master的hostname。必须使用master所配置的接口,默认接口是5050. |
若是没有指定的msater URL, spark shell 的默认值是“local”。
若是在YARN上运行,Spark会在YARN上,启动一个standalone部署的集群实例,查看 running on YARN得到更多详情。
在集群上部署代码
若是你要在集群上运行应用,你须要给SparkContext指定两个可选参数,使其能找到你的代码:
sparkHome:你的集群机器上Spark的安装路径(全部机器上路径必须一致)
jars: 在本地机器上的JAR文件列表,其中包括你应用的代码以及任何的依赖,Spark将会把他们部署到全部的集群结点上。你须要使用你的编译系统将你的应用打包成一系列JAR文件。例如,若是你使用SBT,用sbt-assembly插件将你的代码和全部依赖变成一个JAR文件是一个好的办法。
若是你在一个集群上运行spark-shell, 在启动以前你能够经过指定ADD_JAR环境变量将JAR文件们加载在集群上,这个变量须要包括一个用逗号分隔的JAR文件列表。例如,ADD_JARS=a.jar,b.jar ./spark-shell将启动一个在classpath中带有a.jar和b.jar的shell。另外,在shell中定义的任何新类,都会被自动分发出去。
弹性分布式数据集
Spark围绕的概念是弹性分布式数据集(RDD),这是一个有容错机制并能够被并行操做的元素集合。目前有两种类型的RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,而后进行各类并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统便可。 这两种类型的RDD均可以经过相同的方式进行操做。
并行集合(Parallelized Collections)
并行集合是经过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上建立的(一个Seq对象)。集合的对象将会被拷贝,建立出一个能够被并行操做的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组建立一个并行集合:
1 2 3 4 5 |
scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data) distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e |
一旦分布式数据集(distData)被建立好,它们将能够被并行操做。例如,咱们能够调用distData.reduce(_ +_)来将数组的元素相加。咱们会在后续的分布式数据集运算中进一步描述。
并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务。典型地,你能够在集群的每一个CPU上分布2-4个slices. 通常来讲,Spark会尝试根据集群的情况,来自动设定slices的数目。然而,你也能够经过传递给parallelize的第二个参数来进行手动设置。(例如:sc.parallelize(data, 10)).
Hadoop数据集(Hadoop Datasets)
Spark能够从存储在HDFS,或者Hadoop支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件建立分布式数据集。Spark能够支持TextFile,SequenceFiles以及其它任何Hadoop输入格式。(Python接口目前还不支持SequenceFile,很快会支持吧)
Text file的RDDs能够经过SparkContext’s textFile的方式建立,该方法接受一个文件的URI地址(或者机器上的一个本地路径,或者一个hdfs://, sdn://,kfs://,其它URI). 下面是一个调用例子:
1 2 |
scala> val distFile = sc.textFile("data.txt") distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08 |
一旦建立完成,distFile能够被进行数据集操做。例如,咱们能够经过使用以下的map和reduce操做:distFile.map(_.size).reduce(_ + _ )将全部数据行的长度相加。
textFile方法也能够经过输入一个可选的第二参数,来控制文件的分片数目。默认状况下,Spark为每一块文件建立一个分片(HDFS默认的块大小为64MB),可是你也能够经过传入一个更大的值,来指定一个更高的片值。注意,你不能指定一个比块数更小的片值(和Map数不能小于Block数同样,可是能够比它多)
对于SequenceFiles,可使用SparkContext的sequenceFile[K, V]方法建立,其中K和V是文件中的key和values的类型。像IntWritable和Text同样,它们必须是Hadoop的Writable interface的子类。另外,对于几种通用Writable类型,Spark容许你指定原生类型来替代。例如:sequencFile[Int, String]将会自动读取IntWritable和Texts。
最后,对于其余类型的Hadoop输入格式,你可使用SparkContext.hadoopRDD方法,它能够接收任意类型的JobConf和输入格式类,键类型和值类型。按照像Hadoop做业同样的方法,来设置输入源就能够了。
RDD 的操做
RDD支持两种操做:转换(transformation)从现有的数据集建立一个新的数据集;而动做(actions)在数据集上运行计算后,返回一个值给驱动程序。 例如,map就是一种转换,它将数据集每个元素都传递给函数,并返回一个新的分布数据集表示结果。另外一方面,reduce是一种动做,经过一些函数将全部的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的reduceByKey,能返回一个分布式数据集)
Spark中的全部转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动做。只有当发生一个要求返回结果给Driver的动做时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,咱们能够实现:经过map建立的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。
默认状况下,每个转换过的RDD都会在你在它之上执行一个动做时被从新计算。不过,你也可使用persist(或者cache)方法,持久化一个RDD在内存中。在这种状况下,Spark将会在集群中,保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的,这些选项将在本文档的下一节进行描述。
下面的表格列出了目前所支持的转换和动做(详情请参见 RDD API doc):
转换(transformation)
转换 | 含义 |
map(func) | 返回一个新分布式数据集,由每个输入元素通过func函数转换后组成 |
filter(func) | 返回一个新数据集,由通过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 相似于map,可是每个输入元素能够被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 相似于map,但独立地在RDD的每个分块上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithSplit(func) | 相似于mapPartitions, 但func带有一个整数参数表示分块的索引值。所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement,fraction, seed) | 根据fraction指定的比例,对数据进行采样,能够选择是否用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成 |
distinct([numTasks])) | 返回一个包含源数据集中全部不重复元素的新数据集 |
groupByKey([numTasks]) | 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集 注意:默认状况下,只有8个并行任务来作操做,可是你能够传入一个可选的numTasks参数来改变它 |
reduceByKey(func, [numTasks]) | 在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一块儿。相似groupByKey,reduce任务个数是能够经过第二个可选参数来配置的 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的全部元素对在一块儿的(K, (V, W))数据集 |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操做也能够称之为groupwith |
cartesian(otherDataset) | 笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对) |
完整的转换列表能够在RDD API doc中得到。
动做(actions)
动做 | 含义 |
reduce(func) | 经过函数func(接受两个参数,返回一个参数)汇集数据集中的全部元素。这个功能必须可交换且可关联的,从而能够正确的被并行执行。 |
collect() | 在驱动程序中,以数组的形式,返回数据集的全部元素。这一般会在使用filter或者其它操做并返回一个足够小的数据子集后再使用会比较有用。 |
count() | 返回数据集的元素的个数。 |
first() | 返回数据集的第一个元素(相似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组。注意,这个操做目前并不是并行执行,而是由驱动程序计算全部的元素 |
takeSample(withReplacement,num, seed) | 返回一个数组,在数据集中随机采样num个元素组成,能够选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子 |
saveAsTextFile(path) | 将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每一个元素,Spark将会调用toString方法,将它转换为文件中的文本行 |
saveAsSequenceFile(path) | 将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的能够转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等) |
countByKey() | 对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每个key对应的元素个数 |
foreach(func) | 在数据集的每个元素上,运行函数func进行更新。这一般用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase |
完整的转换列表能够在RDD API doc中得到。
RDD 的持久化
Spark最重要的一个功能,就是在不一样操做间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动做中重用。这将使得后续的动做(Actions)变得更加迅速(一般快10倍)。缓存是用Spark构建迭代算法的关键。
你能够用persist()或cache()方法来标记一个要被持久化的RDD,而后一旦首次被一个动做(Action)触发计算,它将会被保留在计算结点的内存中并重用。Cache有容错机制,若是RDD的任一分区丢失了,经过使用原先建立它的转换操做,它将会被自动重算(不须要所有重算,只计算丢失的部分)。
此外,每个RDD均可以用不一样的保存级别进行保存,从而容许你持久化数据集在硬盘,或者在内存做为序列化的Java对象(节省空间),甚至于跨结点复制。这些等级选择,是经过将一个org.apache.spark.storage.StorageLevel对象传递给persist()方法进行肯定。cache()方法是使用默认存储级别的快捷方法,也就是StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存)。
完整的可选存储级别以下:
存储级别 | 意义 |
MEMORY_ONLY | 将RDD做为反序列化的的对象存储JVM中。若是RDD不能被内存装下,一些分区将不会被缓存,而且在须要的时候被从新计算。这是是默认的级别 |
MEMORY_AND_DISK | 将RDD做为反序列化的的对象存储在JVM中。若是RDD不能被与内存装下,超出的分区将被保存在硬盘上,而且在须要时被读取 |
MEMORY_ONLY_SER | 将RDD做为序列化的的对象进行存储(每一分区占用一个字节数组)。一般来讲,这比将对象反序列化的空间利用率更高,尤为当使用fast serializer,但在读取时会比较占用CPU |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER类似,可是把超出内存的分区将存储在硬盘上而不是在每次须要的时候从新计算 |
DISK_ONLY | 只将RDD分区存储在硬盘上 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上述的存储级别同样,可是将每个分区都复制到两个集群结点上 |
存储级别的选择
Spark的不一样存储级别,旨在知足内存使用和CPU效率权衡上的不一样需求。咱们建议经过如下的步骤来进行选择:
若是你的RDDs能够很好的与默认的存储级别(MEMORY_ONLY)契合,就不须要作任何修改了。这已是CPU使用效率最高的选项,它使得RDDs的操做尽量的快。
若是不行,试着使用MEMORY_ONLY_SER而且选择一个快速序列化的库使得对象在有比较高的空间使用率的状况下,依然能够较快被访问。
尽量不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。不然,从新计算一个分区的速度,和与从硬盘中读取基本差很少快。
若是你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。全部的存储级别都有经过从新计算丢失数据恢复错误的容错机制,可是复制存储级别可让你在RDD上持续的运行任务,而不须要等待丢失的分区被从新计算。
若是你想要定义你本身的存储级别(好比复制因子为3而不是2),可使用StorageLevel 单例对象的apply()方法。
共享变量
通常来讲,当一个函数被传递给Spark操做(例如map和reduce),在一个远程集群上运行,它实际上操做的是这个函数用到的全部变量的独立拷贝。这些变量会被拷贝到每一台机器,在远程机器上对变量的全部更新都不会被传播回驱动程序。一般看来,在任务之间中,读写共享变量显然不够高效。然而,Spark仍是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。
广播变量
广播变量容许程序员保留一个只读的变量,缓存在每一台机器上,而非每一个任务保存一份拷贝。他们能够这样被使用,例如,以一种高效的方式给每一个结点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减小通讯的代价。
广播变量是经过调用SparkContext.broadcast(v)方法从变量v建立的。广播变量是一个v的封装器,它的值能够经过调用value方法得到。以下模块展现了这个:
1 2 3 4 5 |
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) |
在广播变量被建立后,它应该在集群运行的任何函数中,代替v值被调用,从而v值不须要被再次传递到这些结点上。另外,对象v不能在广播后修改,这样能够保证全部结点的收到的都是如出一辙的广播值。
累加器
累加器是一种只能经过关联操做进行“加”操做的变量,所以能够高效被并行支持。它们能够用来实现计数器(如MapReduce中)和求和器。Spark原生就支持Int和Double类型的累加器,开发者能够本身添加新的支持类型。
一个累加器能够经过调用SparkContext.accumulator(v)方法从一个初始值v中建立。运行在集群上的任务,能够经过使用+=来给它加值。然而,他们不能读取这个值。只有驱动程序可使用value的方法来读取累加器的值。
以下的解释器模块,展现了如何利用累加器,将一个数组里面的全部元素相加:
1 2 3 4 5 6 7 8 9 |
scala> val accum = sc.accumulator(0) accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value res2: Int = 10 |
更多信息
你能够在Spark的网站上看到spark程序的样例。Spark还在examples/src/main/scala上收入了一些例子,其中一些既有Spark版本,又有本地(非并行)版本。这些案例让你看到要让程序以集群化的方式跑起来的话,须要作什么修改。你能够经过将类名传递给spark中的run-example脚原本运行它们,例如:
1 |
./run-example org.apache.spark.examples.SparkPi |
任何样例程序在运行时若是没有提供任何参数,都会打印使用帮助。
当须要优化程序的帮助,configuration 和tuning指导提供了最佳实践信息。它们对于确保你的数据以高效的格式存储在内存中,相当重要。