East 2015 (Nov 26, 2014)html
Spark wins Daytona Gray Sort 100TB Benchmark (Nov 05, 2014)java
Archivepython
Download Sparkshell
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.apache
外文地址:数组
http://spark.apache.org/docs/latest/programming-guide.html缓存
Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.并发
在最顶层,Spark包含一个起始程序来运行用户编写的main 方法,而且执行多样的并行操做在一个集群上。其中最主要的抽象就是Spark提供一个 RDD (弹性集群数据集),是一些能够经过节点访问执行并行任务的集群的集合。RDD 经过一个Hadoop文件系统中的文件被建立(或者其它支持Hadoop文件系统的系统)。用户常常会让Spark去保留一个RDD在内存里。容许它在并行操做时被高效重用。最后,Rdd将自动从节点错误中恢复。分布式
第二个被抽象是,在并行操做时,变量将被共享。默认的,当Spark 在不一样节点上并行运行任务集合中的一个方法,它为任务中每一个方法的变量都保留一个备份。有时候,一些变量须要被跨任务共享,或者两个任务间共享,或者给起始程序共享。Spark提供两种形式的变量:广播变量,能够在全部节点上缓存变量在内存里。另外一个是储蓄变量,只能被增长,例如数量,或者和。ide
本文展现这些功能,使用每一个Spark支持的语言,本文是python。若是你打开 Spark的交互shell,bin/spark-shell, 或者python的shell, bin/pyspark. 很容易学的、
Spark 所使用的数据集RDDs 是一个能够被并行操做的容错数据集。有两种方式去建立它,1,并行一个你本地存在的集合。或者引用一个额外的储存系统。例如: 一个共享的文件系统,HDFS, HBase, 或者任何提供Hadoop输入格式的资源。
并行集合经过Spark 上下文 SparkContext
’s parallelize
方法被建立,基于你磁盘上一个存在的能够迭代的集合。集合中的元素经过经过复制造成一个分布式数据集,这个数据集能够被并行使用。例如。下面是一个怎样建立一个并发数据集用数字1-5:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
一旦被建立。这个分布式数据集能够被并发使用。例如,咱们能够 distData.reduce(lambda a, b: a + b) 去相加他们。咱们稍后并发操做。
并发数据集的一个重要的参数是数据集被分割的子集数量。Spark 将为每一个子集运行一个任务。表明性的,你想要没给CPU运行2-4个子集。正常状况下: Spark 将试图设计你分区的数量分局你的集群。可是,你依然能够经过传递第二个参数来手动设置它。 parallelize
(e.g. sc.parallelize(data, 10)
).
注意:你代码中一些地方会用到 slices (子集的别名),为了上下兼容性而存在的。
其它数据集合:
Pyspark 能够从任何被hadoop 支持的存储资源中 建立分布式数据集。包括本地文件系统,HDFS, Cassandra, HBase, Amazon S3 等。Sparks 支持文本文件,hadoop 输入格式文件。
文本文件 RDDs 能够经过 SparkContext
’s textFile 方法建立这个方法使用一个URI 为这个文件,不论是本地路径,仍是
hdfs://
, s3n://
, URI。将它读做行的集合。下面是一个例子:
>>> distFile = sc.textFile("data.txt")
一旦被建立。文件能够经过集合操做使用。例如,咱们能够相加行的数目经过使用 map 和 reduce 方法以下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
.
在用Spark读取文件时候的一些注意项:
若是使用一个本地文件系统,这个文件的权限必须被开放。能够被复制或者被共享。
全部的Spark 文件输入方法,包括 textFile, 也支持运行目录,压缩文件,和通配符。例如,你可使用 textFile("/my/directory") ,textFile("/my/directory/*.txt") 和 textFile("/my/directory/*.gz");
textFile 方法一样使用第二个参数去控制文件被分割的子集。默认,Spark为这个文件的每一个Block (在 HDFS中是64M )建立一个子集,可是你能够经过传递一个参数来设定更高的值。注意,这个值不能比默认 blocks分割的 更小。
除了 textFile , Spark 的Python API 提供了其它的数据格式:
SparkContext.wholeTextFiles
让你读一个包含不少小文件的目录。而后返回以键值对的方式返回它们(文件名,内容)。这个方法相对 textFile ,后者返回文件的每一行。
RDD.saveAsPickleFile
and SparkContext.
pickleFile
支持以简单python对象格式存储一个RDD 。
SequenceFile and Hadoop Input/Output Formats
注意,这个功能目前在实验阶段。用于高级用户。它或许被替代在未来。新的方式将采用基于 Saprk SQL 读写。那时候,Spark SQL 将是最优先的方式。
写支持:
PySpark SequenceFile 支持加载一个java 键值组成的RDD ,根据java 类型转换成可写的。当保存一个键值对组成的RDD 到 SequenceFile中, PySpark 进行这个转换。它把Python对象转成 Java对象,而后把它们转变成可写的。下面这些类型自动转换:
数组不被处理,用户须要去指定自定义的 ArrayWritable 子类型 当读写的时候。在写的时候,用户还须要指定自定义的转换把 数组转换成 自动以的 ArrayWritalbe 子类型。在读的时候,默认的转换将自定义的ArrayWritable 子类型转成Java 对象。而后转成Python元组。
保存和加载 SequenceFiles
和文本文件相似。SequenceFiles 经过指定路径被保存和加载。key 和 value 能够被分开。可是对于标准写来说,不须要如此。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
Rdd 操做:
Rdd 支持两种形式的操做,1转换,根据现有的数据集建立一个新的。2动做,在数据集上运行一些计算后返回值。例如,map 是一个转换,经过一个方法将数据集的每一个元素返回到一个rdd结果中,另外一边,reduce 是一个动做,经过方法将RDDd的元素合并成一个最终结果(reduceByKey 返回一个分布式数据集)。
Spark 中全部的转换都是懒执行的。因此它们并不立刻计算它们的结果。代替的,它们记着这些用于基础数据集的转换,当一个动做要求一个结果被返回来,那么才执行这些计算。这个设计可使Spark 运行更高效。例如。咱们能够实现一个经过map建立的数据集被reduce 使用而后返回一个reduce结果,而不是一个大的map过的集合。
默认,每个被转换的RDD或许被从新计算当你每次对它使用action时。可是,你能够缓存一个RDD在内存里,经过使用 persist或者cache,方法。此时,Spark将保存这个元素在集群上,为了你下次更快使用它。也支持缓存rdds 在磁盘上。或者复制在多节点上。
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行根据一个外部文件定义了一个基础rdd。数据集不被加载到内存,除非被动做执行。lines 仅仅是一个文件的指针,第二行定义一个 lineLength 做为一个map 的转换结果。一样的,lineLength 不是当即被计算。由于懒执行。最后,我么能够reduce, 它是一个动做。 此时,Spark 分割计算成不少任务。在各个机器上运行,每一个机器运行它本身部分的map和一个reduce 的结果。仅仅返回这个结果到起始程序中。
若是咱们以后须要屡次使用lineLength,咱们能够缓存它:
lineLengths.persist()
在reduce 以前,lineLengths 将再第一次被计算出来后缓存到内存。
传递方法给Spark:
Spark 的API 特别支持传递方法给起始程序,而后在集群上运行。有三种推荐方式:
Lambda表达式, 对于简单的方法能够写成表达式,Lambda不支持多行方法 或者不返回值的方法。
对于长一点的代码。在方法内部定义。
做为一个模块。
例如,传递一个长一点的方法而不是使用lomba。看下面代码:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意,当一个对象实例方法 容许传入引用时(和单例对象相反)。须要传递这个class和方法一块儿:例如:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
建立了一个新的 MyClass, 而后调用 doStuff。内部的map 引用这个MyClass 实例的 func ,因此整个对象须要被传递到 集群。
相似的,外部对象的可访问领域,将引用真个对象。
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + x)
为了防止出错,最简单的方式是复制 field 到一个本地变量代替访问它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + x)
待续!