spark——spark中常说RDD,究竟RDD是什么?



259e891dcc7e1c344a74fed69b2f6e64.jpeg



今天是spark专题第二篇文章,咱们来看spark很是重要的一个概念——RDD。分布式

在上一讲当中咱们在本地安装好了spark,虽然咱们只有local一个集群,可是仍然不妨碍咱们进行实验。spark最大的特色就是不管集群的资源如何,进行计算的代码都是同样的,spark会自动为咱们作分布式调度工做ide


RDD概念


介绍spark离不开RDD,RDD是其中很重要的一个部分。可是不少初学者每每都不清楚RDD到底是什么,我本身也是同样,我在系统学习spark以前代码写了一堆,可是对于RDD等概念仍然云里雾里。函数

RDD的英文全名是Resilient Distributed Dataset,我把英文写出来就清楚了不少。即便第一个单词不认识,至少也能够知道它是一个分布式的数据集。第一个单词是弹性的意思,因此直译就是弹性分布式数据集。虽然咱们仍是不够清楚,可是已经比只知道RDD这个概念清楚多了,学习

RDD是一个不可变的分布式对象集合,每一个RDD都被分为多个分区,这些分区运行在集群的不一样节点上。spa

不少资料里只有这么一句粗浅的解释,看起来讲了不少,可是咱们都get不到。细想有不少疑问,最后我在大神的博客里找到了详细的解释,这位大神翻了spark的源码,找到了其中RDD的定义,一个RDD当中包含如下内容:3d

  • A list of partitions调试

  • A function for computing each splitorm

  • A list of dependencies on other RDDs对象

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)blog

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

咱们一条一条来看:

  1. 它是一组分区,分区是spark中数据集的最小单位。也就是说spark当中数据是以分区为单位存储的,不一样的分区被存储在不一样的节点上。这也是分布式计算的基础。

  2. 一个应用在各个分区上的计算任务。在spark当中数据和执行的操做是分开的,而且spark基于懒计算的机制,也就是在真正触发计算的行动操做出现以前,spark会存储起来对哪些数据执行哪些计算。数据和计算之间的映射关系就存储在RDD中。

  3. RDD之间的依赖关系,RDD之间存在转化关系,一个RDD能够经过转化操做转化成其余RDD,这些转化操做都会被记录下来。当部分数据丢失的时候,spark能够经过记录的依赖关系从新计算丢失部分的数据,而不是从新计算全部数据。

  4. 一个分区的方法,也就是计算分区的函数。spark当中支持基于hash的hash分区方法和基于范围的range分区方法。

  5. 一个列表,存储的是存储每一个分区的优先存储的位置。

经过以上五点,咱们能够看出spark一个重要的理念。即移动数据不如移动计算,也就是说在spark运行调度的时候,会倾向于将计算分发到节点,而不是将节点的数据搜集起来计算。RDD正是基于这一理念而生的,它作的也正是这样的事情。


建立RDD


spark中提供了两种方式来建立RDD,一种是读取外部的数据集,另外一种是将一个已经存储在内存当中的集合进行并行化

咱们一个一个来看,最简单的方式固然是并行化,由于这不须要外部的数据集,能够很轻易地作到。

在此以前,咱们先来看一下SparkContext的概念,SparkContext是整个spark的入口,至关于程序的main函数。在咱们启动spark的时候,spark已经为咱们建立好了一个SparkContext的实例,命名为sc,咱们能够直接访问到。

ffb1df5021d08ba4d14efed17b0b7aca.jpeg

咱们要建立RDD也须要基于sc进行,好比下面我要建立一个有字符串构成的RDD:

texts = sc.parallelize(['now test', 'spark rdd'])

返回的texts就是一个RDD:

40d1a47060623557e621ad112d982d75.jpeg

除了parallelize以外呢,咱们还能够从外部数据生成RDD,好比我想从一个文件读入,可使用sc当中的textFile方法获取:

text = sc.textFile('/path/path/data.txt')

通常来讲,除了本地调试咱们不多会用parallelize进行建立RDD,由于这须要咱们先把数据读取在内存。因为内存的限制,使得咱们很难将spark的能力发挥出来。


转化操做和行动操做


刚才咱们在介绍RDD的时候其实提到过,RDD支持两种操做,一种叫作转化操做(transformation)一种叫作行动操做(action)。

顾名思义,执行转化操做的时候,spark会将一个RDD转化成另外一个RDD。RDD中会将咱们此次转化的内容记录下来,可是不会进行运算。因此咱们获得的仍然是一个RDD而不是执行的结果。

好比咱们建立了texts的RDD以后,咱们想要对其中的内容进行过滤,只保留长度超过8的,咱们能够用filter进行转化:

textAfterFilter = texts.filter(lambda x: len(x) > 8)

咱们调用以后获得的也是一个RDD,就像咱们刚才说的同样,因为filter是一个转化操做,因此spark只会记录下它的内容,并不会真正执行。

转化操做能够操做任意数量的RDD,好比若是我执行以下操做,会一共获得4个RDD:

 
 

inputRDD = sc.textFile('path/path/log.txt')
lengthRDD = inputRDD.filter(lambda x: len(x) > 10)
errorRDD = inputRDD.filter(lambda x: 'error' in x)
unionRDD = errorRDD.union(lengthRDD)

最后的union会将两个RDD的结果组合在一块儿,若是咱们执行完上述代码以后,spark会记录下这些RDD的依赖信息,咱们把这个依赖信息画出来,就成了一张依赖图:

9731c8c422c53443dc20359f21670c0f.jpeg

不管咱们执行多少次转化操做,spark都不会真正执行其中的操做,只有当咱们执行行动操做时,记录下来的转化操做才会真正投入运算。像是first(),take(),count()等都是行动操做,这时候spark就会给咱们返回计算结果了。

695d3be67a0a2901f440ff4149e47817.jpeg

其中first的用处是返回第一个结果,take须要传入一个参数,指定返回的结果条数,count则是计算结果的数量。和咱们预期的同样,当咱们执行了这些操做以后,spark为咱们返回告终果。

本文着重讲的是RDD的概念,咱们下篇文章还会着重对转化操做和行动操做进行深刻解读。感兴趣的同窗不妨期待一下吧~

相关文章
相关标签/搜索