Spark中的键值对操做-scala

1.PairRDD介绍
    Spark为包含键值对类型的RDD提供了一些专有的操做。这些RDD被称为PairRDD。PairRDD提供了并行操做各个键或跨节点从新进行数据分组的操做接口。例如,PairRDD提供了reduceByKey()方法,能够分别规约每一个键对应的数据,还有join()方法,能够把两个RDD中键相同的元素组合在一块儿,合并为一个RDD。
2.建立Pair RDD
    程序示例:对一个英语单词组成的文本行,提取其中的第一个单词做为key,将整个句子做为value,创建 PairRDD
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
//获取第一个单词做为键
val words =rdd.map(x=>(x.split(" ")(0),x));
words.collect().foreach(println);
输出结果:
(this,this is a test)
(how,how are you)
(do,do you love me)
(can,can you tell me)

3.PairRDD的转化操做
    PairRDD可使用全部标准RDD上可用的转化操做。传递函数的规则也适用于PairRDD。因为PairRDD中包含二元组,因此须要传递的函数应当操做而元素而不是独立的元素。
                                       PairRDD的相关转化操做以下表所示
针对两个PairRDD的转化操做 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
函数名 目的 示例 结果
substractByKey 删掉RDD中键与other RDD
中的键相同的元素
rdd.subtractByKey(other) {(1,2)}
join 对两个RDD进行内链接
rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 对两个RDD进行链接操做,右外链接 rdd.rightOuterJoin(other) {(3,(4,9)),(3,(6,9))}
leftOuterJoin 对两个RDD进行链接操做,左外链接 rdd.rightOuterJoin(other) {(1,(2,None)),(3,(4,9)),(3,(6,9))}
cogroup 将两个RDD中拥有相同键的数据分组 rdd.cogroup(other) {1,([2],[]),(3,[4,6],[9])}
程序实例:
针对2 中程序生成的PairRDD,删选掉长度超过20个字符的行。
val results=words.filter(value => value._2.length()<20);
results.foreach(println)
    RDD上有fold(),combine(),reduce()等行动操做,pair RDD上则有相应的针对键的转化操做。
    (1)reduceByKey()与reduce()操做相似,它们都接收一个函数,并使用该函数对值进行合并。reduceByKey()会为数据集中的每一个键进行并行的规约操做,每一个规约操做会将键相同的值合并起来。reduceBykey()最终返回一个由各键规约出来的结果值组成的新的RDD。
程序示例:用reduceByKey实现单词计数
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
val words =rdd.flatMap(line => line.split(" "));
val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y});
results.foreach(println)
输出:
(are,1)
(this,1)
(is,1)
(you,3)
(can,1)
(a,1)
(love,1)
(do,1)
(how,1)
(tell,1)
(me,2)
(test,1)

  (2)foldByKey()与fold()操做相似,他们都使用一个与RDD和合并函数中的数据类型相同的零值做为初始值。与fold()同样,foldByKey()操做所使用的合并函数对零值与另外一个元素进行合并,结果仍为该元素。
    程序示例:求对应key的value之和
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.foldByKey(0)({case(x,y)=>x+y})
results.collect().foreach(println)
结果:
(1,4)
(2,10)
(3)
    combineByKey()是最为经常使用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()同样,combineByKey()可让用户返回与输入数据类型不一样的返回值。combineByKey()会遍历分区中的全部元素,所以,每一个元素的键要么还么有遇到过,要么就和以前的某个元素的键相同。若是这是一个新的元素,combineByKey()会使用一个叫作 createCombiner()的函数来建立那个键对应的累加器的初始值。须要注意的是,这一过程会在每一个分区中第一次出现每一个键时发生,而不是在整个RDD中第一次出现一个键时发生。
    若是这是一个处理当前分区以前就已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
    因为每一个分区都是独立处理的,所以对于同一个键能够有多个累加器。若是有两个或者更多的分区都有对应一个键的累加器,就须要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
     如下程序示例使用combineBykey()求每一个键对应的平均值。
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.combineByKey(
(v)=>(v,1),
(acc:(Int,Int),v) =>(acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
).map{case(key,value)=>(key,value._1/value._2.toFloat)}
results.collectAsMap().map(println)
结果:
(2,5.0)
(1,2.0)
成功求出每一个key对应value对应的平均值
*(4)并行度调优
    每一个RDD都有固定数目的分区,分区数决定了在RDD上执行操做时的并行度。
    在执行聚合或者分组操做时,能够要求Spark使用给定的分区数。Spark始终尝试根据集群的大小推断出一个有意义的默认值,可是你能够经过对并行度进行调优来得到更好的性能表现。
    在Scala中,combineByKey()函数和reduceByKey()函数的最后一个可选的参数用于指定分区的数目,即numPartitions,使用以下:
val results=nums.reduceByKey({(x,y) =>x+y},2);
5.数据分组
(1)groupByKey()
    groupByKey()会使用RDD中的键来对数据进行分组。对于一个由类型K的键和类型V的值组成的RDD,获得的RDD类型会是[K,Iterable[v]]。
    如下是程序示例,对PairRDD调用groupByKey()函数以后,返回的RDD类型是RDD [K,Iterable[v]]
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val group=nums.groupByKey();
val results=group.collect();
for(value <- results){
print(value._1+": ")
for(elem <- value._2)
print(elem+" ")
println()

}
输出结果:
1: 1 3 
2: 2 8 
(2)cogroup()
    除了对单个RDD的数据进行分组,还可使用cogroup()函数对对个共享同一个键的RDD进行分组。对两个键的类型均为K而值得类型分别为V和W的RDD进行cogroup()时,获得结果的RDD类型为[(K,(Iterable[V],Iterable[W]))]。若是其中一个RDD对于另外一个RDD中存在的某个键没有对应的记录,那么对应的迭代器则为空。
举例:
val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3)))
val results=nums1.cogroup(nums2)
for(tuple2 <- results.collect()){
print(tuple2._1+" [ ")
for(it <- tuple2._2._1)
print(it+" ")
print("] [ ")
for(it<-tuple2._2._2)
print(it+" ")
println("]")
}
输出:
1 [ 1 3 ] [ 1 3 ]
3 [ 4 ] [ ]
2 [ 2 4 ] [ 3 ]
6.数据排序
在Scala中以字符串顺序对正数进行自定义排序
(1)对RDD进行排序:
val nums =sc.parallelize(List(12,4,6,8,0,8));
//隐式转换声明排序的依据
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortBy(value=>value);
results.collect().foreach(println)
(2)对PairRDD,按key的值进行排序
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
//隐式转换声明排序的依据
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortByKey();
results.collect().foreach(println)
7.数据分区
(1)建立数据分区
    在分布式程序中,通讯的代价很大,控制数据分布以得到最少的网络传输能够极大地提高总体性能。Spark程序能够经过控制RDD分区的方式来减小通讯消耗。只有当数据集屡次在诸如链接这种基于键的操做中,分区才会有做用
    Spark中全部的键值对RDD均可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark能够确保同一组的键出如今一个节点上。
    举个简单的例子,应用以下:内存中保存着很大的用户信息表,由(UserID,UserInfo[])组成的RDD,UserInfo是用户所订阅的全部主题列表。该应用会周期性地将这张表和一个小文件进行组合,这个小文件中存这过去5分钟发生的时间,其实就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用户访问的连接的主题。咱们须要对用户访问其未订阅主题的页面状况进行统计。咱们可使用Spark的join()操做进行组合操做。将二者根据UserId链接以后,过滤出不在UserInfo[]中的LinkInfo,就是用户访问其未订阅主题的状况。
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1)
val events = sc.parallelize(list2)
val joined=userData.join(events)
val results=joined.filter({
case (id, (info, link)) =>
!info.contains(link)
}
).count()
println(results)
输出:1
    这段代码能够正确运行,可是效率不高。由于每5分钟就要进行一次join()操做,而咱们对数据集如何分区却一无所知。默认状况下,链接操做会将两个数据集中的全部键的哈希值都求出来,将该哈希值相同的记录经过网络传到同一台机器上,而后在那台机器上对全部键相同的记录进行链接操做。由于userData表比每5分钟出现的访问日志表events要大不少,因此要浪费时间进行额外的工做:在每次调用时都对userDAta表进行哈希值计算和跨节点数据混洗,虽然这些数据历来不会变化。
    要解决此问题:在程序开始的时候,对userData表进行partitionBy()转化操做,将这张表转化为哈希分区。能够经过向patitionBy传递一个spark.HashPartitioner对象来实现该操做。
    scala自定义分区方式:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
    这样之后在调用join()时,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join()时,Spark就会利用这一点,只会对events进行数据混洗操做,将events中特定userId的记录发送到userData的对应分区所在的那台机器上。这样,须要网络传输的数据就大大减少了,程序运行的速度也显著提升。
    请注意,咱们还对userData 这个RDD进行了持久化操做默认状况下,每个由转化操做获得的RDD都会在每次执行启动操做时从新计算生成,将userData持久化以后,就能保证userData可以在访问时被快速获取。
    *进一步解释数据分区带来的好处:
    若是没有将partitionBy()转化操做的结果进行持久化,那么后面每次用到这个RDD时都会重复对数据进行分区操做。不进行持久化会致使整个RDD谱系图从新求值。那样的话,partitionBy()带来的好处就会抵消,致使重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的状况是十分类似的。
(2)获取数据分区的方式
接(1)中程序:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
println(userData.partitioner)
  RDD的属性partitioner就是存储了对应的分区方式
(3)从分区中获益的操做
    Spark中的不少操做都引入了根据键跨结点进行混洗的过程。全部这些操做都会从数据分区中获益。可以从数据分区中获益的操做有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。
    对于像reduceByKey()这样只做用于单个RDD的操做,运行在未分区的RDD的时候或致使每一个键全部对应值都在每台机器上进行本地计算,只须要把本地最终归约出的结果值从各工做节点传回主节点,因此本来的网络开销就不太大。而对于诸如cogroup()和join()这样的二元操做,预先进行数据分区会致使其中至少一个RDD(使用已知分区器的那个RDD)不发生数据混洗。若是两个RDD使用一样的分区方式,而且它们还缓存在一样的机器上(好比一个RDD是经过mapValues()从另外一个RDD中建立出来的,这两个RDD就会拥有相同的键和分区方式),或者其中一个RDD尚未计算出来,那么跨节点数据混洗就不会发生了。
(4)影响分区方式的操做
    全部会为生成的结果RDD设好分区方式的操做:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(若是父RDD有分区方式的话),filter()(若是父RDD有分区方式的话)。其余全部操做生成的结果都不会存在特定的分区方式。
注意:     
    对于二元操做,输出数据的分区方式取决于父RDD的分区方式。默认状况下,结果会采用哈希分区,分区的数量和操做的并行度是同样的。若是其中一个父RDD已经设置过度区方式,那么结果就会采用那种分区方式;若是两个父RDD都设置过度区方式,结果RDD会采用第一个RDD的分区方式。
8.示例程序-PageRank
     PageRank算法是一种从RDD分区中获益的更复杂的算法,咱们以它为例进行分析。PageRank算法用来根据外部文档指向一个文档的连接,对集合中每一个文档的重要程度赋一个度量值。该算法能够用于对网页进行排序,固然,也能够用于排序科技文章或社交网络中有影响的用户。
    算法会维护两个数据集,一个由(pageID,linklist[])组成,包含每一个页面的连接到的页面的列表;另外一个由(pageID,rank)元素组成,包含每一个页面的当前排序值。它按如下步骤进行计算:
     ① 将每一个页面的排序值初始化为1.0
          ②在每次迭代中,向每一个有直接连接的页面,发送一个值为rank(p)/numNeighbors(p)(出链数目)   的贡献量
        ③将每一个页面的排序值设置为0.15+0.85*contributionsReceived
           最后两步会重复几个循环,在此过程当中,算法会逐渐收敛于每一个页面的实际PageRank值。在实际操做中,收敛一般须要进行十个迭代。
下面用Scala来实现PageRank算法:
/*
#如下是url的内容:
www.baidu.com www.hao123.com
www.baidu.com www.2345.com
www.baidu.com www.zhouyang.com
www.hao123.com www.baidu.com
www.hao123.com www.zhouyang.com
www.zhouyang.com www.baidu.com
*/
val inputs =sc.textFile("C:\\url.txt")
//url,[urls]
val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1)))
.distinct()
.groupByKey()
.cache()
//url,rank
var ranks =links.mapValues(value =>1.0)
for(i<-0 until 10){

val contribs =links.join(ranks).flatMap({
case(pageid,(links,rank))=>
//url Double
links.map(dest=>(dest,rank/links.size))
})
//reduce and add the contribs
ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v)
}
ranks.collect().foreach(println)
结果:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
9.Scala设置自定义分区方式
    Spark容许你经过自定义Partitioner对象来控制RDD的分区方式,这样可让你利用领域知识进一步减小通讯消耗。
    举个例子,假设咱们要在一个网页的集合上运行前一届中的PageRank算法。在这里,每一个页面的ID是页面的URL。当咱们使用简单的哈希函数进行分区时,拥有类似的URL的页面好比 http://www.baidu.com/news 与 http://www.baidu.com/map 可能被分在彻底不一样的节点上。可是咱们知道,同一个域名下的网页更有可能相互链接。因为PageRank须要在每次迭代中从每一个页面向它全部相邻的页面发送一条消息,因袭把这些页面分组在同一个分区中会更好。可使用自定义的分区器来实现仅根据域名而不是整个URL进行分区。
    要实现先自定义Partitioner,须要继承Partitioner类并实现其下述方法:
    override def numPartitions: Int = ???
    返回建立的分区数量
    override def getPartition(key: Any): Int = ???
    返回给定键的数量
          override def equals(other:Any):Boolean = ???
    Spark须要这个方法来检查分区器对象是否与其余分区器实例相同,这样Spark才能判断两个RDD的分区方式是否相同。

class DomainNamePartitioner (numParts:Int) extends Partitioner{
override def numPartitions: Int = numParts
//根据hashCodenumPartitions取余来获得Partition,由于返回的必须是非负数,因此对于hashCode为负的状况作了特殊处理
override def getPartition(key: Any): Int = {
val domain = new URL(key.toString).getHost();
val code=(domain.hashCode%numPartitions)
if(code<0){
code+numPartitions
}else{
code
}
}

override def equals(other:Any):Boolean = other
match {
//这个实例是DomainNamePartitioner的实例,而且numPartitions相同,返回true
case dnp:DomainNamePartitioner =>
dnp.numPartitions==numPartitions
//不然,返回false
case _ => false
}
}





















相关文章
相关标签/搜索