Spark的RDD编程实战案例java
做者:尹正杰apache
版权声明:原创做品,谢绝转载!不然将追究法律责任。编程
RDD体现了装饰者设计模式,将数据处理的逻辑进行封装,接下来让咱们一块儿来体验一下吧。设计模式
一.RDD概述数组
1>.什么是RDD缓存
RDD全称为"Resilient Distributed Dataset",叫作弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它表明一个不可变、可分区、里面的元素可并行计算的集合。
2>.RDD的属性网络
Internally, each RDD is characterized by five main properties: A list of partitions: 一组分区(Partition),即数据集的基本组成单位; A function for computing each split: 一个计算每一个分区的函数,换句话说,是计算数据放在哪一个分区中; A list of dependencies on other RDDs: RDD之间的依赖关系; Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): 一个Partitioner,即RDD的分片函数; Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file): 一个列表,存储存取每一个Partition的优先位置(preferred location),即数据所存储的节点;
3>.RDD的特色 dom
RDD表示只读的分区的数据集,对RDD进行改动,只能经过RDD的转换操做,由一个RDD获得一个新的RDD,新的RDD包含了从其余RDD衍生所必需的信息。
RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。若是血缘关系较长,能够经过持久化RDD来切断血缘关系。
RDD有如下几个显著特色:
分区 RDD逻辑上是分区的,每一个分区的数据是抽象存在的,计算的时候会经过一个compute函数获得每一个分区的数据;
若是RDD是经过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,若是RDD是经过其余RDD转换而来,则compute函数是执行转换逻辑将其余RDD的数据进行转换; 只读 RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上建立新的RDD; 由一个RDD转换到另外一个RDD,能够经过丰富的操做算子实现,再也不像MapReduce那样只能写map和reduce了; RDD的操做算子(Operate)包括两类:
transformations(转换算子):
它是用来将RDD进行转化,构建RDD的血缘关系;
actions(行动算子):
它是用来触发RDD的计算,获得RDD的相关计算结果或者将RDD保存的文件系统中;
依赖 RDDs经过操做算子进行转换,转换获得的新RDD包含了从其余RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。以下所示,依赖包括两种:
窄依赖:
RDDs之间分区是一一对应的;
宽依赖:
下游RDD的每一个分区与上游RDD(也称之为父RDD)的每一个分区都有关,是多对多的关系。 缓存 若是在应用程序中屡次使用同一个RDD,能够将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系获得分区的数据,在后续其余地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用;
CheckPoint 虽然RDD的血缘关系自然地能够实现容错,当RDD的某个分区数据失败或丢失,能够经过血缘关系重建。
可是对于长时间迭代型应用来讲,随着迭代的进行,RDDs之间的血缘关系会愈来愈长,一旦在后续迭代过程当中出错,则须要经过很是长的血缘关系去重建,势必影响性能。
为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就能够切断以前的血缘关系,由于checkpoint后的RDD不须要知道它的父RDDs了,它能够从checkpoint处拿到数据。
二.RDD的建立分布式
1>.编程模型ide
在Spark中,RDD被表示为对象,经过对象上的方法调用来对RDD进行转换。
通过一系列的transformations定义RDD以后,就能够调用actions触发RDD的计算,action能够是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。
在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时能够经过管道的方式传输多个转换。
要使用Spark,开发者须要编写一个Driver程序,它被提交到集群以调度运行Worker,
2>.RDD的建立
package com.yinzhengjie.bigdata.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CreateRDD { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) /** * RDD的建立: * 在Spark中建立RDD的建立方式能够分为三种: * (1)从集合(内存)中建立RDD; * 从集合中建立RDD,Spark主要提供了两种函数:parallelize和makeRDD * (2)从外部存储建立RDD; * 包括本地的文件系统,还有全部Hadoop支持的数据集,好比HDFS、Cassandra、HBase等 * (3)从其余RDD建立。 */ //使用SparkContext对象的parallelize方法能够在内存中建立RDD val arrayRDD:RDD[String] = sc.parallelize(Array("yinzhengjie","JasonYin2020")) arrayRDD.collect().foreach(println) //使用SparkContext对象的makeRDD方法也能够在内存中建立RDD,其底层实现就是parallelize方法 val listRDD:RDD[Int] = sc.makeRDD(List(100,200,300)) listRDD.collect().foreach(println) /** * 使用SparkContext对象的textFile方法从外部存储中建立RDD * * 舒适提示: * 默认状况下能够读取项目路径,也能够读取其它路径,好比HDFS,HBase对应的路径等 * 默认从文件中读取的数据都是字符串类型 */ val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data") fileRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object RDDPartition { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) /** * 使用SparkContext对象的makeRDD函数签名以下: * def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism) : RDD[T] = withScope * * 舒适提示: * seq: * 传入一个序列集合类型,好比List,Array * Int = defaultParallelism: * 指定分区数的并行度,传入一个整形,不传也能够,即便用defaultParallelism,该值默认是您的操做系统对应的总核心数。 * */ val listRDD:RDD[String] = sc.makeRDD(List("yinzhengjie","JasonYin2020"),6) //使用6个自定义分区 //将RDD的数据保存到文件中 listRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output") /** * 使用SparkContext对象的textFile函数签名以下: * def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope * * 舒适提示: * path: * 指定文件的路径,能够是本地路径,也能够是hdfs,hbase等路径 * minPartitions: * 指定最小的分区数据,可是不必定是这个分区数,取决于Hadoop读取文件时分片规则。 * */ val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data",2) //自定义2个分区(但实际上可能比2要大,这取决于Hadoop的分片机制) //保存文件时建议不要和源文件在同一个目录,不然可能会出错哟~ fileRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output2") } }
三.RDD经常使用的算子(Operate)
RDD的操做算子(Operate)包括两类,即转换算子(transformations operate)和actions(行动算子)。
transformations(转换算子):
它是用来将RDD进行转化,构建RDD的血缘关系。
actions(行动算子):
它是用来触发RDD的计算,获得RDD的相关计算结果或者将RDD保存的文件系统中。
舒适提示:
转换算子只是对业务逻辑的封装并无真正执行代码,而行动算子就会真正触发代码的执行操做。换句话说,行动算子就是用来触发RDD计算操做的,一旦使用了行动算子,那么在行动算子以前的转换算子会被触发执行。
1>.Value类型
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(20 to 30) //遍历listRDD listRDD.collect().foreach(println) //使用map算子(Operate),将listRDD的全部元素乘以5获得新的RDD val mapRDD:RDD[Int] = listRDD.map(x => x * 5) //该行可简写为"val mapRDD:RDD[Int] = listRDD.map(_ * 5)" //遍历mapRDD mapRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapPartitionsOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(20 to 30) //遍历listRDD listRDD.collect().foreach(println) /** * 使用mapPartitions算子(Operate),将listRDD的全部元素乘以5获得新的RDD * * mapPartitionsk能够对一个RDD中全部的分区进行遍历,假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理全部分区。 * * map()和mapPartition()的区别以下: * map(): * 每次处理一条数据。 * mapPartition(): * 每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能致使OOM。 * 开发指导: * 当内存空间较大的时候建议使用mapPartition(),以提升处理效率。 * * 舒适提示: * mapPartitions效率优于map算子(Operate),减小了发送执行器(Executor)执行交互次数(mapPartitions的Operate是基于分区为单位发送一次任务调度到Executor,而map的Operate是每处理一条数据就发送一次任务调度给Executor) * 若是分区的数据比执行器(Executor)的内存大,则使用mapPartitions可能会出现内存溢出(OOM),好比一个分区有12G数据,但Executor仅有10G大小,就会出现OOM现象。 * 综上所述,到底使用map仍是mapPartitions算子(Operate)根据实际状况而定。 */ val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas => { datas.map(_ * 5) }) //遍历mapRDD mapPartitionsRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object mapPartitionsWithIndexOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD并指定分区数为3 val listRDD:RDD[Int] = sc.makeRDD(20 to 30,3) //遍历listRDD listRDD.collect().foreach(println) //使用mapPartitionsWithIndex算子(Operate),将listRDD的全部元素跟所在分区造成一个元组组成一个新的RDD val tupleRDD:RDD[(Int,String)] = listRDD.mapPartitionsWithIndex{ case (numPartition,datas) => { datas.map((_,"分区编号: " + numPartition)) } } //遍历tupleRDD tupleRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FlatMapOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD并指定分区数为3 val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(10,20),List(60,80))) //遍历listRDD listRDD.collect().foreach(println) //使用flatMap算子(Operate),将listRDD的全部元素扁平化,它相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) val flatMapRDD:RDD[Int] = listRDD.flatMap(x =>x) //遍历flatMapRDD flatMapRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GlomOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(100 to 161,4) //遍历listRDD listRDD.collect().foreach(println) //将一个分区的数据放到一个数组中,这样咱们能够对其进行操做,好比求和,求最值等。 val glomRDD:RDD[Array[Int]] = listRDD.glom() //遍历glomRDD glomRDD.collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GroupByOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍历listRDD listRDD.collect().foreach(println) /** * 使用groupBy算子(Operate)进行分组,按照传入函数(指定规则)的返回值进行分组,将相同的key对应的值放入一个迭代器。 * * 分组后的数据造成了对偶元组(K,V),K表示分组的key,V表示分组的数据集合。 * * 下面的案例就是按照元素模以2的值进行分组。 */ val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i => i % 2) //遍历groupByRDD groupByRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FilterOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍历listRDD listRDD.collect().foreach(println) /** * 使用filter算子(Operate)进行过滤。返回一个新的RDD,该RDD由通过func函数(按照指定的规则)计算后返回值为true的输入元素组成。 * * 下面的案例就是按照元素模以2的值进行过滤,即仅保留偶数。 */ val filterRDD:RDD[Int] = listRDD.filter(x => x % 2 == 0) //遍历filterRDD filterRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SampleOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍历listRDD listRDD.collect().foreach(println) /** * sample算子(Operate)用以指定的随机种子随机抽样出数量为fraction的数据。 * * sample的函数签名以下: * def sample( withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] * * 如下是相关参数说明: * withReplacement: * 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样, * fraction: * 表示sample的打分,是一个Double类型。 * seed: * 用于指定随机数生成器种子。 * */ val sampleRDD:RDD[Int] = listRDD.sample(false,0.7,1) //遍历sampleRDD sampleRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object DistinctOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(List(6,6,8,1,2,1,6,9,5,6,1,8,2,7,0,7,6,3,5,4,6,0,7,1)) //遍历listRDD listRDD.collect().foreach(println) /** * 使用distinct算子(Operate)对数据去重,可是由于去重后会致使数据减小,因此能够自定义分区数量,默认分区数是你操做系统的真实core数量。 * */ // val distinctRDD:RDD[Int] = listRDD.distinct() val distinctRDD:RDD[Int] = listRDD.distinct(3) //为了了看到试验效果,建议将结果以文件的形式保存,直接打印到控制台终端可能看不出效果哟~ // distinctRDD.collect().foreach(println) distinctRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output") } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CoalesceOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD,指定分区数切片为4 val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //遍历listRDD // listRDD.collect().foreach(println) println("缩减分区前分区数量: " + listRDD.partitions.size) //使用coalesce算子(Operate)缩减分区数,用于大数据集过滤后,提升小数据集的执行效率。能够简单理解为合并分区 val coalesceRDD:RDD[Int] = listRDD.coalesce(3) println("缩减分区后分区数量: " + coalesceRDD.partitions.size) coalesceRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output") } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object RepartitionsOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD,指定分区数切片为4 val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //遍历listRDD,发现数据是有序的 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) println("Rebalance前分区数量: " + listRDD.partitions.size) /** * 使用repartition算子(Operate)是根据分区数,从新经过网络随机洗牌全部数据。 * * coalesce和repartition的区别 * 1>.coalesce从新分区,能够选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 * 2>.repartition其实是调用的coalesce,默认是进行shuffle的。 * * 下面的案例就是对listRDD进行从新分区(将listRDD的4个分区数从新分区为2个),生成一个新的RDD对象rebalanceRDD。 */ val rebalanceRDD:RDD[Int] = listRDD.repartition(2) println("Rebalance后分区数量:" + rebalanceRDD.partitions.size) //遍历rebalanceRDD,此时你会发现数据并非有序的,而是被打乱啦~ rebalanceRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SortByOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[Int] = sc.parallelize(List(2,1,7,6,9,3,8,5)) //遍历listRDD listRDD.collect().foreach(println) /** * sortBy算子的函数参数列表签名以下: * def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length) * * 经过函数签名能够知道咱们使用时只须要传入一个参数便可, 其它2个参数均有默认值, * * 使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序(ascending: Boolean = true)。 * * 下面的案例按照自身大小进行排序,默认是升序。 */ val sortByRDD:RDD[Int] = listRDD.sortBy(x => x) //遍历sortByRDD sortByRDD.collect().foreach( x =>{ println(x) } ) //下面的案例按照自身大小进行排序,咱们指定ascending的值为false,排序则为降序。 val sortByRDD2:RDD[Int] = listRDD.sortBy(x => x,false) //遍历sortByRDD2 sortByRDD2.collect().foreach(println) } }
2>.双Value类型交互
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object UnionOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(List(1,3,5,7,9)) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(List(2,4,6,8,10)) /** * union算子(Operate)能够对源RDD和参数RDD求并集后返回一个新的RDD。 * * 下面的案例就是将rdd1和rdd2进行合并为sumrdd, */ val sumRDD:RDD[Int] =rdd1.union(rdd2) //遍历sumRDD sumRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubtractOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) /** * subtract算子是用来计算差的一种函数,去除两个RDD中相同的元素,不一样的RDD将保留下来。 * * 下面的案例就是计算第一个RDD与第二个RDD的差集并打印 */ val subtractRDD:RDD[Int] =rdd1.subtract(rdd2) //遍历subtractRDD subtractRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object IntersectionOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) //使用计算两个RDD的交集 val intersectionRDD:RDD[Int] = rdd1.intersection(rdd2) //遍历intersectionRDD intersectionRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CartesianOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) //计算两个RDD的笛卡尔积并打印,生产环境中应该尽可能避免使用! val cartesian:RDD[(Int,Int)] = rdd1.cartesian(rdd2) //遍历cartesian cartesian.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object ZipOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(Array(100,200,300),3) //建立rdd2 val rdd2:RDD[String] = sc.makeRDD(Array("storm","spark","flink"),3) //zip算子能够将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,不然会抛出异常。 val zipRDD:RDD[(Int,String)] = rdd1.zip(rdd2) //遍历zipRDD zipRDD.collect().foreach(println) } }
3>.Key-Value类型
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object PartitionByOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[(Int,String)] = sc.makeRDD(Array((1,"hdfs"),(2,"yarn"),(3,"mapreduce"),(4,"spark")),4) //遍历rdd2 rdd1.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) println("rdd1分区数是: " + rdd1.partitions.size) /** * 对rdd1进行重分区(对rdd1进行分区操做,若是原有的rdd1和现有的rdd2分区数是一致的话就不进行分区, 不然会生成ShuffleRDD,即会产生shuffle过程。) * * 须要注意的是,partitionBy算子属于PairRDDFunctions类,所以这里设计到了隐式转换哟~ * */ val rdd2:RDD[(Int,String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) println("rdd2分区数是: " + rdd2.partitions.size) //遍历rdd2 rdd2.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GroupByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立一个数组 val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS") //建立RDD并将上面的words映射为二元组便于后面使用grooupByKey算子 val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1)) //groupByKey也是对每一个key进行操做,但只生成一个sequence。 val groupByKeyRDD:RDD[(String,Iterable[Int])] = mapRDD.groupByKey() //遍历groupByKeyRDD groupByKeyRDD.collect().foreach(println) //对每一个单词进行统计 groupByKeyRDD.map(word => (word._1, word._2.sum)).collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ReduceByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立一个数组 val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS") //建立RDD并将上面的words映射为二元组便于后面使用reduceByKey算子 val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1)) /** * 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,reduce任务的个数能够经过第二个可选的参数来设置。 * * reduceByKey和groupByKey的区别以下: * reduceByKey: * 按照key进行聚合,在shuffle以前有combine(预聚合)操做,返回结果是RDD[k,v]. * groupByKey: * 按照key进行分组,直接进行shuffle。 * 开发指导: * reduceByKey比groupByKey建议使用,由于预聚合操做会节省带宽传输,可是须要注意是否会影响业务逻辑。 */ val reduceByKeyRDD:RDD[(String,Int)] = mapRDD.reduceByKey(_+_) //遍历reduceByKeyRDD reduceByKeyRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2) //遍历listRDD各个分区的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * aggregateByKey的函数签名以下: * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope * * 做用: * 在kv对的RDD中,按key将value进行分组合并,合并时,将每一个value和初始值做为seq函数的参数,进行计算,返回的结果做为一个新的kv对,而后再将结果按照key进行合并; * 最后将每一个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果做为一个新的kv对输出。 * 参数描述: * zeroValue: * 给每个分区中的每个key一个初始值; * seqOp: * 函数用于在每个分区中用初始值逐步迭代value; * combOp: * 函数用于合并每一个分区中的结果。 * * 下面的案例为建立一个pairRDD,取出每一个分区相同key对应值的最大值,而后相加 */ val aggregateByKeyRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(math.max(_,_),_+_) //遍历aggregateByKeyRDD各个分区的元素 aggregateByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) //使用aggregateByKey也能够实现相似于WordCount的功能 val wcRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(_+_,_+_) //遍历wcRDD wcRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FoldByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2) //遍历listRDD各个分区的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * aggregateByKey与foldByKey的区别: * aggregateByKey的简化操做,seqop和combop相同。 * 咱们会发现aggregateByKey须要传递2个参数,分别用于分区内和分区间的操做; * 而foldByKey只须要传递一个参数,由于分区内和分区间的操做相同,所以只须要传递一个参数便可. * * 下面的案例是计算相同key对应值的相加结果 */ val foldByKeyRDD:RDD[(String,Int)] = listRDD.foldByKey(0)(_+_) //遍历foldByKeyRDD各个分区的元素 foldByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CombineByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) //遍历listRDD各个分区的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * 参数: * (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) * 做用: * 对相同K,把V合并成一个集合。 * 参数描述: * createCombiner: * combineByKey() 会遍历分区中的全部元素,所以每一个元素的键要么尚未遇到过,要么就和以前的某个元素的键相同。 * 若是这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来建立那个键对应的累加器的初始值 * mergeValue: * 若是这是一个在处理当前分区以前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并 * mergeCombiners: * 因为每一个分区都是独立处理的, 所以对于同一个键能够有多个累加器。 * 若是有两个或者更多的分区都有对应同一个键的累加器, 就须要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。 * * * 下面的案例就是根据key计算每种key的均值。(先计算每一个key出现的次数以及能够对应值的总和,再相除获得结果) */ val combineByKeyRDD:RDD[(String,(Int,Int))] = listRDD.combineByKey( (_,1), //转换结构,一个key第一次出现对其计数为1 (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), //定义分区内的计算规则,即相同key的vlaue相加,并将计数器加1 (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) //定义分区间的计算规则,即将各个分区相同key的计算结果进行累加操做。 ) //遍历combineByKeyRDD各个分区的元素 combineByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) //计算平均值 val averageValueRDD:RDD[(String,Double)] = combineByKeyRDD.map{case (key,value) => (key,value._1/value._2.toDouble)} //遍历averageValueRDD各个分区的元素(能够查看对应key的平均值) averageValueRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SortByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"))) /** * sortByKey算子在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD,ascending的值默认为true */ val positiveSequenceRDD:RDD[(Int,String)] = arrayRDD.sortByKey() positiveSequenceRDD.collect().foreach(println) /** * sortByKey算子在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD,ascending的值为false时,顺序为倒叙。 */ val ReverseOrderRDD:RDD[(Int,String)] = arrayRDD.sortByKey(false) ReverseOrderRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapValuesOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"),(1,"mapreduce"))) arrayRDD.collect().foreach(println) /** * 针对于(K,V)形式的类型只对V进行操做 * * 下面的案例就是对value添加字符串"*****" */ val mapValuesRDD:RDD[(Int,String)] = arrayRDD.mapValues(_ + "*****") mapValuesRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object JoinOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink"))) val rdd2:RDD[(Int,Int)] = sc.parallelize(Array((1,30),(2,60),(3,90))) /** * 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD. */ val rdd3:RDD[(Int,(String,Int))] = rdd1.join(rdd2) rdd3.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CogroupOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink"))) val rdd2:RDD[(Int,Int)] = sc.makeRDD(Array((1,30),(2,60),(3,90))) /** * 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD.. */ val rdd3:RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2) rdd3.collect().foreach(println) } }
4>.Actions
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ReduceOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.makeRDD(1 to 100,2) val rdd2 = sc.parallelize(Array(("Hadoop",100),("Spark",300),("Flink",500),("MapReduce",700))) /** * 经过func函数汇集RDD中的全部元素,先聚合分区内数据,再聚合分区间数据。 */ val res1:Int = rdd1.reduce(_+_) val res2:(String,Int) = rdd2.reduce((x,y)=>(x._1 + "-" + y._1,x._2 + y._2)) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CollectOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.parallelize(1 to 100) /** * 在驱动程序中,以数组的形式返回数据集的全部元素。 */ val res:Array[Int] = rdd1.collect() res.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CountOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(1 to 100) /** * 返回RDD中元素的个数 */ val count:Long = rdd1.count println(count) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FirstOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(50 to 100) /** * 返回RDD中的第一个元素 */ val res1:Int = rdd1.first() println(res1) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object TakeOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(50 to 100) /** * 返回一个由RDD的前n个元素组成的数组 */ val res1:Array[Int] = rdd1.take(5) res1.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object TakeOrderedOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(List(9,5,20,7,10,4,8,30,6)) /** * 返回该RDD排序后的前n个元素组成的数组 */ val res:Array[Int] = listRDD.takeOrdered(5) res.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.parallelize(1 to 100,2) /** * aggregate函数将每一个分区里面的元素经过seqOp和初始值进行聚合,而后用combine函数将每一个分区的结果和初始值(zeroValue)进行combine操做。这个函数最终返回的类型不须要和RDD中元素类型一致。 * * 须要注意的是,aggregate算子在计算时,各分区内部计算须要加上初始值(zeroValue),分区间计算也会加上该初始值(zeroValue) */ val res1:Int = rdd1.aggregate(0)(_+_,_+_) val res2:Int = rdd1.aggregate(100)(_+_,_+_) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FoldOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.parallelize(1 to 100,2) /** * 折叠操做,aggregate的简化操做,seqop和combop同样。 */ val res1:Int = rdd1.fold(0)(_+_) val res2:Int = rdd1.fold(100)(_+_) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsTextFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 */ listRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\text") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsSequenceFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 */ listRDD.saveAsSequenceFile("E:\\yinzhengjie\\bigdata\\spark\\sequence") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsObjectFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2) /** * 用于将RDD中的元素序列化成对象,存储到文件中。 */ listRDD.saveAsObjectFile("E:\\yinzhengjie\\bigdata\\spark\\object") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.Map object CountByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2) /** * 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 */ val res:Map[String,Long] = listRDD.countByKey println(res) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ForeachOperate { def main(args: Array[String]): Unit = { //建立SparkConf对象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文对象 val sc = new SparkContext(config) val listRDD: RDD[Int] = sc.makeRDD(20 to 30,2) /** * 在数据集的每个元素上,运行函数func进行更新。 */ listRDD.foreach(println) } }
5>.RDD的函数传递
在实际开发中咱们每每须要本身定义一些对于RDD的操做,那么此时须要主要的是,初始化工做是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通讯,是须要序列化的。
接下来咱们看下面2个案例操做。
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import java.io.Serializable /** * 传递Search对象时,必须得先序列化后才能在网络传递,不然没法在Exector端进行反序列化。 * */ class Search(query:String) extends Serializable { //过滤出包含字符串的数据 def isMatch(s: String): Boolean = { s.contains(query) } //过滤出包含字符串的RDD def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //过滤出包含字符串的RDD def getMatche2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } object SerializableableMethod { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一个RDD val rdd: RDD[String] = sc.parallelize(Array("Hadoop", "Spark", "Hive", "Storm")) //3.建立一个Search对象,该过程是在Driver端执行的 val search = new Search("S") //4.运用第一个过滤函数并打印结果,该过程是在Exector端执行的,所以须要将Driver端的Search对象传递给Exector,这意味着Search对象必须是序列化的,不然就会报错哟(Caused by: java.io.NotSerializableException: com.yinzhengjie.bigdata.spark.rdd.functionTransfer.Search) val match1: RDD[String] = search.getMatch1(rdd) match1.collect().foreach(println) //5.释放资源 sc.stop() } }
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD /** * 仅传递字符串时,无需进行序列化操做哟~ */ class Search(query:String) { //过滤出包含字符串的数据 def isMatch(s: String): Boolean = { s.contains(query) } //过滤出包含字符串的RDD def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //过滤出包含字符串的RDD def getMatche2(rdd: RDD[String]): RDD[String] = { // rdd.filter(x => x.contains(query)) val query_ : String = this.query //将类变量赋值给局部变量,该代码是在Driver端执行 rdd.filter(x => x.contains(query_)) //该代码在Exector端执行,所以query_这个成员变量属性须要传递过来,而query_自己就是字符串,所以无需序列化。 } } object SerializableableAttribute { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一个RDD val rdd: RDD[String] = sc.makeRDD(Array("Hadoop", "Spark", "Hive", "Storm")) //3.建立一个Search对象 val search = new Search("o") //4.运用第一个过滤函数并打印结果 val match1: RDD[String] = search.getMatche2(rdd) match1.collect().foreach(println) //5.释放资源 sc.stop() } }
四.RDD依赖关系
1>.Lineage(血统)
package com.yinzhengjie.bigdata.spark.dependent import org.apache.spark.rdd.RDD import org.apache.spark.{Dependency, SparkConf, SparkContext} /** * RDD只支持粗粒度转换,即在大量记录上执行的单个操做。将建立RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。 * * RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它能够根据这些信息来从新运算和恢复丢失的数据分区。 */ object Lineage { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一个RDD val listRDD: RDD[Int] = sc.parallelize(1 to 10) //3.将listRDD射成一个个元组 val mapRDD: RDD[(Int,Int)] = listRDD.map((_,1)) //4.统计每一种key对应的个数 val reduceRDD:RDD[(Int,Int)] = mapRDD.reduceByKey(_+_) /** * 5 >.查看reduceRDD的Lineage(血统) * * RDD和它依赖的父RDD(s)的关系有两种不一样的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。 * 窄依赖 * 窄依赖指的是每个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖咱们形象的比喻为独生子女。 * 宽依赖 * 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引发shuffle操做,宽依赖咱们形象的比喻为超生。 */ val lineage:String = reduceRDD.toDebugString println(lineage) //6>.查看依赖类型 val dependencies:Seq[Dependency[_]] = reduceRDD.dependencies println(dependencies) //5.释放资源 sc.stop() } }
2>.DAG
DAG(Directed Acyclic Graph)叫作有向无环图,原始的RDD经过一系列的转换就就造成了DAG,根据RDD之间的依赖关系的不一样将DAG划分红不一样的Stage;
对于窄依赖,partition的转换处理在Stage中完成计算;
对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据;
以下图所示,是某个job分为3个阶段(stage),窄依赖能够放在同一个阶段(stage),而宽依赖因为shuffle的存在所以不能放在同一个阶段(state)中:
A和B:
groupBy操做是宽依赖,存在shuffle操做。
F和G:
join操做是宽依赖,存在shuffle操做。
B和G:
是窄依赖,觉得B的各个分区和G的分区惟一对应。
E和F,D和F,C和D:
map和union均没有shuffle操做,所以均是宅依赖,所以他们能够在同一个阶段(stage)。
舒适提示:
宽依赖有shufle操做,窄依赖没有shuffle操做,所以咱们能够将宅依赖放在同一个阶段执行,而宽依赖则须要分开不一样的阶段操做,由于宽依赖要作shuffle的前提是须要依赖上一个阶段的执行结果。
因为窄依赖不须要等待,就能够利用并行的概念来执行数据,从而提高效率。
3>.任务规划
RDD任务切分中间分为:Application、Job、Stage和Task。
Application:
初始化一个SparkContext即生成一个Application
Job:
一个Action算子就会生成一个Job
Stage:
根据RDD之间的依赖关系的不一样将Job划分红不一样的Stage,遇到一个宽依赖则划分一个Stage。
Task:
Stage是一个TaskSet,将Stage划分的结果发送到不一样的Executor执行即为一个Task。
舒适提示:
Application->Job->Stage-> Task每一层都是1对n的关系。
五.RDD缓存
1>.RDD缓存概述
RDD经过persist方法或cache方法能够将前面的计算结果缓存,默认状况下 persist()会把数据以序列化的形式缓存在JVM的堆空间中。
可是并非这两个方法被调用时当即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
经过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
以下图所示,在存储级别的末尾加上"_2"来把持久化数据存为两份。
缓存有可能丢失,或者存储存储于内存的数据因为内存不足而被删除,RDD的缓存容错机制保证了即便缓存丢失也能保证计算的正确执行。经过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,所以只须要计算丢失的部分便可,并不须要重算所有Partition。
2>.缓存代码实现案例
package com.yinzhengjie.bigdata.spark.cache import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CacheDemo { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一个RDD val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020")) //3.将RDD转换为携带当前时间戳不作缓存 val nocache = listRDD.map(_.toString+System.currentTimeMillis) //4>.查看nocache的Lineage(血统)关系 System.out.println(nocache.toDebugString) //5>.屡次打印无缓存结果 nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) //6>.将RDD转换为携带当前时间戳并作缓存 val cache = listRDD.map(_.toString+System.currentTimeMillis).cache //7>.查看cache的Lineage(血统)关系 System.out.println(cache.toDebugString) //8>.屡次打印缓存结果 cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) //9.释放资源 sc.stop() } }
六.RDD 检查点(CheckPoint)
1>.检查点概述
Spark中对于数据的保存除了持久化操做以外,还提供了一种检查点的机制,检查点(本质是经过将RDD写入Disk作检查点)是为了经过lineage作容错的辅助,lineage过长会形成容错成本太高,这样就不如在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从作检查点的RDD开始重作Lineage,就会减小开销。
检查点经过将数据写入到HDFS文件系统实现了RDD的检查点功能。为当前RDD设置检查点,该函数将会建立一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。
在checkpoint的过程当中,该RDD的全部依赖于父RDD中的信息将所有被移除。对RDD进行checkpoint操做并不会立刻被执行,必须执行Action操做才能触发。
2>.检查点代码实现案例
package com.yinzhengjie.bigdata.spark.cache import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CheckpointDemo { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //设置检查点的保存目录,实际工做中应该使用hdfs路径,本地目录通常用于测试使用 sc.setCheckpointDir("E:\\yinzhengjie\\bigdata\\spark\\checkpoint") //建立一个RDD val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020")) //将RDD转换为携带当前时间戳 val nocache = listRDD.map(_.toString+System.currentTimeMillis) //设置检查点,数据会被持久化到sc上定义的检查点保存目录 nocache.checkpoint() //使用行动算子屡次打印结果 nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) //查看Lineage(血统) println(nocache.toDebugString) //释放资源 sc.stop() } }