Spark编程入门

1. 交互式Spark-Shell

根据前一节已经搭建好的Hadoop和Spark环境,直接经过脚本启动Hadoop和Spark服务。若是 http://localhost:8080 可以访问,说明Spark服务已经启动。Spark为咱们提供了PySpark以及Spark-shell,能够方便的经过交互试界面调试Spark应用。接下来咱们将采用Spark-Shell来调试Spark程序。在终端中输入以下命令: spark-shell --master spark://spark-B470:7077, master后面的URL就是Spark Master的URL ,能够在 http://localhost:8080 的页面上找到。html

  1. hadoop@spark-B470:~/Develop$ spark-shell --master spark://spark-B470:7077
  2. Setting default log level to "WARN".
  3. To adjust logging level use sc.setLogLevel(newLevel).
  4. SLF4J: Class path contains multiple SLF4J bindings.
  5. SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  6. SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  7. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  8. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  9. 16/10/29 23:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  10. 16/10/29 23:04:25 WARN Utils: Your hostname, spark-B470 resolves to a loopback address: 127.0.1.1; using 192.168.1.110 instead (on interface enp4s0)
  11. 16/10/29 23:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
  12. 16/10/29 23:04:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
  13. Spark context Web UI available at http://192.168.1.110:4040
  14. Spark context available as 'sc' (master = spark://spark-B470:7077, app id = app-20161029230426-0000).
  15. Spark session available as 'spark'.
  16. Welcome to
  17. ____ __
  18. / __/__ ___ _____/ /__
  19. _\ \/ _ \/ _ `/ __/ '_/
  20. /___/ .__/\_,_/_/ /_/\_\ version 2.0.0
  21. /_/
  22. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
  23. Type in expressions to have them evaluated.
  24. Type :help for more information.
  25. scala>

看到Scala的交互式窗口后,能够在 http://localhost:8080 的web上 Running Applications 这一栏 看到刚启动的应用程序,若是你要退出Spark-Shell,按 CTRL D组合键退出。java

spark_web_ui

Spark 最主要的抽象是叫 Resilient Distributed Dataset(RDD) 的弹性分布式集合。RDDs 可使用 Hadoop InputFormats(例如 HDFS 文件)建立,也能够从其余的 RDDs 转换。让咱们在 Spark 源代码目录从 /etc/protocols 文本文件中建立一个新的 RDD。python

  1. scala> val file = sc.textFile("file:///etc/protocols")
  2. file: org.apache.spark.rdd.RDD[String] = file:///etc/protocols MapPartitionsRDD[5] at textFile at <console>:24
  3. scala> file.count()
  4. res3: Long = 64
  5. scala> file.first()
  6. res4: String = # Internet (IP) protocols

上面的操做中建立了一个RDD file,执行了两个简单的操做:程序员

  • count() 获取RDD的行数
  • first() 获取第一行的内容

咱们继续执行其余操做,好比查找有多少行含有tcp和udp字符串:web

  1. scala> file.filter(line=>line.contains("tcp")).count()
  2. res2: Long = 1
  3. scala> file.filter(line=>line.contains("udp")).count()
  4. res3: Long = 2

查看一共有多少个不一样单词的方法,这里用到MapReduce的思路:算法

  1. scala> val wordcount = file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
  2. wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:26
  3. scala> wordcount.count()
  4. res4: Long = 243

2. Spark 核心概念

如今你已经用Spark-shell运行了一段Spark程序,是时候对Spark编程做更细致的了解。shell

从上层来看,每一个Spark应用都由一个驱动器程序(driver program)来发起集群 上的各类并行操做。驱动器程序包包含应用的 main 函数, 而且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关的操做。在前面的例子里,实际的驱动器程序就是Spark shell 自己,你只要输入想运行的操做就能够了。express

驱动器程序经过一个 SparkContext 对象来访问Spark,这个对象表明对计算集群的一个链接。shell启动时已经自动建立了一个 SparkContext 对象,是一个叫做 sc 的变量。咱们能够经过在shell里尝试输出 sc 来查看它的类型。apache

  1. scala> sc
  2. res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1f172892

一旦有了 SparkContext,你就能够经过它来建立RDD。就像前面的例子,咱们调用了 sc.textFile() 来建立一个表明文件中各行文本的RDD。咱们能够在这些行上进行各类并行操做,好比 count()编程

要执行这些操做,驱动器程序通常要管理多个执行器(executor)节点。好比,若是咱们在集群上运行count() 操做,那么不一样的节点会统计文件的不一样部分的行数。因为咱们刚才是在本地模式下运行的Spark shell,所以全部的工做会在单个节点上执行,但你能够将这个shell链接到集群上来进行并行的数据分析。下图展现Spark如何在一个集群上运行。

Spark分布式集群

3. RDD编程

3.1 RDD基础

Spark中的RDD就是一个不可变的分布式对象集合。每一个RDD都被分为多个分区,这些分区运行在集群中的不一样节点上。用户可使用两种方法建立RDD:读取一个外部数据集,或在驱动程序里分发驱动器程序中的对象集合(好比list和set)。咱们在前面的例子已经使用 SparkContext.textFile() 来读取文本文件做为一个字符串RDD。建立出来后的RDD支持两种类型的操做:转化操做(transformation) 和 行动操做(action)

转化操做会由一个RDD生成一个新的RDD。例如,根据谓词匹配状况筛选数据就是一个常见的转化操做。在咱们的文本示例中,咱们能够用筛选来生成一个只存储包含单词tcp的字符串的的新的RDD。示例以下:

  1. scala> val tcpLines = file.filter(line => line.contains("tcp"))
  2. tcpLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

行动操做会对RDD计算一个结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统(如HDFS)中。first() 就是咱们以前调用的一个行动操做,它会返回RDD的一个元素,示例以下:

  1. scala> tcpLines.first()
  2. res3: String = tcp 6 TCP # transmission control protocol

转化操做和行动操做的区别在于Spark计算RDD的方式不一样。虽然你能够在任什么时候候定义新的RDD,可是Spark只会惰性计算这些RDD。它们只有第一次在一个行动操做中用到时,才会真正计算。这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域是颇有道理的。好比,咱们以一个文本文件定义了数据, 而后把其中包含tcp的行筛选出来。若是Spark在运行 val file = sc.textFile("file:///etc/protocols") 时就把文件中的全部行都读取并存储起来,就会消耗不少的存储空间,而咱们立刻就要筛选掉其中的不少数据。相反,一旦Spark了解了完整的转化操做链以后,它就能够只计算求结果时真正须要的数据。事实上,在行动操做first()中,Spark只须要扫描文件直到找到第一个匹配的行为止,而不须要读取整个文件。

最后,默认状况下,Spark的RDD会在你每次对它们进行行动操做时从新计算。若是想在多个行动操做中重用同一个RDD,可使用 RDD.persist() 让Spark把这个RDD缓存下来。在第一次对持久化的RDD计算以后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在以后的行动操做就能够重用这些数据了。Spark在默认状况下不进行持久化可能显得有些奇怪,不过这对于大规模数据集是颇有意义的:若是不会重用该RDD,咱们就不必浪费存储空间,Spark能够直接遍历一遍数据而后计算出结果。

在实际操做中,你会常常用 persist() 来把数据的一部分读取到内存中,并反复查询这部分数据。例如,咱们想屡次对
/etc/protocols 文件包含tcp的行进行计算,就能够写出以下脚本:

  1. scala> tcpLines.persist()
  2. res8: tcpLines.type = MapPartitionsRDD[3] at filter at <console>:26
  3. scala> tcpLines.count()
  4. res9: Long = 1
  5. scala> tcpLines.first()
  6. res10: String = tcp 6 TCP # transmission control protocol

总的来讲,每一个Spark程序通常的工做流程:

  1. 从外部数据建立输入RDD。
  2. 使用诸如 filter() 这样的转化操做对RDD进行转化,以定义新的RDD。
  3. 告诉Spark对须要被重用的中间结果RDD执行persist() 操做。
  4. 使用行动操做(如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行。

3.2 建立RDD

Spark提供了两种建立RDD 的方式:读取外部数据集,以及在驱动程序中对一个集合进行并行化。

3.2.1 并行集合

并行集合 (Parallelized collections) 的建立是经过在一个已有的集合(Scala Seq)上调用 SparkContext 的 parallelize 方法实现的。集合中的元素被复制到一个可并行操做的分布式数据集中。例如,这里演示了如何在一个包含 1 到 10 的数组中建立并行集合:

  1. scala> val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  2. data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  3. scala> val distData = sc.parallelize(data)
  4. distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26

一旦建立完成,这个分布式数据集(distData)就能够被并行操做。例如,咱们能够调用 distData.reduce((a, b) => a + b) 将这个数组中的元素相加。咱们之后再描述在分布式上的一些操做。

并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每个切片运行一个任务。你能够在集群上为每一个 CPU 设置 2-4 个切片(slices)。正常状况下,Spark 会试着基于你的集群情况自动地设置切片的数目。然而,你也能够经过 parallelize 的第二个参数手动地设置(例如:sc.parallelize(data, 10))。

注意:除了开发原型和测试时,这种方式用的并很少,毕竟这种方式须要把你的整个数据集先放在一台机器的内存中。

3.2.2 外部数据集

Spark 能够从任何一个 Hadoop 支持的存储源建立分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其余 Hadoop InputFormat。

文本文件 RDDs 可使用 SparkContext.textFile() 方法建立。在这个方法里传入文件的 URI (机器上的本地路径 file:// 或 hdfs://,s3n:// 等),而后它会将文件读取成一个行集合。这里是一个调用例子:

  1. scala> val distFile = sc.textFile("file:///home/hadoop/Develop/start.sh")
  2. distFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/Develop/start.sh MapPartitionsRDD[14] at textFile at <console>:24

一旦建立完成,distFile 就能作数据集操做。例如,咱们能够用下面的方式使用 map 和 reduce 操做将全部行的长度相加:distFile.map(s => s.length).reduce((a, b) => a + b) 。

注意,Spark 读文件时:

  • 若是使用本地文件系统路径,文件必须能在 work 节点上用相同的路径访问到。要么复制文件到全部的 workers,要么使用网络的方式共享文件系统。
  • 全部 Spark 的基于文件的方法,包括 textFile,能很好地支持文件目录,压缩过的文件和通配符。例如,你可使用 textFile(“/my/文件目录”),textFile(“/my/文件目录/.txt”) 和 textFile(“/my/文件目录/.gz”)。
  • textFile 方法也能够选择第二个可选参数来控制切片(slices)的数目。默认状况下,Spark 为每个文件块(HDFS 默认文件块大小是 64M)建立一个切片(slice)。可是你也能够经过一个更大的值来设置一个更高的切片数目。注意,你不能设置一个小于文件块数目的切片值。

3.3 RDD操做

RDD 支持两种类型的操做:转化操做(transformations) 从已经存在的数据集中建立一个新的数据集;行动操做(actions) 在数据集上进行计算以后返回一个值到驱动器程序。例如,map 是一个转化操做,它将每个数据集元素传递给一个函数而且返回一个新的 RDD。另外一方面,reduce 是一个行动操做,它使用相同的函数来聚合 RDD 的全部元素,而且将最终的结果返回到驱动器程序(不过也有一个并行 reduceByKey 能返回一个分布式数据集)。

3.3.1 向Spark传递函数

Spark的大部分转化操做和一部分行动操做,都须要依赖用户传递函数来计算。这里推荐两种方式:

  • 匿名函数 (Anonymous function syntax),能够在比较短的代码中使用。
  • 全局单例对象里的静态方法。例如,你能够定义 object MyFunctions 而后传递 MyFounctions.func1,像下面这样:
  1. object MyFunctions {
  2. def func1(s: String): String = { ... }
  3. }
  4. myRdd.map(MyFunctions.func1)

注意,它可能传递的是一个类实例里的一个方法引用(而不是一个单例对象),这里必须传送包含方法的整个对象。例如:

  1. class MyClass {
  2. def func1(s: String): String = { ... }
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. }

这里,若是咱们建立了一个 new MyClass 对象,而且调用它的 doStuffmap 里面引用了这个 MyClass 实例中的 func1 方法,因此这个对象必须传送到集群上。相似写成 rdd.map(x => this.func1(x))

以相似的方式,访问外部对象的字段将会引用整个对象:

  1. class MyClass {
  2. val field = "Hello"
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  4. }

至关于写成 rdd.map(x => this.field + x),引用了整个 this 对象。为了不这个问题,最简单的方式是复制 field 到一个本地变量而不是从外部访问它:

  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2. val field_ = this.field
  3. rdd.map(x => field_ + x)
  4. }
3.3.2 使用键值对

虽然不少 Spark 操做工做在包含任意类型对象的RDD上的,可是少数几个特殊操做仅仅在键值(key-value)对RDD上可用。最多见的是分布式 “shuffle” 操做,例如根据一个 key 对一组数据进行分组和聚合。

在 Scala 中,这些操做在包含二元组(Tuple2)(在语言的内建元组中,经过简单的写 (a, b) 建立) 的 RDD 上自动地变成可用的,只要在你的程序中导入 org.apache.spark.SparkContext._ 来启用 Spark 的隐式转换。在 PairRDDFunctions 的类里键值对操做是可使用的,若是你导入隐式转换它会自动地包装成元组 RDD。

例如,下面的代码在键值对上使用 reduceByKey 操做来统计在一个文件里每一行文本内容出现的次数:

  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s => (s, 1))
  3. val counts = pairs.reduceByKey((a, b) => a + b)

咱们也可使用 counts.sortByKey(),例如,将键值对按照字母进行排序,最后 counts.collect() 把它们做为一个对象数组带回到驱动程序。

3.3.3 常见的转化操做

下面的表格列了 Spark 支持的一些经常使用 transformations。

转化操做 含义
map(func) 返回一个新的分布式数据集,将数据源的每个元素传递给函数 func 映射组成
filter(func) 返回一个新的数据集,从数据源中选中一些元素经过函数 func 返回 true
flatMap(func) 相似于 map,可是每一个输入项能被映射成多个输出项(因此 func 必须返回一个 Seq,而不是单个 item)。
mapPartitions(func) 相似于 map,可是分别运行在 RDD 的每一个分区上,因此 func 的类型必须是 Iterator=> Iterator 当运行在类型为 T 的 RDD 上。
mapPartitionsWithIndex(func) 相似于 mapPartitions,可是 func 须要提供一个 integer 值描述索引(index),因此 func 的类型必须是 (Int, Iterator) => Iterator 当运行在类型为 T 的 RDD 上。
sample(withReplacement, fraction, seed) 对数据进行采样。
union(otherDataset) 生成一个包含两个RDD中全部元素的RDD
intersection(otherDataset) 返回两个RDD共同的元素的RDD
distinct([numTasks])) 对RDD进行去重
groupByKey([numTasks]) 对具备相同键的值进行分组
reduceByKey(func, [numTasks]) 合并具备相同键的值
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey函数对PairRDD中相同Key的值进行聚合操做,在聚合过程当中一样使用了一个中立的初始值
sortByKey([ascending], [numTasks]) 返回一个根据键升序或降序排序的RDD
join(otherDataset, [numTasks]) 对两个RDD具备相同键的键值对进行内链接.
cogroup(otherDataset, [numTasks]) 对两个RDD中拥有的相同键的数据分组到一块儿
cartesian(otherDataset) 与另外一个RDD的笛卡尔积
pipe(command, [envVars]) 对RDD中的元素经过脚本管道执行脚本
coalesce(numPartitions, [shuffle]) 该函数用于将RDD进行重分区,使用HashPartitioner。第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
repartition(numPartitions) 对RDD数据集进行重分区操做,该函数其实就是coalesce函数第二个参数为true的实现

表1. 对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操做

函数名 示例 结果
map() rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() rdd.filter(x => x != 1) {2, 3, 3}
distinct rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, seed) rdd.sample(false, 0.5) 非肯定

接下来,咱们在Spark shell里验证一下上述的几个操做,完整的代码以下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
  3. scala> mRDD.map(x => x + 1).collect()
  4. res30: Array[Int] = Array(2, 3, 4, 4)
  5. scala> mRDD.flatMap(x => x.to(3)).collect()
  6. res31: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
  7. scala> mRDD.filter(x => x != 1).collect()
  8. res32: Array[Int] = Array(2, 3, 3)
  9. scala> mRDD.distinct().collect()
  10. res33: Array[Int] = Array(1, 2, 3)
  11. scala> mRDD.sample(false, 0.5).collect()
  12. res35: Array[Int] = Array()
  13. scala> mRDD.sample(false, 0.5).collect()
  14. res36: Array[Int] = Array(1, 2, 3, 3)
  15. scala> mRDD.sample(false, 0.5).collect()
  16. res37: Array[Int] = Array(1, 3)

上述的几个转化操做中,除sample转化操做每次返回不固定的元素,其余几个转化操做的结果都是可预期的。上述的例子中,咱们为了把结果给打印出来,咱们调用了 collect() 行动操做。

表2. 对数据分别为 {1, 2, 3} 和 {3, 4, 5} 的RDD进行针对两个RDD的转化操做

函数名 示例 结果
union() rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() rdd.intersection(other) {3}
subtract() rdd.subtract(other) {1, 2}
cartesian() rdd.cartesian(other) {(1, 3), (1, 4), … (3, 5)}

一样的,咱们在Spark shell里验证一下上述的几个操做,完整的代码以下:

  1. scala> val firstRDD = sc.parallelize(Array(1, 2, 3))
  2. firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
  3. scala> val secondRDD = sc.parallelize(Array(3, 4, 5))
  4. secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
  5. scala> firstRDD.union(secondRDD).collect()
  6. res26: Array[Int] = Array(1, 2, 3, 3, 4, 5)
  7. scala> firstRDD.intersection(secondRDD).collect()
  8. res27: Array[Int] = Array(3)
  9. scala> firstRDD.subtract(secondRDD).collect()
  10. res28: Array[Int] = Array(1, 2)
  11. scala> firstRDD.cartesian(secondRDD).collect()
  12. res29: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
3.3.4 常见的行动操做

下面的表格列了 Spark 支持的一些经常使用 actions。详细内容请参阅 RDD API 文档(ScalaJavaPython) 和 PairRDDFunctions 文档(ScalaJava)。

行动操做 含义
reduce(func) 并行整合RDD中全部数据
collect() 返回RDD中的全部元素
count() 返回RDD中的元素个数
first() 返回RDD中的第一个元素
take(n) 从RDD中返回n个元素
takeSample(withReplacement, num, [seed]) 从RDD中返回任意一些元素
takeOrdered(n, [ordering]) 从RDD中按照提供的顺序返回最前面的n个元素
saveAsTextFile(path) 把RDD中的元素写入本地文件系统或者HDFS文件系统
saveAsSequenceFile(path) (Java and Scala) 把RDD中的元素写入本地文件系统或HDFS文件系统的seqfile
saveAsObjectFile(path) (Java and Scala) 经过Java序列化把RDD中的元素写入文件系统(本地或HDFS),经过SparkContext.objectFile()加载
countByKey() RDD中各元素的Key出现的次数
foreach(func) 对RDD中的每一个元素使用给定的函数

表3,对一个数据为 {1, 2, 3, 3} 的RDD进行基本的RDD行动操做:

函数名 示例 结果
collect() rdd.collect() {1, 2, 3, 3}
count() rdd.count() 4
countByValue() rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) rdd.take(2) {1, 2}
top(num) rdd.top(2) {3, 3}
takeOrdered(num)(ordering) rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(num) rdd.takeSample(false, 1) 非肯定的
reduce(num) rdd.reduce((x, y) = > x + y) 9
fold(num) rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) (9, 4)
foreach(func) rdd.foreach(func)

咱们把上述示例的几个行动操做在Spark shell中验证一下,结果以下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24
  3. scala> mRDD.collect()
  4. res41: Array[Int] = Array(1, 2, 3, 3)
  5. scala> mRDD.count()
  6. res42: Long = 4
  7. scala> mRDD.countByValue()
  8. res43: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
  9. scala> mRDD.take(2)
  10. res44: Array[Int] = Array(1, 2)
  11. scala> mRDD.top(2)
  12. res45: Array[Int] = Array(3, 3)
  13. scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
  14. myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@5dc5e110
  15. scala> mRDD.takeOrdered(2)(myOrd)
  16. res99: Array[Int] = Array(3, 3)
  17. scala> mRDD.takeSample(false, 1)
  18. res53: Array[Int] = Array(3)
  19. scala> mRDD.takeSample(false, 1)
  20. res54: Array[Int] = Array(2)
  21. scala> mRDD.takeSample(false, 1)
  22. res55: Array[Int] = Array(3)
  23. scala> mRDD.reduce((x,y) => x+y)
  24. res56: Int = 9
  25. scala> mRDD.fold(0)((x,y) => x+y)
  26. res57: Int = 9
  27. scala> mRDD.aggregate((0, 0))((x,y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
  28. res58: (Int, Int) = (9,4)
  29. scala> mRDD.foreach(println)
  30. 3
  31. 2
  32. 3
  33. 1

备注:上述的takeSample操做为从RDD中返回任意一个元素,因此每次返回的结果都有可能不一样。

3.4 RDD持久化

Spark最重要的一个功能是它能够经过各类操做(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每个节点都将参与计算的全部分区数据存储到内存中,而且这些数据能够被这个集合(以及这个集合衍生的其余集合)的动做(action)重复利用。这个能力使后续的动做速度更快(一般快10倍以上)。对应迭代算法和快速的交互使用来讲,缓存是一个关键的工具。

你能经过 persist() 或者 cache() 方法持久化一个 RDD 。首先,在action中计算获得RDD;而后,将其保存在每一个节点的内存中。Spark的缓存是一个容错的技术-若是RDD的任何一个分区丢失,它 能够经过原有的转换(transformations)操做自动的重复计算而且建立出这个分区。

此外,咱们能够利用不一样的存储级别存储每个被持久化的RDD。例如,它容许咱们持久化集合到磁盘上、将集合做为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Tachyon中。咱们能够经过传递一个 StorageLevel 对象给 persist() 方法设置这些存储级别。cache() 方法使用了默认的存储级别— StorageLevel.MEMORY_ONLY。完整的存储级别介绍以下所示:

存储级别 含义
MEMORY_ONLY 将RDD做为非序列化的Java对象存储在jvm中。若是RDD不适合存在内存中,一些分区将不会被缓存,从而在每次须要这些分区时都需从新计算它们。这是系统默认的存储级别。
MEMORY_AND_DISK 将RDD做为非序列化的Java对象存储在jvm中。若是RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次须要时读出它们。
MEMORY_ONLY_SER 将RDD做为序列化的Java对象存储(每一个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,可是会更耗费cpu资源—密集的读操做。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER相似,但不是在每次须要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。
DISK_ONLY 仅仅将RDD分区存储到磁盘中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的存储级别相似,可是复制每一个分区到集群的两个节点上面
OFF_HEAP (experimental) 以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减小了垃圾回收的花费,容许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具备更强的吸引力。

Spark也会自动持久化一些shuffle操做(如 reduceByKey)中的中间数据,即便用户没有调用 persist 方法。这样的好处是避免了在shuffle出错状况下,须要重复计算整个输入。若是用户计划重用 计算过程当中产生的RDD,咱们仍然推荐用户调用 persist 方法。

3.4.1 如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不一样权衡。咱们推荐经过下面的过程选择一个合适的存储级别:

  • 若是你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。由于这是cpu利用率最高的选项,会使RDD上的操做尽量的快。

  • 若是不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提升对象的空间使用率,可是仍可以至关快的访问。

  • 除非函数计算RDD的花费较大或者它们须要过滤大量的数据,不要将RDD存储到磁盘上,不然,重复计算一个分区就会和重磁盘上读取数据同样慢。

  • 若是你但愿更快的错误恢复,能够利用重复(replicated)存储级别。全部的存储级别均可以经过重复计算丢失的数据来支持完整的容错,可是重复的数据可以使你在RDD上继续运行任务,而不须要重复计算丢失的数据。

  • 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具备以下优点:

    • 它运行多个执行者共享Tachyon中相同的内存池
    • 它显著地减小垃圾回收的花费
    • 若是单个的执行者崩溃,缓存的数据不会丢失
3.4.2 删除数据

Spark自动的监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。若是你想手动的删除RDD,可使用 RDD.unpersist() 方法

4 共享变量

通常状况下,当一个传递给Spark操做(例如map和reduce)的函数在远程节点上面运行时,Spark操做实际上操做的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,而且这些变量在远程机器上的全部更新都不会传递回驱动程序。一般跨任务的读写变量是低效的,可是,Spark仍是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)

4.1 广播变量

广播变量容许程序员缓存一个只读的变量在每台机器上面,而不是每一个任务保存一份拷贝。例如,利用广播变量,咱们可以以一种更有效率的方式将一个大数据量输入集合的副本分配给每一个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减小通讯的成本。

一个广播变量能够经过调用 SparkContext.broadcast(v) 方法从一个初始变量v中建立。广播变量是v的一个包装变量,它的值能够经过value方法访问,下面的代码说明了这个过程:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(87)
  3. scala> broadcastVar.value
  4. res114: Array[Int] = Array(1, 2, 3)

广播变量建立之后,咱们就可以在集群的任何函数中使用它来代替变量v,这样咱们就不须要再次传递变量v到每一个节点上。另外,为了保证全部的节点获得广播变量具备相同的值,对象v不能在广播以后被修改。

4.2 累加器

顾名思义,累加器是一种只能经过关联操做进行“加”操做的变量,所以它可以高效的应用于并行操做中。它们可以用来实现 counters 和 sums。Spark原生支持数值类型的累加器,开发者能够本身添加支持的类型。若是建立了一个具名的累加器,它能够在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的做用。

一个累加器能够经过调用 SparkContext.accumulator(v) 方法从一个初始变量v中建立。运行在集群上的任务能够经过 add 方法或者使用 += 操做来给它加值。然而,它们没法读取这个值。只有驱动程序可使用 value 方法来读取累加器的值。以下代码,展现了如何利用累加器将一个数组里面的全部元素相加:

  1. scala> val accum = sc.accumulator(0, "My Accumulator")
  2. accum: org.apache.spark.Accumulator[Int] = 0
  3. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  4. scala> accum.value
  5. res2: Int = 10

这个例子利用了内置的整数类型累加器。开发者能够利用子类 AccumulatorParam 建立本身的 累加器类型。AccumulatorParam 接口有两个方法:zero 方法为你的数据类型提供一个“0 值”(zero value);addInPlace 方法计算两个值的和。例如,假设咱们有一个Vector 类表明数学上的向量,咱们可以定义以下累加器:

  1. object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  2. def zero(initialValue: Vector): Vector = {
  3. Vector.zeros(initialValue.size)
  4. }
  5. def addInPlace(v1: Vector, v2: Vector): Vector = {
  6. v1 += v2
  7. }
  8. }
  9. // Then, create an Accumulator of this type:
  10. val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在scala中,Spark支持用更通常的Accumulable接口来累积数据-结果类型和用于累加的元素类型 不同(例如经过收集的元素创建一个列表)。Spark也支持用 SparkContext.accumulableCollection 方法累加通常的scala集合类型。

5. 总结

简单介绍了Spark核心概念和RDD操做,经过这些基本的转化操做和行动操做,就能够进行简单的Spark应用开发。

相关文章
相关标签/搜索