Spark编程进阶篇html
做者:尹正杰java
版权声明:原创做品,谢绝转载!不然将追究法律责任。算法
一.spark三大数据结构apache
Spark有三大数据结构,分别为RDD,广播变量和累加器。 RDD: RDD全称为"Resilient Distributed Dataset",叫作弹性分布式数据集,是Spark中最基本的数据抽象。 简单的理解RDD就是包含数据的计算。不少时候数据须要必定的访问规则,这个时候使用RDD来作就不太合适啦。 广播变量: 此处能够简单理解为分布式只读共享变量。 累加器: 此处能够简单理解为分布式只写共享变量。 博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/13155362.html
二.广播变量编程
1>.广播变量概述数据结构
广播变量用来高效分发较大的对象。向全部工做节点发送一个较大的只读值,以供一个或多个Spark操做使用。
好比,若是你的应用须要向全部节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操做中使用同一个变量,可是 Spark会为每一个任务分别发送。 使用广播变量的过程以下: (1)经过对一个类型 T 的对象调用 SparkContext.broadcast 建立出一个 Broadcast[T] 对象。 任何可序列化的类型均可以这么实现。 (2)经过 value 属性访问该对象的值(在 Java 中为 value() 方法)。 (3)变量只会被发到各个节点一次,应做为只读值处理(修改这个值不会影响到别的节点)。
2>.广播变量应用案例闭包
package com.yinzhengjie.bigdata.spark.rdd import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariable { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val listRDD:RDD[(Int,String)] = sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) /** * 若是想要将2个RDD基于Key进行关联,使用join算子(设计到笛卡尔积,即会增长数据)是能够实现的; * * 但效率较低,由于使用join算子有一个弊端就是涉及到shuffle过程 */ // val listRDD2:RDD[(Int,Int)] = sc.makeRDD(List((1,1),(2,2),(3,3))) // val joinRDD:RDD[(Int,(String,Int))] = listRDD.join(listRDD2) // joinRDD.foreach(println) val list = List((1,1),(2,2),(3,3)) //将list变量构建为广播变量(使用广播变量减小数据的传输,即无需每一个Executor都传输一个list变量,而是spark集群共享同一个只读变量) val boradcast:Broadcast[List[(Int,Int)]] = sc.broadcast(list) /** * 使用map算子来替代join算子的好处就是map不涉及到shuffle过程,所以咱们说广播变量是一种调优策略。 */ val mapRDD:RDD[(Int,(String,Any))] = listRDD.map{ case (key,value) => { var v2:Any = null //使用广播变量 for (t <- boradcast.value){ if (key == t._1){ v2 = t._2 } } (key,(value,v2)) } } mapRDD.foreach(println) //释放资源 sc.stop() } }
三.累加器机器学习
1>.累加器概述分布式
累加器用来对信息进行聚合,一般在向 Spark传递函数时,好比使用map()函数或者用filter()传条件时,可使用驱动器程序中定义的变量,可是集群中运行的每一个任务都会获得这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。 若是咱们想实现全部分片处理时更新共享变量的功能,那么累加器能够实现咱们想要的效果。 累加器的用法以下所示。 经过在驱动器中调用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器; 返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型; Spark闭包里的执行器代码可使用累加器的"+=”方法(在Java中是add)增长累加器的值; 驱动器程序能够调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。 自定义累加器 自定义累加器类型的功能在1.X版本中就已经提供了,可是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,并且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。 实现自定义类型累加器须要继承AccumulatorV2并至少覆写相应的方法,累加器能够用于在程序运行过程当中收集一些文本类信息,最终以Set[String]的形式返回。 舒适提示: 工做节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 对于要在行动操做中使用的累加器,Spark只会把每一个任务对各累加器的修改应用一次。所以,若是想要一个不管在失败仍是重复计算时都绝对可靠的累加器,咱们必须把它放在foreach()这样的行动操做中。转化操做中累加器可能会发生不止一次更新。
2>.系统累加器 ide
package com.yinzhengjie.bigdata.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object ShareData { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val dataRDD:RDD[Int] = sc.parallelize(List(10,20,30,40,60,70,80,90),3) /** * 使用reduce算子计算dataRDD各个元素之和,其工做原理以下: * 1>.将Driver中的dataRDD中3个不一样分区数据发送给不一样的Executor; * 2>.各个Executor计算相应分区的数据之和; * 3>.最后,每一个Executor的数据再相加获得最终的数据。 */ // val sum1 = dataRDD.reduce(_+_) // println("sum1 = " + sum1) /** * 咱们使用foreach算子直接遍历dataRDD中的每一个元素使他们相加,但打印sum2的结果为"0",其缘由以下: * 1>.将Driver中的dataRDD中3个不一样分区数据发送给不一样的Executor; * 2>.sum2属于Driver中的变量,所以他要序列化sum2变量并其传递给各个Executor; * 3>.各个Executor使用Driver传递过来的变量进行计算,最终各个Exector算出sum2的结果,但此时各个Executor并无将计算结果发送给Driver; * * 综上所述,各个Executor均有对应的sum2数据,但并无将计算结果发送给Driver进行累加,所以咱们看到的sum2始终是"0". * */ // var sum2:Int = 0 // dataRDD.foreach(data => sum2 += data) // println("sum2 = " + sum2) /** * 使用累加器来共享变量,用来累加数据,其工做原理以下: * 1>.将Driver中的dataRDD中3个不一样分区数据发送给不一样的Executor; * 2>.sum3属于Driver中的变量,所以他要序列化sum3变量并其传递给各个Executor; * 3>.各个Executor使用Driver传递过来的变量进行计算,最终各个Executor算出sum3的结果; * 4>.最后spark发现sum3是一个累加器(Accumulator)变量,所以会将各个Executor的sum3的结果返回给Driver,由Driver将最终结果进行累加。 */ val sum3:LongAccumulator = sc.longAccumulator //建立累加器对象sum3 dataRDD.foreach{ case data => { sum3.add(data) //将每个元素和累加器相加 } } println("sum3 = " + sum3.value) //释放资源 sc.stop() } }
3>.自定义累加器
package com.yinzhengjie.bigdata.spark.rdd import java.util import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 /** * 自定义累加器步骤以下: * 1>.继承AccumulatorV2; * 2>.实现抽象方法; * 3>.建立累加器 */ class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] { //定义一个集合,用于返回结果 val list = new util.ArrayList[String]() //当前的累加器是否为初始化状态 override def isZero: Boolean = { list.isEmpty } //复制累加器对象,根据你得需求自行实现 override def copy(): AccumulatorV2[String, util.ArrayList[String]] = { new WordAccumulator() } //重置累加器对象 override def reset(): Unit = { list.clear() } //往累加器中增长数据 override def add(v: String): Unit = { if (v.contains("o")){ list.add(v) //判断字符串是否包含字母"o",若是为真则写入累加器 } } //合并累加器,即Driverd端将各个Executor返回的list集合的数据进行累加操做 override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = { list.addAll(other.value) } //获取累加器的结果,咱们直接将定义的list集合返回 override def value: util.ArrayList[String] = { list } } object CustomAccumulator { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val dataRDD:RDD[String] = sc.makeRDD(Array("Hadoop","Spark","Flume","HBase","Sqoop","Flink","Storm","Hive"),3) //建立我们自定义的累加器 val wordAccumulator = new WordAccumulator //注册自定义累加器,目的是让Spark知晓 sc.register(wordAccumulator) //使用我们的累加器变量 dataRDD.foreach{ case data => { wordAccumulator.add(data) //将每个元素和累加器相加 } } //获取累加器的值 println("words = " + wordAccumulator.value) //释放资源 sc.stop() } }