首先,spark应用程序由一个驱动程序构成,由它运行用户的main函数,而且在集群上执行各类并行化操做。这个抽象的spark应用函数提供弹性分部式数据集【Spark provides is a resilient distributed dataset (RDD)】,一个rdd能够从hadoop文件系统,或者现存的scala集合,或者从其它集合中转换生成。咱们可让rdd保存在内存中,可让rdd能够高效的作序列化操做。而且RDD还有一个牛逼的功能,就是自动恢复无效的节点。html
其次,spark支持并行化操做中共享变量。当一个spark在不一样的节点上运行一系列任务,spark能复制函数中每个变量到每个任务节点中。然而,有时一个变量要在不一样集群中共享,spark支持两种方式解决这个问题:广播变量(缓存一个数值到全部节点中),累加器(只能用来累加和求和的变量)java
创建spark2.2.0须要依赖相关库,若是须要使用hadoop 集群还须要使用hdfs的库,maven示例node
spark coreweb
groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0
hadoopshell
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
spark编程的第一件事就是建立sparkContext对象,该对象告诉spark如何访问集群。数据库
val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
每一个jvm只能实例化一个spark上下文,在建立一个新的sparkContext以前你必须调用stop函数(估计是单例模式?)apache
appName是显示在集群控制界面Ui上的名称,master是运行spark的模式,有 Spark, Mesos or YARN cluster URL,或者是本地模式“local”,若是是运行集群模式,就不须要以硬编码方式设置master,而是以spark-submit的方式启动程序,而且在集群中接受运行。对于本地测试和单机测试,建议使用设置“local”的方式去在进程中运行spark。编程
在spark shell中,一个sparkContext已经内建好了,变量名是sc,建立的SparkContext是不生效的。api
在shell中,能够经过--master参数设置运行模式,--jars添加jar包,--packages添加额外包数组
$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
spark的核心概念是rdd,一个容错的可并行处理的分布式数据集合框架。rdd经过两种方式建立:1,外部数据源(各类集合对象并行化,文件系统,hdfs,hbase等);2,rdd转换。
在程序中现有的集合中调用sparkContext的parallelize(集合对象)方法。举个栗子:建立1到5的并行化集合对象
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
代码很简单,spark官方文档这里絮絮不休的讲了一些废话,只有一句是重点,
We describe operations on distributed datasets later on.spark是懒执行,既是遇到action操做的时候才运行程序。
对于并行化集合的一个重要的参数是数据集拆分的数量。spark将在集群上的每一个节点运行一个任务。一般集群的cpu数量和分区数一致比较好。
spark能经过如下几种方式建立rdd:本地文件系统,hdfs, Cassandra(一款开源分布式NOSql图形数据库), HBase, Amazon S3等,spark支持文本文件,序列化文件,其它hadoop输入格式(官方文档重复了n次了,估计是spark引以自豪的东西)。
对于文本文件,rdd可以使用 SparkContext
的 textFile
方法建立rdd对象。这个方法参数uri能够是本地路径,hdfs://,s3n://,而后读取其中的每一行。举个栗子,咱们能够经过这种方式累加文本文件的行数的长度。
distFile.map(s => s.length).reduce((a, b) => a + b).
一些注意点
textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz")
textFile
method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.举个栗子val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/" //local file val rdd1 = sc.textFile(path,2)
从本地系统中读取licenses这个文件夹下的全部文件
這里特别注意的是,好比這个文件夹下有35个文件,上面分区数设置是2,那么整个RDD的分区数是35*2?這是错误的,這个RDD的分区数无论你的partition数设置为多少时,只要license這个文件夹下的這个文件a.txt(好比有a.txt)没有超过128m,那么a.txt就只有一个partition。那么就是说只要这35个文件其中没有一个超过128m,那么分区数就是 35个。
RDD.saveAsObjectFile
and SparkContext.objectFile
将rdd对象序列化后保存。RDD实现类支持两种类型的操做:Transformations,从另外一个RDD转换成一个新的数据集;Action,通过从数据中计算后返回一个新的值。好比 map 函数,传入一个rdd,返回一个新的处理过的rdd。另外一方面,reduce是action操做,其对rdd中全部元素通过某种函数处理,返回一个最终结果给驱动程序。
全部spark的Transformation是懒处理的,其不会马上进行计算,而是记录须要用于处理的数据集。只有当遇到action操做后才进行计算处理。这种方式让spark运行更高效。
所以,当你运行一个action的时候每一个rdd会被再次计算。然而,经过persist/cache你也能持久化一个rdd在内存中,在你下次查询这个rdd的时候,程序能从集群中快速读取。
rdd也支持在硬盘或者在多个节点持久化。
先上一段代码
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
第一行定义一个来自外部文件的简单rdd对象。这个数据集不会立刻加载到内存中,lines几乎只是一个指向文件的指针。第二行定义一个lineLengths做为map转换的结果。因此lineLengths由于懒原则不会马上处理。最终,咱们运行一个reduce,reduce做为action操做,会立刻出发程序,将任务下发到不一样机器,每一个机器运行map的一部分和reduction的一部分,最终返回驱动一个结果。
若是咱们会再次使用到lineLengths,建议持久化
lineLengths.persist()
在运行reduce以前,lineLengths在第一次计算后依旧会被保存在内存中数据不会被释放,下次计算时不须要再次出来以前的lines.map(s => s.length)操做。
spark api在集群中运行过程高度依赖传函数功能。有两种推荐的方法:
object MyFunctions { def func1(s: String): String = { ... } } myRdd.map(MyFunctions.func1)
注意,你也能够传递一个类对象实例山谷的方法(而不是单个对象),不过这会致使传递函数的同时,须要把相应的对象也发送到集群中各个节点上。例如,咱们定义一个MyClass以下:
class MyClass { def func1(s: String): String = { ... } def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } }
若是咱们 new MyClass 建立一个实例,并调用其 doStuff 方法,同时doStuff中的 map引用了该MyClass实例上的 func1 方法,那么接下来,这个MyClass对象将被发送到集群中全部节点上。rdd.map(x => this.func1(x)) 也会有相似的效果。
相似地,若是引用外部对象的成员变量,也会致使对整个对象实例的引用:
class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }
上面的代码对field的引用等价于 rdd.map(x => this.field + x),这将致使引用整个this对象。
为了不相似问题,最简单的方式就是,将field执拗到一个本地临时变量中,而不是从外部直接访问之,以下:
def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) }
spark中比较须要注意的点是:跨节点执行代码中的变量和方法的做用域和生命周期。在生命周期范围外修改rdd中的数值容易形成错误。在spark单机模式和集群模式下,结果每每不同。
var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter)
运行结果counter:0
spark把rdd操做分割成多个独立子操做,每一个任务发布给不一样计算节点。执行操做前,rdd会计算闭包,把计算所须要的变量和方法副本序列化后发送给每一个计算节点。节点上的counter变量和驱动器上的counter变量不是同一个变量。因此二者值不同。
未解决这类问题,须要使用累加器。累加器是Spark中专门用于集群跨节点分布式执行计算中,安全地更新同一变量的机制。
一般来讲,闭包(由循环或本地方法组成),不该该改写全局状态。Spark中改写闭包以外对象的行为是未定义的。这种代码,有可能在本地模式下能正常工做,但这只是偶然状况,一样的代码在分布式模式下其行为极可能不是你想要的。因此,若是须要全局聚合,请记得使用累加器。
在实际编程中,咱们常常须要把RDD中的元素打印输出到屏幕上(标准输出stdout),通常会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的全部元素。可是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,所以,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了可以把全部worker节点上的打印输出信息也显示到Driver Program中,可使用collect()方法,好比,rdd.collect().foreach(println),可是,因为collect()方法会把各个worker节点上的全部RDD元素都抓取到Driver Program中,所以,这可能会致使内存溢出。所以,当你只须要打印RDD的部分元素时,能够采用语句rdd.take(100).foreach(println)。
当大部分spark均可以在任意类型对象上进行rdd操做,然然而也有部分操做只能在键值对上进行。其中最多见的是“shuffle”操做(对应中文是拖动的意思),好比经过key进行分组或聚合元素操做。
一些通用的rddtransformation函数;
函数名 | 说明 | |||||||||||||||||||||
map | 返回一个新的数据集,其中每一个元素都是通过func处理后得来 |
|||||||||||||||||||||
filter | 返回一个新的数据集,其中每一个元素都是通过func过滤后得来 | |||||||||||||||||||||
flatmap | 和map类似,可是其中的每一个输入元素可能有0或多个输出。 | |||||||||||||||||||||
mapPartitions | 和map类似,只是map输入对应于每一个分区 | |||||||||||||||||||||
mapPartitionsWithIndex | ||||||||||||||||||||||
sample(withReplacement, fraction, seed) | 采样部分(比例取决于 fraction )数据,同时能够指定是否使用回置采样(withReplacement),以及随机数种子(seed) | |||||||||||||||||||||
Union(other Dataset) | 返回数据集的并集 | |||||||||||||||||||||
intersection(otherDataset) | 返回原数据集和参数数据集的交集 | |||||||||||||||||||||
distinct([numTasks]) | 去重 | |||||||||||||||||||||
|
||||||||||||||||||||||
mapPartitions说明
map()的输入函数是应用于RDD中每一个元素,而mapPartitions()的输入函数是应用于每一个分区
package test import scala.Iterator import org.apache.spark.SparkConf import org.apache.spark.SparkContext object TestRdd { def sumOfEveryPartition(input: Iterator[Int]): Int = { var total = 0 input.foreach { elem => total += elem } total } def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Rdd Test") val spark = new SparkContext(conf) val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6个元素,分红2个partition val result = input.mapPartitions( partition => Iterator(sumOfEveryPartition(partition)))//partition是传入的参数,是个list,要求返回也是list,即Iterator(sumOfEveryPartition(partition)) result.collect().foreach { println(_)//6 15 } spark.stop() } }
Action函数 | 做用 |
---|---|
reduce(func) | 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,而且func须要知足 交换律 和 结合律 以便支持并行计算) |
collect() | 将数据集中全部元素以数组形式返回驱动器(driver)程序。一般用于,在RDD进行了filter或其余过滤操做后,将一个足够小的数据子集返回到驱动器内存中。 |
count() | 返回数据集中元素个数 |
first() | 返回数据集中首个元素(相似于 take(1) ) |
take(n) | 返回数据集中前 n 个元素 |
takeSample(withReplacement,num, [seed]) | 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。 |
takeOrdered(n, [ordering]) | 按元素排序(能够经过 ordering 自定义排序规则)后,返回前 n 个元素 |
saveAsTextFile(path) | 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其余任何Hadoop支持的文件系统。 保存过程当中,Spark会调用每一个元素的toString方法,并将结果保存成文件中的一行。 |
saveAsSequenceFile(path) (Java and Scala) |
将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其余任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,一样也适用于可以被隐式转换为Writable的类型(Spark实现了全部基本类型的隐式转换,如:Int,Double,String 等) |
saveAsObjectFile(path) (Java and Scala) |
将RDD元素以Java序列化的格式保存成文件,保存结果文件可使用 SparkContext.objectFile 来读取。 |
countByKey() | 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每一个key的个数。 |
foreach(func) | 在RDD的每一个元素上运行 func 函数。一般被用于累加操做,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操做。 注意:用 foreach 操做出累加器以外的变量可能致使未定义的行为。更详细请参考前面的“理解闭包”(Understanding closures )这一小节。 |
有一些Spark算子会触发众所周知的混洗(Shuffle)事件。Spark中的混洗机制是用于将数据从新分布,其结果是全部数据将在各个分区间从新分组。通常状况下,混洗须要跨执行器(executor)或跨机器复制数据,这也是混洗操做通常都比较复杂并且开销大的缘由。
为了理解混洗阶段都发生了哪些事,我首先以reduceByKey 为例来看一下。reduceByKey会生成一个新的RDD,将源RDD中一个key对应的多个value组合进一个tuple - 而后将这些values输入给reduce函数,获得的result再和key关联放入新的RDD中。这个的难点在于对于某一个key来讲,并不是其对应的全部values都在同一个分区(partition)中,甚至有可能都不在同一台机器上,可是这些values又必须放到一块儿计算reduce结果。
在Spark中,一般是因为为了进行某种计算操做,而将数据分布到所须要的各个分区当中。而在计算阶段,单个任务(task)只会操做单个分区中的数据 – 所以,为了组织好每一个reduceByKey中reduce任务执行时所需的数据,Spark须要执行一个多对多操做。即,Spark须要读取RDD的全部分区,并找到全部key对应的全部values,而后跨分区传输这些values,并将每一个key对应的全部values放到同一分区,以便后续计算各个key对应values的reduce结果 – 这个过程就叫作混洗(Shuffle)。
shuffle开销大。
RDD持久化
rdd持久化分七个级别
Storage Level | Meaning |
---|---|
MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. 未序列化java对象存储在jvm内,若是内存不足,部分数据不会存储,且再次使用的过程会从新计算。这是默认级别,且cpu处理器最有效率的方式。 |
MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed. 未序列化java对象存储在jvm内,若是内存不足,部分数据存储在硬盘,且再次使用的过程会读取硬盘。 |
MEMORY_ONLY_SER (Java and Scala) |
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. 以序列化形式存储RDD(每一个分区一个字节数组)。一般这种方式比未序列化存储方式要更省空间,尤为是若是你选用了一个比较好的序列化协议(fast serializer),可是这种方式也相应的会消耗更多的CPU来读取数据。 |
MEMORY_AND_DISK_SER (Java and Scala) |
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed. 和MEMORY_ONLY_SER相似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都从新计算。 |
DISK_ONLY | Store the RDD partitions only on disk. 只存储在磁盘上。这种缓存估计用在处理超大文件的过程。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes. 和上面没有”_2″的级别相对应,只不过每一个分区数据会在两个节点上保存两份副本。 |
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled. |
Spark的存储级别主要可于在内存使用和CPU占用之间作一些权衡。建议根据如下步骤来选择一个合适的存储级别:
Spark可以自动监控各个节点上缓存使用率,而且以LRU(last recent used)的方式将老数据逐出内存。若是你更喜欢手动控制的话,能够用RDD.unpersist() 方法来删除无用的缓存。
通常来讲,当spark传递一个函数操做到远程集群节点,驱动程序会把相关的数据以副本形式发送到各个节点。由于跨节点的读写效率过低了,因此通常不会对远程的副本数据进行更新。然而有时候也须要对一些数据进行读写,这就是广播变量和累加器。
广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序建立后发送给参与计算的节点。对那些须要让工做节点高效地访问相同数据的应用场景,好比机器学习。咱们能够在SparkContext上调用broadcast方法建立广播变量:
val broadcastList = sc.broadcast(List("Spark","Impala","Hadoop"))
广播变量也能够被非驱动程序所在节点(即工做节点)访问,访问方法就是调用该变量的value方法
sc.parallelize(List("1","2","3")).map(x => broadcastList.value ++ x).collect
使用广播变量能够优化资源提升性能
广播变量的优点:是由于不是每一个task一份变量副本,而是变成每一个节点的executor才一份副本。这样的话,就可让变量产生的副本大大减小。
广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的
BlockManager中,尝试获取变量副本;若是本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其余
节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
例如,50个executor,1000个task。一个map,10M:
默认状况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
若是使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,并且不必定都是从Driver传输到每一个节点,还多是就近从最近的
节点的executor的bockmanager上拉取变量副本,网络传输速度大大增长;500M,大大下降了内存消耗。