转载自:http://blog.csdn.net/sdujava2011/article/details/46878153?utm_source=tuicoolhtml
英文地址:https://spark.apache.org/docs/latest/programming-guide.htmljava
Spark编程指南V1.4.0python
· 简介程序员
· 接入Sparkweb
· Spark初始化算法
· 使用Shellshell
· 在集群上部署代码apache
· 弹性分布式数据集编程
· 并行集合(Parallelized Collections)api
· 其余数据集
· RDD的操做
· 基础操做
· 向Spark传递函数
· 处理键值对
· 转换
· 动做
· RDD的持久化
· 存储级别的选择
· 移除数据
· 共享变量
· 广播变量
· 累加器
· 部署到一个集群上
· 单元测试
· 从1.0以前版本的Spark迁移
· 下一步该怎么作
简介
总的来讲,每个Spark的应用,都是由一个驱动程序(driver program)构成,它运行用户的main函数,在一个集群上执行各类各样的并行操做。Spark提出的最主要抽象概念是弹性分布式数据集 (resilientdistributed dataset,RDD),它是一个元素集合,划分到集群的各个节点上,能够被并行操做。RDDs的建立能够从HDFS(或者任意其余支持Hadoop文件系统)上的一个文件开始,或者经过转换驱动程序(driver program)中已存在的Scala集合而来。用户也可让Spark保留一个RDD在内存中,使其能在并行操做中被有效的重复使用。最后,RDD能自动从节点故障中恢复。
Spark的第二个抽象概念是共享变量(shared variables),能够在并行操做中使用。在默认状况下,Spark经过不一样节点上的一系列任务来运行一个函数,它将每个函数中用到的变量的拷贝传递到每个任务中。有时候,一个变量须要在任务之间,或任务与驱动程序之间被共享。Spark支持两种类型的共享变量:广播变量,能够在内存的全部的结点上缓存变量;累加器:只能用于作加法的变量,例如计数或求和。
本指南将用每一种Spark支持的语言来展现这些特性。这都是很容易来跟着作的若是你启动了Spark的交互式Shell或者Scala的bin/spark-shell或者Python的bin/pyspark。
接入Spark
Scala
Spark1.2.1须要和Scala2.10一块儿使用。若是你要用Scala来编写应用,你须要用一个相应版本的Scala(例如2.10.X)。
要写一个Spark应用程序,你须要在添加Spark的Maven依赖,Spark能够经过Maven中心库来得到:
除此以外,若是你想访问一个HDFS集群,你须要根据你的HDFS版本,添加一个hadoop-client的依赖。一些通用的HDFS版本标签在第三方发行版页面列出。
最后,你须要将一些Spark的类和隐式转换导入到你的程序中。经过以下语句:
Java
Spark1.2.1须要运行在Java6及更高版本上。若是你正在使用Java8,Spark支持使用Lambda表达式简洁地编写函数,或者你可使用在org.apache.spark.api.java.function包中的类。
要使用Java编写Spark应用程序,你须要添加一个Spark的依赖。Spark能够经过Maven中心库得到:
此外,若是你想访问一个HDFS集群,你须要根据你的HDFS版本,添加一个hadoop-client的依赖。一些通用的HDFS版本标签在第三方发行版页面列出。
最后,你须要将Spark的类导入到你的程序中。添加以下行:
Python
Spark1.2.1须要和Python2.6或者更高的版本(但不是Python3)一块儿使用。它使用标准的CPython解释器,所以像NumPy这类的C语言库能够用。要用Python的方式运行Spark应用程序,可使用在Spark目录下的bin/spark-submit脚本。这个脚本会装载Spark的Java和Scala库并容许你将程序提交到集群。你也可使用bin/pyspark来启动一个交互式Python Shell。
若是你想要访问HDFS数据,你须要根据你的HDFS版本使用一个PySpark的构建。一些通用的HDFS版本标签在第三方发行版页面列出。针对通用的HDFS版本的预先构建的包在Spark主页上也是可得到。
最后,你须要导入一些Spark相关的类到你的程序中。添加以下的行:
from pyspark import SparkContext, SparkConf
初始化Spark
Scala
Spark程序须要作的第一件事情,就是建立一个SparkContext对象,它将告诉Spark如何访问一个集群。要建立一个SparkContext你首先须要创建一个SparkConf对象,这个对象包含你的程序的信息。
每一个JVM只能有一个活动的SparkContext。在建立一个新的SparkContext以前你必须stop()活动的SparkContext。
appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所链接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个以下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你能够经过“local”模式运行Spark。
Java
Spark程序须要作的第一件事情,就是建立一个JavaSparkContext对象,它将告诉Spark如何访问一个集群。要建立一个SparkContext你首先须要创建一个SparkConf对象,这个对象包含你的程序的信息。
appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所链接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个以下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你能够经过“local”模式运行Spark。
Python
Spark程序须要作的第一件事情,就是建立一个JavaSparkContext对象,它将告诉Spark如何访问一个集群。要建立一个SparkContext你首先须要创建一个SparkConf对象,这个对象包含你的程序的信息。
appName是你的应用的名称,将会在集群的Web监控UI中显示。master参数,是一个用于指定所链接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个以下面所描述的用于在local模式运行的特殊字符串“local”。在实践中,当运行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地测试和单元测试,你能够经过“local”模式运行Spark。
使用Shell
Scala
在Spark shell中,一个特殊的解释器感知的SparkContext已经为你建立好了,变量名叫作sc。建立本身的SparkContext将不会生效。你可使用-master参数设置context链接到那个master,而且你可使用-jars参数把用逗号分隔的一个jar包列表添加到classpath中。例如,若是在四核CPU上运行spark-shell,使用:
或者,同时在classpath中加入code.jar,使用:
想要得到完整的选项列表,运行spark-shell –help。在背后,spark-shell调用更通常的spark-submit脚本。
Python
在PySpark shell中,一个特殊的解释器感知的SparkContext已经为你建立好了,变量名叫作sc。建立本身的SparkContext将不会生效。你可使用-master参数设置context链接到那个master,而且你可使用—py-files参数把用逗号分隔的一个Python .zip,.egg或者.py文件列表添加到classpath中。例如,若是在四核CPU上运行bin/pyspark,使用:
或者,同时将code.py添加到搜索路径中(为了之后使用import code),使用:
想要得到完整的选项列表,运行pyspark –help。在背后,pyspark调用更通常的spark-submit脚本。
也能够在IPython中启动Pyspark shell,一个加强的Python解释器。PySpark要使用IPython1.0.0及其以后的版本。要使用IPython,当运行bin/pyspark时要设置PYSPARK_DRIVER_PYTHON变量为ipython:
你能够经过设置PYSPARK_DRIVER_PYTHON_OPTS参数来自定义ipython命令。例如,启动有PyLab支持的IPython Notebook支持:
弹性分布式数据集(RDDs)
Spark围绕的概念是弹性分布式数据集(RDD),是一个有容错机制并能够被并行操做的元素集合。目前有两种建立RDDs的方法:并行化一个在你的驱动程序中已经存在的集合,或者引用在外部存储系统上的数据集,例如共享文件系统,HDFS,HBase,或者任何以Hadoop输入格式提供的数据源。
并行集合
Scala
并行集合是经过调用SparkContext的parallelize方法,在一个已经存在的集合上建立的(一个Scala Seq对象)。集合的对象将会被拷贝,建立出一个能够被并行操做的分布式数据集。例如,下面展现了怎样建立一个含有数字1到5的并行集合:
一旦建立了分布式数据集(distData),就能够对其执行并行操做。例如,咱们能够调用distData.reduce((a,b)=>a+b)来累加数组的元素。后续咱们会进一步地描述对分布式数据集的操做。
并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每一个分区数据起一个任务。典型状况下,你但愿集群的每一个CPU分布2-4个分区(partitions)。一般,Spark会尝试基于集群情况自动设置分区数。然而,你也能够进行手动设置,经过将分区数做为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
Java
并行集合是经过对存在于驱动程序中的集合调用JavaSparkContext的parallelize方法来构建的。构建时会拷贝集合中的元素,建立一个能够被并行操做的分布式数据集。例如,这里演示了如何建立一个包含数字1到5的并行集合:
一旦建立了分布式数据集(distData),就能够对其执行并行操做。例如,咱们能够调用distData.reduce((a,b)=>a+b)来累加数组的元素。后续咱们会进一步地描述对分布式数据集的操做。
注意:在本指南中,咱们会常用简洁地Java8的lambda语法来指明Java函数,而在Java的旧版本中,你能够实现org.apache.spark.api.java.function包中的接口。下面咱们将在把函数传递到Spark中描述更多的细节。
并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每一个分区数据起一个任务。典型状况下,你但愿集群的每一个CPU分布2-4个分区(partitions)。一般,Spark会尝试基于集群情况自动设置分区数。然而,你也能够进行手动设置,经过将分区数做为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
Python
并行集合是经过对存在于驱动程序中的迭代器(iterable)或集合(collection),调用SparkContext的parallelize方法来构建的。构建时会拷贝迭代器或集合中的元素,建立一个能够被并行操做的分布式数据集。例如,这里演示了如何建立一个包含数字1到5的并行集合:
一旦建立了分布式数据集(distData),就能够对其执行并行操做。例如,咱们能够调用distData.reduce(lambda a,b:a+b)来累加列表的元素。后续咱们会进一步地描述对分布式数据集的操做。
并行集合的一个重要参数是分区数(the number of partitions),表示数据集切分的份数。Spark将在集群上为每一个分区数据起一个任务。典型状况下,你但愿集群的每一个CPU分布2-4个分区(partitions)。一般,Spark会尝试基于集群情况自动设置分区数。然而,你也能够进行手动设置,经过将分区数做为第二个参数传递给parallelize方法来实现。(例如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
外部数据集
Scala
Spark能够从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其余任何一种Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://,s3n://等URI)做为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:
一旦建立完成,就能够在distFile上执行数据集操做。例如,要相对全部行的长度进行求和,咱们能够经过以下的map和reduce操做来完成:
distFile.map(s => s.length).reduce((a, b)=> a + b)
Spark读文件时的一些注意事项:
1. 若是文件使用本地文件系统上的路径,那么该文件必须在工做节点的相同路径下也能够访问。能够将文件拷贝到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认状况下,Spark会为文件的每个block建立一个分区,可是你也能够经过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。
除了text文件,Spark的Scala API也支持其余几种数据格式:
1. SparkContext.wholeTextFiles可让你读取包含多个小text文件的目录,而且每一个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。
2. 对于Sequence文件,使用SparkContext的sequenceFile[K,V]方法,其中K和V分别对应文件中key和values的类型。这些类型必须是Hadoop的Writable接口的子类,如IntWritable和Text。另外,Spark容许你使用一些常见的Writables的原生类型;例如,sequenceFile[Int,String]会自动的转换为类型IntWritables和Texts。
3. 对于其余的Hadoop InputFormats,你可使用SparkContext.hadoopRDD方法,它能够接受一个任意类型的JobConf和输入格式类,key类和value类。像Hadoop Job设置输入源那样去设置这些参数便可。对基于“新”的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也可使用SparkContex.newHadoopRDD。
4. RDD.saveAsObjectFile和SparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。虽然这不是一种像Avro那样有效的序列化格式,可是她提供了一种能够存储任何RDD的简单方式。
Java
Spark能够从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其余任何一种Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://,s3n://等URI)做为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:
JavaRDD<String> distFile =sc.textFile("data.txt");
一旦建立完成,就能够在distFile上执行数据集操做。例如,要相对全部行的长度进行求和,咱们能够经过以下的map和reduce操做来完成:
distFile.map(s -> s.length()).reduce((a, b)-> a + b)
Spark读文件时的一些注意事项:
1. 若是文件使用本地文件系统上的路径,那么该文件必须在工做节点的相同路径下也能够访问。能够将文件拷贝到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认状况下,Spark会为文件的每个block建立一个分区,可是你也能够经过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。
除了text文件,Spark的Java API也支持其余几种数据格式:
1. JavaSparkContext.wholeTextFiles可让你读取包含多个小text文件的目录,而且每一个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。
2. 对于Sequence文件,使用SparkContext的sequenceFile[K,V]方法,其中K和V分别对应文件中key和values的类型。这些类型必须是Hadoop的Writable接口的子类,如IntWritable和Text。另外,Spark容许你使用一些常见的Writables的原生类型;例如,sequenceFile[Int,String]会自动的转换为类型IntWritables和Texts。
3. 对于其余的Hadoop InputFormats,你可使用JavaSparkContext.hadoopRDD方法,它能够接受一个任意类型的JobConf和输入格式类,key类和value类。像Hadoop Job设置输入源那样去设置这些参数便可。对基于“新”的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也可使用JavaSparkContex.newHadoopRDD。
4. JavaRDD.saveAsObjectFile和JavaSparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。虽然这不是一种像Avro那样有效的序列化格式,可是她提供了一种能够存储任何RDD的简单方式。
Python
PySpark能够从Hadoop支持的任何存储源中构建出分布式数据集,包括你的本地文件系统,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其余任何一种Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://,s3n://等URI)做为参数,并读取文件的每一行数据,放入集合中,下面是一个调用例子:
>>> distFile =sc.textFile("data.txt")
一旦建立完成,就能够在distFile上执行数据集操做。例如,要相对全部行的长度进行求和,咱们能够经过以下的map和reduce操做来完成:
distFile.map(lambda s: len(s)).reduce(lambda a,b: a + b)
Spark读文件时的一些注意事项:
1. 若是文件使用本地文件系统上的路径,那么该文件必须在工做节点的相同路径下也能够访问。能够将文件拷贝到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法,包括textFile,支持在目录上运行,压缩文件和通配符。例如,你可使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个参数,用于控制文件的分区数。默认状况下,Spark会为文件的每个block建立一个分区,可是你也能够经过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。
除了text文件,Spark的Python API也支持其余几种数据格式:
1. JavaSparkContext.wholeTextFiles可让你读取包含多个小text文件的目录,而且每一个文件对应返回一个(filename,content)对。而对应的textFile方法,文件的每一行对应返回一条记录(record)。
2. RDD.saveAsPickleFile和SparkContext.pickleFile支持由pickled Python对象组成的简单格式保存RDD。使用批量的方式处理pickle模块的对象序列化,默认批处理大小为10.
3. SequenceFile和Hadoop输入/输出格式
注意,此功能当前标识为试验性的,是为高级用户而提供的。在未来的版本中,可能会由于支持基于SparkSQL的读写而被取代,在这种状况下,SparkSQL是首选的方法。
Writable支持
PySpark的SequenceFile支持加载Java中的键值(key-value)对RDD,能够将Writable转换为基本的Java类型,而且经过Pyrolite,在结果Java对象上执行pickles序列化操做。当将一个键值对的RDD保存为SequenceFIle时,PySpark会对其进行反操做。它会unpickles Python的对象为Java对象,而后再将它们转换为Writables。下表中的Writables会被自动地转换:
Writable Type |
Python Type |
Text |
unicode str |
IntWritable |
int |
FloatWritable |
float |
DoubleWritable |
float |
BooleanWritable |
bool |
BytesWritable |
bytearray |
NullWritable |
None |
MapWritable |
dict |
数组不支持开箱(out-of-the-box)处理。当读或写数组时,用户须要指定自定义的ArrayWritable子类。当写数组时,用户也须要指定自定义的转换器(converters),将数组转换为自定义的ArrayWritable子类。当读数组时,默认的转换器会将自定义的ArrayWritable子类转换为Java的Object[],而后被pickled成Python的元组。若是要获取包含基本数据类型的数组,Python的array.array的话,用户须要为该数组指定自定义的转换器。
保存和加载SequenFiles
相似于text files,SequenceFiles能够被保存和加载到指定的路径下。能够指定key和value的类型,但对标准的Writables类型则不须要指定。
保存和加载其余的Hadoop输入/输出格式
PySpark也能够读任何Hadoop InputFormat或者写任何Hadoop OutputFormat,包括“新”和“旧”两个Hadoop MapReduce APIs。若是须要的话,能够将传递进来的一个Hadoop配置当成一个Python字典。这里有一个使用了Elasticsearch ESInputFormat的样例:
注意,若是这个InputFormat只是简单地依赖于Hadoop配置和/或输入路径,以及key和value的类型,它就能够很容易地根据上面的表格进行转换,那么这种方法应该能够很好地处理这些状况。
若是你有一个定制序列化的二进制数据(好比加载自Cassandra/HBase的数据),那么你首先要作的,是在Scala/Java侧将数据转换为能够用Pyrolite的pickler处理的东西。Converter特质提供了这一转换功能。简单地extend该特质,而后在convert方法中实现你本身的转换代码。记住要确保该类,以及访问你的InputFormat所需的依赖,都须要被打包到你的Spark做业的jar包,而且包含在PySpark的类路径中。
在Python样例和Converter样例上给出了带自定义转换器的Cassandra/HBase的InputFormat和OutputFormat的使用样例。
RDD操做
RDDs支持两种操做:转换(transformations),能够从已有的数据集建立一个新的数据集;而动做(actions),在数据集上运行计算后,会向驱动程序返回一个值。例如,map就是一种转换,它将数据集每个元素都传递给函数,并返回一个新的分布数据集来表示结果。另外一方面,reduce是一种动做,经过一些函数将全部的元素聚合起来,并将最终结果返回给驱动程序(不过还有一个并行的reduceByKey,能返回一个分布式数据集)。
Spark中的全部转换都是惰性的,也就是说,它们并不会立刻计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动做。只有当发生一个要求返回结果给驱动程序的动做时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。例如,咱们对map操做建立的数据集进行reduce操做时,只会向驱动返回reduce操做的结果,而不是返回更大的map操做建立的数据集。
默认状况下,每个转换过的RDD都会在你对它执行一个动做时被从新计算。不过,你也可使用持久化或者缓存方法,把一个RDD持久化到内存中。在这种状况下,Spark会在集群中保存相关元素,以便你下次查询这个RDD时,能更快速地访问。对于把RDDs持久化到磁盘上,或在集群中复制到多个节点也是支持的。
基础操做
Scala
为了描述RDD的基础操做,能够考虑下面的简单程序:
第一行经过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操做:lines仅仅指向这个文件。第二行定义了lineLengths做为map转换结果。此外,因为惰性,不会当即计算lineLengths。最后,咱们运行reduce,这是一个动做。这时候,Spark才会将这个计算拆分红不一样的task,并运行在独立的机器上,而且每台机器运行它本身的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。
若是咱们但愿之后能够复用lineLengths,能够添加:
lineLengths.persist()
在reduce以前,这将致使lineLengths在第一次被计算以后,被保存在内存中。
Java
为了描述RDD的基础操做,能够考虑下面的简单程序:
第一行经过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操做:lines仅仅指向这个文件。第二行定义了lineLengths做为map转换结果。此外,因为惰性,不会当即计算lineLengths。最后,咱们运行reduce,这是一个动做。这时候,Spark才会将这个计算拆分红不一样的task,并运行在独立的机器上,而且每台机器运行它本身的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。
若是咱们但愿之后能够复用lineLengths,能够添加:
lineLengths.persist();
在reduce以前,这将致使lineLengths在第一次被计算以后,被保存在内存中。
Python
为了描述RDD的基础操做,能够考虑下面的简单程序:
第一行经过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行操做:lines仅仅指向这个文件。第二行定义了lineLengths做为map转换结果。此外,因为惰性,不会当即计算lineLengths。最后,咱们运行reduce,这是一个动做。这时候,Spark才会将这个计算拆分红不一样的task,并运行在独立的机器上,而且每台机器运行它本身的map部分和本地的reducatin,仅仅返回它的结果给驱动程序。
若是咱们但愿之后能够复用lineLengths,能够添加:
lineLengths.persist()
在reduce以前,这将致使lineLengths在第一次被计算以后,被保存在内存中。
把函数传递到Spark
Scala
Spark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。这有两种推荐的实现方式:
●使用匿名函数的语法,这能够用来替换简短的代码。
●使用全局单例对象的静态方法。好比,你能够定义函数对象objectMyFunctions,而后传递该对象的方法MyFunction.func1,以下所示:
注意:因为可能传递的是一个类实例方法的引用(而不是一个单例对象),在传递方法的时候,应该同时传递包含该方法的对象。好比,考虑:
这里,若是咱们建立了一个类实例new MyClass,而且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func1方法,因此须要将整个对象传递到集群中。相似于写成:rdd.map(x=>this.func1(x))。
相似地,访问外部对象的字段时将引用整个对象:
等同于写成rdd.map(x=>this.field+x),引用了整个this。为了不这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:
Java
Spark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。在Java中,函数由那些实现了org.apache.spark.api.java.function包中的接口的类表示。有两种建立这样的函数的方式:
●在你本身的类中实现Function接口,能够是匿名内部类,或者命名类,而且传递类的一个实例到Spark。
●在Java8中,使用lambda表达式来简明地定义函数的实现。
为了保持简洁性,本指南中大量使用了lambda语法,这在长格式中很容易使用全部相同的APIs。好比,咱们能够把上面的代码写成:
或者,若是不方便编写内联函数的话,能够写成:
注意,Java中的匿名内部类也能够访问封闭域中的变量,只要这些变量标识为final便可。Spark会像处理其余语言同样,将这些变量拷贝到每一个工做节点上。
Python
Spark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。有三种推荐方法可使用:
●使用Lambda表达式来编写能够写成一个表达式的简单函数(Lambdas不支持没有返回值的多语句函数或表达式)。
●Spark调用的函数中的Local defs,能够用来代替更长的代码。
●模块中的顶级函数。
例如,若是想传递一个支持使用lambda表达式的更长的函数,能够考虑如下代码:
注意:因为可能传递的是一个类实例方法的引用(而不是一个单例对象(singleton object)),在传递方法的时候,应该同时传递包含该方法的对象。好比,考虑:
这里,若是咱们建立了一个类实例new MyClass,而且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func1方法,因此须要将整个对象传递到集群中。
相似地,访问外部对象的字段时将引用整个对象:
为了不这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:
理解闭包
关于Spark的一个更困难的问题是理解当在一个集群上执行代码的时候,变量和方法的范围以及生命周期。修改范围以外变量的RDD操做常常是形成混乱的源头。在下面的例子中咱们看一下使用foreach()来增长一个计数器的代码,不过一样的问题也可能有其余的操做引发。
例子
考虑下面的单纯的RDD元素求和,根据是否运行在一个虚拟机上,它们的行为彻底不一样。一个日常的例子是在local模式(--master=local[n])下运行Spark对比将Spark程序部署到一个集群上(例如经过spark-submit提交到YARN)。
Scala
Java
Python
本地模式VS集群模式
主要的挑战是,上述代码的行为是未定义的。在使用单个JVM的本地模式中,上面的代码会在RDD中计算值的总和并把它存储到计数器中。这是由于RDD和计数器变量在驱动节点的同一个内存空间中。
然而,在集群模式下,发生的事情更为复杂,上面的代码可能不会按照目的工做。要执行做业,Spark将RDD操做分红任务——每一个任务由一个执行器操做。在执行前,Spark计算闭包。闭包是指执行器要在RDD上进行计算时必须对执行节点可见的那些变量和方法(在这里是foreach())。这个闭包被序列化并发送到每个执行器。在local模式下,只有一个执行器所以全部东西都分享同一个闭包。然而在其余的模式中,就不是这个状况了,运行在不一样工做节点上的执行器有它们本身的闭包的一份拷贝。
这里发生的事情是闭包中的变量被发送到每一个执行器都是被拷贝的,所以,当计数器在foreach函数中引用时,它再也不是驱动节点上的那个计数器了。在驱动节点的内存中仍然有一个计数器,但它对执行器来讲再也不是可见的了!执行器只能看到序列化闭包中的拷贝。所以,计数器最终的值仍然是0,由于全部在计数器上的操做都是引用的序列化闭包中的值。
在这种状况下要确保一个良好定义的行为,应该使用累加器。Spark中的累加器是一个专门用来在执行被分散到一个集群中的各个工做节点上的状况下安全更新变量的机制。本指南中的累加器部分会作详细讨论。
通常来讲,闭包-构造像循环或者本地定义的方法,不该该用来改变一些全局状态。Spark没有定义或者是保证改变在闭包以外引用的对象的行为。一些这样作的代码可能会在local模式下起做用,但那仅仅是个偶然,这样的代码在分布式模式下是不会按照指望工做的。若是须要一些全局的参数,可使用累加器。
打印RDD中的元素
另外一个常见的用法是使用rdd.foreach(println)方法或者rdd.map(println)方法试图打印出RDD中的元素。在一台单一的机器上,这样会产生指望的输出并打印出RDD中的元素。然而,在集群模式中,被执行器调用输出到stdout的输出如今被写到了执行器的stdout,并非在驱动上的这一个,所以驱动上的stdout不会显示这些信息!要在驱动上打印全部的元素,可使用collect()方法首先把RDD取回到驱动节点如:rdd.collect().foreach(println)。然而,这可能致使驱动内存溢出,由于collect()将整个RDD拿到了单台机器上;若是你只须要打印不多几个RDD的元素,一个更安全的方法是使用take()方法:rdd.take(100).foreach(println)。
键值对的使用
Scala
虽然,在包含任意类型的对象的RDDs中,可使用大部分的Spark操做,但也有一些特殊的操做只能在键值对的RDDs上使用。最多见的一个就是分布式的洗牌(shuffle)操做,诸如基于key值对元素进行分组或聚合的操做。
在Scala中,包含二元组(Tuple2)对象(能够经过简单地(a,b)代码,来构建内置于语言中的元组的RDDs支持这些操做),只要你在程序中导入了org.apache.spark.SparkContext._,就能进行隐式转换。PairRDDFunction类支持键值对的操做,若是你导入了隐式转换,该类型就能自动地对元组RDD的元素进行转换。
好比,下列代码在键值对上使用了reduceByKey操做,来计算在一个文件中每行文本出现的总次数:
咱们也可使用counts.sortByKey(),好比,将键值对以字典序进行排序。最后使用counts.collect()转换成对象的数组形式,返回给驱动程序。
注意:在键值对操做中,若是使用了自定义对象做为建,你必须确保该对象实现了自定义的equals()和对应的hashCode()方法。更多详情请查看Object.hashCode()文档大纲中列出的规定。
Java
虽然,在包含任意类型的对象的RDDs中,可使用大部分的Spark操做,但也有一些特殊的操做只能在键值对的RDDs上使用。最多见的一个就是分布式的洗牌(shuffle)操做,诸如基于key值对元素进行分组或聚合的操做。
在java中,可使用Scala标准库中的scala.Tuple2类来表示键值对,你能够简单地调用new Tuple2(a,b)来建立一个元组,而后使用tuple._1()和tuple._2()方法来访问元组的字段。
使用JavaPairRDD来表示键值对RDDs。你可使用指定版本的map操做,从JavaRDDs构建JavaPairRDDs,好比mapToPair和flatMapToPair。JavaPairRDD支持标准的RDD函数,也支持特殊的键值函数。
例如,下面的代码在键值(key-value)对上使用 reduceByKey操做来计算在一个文件中每行文本出现的总次数:
咱们也可使用 counts.sortByKey(),例如,将键值对以字典序(alphabetically)进行排序。后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driverprogram)。
注意:在键值(key-value)对操做中,若是使用了自定义对象做为键,你必须确保该对象实现了自定义的 equals()和对应的 hashCode()方法。更多详情请查看 Object.hashCode() documentation文档大纲中列出的规定。
Python
虽然在包含任意类型的对象的 RDDs中,可使用大部分的 Spark操做,但也有一
些特殊的操做只能在键值(key-value)对的 RDDs上使用。最多见的一个就是分布式的洗牌("shuffle")操做,诸如基于 key值对元素进行分组或聚合的操做。
在 Python中, RDDs支持的操做包含 Python内置的元组(tuples)操做,好比 (1, 2)。你能够简单地建立这样的元组,而后调用指望的操做。
例如,下面的代码在键值(key-value)对上使用 reduceByKey操做来计算在一个文件中每行文本出现的总次数:
咱们也可使用 counts.sortByKey(),例如,按照字典序(alphabetically)排序键值对。最后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driver program)。
转换
下表中列出了 Spark支持的一些常见的转换 (Transformations)。详情请参考 RDDAPI文档 (Scala, Java, Python)和 pair RDD函数文档 (Scala, Java)。
Transformation |
Meaning |
map(func) |
返回一个新分布式数据集,由每个输入元素通过 func函数转换后组成。 |
filter(func) |
返回一个新数据集,由通过 func函数计算后返回值为 true的输入元素组成。 |
flatMap(func) |
相似于 map,可是每个输入元素能够被映射为 0或多个输出元素(所以 func应该返回一个序列(Seq),而不是单一元素)。 |
mapPartitions(func) |
相似于 map,但独立地在 RDD的每个分区(partition,对应块(block))上运行,当在类型为 T 的 RDD上运行时, func的函数类型必须是Iterator<T> => Iterator<U>。 |
mapPartitionsWithIndex(func) |
相似于 mapPartitions,但 func带有一个整数参数表示分区(partition)的索引值。当在类型为 T的 RDD上运行时, func的函数类型必须是(Int, Iterator<T>) => Iterator<U>。 |
sample(withReplacement, fraction, seed) |
根据 fraction指定的比例,对数据进行采样,能够选择是否用随机数进行替换, seed用于指定随机数生成器种子。 |
union(otherDataset) |
返回一个新的数据集,新数据集由源数据集和参数数据集的元素联合(union)而成。 |
intersection(otherDataset) |
返回一个新的数据集,新数据集由源数据集和参数数据集的元素的交集(intersection)组成。 |
distinct([numTasks])) |
返回一个新的数据集,新数据集由源数据集过滤掉多余的重复元素只保留一个而成。 |
groupByKey([numTasks]) |
在一个 (K, V)对的数据集上调用,返回一个 (K, Iterable<V>)对的数据集。 注意:若是你想在每一个key上分组执行聚合(如总和或平均值)操做,使用 reduceByKey或combineByKey会产生更好的性能。 注意:默认状况下,输出的并行数依赖于父 RDD(parent RDD)的分区数(number of partitions)。你能够经过传递可选的第二个参数 numTasks来设置不一样的任务数。 |
reduceByKey(func, [numTasks]) |
在一个 (K, V)对的数据集上调用时,返回一个 (K, V)对的数据集,使用指定的 reduce函数func将相同 key的值聚合到一块儿,该函数的类型必须是 (V,V) => V。相似 groupByKey,reduce的任务个数是能够经过第二个可选参数来配置的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
在一个 (K, V)对的数据集上调用时,返回一个 (K, U)对的数据集,对每一个键的值使用给定的组合函数(combine functions)和一个中性的“零”值进行聚合。容许聚合后的值类型不一样于输入的值类型,从而避免了没必要要的内存分配。如同 groupByKey,能够经过设置第二个可选参数来配置 reduce任务的个数。 |
sortByKey([ascending], [numTasks]) |
在一个 (K, V)对的数据集上调用,其中, K必须实现Ordered,返回一个 按照 Key进行排序的 (K, V)对数据集,升序或降序由布尔参数 ascending决定。 |
join(otherDataset, [numTasks]) |
在类型为 (K, V)和 (K, W)类型的数据集上调用时,返回一个相同 key 对应的全部元素对在一块儿的 (K, (V, W))对的数据集。也支持外联(Outer joins),经过使用 leftOuterJoin和 rightOuterJoin. |
cogroup(otherDataset, [numTasks]) |
在类型为 (K, V)和 (K, W)的数据集上调用,返回一个 (K, Iterable<V>, Iterable<W>)元组(tuples)的数据集。这个操做也能够称之为 groupWith。 |
cartesian(otherDataset) |
笛卡尔积,在类型为 T和 U类型的数据集上调用时,返回一个 (T, U)对的数据集(全部元素交互进行笛卡尔积)。 |
pipe(command, [envVars]) |
以管道(Pipe)方式将 RDD的各个分区(partition)传递到 shell命令,好比一个 Perl或 bash脚本中。 RDD的元素会被写入进程的标准输入(stdin),而且将做为字符串的 RDD(RDD of strings),在进程的标准输出(stdout)上输出一行行数据。 |
coalesce(numPartitions) |
把 RDD的分区数下降到指定的 numPartitions。过滤掉一个大数据集 以后再执行操做会更加有效。 |
repartition(numPartitions) |
随机地对 RDD的数据从新洗牌(Reshuffle),以便建立更多或更少的分区,对它们进行平衡。老是对网络上的全部数据进行洗牌(shuffles)。 |
repartitionAndSortWithinPartitions(partitioner) |
根据给定的分区器对RDD进行从新分区,在每一个结果分区中,将记录按照key值进行排序。这在每一个分区中比先调用repartition再排序效率更高,由于它能够推进排序到分牌机器上。 |
动做
下表中列出了 Spark支持的一些常见的动做 (actions)。详情请参考 RDD API文档
(Scala,Java, Python) 和pair RDD函数文档(Scala, Java)。
Action |
Meaning |
reduce(func) |
经过函数 func (接受两个参数,返回一个参数),汇集数据集中的全部元素。该函数应该是可交换和可结合的,以便它能够正确地并行计算。 |
collect() |
在驱动程序中,以数组的形式,返回数据集的全部元素。这一般会在使用filter或者其它操做,并返回一个足够小的数据子集后再使用会比较有用 |
count() |
返回数据集的元素的个数。 |
first() |
返回数据集的第一个元素。 (相似于 take(1)). |
take(n) |
返回一个由数据集的前 n个元素组成的数组。注意,这个操做目前不能并行执行,而是由驱动程序(driver program)计算全部的元素。 |
takeSample(withReplacement,num, [seed]) |
返回一个数组,由数据集中随机采样的 num个元素组成,能够选择是否用随机数替换不足的部分,能够指定可选参数seed,预先指定一个随机数生成器的种子。 |
takeOrdered(n, [ordering]) |
返回一个由数据集的前 n个元素,并使用天然顺序或定制顺序对这些元素进行排序。 |
saveAsTextFile(path) |
将数据集的元素,以 text file (或 text file的集合)的形式,保存到本地文件系统的指定目录, Spark会对每一个元素调用 toString方法,而后转换为文件中的文本行。 |
saveAsSequenceFile(path) |
将数据集的元素,以 Hadoop sequencefile的格式,保存到各类文件系统的指定路径下,包括本地系统, HDFS或者任何其它 hadoop支持的文件系统。该方法只能用于键值(key-value)对的 RDDs,或者实现了 Hadoop的Writable接口的状况下。在 Scala中,也能够用于支持隐式转换为 Writable的类型。(Spark包括了基本类型的转换,例如 Int, Double, String,等等)。 |
saveAsObjectFile(path) |
以简单地 Java序列化方式将数据集的元素写入指定的路径,对应的能够用 SparkContext.objectFile()加载该文件。 |
countByKey() |
只对 (K,V)类型的 RDD有效。返回一个 (K, Int)对的 hashmap,其中 (K,Int)对表示每个 key对应的元素个数。 |
foreach(func) |
在数据集的每个元素上,运行 func函数。这一般用于反作用(sideeffects),例如更新一个累加器变量(accumulator variable)(参见下文),或者和外部存储系统进行交互. |
洗牌操做
Spark触发一个事件后进行的一些操做成为洗牌。洗牌是Spark从新分配数据的机制,这样它就能够跨分区分组。这一般涉及在执行器和机器之间复制数据,这就使得洗牌是一个复杂和高代价的操做。
背景
为了理解在洗牌的时候发生了什么,咱们能够考虑reduceByKey操做的例子。reduceByKey操做产生了一个新的RDD,在这个RDD中,全部的单个的值被组合成了一个元组,key和执行一个reduce函数后的结果中与这个key有关的全部值。面临的挑战是一个key的全部的值并不都是在同一个分区上的,甚至不是一台机器上的,可是他们必须是可链接的以计算结果。
在Spark中,数据通常是不会跨分区分布的,除非是在一个特殊的地方为了某种特定的目的。在计算过程当中,单个任务将在单个分区上操做——所以,为了组织全部数据执行单个reduceByKey中的reduce任务,Spark须要执行一个all-to-all操做。它必须读取全部分区,找到全部key的值,并跨分区把这些值放到一块儿来计算每一个key的最终结果——这就叫作洗牌。
尽管在每一个分区中新洗牌的元素集合是肯定性的,分区自己的顺序也一样如此,这些元素的顺序就不必定是了。若是指望在洗牌后得到可预测的有序的数据,可使用:
mapPartitions 来排序每一个分区,例如使用.sorted
repartitionAndSortWithinPartitions 在从新分区的同时有效地将分区排序
sortBy来建立一个全局排序的RDD
能够引发洗牌的操做有重分区例如repartition和coalesce,‘ByKey操做(除了计数)像groupByKey和reduceByKey,还有join操做例如cogroup和join。
性能影响
Shuffle是一个代价高昂的操做,由于它调用磁盘I/O,数据序列化和网络I/O。要组织shuffle的数据,Spark生成一个任务集合——map任务来组织数据,并使用一组reduce任务集合来聚合它。它的命名来自与MapReduce,但并不直接和Spark的map和reduce操做相关。
在内部,单个的map任务的结果被保存在内存中,直到他们在内存中存不下为止。而后,他们基于目标分区进行排序,并写入到一个单个的文件中。在reduce这边,任务读取相关的已经排序的块。
某些shuffle操做会消耗大量的堆内存,由于他们用在内存中的数据结构在转换操做以前和以后都要对数据进行组织。特别的,reduceByKey和aggregateByKey在map侧建立这些结构,‘ByKey操做在reduce侧生成这些结构。当数据在内存中存不下时,Spark会将他们存储到磁盘,形成额外的磁盘开销和增长垃圾收集。
Shuffle也会在磁盘上产生大量的中间文件。在Spark1.3中,这些文件直到Spark中止运行时才会从Spark的临时存储中清理掉,这意味着长时间运行Spark做业会消耗可观的磁盘空间。这些作了以后若是lineage从新计算了,那shuffle不须要从新计算了。在配置Spark上下文时,临时存储目录由spark.local.dir配置参数指定。
Shuffle的行为能够经过调整各类配置参数来调整。请看Spark配置指南中的Shuffle Behavior部分。
RDD持久化
Spark最重要的一个功能,就是在不一样操做间,将一个数据集持久化(persisting) (或缓存(caching))到内存中。当你持久化(persist)一个 RDD,每个节点都会把它计算的全部分区(partitions)存储在内存中,并在对数据集 (或者衍生出的数据集)执行其余动做(actioins)时重用。这将使得后续动做(actions)的执行变得更加迅速(一般快 10 倍)。缓存(Caching)是用 Spark 构建迭代算法和快速地交互使用的关键。
你可使用 persist()或 cache()方法来持久化一个 RDD。在首次被一个动做(action)触发计算后,它将会被保存到节点的内存中。 Spark 的缓存是带有容错机制的,若是 RDD丢失任何一个分区的话,会自动地用原先构建它的转换(transformations)操做来从新进行计算。
此外,每个被持久化的 RDD均可以用不一样的存储级别(storage level)进行存储,好比,容许你持久化数据集到硬盘,以序列化的 Java对象(节省空间)存储到内存,跨节点复制,或者以off-heap的方式存储在 Tachyon。这些级别的选择,是经过将一个 StorageLevel对象 (Scala Java, Python)传递到 persist()方法中进行设置的。 cache()方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY (将反序列化 (deserialized)的对象存入内存)。完整的可选存储级别以下:
Storage Level |
Meaning |
MEMORY_ONLY |
将 RDD以反序列化(deserialized)的Java对象存储到 JVM。若是 RDD不能被内存装下,一些分区将不会被缓存,而且在须要的时候被从新计算。这是默认的级别。 |
MEMORY_AND_DISK |
将 RDD以反序列化(deserialized)的 Java对象存储到 JVM。若是 RDD不能被内存装下,超出的分区将被保存在硬盘上,而且在须要时被读取。 |
MEMORY_ONLY_SER |
将 RDD以序列化(serialized)的 Java对象进行存储(每一分区占用一个字节数组)。一般来讲,这比将对象反序列化(deserialized)的空间利用率更高,尤为当使用快速序列化器(fast serializer),但在读取时会比较耗 CPU。 |
MEMORY_AND_DISK_SER |
相似于 MEMORY_ONLY_SER,可是把超出内存的分区将存储在硬盘上而不是在每次须要的时候从新计算。 |
DISK_ONLY |
只将 RDD分区存储在硬盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |
与上述的存储级别同样,可是将每个分区都复制到两个集群节点上。 |
OFF_HEAP (experimental) |
以序列化的格式 (serialized format) 将 RDD存储到 Tachyon。相比于MEMORY_ONLY_SER, OFF_HEAP 下降了垃圾收集(garbage collection)的开销,并使 executors变得更小并且共享内存池,这在大堆(heaps)和多应用并行的环境下是很是吸引人的。并且,因为 RDDs驻留于 Tachyon中, executor的崩溃不会致使内存中的缓存丢失。在这种模式下, Tachyon中的内存是可丢弃的。所以, Tachyon不会尝试重建一个在内存中被清除的分块。 |
注意:在 Python中,存储对象时老是使用 Pickle库来序列化(serialized),而无论你是否选择了一个序列化的级别。Spark也会自动地持久化一些洗牌(shuffle)操做(好比,reduceByKey )的中间数据,即便用户没有调用 persist。这么作是为了不在一个节点上的洗牌(shuffle)过程失败时,从新计算整个输入。咱们仍然建议用户在结果 RDD 上调用 persist,若是但愿重用它的话。
如何选择存储级别?
Spark 的存储级别旨在知足内存使用和CPU效率权衡上的不一样需求。咱们建议经过如下方法进行选择:
●若是你的 RDDs能够很好的与默认的存储级别(MEMORY_ONLY)契合,就不须要作任何修改了。这已是 CPU使用效率最高的选项,它使得RDDs的操做尽量的快。
●若是不行,试着使用 MEMORY_ONLY_SER,而且选择一个快速序列化库使对象在有比较高的空间使用率(space-efficient)的状况下,依然能够较快被访问。
●尽量不要存储到硬盘上,除非计算数据集的函数的计算量特别大,或者它们过滤了大量的数据。不然,从新计算一个分区的速度,可能和从硬盘中读取差很少快。
●若是你想有快速的故障恢复能力,使用复制存储级别(例如:用 Spark来响应 web应用的请求)。全部的存储级别都有经过从新计算丢失的数据来恢复错误的容错机制,可是复制的存储级别可让你在 RDD 上持续地运行任务,而不须要等待丢失的分区被从新计算。
●在大量的内存或多个应用程序的环境下,试验性的 OFF_HEAP模式具备如下几个优势:
o 容许多个 executors共享 Tachyon中相同的内存池。
o 极大地下降了垃圾收集器(garbage collection)的开销。
o 即便个别的 executors崩溃了,缓存的数据也不会丢失。
移除数据
Spark 会自动监控各个节点上的缓存使用状况,并使用最近最少使用算法(least-recently-used (LRU))删除老的数据分区。若是你想手动移除一个 RDD,而不是等它自动从缓存中清除,可使用 RDD.unpersist()方法。
共享变量
通常来讲,当一个函数被传递给一个在远程集群节点上运行的 Spark操做(例如 map或 reduce) 时,它操做的是这个函数用到的全部变量的独立拷贝。这些变量会被拷贝到每一台机器,并且在远程机器上对变量的全部更新都不会被传播回驱动程序。一般看来,读-写任务间的共享变量显然不够高效。然而,Spark仍是为两种常见的使用模式,提供了两种有限的共享变量:广播变量(broadcast variables)和累加器(accumulators)。
广播变量
广播变量容许程序员保留一个只读的变量,缓存在每一台机器上,而不是每一个任务保存一份拷贝。它们能够这样被使用,例如,以一种高效的方式给每一个节点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减小通讯的代价。
Spark动做的执行是经过一个阶段的集合,经过分布式的Shuffle操做分离。Spark自动广播在每一个阶段里任务须要的共同数据。以这种方式广播的数据以序列化的形式缓存并在运行每一个任务以前进行反序列化。这意味着显式地建立广播变量只在当多个阶段之间须要相同的数据或者是当用反序列化的形式缓存数据特别重要的时候。
广播变量是经过调用 SparkContext.broadcast(v)方法从变量 v建立的。广播变量是一个 v的封装器,它的值能够经过调用 value方法得到。以下代码展现了这个:
Scala
Java
Python
在广播变量被建立后,它应该在集群运行的任何函数中,代替 v值被调用,从而 v值不须要被再次传递到这些节点上。另外,对象 v不能在广播后修改,这样能够保证全部节点具备相同的广播变量的值(好比,后续若是变量被传递到一个新的节点)。
累加器
累加器是一种只能经过具备结合性的操做(associative operation)进行“加(added)”的变量,所以能够高效地支持并行。它们能够用来实现计数器(如 MapReduce 中)和求和器。 Spark原生就支持数值类型的累加器,开发者能够本身添加新的支持类型。若是建立了一个命名的累加器(accumulators),这些累加器将会显示在 Spark UI 界面上。这对于了解当前运行阶段(stages)的进展状况是很是有用的(注意:这在 Python中还没有支持)。
一个累加器能够经过调用 SparkContext.accumulator(v)方法从一个初始值 v中建立。运行在集群上的任务,能够经过使用 add方法或 +=操做符(在 Scala和 Python)来给它加值。然而,它们不能读取这个值。只有驱动程序可使用 value方法来读取累加器的值。
如下代码展现了如何利用一个累加器,将一个数组里面的全部元素相加:
Scala
Java
Python
虽然代码可使用内置支持的 Int类型的累加器,但程序员也能够经过子类化(subclassing) AccumulatorParam来建立本身的类型。AccumulatorParam接口有两个方法: zero,为你的数据类型提供了一个“零值(zero value)”,以及 addInPlace提供了两个值相加的方法。好比,假设咱们有一个表示数学上向量的 Vector类,咱们能够这么写:
Scala
在 Scala中, Spark也支持更通用的 Accumulable接口去累加数据,其结果类型和累加的元素不一样(好比,构建一个包含全部元素的列表),而且SparkContext.accumulableCollection方法能够累加普通的 Scala集合(collection)类型。
Java
在 Java中, Spark也支持更通用的 Accumulable接口去累加数据,其结果类型和累加的元素不一样(好比,构建一个包含全部元素的列表)。
Python
由于累加器的更新只在action中执行,Spark确保每一个任务对累加器的更新都只会被应用一次,例如,重启任务将不会更新这个值。在转换中,用户应该清楚若是任务或者做业阶段是重复运行的,每一个任务的更新可能会应用不止一次。
累加器不会改变Spark的懒惰评价模型。若是它们在一个RDD的操做中正在被更新,他们的值只会被更新一次,RDD做为动做的一部分被计算。所以,累加器更新当在执行一个懒惰转换,例如map()时,并不保证被执行。下面的代码段演示了这个属性:
Scala
Java
Python
把代码部署到集群上
应用程序提交指南(application submission guide)描述了如何将应用程序提交到一个集群,简单地说,一旦你将你的应用程序打包成一个JAR(对于 Java/Scala)或者一组的 .py或 .zip文件 (对于 Python), bin/spark-submit 脚本可让你将它提交到支持的任何集群管理器中。
从Java/Scala中启动Spark做业
Org.apache.spark.launcher包中提供了相关类来启动Spark做业做为子线程的简单的Java API。
单元测试
Spark 对单元测试很是友好,可使用任何流行的单元测试框架。在你的测试中简单地建立一个 SparkContext,并将 master URL设置成local,运行你的各类操做,而后调用 SparkContext.stop()结束测试。确保在 finally块或测试框架的 tearDown方法中调用 context的 stop方法,由于 Spark不支持在一个程序中同时运行两个contexts。
Spark1.0以前版本的迁移
Scala
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,如今,其中的 API,除了标识为“试验性(experimental)”或“开发者的(developer) API”的,在未来的版本中都会被支持。对 Scala用户而言,惟一的改变在于组操做(grouping operations),好比, groupByKey, cogroup和 join,其返回值已经从 (Key, Seq[Value])对修改成 (Key,Iterable[Value])。
迁移指南也能够从 Spark Streaming, MLlib和 GraphX获取。
Java
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,如今,其中的 API,只要不是标识为“试验性(experimental)”或“开发者的(developer) API”的,在未来的版本中都会被支持。其中对 Java API作了一些修改:
•对于 org.apache.spark.api.java.function中的类函数(Function classes),在 1.0版本中变成了接口,这意味着旧的代码中 extends Function应该须要为 implement Function。
•增长了 map转换(transformations)的新变体,如 mapToPair和 mapToDouble,用于建立指定数据类型的 RDDs。
•组操做(grouping operations),如 groupByKey, cogroup 和 join的返回值也被修改了,从原先返回 (Key, List<Value>)对改成(Key,Iterable<Value>)。
迁移指南也能够从 Spark Streaming, MLlib和 GraphX获取。
Python
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,如今,其中的 API,只要不是
标识为“试验性(experimental)”或“开发者的(developer) API”的,在未来的版本中
都会被支持。对 Python用户而言,惟一的修改在于分组操做(grouping operations),比
如groupByKey,cogroup和join,其返回值从 (key, list of values)对修改成 (key,
iterableof values)。
迁移指南也能够从 Spark Streaming, MLlib和 GraphX获取。
下一步
你能够在 Spark的网站上看到 spark程序的样例。另外,Spark在 examples目录 (Scala, Java, Python,R)中也包含了一些样例。你能够经过将类名传递给 spark的 bin/run-example脚原本运行 Java和 Scala的样例,例如:
对于 Python样例,要使用 spark-submit:
对于R样例,使用spark-submit:
为了帮助优化你的程序,在配置(configuration)和调优(tuning)的指南上提供了最佳实践信息。它们在确保将你的数据用一个有效的格式存储在内存上,是很是重要的。对于部署的帮助信息,能够查看集群模式概述(cluster mode overview),描述了分布式操做以及支持集群管理器所涉及的组件。
最后,完整的 API文档能够查看 Scala, Java,Python和R。