本文始发于我的公众号:TechFlow,原创不易,求个关注web
今天是spark第三篇文章,咱们继续来看RDD的一些操做。数组
咱们前文说道在spark当中RDD的操做能够分为两种,一种是转化操做(transformation),另外一种是行动操做(action)。在转化操做当中,spark不会为咱们计算结果,而是会生成一个新的RDD节点,记录下这个操做。只有在行动操做执行的时候,spark才会从头开始计算整个计算。缓存
而转化操做又能够进一步分为针对元素的转化操做以及针对集合的转化操做。框架
针对元素的转化操做很是经常使用,其中最经常使用的就是map和flatmap。从名字上看这二者都是map操做,map操做咱们都知道,在以前的MapReduce文章以及Python map、reduce用法的文章当中都有说起。简而言之就是能够将一个操做映射在每个元素上。编辑器
好比假设咱们有一个序列[1, 3, 4, 7],咱们但愿将当中每个元素执行平方操做。咱们固然能够用for循环执行,可是在spark当中更好的办法是使用map。分布式
nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)
咱们知道map是一个转化操做,因此square仍然是一个RDD,咱们直接将它输出不会获得结果,只会获得RDD的相关信息:函数
内部RDD的转化图是这样的:性能
咱们想看结果就必需要执行行动操做,好比take,咱们take一下查看一下结果:学习
和咱们的预期一致,对于以前一直关注的同窗来讲map操做应该已经很熟悉了,那么这个flatmap又是什么呢?优化
差异就在这个flat,咱们都知道flat是扁平的意思,因此flatmap就是说map执行以后的结果扁平化。说白了也就是说若是map执行以后的结果是一个数组的话,那么会将数组拆开,把里面的内容拿出来组合到一块儿。
咱们一块儿来看一个例子:
texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))
因为咱们执行map的对象是一个字符串,一个字符串执行split操做以后会获得一个字符串数组。若是咱们执行map,获得的结果会是:
若是咱们执行flatmap呢?咱们也能够试一下:
对比一下,有没有注意到差异?
是了,map执行的结果是一个array的array,由于每个string split以后就是一个array,咱们把array拼接到一块儿天然是一个array的array。而flatMap会把这些array摊平以后放在一块儿,这也是二者最大的差异。
上面介绍了针对元素的转化操做,下面来看看针对集合的转化操做。
针对集合的操做大概有union,distinct,intersection和subtract这几种。咱们能够先看下下图有一个直观地感觉,以后咱们再一一分析:
首先来看distinct,这个顾名思义,就是去除重复。和SQL当中的distinct是同样的,这个操做的输入是两个集合RDD,执行以后会生成一个新的RDD,这个RDD当中的全部元素都是unique的。有一点须要注意,执行distinct的开销很大,由于它会执行shuffle操做将全部的数据进行乱序,以确保每一个元素只有一份。若是你不明白shuffle操做是什么意思,没有关系,咱们在后序的文章当中会着重讲解。只须要记住它的开销很大就好了。
第二种操做是union,这个也很好理解,就是把两个RDD当中的全部元素合并。你能够把它当成是Python list当中的extend操做,一样和extend同样,它并不会作重复元素的检测,因此若是合并的两个集合当中有相同的元素并不会被过滤,而是会被保留。
第三个操做是intersection,它的意思是交集,也就是两个集合重叠的部分。这个应该蛮好理解的,咱们看下下图:
下图当中蓝色的部分,也就是A和B两个集合的交集部分就是A.intersection(B)的结果,也就是两个集合当中共有的元素。一样,这个操做也会执行shuffle,因此开销同样很大,而且这个操做会去掉重复的元素。
最后一个是subtract,也就是差集,就是属于A不属于B的元素,一样咱们能够用图来表示:
上图当中灰色阴影部分就是A和B两个集合的差集,一样,这个操做也会执行shuffle,很是耗时。
除了以上几种以外,还有cartesian,即笛卡尔积,sample抽样等集合操做,不过相对而言用的稍微少一些,这里就不过多介绍了,感兴趣的同窗能够了解一下,也并不复杂。
RDD中最经常使用的行动操做应该就是获取结果的操做了,毕竟咱们算了半天就是为了拿结果,只获取RDD显然不是咱们的目的。获取结果的RDD主要是take,top和collect,这三种没什么特别的用法,简单介绍一下。
其中collect是获取全部结果,会返回全部的元素。take和top都须要传入一个参数指定条数,take是从RDD中返回指定条数的结果,top是从RDD中返回最前面的若干条结果,top和take的用法彻底同样,惟一的区别就是拿到的结果是不是最前面的。
除了这几个以外,还有一个很经常使用的action是count,这个应该也不用多说,计算数据条数的操做,count一下就能够知道有多少条数据了。
除了这些比较简单的以外,再介绍另外两个比较有意思的,首先,先来介绍reduce。reduce顾名思义就是MapReduce当中的reduce,它的用法和Python当中的reduce几乎彻底同样,它接受一个函数来进行合并操做。咱们来看个例子:
在这个例子当中,咱们的reduce函数是将两个int执行加和,reduce机制会重复执行这个操做将全部的数据合并,因此最终获得的结果就是1 + 3 + 4 + 7 = 15.
除了reduce以外还有一个叫作fold的action,它和reduce彻底同样,惟一不一样的是它能够自定义一个初始值,而且是针对分区的,咱们还拿上面的例子举例:
直接看这个例子可能有点懵逼,简单解释一下就明白了,其实不复杂。咱们注意到咱们在使用parallelize创造数据的时候多加了一个参数2,这个2表示分区数。简单能够理解成数组[1, 3, 4, 7]会被分红两部分,可是咱们直接collect的话仍是原值。
如今咱们使用fold,传入了两个参数,除了一个函数以外还传入了一个初始值2。因此整个计算过程是这样的:
对于第一个分区的答案是1 + 3 + 2 = 6,对于第二个分区的答案是4 + 7 + 2 = 13,最后将两个分区合并:6 + 13 + 2 = 21。
也就是说咱们对于每一个分区的结果赋予了一个起始值,而且对分区合并以后的结果又赋予了一个起始值。
老实讲这个action是最难理解的,由于它比较反常。首先,对于reduce和fold来讲都有一个要求就是返回值的类型必须和rdd的数据类型相同。好比数据的类型是int,那么返回的结果也要是int。
可是对于有些场景这个是不适用的,好比咱们想求平均,咱们须要知道term的和,也须要知道term出现的次数,因此咱们须要返回两个值。这个时候咱们初始化的值应该是0, 0,也就是对于加和与计数而言都是从0开始的,接着咱们须要传入两个函数,好比写成这样:
nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))
看到这行代码会懵逼是必然的,不用担忧,咱们一点一点解释。
首先是第一个lambda函数,这里的x不是一个值而是两个值,或者说是一个二元组,也就是咱们最后返回的结果,在咱们的返回预期里,第一个返回的数是nums的和,第二个返回的数是nums当中数的个数。而这里的y则是nums输入的结果,显然nums输入的结果只有一个int,因此这里的y是一维的。那么咱们要求和固然是用x[0] + y,也就是说把y的值加在第一维上,第二维天然是加一,由于咱们每读取一个数就应该加一。
这点还比较容易理解,第二个函数可能有些费劲,第二个函数和第一个不一样,它不是用在处理nums的数据的,而是用来处理分区的。当咱们执行aggregate的时候,spark并非单线程执行的,它会将nums中的数据拆分红许多分区,每一个分区获得结果以后须要合并,合并的时候会调用这个函数。
和第一个函数相似,第一个x是最终结果,而y则是其余分区运算结束须要合并进来的值。因此这里的y是二维的,第一维是某个分区的和,第二维是某个分区当中元素的数量,那么咱们固然要把它都加在x上。
上图展现了两个分区的时候的计算过程,其中lambda1就是咱们传入的第一个匿名函数,同理,lambda2就是咱们传入的第二个匿名函数。我想结合图应该很容易看明白。
行动操做除了这几个以外还有一些,因为篇幅缘由咱们先不赘述了,在后序的文章当中若是有出现,咱们会再进行详细解释的。初学者学习spark比较抗拒的一个主要缘由就是以为太过复杂,就连操做还区分什么转化操做和行动操做。其实这一切都是为了惰性求值从而优化性能。这样咱们就能够把若干个操做合并在一块儿执行,从而减小消耗的计算资源,对于分布式计算框架而言,性能是很是重要的指标,理解了这一点,spark为何会作出这样的设计也就很容易理解了。
不只spark如此,TensorFlow等深度学习框架也是如此,本质上许多看似反直觉的设计都是有更深层的缘由的,理解了以后其实也很容易猜到,凡是拿到最终结果的操做每每都是行动操做,若是只是一些计算,那么十有八九是转化操做。
Spark当中的RDD是惰性求值的,有的时候咱们会但愿屡次使用同一个RDD。若是咱们只是简单地调用行动操做,那么spark会屡次重复计算RDD和它对应的全部数据以及其余依赖,这显然会带来大量开销。咱们很天然地会但愿对于咱们常用的RDD能够缓存起来,在咱们须要的时候随时拿来用,而不是每次用到的时候都须要从新跑。
为了解决这个问题,spark当中提供了持久化的操做。所谓的持久化能够简单理解成缓存起来。用法也很简单,咱们只须要对RDD进行persist便可:
texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()
调用完持久化以后,RDD会被缓存进内存或磁盘当中,咱们须要的时候能够随时调出来使用,就不用把前面的整个流程所有跑一遍了。而且spark当中支持多种级别的持久化操做,咱们能够经过StorageLevel的变量来控制。咱们来看下这个StorageLevel的取值:
咱们根据须要选择对应的缓存级别便可。固然既然有持久化天然就有反持久化,对于一些已经再也不须要缓存的RDD,咱们能够调用unpersist将它们从缓存当中去除。
今天的内容虽然看起来各类操做五花八门,可是有些并非常常用到,咱们只须要大概有个印象,具体操做的细节能够等用到的时候再作仔细的研究。但愿你们都能忽略这些并不重要的细节,抓住核心的本质。
今天的文章就是这些,若是以为有所收获,请顺手点个关注或者转发吧,大家的举手之劳对我来讲很重要。