spark中的pair rdd,看这一篇就够了

本文始发于我的公众号:TechFlow,原创不易,求个关注web


今天是spark专题的第四篇文章,咱们一块儿来看下Pair RDD。api

定义

在以前的文章当中,咱们已经熟悉了RDD的相关概念,也了解了RDD基本的转化操做和行动操做。今天咱们来看一下RDD当中很是常见的PairRDD,也叫作键值对RDD,能够理解成KVRDD。数组

KV很好理解,就是key和value的组合,好比Python当中的dict或者是C++以及Java当中的map中的基本元素都是键值对。相比于以前基本的RDD,pariRDD能够支持更多的操做,相对来讲更加灵活,能够完成更加复杂的功能。好比咱们能够根据key进行聚合,或者是计算交集等。网络

因此自己pairRDD只不过是数据类型是KV结构的RDD而已,并无太多的内涵,你们不须要担忧。app

Pair RDD转化操做

Pair RDD也是RDD,因此以前介绍的RDD的转化操做Pair RDD天然也可使用。它们二者有些像是类继承的关系,RDD是父类,Pair RDD是实现了一些新特性的子类。子类能够调用父类当中全部的方法,可是父类却不能调用子类中的方法。编辑器

调用的时候须要注意,因为咱们的Pair RDD中的数据格式是KV的二元组,因此咱们传入的函数必须是针对二元组数据的,否则的话可能运算的结果会有问题。下面咱们来列举一些最经常使用的转化操做。函数

为了方便演示,咱们用一个固定的RDD来运行各类转化操做,来直观了解一下这些转化操做究竟起什么样的做用。spa

ex1 = sc.parallelize([[12], [34], [35]])

keys,values和sortByKey

这三个转化操做应该是最经常使用也是最简单的,简单到咱们经过字面意思就能够猜出它们的意思。3d

咱们先来看keys和values:code

咱们的RDD当中二元组当中的第一个元素会被当作key,第二个元素当作value,须要注意的是,它并非一个map或者是dict,因此key和value都是能够重复的

sortByKey也很直观,咱们从字面意思就看得出来是对RDD当中的数据根据key值进行排序,一样,咱们也来看下结果:

mapValues和flatMapValues

mapValues不能直接使用,而必需要传入一个函数做为参数。它的意思是对全部的value执行这个函数,好比咱们想把全部的value所有转变成字符串,咱们能够这么操做:

flatMapValues的操做和咱们的认知有些相反,咱们都知道flatMap操做是能够将一个嵌套的数组打散,可是咱们怎么对一个value打散嵌套呢?毕竟咱们的value不必定就是一个数组,这就要说到咱们传入的函数了,这个flatMap的操做实际上是针对函数返回的结果的,也就是说函数会返回一个迭代器,而后打散的内容实际上是这个迭代器当中的值。

我这么表述可能有些枯燥,咱们来看一个例子就明白了:

不知道这个结果有没有出乎你们的意料,它的整个流程是这样的,咱们调用flatMapValues运算以后返回一个迭代器,迭代器的内容是range(x, x+3)。实际上是每个key对应一个这样的迭代器,以后再将迭代器当中的内容打散,和key构成新的pair。

groupByKey,reduceByKey和foldByKey

这两个功能也比较接近,咱们先说第一个,若是学过SQL的同窗对于group by操做的含义应该很是熟悉。若是没有了解过也没有关系,group by能够简单理解成归并或者是分桶。也就是说将key值相同的value归并到一块儿,获得的结果是key-list的Pair RDD,也就是咱们把key值相同的value放在了一个list当中。

咱们也来看下例子:

咱们调用完groupby以后获得的结果是一个对象,因此须要调用一下mapValues将它转成list才可使用,不然的话是不能使用collect获取的。

reduceByKey和groupByKey相似,只不过groupByKey只是归并到一块儿,然而reduceByKey是传入reduce函数,执行reduce以后的结果。咱们来看一个例子:

在这个例子当中咱们执行了累加,把key值相同的value加在了一块儿。

foldByKey和fold的用法差异并不大,惟一不一样的是咱们加上了根据key值聚合的逻辑。若是咱们把分区的初始值设置成0的话,那么它用起来和reduceByKey几乎没有区别:

咱们只须要清楚foldByKey当中的初始值针对的是分区便可。

combineByKey

这个也是一个很核心而且不太容易理解的转化操做,咱们先来看它的参数,它一共接受5个参数。咱们一个一个来讲,首先是第一个参数,是createCombiner

它的做用是初始化,将value根据咱们的须要作初始化,好比将string类型的转化成int,或者是其余的操做。咱们用记号能够写成是V => C,这里的V就是value,C是咱们初始化以后的新值。

它会和value一块儿被当成新的pair传入第二个函数,因此第二个函数的接受参数是(C, V)的二元组。咱们要作的是定义这个二元组的合并,因此第二个函数能够写成(C, V) => C。源码里的注释和网上的教程都是这么写的,但我以为因为出现了两个C,可能会让人难以理解,我以为能够写成(C, V) => D,比较好。

最后一个函数是将D进行合并,因此它能够写成是(D, D) => D。

到这里咱们看似好像明白了它的原理,可是又好像有不少问号,总以为哪里有些不太对劲。我想了好久,才找到了问题的根源,出在哪里呢,在于合并。有没有发现第二个函数和第三个函数都是用来合并的,为何咱们要合并两次,它们之间的区别是什么?若是这个问题没搞明白,那么对于它的使用必定是错误的,我我的以为这个问题才是这个转化操做的核心,没讲清楚这个问题的博客都是不够清楚的。

其实这两次合并的逻辑大同小异,可是合并的范围不同,第一次合并是针对分区的,第二次合并是针对key的。由于在spark当中数据可能不止存放在一个分区内,因此咱们要合并两次,第一次先将分区内部的数据整合在一块儿,第二次再跨分区合并。因为不一样分区的数据可能相隔很远,因此会致使网络传输的时间过长,因此咱们但愿传输的数据尽可能小,这才有了groupby两次的缘由。

咱们再来看一个例子:

在这个例子当中咱们计算了每一个单词出现的平均个数,咱们一点一点来看。首先,咱们第一个函数将value转化成了(1, value)的元组,元组的第0号元素表示出现该单词的文档数,第1号元素表示文档内出现的次数。因此第二个函数,也就是在分组内聚合的函数,咱们对于出现的文档数只须要加一便可,对于出现的次数要进行累加。由于这一次聚合的对象都是(1, value)类型的元素,也就是没有聚合以前的结果。

在第三个函数当中,咱们对于出现的总数也进行累加,是由于这一个函数处理的结果是各个分区已经聚合一次的结果了。好比apple在一个分区内出如今了两个文档内,一共出现了20次,在一个分区出如今了三个文档中,一共出现了30次,那么显然咱们一共出如今了5个文档中,一共出现了50次。

因为咱们要计算平均,因此咱们要用出现的总次数除以出现的文档数。最后通过map以后因为咱们获得的仍是一个二元组,咱们不能直接collect,须要用collectAsMap。

咱们把上面这个例子用图来展现,会很容易理解:

链接操做

在spark当中,除了基础的转化操做以外,spark还提供了额外的链接操做给pair RDD。经过链接,咱们能够很方便地像是操做集合同样操做RDD。操做的方法也很是简单,和SQL当中操做数据表的形式很像,就是join操做。join操做又能够分为join(inner join)、left join和right join。

若是你熟悉SQL的话,想必这三者的区别应该很是清楚,它和SQL当中的join是同样的。若是不熟悉也没有关系,解释起来并不复杂。在join的时候咱们每每是用一张表去join另一张表,就好像两个数相减,咱们用一个数减去另一个数同样。好比A.join(B),咱们把A叫作左表,B叫作右表。所谓的join,就是把两张表当中某一个字段或者是某些字段值相同的行链接在一块儿。

好比一张表是学生表,一张表是出勤表。咱们两张表用学生的id一关联,就获得了学生的出勤记录。可是既然是集合关联,就会出现数据关联不上的状况。好比某个学生没有出勤,或者是出勤表里记错了学生id。对于数据关联不上的状况,咱们的处理方式有四种。第一种是全都丢弃,关联不上的数据就不要了。第二种是所有保留,关联不上的字段就记为NULL。第三种是左表关联不上的保留,右表丢弃。第四种是右表保留,左表丢弃。

下图展现了这四种join,很是形象。

咱们看几个实际的例子来体会一下。

首先建立数据集:

ex1 = sc.parallelize([['frank'30], ['bob'9], ['silly'3]])
ex2 = sc.parallelize([['frank'80], ['bob'12], ['marry'22], ['frank'21], ['bob'22]])

接着,咱们分别运行这四种join,观察一下join以后的结果。

从结果当中咱们能够看到,若是两个数据集当中都存在多条key值相同的数据,spark会将它们两两相乘匹配在一块儿。

行动操做

最后,咱们看下pair RDD的行动操做。pair RDD一样是rdd,因此普通rdd适用的行动操做,一样适用于pair rdd。可是除此以外,spark还为它开发了独有的行动操做。

countByKey

countByKey这个操做顾名思义就是根据Key值计算每一个Key值出现的条数,它等价于count groupby的SQL语句。咱们来看个具体的例子:

collectAsMap

这个也很好理解,其实就是讲最后的结果以map的形式输出

从返回的结果能够看到,输出的是一个dict类型。也就是Python当中的"map"。

lookup

这个单词看起来比较少见,其实它表明的是根据key值查找对应的value的意思。也就是经常使用的get函数,咱们传入一个key值,会自动返回key值对应的全部的value。若是有多个value,则会返回list。

总结

到这里,全部的pair RDD相关的操做就算是介绍完了。pair rdd在咱们平常的使用当中出现的频率很是高,利用它能够很是方便地实现一些比较复杂的操做。

另外,今天的这篇文章内容很多,想要彻底吃透,须要一点功夫。这不是看一篇文章就能够实现的,可是也没有关系,咱们初学的时候只须要对这些api和使用方法有一个大概的印象便可,具体的使用细节能够等用到的时候再去查阅相关的资料。

今天的文章就是这些,若是以为有所收获,请顺手点个关注或者转发吧,大家的举手之劳对我来讲很重要。

相关文章
相关标签/搜索