本篇接着讲解RDD的API,讲解那些不是很容易理解的API,同时本篇文章还将展现如何将外部的函数引入到RDD的API里使用,最后经过对RDD的API深刻学习,咱们还讲讲一些和RDD开发相关的scala语法。算法
1) aggregate(zeroValue)(seqOp,combOp)数组
该函数的功能和reduce函数同样,也是对数据进行聚合操做,不过aggregate能够返回和原RDD不一样的数据类型,使用时候还要提供初始值。数据结构
咱们来看看下面的用法,代码以下:函数
val rddInt: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5), 1) val rddAggr1: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) println("====aggregate 1====:" + rddAggr1.toString()) // (15,5)
该方法是将有数字组成的RDD的数值进行求和,同时还要统计元素的个数,这样咱们就能够计算出一个平均值,这点在实际运算中是很是有用的。学习
假如读者不太懂scala语言,或者就算懂那么一点点scala语法,该API的使用仍是让人很难理解的,这个x是什么东西,这个y又是什么东西,为何它们放在一块儿这么运算就能够获得预期结果呢?spa
其实aggregate方法使用了scala里元组的结构,元组是scala里很具特点的数据结构,咱们看看下面的代码:scala
val tuple2Param1:Tuple2[String,Int] = Tuple2("x01",12)// 标准定义二元组 val tuple2Param2:(String,Int) = ("x02",29)// 字面量定义二元组 /* 结果: x01:12*/ println("====tuple2Param1====:" + tuple2Param1._1 + ":" + tuple2Param1._2) /* 结果: x02:29 */ println("====tuple2Param2====:" + tuple2Param2._1 + ":" + tuple2Param2._2) val tuple6Param1:Tuple6[String,Int,Int,Int,Int,String] = Tuple6("xx01",1,2,3,4,"x1x")// 标准定义6元组 val tuple6Param2:(String,Int,Int,Int,Int,String) = ("xx02",1,2,3,4,"x2x")// 字面量定义6元组 /* 结果: xx01:1:2:3:4:x1x */ println("====tuple6Param1====:" + tuple6Param1._1 + ":" + tuple6Param1._2 + ":" + tuple6Param1._3 + ":" + tuple6Param1._4 + ":" + tuple6Param1._5 + ":" + tuple6Param1._6) /* 结果: xx02:1:2:3:4:x2x */ println("====tuple6Param2====:" + tuple6Param2._1 + ":" + tuple6Param2._2 + ":" + tuple6Param2._3 + ":" + tuple6Param2._4 + ":" + tuple6Param2._5 + ":" + tuple6Param2._6)
元组在scala里使用Tuple来构造,不过实际运用中咱们会给Tuple带上数字后缀,例如Tuple2就是二元组它包含两个元素,Tuple6是6元组它包含6个元素,元组看起来很像数组,可是数组只能存储相同数据类型的数据结构,而元组是能够存储不一样数据类型的数据结构,元组里元素访问使用_1,_2这样的形式,第一个元素是从1开始标记的,这点和数组是不一样的。实际使用中咱们不多使用Tuple构造元组,而是使用字面量定义方式(参见代码注释),由此咱们能够看出spark里键值对RDD其实就是使用二元组来表示键值对数据结构,回到aggregate方法,它的运算也是经过二元组这种数据结构完成的。blog
下面咱们来看看aggregate的运算过程,这里我将aggregate方法里的算子都使用外部函数,代码以下所示:开发
def aggrFtnOne(par: ((Int, Int), Int)): (Int, Int) = { /* *aggregate的初始值为(0,0): ====aggrFtnOne Param===:((0,0),1) ====aggrFtnOne Param===:((1,1),2) ====aggrFtnOne Param===:((3,2),3) ====aggrFtnOne Param===:((6,3),4) ====aggrFtnOne Param===:((10,4),5)*/ /* *aggregate的初始值为(1,1): ====aggrFtnOne Param===:((1,1),1) ====aggrFtnOne Param===:((2,2),2) ====aggrFtnOne Param===:((4,3),3) ====aggrFtnOne Param===:((7,4),4) ====aggrFtnOne Param===:((11,5),5) * */ println("====aggrFtnOne Param===:" + par.toString()) val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1) ret } def aggrFtnTwo(par: ((Int, Int), (Int, Int))): (Int, Int) = { /*aggregate的初始值为(0,0):::::((0,0),(15,5))*/ /*aggregate的初始值为(1,1):::::((1,1),(16,6))*/ println("====aggrFtnTwo Param===:" + par.toString()) val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2) ret } val rddAggr2: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => aggrFtnOne(x, y), (x, y) => aggrFtnTwo(x, y)) // 参数能够省略元组的括号 println("====aggregate 2====:" + rddAggr2.toString()) // (15,5) val rddAggr3: (Int, Int) = rddInt.aggregate((1, 1))((x, y) => aggrFtnOne((x, y)), (x, y) => aggrFtnTwo((x, y))) // 参数使用元组的括号 println("====aggregate 3====:" + rddAggr3.toString()) // (17,7)
由以上代码咱们就能够清晰看到aggregate方法的实际运算过程了。spark
aggrFtnOne方法的参数格式是((Int, Int), Int),这个复合二元组里第二个元素才是实际的值,而第一个元素就是咱们给出的初始化值,第一个元素里的第一个值就是咱们实际求和的值,里面第二个元素就是累计记录元素个数的值。
aggrFtnTwo方法的参数里的二元组第一个元素仍是初始化值,第二个元素则是aggrFtnOne计算的结果,这样咱们就能够计算出咱们要的结果。
做为对比我将初始化参数改成(1,1)二元组,最终结果在求和运算以及计算元素个数上都会加2,这是由于初始化值两次参入求和所致的,由上面代码咱们能够很清晰的看到缘由所在。
若是咱们想要结果二元组里第一个元素求积那么初始化值就不能是(0,0),而应该是(1,0),理解了原理咱们就很清晰知道初始值该如何设定了,具体代码以下:
val rddAggr4: (Int, Int) = rddInt.aggregate((1, 0))((x, y) => (x._1 * y, x._2 + 1), (x, y) => (x._1 * y._1, x._2 + y._2)) println("====aggregate 4====:" + rddAggr4.toString()) // (120,5)
2) fold(zero)(func)
该函数和reduce函数功能同样,只不过使用时候须要加上初始化值。
代码以下所示:
def foldFtn(par: (Int, Int)): Int = { /*fold初始值为0: =====foldFtn Param====:(0,1) =====foldFtn Param====:(1,2) =====foldFtn Param====:(3,3) =====foldFtn Param====:(6,4) =====foldFtn Param====:(10,5) =====foldFtn Param====:(0,15) * */ /* * fold初始值为1: =====foldFtn Param====:(1,1) =====foldFtn Param====:(2,2) =====foldFtn Param====:(4,3) =====foldFtn Param====:(7,4) =====foldFtn Param====:(11,5) =====foldFtn Param====:(1,16) * */ println("=====foldFtn Param====:" + par.toString()) val ret: Int = par._1 + par._2 ret } val rddFold2: Int = rddInt.fold(0)((x, y) => foldFtn(x, y)) // 参数能够省略元组的括号 println("====fold 2=====:" + rddFold2) // 15 val rddFold3: Int = rddInt.fold(1)((x, y) => foldFtn((x, y))) // 参数使用元组的括号 println("====fold 3====:" + rddFold3) // 17
咱们发现当初始化值为1时候,求和增长的不是1而是2,缘由就是fold计算时候为了凑齐一个完整的二元组,在第一个元素计算以及最后一个元素计算时候都会让初始化值凑数组成二元组,所以初始值会被使用两遍求和,所以实际结果就不是增长1,而是增长2了。
做为对比咱们看看reduce实际运算过程,代码以下:
def reduceFtn(par:(Int,Int)):Int = { /* * ======reduceFtn Param=====:1:2 ======reduceFtn Param=====:3:3 ======reduceFtn Param=====:6:4 ======reduceFtn Param=====:10:5 */ println("======reduceFtn Param=====:" + par._1 + ":" + par._2) par._1 + par._2 } val rddReduce1:Int = rddInt.reduce((x,y) => x + y) println("====rddReduce 1====:" + rddReduce1)// 15 val rddReduce2:Int = rddInt.reduce((x,y) => reduceFtn(x,y)) println("====rddReduce 2====:" + rddReduce2)// 15
3) combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)]
combineByKey做用是使用不一样的返回类型合并具备相同键的值,combineByKey适用键值对RDD,普通RDD是没有这个方法。
有上面定义咱们看到combineByKey会通过三轮运算,前一个运算步骤结果就是下一个运算步骤的参数,咱们看下面的代码:
def combineFtnOne(par:Int):(Int,Int) = { /* * ====combineFtnOne Param====:2 ====combineFtnOne Param====:5 ====combineFtnOne Param====:8 ====combineFtnOne Param====:3 */ println("====combineFtnOne Param====:" + par) val ret:(Int,Int) = (par,1) ret } def combineFtnTwo(par:((Int,Int),Int)):(Int,Int) = { /* ====combineFtnTwo Param====:((2,1),12) ====combineFtnTwo Param====:((8,1),9) * */ println("====combineFtnTwo Param====:" + par.toString()) val ret:(Int,Int) = (par._1._1 + par._2,par._1._2 + 1) ret } def combineFtnThree(par:((Int,Int),(Int,Int))):(Int,Int) = { /* * 无结果打印 */ println("@@@@@@@@@@@@@@@@@@") println("====combineFtnThree Param===:" + par.toString()) val ret:(Int,Int) = (par._1._1 + par._2._1,par._1._2 + par._2._2) ret } val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1) /* def combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] */ val rddCombine1:RDD[(String,(Int,Int))] = rddPair.combineByKey(x => (x, 1), (com: (Int, Int), x) => (com._1 + x, com._2 + 1), (com1: (Int, Int), com2: (Int, Int)) => (com1._1 + com2._1, com1._2 + com2._2)) println("====combineByKey 1====:" + rddCombine1.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1)) val rddCombine2:RDD[(String,(Int,Int))] = rddPair.combineByKey(x => combineFtnOne(x), (com: (Int, Int), x) => combineFtnTwo(com,x), (com1: (Int, Int), com2: (Int, Int)) => combineFtnThree(com1,com2)) println("=====combineByKey 2====:" + rddCombine2.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))
这个算法和上面aggregate求和方法很像,不过combineByKey很奇怪,它第三个算子彷佛并无被执行,第二个算子打印的信息也不齐备,不过我认为它们都执行了,只不过有些语句没有打印出来,至于缘由为什么,我之后再研究下吧。
本篇就写到这里吧,其他内容我在下篇里讲解了。